View Javadoc

1   /*** 
2    * 
3    * Copyright 2005 LogicBlaze, Inc.
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.logicblaze.lingo.jms.impl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.logicblaze.lingo.ResultJoinStrategy;
23  import org.logicblaze.lingo.jms.ReplyHandler;
24  import org.logicblaze.lingo.jms.marshall.Marshaller;
25  import org.springframework.remoting.support.RemoteInvocationResult;
26  
27  import javax.jms.JMSException;
28  import javax.jms.Message;
29  
30  /***
31   * A {@link org.logicblaze.lingo.jms.ReplyHandler} which can handle join
32   * multiple results to the same request which are then aggregated together into
33   * a single value.
34   * 
35   * @version $Revision$
36   */
37  public class ResultJoinHandler implements ReplyHandler {
38  
39      private static final Log log = LogFactory.getLog(ResultJoinHandler.class);
40  
41      private Marshaller marshaller;
42      private ResultJoinStrategy joinStrategy;
43      private Object lock = new Object();
44      private int responseCount;
45      private RemoteInvocationResult result;
46      private long timeout = 2000;
47  
48      public ResultJoinHandler(Marshaller marshaller, ResultJoinStrategy joinStrategy) {
49          this.marshaller = marshaller;
50          this.joinStrategy = joinStrategy;
51      }
52  
53      public boolean handle(Message message) throws JMSException {
54          RemoteInvocationResult newResult = marshaller.extractInvocationResult(message);
55          synchronized (lock) {
56              ++responseCount;
57              if (result == null) {
58                  result = newResult;
59              }
60              else {
61                  result = joinStrategy.mergeResponses(result, newResult, responseCount);
62              }
63              if (joinStrategy.unblockCallerThread(result, responseCount)) {
64                  lock.notifyAll();
65              }
66          }
67          return joinStrategy.removeHandler(result, responseCount);
68      }
69  
70      /***
71       * This method will block the calling thread until the result is available.
72       */
73      public RemoteInvocationResult waitForResult() {
74          while (true) {
75              synchronized (lock) {
76                  if (result != null) {
77                      return result;
78                  }
79                  try {
80                      lock.wait(timeout);
81                  }
82                  catch (InterruptedException e) {
83                      log.debug("Ignored interrupt exception: " + e, e);
84                  }
85              }
86          }
87      }
88  
89      public RemoteInvocationResult pollResult() {
90          synchronized (lock) {
91              return result;
92          }
93      }
94  
95      public int getResponseCount() {
96          synchronized (lock) {
97              return responseCount;
98          }
99      }
100 
101     public long getTimeout() {
102         return timeout;
103     }
104 
105     /***
106      * Sets the {@link #wait(long)} method timeout period before resuming the
107      * wait.
108      */
109     public void setTimeout(long timeout) {
110         this.timeout = timeout;
111     }
112 
113 }