View Javadoc

1   /***
2    * 
3    * Copyright RAJD Consultancy Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
6    * the License. You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11   * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12   * specific language governing permissions and limitations under the License.
13   * 
14   */
15  package org.logicblaze.lingo.jmx.remote.jms;
16  
17  import java.util.Map;
18  import javax.jms.Destination;
19  import javax.jms.JMSException;
20  import javax.jms.Message;
21  import javax.jms.MessageConsumer;
22  import javax.jms.MessageListener;
23  import javax.jms.MessageProducer;
24  import javax.jms.ObjectMessage;
25  import javax.jms.Session;
26  import javax.jms.Topic;
27  import javax.management.Notification;
28  import javax.management.NotificationListener;
29  import org.apache.activemq.advisory.AdvisorySupport;
30  import org.apache.activemq.command.ActiveMQDestination;
31  import org.apache.activemq.command.ActiveMQMessage;
32  import org.apache.activemq.command.RemoveInfo;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  /***
36   * @version $Revision: 1.2 $
37   */
38  class ServerListenerInfo implements NotificationListener,MessageListener{
39      private static final Log log=LogFactory.getLog(ServerListenerInfo.class);
40      private String id;
41      private Map holder;
42      private Destination replyTo;
43      private Session session;
44      private MessageProducer producer;
45      private MessageConsumer advisoryConsumer;
46  
47      ServerListenerInfo(String id,Map holder,Destination replyTo,Session session) throws JMSException{
48          this.id=id;
49          this.holder=holder;
50          this.replyTo=replyTo;
51          this.session=session;
52          this.producer=session.createProducer(replyTo);
53          Topic advisoryTopic=AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) replyTo);
54          advisoryConsumer=session.createConsumer(advisoryTopic);
55          advisoryConsumer.setMessageListener(this);
56      }
57  
58      /***
59       * NotificationListener implementation
60       * 
61       * @param notification
62       * @param handback
63       */
64      public void handleNotification(Notification notification,Object handback){
65          try{
66              ObjectMessage msg=session.createObjectMessage(notification);
67              producer.send(msg);
68          }catch(JMSException e){
69              log.error("Failed to handle notification: "+notification,e);
70          }
71      }
72  
73      /***
74       * MessageListener implementation
75       * 
76       * @param message
77       */
78      public void onMessage(Message message){
79          try{
80              if(message!=null&&message instanceof ObjectMessage){
81                  ObjectMessage objMsg=(ObjectMessage) message;
82                  Object obj=objMsg.getObject();
83                  if(obj!=null&&obj instanceof RemoveInfo){
84                      close();
85                  }
86              }else {
87                  if (message instanceof ActiveMQMessage ){
88                      ActiveMQMessage amqMsg = (ActiveMQMessage) message;
89                      if (amqMsg.getDataStructure() instanceof RemoveInfo){
90                          close();
91                      }
92                  }
93              }
94          }catch(JMSException e){
95              log.warn("Failed to process onMessage",e);
96          }
97      }
98  
99      /***
100      * close the info
101      */
102     public void close(){
103         try{
104             holder.remove(id);
105             producer.close();
106             advisoryConsumer.close();
107         }catch(JMSException e){
108             log.warn("problem closing",e);
109         }
110     }
111 
112     /***
113      * @return Returns the holder.
114      */
115     public Map getHolder(){
116         return holder;
117     }
118 
119     /***
120      * @param holder
121      *            The holder to set.
122      */
123     public void setHolder(Map holder){
124         this.holder=holder;
125     }
126 
127     /***
128      * @return Returns the id.
129      */
130     public String getId(){
131         return id;
132     }
133 
134     /***
135      * @param id
136      *            The id to set.
137      */
138     public void setId(String id){
139         this.id=id;
140     }
141 
142     /***
143      * @return Returns the producer.
144      */
145     public MessageProducer getProducer(){
146         return producer;
147     }
148 
149     /***
150      * @param producer
151      *            The producer to set.
152      */
153     public void setProducer(MessageProducer producer){
154         this.producer=producer;
155     }
156 
157     /***
158      * @return Returns the replyTo.
159      */
160     public Destination getReplyTo(){
161         return replyTo;
162     }
163 
164     /***
165      * @param replyTo
166      *            The replyTo to set.
167      */
168     public void setReplyTo(Destination replyTo){
169         this.replyTo=replyTo;
170     }
171 }