View Javadoc

1   /*** 
2    * 
3    * Copyright 2005 LogicBlaze, Inc.
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.logicblaze.lingo.jms.impl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.logicblaze.lingo.jms.JmsProducerConfig;
23  import org.logicblaze.lingo.jms.ReplyHandler;
24  import org.logicblaze.lingo.jms.Requestor;
25  import org.springframework.beans.factory.DisposableBean;
26  
27  import javax.jms.Connection;
28  import javax.jms.Destination;
29  import javax.jms.JMSException;
30  import javax.jms.Message;
31  import javax.jms.MessageProducer;
32  import javax.jms.Session;
33  
34  /***
35   * A simple requestor which only supports one-way and so does not need a
36   * consumer.
37   * 
38   * @version $Revision: 1.10 $
39   */
40  public class OneWayRequestor implements Requestor, DisposableBean {
41      private static final Log log = LogFactory.getLog(OneWayRequestor.class);
42  
43      private Connection connection;
44      private Session session;
45      private MessageProducer producer;
46      private boolean ownsConnection = true;
47      private Destination serverDestination;
48      private long counter;
49  
50      public static OneWayRequestor newInstance(Connection connection, JmsProducerConfig config, boolean ownsConnection) throws JMSException {
51          Session session = config.createSession(connection);
52          MessageProducer producer = config.createMessageProducer(session);
53          return new OneWayRequestor(connection, session, producer, null, ownsConnection);
54      }
55  
56      public OneWayRequestor(JmsProducerConfig config, Destination serverDestination) throws JMSException {
57          this.serverDestination = serverDestination;
58          this.ownsConnection = true;
59          this.connection = config.createConnection();
60          this.session = config.createSession(connection);
61          this.producer = config.createMessageProducer(session);
62      }
63  
64      public OneWayRequestor(Connection connection, Session session, MessageProducer producer, Destination serverDestination, boolean ownsConnection) {
65          this.connection = connection;
66          this.session = session;
67          this.producer = producer;
68          this.serverDestination = serverDestination;
69          this.ownsConnection = ownsConnection;
70      }
71  
72      public void close() throws JMSException {
73          if (producer != null) {
74              MessageProducer tmp = producer;
75              producer = null;
76              tmp.close();
77          }
78          if (session != null) {
79              Session tmp = session;
80              session = null;
81              tmp.close();
82          }
83          if (connection != null) {
84              Connection tmp = connection;
85              connection = null;
86              if (ownsConnection) {
87                  tmp.close();
88              }
89          }
90      }
91  
92      public void destroy() throws Exception {
93          close();
94      }
95  
96      public void send(Destination destination, Message message) throws JMSException {
97          send(destination, message, getTimeToLive());
98      }
99  
100     public void send(Destination destination, Message message, long timeToLive) throws JMSException {
101         doSend(destination, message, timeToLive);
102     }
103 
104     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
105         doSend(destination, message, deliveryMode, priority, timeToLive);
106     }
107 
108     public Message receive(long timeout) throws JMSException {
109         throw new JMSException("receive(long) not implemented for OneWayRequestor");
110     }
111 
112     public Message request(Destination destination, Message message) throws JMSException {
113         throw new JMSException("request(Destination, Message) not implemented for OneWayRequestor");
114     }
115 
116     public Message request(Destination destination, Message message, long timeout) throws JMSException {
117         throw new JMSException("request(Destination, Message, long) not implemented for OneWayRequestor");
118     }
119 
120     public void request(Destination destination, Message requestMessage, ReplyHandler handler, long timeout) throws JMSException {
121         throw new JMSException("request(Destination, Message, ReplyHandler, long) not implemented for OneWayRequestor");
122     }
123 
124     // Properties
125     // -------------------------------------------------------------------------
126     public Connection getConnection() {
127         return connection;
128     }
129 
130     public Session getSession() {
131         return session;
132     }
133 
134     public MessageProducer getMessageProducer() {
135         return producer;
136     }
137 
138     public int getDeliveryMode() throws JMSException {
139         return getMessageProducer().getDeliveryMode();
140     }
141 
142     /***
143      * Sets the default delivery mode of request messages
144      * 
145      * @throws JMSException
146      */
147     public void setDeliveryMode(int deliveryMode) throws JMSException {
148         getMessageProducer().setDeliveryMode(deliveryMode);
149     }
150 
151     public int getPriority() throws JMSException {
152         return getMessageProducer().getPriority();
153     }
154 
155     /***
156      * Sets the default priority of request messages
157      * 
158      * @throws JMSException
159      */
160     public void setPriority(int priority) throws JMSException {
161         getMessageProducer().setPriority(priority);
162     }
163 
164     /***
165      * The default time to live on request messages
166      * 
167      * @throws JMSException
168      */
169     public long getTimeToLive() throws JMSException {
170         return getMessageProducer().getTimeToLive();
171     }
172 
173     /***
174      * Sets the maximum time to live for requests
175      * 
176      * @throws JMSException
177      */
178     public void setTimeToLive(long timeToLive) throws JMSException {
179         getMessageProducer().setTimeToLive(timeToLive);
180     }
181 
182     // Implementation methods
183     // -------------------------------------------------------------------------
184 
185     /***
186      * A hook to allow custom implementations to process headers differently.
187      */
188     protected void populateHeaders(Message message) throws JMSException {
189     }
190 
191     protected void doSend(Destination destination, Message message, long timeToLive) throws JMSException {
192         destination = validateDestination(destination);
193         if (log.isDebugEnabled()) {
194             log.debug("Sending message to: " + destination + " message: " + message);
195         }
196         getMessageProducer().send(destination, message, getDeliveryMode(), getPriority(), timeToLive);
197     }
198 
199     protected void doSend(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
200         destination = validateDestination(destination);
201         if (log.isDebugEnabled()) {
202             log.debug("Sending message to: " + destination + " message: " + message);
203         }
204         getMessageProducer().send(destination, message, deliveryMode, priority, timeToLive);
205     }
206 
207     protected Destination validateDestination(Destination destination) {
208         if (destination == null) {
209             destination = serverDestination;
210         }
211         return destination;
212     }
213 
214     /***
215      * Creates a new correlation ID. Note that because the correlationID is used
216      * on a per-temporary destination basis, it does not need to be unique
217      * across more than one destination. So a simple counter will suffice.
218      * 
219      * @return
220      */
221     public String createCorrelationID() {
222         return Long.toString(nextCounter());
223     }
224 
225     protected synchronized long nextCounter() {
226         return ++counter;
227     }
228 }