1 /***
2 *
3 * Copyright 2005 LogicBlaze, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.logicblaze.lingo.jms;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.logicblaze.lingo.LingoInvocation;
23 import org.logicblaze.lingo.LingoRemoteInvocationFactory;
24 import org.logicblaze.lingo.MetadataStrategy;
25 import org.logicblaze.lingo.MetadataStrategyHelper;
26 import org.logicblaze.lingo.MethodMetadata;
27 import org.logicblaze.lingo.jms.impl.DefaultJmsProducer;
28 import org.logicblaze.lingo.jms.impl.MultiplexingRequestor;
29 import org.logicblaze.lingo.jms.marshall.DefaultMarshaller;
30 import org.logicblaze.lingo.jms.marshall.Marshaller;
31 import org.springframework.beans.factory.DisposableBean;
32 import org.springframework.beans.factory.InitializingBean;
33 import org.springframework.remoting.support.RemoteInvocation;
34 import org.springframework.remoting.support.RemoteInvocationBasedExporter;
35 import org.springframework.remoting.support.RemoteInvocationFactory;
36 import org.springframework.remoting.support.RemoteInvocationResult;
37
38 import javax.jms.Connection;
39 import javax.jms.ConnectionFactory;
40 import javax.jms.DeliveryMode;
41 import javax.jms.JMSException;
42 import javax.jms.Message;
43 import javax.jms.MessageListener;
44 import javax.jms.Session;
45
46 /***
47 * A JMS MessageListener that exports the specified service bean as a JMS
48 * service endpoint, accessible via a JMS proxy. <p/>
49 * <p>
50 * Note: JMS services exported with this class can be accessed by any JMS
51 * client, as there isn't any special handling involved.
52 *
53 * @author James Strachan
54 * @see JmsProxyFactoryBean
55 * @version $Revision: 1.9 $
56 */
57 public class JmsServiceExporterMessageListener extends RemoteInvocationBasedExporter implements MessageListener, InitializingBean, DisposableBean {
58 private static final Log log = LogFactory.getLog(JmsServiceExporterMessageListener.class);
59
60 private Object proxy;
61 private ConnectionFactory connectionFactory;
62 private Requestor responseRequestor;
63 private JmsProducerConfig producerConfig = new JmsProducerConfig();
64 private boolean ignoreFailures;
65 private Marshaller marshaller;
66 private MetadataStrategy metadataStrategy;
67 private RemoteInvocationFactory invocationFactory;
68
69 public JmsServiceExporterMessageListener() {
70 }
71
72 public JmsServiceExporterMessageListener(Object proxy) {
73 this.proxy = proxy;
74 }
75
76 public void afterPropertiesSet() throws Exception {
77 if (proxy == null) {
78 this.proxy = getProxyForService();
79 if (proxy == null) {
80 throw new IllegalArgumentException("proxy is required");
81 }
82 }
83 if (responseRequestor == null) {
84 responseRequestor = MultiplexingRequestor.newInstance(connectionFactory, producerConfig, null);
85 }
86 if (marshaller == null) {
87 marshaller = new DefaultMarshaller();
88 }
89 if (metadataStrategy == null) {
90 metadataStrategy = MetadataStrategyHelper.newInstance();
91 }
92 if (invocationFactory == null) {
93 invocationFactory = new LingoRemoteInvocationFactory(metadataStrategy);
94 }
95 }
96
97 public void onMessage(Message message) {
98 try {
99 RemoteInvocation invocation = marshaller.readRemoteInvocation(message);
100 doInvoke(message, invocation);
101 }
102 catch (JMSException e) {
103 onException(message, e);
104 }
105 }
106
107 public void destroy() throws Exception {
108 if (responseRequestor != null) {
109 responseRequestor.close();
110 }
111 }
112
113
114
115 public ConnectionFactory getConnectionFactory() {
116 return connectionFactory;
117 }
118
119 /***
120 * Used to create a default {@link JmsProducer} if no producer is explicitly
121 * configured.
122 */
123 public void setConnectionFactory(ConnectionFactory connectionFactory) {
124 this.connectionFactory = connectionFactory;
125 }
126
127 public Requestor getResponseRequestor() {
128 return responseRequestor;
129 }
130
131 public void setResponseRequestor(Requestor responseRequestor) {
132 this.responseRequestor = responseRequestor;
133 }
134
135 public Marshaller getMarshaller() {
136 return marshaller;
137 }
138
139 public void setMarshaller(Marshaller marshaller) {
140 this.marshaller = marshaller;
141 }
142
143 public RemoteInvocationFactory getInvocationFactory() {
144 return invocationFactory;
145 }
146
147 public void setInvocationFactory(RemoteInvocationFactory invocationFactory) {
148 this.invocationFactory = invocationFactory;
149 }
150
151 public boolean isIgnoreFailures() {
152 return ignoreFailures;
153 }
154
155 /***
156 * Sets whether or not failures should be ignored (and just logged) or
157 * thrown as runtime exceptions into the JMS provider
158 */
159 public void setIgnoreFailures(boolean ignoreFailures) {
160 this.ignoreFailures = ignoreFailures;
161 }
162
163 public JmsProducerConfig getProducerConfig() {
164 return producerConfig;
165 }
166
167 /***
168 * Sets the configuration of the producer used to send back responses
169 */
170 public void setProducerConfig(JmsProducerConfig producerConfig) {
171 this.producerConfig = producerConfig;
172 }
173
174 public boolean isPersistentDelivery() {
175 return producerConfig.getDeliveryMode() == DeliveryMode.PERSISTENT;
176 }
177
178 /***
179 * Sets the delivery mode to be persistent or non-persistent.
180 */
181 public void setPersistentDelivery(boolean persistent) {
182 producerConfig.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
183 }
184
185 public String getClientID() {
186 return producerConfig.getClientID();
187 }
188
189 /***
190 * Sets the JMS connections unique clientID. This is optional unless you
191 * wish to use durable topic subscriptions. Only one connection can have a
192 * given clientID at any time.
193 */
194 public void setClientID(String clientID) {
195 producerConfig.setClientID(clientID);
196 }
197
198 public Object getProxy() {
199 return proxy;
200 }
201
202 public MetadataStrategy getMetadataStrategy() {
203 return metadataStrategy;
204 }
205
206 public void setMetadataStrategy(MetadataStrategy metadataStrategy) {
207 this.metadataStrategy = metadataStrategy;
208 }
209
210
211
212 protected void doInvoke(Message message, RemoteInvocation invocation) throws JMSException {
213 if (invocation != null) {
214 boolean oneway = false;
215 if (invocation instanceof LingoInvocation) {
216 LingoInvocation lingoInvocation = (LingoInvocation) invocation;
217 oneway = lingoInvocation.getMetadata().isOneWay();
218 introduceRemoteReferences(lingoInvocation, message);
219 }
220 RemoteInvocationResult result = invokeAndCreateResult(invocation, this.proxy);
221 if (!oneway) {
222 writeRemoteInvocationResult(message, result);
223 }
224 }
225 }
226
227 /***
228 * Creates the invocation result response message
229 *
230 * @param session
231 * the JMS session to use
232 * @param message
233 * the original request message, in case we want to attach any
234 * properties etc.
235 * @param result
236 * the invocation result
237 * @return the message response to send
238 * @throws javax.jms.JMSException
239 * if creating the messsage failed
240 */
241 protected Message createResponseMessage(Session session, Message message, RemoteInvocationResult result) throws JMSException {
242
243
244
245 if (result == null) {
246 throw new IllegalArgumentException("result cannot be null");
247 }
248
249 Message answer = getMarshaller().createResponseMessage(session, result, message);
250
251
252 answer.setJMSCorrelationID(message.getJMSCorrelationID());
253 return answer;
254 }
255
256 /***
257 * Lets replace any remote object correlation IDs with dynamic proxies
258 *
259 * @param invocation
260 * @param requestMessage
261 */
262 protected void introduceRemoteReferences(LingoInvocation invocation, Message requestMessage) throws JMSException {
263 MethodMetadata metadata = invocation.getMetadata();
264 Object[] arguments = invocation.getArguments();
265 Class[] parameterTypes = invocation.getParameterTypes();
266 for (int i = 0; i < parameterTypes.length; i++) {
267 if (metadata.isRemoteParameter(i)) {
268 arguments[i] = createRemoteProxy(requestMessage, parameterTypes[i], arguments[i]);
269 }
270 }
271 }
272
273 protected Object createRemoteProxy(Message message, Class parameterType, Object argument) throws JMSException {
274 JmsProxyFactoryBean factory = new JmsProxyFactoryBean();
275 factory.setDestination(message.getJMSReplyTo());
276 String correlationID = (String) argument;
277 if (log.isDebugEnabled()) {
278 log.debug("Creating a server side remote proxy for correlationID: " + correlationID);
279 }
280 factory.setCorrelationID(correlationID);
281 factory.setMarshaller(getMarshaller());
282 factory.setRemoteInvocationFactory(invocationFactory);
283 factory.setServiceInterface(parameterType);
284 factory.setRequestor(responseRequestor);
285 factory.afterPropertiesSet();
286 return factory.getObject();
287 }
288
289 /***
290 * Handle the processing of an exception when processing an inbound messsage
291 */
292 protected void onException(Message message, JMSException e) {
293 String text = "Failed to process inbound message due to: " + e + ". Message will be discarded: " + message;
294 log.info(text, e);
295 if (!ignoreFailures) {
296 throw new RuntimeException(text, e);
297 }
298 }
299 /***
300 * Send the given RemoteInvocationResult as a JMS message to the originator
301 *
302 * @param message
303 * current HTTP message
304 * @param result
305 * the RemoteInvocationResult object
306 * @throws javax.jms.JMSException
307 * if thrown by trying to send the message
308 */
309 protected void writeRemoteInvocationResult(final Message message, final RemoteInvocationResult result) throws JMSException {
310 Message responseMessage = createResponseMessage(getResponseRequestor().getSession(), message, result);
311 getResponseRequestor().send(message.getJMSReplyTo(), responseMessage);
312 }
313 }