001    /**
002     *
003     * Copyright 2005 LogicBlaze, Inc.
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    package org.logicblaze.lingo.jms.impl;
019    
020    import apple.awt.CImage;
021    import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
022    import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
023    import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.logicblaze.lingo.jms.FailedToProcessResponse;
028    import org.logicblaze.lingo.jms.JmsProducerConfig;
029    import org.logicblaze.lingo.jms.ReplyHandler;
030    import org.logicblaze.lingo.jms.Requestor;
031    import org.logicblaze.lingo.util.DefaultTimeoutMap;
032    import org.logicblaze.lingo.util.TimeoutMap;
033    
034    import javax.jms.Connection;
035    import javax.jms.ConnectionFactory;
036    import javax.jms.Destination;
037    import javax.jms.JMSException;
038    import javax.jms.Message;
039    import javax.jms.MessageListener;
040    import javax.jms.MessageProducer;
041    import javax.jms.Session;
042    
043    /**
044     * A {@link org.logicblaze.lingo.jms.Requestor} which will use a single
045     * producer, consumer and temporary topic for resource efficiency, but will use
046     * correlation IDs on each message and response to ensure that each threads
047     * requests can occur synchronously. <p/> This class can be used concurrently by
048     * many different threads at the same time.
049     * 
050     * @version $Revision: 1.14 $
051     */
052    public class MultiplexingRequestor extends SingleThreadedRequestor implements MessageListener {
053        private static final Log log = LogFactory.getLog(MultiplexingRequestor.class);
054    
055        private TimeoutMap requestMap = new DefaultTimeoutMap();
056    
057        public static Requestor newInstance(ConnectionFactory connectionFactory, JmsProducerConfig config, Destination destination, Destination responseDestination)
058                throws JMSException {
059            Connection connection = config.createConnection(connectionFactory);
060            Session session = config.createSession(connection);
061            MessageProducer producer = config.createMessageProducer(session);
062            return new MultiplexingRequestor(connection, session, producer, destination, responseDestination, true);
063        }
064    
065        public static Requestor newInstance(ConnectionFactory connectionFactory, JmsProducerConfig config, Destination destination) throws JMSException {
066            Connection connection = config.createConnection(connectionFactory);
067            Session session = config.createSession(connection);
068            MessageProducer producer = config.createMessageProducer(session);
069            Destination responseDestination = config.createTemporaryDestination(session);
070            return new MultiplexingRequestor(connection, session, producer, destination, responseDestination, true);
071        }
072    
073        public MultiplexingRequestor(JmsProducerConfig config, Destination serverDestination, Destination clientDestination) throws JMSException {
074            super(config, serverDestination, clientDestination);
075            init();
076        }
077    
078        public MultiplexingRequestor(Connection connection, Session session, MessageProducer producer, Destination serverDestination,
079                Destination clientDestination, boolean ownsConnection) throws JMSException {
080            super(connection, session, producer, serverDestination, clientDestination, ownsConnection);
081            init();
082        }
083    
084        private void init() throws JMSException {
085            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
086            this.requestMap = new DefaultTimeoutMap(executor, 1000L);
087            getReceiver().setMessageListener(this);
088        }
089    
090        public void registerHandler(String correlationID, ReplyHandler handler, long timeout) {
091            requestMap.put(correlationID, handler, timeout);
092        }
093    
094        public Message request(Destination destination, Message message) throws JMSException {
095            long timeout = getTimeToLive();
096            return request(destination, message, timeout);
097        }
098    
099        public Message request(Destination destination, Message message, long timeout) throws JMSException {
100            // lets create a correlationID unless we are already given one -
101            // we are already given a correaltionID if we are on the server side
102            // responding to a remote object reference
103            FutureTask future = null;
104            String correlationID = message.getJMSCorrelationID();
105            if (correlationID == null) {
106                correlationID = createCorrelationID();
107                message.setJMSCorrelationID(correlationID);
108            }
109            else {
110                Object currentHandler = requestMap.get(correlationID);
111                if (currentHandler instanceof AsyncReplyHandler) {
112                    AsyncReplyHandler handler = (AsyncReplyHandler) currentHandler;
113                    future = handler.newResultHandler();
114                }
115            }
116    
117            if (future == null) {
118                FutureHandler futureHandler = new FutureHandler();
119                future = futureHandler;
120                requestMap.put(correlationID, futureHandler, timeout);
121            }
122            populateHeaders(message);
123            send(destination, message);
124    
125            try {
126                if (timeout < 0) {
127                    return (Message) future.get();
128                }
129                else {
130                    return (Message) future.get(timeout, TimeUnit.MILLISECONDS);
131                }
132            }
133            catch (Exception e) {
134                throw createJMSException(e);
135            }
136        }
137    
138        public void request(Destination destination, Message message, ReplyHandler handler, long timeout) throws JMSException {
139            String correlationID = message.getJMSCorrelationID();
140            if (correlationID == null) {
141                correlationID = createCorrelationID();
142                message.setJMSCorrelationID(correlationID);
143            }
144            synchronized (requestMap) {
145                Object currentHandler = requestMap.get(correlationID);
146                if (currentHandler instanceof AsyncReplyHandler) {
147                    AsyncReplyHandler remoteObjectHandler = (AsyncReplyHandler) currentHandler;
148                    remoteObjectHandler.setParent(handler);
149                }
150                else {
151                    requestMap.put(correlationID, handler, timeout);
152                }
153            }
154            populateHeaders(message);
155            send(destination, message);
156        }
157    
158        /**
159         * Processes inbound responses from requests
160         */
161        public void onMessage(Message message) {
162            try {
163                String correlationID = message.getJMSCorrelationID();
164    
165                // lets notify the monitor for this response
166                Object handler = requestMap.get(correlationID);
167                if (handler == null) {
168                    log.warn("Response received for unknown correlationID: " + correlationID + " request: " + message);
169                }
170                else if (handler instanceof ReplyHandler) {
171                    ReplyHandler replyHandler = (ReplyHandler) handler;
172                    boolean complete = replyHandler.handle(message);
173                    if (complete) {
174                        requestMap.remove(correlationID);
175                    }
176                }
177            }
178            catch (JMSException e) {
179                throw new FailedToProcessResponse(message, e);
180            }
181    
182        }
183    
184        // Lets ensure only one thread performs a send/receive at once
185        public synchronized Message receive(long timeout) throws JMSException {
186            return super.receive(timeout);
187        }
188    
189        protected synchronized void doSend(Destination destination, Message message, long timeout) throws JMSException {
190            super.doSend(destination, message, timeout);
191        }
192    
193        // Properties
194        // -------------------------------------------------------------------------
195        public TimeoutMap getRequestMap() {
196            return requestMap;
197        }
198    
199        public void setRequestMap(TimeoutMap requests) {
200            this.requestMap = requests;
201        }
202    
203        // Implementation methods
204        // -------------------------------------------------------------------------
205        protected JMSException createJMSException(Exception e) {
206            JMSException answer = new JMSException(e.toString());
207            answer.setLinkedException(e);
208            return answer;
209        }
210    
211    }