View Javadoc

1   /***
2    *
3    * Copyright 2005 LogicBlaze, Inc.
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.logicblaze.lingo.jms.impl;
19  
20  import org.logicblaze.lingo.jms.JmsProducerConfig;
21  
22  import javax.jms.Connection;
23  import javax.jms.Destination;
24  import javax.jms.JMSException;
25  import javax.jms.Message;
26  import javax.jms.MessageConsumer;
27  import javax.jms.MessageProducer;
28  import javax.jms.Session;
29  import javax.jms.TemporaryQueue;
30  import javax.jms.TemporaryTopic;
31  
32  /***
33   * A simple {@link org.logicblaze.lingo.jms.Requestor} which can only be used by
34   * one thread at once and only used for one message exchange at once.
35   * 
36   * @version $Revision: 1.8 $
37   */
38  public class SingleThreadedRequestor extends OneWayRequestor {
39      private Destination inboundDestination;
40      private MessageConsumer receiver;
41      private boolean deleteTemporaryDestinationsOnClose;
42  
43      public SingleThreadedRequestor(JmsProducerConfig config, Destination serverDestination, Destination clientDestination) throws JMSException {
44          super(config, serverDestination);
45          this.inboundDestination = clientDestination;
46      }
47  
48      public SingleThreadedRequestor(Connection connection, Session session, MessageProducer producer, Destination serverDestination,
49              Destination clientDestination, boolean ownsConnection) throws JMSException {
50          super(connection, session, producer, serverDestination, ownsConnection);
51          this.inboundDestination = clientDestination;
52      }
53      
54      public SingleThreadedRequestor(Connection connection, Session session, MessageProducer producer, Destination serverDestination, boolean ownsConnection) throws JMSException {
55          this(connection, session, producer, serverDestination, createTemporaryDestination(session), ownsConnection);
56      }
57  
58      public Message request(Destination destination, Message message) throws JMSException {
59          populateHeaders(message);
60          send(destination, message);
61          long timeout = getTimeToLive();
62          return receive(timeout);
63      }
64  
65      public Message request(Destination destination, Message message, long timeout) throws JMSException {
66          populateHeaders(message);
67          send(destination, message, timeout);
68          return receive(timeout);
69      }
70  
71      public Message receive(long timeout) throws JMSException {
72          if (timeout < 0) {
73              return getReceiver().receive();
74          }
75          else if (timeout == 0) {
76              return getReceiver().receiveNoWait();
77          }
78          return getReceiver().receive(timeout);
79      }
80  
81      public synchronized void close() throws JMSException {
82          // producer and consumer created by constructor are implicitly closed.
83          super.close();
84  
85          if (deleteTemporaryDestinationsOnClose) {
86              if (inboundDestination instanceof TemporaryQueue) {
87                  ((TemporaryQueue) inboundDestination).delete();
88              }
89              else if (inboundDestination instanceof TemporaryTopic) {
90                  ((TemporaryTopic) inboundDestination).delete();
91              }
92          }
93          inboundDestination = null;
94      }
95  
96      // Properties
97      // -------------------------------------------------------------------------
98      public boolean isDeleteTemporaryDestinationsOnClose() {
99          return deleteTemporaryDestinationsOnClose;
100     }
101 
102     public void setDeleteTemporaryDestinationsOnClose(boolean deleteTemporaryDestinationsOnClose) {
103         this.deleteTemporaryDestinationsOnClose = deleteTemporaryDestinationsOnClose;
104     }
105 
106     // Implementation methods
107     // -------------------------------------------------------------------------
108     protected static TemporaryQueue createTemporaryDestination(Session session) throws JMSException {
109         return session.createTemporaryQueue();
110     }
111 
112     protected void populateHeaders(Message message) throws JMSException {
113         message.setJMSReplyTo(inboundDestination);
114     }
115 
116     protected MessageConsumer getReceiver() throws JMSException {
117         if (receiver == null) {
118             if (inboundDestination == null) {
119                 inboundDestination = createTemporaryDestination(getSession());
120             }
121             receiver = getSession().createConsumer(inboundDestination);
122         }
123         return receiver;
124     }
125 
126     public Destination getInboundDestination() throws JMSException {
127         return inboundDestination;
128     }
129 
130 }