1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.geronimo.console.jmsmanager.helper; 19 20 import java.util.Date; 21 import java.util.Iterator; 22 import java.util.Set; 23 24 import javax.jms.ConnectionFactory; 25 import javax.jms.JMSException; 26 import javax.management.MBeanServer; 27 import javax.management.MBeanServerInvocationHandler; 28 import javax.management.MalformedObjectNameException; 29 import javax.management.ObjectInstance; 30 import javax.management.ObjectName; 31 import javax.management.openmbean.CompositeData; 32 import javax.portlet.PortletRequest; 33 34 import org.apache.activemq.ActiveMQConnection; 35 import org.apache.activemq.ActiveMQConnectionFactory; 36 import org.apache.activemq.broker.jmx.BrokerViewMBean; 37 import org.apache.activemq.broker.jmx.CompositeDataConstants; 38 import org.apache.activemq.broker.jmx.DestinationViewMBean; 39 import org.apache.activemq.broker.jmx.QueueViewMBean; 40 import org.apache.activemq.broker.jmx.TopicViewMBean; 41 import org.apache.activemq.command.BrokerInfo; 42 import org.apache.geronimo.console.jmsmanager.DestinationStatistics; 43 import org.apache.geronimo.console.jmsmanager.DestinationType; 44 import org.apache.geronimo.console.jmsmanager.JMSDestinationInfo; 45 import org.apache.geronimo.console.jmsmanager.JMSMessageInfo; 46 import org.apache.geronimo.console.util.PortletManager; 47 import org.apache.geronimo.gbean.AbstractName; 48 import org.apache.geronimo.kernel.Kernel; 49 import org.apache.geronimo.management.geronimo.ResourceAdapterModule; 50 import org.apache.geronimo.system.jmx.MBeanServerReference; 51 import org.slf4j.Logger; 52 import org.slf4j.LoggerFactory; 53 54 /** 55 * @version $Rev: 813742 $ $Date: 2009-09-11 02:38:26 -0700 (Fri, 11 Sep 2009) $ 56 */ 57 public class AmqJMSMessageHelper extends JMSMessageHelper { 58 59 private static final Logger logger = LoggerFactory.getLogger(AmqJMSMessageHelper.class); 60 61 @SuppressWarnings("unchecked") 62 public void purge(PortletRequest portletRequest, JMSDestinationInfo destinationInfo) throws JMSException { 63 try { 64 if (destinationInfo.getType().equals(DestinationType.Queue)) { 65 BrokerInfo brokerInfo = getBrokerInfo(portletRequest, destinationInfo); 66 if (brokerInfo == null || !isInLocalMBeanServer(brokerInfo)) { 67 throw new JMSException("Currently, only queue belong to local broker is supported"); 68 } 69 MBeanServer server = getMBeanServer(); 70 ObjectName destinationObjectName = createDestinationObjectName(brokerInfo.getBrokerName(), destinationInfo.getType().name(), destinationInfo.getPhysicalName()); 71 QueueViewMBean proxy; 72 if (!server.isRegistered(destinationObjectName)) { 73 // mbean is not yet registered.Adding the destination to activemq broker. 74 ObjectName brokerObjectName = createBrokerObjectName(brokerInfo.getBrokerName()); 75 Set set = server.queryMBeans(brokerObjectName, null); 76 Iterator it = set.iterator(); 77 if (it.hasNext()) { 78 ObjectInstance instance = (ObjectInstance) it.next(); 79 brokerObjectName = instance.getObjectName(); 80 } 81 BrokerViewMBean brokerMBean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(server, brokerObjectName, BrokerViewMBean.class, true); 82 brokerMBean.addQueue(destinationInfo.getPhysicalName()); 83 } 84 proxy = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(server, destinationObjectName, QueueViewMBean.class, true); 85 proxy.purge(); 86 } else { 87 throw new JMSException("Purge action on topic is not supported"); 88 } 89 } catch (MalformedObjectNameException e) { 90 throw createJMSException("Fail to find the target object name", e); 91 } catch (JMSException e) { 92 throw e; 93 } catch (Exception e) { 94 throw createJMSException("Error occured in the purge action", e); 95 } 96 } 97 98 private JMSException createJMSException(String reason, Exception e) { 99 JMSException jmsException = new JMSException(reason); 100 jmsException.setLinkedException(e); 101 return jmsException; 102 } 103 104 protected ObjectName createDestinationObjectName(String brokerName, String destinationType, String destinactionPhysicalName) throws MalformedObjectNameException { 105 return new ObjectName("org.apache.activemq" + ":BrokerName=" + brokerName + ",Type=" + destinationType + ",Destination=" + destinactionPhysicalName); 106 } 107 108 protected ObjectName createBrokerObjectName(String brokerName) throws MalformedObjectNameException { 109 return new ObjectName("org.apache.activemq" + ":BrokerName=" + brokerName + ",Type=Broker"); 110 } 111 112 @Override 113 protected ConnectionFactory getConnectionFactory(PortletRequest portletRequest, JMSDestinationInfo destinationInfo) throws JMSException{ 114 ConnectionFactory connectionFactory = super.getConnectionFactory(portletRequest, destinationInfo); 115 if (connectionFactory == null) { 116 connectionFactory = createActiveMQConnectionFactory(portletRequest, destinationInfo); 117 } 118 return connectionFactory; 119 } 120 121 private ActiveMQConnectionFactory createActiveMQConnectionFactory(PortletRequest portletRequest, JMSDestinationInfo destinationInfo) throws JMSException { 122 try { 123 Kernel kernel = PortletManager.getKernel(); 124 ResourceAdapterModule resourceAdapterModule = (ResourceAdapterModule) PortletManager.getManagementHelper(portletRequest).getObject(destinationInfo.getResourceAdapterModuleAbName()); 125 AbstractName resourceAdapterAbstractName = PortletManager.getNameFor(portletRequest, resourceAdapterModule.getResourceAdapterInstances()[0].getJCAResourceImplementations()[0] 126 .getResourceAdapterInstances()[0]); 127 if (kernel.isRunning(resourceAdapterAbstractName)) { 128 String serverUrl = (String) kernel.getAttribute(resourceAdapterAbstractName, "ServerUrl"); 129 String userName = (String) kernel.getAttribute(resourceAdapterAbstractName, "UserName"); 130 String password = (String) kernel.getAttribute(resourceAdapterAbstractName, "Password"); 131 return new ActiveMQConnectionFactory(userName, password, serverUrl); 132 } 133 throw new JMSException("Fail to create ActiveMQConnectionFactory for the resource adapter module is not in running status"); 134 } catch (JMSException e) { 135 throw e; 136 } catch (Exception e) { 137 logger.error("Fail to create ActiveMQConnectionFactory", e); 138 throw createJMSException("Fail to create ActiveMQConnectionFactory", e); 139 } 140 } 141 142 @Override 143 public DestinationStatistics getDestinationStatistics(PortletRequest portletRequest, JMSDestinationInfo destinationInfo) throws JMSException { 144 DestinationStatistics stat = new DestinationStatistics(); 145 try { 146 BrokerInfo brokerInfo = getBrokerInfo(portletRequest, destinationInfo); 147 if (brokerInfo == null || !isInLocalMBeanServer(brokerInfo)) { 148 return stat; 149 } 150 MBeanServer server = getMBeanServer(); 151 ObjectName objName = createDestinationObjectName(brokerInfo.getBrokerName(), destinationInfo.getType().name(), destinationInfo.getPhysicalName()); 152 DestinationViewMBean proxy; 153 if (destinationInfo.getType().equals(DestinationType.Queue)) { 154 if (!server.isRegistered(objName)) { 155 // mbean is not yet registered.Adding the destination to activemq broker. 156 ObjectName brokerObj = createBrokerObjectName(brokerInfo.getBrokerName()); 157 Set set = server.queryMBeans(brokerObj, null); 158 Iterator it = set.iterator(); 159 if (it.hasNext()) { 160 ObjectInstance instance = (ObjectInstance) it.next(); 161 brokerObj = instance.getObjectName(); 162 } 163 BrokerViewMBean brokerMBean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(server, brokerObj, BrokerViewMBean.class, true); 164 brokerMBean.addQueue(destinationInfo.getPhysicalName()); 165 } 166 proxy = (DestinationViewMBean) MBeanServerInvocationHandler.newProxyInstance(server, objName, QueueViewMBean.class, true); 167 } else { 168 if (!server.isRegistered(objName)) { 169 // mbean is not yet registered.Adding the destination to activemq broker. 170 ObjectName brokerObj = createBrokerObjectName(brokerInfo.getBrokerName()); 171 Set set = server.queryMBeans(brokerObj, null); 172 Iterator it = set.iterator(); 173 if (it.hasNext()) { 174 ObjectInstance instance = (ObjectInstance) it.next(); 175 brokerObj = instance.getObjectName(); 176 } 177 BrokerViewMBean brokerMBean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(server, brokerObj, BrokerViewMBean.class, true); 178 brokerMBean.addTopic(destinationInfo.getPhysicalName()); 179 } 180 proxy = (DestinationViewMBean) MBeanServerInvocationHandler.newProxyInstance(server, objName, TopicViewMBean.class, true); 181 } 182 stat.setConsumerCount(proxy.getConsumerCount()); 183 stat.setEnqueueCount(proxy.getEnqueueCount()); 184 stat.setDequeueCount(proxy.getDequeueCount()); 185 stat.setQueueSize(proxy.getQueueSize()); 186 } catch (Exception ex) { 187 logger.warn("Failed to get ActiveMQ stats", ex); 188 } 189 return stat; 190 } 191 192 protected JMSMessageInfo[] getMessagesFromTopic(PortletRequest portletRequest, JMSDestinationInfo destinationInfo, String selector) throws JMSException { 193 BrokerInfo brokerInfo = getBrokerInfo(portletRequest, destinationInfo); 194 if (brokerInfo == null || !isInLocalMBeanServer(brokerInfo)) { 195 return new JMSMessageInfo[0]; 196 } 197 try { 198 ObjectName destinationObjectName = createDestinationObjectName(brokerInfo.getBrokerName(), destinationInfo.getType().name(), destinationInfo.getPhysicalName()); 199 MBeanServer mBeanServer = getMBeanServer(); 200 ObjectInstance objectInstance = mBeanServer.getObjectInstance(destinationObjectName); 201 CompositeData[] compositeData = (CompositeData[]) mBeanServer.invoke(objectInstance.getObjectName(), "browse", new Object[] { selector }, new String[] { String.class.getName() }); 202 if (compositeData.length > 0) { 203 JMSMessageInfo[] messageInfos = new JMSMessageInfo[compositeData.length]; 204 for (int i = 0; i < compositeData.length; i++) { 205 JMSMessageInfo jmsMessageInfo = new JMSMessageInfo(); 206 CompositeData data = compositeData[i]; 207 if (compositeData[0].getCompositeType().getTypeName().equals("org.apache.activemq.command.ActiveMQTextMessage")) { 208 jmsMessageInfo.setMessage((String) data.get(CompositeDataConstants.MESSAGE_TEXT)); 209 } else { 210 jmsMessageInfo.setMessage("Only Text Messages will be displayed.."); 211 } 212 jmsMessageInfo.setPriority((Integer) data.get("JMSPriority")); 213 jmsMessageInfo.setMessageId((String) data.get("JMSMessageID")); 214 jmsMessageInfo.setDestination((String) data.get("JMSDestination")); 215 jmsMessageInfo.setTimeStamp(((Date) data.get("JMSTimestamp")).getTime()); 216 jmsMessageInfo.setExpiration((Long) data.get("JMSExpiration")); 217 jmsMessageInfo.setJmsType((String) data.get("JMSType")); 218 jmsMessageInfo.setReplyTo((String) data.get("JMSReplyTo")); 219 jmsMessageInfo.setCorrelationId((String) data.get("JMSCorrelationID")); 220 messageInfos[i] = jmsMessageInfo; 221 } 222 return messageInfos; 223 } 224 return new JMSMessageInfo[0]; 225 } catch (Exception e) { 226 throw createJMSException("Fail to get messages of the topic " + destinationInfo.getPhysicalName(), e); 227 } 228 } 229 230 protected MBeanServer getMBeanServer() { 231 MBeanServerReference ref; 232 try { 233 ref = kernel.getGBean(MBeanServerReference.class); 234 return ref.getMBeanServer(); 235 } catch (Exception e) { 236 return null; 237 } 238 } 239 240 private boolean isInLocalMBeanServer(BrokerInfo brokerInfo) { 241 try { 242 ObjectName brokerObjectNameQuery = new ObjectName("org.apache.activemq" + ":*,Type=Broker"); 243 MBeanServer mBeanServer = getMBeanServer(); 244 Set<ObjectInstance> brokerObjectInstances = mBeanServer.queryMBeans(brokerObjectNameQuery, null); 245 String targetBrokerId = brokerInfo.getBrokerId().getValue(); 246 for (ObjectInstance objectInstance : brokerObjectInstances) { 247 String brokerId = (String) mBeanServer.getAttribute(objectInstance.getObjectName(), "BrokerId"); 248 if (targetBrokerId.equals(brokerId)) { 249 return true; 250 } 251 } 252 } catch (Exception e) { 253 logger.error("Fail to check the broker in local mbeanserver", e); 254 return false; 255 } 256 return false; 257 } 258 259 private BrokerInfo getBrokerInfo(PortletRequest portletRequest, JMSDestinationInfo destinationInfo) throws JMSException { 260 ActiveMQConnectionFactory connectionFactory = createActiveMQConnectionFactory(portletRequest, destinationInfo); 261 ActiveMQConnection connection = null; 262 try { 263 connection = (ActiveMQConnection) connectionFactory.createConnection(); 264 connection.start(); 265 return connection.getBrokerInfo(); 266 } finally { 267 if (connection != null) { 268 try { 269 connection.close(); 270 } catch (Exception e) { 271 } 272 } 273 } 274 } 275 }