View Javadoc

1   /***
2    *
3    * Copyright 2005 LogicBlaze, Inc.
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  
19  package org.logicblaze.lingo.jms;
20  
21  import org.aopalliance.intercept.MethodInterceptor;
22  import org.aopalliance.intercept.MethodInvocation;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.logicblaze.lingo.LingoInvocation;
26  import org.logicblaze.lingo.LingoRemoteInvocationFactory;
27  import org.logicblaze.lingo.MetadataStrategy;
28  import org.logicblaze.lingo.MetadataStrategyHelper;
29  import org.logicblaze.lingo.MethodMetadata;
30  import org.logicblaze.lingo.ResultJoinStrategy;
31  import org.logicblaze.lingo.jms.impl.AsyncReplyHandler;
32  import org.logicblaze.lingo.jms.impl.MultiplexingRequestor;
33  import org.logicblaze.lingo.jms.impl.ResultJoinHandler;
34  import org.logicblaze.lingo.jms.marshall.DefaultMarshaller;
35  import org.logicblaze.lingo.jms.marshall.Marshaller;
36  import org.springframework.aop.support.AopUtils;
37  import org.springframework.beans.factory.DisposableBean;
38  import org.springframework.beans.factory.InitializingBean;
39  import org.springframework.remoting.RemoteAccessException;
40  import org.springframework.remoting.support.RemoteInvocationBasedAccessor;
41  import org.springframework.remoting.support.RemoteInvocationFactory;
42  import org.springframework.remoting.support.RemoteInvocationResult;
43  
44  import javax.jms.ConnectionFactory;
45  import javax.jms.DeliveryMode;
46  import javax.jms.Destination;
47  import javax.jms.JMSException;
48  import javax.jms.Message;
49  import javax.jms.Topic;
50  
51  import java.util.Iterator;
52  import java.util.Map;
53  import java.util.WeakHashMap;
54  
55  /***
56   * Interceptor for accessing a JMS based service which must be configured with a
57   * {@link org.logicblaze.lingo.LingoRemoteInvocationFactory} instance.
58   * 
59   * @author James Strachan
60   * @see #setServiceInterface
61   * @see #setServiceUrl
62   * @see JmsServiceExporter
63   * @see JmsProxyFactoryBean
64   */
65  public class JmsClientInterceptor extends RemoteInvocationBasedAccessor implements MethodInterceptor, InitializingBean, DisposableBean {
66      private static final Log log = LogFactory.getLog(JmsClientInterceptor.class);
67  
68      private Map remoteObjects = new WeakHashMap();
69      private Requestor requestor;
70      private Destination destination;
71      private Destination responseDestination;
72      private String correlationID;
73      private Marshaller marshaller;
74      private ConnectionFactory connectionFactory;
75      private String jmsType;
76      private Map messageProperties;
77      private int jmsExpiration = -1;
78      private JmsProducerConfig producerConfig = new JmsProducerConfig();
79      private MetadataStrategy metadataStrategy;
80      private boolean multipleResponsesExpected;
81      private long multipleResponseTimeout = 5000L;
82      private long remoteReferenceTimeout = 60000L;
83  
84      public JmsClientInterceptor() {
85          setRemoteInvocationFactory(createRemoteInvocationFactory());
86      }
87  
88      public JmsClientInterceptor(Requestor requestor) {
89          this.requestor = requestor;
90          setRemoteInvocationFactory(createRemoteInvocationFactory());
91      }
92  
93      public JmsClientInterceptor(Requestor requestor, LingoRemoteInvocationFactory factory) {
94          this.requestor = requestor;
95          setRemoteInvocationFactory(factory);
96      }
97  
98      public void afterPropertiesSet() throws JMSException {
99          RemoteInvocationFactory factory = getRemoteInvocationFactory();
100         if (!(factory instanceof LingoRemoteInvocationFactory)) {
101             throw new IllegalArgumentException("remoteInvocationFactory must be an instance of LingoRemoteInvocationFactory but was: " + factory);
102 
103         }
104         else {
105             LingoRemoteInvocationFactory invocationFactory = (LingoRemoteInvocationFactory) factory;
106             invocationFactory.setMetadataStrategy(getMetadataStrategy());
107         }
108         if (requestor == null) {
109             if (connectionFactory == null) {
110                 throw new IllegalArgumentException("requestor or connectionFactory is required");
111             }
112             else {
113                 requestor = createRequestor();
114             }
115         }
116         if (marshaller == null) {
117             // default to standard JMS marshaling
118             marshaller = new DefaultMarshaller();
119         }
120     }
121 
122     public Object invoke(MethodInvocation methodInvocation) throws Throwable {
123         if (AopUtils.isToStringMethod(methodInvocation.getMethod())) {
124             return "JMS invoker proxy for service URL [" + getServiceUrl() + "]";
125         }
126         LingoInvocation invocation = (LingoInvocation) createRemoteInvocation(methodInvocation);
127         MethodMetadata metadata = invocation.getMetadata();
128         replaceRemoteReferences(invocation, metadata);
129         try {
130             Message requestMessage = marshaller.createRequestMessage(requestor, invocation);
131             populateHeaders(requestMessage);
132             if (metadata.isOneWay()) {
133                 requestor.send(destination, requestMessage);
134                 return null;
135             }
136             else if (!isMultipleResponse(methodInvocation, metadata)) {
137                 Message response = requestor.request(destination, requestMessage);
138                 RemoteInvocationResult result = marshaller.extractInvocationResult(response);
139                 return recreateRemoteInvocationResult(result);
140             }
141             else {
142                 ResultJoinHandler handler = createResultJoinHandler(methodInvocation, metadata);
143                 requestor.request(destination, requestMessage, handler, getMultipleResponseTimeout());
144                 RemoteInvocationResult result = handler.waitForResult();
145                 return recreateRemoteInvocationResult(result);
146             }
147         }
148         catch (JMSException e) {
149             log.warn("Remote access error: " + methodInvocation, e);
150             throw new RemoteAccessException("Cannot access JMS invoker remote service at [" + getServiceUrl() + "]", e);
151         }
152     }
153 
154     public void destroy() throws Exception {
155         requestor.close();
156     }
157 
158     // Properties
159     // -------------------------------------------------------------------------
160     public Requestor getRequestor() {
161         return requestor;
162     }
163 
164     public void setRequestor(Requestor requestor) {
165         this.requestor = requestor;
166     }
167 
168     public Destination getDestination() {
169         return destination;
170     }
171 
172     /***
173      * Sets the destination used to make requests
174      * 
175      * @param destination
176      */
177     public void setDestination(Destination destination) {
178         this.destination = destination;
179     }
180 
181     public Destination getResponseDestination() {
182         return responseDestination;
183     }
184 
185     /***
186      * Sets the destination used to consume responses on - or null and a
187      * temporary queue will be created.
188      * 
189      * @param responseDestination
190      */
191     public void setResponseDestination(Destination responseDestination) {
192         this.responseDestination = responseDestination;
193     }
194 
195     public void setCorrelationID(String correlationID) {
196         this.correlationID = correlationID;
197     }
198 
199     public String getJmsType() {
200         return jmsType;
201     }
202 
203     /***
204      * Sets the JMS message type string which is appended to messages if set
205      */
206     public void setJmsType(String jmsType) {
207         this.jmsType = jmsType;
208     }
209 
210     public Map getMessageProperties() {
211         return messageProperties;
212     }
213 
214     public int getJmsExpiration() {
215         return jmsExpiration;
216     }
217 
218     /***
219      * Sets the JMS expiration timeout (in milliseconds) of the request message
220      */
221     public void setJmsExpiration(int jmsExpiration) {
222         this.jmsExpiration = jmsExpiration;
223     }
224 
225     public int getJmsPriority() {
226         return producerConfig.getPriority();
227     }
228 
229     /***
230      * Sets the JMS priority of the request message
231      */
232     public void setJmsPriority(int jmsPriority) {
233         producerConfig.setPriority(jmsPriority);
234     }
235 
236     public int getTimeToLive() {
237         return producerConfig.getTimeToLive();
238     }
239 
240     /***
241      * Sets the time to live on each message request
242      */
243     public void setTimeToLive(int timeToLive) {
244         producerConfig.setTimeToLive(timeToLive);
245     }
246 
247     /***
248      * Sets the message properties to be added to each message. Note that the
249      * keys should be Strings and the values should be primitive types.
250      */
251     public void setMessageProperties(Map messageProperties) {
252         this.messageProperties = messageProperties;
253     }
254 
255     public Marshaller getMarshaller() {
256         return marshaller;
257     }
258 
259     public void setMarshaller(Marshaller marshaller) {
260         this.marshaller = marshaller;
261     }
262 
263     public ConnectionFactory getConnectionFactory() {
264         return connectionFactory;
265     }
266 
267     /***
268      * Used to create a default {@link Requestor} if no requestor is explicitly
269      * configured.
270      */
271     public void setConnectionFactory(ConnectionFactory connectionFactory) {
272         this.connectionFactory = connectionFactory;
273     }
274 
275     public JmsProducerConfig getProducerConfig() {
276         return producerConfig;
277     }
278 
279     /***
280      * Sets the configuration of the producer used to send back responses
281      */
282     public void setProducerConfig(JmsProducerConfig producerConfig) {
283         this.producerConfig = producerConfig;
284     }
285 
286     public boolean isPersistentDelivery() {
287         return producerConfig.getDeliveryMode() == DeliveryMode.PERSISTENT;
288     }
289 
290     /***
291      * Sets the delivery mode to be persistent or non-persistent.
292      */
293     public void setPersistentDelivery(boolean persistent) {
294         producerConfig.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
295     }
296 
297     public String getClientID() {
298         return producerConfig.getClientID();
299     }
300 
301     /***
302      * Sets the JMS connections unique clientID. This is optional unless you
303      * wish to use durable topic subscriptions. Only one connection can have a
304      * given clientID at any time.
305      */
306     public void setClientID(String clientID) {
307         producerConfig.setClientID(clientID);
308     }
309 
310     public MetadataStrategy getMetadataStrategy() {
311         if (metadataStrategy == null) {
312             metadataStrategy = createMetadataStrategy();
313         }
314         return metadataStrategy;
315     }
316 
317     public void setMetadataStrategy(MetadataStrategy metadataStrategy) {
318         this.metadataStrategy = metadataStrategy;
319     }
320 
321     public boolean isMultipleResponsesExpected() {
322         return multipleResponsesExpected;
323     }
324 
325     /***
326      * Sets whether or not multiple response messages are expected. Typically
327      * multiple responses are only expected when the {@link #getDestination()}
328      * method returns a {@link Topic} but there could be circumstances when
329      * sending a request to a queue results in messages being fanned out to many
330      * servers which could all respond.
331      */
332     public void setMultipleResponsesExpected(boolean multipleResponsesExpected) {
333         this.multipleResponsesExpected = multipleResponsesExpected;
334     }
335 
336     public long getRemoteReferenceTimeout() {
337         return remoteReferenceTimeout;
338     }
339 
340     /***
341      * Sets the maximum amount of time an inactive remote object reference will
342      * keep around until it is garbage collected.
343      */
344     public void setRemoteReferenceTimeout(long remoteReferenceTimeout) {
345         this.remoteReferenceTimeout = remoteReferenceTimeout;
346     }
347 
348     public long getMultipleResponseTimeout() {
349         return multipleResponseTimeout;
350     }
351 
352     /***
353      * Sets the maximum amount of time to wait for multiple results to come back
354      * if communicating with multiple servers and aggregating together the
355      * results.
356      */
357     public void setMultipleResponseTimeout(long multipleResponseTimeout) {
358         this.multipleResponseTimeout = multipleResponseTimeout;
359     }
360 
361     // Implementation methods
362     // -------------------------------------------------------------------------
363 
364     /***
365      * Returns true if this method expects multiple response messages such as
366      * when sending a message over a topic.
367      */
368     protected boolean isMultipleResponse(MethodInvocation methodInvocation, MethodMetadata metadata) {
369         return (getDestination() instanceof Topic) || isMultipleResponsesExpected();
370     }
371 
372     protected void populateHeaders(Message requestMessage) throws JMSException {
373         if (correlationID != null) {
374             requestMessage.setJMSCorrelationID(correlationID);
375         }
376         if (jmsType != null) {
377             requestMessage.setJMSType(jmsType);
378         }
379         if (jmsExpiration >= 0) {
380             requestMessage.setJMSExpiration(jmsExpiration);
381         }
382         int jmsPriority = getJmsPriority();
383         if (jmsPriority >= 0) {
384             requestMessage.setJMSPriority(jmsPriority);
385         }
386         if (messageProperties != null) {
387             for (Iterator iter = messageProperties.entrySet().iterator(); iter.hasNext();) {
388                 Map.Entry entry = (Map.Entry) iter.next();
389                 String name = entry.getKey().toString();
390                 Object value = entry.getValue();
391                 requestMessage.setObjectProperty(name, value);
392             }
393         }
394     }
395 
396     /***
397      * Recreate the invocation result contained in the given
398      * RemoteInvocationResult object. The default implementation calls the
399      * default recreate method.
400      * <p>
401      * Can be overridden in subclass to provide custom recreation, potentially
402      * processing the returned result object.
403      * 
404      * @param result
405      *            the RemoteInvocationResult to recreate
406      * @return a return value if the invocation result is a successful return
407      * @throws Throwable
408      *             if the invocation result is an exception
409      * @see org.springframework.remoting.support.RemoteInvocationResult#recreate
410      */
411     protected Object recreateRemoteInvocationResult(RemoteInvocationResult result) throws Throwable {
412         return result.recreate();
413     }
414 
415     protected void replaceRemoteReferences(LingoInvocation invocation, MethodMetadata metadata) {
416         Object[] arguments = invocation.getArguments();
417         Class[] parameterTypes = invocation.getParameterTypes();
418         for (int i = 0; i < parameterTypes.length; i++) {
419             if (metadata.isRemoteParameter(i)) {
420                 arguments[i] = remoteReference(parameterTypes[i], arguments[i]);
421             }
422         }
423     }
424 
425     protected Object remoteReference(Class type, Object value) {
426         if (value == null) {
427             return null;
428         }
429         String correlationID = (String) remoteObjects.get(value);
430         if (correlationID == null) {
431             correlationID = requestor.createCorrelationID();
432             remoteObjects.put(value, correlationID);
433         }
434         if (requestor instanceof MultiplexingRequestor) {
435             MultiplexingRequestor multiplexingRequestor = (MultiplexingRequestor) requestor;
436             multiplexingRequestor.registerHandler(correlationID, createAsyncHandler(value), getRemoteReferenceTimeout());
437         }
438         else {
439             throw new IllegalArgumentException("You can only pass remote references with a MultiplexingRequestor");
440         }
441         return correlationID;
442     }
443 
444     protected ResultJoinHandler createResultJoinHandler(MethodInvocation methodInvocation, MethodMetadata metadata) {
445         ResultJoinStrategy joinStrategy = getMetadataStrategy().getResultJoinStrategy(methodInvocation, metadata);
446         return new ResultJoinHandler(marshaller, joinStrategy);
447     }
448 
449     protected AsyncReplyHandler createAsyncHandler(Object value) {
450         AsyncReplyHandler replyHandler = new AsyncReplyHandler(value, marshaller, getMetadataStrategy());
451         replyHandler.setConnectionFactory(connectionFactory);
452         replyHandler.setMarshaller(marshaller);
453         replyHandler.setProducerConfig(producerConfig);
454         replyHandler.setResponseRequestor(getRequestor());
455         replyHandler.setInvocationFactory(getRemoteInvocationFactory());
456         try {
457             replyHandler.afterPropertiesSet();
458         }
459         catch (Exception e) {
460             throw new RuntimeException("Could not create the AsyncHandler: " + e, e);
461         }
462         return replyHandler;
463     }
464 
465     protected Requestor createRequestor() throws JMSException {
466         return MultiplexingRequestor.newInstance(connectionFactory, getProducerConfig(), destination, responseDestination);
467     }
468 
469     /***
470      * Factory method to create a default lingo based invocation factory if none
471      * is configured
472      */
473     protected LingoRemoteInvocationFactory createRemoteInvocationFactory() {
474         return new LingoRemoteInvocationFactory(getMetadataStrategy());
475     }
476 
477     /***
478      * Factory method to create a default metadata strategy if none is
479      * configured
480      * 
481      * @return
482      */
483     protected MetadataStrategy createMetadataStrategy() {
484         return MetadataStrategyHelper.newInstance();
485     }
486 
487 }