View Javadoc

1   /***
2    * 
3    * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.logicblaze.lingo.jms;
19  
20  import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.logicblaze.lingo.jms.impl.DefaultJmsProducer;
25  import org.logicblaze.lingo.jms.marshall.DefaultMarshaller;
26  import org.logicblaze.lingo.jms.marshall.Marshaller;
27  
28  import javax.jms.ConnectionFactory;
29  import javax.jms.Destination;
30  import javax.jms.JMSException;
31  import javax.jms.Message;
32  import javax.jms.MessageConsumer;
33  import javax.jms.Queue;
34  import javax.jms.QueueBrowser;
35  
36  import java.util.Enumeration;
37  
38  /***
39   * A helper class for working with JMS from inside collection classes
40   * 
41   * @version $Revision$
42   */
43  public class JmsClient {
44      private static final Log log = LogFactory.getLog(JmsClient.class);
45  
46      private ConnectionFactory connectionFactory;
47      private Destination destination;
48      private JmsProducer producer;
49      private MessageConsumer consumer;
50      private Marshaller marshaller = new DefaultMarshaller();
51      private JmsProducerConfig config = new JmsProducerConfig();
52      private String selector;
53      private boolean noLocal;
54      private long noWaitTimeout = 500;
55  
56      public JmsClient() {
57      }
58  
59      public JmsClient(ConnectionFactory connectionFactory, Destination destination) {
60          this.connectionFactory = connectionFactory;
61          this.destination = destination;
62      }
63  
64      public JmsClient(Destination destination, JmsProducer producer, MessageConsumer consumer) {
65          this.destination = destination;
66          this.producer = producer;
67          this.consumer = consumer;
68      }
69  
70      public Message receiveNoWait() throws JMSException {
71          return getConsumer().receive(noWaitTimeout);
72      }
73  
74      public Message receive() throws JMSException {
75          return getConsumer().receive();
76      }
77  
78      public Message receive(long timeout, TimeUnit unit) throws JMSException {
79          long millis = unit.convert(timeout, TimeUnit.MILLISECONDS);
80          return getConsumer().receive(millis);
81      }
82  
83      public QueueBrowser createBrowser() throws JMSException {
84          Destination destination = getDestination();
85          if (destination instanceof Queue) {
86              return getProducer().getSession().createBrowser((Queue) destination);
87          }
88          else {
89              throw new UnsupportedOperationException("You can only peek() inside a collection based on a Queue: " + destination);
90          }
91      }
92  
93      public Message peek() throws JMSException {
94          QueueBrowser browser = createBrowser();
95          try {
96              Enumeration iter = browser.getEnumeration();
97              if (iter.hasMoreElements()) {
98                  return (Message) iter.nextElement();
99              }
100             return null;
101         }
102         finally {
103             try {
104                 browser.close();
105             }
106             catch (JMSException e) {
107                 onBrowserCloseException(e);
108                 return null;
109             }
110         }
111     }
112 
113     public void send(Message message) throws JMSException {
114         getProducer().send(getDestination(), message);
115     }
116 
117     public Message createMessage(Object element) throws JMSException {
118         return getMarshaller().createObjectMessage(getProducer().getSession(), element);
119     }
120 
121     public void handleException(JMSException e) {
122         throw new RuntimeJMSException(e);
123     }
124 
125     public Object readMessage(Message message) throws JMSException {
126         return getMarshaller().readMessage(message);
127     }
128 
129     public void close(QueueBrowser browser) {
130         try {
131             browser.close();
132         }
133         catch (JMSException e) {
134             // ignore exception as we're called from iterators
135             log.warn("Could not close queue browser due to: " + e, e);
136         }
137     }
138 
139     public void close() {
140         try {
141             if (consumer != null) {
142                 consumer.close();
143             }
144             if (producer != null) {
145                 producer.close();
146             }
147         }
148         catch (JMSException e) {
149             e.printStackTrace();
150         }
151         finally {
152             consumer = null;
153             producer = null;
154         }
155     }
156 
157     // Properties
158     // -------------------------------------------------------------------------
159     public MessageConsumer getConsumer() throws JMSException {
160         if (consumer == null) {
161             consumer = getProducer().getSession().createConsumer(getDestination(), selector, noLocal);
162         }
163         return consumer;
164     }
165 
166     public void setConsumer(MessageConsumer consumer) {
167         this.consumer = consumer;
168     }
169 
170     public Destination getDestination() {
171         if (destination == null) {
172             throw new IllegalArgumentException("No destination property configured");
173         }
174         return destination;
175     }
176 
177     public void setDestination(Destination destination) {
178         this.destination = destination;
179     }
180 
181     public Marshaller getMarshaller() {
182         return marshaller;
183     }
184 
185     public void setMarshaller(Marshaller marshaller) {
186         this.marshaller = marshaller;
187     }
188 
189     public JmsProducer getProducer() throws JMSException {
190         if (producer == null) {
191             if (connectionFactory == null) {
192                 throw new IllegalArgumentException("No producer or connectionFactory property configured");
193             }
194             producer = DefaultJmsProducer.newInstance(connectionFactory, config);
195         }
196         return producer;
197     }
198 
199     public void setProducer(JmsProducer producer) {
200         this.producer = producer;
201     }
202 
203     public JmsProducerConfig getConfig() {
204         return config;
205     }
206 
207     public void setConfig(JmsProducerConfig config) {
208         this.config = config;
209     }
210 
211     public ConnectionFactory getConnectionFactory() {
212         return connectionFactory;
213     }
214 
215     public void setConnectionFactory(ConnectionFactory connectionFactory) {
216         this.connectionFactory = connectionFactory;
217     }
218 
219     public boolean isNoLocal() {
220         return noLocal;
221     }
222 
223     /***
224      * Sets if messages sent by this process should be visible to this JVM
225      */
226     public void setNoLocal(boolean noLocal) {
227         this.noLocal = noLocal;
228     }
229 
230     public String getSelector() {
231         return selector;
232     }
233 
234     /***
235      * Sets the JMS message selector to filter out messages from the consumer
236      */
237     public void setSelector(String selector) {
238         this.selector = selector;
239     }
240 
241     // Implementation methods
242     // -------------------------------------------------------------------------
243     protected void onBrowserCloseException(JMSException e) {
244         log.warn("Failed to close Queue Browser: " + e, e);
245     }
246 }