1 /***
2 *
3 * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.logicblaze.lingo.jms;
19
20 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.logicblaze.lingo.jms.impl.DefaultJmsProducer;
25 import org.logicblaze.lingo.jms.marshall.DefaultMarshaller;
26 import org.logicblaze.lingo.jms.marshall.Marshaller;
27
28 import javax.jms.ConnectionFactory;
29 import javax.jms.Destination;
30 import javax.jms.JMSException;
31 import javax.jms.Message;
32 import javax.jms.MessageConsumer;
33 import javax.jms.Queue;
34 import javax.jms.QueueBrowser;
35
36 import java.util.Enumeration;
37
38 /***
39 * A helper class for working with JMS from inside collection classes
40 *
41 * @version $Revision$
42 */
43 public class JmsClient {
44 private static final Log log = LogFactory.getLog(JmsClient.class);
45
46 private ConnectionFactory connectionFactory;
47 private Destination destination;
48 private JmsProducer producer;
49 private MessageConsumer consumer;
50 private Marshaller marshaller = new DefaultMarshaller();
51 private JmsProducerConfig config = new JmsProducerConfig();
52 private String selector;
53 private boolean noLocal;
54 private long noWaitTimeout = 500;
55
56 public JmsClient() {
57 }
58
59 public JmsClient(ConnectionFactory connectionFactory, Destination destination) {
60 this.connectionFactory = connectionFactory;
61 this.destination = destination;
62 }
63
64 public JmsClient(Destination destination, JmsProducer producer, MessageConsumer consumer) {
65 this.destination = destination;
66 this.producer = producer;
67 this.consumer = consumer;
68 }
69
70 public Message receiveNoWait() throws JMSException {
71 return getConsumer().receive(noWaitTimeout);
72 }
73
74 public Message receive() throws JMSException {
75 return getConsumer().receive();
76 }
77
78 public Message receive(long timeout, TimeUnit unit) throws JMSException {
79 long millis = unit.convert(timeout, TimeUnit.MILLISECONDS);
80 return getConsumer().receive(millis);
81 }
82
83 public QueueBrowser createBrowser() throws JMSException {
84 Destination destination = getDestination();
85 if (destination instanceof Queue) {
86 return getProducer().getSession().createBrowser((Queue) destination);
87 }
88 else {
89 throw new UnsupportedOperationException("You can only peek() inside a collection based on a Queue: " + destination);
90 }
91 }
92
93 public Message peek() throws JMSException {
94 QueueBrowser browser = createBrowser();
95 try {
96 Enumeration iter = browser.getEnumeration();
97 if (iter.hasMoreElements()) {
98 return (Message) iter.nextElement();
99 }
100 return null;
101 }
102 finally {
103 try {
104 browser.close();
105 }
106 catch (JMSException e) {
107 onBrowserCloseException(e);
108 return null;
109 }
110 }
111 }
112
113 public void send(Message message) throws JMSException {
114 getProducer().send(getDestination(), message);
115 }
116
117 public Message createMessage(Object element) throws JMSException {
118 return getMarshaller().createObjectMessage(getProducer().getSession(), element);
119 }
120
121 public void handleException(JMSException e) {
122 throw new RuntimeJMSException(e);
123 }
124
125 public Object readMessage(Message message) throws JMSException {
126 return getMarshaller().readMessage(message);
127 }
128
129 public void close(QueueBrowser browser) {
130 try {
131 browser.close();
132 }
133 catch (JMSException e) {
134
135 log.warn("Could not close queue browser due to: " + e, e);
136 }
137 }
138
139 public void close() {
140 try {
141 if (consumer != null) {
142 consumer.close();
143 }
144 if (producer != null) {
145 producer.close();
146 }
147 }
148 catch (JMSException e) {
149 e.printStackTrace();
150 }
151 finally {
152 consumer = null;
153 producer = null;
154 }
155 }
156
157
158
159 public MessageConsumer getConsumer() throws JMSException {
160 if (consumer == null) {
161 consumer = getProducer().getSession().createConsumer(getDestination(), selector, noLocal);
162 }
163 return consumer;
164 }
165
166 public void setConsumer(MessageConsumer consumer) {
167 this.consumer = consumer;
168 }
169
170 public Destination getDestination() {
171 if (destination == null) {
172 throw new IllegalArgumentException("No destination property configured");
173 }
174 return destination;
175 }
176
177 public void setDestination(Destination destination) {
178 this.destination = destination;
179 }
180
181 public Marshaller getMarshaller() {
182 return marshaller;
183 }
184
185 public void setMarshaller(Marshaller marshaller) {
186 this.marshaller = marshaller;
187 }
188
189 public JmsProducer getProducer() throws JMSException {
190 if (producer == null) {
191 if (connectionFactory == null) {
192 throw new IllegalArgumentException("No producer or connectionFactory property configured");
193 }
194 producer = DefaultJmsProducer.newInstance(connectionFactory, config);
195 }
196 return producer;
197 }
198
199 public void setProducer(JmsProducer producer) {
200 this.producer = producer;
201 }
202
203 public JmsProducerConfig getConfig() {
204 return config;
205 }
206
207 public void setConfig(JmsProducerConfig config) {
208 this.config = config;
209 }
210
211 public ConnectionFactory getConnectionFactory() {
212 return connectionFactory;
213 }
214
215 public void setConnectionFactory(ConnectionFactory connectionFactory) {
216 this.connectionFactory = connectionFactory;
217 }
218
219 public boolean isNoLocal() {
220 return noLocal;
221 }
222
223 /***
224 * Sets if messages sent by this process should be visible to this JVM
225 */
226 public void setNoLocal(boolean noLocal) {
227 this.noLocal = noLocal;
228 }
229
230 public String getSelector() {
231 return selector;
232 }
233
234 /***
235 * Sets the JMS message selector to filter out messages from the consumer
236 */
237 public void setSelector(String selector) {
238 this.selector = selector;
239 }
240
241
242
243 protected void onBrowserCloseException(JMSException e) {
244 log.warn("Failed to close Queue Browser: " + e, e);
245 }
246 }