1 /***
2 *
3 * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.logicblaze.lingo.jms;
19
20 import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
21 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
22
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.QueueBrowser;
26
27 import java.util.AbstractCollection;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.Enumeration;
31 import java.util.HashSet;
32 import java.util.Iterator;
33 import java.util.NoSuchElementException;
34 import java.util.Set;
35
/**
 * An implementation of the {@link BlockingQueue} interface which maps to a
 * JMS Destination.
 *
 * Note that when you have finished using this object you should call the
 * {@link #close()} method to free up any resources.
 *
 * @version $Revision$
 */
44 public class JmsQueue extends AbstractCollection implements BlockingQueue {
45
46 private JmsClient jmsClient;
47
48 public JmsQueue(JmsClient jmsClient) {
49 this.jmsClient = jmsClient;
50 }
51
52 public Iterator iterator() {
53 try {
54 QueueBrowser browser = getJmsClient().createBrowser();
55 Enumeration enumeration = browser.getEnumeration();
56 return new JmsQueueIterator(enumeration, browser);
57 }
58 catch (JMSException e) {
59 getJmsClient().handleException(e);
60 return Collections.EMPTY_LIST.iterator();
61 }
62 }
63
64 public boolean isEmpty() {
65 try {
66 Message message = getJmsClient().peek();
67 if (message != null) {
68 return false;
69 }
70 }
71 catch (JMSException e) {
72 getJmsClient().handleException(e);
73 }
74 return true;
75 }
76
77 /***
78 * This method could be quite slow for huge queues as this results in
79 * iterating through all of the available objects to count them.
80 */
81 public int size() {
82 int count = 0;
83 Iterator iter = iterator();
84 while (iter.hasNext()) {
85 count++;
86 }
87 return count;
88 }
89
90 /***
91 * This method could be quite slow for huge queues as this method may have
92 * to iterate through the entire queue
93 */
94 public boolean contains(Object element) {
95 JmsQueueIterator iter = (JmsQueueIterator) iterator();
96 try {
97 while (iter.hasNext()) {
98 Object value = iter.next();
99 if (equals(element, value)) {
100 return true;
101 }
102 }
103 return false;
104 }
105 finally {
106 iter.close();
107 }
108 }
109
110 /***
111 * This method could be quite slow for huge queues as this results in
112 * iterating through all of the available objects to count them.
113 */
114 public boolean containsAll(Collection coll) {
115 JmsQueueIterator iter = (JmsQueueIterator) iterator();
116 Set set = new HashSet(coll);
117 try {
118 while (iter.hasNext()) {
119 Object value = iter.next();
120 if (set.remove(value)) {
121 if (set.isEmpty()) {
122 return true;
123 }
124 }
125 }
126 return false;
127 }
128 finally {
129 iter.close();
130 }
131 }
132
133 public boolean add(Object element) {
134 try {
135 Message message = getJmsClient().createMessage(element);
136 getJmsClient().send(message);
137 return true;
138 }
139 catch (JMSException e) {
140 getJmsClient().handleException(e);
141 return false;
142 }
143 }
144
145 public boolean offer(Object element) {
146 return add(element);
147 }
148
149 public Object remove() {
150 try {
151 Message message = getJmsClient().receiveNoWait();
152 if (message != null) {
153 return getJmsClient().readMessage(message);
154 }
155 else {
156 throw new NoSuchElementException();
157 }
158 }
159 catch (JMSException e) {
160 getJmsClient().handleException(e);
161 }
162 return null;
163 }
164
165 public Object poll() {
166 try {
167 Message message = getJmsClient().receiveNoWait();
168 if (message != null) {
169 return getJmsClient().readMessage(message);
170 }
171 }
172 catch (JMSException e) {
173 getJmsClient().handleException(e);
174 }
175 return null;
176 }
177
178 public Object element() {
179 return peek();
180 }
181
182 public Object peek() {
183 try {
184 Message message = getJmsClient().peek();
185 if (message != null) {
186 return getJmsClient().readMessage(message);
187 }
188 }
189 catch (JMSException e) {
190 getJmsClient().handleException(e);
191 }
192 return null;
193 }
194
195 public void put(Object element) throws InterruptedException {
196 add(element);
197 }
198
199 public boolean offer(Object element, long timeout, TimeUnit unit) throws InterruptedException {
200 return add(element);
201 }
202
203 public Object take() throws InterruptedException {
204 try {
205 Message message = getJmsClient().receive();
206 if (message != null) {
207 return getJmsClient().readMessage(message);
208 }
209 }
210 catch (JMSException e) {
211 getJmsClient().handleException(e);
212 }
213 return null;
214 }
215
216 public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
217 try {
218 Message message = getJmsClient().receive(timeout, unit);
219 if (message != null) {
220 return getJmsClient().readMessage(message);
221 }
222 }
223 catch (JMSException e) {
224 getJmsClient().handleException(e);
225 }
226 return null;
227 }
228
229 public int remainingCapacity() {
230 return Integer.MAX_VALUE;
231 }
232
233 public int drainTo(Collection coll) {
234 int count = 0;
235 while (true) {
236 Object answer = poll();
237 if (answer != null) {
238 coll.add(answer);
239 count++;
240 }
241 else {
242 return count;
243 }
244 }
245 }
246
247 public int drainTo(Collection coll, int maximumElements) {
248 int count = 0;
249 while (count < maximumElements) {
250 Object answer = poll();
251 if (answer != null) {
252 coll.add(answer);
253 count++;
254 }
255 else {
256 break;
257 }
258 }
259 return count;
260 }
261
262 public void clear() {
263 while (true) {
264 Object value = poll();
265 if (value == null) {
266 break;
267 }
268 }
269 }
270
271 public boolean removeAll(Collection coll) {
272 throw new UnsupportedOperationException("removeAll() is not supported");
273 }
274
275 public boolean retainAll(Collection c) {
276 throw new UnsupportedOperationException("retainAll() is not supported");
277 }
278
279
280
281 public void close() {
282 jmsClient.close();
283 }
284
285
286
287 protected JmsClient getJmsClient() {
288 return jmsClient;
289 }
290
291 /***
292 * returns true if both values are null or identical or equal to each other
293 */
294 protected boolean equals(Object element, Object value) {
295 if (element == value) {
296 return true;
297 }
298 else if (element == null || value == null) {
299 return false;
300 }
301 else {
302 return element.equals(value);
303 }
304 }
305
306 protected final class JmsQueueIterator implements Iterator {
307 private final Enumeration enumeration;
308 private final QueueBrowser browser;
309
310 private boolean closed = false;
311 private Message message;
312 private Object element;
313
314 public JmsQueueIterator(Enumeration enumeration, QueueBrowser browser) {
315 this.enumeration = enumeration;
316 this.browser = browser;
317 }
318
319 public boolean hasNext() {
320 boolean answer = enumeration.hasMoreElements();
321 message = (Message) enumeration.nextElement();
322 element = null;
323 if (!answer) {
324 close();
325 }
326 return answer;
327 }
328
329 public void close() {
330 if (!closed) {
331 closed = true;
332 getJmsClient().close(browser);
333 }
334 }
335
336 public Object next() {
337 if (element == null) {
338 try {
339 element = getJmsClient().readMessage(message);
340 }
341 catch (JMSException e) {
342 getJmsClient().handleException(e);
343 }
344 }
345 return element;
346 }
347
348 public void remove() {
349 throw new UnsupportedOperationException("remove() not supported");
350 }
351 }
352
353 }