1 /***
2 *
3 * Copyright 2005 LogicBlaze, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.logicblaze.lingo.jms.impl;
19
20 import org.logicblaze.lingo.jms.JmsProducerConfig;
21
22 import javax.jms.Connection;
23 import javax.jms.Destination;
24 import javax.jms.JMSException;
25 import javax.jms.Message;
26 import javax.jms.MessageConsumer;
27 import javax.jms.MessageProducer;
28 import javax.jms.Session;
29 import javax.jms.TemporaryQueue;
30 import javax.jms.TemporaryTopic;
31
32 /***
33 * A simple {@link org.logicblaze.lingo.jms.Requestor} which can only be used by
34 * one thread at once and only used for one message exchange at once.
35 *
36 * @version $Revision: 1.8 $
37 */
38 public class SingleThreadedRequestor extends OneWayRequestor {
39 private Destination inboundDestination;
40 private MessageConsumer receiver;
41 private boolean deleteTemporaryDestinationsOnClose;
42
43 public SingleThreadedRequestor(JmsProducerConfig config, Destination serverDestination, Destination clientDestination) throws JMSException {
44 super(config, serverDestination);
45 this.inboundDestination = clientDestination;
46 }
47
48 public SingleThreadedRequestor(Connection connection, Session session, MessageProducer producer, Destination serverDestination,
49 Destination clientDestination, boolean ownsConnection) throws JMSException {
50 super(connection, session, producer, serverDestination, ownsConnection);
51 this.inboundDestination = clientDestination;
52 }
53
54 public SingleThreadedRequestor(Connection connection, Session session, MessageProducer producer, Destination serverDestination, boolean ownsConnection) throws JMSException {
55 this(connection, session, producer, serverDestination, createTemporaryDestination(session), ownsConnection);
56 }
57
58 public Message request(Destination destination, Message message) throws JMSException {
59 populateHeaders(message);
60 send(destination, message);
61 long timeout = getTimeToLive();
62 return receive(timeout);
63 }
64
65 public Message request(Destination destination, Message message, long timeout) throws JMSException {
66 populateHeaders(message);
67 send(destination, message, timeout);
68 return receive(timeout);
69 }
70
71 public Message receive(long timeout) throws JMSException {
72 if (timeout < 0) {
73 return getReceiver().receive();
74 }
75 else if (timeout == 0) {
76 return getReceiver().receiveNoWait();
77 }
78 return getReceiver().receive(timeout);
79 }
80
81 public synchronized void close() throws JMSException {
82
83 super.close();
84
85 if (deleteTemporaryDestinationsOnClose) {
86 if (inboundDestination instanceof TemporaryQueue) {
87 ((TemporaryQueue) inboundDestination).delete();
88 }
89 else if (inboundDestination instanceof TemporaryTopic) {
90 ((TemporaryTopic) inboundDestination).delete();
91 }
92 }
93 inboundDestination = null;
94 }
95
96
97
98 public boolean isDeleteTemporaryDestinationsOnClose() {
99 return deleteTemporaryDestinationsOnClose;
100 }
101
102 public void setDeleteTemporaryDestinationsOnClose(boolean deleteTemporaryDestinationsOnClose) {
103 this.deleteTemporaryDestinationsOnClose = deleteTemporaryDestinationsOnClose;
104 }
105
106
107
108 protected static TemporaryQueue createTemporaryDestination(Session session) throws JMSException {
109 return session.createTemporaryQueue();
110 }
111
112 protected void populateHeaders(Message message) throws JMSException {
113 message.setJMSReplyTo(inboundDestination);
114 }
115
116 protected MessageConsumer getReceiver() throws JMSException {
117 if (receiver == null) {
118 if (inboundDestination == null) {
119 inboundDestination = createTemporaryDestination(getSession());
120 }
121 receiver = getSession().createConsumer(inboundDestination);
122 }
123 return receiver;
124 }
125
126 public Destination getInboundDestination() throws JMSException {
127 return inboundDestination;
128 }
129
130 }