1 /***
2 *
3 * Copyright 2005 LogicBlaze, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.logicblaze.lingo.jms;
20
21 import org.aopalliance.intercept.MethodInterceptor;
22 import org.aopalliance.intercept.MethodInvocation;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.logicblaze.lingo.LingoInvocation;
26 import org.logicblaze.lingo.LingoRemoteInvocationFactory;
27 import org.logicblaze.lingo.MetadataStrategy;
28 import org.logicblaze.lingo.MetadataStrategyHelper;
29 import org.logicblaze.lingo.MethodMetadata;
30 import org.logicblaze.lingo.ResultJoinStrategy;
31 import org.logicblaze.lingo.jms.impl.AsyncReplyHandler;
32 import org.logicblaze.lingo.jms.impl.MultiplexingRequestor;
33 import org.logicblaze.lingo.jms.impl.ResultJoinHandler;
34 import org.logicblaze.lingo.jms.marshall.DefaultMarshaller;
35 import org.logicblaze.lingo.jms.marshall.Marshaller;
36 import org.springframework.aop.support.AopUtils;
37 import org.springframework.beans.factory.DisposableBean;
38 import org.springframework.beans.factory.InitializingBean;
39 import org.springframework.remoting.RemoteAccessException;
40 import org.springframework.remoting.support.RemoteInvocationBasedAccessor;
41 import org.springframework.remoting.support.RemoteInvocationFactory;
42 import org.springframework.remoting.support.RemoteInvocationResult;
43
44 import javax.jms.ConnectionFactory;
45 import javax.jms.DeliveryMode;
46 import javax.jms.Destination;
47 import javax.jms.JMSException;
48 import javax.jms.Message;
49 import javax.jms.Topic;
50
51 import java.util.Iterator;
52 import java.util.Map;
53 import java.util.WeakHashMap;
54
55 /***
56 * Interceptor for accessing a JMS based service which must be configured with a
57 * {@link org.logicblaze.lingo.LingoRemoteInvocationFactory} instance.
58 *
59 * @author James Strachan
60 * @see #setServiceInterface
61 * @see #setServiceUrl
62 * @see JmsServiceExporter
63 * @see JmsProxyFactoryBean
64 */
65 public class JmsClientInterceptor extends RemoteInvocationBasedAccessor implements MethodInterceptor, InitializingBean, DisposableBean {
66 private static final Log log = LogFactory.getLog(JmsClientInterceptor.class);
67
68 private Map remoteObjects = new WeakHashMap();
69 private Requestor requestor;
70 private Destination destination;
71 private Destination responseDestination;
72 private String correlationID;
73 private Marshaller marshaller;
74 private ConnectionFactory connectionFactory;
75 private String jmsType;
76 private Map messageProperties;
77 private int jmsExpiration = -1;
78 private JmsProducerConfig producerConfig = new JmsProducerConfig();
79 private MetadataStrategy metadataStrategy;
80 private boolean multipleResponsesExpected;
81 private long multipleResponseTimeout = 5000L;
82 private long remoteReferenceTimeout = 60000L;
83
84 public JmsClientInterceptor() {
85 setRemoteInvocationFactory(createRemoteInvocationFactory());
86 }
87
88 public JmsClientInterceptor(Requestor requestor) {
89 this.requestor = requestor;
90 setRemoteInvocationFactory(createRemoteInvocationFactory());
91 }
92
93 public JmsClientInterceptor(Requestor requestor, LingoRemoteInvocationFactory factory) {
94 this.requestor = requestor;
95 setRemoteInvocationFactory(factory);
96 }
97
98 public void afterPropertiesSet() throws JMSException {
99 RemoteInvocationFactory factory = getRemoteInvocationFactory();
100 if (!(factory instanceof LingoRemoteInvocationFactory)) {
101 throw new IllegalArgumentException("remoteInvocationFactory must be an instance of LingoRemoteInvocationFactory but was: " + factory);
102
103 }
104 else {
105 LingoRemoteInvocationFactory invocationFactory = (LingoRemoteInvocationFactory) factory;
106 invocationFactory.setMetadataStrategy(getMetadataStrategy());
107 }
108 if (requestor == null) {
109 if (connectionFactory == null) {
110 throw new IllegalArgumentException("requestor or connectionFactory is required");
111 }
112 else {
113 requestor = createRequestor();
114 }
115 }
116 if (marshaller == null) {
117
118 marshaller = new DefaultMarshaller();
119 }
120 }
121
122 public Object invoke(MethodInvocation methodInvocation) throws Throwable {
123 if (AopUtils.isToStringMethod(methodInvocation.getMethod())) {
124 return "JMS invoker proxy for service URL [" + getServiceUrl() + "]";
125 }
126 LingoInvocation invocation = (LingoInvocation) createRemoteInvocation(methodInvocation);
127 MethodMetadata metadata = invocation.getMetadata();
128 replaceRemoteReferences(invocation, metadata);
129 try {
130 Message requestMessage = marshaller.createRequestMessage(requestor, invocation);
131 populateHeaders(requestMessage);
132 if (metadata.isOneWay()) {
133 requestor.send(destination, requestMessage);
134 return null;
135 }
136 else if (!isMultipleResponse(methodInvocation, metadata)) {
137 Message response = requestor.request(destination, requestMessage);
138 RemoteInvocationResult result = marshaller.extractInvocationResult(response);
139 return recreateRemoteInvocationResult(result);
140 }
141 else {
142 ResultJoinHandler handler = createResultJoinHandler(methodInvocation, metadata);
143 requestor.request(destination, requestMessage, handler, getMultipleResponseTimeout());
144 RemoteInvocationResult result = handler.waitForResult();
145 return recreateRemoteInvocationResult(result);
146 }
147 }
148 catch (JMSException e) {
149 log.warn("Remote access error: " + methodInvocation, e);
150 throw new RemoteAccessException("Cannot access JMS invoker remote service at [" + getServiceUrl() + "]", e);
151 }
152 }
153
154 public void destroy() throws Exception {
155 requestor.close();
156 }
157
158
159
160 public Requestor getRequestor() {
161 return requestor;
162 }
163
164 public void setRequestor(Requestor requestor) {
165 this.requestor = requestor;
166 }
167
168 public Destination getDestination() {
169 return destination;
170 }
171
172 /***
173 * Sets the destination used to make requests
174 *
175 * @param destination
176 */
177 public void setDestination(Destination destination) {
178 this.destination = destination;
179 }
180
181 public Destination getResponseDestination() {
182 return responseDestination;
183 }
184
185 /***
186 * Sets the destination used to consume responses on - or null and a
187 * temporary queue will be created.
188 *
189 * @param responseDestination
190 */
191 public void setResponseDestination(Destination responseDestination) {
192 this.responseDestination = responseDestination;
193 }
194
195 public void setCorrelationID(String correlationID) {
196 this.correlationID = correlationID;
197 }
198
199 public String getJmsType() {
200 return jmsType;
201 }
202
203 /***
204 * Sets the JMS message type string which is appended to messages if set
205 */
206 public void setJmsType(String jmsType) {
207 this.jmsType = jmsType;
208 }
209
210 public Map getMessageProperties() {
211 return messageProperties;
212 }
213
214 public int getJmsExpiration() {
215 return jmsExpiration;
216 }
217
218 /***
219 * Sets the JMS expiration timeout (in milliseconds) of the request message
220 */
221 public void setJmsExpiration(int jmsExpiration) {
222 this.jmsExpiration = jmsExpiration;
223 }
224
225 public int getJmsPriority() {
226 return producerConfig.getPriority();
227 }
228
229 /***
230 * Sets the JMS priority of the request message
231 */
232 public void setJmsPriority(int jmsPriority) {
233 producerConfig.setPriority(jmsPriority);
234 }
235
236 public int getTimeToLive() {
237 return producerConfig.getTimeToLive();
238 }
239
240 /***
241 * Sets the time to live on each message request
242 */
243 public void setTimeToLive(int timeToLive) {
244 producerConfig.setTimeToLive(timeToLive);
245 }
246
247 /***
248 * Sets the message properties to be added to each message. Note that the
249 * keys should be Strings and the values should be primitive types.
250 */
251 public void setMessageProperties(Map messageProperties) {
252 this.messageProperties = messageProperties;
253 }
254
255 public Marshaller getMarshaller() {
256 return marshaller;
257 }
258
259 public void setMarshaller(Marshaller marshaller) {
260 this.marshaller = marshaller;
261 }
262
263 public ConnectionFactory getConnectionFactory() {
264 return connectionFactory;
265 }
266
267 /***
268 * Used to create a default {@link Requestor} if no requestor is explicitly
269 * configured.
270 */
271 public void setConnectionFactory(ConnectionFactory connectionFactory) {
272 this.connectionFactory = connectionFactory;
273 }
274
275 public JmsProducerConfig getProducerConfig() {
276 return producerConfig;
277 }
278
279 /***
280 * Sets the configuration of the producer used to send back responses
281 */
282 public void setProducerConfig(JmsProducerConfig producerConfig) {
283 this.producerConfig = producerConfig;
284 }
285
286 public boolean isPersistentDelivery() {
287 return producerConfig.getDeliveryMode() == DeliveryMode.PERSISTENT;
288 }
289
290 /***
291 * Sets the delivery mode to be persistent or non-persistent.
292 */
293 public void setPersistentDelivery(boolean persistent) {
294 producerConfig.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
295 }
296
297 public String getClientID() {
298 return producerConfig.getClientID();
299 }
300
301 /***
302 * Sets the JMS connections unique clientID. This is optional unless you
303 * wish to use durable topic subscriptions. Only one connection can have a
304 * given clientID at any time.
305 */
306 public void setClientID(String clientID) {
307 producerConfig.setClientID(clientID);
308 }
309
310 public MetadataStrategy getMetadataStrategy() {
311 if (metadataStrategy == null) {
312 metadataStrategy = createMetadataStrategy();
313 }
314 return metadataStrategy;
315 }
316
317 public void setMetadataStrategy(MetadataStrategy metadataStrategy) {
318 this.metadataStrategy = metadataStrategy;
319 }
320
321 public boolean isMultipleResponsesExpected() {
322 return multipleResponsesExpected;
323 }
324
325 /***
326 * Sets whether or not multiple response messages are expected. Typically
327 * multiple responses are only expected when the {@link #getDestination()}
328 * method returns a {@link Topic} but there could be circumstances when
329 * sending a request to a queue results in messages being fanned out to many
330 * servers which could all respond.
331 */
332 public void setMultipleResponsesExpected(boolean multipleResponsesExpected) {
333 this.multipleResponsesExpected = multipleResponsesExpected;
334 }
335
336 public long getRemoteReferenceTimeout() {
337 return remoteReferenceTimeout;
338 }
339
340 /***
341 * Sets the maximum amount of time an inactive remote object reference will
342 * keep around until it is garbage collected.
343 */
344 public void setRemoteReferenceTimeout(long remoteReferenceTimeout) {
345 this.remoteReferenceTimeout = remoteReferenceTimeout;
346 }
347
348 public long getMultipleResponseTimeout() {
349 return multipleResponseTimeout;
350 }
351
352 /***
353 * Sets the maximum amount of time to wait for multiple results to come back
354 * if communicating with multiple servers and aggregating together the
355 * results.
356 */
357 public void setMultipleResponseTimeout(long multipleResponseTimeout) {
358 this.multipleResponseTimeout = multipleResponseTimeout;
359 }
360
361
362
363
364 /***
365 * Returns true if this method expects multiple response messages such as
366 * when sending a message over a topic.
367 */
368 protected boolean isMultipleResponse(MethodInvocation methodInvocation, MethodMetadata metadata) {
369 return (getDestination() instanceof Topic) || isMultipleResponsesExpected();
370 }
371
372 protected void populateHeaders(Message requestMessage) throws JMSException {
373 if (correlationID != null) {
374 requestMessage.setJMSCorrelationID(correlationID);
375 }
376 if (jmsType != null) {
377 requestMessage.setJMSType(jmsType);
378 }
379 if (jmsExpiration >= 0) {
380 requestMessage.setJMSExpiration(jmsExpiration);
381 }
382 int jmsPriority = getJmsPriority();
383 if (jmsPriority >= 0) {
384 requestMessage.setJMSPriority(jmsPriority);
385 }
386 if (messageProperties != null) {
387 for (Iterator iter = messageProperties.entrySet().iterator(); iter.hasNext();) {
388 Map.Entry entry = (Map.Entry) iter.next();
389 String name = entry.getKey().toString();
390 Object value = entry.getValue();
391 requestMessage.setObjectProperty(name, value);
392 }
393 }
394 }
395
396 /***
397 * Recreate the invocation result contained in the given
398 * RemoteInvocationResult object. The default implementation calls the
399 * default recreate method.
400 * <p>
401 * Can be overridden in subclass to provide custom recreation, potentially
402 * processing the returned result object.
403 *
404 * @param result
405 * the RemoteInvocationResult to recreate
406 * @return a return value if the invocation result is a successful return
407 * @throws Throwable
408 * if the invocation result is an exception
409 * @see org.springframework.remoting.support.RemoteInvocationResult#recreate
410 */
411 protected Object recreateRemoteInvocationResult(RemoteInvocationResult result) throws Throwable {
412 return result.recreate();
413 }
414
415 protected void replaceRemoteReferences(LingoInvocation invocation, MethodMetadata metadata) {
416 Object[] arguments = invocation.getArguments();
417 Class[] parameterTypes = invocation.getParameterTypes();
418 for (int i = 0; i < parameterTypes.length; i++) {
419 if (metadata.isRemoteParameter(i)) {
420 arguments[i] = remoteReference(parameterTypes[i], arguments[i]);
421 }
422 }
423 }
424
425 protected Object remoteReference(Class type, Object value) {
426 if (value == null) {
427 return null;
428 }
429 String correlationID = (String) remoteObjects.get(value);
430 if (correlationID == null) {
431 correlationID = requestor.createCorrelationID();
432 remoteObjects.put(value, correlationID);
433 }
434 if (requestor instanceof MultiplexingRequestor) {
435 MultiplexingRequestor multiplexingRequestor = (MultiplexingRequestor) requestor;
436 multiplexingRequestor.registerHandler(correlationID, createAsyncHandler(value), getRemoteReferenceTimeout());
437 }
438 else {
439 throw new IllegalArgumentException("You can only pass remote references with a MultiplexingRequestor");
440 }
441 return correlationID;
442 }
443
444 protected ResultJoinHandler createResultJoinHandler(MethodInvocation methodInvocation, MethodMetadata metadata) {
445 ResultJoinStrategy joinStrategy = getMetadataStrategy().getResultJoinStrategy(methodInvocation, metadata);
446 return new ResultJoinHandler(marshaller, joinStrategy);
447 }
448
449 protected AsyncReplyHandler createAsyncHandler(Object value) {
450 AsyncReplyHandler replyHandler = new AsyncReplyHandler(value, marshaller, getMetadataStrategy());
451 replyHandler.setConnectionFactory(connectionFactory);
452 replyHandler.setMarshaller(marshaller);
453 replyHandler.setProducerConfig(producerConfig);
454 replyHandler.setResponseRequestor(getRequestor());
455 replyHandler.setInvocationFactory(getRemoteInvocationFactory());
456 try {
457 replyHandler.afterPropertiesSet();
458 }
459 catch (Exception e) {
460 throw new RuntimeException("Could not create the AsyncHandler: " + e, e);
461 }
462 return replyHandler;
463 }
464
465 protected Requestor createRequestor() throws JMSException {
466 return MultiplexingRequestor.newInstance(connectionFactory, getProducerConfig(), destination, responseDestination);
467 }
468
469 /***
470 * Factory method to create a default lingo based invocation factory if none
471 * is configured
472 */
473 protected LingoRemoteInvocationFactory createRemoteInvocationFactory() {
474 return new LingoRemoteInvocationFactory(getMetadataStrategy());
475 }
476
477 /***
478 * Factory method to create a default metadata strategy if none is
479 * configured
480 *
481 * @return
482 */
483 protected MetadataStrategy createMetadataStrategy() {
484 return MetadataStrategyHelper.newInstance();
485 }
486
487 }