1 /***
2 *
3 * Copyright 2005 LogicBlaze, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.logicblaze.lingo.jms.impl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.logicblaze.lingo.jms.JmsProducerConfig;
23 import org.logicblaze.lingo.jms.ReplyHandler;
24 import org.logicblaze.lingo.jms.Requestor;
25 import org.springframework.beans.factory.DisposableBean;
26
27 import javax.jms.Connection;
28 import javax.jms.Destination;
29 import javax.jms.JMSException;
30 import javax.jms.Message;
31 import javax.jms.MessageProducer;
32 import javax.jms.Session;
33
34 /***
35 * A simple requestor which only supports one-way and so does not need a
36 * consumer.
37 *
38 * @version $Revision: 1.10 $
39 */
40 public class OneWayRequestor implements Requestor, DisposableBean {
41 private static final Log log = LogFactory.getLog(OneWayRequestor.class);
42
43 private Connection connection;
44 private Session session;
45 private MessageProducer producer;
46 private boolean ownsConnection = true;
47 private Destination serverDestination;
48 private long counter;
49
50 public static OneWayRequestor newInstance(Connection connection, JmsProducerConfig config, boolean ownsConnection) throws JMSException {
51 Session session = config.createSession(connection);
52 MessageProducer producer = config.createMessageProducer(session);
53 return new OneWayRequestor(connection, session, producer, null, ownsConnection);
54 }
55
56 public OneWayRequestor(JmsProducerConfig config, Destination serverDestination) throws JMSException {
57 this.serverDestination = serverDestination;
58 this.ownsConnection = true;
59 this.connection = config.createConnection();
60 this.session = config.createSession(connection);
61 this.producer = config.createMessageProducer(session);
62 }
63
64 public OneWayRequestor(Connection connection, Session session, MessageProducer producer, Destination serverDestination, boolean ownsConnection) {
65 this.connection = connection;
66 this.session = session;
67 this.producer = producer;
68 this.serverDestination = serverDestination;
69 this.ownsConnection = ownsConnection;
70 }
71
72 public void close() throws JMSException {
73 if (producer != null) {
74 MessageProducer tmp = producer;
75 producer = null;
76 tmp.close();
77 }
78 if (session != null) {
79 Session tmp = session;
80 session = null;
81 tmp.close();
82 }
83 if (connection != null) {
84 Connection tmp = connection;
85 connection = null;
86 if (ownsConnection) {
87 tmp.close();
88 }
89 }
90 }
91
92 public void destroy() throws Exception {
93 close();
94 }
95
96 public void send(Destination destination, Message message) throws JMSException {
97 send(destination, message, getTimeToLive());
98 }
99
100 public void send(Destination destination, Message message, long timeToLive) throws JMSException {
101 doSend(destination, message, timeToLive);
102 }
103
104 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
105 doSend(destination, message, deliveryMode, priority, timeToLive);
106 }
107
108 public Message receive(long timeout) throws JMSException {
109 throw new JMSException("receive(long) not implemented for OneWayRequestor");
110 }
111
112 public Message request(Destination destination, Message message) throws JMSException {
113 throw new JMSException("request(Destination, Message) not implemented for OneWayRequestor");
114 }
115
116 public Message request(Destination destination, Message message, long timeout) throws JMSException {
117 throw new JMSException("request(Destination, Message, long) not implemented for OneWayRequestor");
118 }
119
120 public void request(Destination destination, Message requestMessage, ReplyHandler handler, long timeout) throws JMSException {
121 throw new JMSException("request(Destination, Message, ReplyHandler, long) not implemented for OneWayRequestor");
122 }
123
124
125
126 public Connection getConnection() {
127 return connection;
128 }
129
130 public Session getSession() {
131 return session;
132 }
133
134 public MessageProducer getMessageProducer() {
135 return producer;
136 }
137
138 public int getDeliveryMode() throws JMSException {
139 return getMessageProducer().getDeliveryMode();
140 }
141
142 /***
143 * Sets the default delivery mode of request messages
144 *
145 * @throws JMSException
146 */
147 public void setDeliveryMode(int deliveryMode) throws JMSException {
148 getMessageProducer().setDeliveryMode(deliveryMode);
149 }
150
151 public int getPriority() throws JMSException {
152 return getMessageProducer().getPriority();
153 }
154
155 /***
156 * Sets the default priority of request messages
157 *
158 * @throws JMSException
159 */
160 public void setPriority(int priority) throws JMSException {
161 getMessageProducer().setPriority(priority);
162 }
163
164 /***
165 * The default time to live on request messages
166 *
167 * @throws JMSException
168 */
169 public long getTimeToLive() throws JMSException {
170 return getMessageProducer().getTimeToLive();
171 }
172
173 /***
174 * Sets the maximum time to live for requests
175 *
176 * @throws JMSException
177 */
178 public void setTimeToLive(long timeToLive) throws JMSException {
179 getMessageProducer().setTimeToLive(timeToLive);
180 }
181
182
183
184
185 /***
186 * A hook to allow custom implementations to process headers differently.
187 */
188 protected void populateHeaders(Message message) throws JMSException {
189 }
190
191 protected void doSend(Destination destination, Message message, long timeToLive) throws JMSException {
192 destination = validateDestination(destination);
193 if (log.isDebugEnabled()) {
194 log.debug("Sending message to: " + destination + " message: " + message);
195 }
196 getMessageProducer().send(destination, message, getDeliveryMode(), getPriority(), timeToLive);
197 }
198
199 protected void doSend(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
200 destination = validateDestination(destination);
201 if (log.isDebugEnabled()) {
202 log.debug("Sending message to: " + destination + " message: " + message);
203 }
204 getMessageProducer().send(destination, message, deliveryMode, priority, timeToLive);
205 }
206
207 protected Destination validateDestination(Destination destination) {
208 if (destination == null) {
209 destination = serverDestination;
210 }
211 return destination;
212 }
213
214 /***
215 * Creates a new correlation ID. Note that because the correlationID is used
216 * on a per-temporary destination basis, it does not need to be unique
217 * across more than one destination. So a simple counter will suffice.
218 *
219 * @return
220 */
221 public String createCorrelationID() {
222 return Long.toString(nextCounter());
223 }
224
225 protected synchronized long nextCounter() {
226 return ++counter;
227 }
228 }