View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.util.ArrayList;
18  import java.util.Collection;
19  import java.util.List;
20  import java.util.concurrent.ConcurrentHashMap;
21  import java.util.concurrent.CopyOnWriteArrayList;
22  
23  import org.cometd.Bayeux;
24  import org.cometd.Channel;
25  import org.cometd.ChannelBayeuxListener;
26  import org.cometd.ChannelListener;
27  import org.cometd.Client;
28  import org.cometd.DataFilter;
29  import org.cometd.Message;
30  import org.cometd.SubscriptionListener;
31  import org.mortbay.log.Log;
32  import org.mortbay.util.LazyList;
33  
34  /* ------------------------------------------------------------ */
35  /**
36   * A Bayuex Channel
37   *
38   * @author gregw
39   *
40   */
41  public class ChannelImpl implements Channel
42  {
43      private final AbstractBayeux _bayeux;
44      private final ChannelId _id;
45      private final ConcurrentHashMap<String,ChannelImpl> _children=new ConcurrentHashMap<String,ChannelImpl>();
46      private final List<ClientImpl> _subscribers=new CopyOnWriteArrayList<ClientImpl>();
47      private final List<DataFilter> _dataFilters=new CopyOnWriteArrayList<DataFilter>();
48      private final List<SubscriptionListener> _subscriptionListeners=new CopyOnWriteArrayList<SubscriptionListener>();
49      private volatile ChannelImpl _wild;
50      private volatile ChannelImpl _wildWild;
51      private volatile boolean _persistent;
52      private volatile int _split;
53      private volatile boolean _lazy;
54  
55      /* ------------------------------------------------------------ */
56      protected ChannelImpl(String id, AbstractBayeux bayeux)
57      {
58          _id=new ChannelId(id);
59          _bayeux=bayeux;
60      }
61  
62      /* ------------------------------------------------------------ */
63      /**
64       * A Lazy channel marks published messages as lazy. Lazy messages are queued
65       * but do not wake up waiting clients.
66       *
67       * @return true if message is lazy
68       */
69      public boolean isLazy()
70      {
71          return _lazy;
72      }
73  
74      /* ------------------------------------------------------------ */
75      /**
76       * A Lazy channel marks published messages as lazy. Lazy messages are queued
77       * but do not wake up waiting clients.
78       *
79       * @param lazy
80       *            true if message is lazy
81       */
82      public void setLazy(boolean lazy)
83      {
84          _lazy=lazy;
85      }
86  
87      /* ------------------------------------------------------------ */
88      /**
89       * Add a channel
90       * @param channel
91       * @return The added channel, or the existing channel if another thread
92       * already added the channel
93       */
94      public ChannelImpl addChild(ChannelImpl channel)
95      {
96          ChannelId child=channel.getChannelId();
97          if (!_id.isParentOf(child))
98          {
99              throw new IllegalArgumentException(_id + " not parent of " + child);
100         }
101 
102         String next=child.getSegment(_id.depth());
103 
104         if ((child.depth() - _id.depth()) == 1)
105         {
106             // add the channel to this channels
107             ChannelImpl old=_children.putIfAbsent(next,channel);
108             if (old != null)
109                 return old;
110 
111             if (ChannelId.WILD.equals(next))
112                 _wild=channel;
113             else if (ChannelId.WILDWILD.equals(next))
114                 _wildWild=channel;
115             _bayeux.addChannel(channel);
116             return channel;
117         }
118         else
119         {
120             ChannelImpl branch=(ChannelImpl)_bayeux.getChannel((_id.depth() == 0?"/":(_id.toString() + "/")) + next,true);
121             return branch.addChild(channel);
122         }
123     }
124 
125     /* ------------------------------------------------------------ */
126     /**
127      * @param filter
128      */
129     public void addDataFilter(DataFilter filter)
130     {
131         _dataFilters.add(filter);
132     }
133 
134     /* ------------------------------------------------------------ */
135     /**
136      * @return
137      */
138     public ChannelId getChannelId()
139     {
140         return _id;
141     }
142 
143     /* ------------------------------------------------------------ */
144     public ChannelImpl getChild(ChannelId id)
145     {
146         String next=id.getSegment(_id.depth());
147         if (next == null)
148             return null;
149 
150         ChannelImpl channel=_children.get(next);
151 
152         if (channel == null || channel.getChannelId().depth() == id.depth())
153         {
154             return channel;
155         }
156         return channel.getChild(id);
157     }
158 
159     /* ------------------------------------------------------------ */
160     public void getChannels(List<Channel> list)
161     {
162         list.add(this);
163         for (ChannelImpl channel : _children.values())
164             channel.getChannels(list);
165     }
166 
167     /* ------------------------------------------------------------ */
168     public int getChannelCount()
169     {
170         return _children.size();
171     }
172 
173     /* ------------------------------------------------------------ */
174     /**
175      * @return
176      */
177     public String getId()
178     {
179         return _id.toString();
180     }
181 
182     /* ------------------------------------------------------------ */
183     public boolean isPersistent()
184     {
185         return _persistent;
186     }
187 
188     /* ------------------------------------------------------------ */
189     public void deliver(Client from, Iterable<Client> to, Object data, String id)
190     {
191         MessageImpl message=_bayeux.newMessage();
192         message.put(Bayeux.CHANNEL_FIELD,getId());
193         message.put(Bayeux.DATA_FIELD,data);
194         if (id != null)
195             message.put(Bayeux.ID_FIELD,id);
196 
197         Message m=_bayeux.extendSendBayeux(from,message);
198 
199         if (m != null)
200         {
201             for (Client t : to)
202                 deliverToSubscriber((ClientImpl)t,from,m);
203         }
204         if (m instanceof MessageImpl)
205             ((MessageImpl)m).decRef();
206     }
207 
208     /* ------------------------------------------------------------ */
209     public void publish(Client fromClient, Object data, String msgId)
210     {
211         _bayeux.doPublish(getChannelId(),fromClient,data,msgId,false);
212     }
213 
214     /* ------------------------------------------------------------ */
215     public void publishLazy(Client fromClient, Object data, String msgId)
216     {
217         _bayeux.doPublish(getChannelId(),fromClient,data,msgId,true);
218     }
219 
220     /* ------------------------------------------------------------ */
221     public boolean remove()
222     {
223         return _bayeux.removeChannel(this);
224     }
225 
226     /* ------------------------------------------------------------ */
227     public boolean doRemove(ChannelImpl channel, List<ChannelBayeuxListener> listeners)
228     {
229         ChannelId channelId=channel.getChannelId();
230         int diff=channel._id.depth() - _id.depth();
231 
232         if (diff >= 1)
233         {
234             String key=channelId.getSegment(_id.depth());
235             ChannelImpl child=_children.get(key);
236 
237             if (child != null)
238             {
239                 // is it this child we are removing?
240                 if (diff == 1)
241                 {
242                     if (!child.isPersistent())
243                     {
244                         // remove the child
245                         child=_children.remove(key);
246                         if (child !=null)
247                         {
248                             if (_wild==channel)
249                                 _wild=null;
250                             else if (_wildWild==channel)
251                                 _wildWild=null;
252                             if ( child.getChannelCount() > 0)
253                             {
254                                 // remove the children of the child
255                                 for (ChannelImpl c : child._children.values())
256                                     child.doRemove(c,listeners);
257                             }
258                             for (ChannelBayeuxListener l : listeners)
259                                 l.channelRemoved(child);
260                         }
261                         return true;
262                     }
263                     return false;
264                 }
265 
266                 boolean removed=child.doRemove(channel,listeners);
267 
268                 // Do we remove a non persistent child?
269                 if (removed && !child.isPersistent() && child.getChannelCount() == 0 && child.getSubscriberCount() == 0)
270                 {
271                     child=_children.remove(key);
272                     if (child!=null)
273                         for (ChannelBayeuxListener l : listeners)
274                             l.channelRemoved(child);
275                 }
276 
277                 return removed;
278             }
279 
280         }
281         return false;
282     }
283 
284     /* ------------------------------------------------------------ */
285     /**
286      * @param filter
287      */
288     public DataFilter removeDataFilter(DataFilter filter)
289     {
290         _dataFilters.remove(filter);
291         return filter;
292     }
293 
294     /* ------------------------------------------------------------ */
295     public void setPersistent(boolean persistent)
296     {
297         _persistent=persistent;
298     }
299 
300     /* ------------------------------------------------------------ */
301     /**
302      * @param client
303      */
304     public void subscribe(Client client)
305     {
306         if (!(client instanceof ClientImpl))
307             throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
308 
309         for (ClientImpl c : _subscribers)
310         {
311             if (client.equals(c))
312                 return;
313         }
314 
315         _subscribers.add((ClientImpl)client);
316 
317         for (SubscriptionListener l : _subscriptionListeners)
318             l.subscribed(client,this);
319 
320         ((ClientImpl)client).addSubscription(this);
321     }
322 
323     /* ------------------------------------------------------------ */
324     @Override
325     public String toString()
326     {
327         return _id.toString();
328     }
329 
330     /* ------------------------------------------------------------ */
331     /**
332      * @param client
333      */
334     public void unsubscribe(Client c)
335     {
336         if (!(c instanceof ClientImpl))
337             throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
338         ClientImpl client = (ClientImpl)c;
339 
340         client.removeSubscription(this);
341 
342         _subscribers.remove(client);
343 
344         for (SubscriptionListener l : _subscriptionListeners)
345             l.unsubscribed(client,this);
346 
347         if (!_persistent && _subscribers.size() == 0 && _children.size() == 0)
348             remove();
349     }
350 
351     /* ------------------------------------------------------------ */
352     protected void doDelivery(ChannelId to, Client from, Message msg)
353     {
354         int tail=to.depth() - _id.depth();
355 
356         Object data=msg.getData();
357 
358         // if we have data, filter it
359         if (data != null)
360         {
361             Object old=data;
362 
363             try
364             {
365                 switch(tail)
366                 {
367                     case 0:
368                     {
369                         for (DataFilter filter : _dataFilters)
370                         {
371                             data=filter.filter(from,this,data);
372                             if (data == null)
373                                 return;
374                         }
375                     }
376                         break;
377 
378                     case 1:
379                         final ChannelImpl wild = _wild;
380                         if (wild != null)
381                         {
382                             for (DataFilter filter : wild._dataFilters)
383                             {
384                                 data=filter.filter(from,this,data);
385                                 if (data == null)
386                                     return;
387                             }
388                         }
389 
390                     default:
391                         final ChannelImpl wildWild = _wildWild;
392                         if (wildWild != null)
393                         {
394                             for (DataFilter filter : wildWild._dataFilters)
395                             {
396                                 data=filter.filter(from,this,data);
397                                 if (data == null)
398                                     return;
399                             }
400                         }
401                 }
402             }
403             catch(IllegalStateException e)
404             {
405                 Log.ignore(e);
406                 return;
407             }
408 
409             // TODO this may not be correct if the message is reused.
410             // probably should close message ?
411             if (data != old)
412                 msg.put(AbstractBayeux.DATA_FIELD,data);
413         }
414 
415         switch(tail)
416         {
417             case 0:
418             {
419                 if (_lazy && msg instanceof MessageImpl)
420                     ((MessageImpl)msg).setLazy(true);
421 
422                 final ClientImpl[] subscribers=_subscribers.toArray(new ClientImpl[_subscribers.size()]);
423                 if (subscribers.length > 0)
424                 {
425                     // fair delivery
426                     int split=_split++ % subscribers.length;
427                     for (int i=split; i < subscribers.length; i++)
428                         deliverToSubscriber(subscribers[i],from,msg);
429                     for (int i=0; i < split; i++)
430                         deliverToSubscriber(subscribers[i],from,msg);
431                 }
432                 break;
433             }
434 
435             case 1:
436                 final ChannelImpl wild = _wild;
437                 if (wild != null)
438                 {
439                     if (wild._lazy && msg instanceof MessageImpl)
440                         ((MessageImpl)msg).setLazy(true);
441                     for (ClientImpl client : wild._subscribers)
442                         wild.deliverToSubscriber(client,from,msg);
443                 }
444 
445             default:
446             {
447                 final ChannelImpl wildWild = _wildWild;
448                 if (wildWild != null)
449                 {
450                     if (wildWild._lazy && msg instanceof MessageImpl)
451                         ((MessageImpl)msg).setLazy(true);
452                     for (ClientImpl client : wildWild._subscribers)
453                         wildWild.deliverToSubscriber(client,from,msg);
454                 }
455                 String next=to.getSegment(_id.depth());
456                 ChannelImpl channel=_children.get(next);
457                 if (channel != null)
458                     channel.doDelivery(to,from,msg);
459             }
460         }
461     }
462 
463     private void deliverToSubscriber(ClientImpl subscriber, Client from, Message message)
464     {
465         if (_bayeux.hasClient(subscriber.getId()))
466             subscriber.doDelivery(from, message);
467         else
468             unsubscribe(subscriber);
469     }
470 
471     /* ------------------------------------------------------------ */
472     public Collection<Client> getSubscribers()
473     {
474         return new ArrayList<Client>(_subscribers);
475     }
476 
477     /* ------------------------------------------------------------ */
478     public int getSubscriberCount()
479     {
480         return _subscribers.size();
481     }
482 
483     /* ------------------------------------------------------------ */
484     /*
485      * (non-Javadoc)
486      *
487      * @see dojox.cometd.Channel#getFilters()
488      */
489     public Collection<DataFilter> getDataFilters()
490     {
491         return new ArrayList<DataFilter>(_dataFilters);
492     }
493 
494     /* ------------------------------------------------------------ */
495     public void addListener(ChannelListener listener)
496     {
497         if (listener instanceof SubscriptionListener)
498         {
499             _subscriptionListeners.add((SubscriptionListener)listener);
500         }
501     }
502 
503     public void removeListener(ChannelListener listener)
504     {
505         if (listener instanceof SubscriptionListener)
506         {
507             _subscriptionListeners.remove((SubscriptionListener)listener);
508         }
509     }
510 }