/**
 * 
 * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
 * 
 * Licensed under the Apache License, Version 2.0 (the "License"); 
 * you may not use this file except in compliance with the License. 
 * You may obtain a copy of the License at 
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and 
 * limitations under the License. 
 * 
 **/
18  package org.logicblaze.lingo.jms;
19  
20  import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
21  import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
22  
23  import javax.jms.JMSException;
24  import javax.jms.Message;
25  import javax.jms.QueueBrowser;
26  
27  import java.util.AbstractCollection;
28  import java.util.Collection;
29  import java.util.Collections;
30  import java.util.Enumeration;
31  import java.util.HashSet;
32  import java.util.Iterator;
33  import java.util.NoSuchElementException;
34  import java.util.Set;
35  
36  /***
37   * An implementation of the Queue interface which maps to a JMS Destination.
38   * 
39   * Note that when you have finished using this object you should call the
40   * {@link #close()} method to free up any resources.
41   * 
42   * @version $Revision$
43   */
44  public class JmsQueue extends AbstractCollection implements BlockingQueue {
45  
46      private JmsClient jmsClient;
47  
48      public JmsQueue(JmsClient jmsClient) {
49          this.jmsClient = jmsClient;
50      }
51  
52      public Iterator iterator() {
53          try {
54              QueueBrowser browser = getJmsClient().createBrowser();
55              Enumeration enumeration = browser.getEnumeration();
56              return new JmsQueueIterator(enumeration, browser);
57          }
58          catch (JMSException e) {
59              getJmsClient().handleException(e);
60              return Collections.EMPTY_LIST.iterator();
61          }
62      }
63  
64      public boolean isEmpty() {
65          try {
66              Message message = getJmsClient().peek();
67              if (message != null) {
68                  return false;
69              }
70          }
71          catch (JMSException e) {
72              getJmsClient().handleException(e);
73          }
74          return true;
75      }
76  
77      /***
78       * This method could be quite slow for huge queues as this results in
79       * iterating through all of the available objects to count them.
80       */
81      public int size() {
82          int count = 0;
83          Iterator iter = iterator();
84          while (iter.hasNext()) {
85              count++;
86          }
87          return count;
88      }
89  
90      /***
91       * This method could be quite slow for huge queues as this method may have
92       * to iterate through the entire queue
93       */
94      public boolean contains(Object element) {
95          JmsQueueIterator iter = (JmsQueueIterator) iterator();
96          try {
97              while (iter.hasNext()) {
98                  Object value = iter.next();
99                  if (equals(element, value)) {
100                     return true;
101                 }
102             }
103             return false;
104         }
105         finally {
106             iter.close();
107         }
108     }
109 
110     /***
111      * This method could be quite slow for huge queues as this results in
112      * iterating through all of the available objects to count them.
113      */
114     public boolean containsAll(Collection coll) {
115         JmsQueueIterator iter = (JmsQueueIterator) iterator();
116         Set set = new HashSet(coll);
117         try {
118             while (iter.hasNext()) {
119                 Object value = iter.next();
120                 if (set.remove(value)) {
121                     if (set.isEmpty()) {
122                         return true;
123                     }
124                 }
125             }
126             return false;
127         }
128         finally {
129             iter.close();
130         }
131     }
132 
133     public boolean add(Object element) {
134         try {
135             Message message = getJmsClient().createMessage(element);
136             getJmsClient().send(message);
137             return true;
138         }
139         catch (JMSException e) {
140             getJmsClient().handleException(e);
141             return false;
142         }
143     }
144 
145     public boolean offer(Object element) {
146         return add(element);
147     }
148 
149     public Object remove() {
150         try {
151             Message message = getJmsClient().receiveNoWait();
152             if (message != null) {
153                 return getJmsClient().readMessage(message);
154             }
155             else {
156                 throw new NoSuchElementException();
157             }
158         }
159         catch (JMSException e) {
160             getJmsClient().handleException(e);
161         }
162         return null;
163     }
164 
165     public Object poll() {
166         try {
167             Message message = getJmsClient().receiveNoWait();
168             if (message != null) {
169                 return getJmsClient().readMessage(message);
170             }
171         }
172         catch (JMSException e) {
173             getJmsClient().handleException(e);
174         }
175         return null;
176     }
177 
178     public Object element() {
179         return peek();
180     }
181 
182     public Object peek() {
183         try {
184             Message message = getJmsClient().peek();
185             if (message != null) {
186                 return getJmsClient().readMessage(message);
187             }
188         }
189         catch (JMSException e) {
190             getJmsClient().handleException(e);
191         }
192         return null;
193     }
194 
195     public void put(Object element) throws InterruptedException {
196         add(element);
197     }
198 
199     public boolean offer(Object element, long timeout, TimeUnit unit) throws InterruptedException {
200         return add(element);
201     }
202 
203     public Object take() throws InterruptedException {
204         try {
205             Message message = getJmsClient().receive();
206             if (message != null) {
207                 return getJmsClient().readMessage(message);
208             }
209         }
210         catch (JMSException e) {
211             getJmsClient().handleException(e);
212         }
213         return null;
214     }
215 
216     public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
217         try {
218             Message message = getJmsClient().receive(timeout, unit);
219             if (message != null) {
220                 return getJmsClient().readMessage(message);
221             }
222         }
223         catch (JMSException e) {
224             getJmsClient().handleException(e);
225         }
226         return null;
227     }
228 
229     public int remainingCapacity() {
230         return Integer.MAX_VALUE;
231     }
232 
233     public int drainTo(Collection coll) {
234         int count = 0;
235         while (true) {
236             Object answer = poll();
237             if (answer != null) {
238                 coll.add(answer);
239                 count++;
240             }
241             else {
242                 return count;
243             }
244         }
245     }
246 
247     public int drainTo(Collection coll, int maximumElements) {
248         int count = 0;
249         while (count < maximumElements) {
250             Object answer = poll();
251             if (answer != null) {
252                 coll.add(answer);
253                 count++;
254             }
255             else {
256                 break;
257             }
258         }
259         return count;
260     }
261 
262     public void clear() {
263         while (true) {
264             Object value = poll();
265             if (value == null) {
266                 break;
267             }
268         }
269     }
270 
271     public boolean removeAll(Collection coll) {
272         throw new UnsupportedOperationException("removeAll() is not supported");
273     }
274 
275     public boolean retainAll(Collection c) {
276         throw new UnsupportedOperationException("retainAll() is not supported");
277     }
278 
279     // Extension APIs
280     // -------------------------------------------------------------------------
281     public void close() {
282         jmsClient.close();
283     }
284 
285     // Implementation methods
286     // -------------------------------------------------------------------------
287     protected JmsClient getJmsClient() {
288         return jmsClient;
289     }
290 
291     /***
292      * returns true if both values are null or identical or equal to each other
293      */
294     protected boolean equals(Object element, Object value) {
295         if (element == value) {
296             return true;
297         }
298         else if (element == null || value == null) {
299             return false;
300         }
301         else {
302             return element.equals(value);
303         }
304     }
305 
306     protected final class JmsQueueIterator implements Iterator {
307         private final Enumeration enumeration;
308         private final QueueBrowser browser;
309 
310         private boolean closed = false;
311         private Message message;
312         private Object element;
313 
314         public JmsQueueIterator(Enumeration enumeration, QueueBrowser browser) {
315             this.enumeration = enumeration;
316             this.browser = browser;
317         }
318 
319         public boolean hasNext() {
320             boolean answer = enumeration.hasMoreElements();
321             message = (Message) enumeration.nextElement();
322             element = null;
323             if (!answer) {
324                 close();
325             }
326             return answer;
327         }
328 
329         public void close() {
330             if (!closed) {
331                 closed = true;
332                 getJmsClient().close(browser);
333             }
334         }
335 
336         public Object next() {
337             if (element == null) {
338                 try {
339                     element = getJmsClient().readMessage(message);
340                 }
341                 catch (JMSException e) {
342                     getJmsClient().handleException(e);
343                 }
344             }
345             return element;
346         }
347 
348         public void remove() {
349             throw new UnsupportedOperationException("remove() not supported");
350         }
351     }
352 
353 }