View Javadoc

1   /***
2    *
3    * Copyright 2005 LogicBlaze, Inc.
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.logicblaze.lingo.jms.impl;
19  
20  import apple.awt.CImage;
21  import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
22  import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
23  import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.logicblaze.lingo.jms.FailedToProcessResponse;
28  import org.logicblaze.lingo.jms.JmsProducerConfig;
29  import org.logicblaze.lingo.jms.ReplyHandler;
30  import org.logicblaze.lingo.jms.Requestor;
31  import org.logicblaze.lingo.util.DefaultTimeoutMap;
32  import org.logicblaze.lingo.util.TimeoutMap;
33  
34  import javax.jms.Connection;
35  import javax.jms.ConnectionFactory;
36  import javax.jms.Destination;
37  import javax.jms.JMSException;
38  import javax.jms.Message;
39  import javax.jms.MessageListener;
40  import javax.jms.MessageProducer;
41  import javax.jms.Session;
42  
43  /***
44   * A {@link org.logicblaze.lingo.jms.Requestor} which will use a single
45   * producer, consumer and temporary topic for resource efficiency, but will use
46   * correlation IDs on each message and response to ensure that each threads
47   * requests can occur synchronously. <p/> This class can be used concurrently by
48   * many different threads at the same time.
49   * 
50   * @version $Revision: 1.14 $
51   */
52  public class MultiplexingRequestor extends SingleThreadedRequestor implements MessageListener {
53      private static final Log log = LogFactory.getLog(MultiplexingRequestor.class);
54  
55      private TimeoutMap requestMap = new DefaultTimeoutMap();
56  
57      public static Requestor newInstance(ConnectionFactory connectionFactory, JmsProducerConfig config, Destination destination, Destination responseDestination)
58              throws JMSException {
59          Connection connection = config.createConnection(connectionFactory);
60          Session session = config.createSession(connection);
61          MessageProducer producer = config.createMessageProducer(session);
62          return new MultiplexingRequestor(connection, session, producer, destination, responseDestination, true);
63      }
64  
65      public static Requestor newInstance(ConnectionFactory connectionFactory, JmsProducerConfig config, Destination destination) throws JMSException {
66          Connection connection = config.createConnection(connectionFactory);
67          Session session = config.createSession(connection);
68          MessageProducer producer = config.createMessageProducer(session);
69          Destination responseDestination = config.createTemporaryDestination(session);
70          return new MultiplexingRequestor(connection, session, producer, destination, responseDestination, true);
71      }
72  
73      public MultiplexingRequestor(JmsProducerConfig config, Destination serverDestination, Destination clientDestination) throws JMSException {
74          super(config, serverDestination, clientDestination);
75          init();
76      }
77  
78      public MultiplexingRequestor(Connection connection, Session session, MessageProducer producer, Destination serverDestination,
79              Destination clientDestination, boolean ownsConnection) throws JMSException {
80          super(connection, session, producer, serverDestination, clientDestination, ownsConnection);
81          init();
82      }
83  
84      private void init() throws JMSException {
85          ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
86          this.requestMap = new DefaultTimeoutMap(executor, 1000L);
87          getReceiver().setMessageListener(this);
88      }
89  
90      public void registerHandler(String correlationID, ReplyHandler handler, long timeout) {
91          requestMap.put(correlationID, handler, timeout);
92      }
93  
94      public Message request(Destination destination, Message message) throws JMSException {
95          long timeout = getTimeToLive();
96          return request(destination, message, timeout);
97      }
98  
99      public Message request(Destination destination, Message message, long timeout) throws JMSException {
100         // lets create a correlationID unless we are already given one -
101         // we are already given a correaltionID if we are on the server side
102         // responding to a remote object reference
103         FutureTask future = null;
104         String correlationID = message.getJMSCorrelationID();
105         if (correlationID == null) {
106             correlationID = createCorrelationID();
107             message.setJMSCorrelationID(correlationID);
108         }
109         else {
110             Object currentHandler = requestMap.get(correlationID);
111             if (currentHandler instanceof AsyncReplyHandler) {
112                 AsyncReplyHandler handler = (AsyncReplyHandler) currentHandler;
113                 future = handler.newResultHandler();
114             }
115         }
116 
117         if (future == null) {
118             FutureHandler futureHandler = new FutureHandler();
119             future = futureHandler;
120             requestMap.put(correlationID, futureHandler, timeout);
121         }
122         populateHeaders(message);
123         send(destination, message);
124 
125         try {
126             if (timeout < 0) {
127                 return (Message) future.get();
128             }
129             else {
130                 return (Message) future.get(timeout, TimeUnit.MILLISECONDS);
131             }
132         }
133         catch (Exception e) {
134             throw createJMSException(e);
135         }
136     }
137 
138     public void request(Destination destination, Message message, ReplyHandler handler, long timeout) throws JMSException {
139         String correlationID = message.getJMSCorrelationID();
140         if (correlationID == null) {
141             correlationID = createCorrelationID();
142             message.setJMSCorrelationID(correlationID);
143         }
144         synchronized (requestMap) {
145             Object currentHandler = requestMap.get(correlationID);
146             if (currentHandler instanceof AsyncReplyHandler) {
147                 AsyncReplyHandler remoteObjectHandler = (AsyncReplyHandler) currentHandler;
148                 remoteObjectHandler.setParent(handler);
149             }
150             else {
151                 requestMap.put(correlationID, handler, timeout);
152             }
153         }
154         populateHeaders(message);
155         send(destination, message);
156     }
157 
158     /***
159      * Processes inbound responses from requests
160      */
161     public void onMessage(Message message) {
162         try {
163             String correlationID = message.getJMSCorrelationID();
164 
165             // lets notify the monitor for this response
166             Object handler = requestMap.get(correlationID);
167             if (handler == null) {
168                 log.warn("Response received for unknown correlationID: " + correlationID + " request: " + message);
169             }
170             else if (handler instanceof ReplyHandler) {
171                 ReplyHandler replyHandler = (ReplyHandler) handler;
172                 boolean complete = replyHandler.handle(message);
173                 if (complete) {
174                     requestMap.remove(correlationID);
175                 }
176             }
177         }
178         catch (JMSException e) {
179             throw new FailedToProcessResponse(message, e);
180         }
181 
182     }
183 
184     // Lets ensure only one thread performs a send/receive at once
185     public synchronized Message receive(long timeout) throws JMSException {
186         return super.receive(timeout);
187     }
188 
189     protected synchronized void doSend(Destination destination, Message message, long timeout) throws JMSException {
190         super.doSend(destination, message, timeout);
191     }
192 
193     // Properties
194     // -------------------------------------------------------------------------
195     public TimeoutMap getRequestMap() {
196         return requestMap;
197     }
198 
199     public void setRequestMap(TimeoutMap requests) {
200         this.requestMap = requests;
201     }
202 
203     // Implementation methods
204     // -------------------------------------------------------------------------
205     protected JMSException createJMSException(Exception e) {
206         JMSException answer = new JMSException(e.toString());
207         answer.setLinkedException(e);
208         return answer;
209     }
210 
211 }