1 /***
2 *
3 * Copyright 2005 LogicBlaze, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.logicblaze.lingo.jms.impl;
19
20 import apple.awt.CImage;
21 import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
22 import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
23 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.logicblaze.lingo.jms.FailedToProcessResponse;
28 import org.logicblaze.lingo.jms.JmsProducerConfig;
29 import org.logicblaze.lingo.jms.ReplyHandler;
30 import org.logicblaze.lingo.jms.Requestor;
31 import org.logicblaze.lingo.util.DefaultTimeoutMap;
32 import org.logicblaze.lingo.util.TimeoutMap;
33
34 import javax.jms.Connection;
35 import javax.jms.ConnectionFactory;
36 import javax.jms.Destination;
37 import javax.jms.JMSException;
38 import javax.jms.Message;
39 import javax.jms.MessageListener;
40 import javax.jms.MessageProducer;
41 import javax.jms.Session;
42
43 /***
44 * A {@link org.logicblaze.lingo.jms.Requestor} which will use a single
45 * producer, consumer and temporary topic for resource efficiency, but will use
46 * correlation IDs on each message and response to ensure that each threads
47 * requests can occur synchronously. <p/> This class can be used concurrently by
48 * many different threads at the same time.
49 *
50 * @version $Revision: 1.14 $
51 */
52 public class MultiplexingRequestor extends SingleThreadedRequestor implements MessageListener {
53 private static final Log log = LogFactory.getLog(MultiplexingRequestor.class);
54
55 private TimeoutMap requestMap = new DefaultTimeoutMap();
56
57 public static Requestor newInstance(ConnectionFactory connectionFactory, JmsProducerConfig config, Destination destination, Destination responseDestination)
58 throws JMSException {
59 Connection connection = config.createConnection(connectionFactory);
60 Session session = config.createSession(connection);
61 MessageProducer producer = config.createMessageProducer(session);
62 return new MultiplexingRequestor(connection, session, producer, destination, responseDestination, true);
63 }
64
65 public static Requestor newInstance(ConnectionFactory connectionFactory, JmsProducerConfig config, Destination destination) throws JMSException {
66 Connection connection = config.createConnection(connectionFactory);
67 Session session = config.createSession(connection);
68 MessageProducer producer = config.createMessageProducer(session);
69 Destination responseDestination = config.createTemporaryDestination(session);
70 return new MultiplexingRequestor(connection, session, producer, destination, responseDestination, true);
71 }
72
73 public MultiplexingRequestor(JmsProducerConfig config, Destination serverDestination, Destination clientDestination) throws JMSException {
74 super(config, serverDestination, clientDestination);
75 init();
76 }
77
78 public MultiplexingRequestor(Connection connection, Session session, MessageProducer producer, Destination serverDestination,
79 Destination clientDestination, boolean ownsConnection) throws JMSException {
80 super(connection, session, producer, serverDestination, clientDestination, ownsConnection);
81 init();
82 }
83
84 private void init() throws JMSException {
85 ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
86 this.requestMap = new DefaultTimeoutMap(executor, 1000L);
87 getReceiver().setMessageListener(this);
88 }
89
90 public void registerHandler(String correlationID, ReplyHandler handler, long timeout) {
91 requestMap.put(correlationID, handler, timeout);
92 }
93
94 public Message request(Destination destination, Message message) throws JMSException {
95 long timeout = getTimeToLive();
96 return request(destination, message, timeout);
97 }
98
99 public Message request(Destination destination, Message message, long timeout) throws JMSException {
100
101
102
103 FutureTask future = null;
104 String correlationID = message.getJMSCorrelationID();
105 if (correlationID == null) {
106 correlationID = createCorrelationID();
107 message.setJMSCorrelationID(correlationID);
108 }
109 else {
110 Object currentHandler = requestMap.get(correlationID);
111 if (currentHandler instanceof AsyncReplyHandler) {
112 AsyncReplyHandler handler = (AsyncReplyHandler) currentHandler;
113 future = handler.newResultHandler();
114 }
115 }
116
117 if (future == null) {
118 FutureHandler futureHandler = new FutureHandler();
119 future = futureHandler;
120 requestMap.put(correlationID, futureHandler, timeout);
121 }
122 populateHeaders(message);
123 send(destination, message);
124
125 try {
126 if (timeout < 0) {
127 return (Message) future.get();
128 }
129 else {
130 return (Message) future.get(timeout, TimeUnit.MILLISECONDS);
131 }
132 }
133 catch (Exception e) {
134 throw createJMSException(e);
135 }
136 }
137
138 public void request(Destination destination, Message message, ReplyHandler handler, long timeout) throws JMSException {
139 String correlationID = message.getJMSCorrelationID();
140 if (correlationID == null) {
141 correlationID = createCorrelationID();
142 message.setJMSCorrelationID(correlationID);
143 }
144 synchronized (requestMap) {
145 Object currentHandler = requestMap.get(correlationID);
146 if (currentHandler instanceof AsyncReplyHandler) {
147 AsyncReplyHandler remoteObjectHandler = (AsyncReplyHandler) currentHandler;
148 remoteObjectHandler.setParent(handler);
149 }
150 else {
151 requestMap.put(correlationID, handler, timeout);
152 }
153 }
154 populateHeaders(message);
155 send(destination, message);
156 }
157
158 /***
159 * Processes inbound responses from requests
160 */
161 public void onMessage(Message message) {
162 try {
163 String correlationID = message.getJMSCorrelationID();
164
165
166 Object handler = requestMap.get(correlationID);
167 if (handler == null) {
168 log.warn("Response received for unknown correlationID: " + correlationID + " request: " + message);
169 }
170 else if (handler instanceof ReplyHandler) {
171 ReplyHandler replyHandler = (ReplyHandler) handler;
172 boolean complete = replyHandler.handle(message);
173 if (complete) {
174 requestMap.remove(correlationID);
175 }
176 }
177 }
178 catch (JMSException e) {
179 throw new FailedToProcessResponse(message, e);
180 }
181
182 }
183
184
185 public synchronized Message receive(long timeout) throws JMSException {
186 return super.receive(timeout);
187 }
188
189 protected synchronized void doSend(Destination destination, Message message, long timeout) throws JMSException {
190 super.doSend(destination, message, timeout);
191 }
192
193
194
195 public TimeoutMap getRequestMap() {
196 return requestMap;
197 }
198
199 public void setRequestMap(TimeoutMap requests) {
200 this.requestMap = requests;
201 }
202
203
204
205 protected JMSException createJMSException(Exception e) {
206 JMSException answer = new JMSException(e.toString());
207 answer.setLinkedException(e);
208 return answer;
209 }
210
211 }