View Javadoc

/**
 *
 * Copyright 2005 LogicBlaze, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
18  package org.logicblaze.lingo.jms;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.logicblaze.lingo.LingoInvocation;
23  import org.logicblaze.lingo.LingoRemoteInvocationFactory;
24  import org.logicblaze.lingo.MetadataStrategy;
25  import org.logicblaze.lingo.MetadataStrategyHelper;
26  import org.logicblaze.lingo.MethodMetadata;
27  import org.logicblaze.lingo.jms.impl.DefaultJmsProducer;
28  import org.logicblaze.lingo.jms.impl.MultiplexingRequestor;
29  import org.logicblaze.lingo.jms.marshall.DefaultMarshaller;
30  import org.logicblaze.lingo.jms.marshall.Marshaller;
31  import org.springframework.beans.factory.DisposableBean;
32  import org.springframework.beans.factory.InitializingBean;
33  import org.springframework.remoting.support.RemoteInvocation;
34  import org.springframework.remoting.support.RemoteInvocationBasedExporter;
35  import org.springframework.remoting.support.RemoteInvocationFactory;
36  import org.springframework.remoting.support.RemoteInvocationResult;
37  
38  import javax.jms.Connection;
39  import javax.jms.ConnectionFactory;
40  import javax.jms.DeliveryMode;
41  import javax.jms.JMSException;
42  import javax.jms.Message;
43  import javax.jms.MessageListener;
44  import javax.jms.Session;
45  
46  /***
47   * A JMS MessageListener that exports the specified service bean as a JMS
48   * service endpoint, accessible via a JMS proxy. <p/>
49   * <p>
50   * Note: JMS services exported with this class can be accessed by any JMS
51   * client, as there isn't any special handling involved.
52   * 
53   * @author James Strachan
54   * @see JmsProxyFactoryBean
55   * @version $Revision: 1.9 $
56   */
57  public class JmsServiceExporterMessageListener extends RemoteInvocationBasedExporter implements MessageListener, InitializingBean, DisposableBean {
58      private static final Log log = LogFactory.getLog(JmsServiceExporterMessageListener.class);
59  
60      private Object proxy;
61      private ConnectionFactory connectionFactory;
62      private Requestor responseRequestor;
63      private JmsProducerConfig producerConfig = new JmsProducerConfig();
64      private boolean ignoreFailures;
65      private Marshaller marshaller;
66      private MetadataStrategy metadataStrategy;
67      private RemoteInvocationFactory invocationFactory;
68  
69      public JmsServiceExporterMessageListener() {
70      }
71  
72      public JmsServiceExporterMessageListener(Object proxy) {
73          this.proxy = proxy;
74      }
75  
76      public void afterPropertiesSet() throws Exception {
77          if (proxy == null) {
78              this.proxy = getProxyForService();
79              if (proxy == null) {
80                  throw new IllegalArgumentException("proxy is required");
81              }
82          }
83          if (responseRequestor == null) {
84              responseRequestor = MultiplexingRequestor.newInstance(connectionFactory, producerConfig, null);
85          }
86          if (marshaller == null) {
87              marshaller = new DefaultMarshaller();
88          }
89          if (metadataStrategy == null) {
90              metadataStrategy = MetadataStrategyHelper.newInstance();
91          }
92          if (invocationFactory == null) {
93              invocationFactory = new LingoRemoteInvocationFactory(metadataStrategy);
94          }
95      }
96  
97      public void onMessage(Message message) {
98          try {
99              RemoteInvocation invocation = marshaller.readRemoteInvocation(message);
100             doInvoke(message, invocation);
101         }
102         catch (JMSException e) {
103             onException(message, e);
104         }
105     }
106 
107     public void destroy() throws Exception {
108         if (responseRequestor != null) {
109             responseRequestor.close();
110         }
111     }
112 
113     // Properties
114     // -------------------------------------------------------------------------
115     public ConnectionFactory getConnectionFactory() {
116         return connectionFactory;
117     }
118 
119     /***
120      * Used to create a default {@link JmsProducer} if no producer is explicitly
121      * configured.
122      */
123     public void setConnectionFactory(ConnectionFactory connectionFactory) {
124         this.connectionFactory = connectionFactory;
125     }
126 
127     public Requestor getResponseRequestor() {
128         return responseRequestor;
129     }
130 
131     public void setResponseRequestor(Requestor responseRequestor) {
132         this.responseRequestor = responseRequestor;
133     }
134 
135     public Marshaller getMarshaller() {
136         return marshaller;
137     }
138 
139     public void setMarshaller(Marshaller marshaller) {
140         this.marshaller = marshaller;
141     }
142 
143     public RemoteInvocationFactory getInvocationFactory() {
144         return invocationFactory;
145     }
146 
147     public void setInvocationFactory(RemoteInvocationFactory invocationFactory) {
148         this.invocationFactory = invocationFactory;
149     }
150 
151     public boolean isIgnoreFailures() {
152         return ignoreFailures;
153     }
154 
155     /***
156      * Sets whether or not failures should be ignored (and just logged) or
157      * thrown as runtime exceptions into the JMS provider
158      */
159     public void setIgnoreFailures(boolean ignoreFailures) {
160         this.ignoreFailures = ignoreFailures;
161     }
162 
163     public JmsProducerConfig getProducerConfig() {
164         return producerConfig;
165     }
166 
167     /***
168      * Sets the configuration of the producer used to send back responses
169      */
170     public void setProducerConfig(JmsProducerConfig producerConfig) {
171         this.producerConfig = producerConfig;
172     }
173 
174     public boolean isPersistentDelivery() {
175         return producerConfig.getDeliveryMode() == DeliveryMode.PERSISTENT;
176     }
177 
178     /***
179      * Sets the delivery mode to be persistent or non-persistent.
180      */
181     public void setPersistentDelivery(boolean persistent) {
182         producerConfig.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
183     }
184 
185     public String getClientID() {
186         return producerConfig.getClientID();
187     }
188 
189     /***
190      * Sets the JMS connections unique clientID. This is optional unless you
191      * wish to use durable topic subscriptions. Only one connection can have a
192      * given clientID at any time.
193      */
194     public void setClientID(String clientID) {
195         producerConfig.setClientID(clientID);
196     }
197 
198     public Object getProxy() {
199         return proxy;
200     }
201 
202     public MetadataStrategy getMetadataStrategy() {
203         return metadataStrategy;
204     }
205 
206     public void setMetadataStrategy(MetadataStrategy metadataStrategy) {
207         this.metadataStrategy = metadataStrategy;
208     }
209 
210     // Implementation methods
211     // -------------------------------------------------------------------------
212     protected void doInvoke(Message message, RemoteInvocation invocation) throws JMSException {
213         if (invocation != null) {
214             boolean oneway = false;
215             if (invocation instanceof LingoInvocation) {
216                 LingoInvocation lingoInvocation = (LingoInvocation) invocation;
217                 oneway = lingoInvocation.getMetadata().isOneWay();
218                 introduceRemoteReferences(lingoInvocation, message);
219             }
220             RemoteInvocationResult result = invokeAndCreateResult(invocation, this.proxy);
221             if (!oneway) {
222                 writeRemoteInvocationResult(message, result);
223             }
224         }
225     }
226 
227     /***
228      * Creates the invocation result response message
229      * 
230      * @param session
231      *            the JMS session to use
232      * @param message
233      *            the original request message, in case we want to attach any
234      *            properties etc.
235      * @param result
236      *            the invocation result
237      * @return the message response to send
238      * @throws javax.jms.JMSException
239      *             if creating the messsage failed
240      */
241     protected Message createResponseMessage(Session session, Message message, RemoteInvocationResult result) throws JMSException {
242         // an alternative strategy could be to use XStream and text messages
243         // though some JMS providers, like ActiveMQ, might do this kind of thing
244         // for us under the covers
245         if (result == null) {
246             throw new IllegalArgumentException("result cannot be null");
247         }
248 
249         Message answer = getMarshaller().createResponseMessage(session, result, message);
250 
251         // lets preserve the correlation ID
252         answer.setJMSCorrelationID(message.getJMSCorrelationID());
253         return answer;
254     }
255 
256     /***
257      * Lets replace any remote object correlation IDs with dynamic proxies
258      * 
259      * @param invocation
260      * @param requestMessage
261      */
262     protected void introduceRemoteReferences(LingoInvocation invocation, Message requestMessage) throws JMSException {
263         MethodMetadata metadata = invocation.getMetadata();
264         Object[] arguments = invocation.getArguments();
265         Class[] parameterTypes = invocation.getParameterTypes();
266         for (int i = 0; i < parameterTypes.length; i++) {
267             if (metadata.isRemoteParameter(i)) {
268                 arguments[i] = createRemoteProxy(requestMessage, parameterTypes[i], arguments[i]);
269             }
270         }
271     }
272 
273     protected Object createRemoteProxy(Message message, Class parameterType, Object argument) throws JMSException {
274         JmsProxyFactoryBean factory = new JmsProxyFactoryBean();
275         factory.setDestination(message.getJMSReplyTo());
276         String correlationID = (String) argument;
277         if (log.isDebugEnabled()) {
278             log.debug("Creating a server side remote proxy for correlationID: " + correlationID);
279         }
280         factory.setCorrelationID(correlationID);
281         factory.setMarshaller(getMarshaller());
282         factory.setRemoteInvocationFactory(invocationFactory);
283         factory.setServiceInterface(parameterType);
284         factory.setRequestor(responseRequestor);
285         factory.afterPropertiesSet();
286         return factory.getObject();
287     }
288 
289     /***
290      * Handle the processing of an exception when processing an inbound messsage
291      */
292     protected void onException(Message message, JMSException e) {
293         String text = "Failed to process inbound message due to: " + e + ". Message will be discarded: " + message;
294         log.info(text, e);
295         if (!ignoreFailures) {
296             throw new RuntimeException(text, e);
297         }
298     }
299     /***
300      * Send the given RemoteInvocationResult as a JMS message to the originator
301      * 
302      * @param message
303      *            current HTTP message
304      * @param result
305      *            the RemoteInvocationResult object
306      * @throws javax.jms.JMSException
307      *             if thrown by trying to send the message
308      */
309     protected void writeRemoteInvocationResult(final Message message, final RemoteInvocationResult result) throws JMSException {
310         Message responseMessage = createResponseMessage(getResponseRequestor().getSession(), message, result);
311         getResponseRequestor().send(message.getJMSReplyTo(), responseMessage);
312     }
313 }