Home » geronimo-2.2-source-release » org.apache.geronimo.console.jmsmanager.helper » [javadoc | source]

    1   /**
    2    *  Licensed to the Apache Software Foundation (ASF) under one or more
    3    *  contributor license agreements.  See the NOTICE file distributed with
    4    *  this work for additional information regarding copyright ownership.
    5    *  The ASF licenses this file to You under the Apache License, Version 2.0
    6    *  (the "License"); you may not use this file except in compliance with
    7    *  the License.  You may obtain a copy of the License at
    8    *
    9    *     http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    *  Unless required by applicable law or agreed to in writing, software
   12    *  distributed under the License is distributed on an "AS IS" BASIS,
   13    *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    *  See the License for the specific language governing permissions and
   15    *  limitations under the License.
   16    */
   17   
   18   package org.apache.geronimo.console.jmsmanager.helper;
   19   
   20   import java.util.Date;
   21   import java.util.Iterator;
   22   import java.util.Set;
   23   
   24   import javax.jms.ConnectionFactory;
   25   import javax.jms.JMSException;
   26   import javax.management.MBeanServer;
   27   import javax.management.MBeanServerInvocationHandler;
   28   import javax.management.MalformedObjectNameException;
   29   import javax.management.ObjectInstance;
   30   import javax.management.ObjectName;
   31   import javax.management.openmbean.CompositeData;
   32   import javax.portlet.PortletRequest;
   33   
   34   import org.apache.activemq.ActiveMQConnection;
   35   import org.apache.activemq.ActiveMQConnectionFactory;
   36   import org.apache.activemq.broker.jmx.BrokerViewMBean;
   37   import org.apache.activemq.broker.jmx.CompositeDataConstants;
   38   import org.apache.activemq.broker.jmx.DestinationViewMBean;
   39   import org.apache.activemq.broker.jmx.QueueViewMBean;
   40   import org.apache.activemq.broker.jmx.TopicViewMBean;
   41   import org.apache.activemq.command.BrokerInfo;
   42   import org.apache.geronimo.console.jmsmanager.DestinationStatistics;
   43   import org.apache.geronimo.console.jmsmanager.DestinationType;
   44   import org.apache.geronimo.console.jmsmanager.JMSDestinationInfo;
   45   import org.apache.geronimo.console.jmsmanager.JMSMessageInfo;
   46   import org.apache.geronimo.console.util.PortletManager;
   47   import org.apache.geronimo.gbean.AbstractName;
   48   import org.apache.geronimo.kernel.Kernel;
   49   import org.apache.geronimo.management.geronimo.ResourceAdapterModule;
   50   import org.apache.geronimo.system.jmx.MBeanServerReference;
   51   import org.slf4j.Logger;
   52   import org.slf4j.LoggerFactory;
   53   
   54   /**
   55    * @version $Rev: 813742 $ $Date: 2009-09-11 02:38:26 -0700 (Fri, 11 Sep 2009) $
   56    */
   57   public class AmqJMSMessageHelper extends JMSMessageHelper {
   58   
   59       private static final Logger logger = LoggerFactory.getLogger(AmqJMSMessageHelper.class);
   60   
   61       @SuppressWarnings("unchecked")
   62       public void purge(PortletRequest portletRequest, JMSDestinationInfo destinationInfo) throws JMSException {
   63           try {
   64               if (destinationInfo.getType().equals(DestinationType.Queue)) {
   65                   BrokerInfo brokerInfo = getBrokerInfo(portletRequest, destinationInfo);
   66                   if (brokerInfo == null || !isInLocalMBeanServer(brokerInfo)) {
   67                       throw new JMSException("Currently, only queue belong to local broker is supported");
   68                   }
   69                   MBeanServer server = getMBeanServer();
   70                   ObjectName destinationObjectName = createDestinationObjectName(brokerInfo.getBrokerName(), destinationInfo.getType().name(), destinationInfo.getPhysicalName());
   71                   QueueViewMBean proxy;
   72                   if (!server.isRegistered(destinationObjectName)) {
   73                       // mbean is not yet registered.Adding the destination to activemq broker.
   74                       ObjectName brokerObjectName = createBrokerObjectName(brokerInfo.getBrokerName());
   75                       Set set = server.queryMBeans(brokerObjectName, null);
   76                       Iterator it = set.iterator();
   77                       if (it.hasNext()) {
   78                           ObjectInstance instance = (ObjectInstance) it.next();
   79                           brokerObjectName = instance.getObjectName();
   80                       }
   81                       BrokerViewMBean brokerMBean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(server, brokerObjectName, BrokerViewMBean.class, true);
   82                       brokerMBean.addQueue(destinationInfo.getPhysicalName());
   83                   }
   84                   proxy = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(server, destinationObjectName, QueueViewMBean.class, true);
   85                   proxy.purge();
   86               } else {
   87                   throw new JMSException("Purge action on topic is not supported");
   88               }
   89           } catch (MalformedObjectNameException e) {
   90               throw createJMSException("Fail to find the target object name", e);
   91           } catch (JMSException e) {
   92               throw e;
   93           } catch (Exception e) {
   94               throw createJMSException("Error occured in the purge action", e);
   95           }
   96       }
   97   
   98       private JMSException createJMSException(String reason, Exception e) {
   99           JMSException jmsException = new JMSException(reason);
  100           jmsException.setLinkedException(e);
  101           return jmsException;
  102       }
  103   
  104       protected ObjectName createDestinationObjectName(String brokerName, String destinationType, String destinactionPhysicalName) throws MalformedObjectNameException {
  105           return new ObjectName("org.apache.activemq" + ":BrokerName=" + brokerName + ",Type=" + destinationType + ",Destination=" + destinactionPhysicalName);
  106       }
  107   
  108       protected ObjectName createBrokerObjectName(String brokerName) throws MalformedObjectNameException {
  109           return new ObjectName("org.apache.activemq" + ":BrokerName=" + brokerName + ",Type=Broker");
  110       }
  111   
  112       @Override
  113       protected ConnectionFactory getConnectionFactory(PortletRequest portletRequest, JMSDestinationInfo destinationInfo) throws JMSException{
  114           ConnectionFactory connectionFactory = super.getConnectionFactory(portletRequest, destinationInfo);
  115           if (connectionFactory == null) {
  116               connectionFactory = createActiveMQConnectionFactory(portletRequest, destinationInfo);
  117           }
  118           return connectionFactory;
  119       }
  120   
  121       private ActiveMQConnectionFactory createActiveMQConnectionFactory(PortletRequest portletRequest, JMSDestinationInfo destinationInfo) throws JMSException {
  122           try {
  123               Kernel kernel = PortletManager.getKernel();
  124               ResourceAdapterModule resourceAdapterModule = (ResourceAdapterModule) PortletManager.getManagementHelper(portletRequest).getObject(destinationInfo.getResourceAdapterModuleAbName());
  125               AbstractName resourceAdapterAbstractName = PortletManager.getNameFor(portletRequest, resourceAdapterModule.getResourceAdapterInstances()[0].getJCAResourceImplementations()[0]
  126                       .getResourceAdapterInstances()[0]);
  127               if (kernel.isRunning(resourceAdapterAbstractName)) {
  128                   String serverUrl = (String) kernel.getAttribute(resourceAdapterAbstractName, "ServerUrl");
  129                   String userName = (String) kernel.getAttribute(resourceAdapterAbstractName, "UserName");
  130                   String password = (String) kernel.getAttribute(resourceAdapterAbstractName, "Password");
  131                   return new ActiveMQConnectionFactory(userName, password, serverUrl);
  132               }
  133               throw new JMSException("Fail to create ActiveMQConnectionFactory for the resource adapter module is not in running status");
  134           } catch (JMSException e) {
  135               throw e;
  136           } catch (Exception e) {
  137               logger.error("Fail to create ActiveMQConnectionFactory", e);
  138               throw createJMSException("Fail to create ActiveMQConnectionFactory", e);
  139           }
  140       }
  141   
  142       @Override
  143       public DestinationStatistics getDestinationStatistics(PortletRequest portletRequest, JMSDestinationInfo destinationInfo) throws JMSException {
  144           DestinationStatistics stat = new DestinationStatistics();
  145           try {
  146               BrokerInfo brokerInfo = getBrokerInfo(portletRequest, destinationInfo);
  147               if (brokerInfo == null || !isInLocalMBeanServer(brokerInfo)) {
  148                   return stat;
  149               }
  150               MBeanServer server = getMBeanServer();
  151               ObjectName objName = createDestinationObjectName(brokerInfo.getBrokerName(), destinationInfo.getType().name(), destinationInfo.getPhysicalName());
  152               DestinationViewMBean proxy;
  153               if (destinationInfo.getType().equals(DestinationType.Queue)) {
  154                   if (!server.isRegistered(objName)) {
  155                       // mbean is not yet registered.Adding the destination to activemq broker.
  156                       ObjectName brokerObj = createBrokerObjectName(brokerInfo.getBrokerName());
  157                       Set set = server.queryMBeans(brokerObj, null);
  158                       Iterator it = set.iterator();
  159                       if (it.hasNext()) {
  160                           ObjectInstance instance = (ObjectInstance) it.next();
  161                           brokerObj = instance.getObjectName();
  162                       }
  163                       BrokerViewMBean brokerMBean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(server, brokerObj, BrokerViewMBean.class, true);
  164                       brokerMBean.addQueue(destinationInfo.getPhysicalName());
  165                   }
  166                   proxy = (DestinationViewMBean) MBeanServerInvocationHandler.newProxyInstance(server, objName, QueueViewMBean.class, true);
  167               } else {
  168                   if (!server.isRegistered(objName)) {
  169                       // mbean is not yet registered.Adding the destination to activemq broker.
  170                       ObjectName brokerObj = createBrokerObjectName(brokerInfo.getBrokerName());
  171                       Set set = server.queryMBeans(brokerObj, null);
  172                       Iterator it = set.iterator();
  173                       if (it.hasNext()) {
  174                           ObjectInstance instance = (ObjectInstance) it.next();
  175                           brokerObj = instance.getObjectName();
  176                       }
  177                       BrokerViewMBean brokerMBean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(server, brokerObj, BrokerViewMBean.class, true);
  178                       brokerMBean.addTopic(destinationInfo.getPhysicalName());
  179                   }
  180                   proxy = (DestinationViewMBean) MBeanServerInvocationHandler.newProxyInstance(server, objName, TopicViewMBean.class, true);
  181               }
  182               stat.setConsumerCount(proxy.getConsumerCount());
  183               stat.setEnqueueCount(proxy.getEnqueueCount());
  184               stat.setDequeueCount(proxy.getDequeueCount());
  185               stat.setQueueSize(proxy.getQueueSize());
  186           } catch (Exception ex) {
  187               logger.warn("Failed to get ActiveMQ stats", ex);
  188           }
  189           return stat;
  190       }
  191   
  192       protected JMSMessageInfo[] getMessagesFromTopic(PortletRequest portletRequest, JMSDestinationInfo destinationInfo, String selector) throws JMSException {
  193           BrokerInfo brokerInfo = getBrokerInfo(portletRequest, destinationInfo);
  194           if (brokerInfo == null || !isInLocalMBeanServer(brokerInfo)) {
  195               return new JMSMessageInfo[0];
  196           }
  197           try {
  198               ObjectName destinationObjectName = createDestinationObjectName(brokerInfo.getBrokerName(), destinationInfo.getType().name(), destinationInfo.getPhysicalName());
  199               MBeanServer mBeanServer = getMBeanServer();
  200               ObjectInstance objectInstance = mBeanServer.getObjectInstance(destinationObjectName);
  201               CompositeData[] compositeData = (CompositeData[]) mBeanServer.invoke(objectInstance.getObjectName(), "browse", new Object[] { selector }, new String[] { String.class.getName() });
  202               if (compositeData.length > 0) {
  203                   JMSMessageInfo[] messageInfos = new JMSMessageInfo[compositeData.length];
  204                   for (int i = 0; i < compositeData.length; i++) {
  205                       JMSMessageInfo jmsMessageInfo = new JMSMessageInfo();
  206                       CompositeData data = compositeData[i];
  207                       if (compositeData[0].getCompositeType().getTypeName().equals("org.apache.activemq.command.ActiveMQTextMessage")) {
  208                           jmsMessageInfo.setMessage((String) data.get(CompositeDataConstants.MESSAGE_TEXT));
  209                       } else {
  210                           jmsMessageInfo.setMessage("Only Text Messages will be displayed..");
  211                       }
  212                       jmsMessageInfo.setPriority((Integer) data.get("JMSPriority"));
  213                       jmsMessageInfo.setMessageId((String) data.get("JMSMessageID"));
  214                       jmsMessageInfo.setDestination((String) data.get("JMSDestination"));
  215                       jmsMessageInfo.setTimeStamp(((Date) data.get("JMSTimestamp")).getTime());
  216                       jmsMessageInfo.setExpiration((Long) data.get("JMSExpiration"));
  217                       jmsMessageInfo.setJmsType((String) data.get("JMSType"));
  218                       jmsMessageInfo.setReplyTo((String) data.get("JMSReplyTo"));
  219                       jmsMessageInfo.setCorrelationId((String) data.get("JMSCorrelationID"));
  220                       messageInfos[i] = jmsMessageInfo;
  221                   }
  222                   return messageInfos;
  223               }
  224               return new JMSMessageInfo[0];
  225           } catch (Exception e) {
  226               throw createJMSException("Fail to get messages of the topic " + destinationInfo.getPhysicalName(), e); 
  227           }
  228       }
  229   
  230       protected MBeanServer getMBeanServer() {
  231           MBeanServerReference ref;
  232           try {
  233               ref = kernel.getGBean(MBeanServerReference.class);
  234               return ref.getMBeanServer();
  235           } catch (Exception e) {
  236               return null;
  237           }
  238       }
  239   
  240       private boolean isInLocalMBeanServer(BrokerInfo brokerInfo) {
  241           try {
  242               ObjectName brokerObjectNameQuery = new ObjectName("org.apache.activemq" + ":*,Type=Broker");
  243               MBeanServer mBeanServer = getMBeanServer();
  244               Set<ObjectInstance> brokerObjectInstances = mBeanServer.queryMBeans(brokerObjectNameQuery, null);
  245               String targetBrokerId = brokerInfo.getBrokerId().getValue();
  246               for (ObjectInstance objectInstance : brokerObjectInstances) {
  247                   String brokerId = (String) mBeanServer.getAttribute(objectInstance.getObjectName(), "BrokerId");
  248                   if (targetBrokerId.equals(brokerId)) {
  249                       return true;
  250                   }
  251               }
  252           } catch (Exception e) {
  253               logger.error("Fail to check the broker in local mbeanserver", e);
  254               return false;
  255           }
  256           return false;
  257       }
  258   
  259       private BrokerInfo getBrokerInfo(PortletRequest portletRequest, JMSDestinationInfo destinationInfo) throws JMSException {
  260           ActiveMQConnectionFactory connectionFactory = createActiveMQConnectionFactory(portletRequest, destinationInfo);
  261           ActiveMQConnection connection = null;
  262           try {
  263               connection = (ActiveMQConnection) connectionFactory.createConnection();
  264               connection.start();
  265               return connection.getBrokerInfo();
  266           } finally {
  267               if (connection != null) {
  268                   try {
  269                       connection.close();
  270                   } catch (Exception e) {
  271                   }
  272               }
  273           }
  274       }
  275   }

Home » geronimo-2.2-source-release » org.apache.geronimo.console.jmsmanager.helper » [javadoc | source]