1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.CopyOnWriteArrayList;
22
23 import org.cometd.Bayeux;
24 import org.cometd.Channel;
25 import org.cometd.ChannelBayeuxListener;
26 import org.cometd.ChannelListener;
27 import org.cometd.Client;
28 import org.cometd.DataFilter;
29 import org.cometd.Message;
30 import org.cometd.SubscriptionListener;
31 import org.mortbay.log.Log;
32 import org.mortbay.util.LazyList;
33
34
35
36
37
38
39
40
41 public class ChannelImpl implements Channel
42 {
43 private final AbstractBayeux _bayeux;
44 private final ChannelId _id;
45 private final ConcurrentHashMap<String,ChannelImpl> _children=new ConcurrentHashMap<String,ChannelImpl>();
46 private final List<ClientImpl> _subscribers=new CopyOnWriteArrayList<ClientImpl>();
47 private final List<DataFilter> _dataFilters=new CopyOnWriteArrayList<DataFilter>();
48 private final List<SubscriptionListener> _subscriptionListeners=new CopyOnWriteArrayList<SubscriptionListener>();
49 private volatile ChannelImpl _wild;
50 private volatile ChannelImpl _wildWild;
51 private volatile boolean _persistent;
52 private volatile int _split;
53 private volatile boolean _lazy;
54
55
56 protected ChannelImpl(String id, AbstractBayeux bayeux)
57 {
58 _id=new ChannelId(id);
59 _bayeux=bayeux;
60 }
61
62
63
64
65
66
67
68
69 public boolean isLazy()
70 {
71 return _lazy;
72 }
73
74
75
76
77
78
79
80
81
82 public void setLazy(boolean lazy)
83 {
84 _lazy=lazy;
85 }
86
87
88
89
90
91
92
93
94 public ChannelImpl addChild(ChannelImpl channel)
95 {
96 ChannelId child=channel.getChannelId();
97 if (!_id.isParentOf(child))
98 {
99 throw new IllegalArgumentException(_id + " not parent of " + child);
100 }
101
102 String next=child.getSegment(_id.depth());
103
104 if ((child.depth() - _id.depth()) == 1)
105 {
106
107 ChannelImpl old=_children.putIfAbsent(next,channel);
108 if (old != null)
109 return old;
110
111 if (ChannelId.WILD.equals(next))
112 _wild=channel;
113 else if (ChannelId.WILDWILD.equals(next))
114 _wildWild=channel;
115 _bayeux.addChannel(channel);
116 return channel;
117 }
118 else
119 {
120 ChannelImpl branch=(ChannelImpl)_bayeux.getChannel((_id.depth() == 0?"/":(_id.toString() + "/")) + next,true);
121 return branch.addChild(channel);
122 }
123 }
124
125
126
127
128
129 public void addDataFilter(DataFilter filter)
130 {
131 _dataFilters.add(filter);
132 }
133
134
135
136
137
138 public ChannelId getChannelId()
139 {
140 return _id;
141 }
142
143
144 public ChannelImpl getChild(ChannelId id)
145 {
146 String next=id.getSegment(_id.depth());
147 if (next == null)
148 return null;
149
150 ChannelImpl channel=_children.get(next);
151
152 if (channel == null || channel.getChannelId().depth() == id.depth())
153 {
154 return channel;
155 }
156 return channel.getChild(id);
157 }
158
159
160 public void getChannels(List<Channel> list)
161 {
162 list.add(this);
163 for (ChannelImpl channel : _children.values())
164 channel.getChannels(list);
165 }
166
167
168 public int getChannelCount()
169 {
170 return _children.size();
171 }
172
173
174
175
176
177 public String getId()
178 {
179 return _id.toString();
180 }
181
182
183 public boolean isPersistent()
184 {
185 return _persistent;
186 }
187
188
189 public void deliver(Client from, Iterable<Client> to, Object data, String id)
190 {
191 MessageImpl message=_bayeux.newMessage();
192 message.put(Bayeux.CHANNEL_FIELD,getId());
193 message.put(Bayeux.DATA_FIELD,data);
194 if (id != null)
195 message.put(Bayeux.ID_FIELD,id);
196
197 Message m=_bayeux.extendSendBayeux(from,message);
198
199 if (m != null)
200 {
201 for (Client t : to)
202 deliverToSubscriber((ClientImpl)t,from,m);
203 }
204 if (m instanceof MessageImpl)
205 ((MessageImpl)m).decRef();
206 }
207
208
209 public void publish(Client fromClient, Object data, String msgId)
210 {
211 _bayeux.doPublish(getChannelId(),fromClient,data,msgId,false);
212 }
213
214
215 public void publishLazy(Client fromClient, Object data, String msgId)
216 {
217 _bayeux.doPublish(getChannelId(),fromClient,data,msgId,true);
218 }
219
220
221 public boolean remove()
222 {
223 return _bayeux.removeChannel(this);
224 }
225
226
227 public boolean doRemove(ChannelImpl channel, List<ChannelBayeuxListener> listeners)
228 {
229 ChannelId channelId=channel.getChannelId();
230 int diff=channel._id.depth() - _id.depth();
231
232 if (diff >= 1)
233 {
234 String key=channelId.getSegment(_id.depth());
235 ChannelImpl child=_children.get(key);
236
237 if (child != null)
238 {
239
240 if (diff == 1)
241 {
242 if (!child.isPersistent())
243 {
244
245 child=_children.remove(key);
246 if (child !=null)
247 {
248 if (_wild==channel)
249 _wild=null;
250 else if (_wildWild==channel)
251 _wildWild=null;
252 if ( child.getChannelCount() > 0)
253 {
254
255 for (ChannelImpl c : child._children.values())
256 child.doRemove(c,listeners);
257 }
258 for (ChannelBayeuxListener l : listeners)
259 l.channelRemoved(child);
260 }
261 return true;
262 }
263 return false;
264 }
265
266 boolean removed=child.doRemove(channel,listeners);
267
268
269 if (removed && !child.isPersistent() && child.getChannelCount() == 0 && child.getSubscriberCount() == 0)
270 {
271 child=_children.remove(key);
272 if (child!=null)
273 for (ChannelBayeuxListener l : listeners)
274 l.channelRemoved(child);
275 }
276
277 return removed;
278 }
279
280 }
281 return false;
282 }
283
284
285
286
287
288 public DataFilter removeDataFilter(DataFilter filter)
289 {
290 _dataFilters.remove(filter);
291 return filter;
292 }
293
294
295 public void setPersistent(boolean persistent)
296 {
297 _persistent=persistent;
298 }
299
300
301
302
303
304 public void subscribe(Client client)
305 {
306 if (!(client instanceof ClientImpl))
307 throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
308
309 for (ClientImpl c : _subscribers)
310 {
311 if (client.equals(c))
312 return;
313 }
314
315 _subscribers.add((ClientImpl)client);
316
317 for (SubscriptionListener l : _subscriptionListeners)
318 l.subscribed(client,this);
319
320 ((ClientImpl)client).addSubscription(this);
321 }
322
323
324 @Override
325 public String toString()
326 {
327 return _id.toString();
328 }
329
330
331
332
333
334 public void unsubscribe(Client c)
335 {
336 if (!(c instanceof ClientImpl))
337 throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
338 ClientImpl client = (ClientImpl)c;
339
340 client.removeSubscription(this);
341
342 _subscribers.remove(client);
343
344 for (SubscriptionListener l : _subscriptionListeners)
345 l.unsubscribed(client,this);
346
347 if (!_persistent && _subscribers.size() == 0 && _children.size() == 0)
348 remove();
349 }
350
351
352 protected void doDelivery(ChannelId to, Client from, Message msg)
353 {
354 int tail=to.depth() - _id.depth();
355
356 Object data=msg.getData();
357
358
359 if (data != null)
360 {
361 Object old=data;
362
363 try
364 {
365 switch(tail)
366 {
367 case 0:
368 {
369 for (DataFilter filter : _dataFilters)
370 {
371 data=filter.filter(from,this,data);
372 if (data == null)
373 return;
374 }
375 }
376 break;
377
378 case 1:
379 final ChannelImpl wild = _wild;
380 if (wild != null)
381 {
382 for (DataFilter filter : wild._dataFilters)
383 {
384 data=filter.filter(from,this,data);
385 if (data == null)
386 return;
387 }
388 }
389
390 default:
391 final ChannelImpl wildWild = _wildWild;
392 if (wildWild != null)
393 {
394 for (DataFilter filter : wildWild._dataFilters)
395 {
396 data=filter.filter(from,this,data);
397 if (data == null)
398 return;
399 }
400 }
401 }
402 }
403 catch(IllegalStateException e)
404 {
405 Log.ignore(e);
406 return;
407 }
408
409
410
411 if (data != old)
412 msg.put(AbstractBayeux.DATA_FIELD,data);
413 }
414
415 switch(tail)
416 {
417 case 0:
418 {
419 if (_lazy && msg instanceof MessageImpl)
420 ((MessageImpl)msg).setLazy(true);
421
422 final ClientImpl[] subscribers=_subscribers.toArray(new ClientImpl[_subscribers.size()]);
423 if (subscribers.length > 0)
424 {
425
426 int split=_split++ % subscribers.length;
427 for (int i=split; i < subscribers.length; i++)
428 deliverToSubscriber(subscribers[i],from,msg);
429 for (int i=0; i < split; i++)
430 deliverToSubscriber(subscribers[i],from,msg);
431 }
432 break;
433 }
434
435 case 1:
436 final ChannelImpl wild = _wild;
437 if (wild != null)
438 {
439 if (wild._lazy && msg instanceof MessageImpl)
440 ((MessageImpl)msg).setLazy(true);
441 for (ClientImpl client : wild._subscribers)
442 wild.deliverToSubscriber(client,from,msg);
443 }
444
445 default:
446 {
447 final ChannelImpl wildWild = _wildWild;
448 if (wildWild != null)
449 {
450 if (wildWild._lazy && msg instanceof MessageImpl)
451 ((MessageImpl)msg).setLazy(true);
452 for (ClientImpl client : wildWild._subscribers)
453 wildWild.deliverToSubscriber(client,from,msg);
454 }
455 String next=to.getSegment(_id.depth());
456 ChannelImpl channel=_children.get(next);
457 if (channel != null)
458 channel.doDelivery(to,from,msg);
459 }
460 }
461 }
462
463 private void deliverToSubscriber(ClientImpl subscriber, Client from, Message message)
464 {
465 if (_bayeux.hasClient(subscriber.getId()))
466 subscriber.doDelivery(from, message);
467 else
468 unsubscribe(subscriber);
469 }
470
471
472 public Collection<Client> getSubscribers()
473 {
474 return new ArrayList<Client>(_subscribers);
475 }
476
477
478 public int getSubscriberCount()
479 {
480 return _subscribers.size();
481 }
482
483
484
485
486
487
488
489 public Collection<DataFilter> getDataFilters()
490 {
491 return new ArrayList<DataFilter>(_dataFilters);
492 }
493
494
495 public void addListener(ChannelListener listener)
496 {
497 if (listener instanceof SubscriptionListener)
498 {
499 _subscriptionListeners.add((SubscriptionListener)listener);
500 }
501 }
502
503 public void removeListener(ChannelListener listener)
504 {
505 if (listener instanceof SubscriptionListener)
506 {
507 _subscriptionListeners.remove((SubscriptionListener)listener);
508 }
509 }
510 }