001 /**
002 *
003 * Copyright 2005 LogicBlaze, Inc.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.logicblaze.lingo.jms.impl;
019
020 import apple.awt.CImage;
021 import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
022 import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
023 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.logicblaze.lingo.jms.FailedToProcessResponse;
028 import org.logicblaze.lingo.jms.JmsProducerConfig;
029 import org.logicblaze.lingo.jms.ReplyHandler;
030 import org.logicblaze.lingo.jms.Requestor;
031 import org.logicblaze.lingo.util.DefaultTimeoutMap;
032 import org.logicblaze.lingo.util.TimeoutMap;
033
034 import javax.jms.Connection;
035 import javax.jms.ConnectionFactory;
036 import javax.jms.Destination;
037 import javax.jms.JMSException;
038 import javax.jms.Message;
039 import javax.jms.MessageListener;
040 import javax.jms.MessageProducer;
041 import javax.jms.Session;
042
043 /**
044 * A {@link org.logicblaze.lingo.jms.Requestor} which will use a single
045 * producer, consumer and temporary topic for resource efficiency, but will use
046 * correlation IDs on each message and response to ensure that each threads
047 * requests can occur synchronously. <p/> This class can be used concurrently by
048 * many different threads at the same time.
049 *
050 * @version $Revision: 1.14 $
051 */
052 public class MultiplexingRequestor extends SingleThreadedRequestor implements MessageListener {
053 private static final Log log = LogFactory.getLog(MultiplexingRequestor.class);
054
055 private TimeoutMap requestMap = new DefaultTimeoutMap();
056
057 public static Requestor newInstance(ConnectionFactory connectionFactory, JmsProducerConfig config, Destination destination, Destination responseDestination)
058 throws JMSException {
059 Connection connection = config.createConnection(connectionFactory);
060 Session session = config.createSession(connection);
061 MessageProducer producer = config.createMessageProducer(session);
062 return new MultiplexingRequestor(connection, session, producer, destination, responseDestination, true);
063 }
064
065 public static Requestor newInstance(ConnectionFactory connectionFactory, JmsProducerConfig config, Destination destination) throws JMSException {
066 Connection connection = config.createConnection(connectionFactory);
067 Session session = config.createSession(connection);
068 MessageProducer producer = config.createMessageProducer(session);
069 Destination responseDestination = config.createTemporaryDestination(session);
070 return new MultiplexingRequestor(connection, session, producer, destination, responseDestination, true);
071 }
072
073 public MultiplexingRequestor(JmsProducerConfig config, Destination serverDestination, Destination clientDestination) throws JMSException {
074 super(config, serverDestination, clientDestination);
075 init();
076 }
077
078 public MultiplexingRequestor(Connection connection, Session session, MessageProducer producer, Destination serverDestination,
079 Destination clientDestination, boolean ownsConnection) throws JMSException {
080 super(connection, session, producer, serverDestination, clientDestination, ownsConnection);
081 init();
082 }
083
084 private void init() throws JMSException {
085 ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
086 this.requestMap = new DefaultTimeoutMap(executor, 1000L);
087 getReceiver().setMessageListener(this);
088 }
089
090 public void registerHandler(String correlationID, ReplyHandler handler, long timeout) {
091 requestMap.put(correlationID, handler, timeout);
092 }
093
094 public Message request(Destination destination, Message message) throws JMSException {
095 long timeout = getTimeToLive();
096 return request(destination, message, timeout);
097 }
098
099 public Message request(Destination destination, Message message, long timeout) throws JMSException {
100 // lets create a correlationID unless we are already given one -
101 // we are already given a correaltionID if we are on the server side
102 // responding to a remote object reference
103 FutureTask future = null;
104 String correlationID = message.getJMSCorrelationID();
105 if (correlationID == null) {
106 correlationID = createCorrelationID();
107 message.setJMSCorrelationID(correlationID);
108 }
109 else {
110 Object currentHandler = requestMap.get(correlationID);
111 if (currentHandler instanceof AsyncReplyHandler) {
112 AsyncReplyHandler handler = (AsyncReplyHandler) currentHandler;
113 future = handler.newResultHandler();
114 }
115 }
116
117 if (future == null) {
118 FutureHandler futureHandler = new FutureHandler();
119 future = futureHandler;
120 requestMap.put(correlationID, futureHandler, timeout);
121 }
122 populateHeaders(message);
123 send(destination, message);
124
125 try {
126 if (timeout < 0) {
127 return (Message) future.get();
128 }
129 else {
130 return (Message) future.get(timeout, TimeUnit.MILLISECONDS);
131 }
132 }
133 catch (Exception e) {
134 throw createJMSException(e);
135 }
136 }
137
138 public void request(Destination destination, Message message, ReplyHandler handler, long timeout) throws JMSException {
139 String correlationID = message.getJMSCorrelationID();
140 if (correlationID == null) {
141 correlationID = createCorrelationID();
142 message.setJMSCorrelationID(correlationID);
143 }
144 synchronized (requestMap) {
145 Object currentHandler = requestMap.get(correlationID);
146 if (currentHandler instanceof AsyncReplyHandler) {
147 AsyncReplyHandler remoteObjectHandler = (AsyncReplyHandler) currentHandler;
148 remoteObjectHandler.setParent(handler);
149 }
150 else {
151 requestMap.put(correlationID, handler, timeout);
152 }
153 }
154 populateHeaders(message);
155 send(destination, message);
156 }
157
158 /**
159 * Processes inbound responses from requests
160 */
161 public void onMessage(Message message) {
162 try {
163 String correlationID = message.getJMSCorrelationID();
164
165 // lets notify the monitor for this response
166 Object handler = requestMap.get(correlationID);
167 if (handler == null) {
168 log.warn("Response received for unknown correlationID: " + correlationID + " request: " + message);
169 }
170 else if (handler instanceof ReplyHandler) {
171 ReplyHandler replyHandler = (ReplyHandler) handler;
172 boolean complete = replyHandler.handle(message);
173 if (complete) {
174 requestMap.remove(correlationID);
175 }
176 }
177 }
178 catch (JMSException e) {
179 throw new FailedToProcessResponse(message, e);
180 }
181
182 }
183
184 // Lets ensure only one thread performs a send/receive at once
185 public synchronized Message receive(long timeout) throws JMSException {
186 return super.receive(timeout);
187 }
188
189 protected synchronized void doSend(Destination destination, Message message, long timeout) throws JMSException {
190 super.doSend(destination, message, timeout);
191 }
192
193 // Properties
194 // -------------------------------------------------------------------------
195 public TimeoutMap getRequestMap() {
196 return requestMap;
197 }
198
199 public void setRequestMap(TimeoutMap requests) {
200 this.requestMap = requests;
201 }
202
203 // Implementation methods
204 // -------------------------------------------------------------------------
205 protected JMSException createJMSException(Exception e) {
206 JMSException answer = new JMSException(e.toString());
207 answer.setLinkedException(e);
208 return answer;
209 }
210
211 }