Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » util » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.util;
   18   
   19   import java.io.DataOutputStream;
   20   import java.io.IOException;
   21   import java.net.DatagramPacket;
   22   import java.net.DatagramSocket;
   23   import java.net.InetAddress;
   24   import java.net.InetSocketAddress;
   25   import java.net.SocketAddress;
   26   import java.net.URI;
   27   import java.net.URISyntaxException;
   28   import java.net.UnknownHostException;
   29   
   30   import org.apache.activemq.broker.BrokerPluginSupport;
   31   import org.apache.activemq.broker.ConnectionContext;
   32   import org.apache.activemq.broker.ConsumerBrokerExchange;
   33   import org.apache.activemq.broker.ProducerBrokerExchange;
   34   import org.apache.activemq.broker.region.Subscription;
   35   import org.apache.activemq.command.ActiveMQDestination;
   36   import org.apache.activemq.command.BrokerId;
   37   import org.apache.activemq.command.ConnectionInfo;
   38   import org.apache.activemq.command.ConsumerInfo;
   39   import org.apache.activemq.command.DataStructure;
   40   import org.apache.activemq.command.DestinationInfo;
   41   import org.apache.activemq.command.JournalTrace;
   42   import org.apache.activemq.command.Message;
   43   import org.apache.activemq.command.MessageAck;
   44   import org.apache.activemq.command.MessageDispatch;
   45   import org.apache.activemq.command.MessageDispatchNotification;
   46   import org.apache.activemq.command.MessagePull;
   47   import org.apache.activemq.command.ProducerInfo;
   48   import org.apache.activemq.command.RemoveSubscriptionInfo;
   49   import org.apache.activemq.command.Response;
   50   import org.apache.activemq.command.SessionInfo;
   51   import org.apache.activemq.command.TransactionId;
   52   import org.apache.activemq.command.TransactionInfo;
   53   import org.apache.activemq.openwire.OpenWireFormatFactory;
   54   import org.apache.activemq.util.ByteArrayOutputStream;
   55   import org.apache.activemq.util.ByteSequence;
   56   import org.apache.activemq.wireformat.WireFormat;
   57   import org.apache.activemq.wireformat.WireFormatFactory;
   58   import org.apache.commons.logging.Log;
   59   import org.apache.commons.logging.LogFactory;
   60   
   61   /**
   62    * A Broker interceptor which allows you to trace all operations to a UDP
   63    * socket.
   64    * 
   65    * @org.apache.xbean.XBean element="udpTraceBrokerPlugin"
   66    * @version $Revision: 427613 $
   67    */
   68   public class UDPTraceBrokerPlugin extends BrokerPluginSupport {
   69   
   70       private static final Log LOG = LogFactory.getLog(UDPTraceBrokerPlugin.class);
   71       protected WireFormat wireFormat;
   72       protected WireFormatFactory wireFormatFactory;
   73       protected int maxTraceDatagramSize = 1024 * 4;
   74       protected URI destination;
   75       protected DatagramSocket socket;
   76   
   77       protected BrokerId brokerId;
   78       protected SocketAddress address;
   79       protected boolean broadcast;
   80   
   81       public UDPTraceBrokerPlugin() {
   82           try {
   83               destination = new URI("udp://127.0.0.1:61616");
   84           } catch (URISyntaxException wontHappen) {
   85           }
   86       }
   87   
   88       public void start() throws Exception {
   89           super.start();
   90           if (getWireFormat() == null) {
   91               throw new IllegalArgumentException("Wireformat must be specifed.");
   92           }
   93           if (address == null) {
   94               address = createSocketAddress(destination);
   95           }
   96           socket = createSocket();
   97   
   98           brokerId = super.getBrokerId();
   99           trace(new JournalTrace("START"));
  100       }
  101   
  102       protected DatagramSocket createSocket() throws IOException {
  103           DatagramSocket s = new DatagramSocket();
  104           s.setSendBufferSize(maxTraceDatagramSize);
  105           s.setBroadcast(broadcast);
  106           return s;
  107       }
  108   
  109       public void stop() throws Exception {
  110           trace(new JournalTrace("STOP"));
  111           socket.close();
  112           super.stop();
  113       }
  114   
  115       private void trace(DataStructure command) {
  116           try {
  117   
  118               ByteArrayOutputStream baos = new ByteArrayOutputStream(maxTraceDatagramSize);
  119               DataOutputStream out = new DataOutputStream(baos);
  120               wireFormat.marshal(brokerId, out);
  121               wireFormat.marshal(command, out);
  122               out.close();
  123               ByteSequence sequence = baos.toByteSequence();
  124               DatagramPacket datagram = new DatagramPacket(sequence.getData(), sequence.getOffset(), sequence.getLength(), address);
  125               socket.send(datagram);
  126   
  127           } catch (Throwable e) {
  128               LOG.debug("Failed to trace: " + command, e);
  129           }
  130       }
  131   
  132       public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
  133           trace(messageSend);
  134           super.send(producerExchange, messageSend);
  135       }
  136   
  137       public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
  138           trace(ack);
  139           super.acknowledge(consumerExchange, ack);
  140       }
  141   
  142       public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
  143           trace(info);
  144           super.addConnection(context, info);
  145       }
  146   
  147       public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
  148           trace(info);
  149           return super.addConsumer(context, info);
  150       }
  151   
  152       public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
  153           trace(info);
  154           super.addDestinationInfo(context, info);
  155       }
  156   
  157       public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
  158           trace(info);
  159           super.addProducer(context, info);
  160       }
  161   
  162       public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
  163           trace(info);
  164           super.addSession(context, info);
  165       }
  166   
  167       public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
  168           trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN));
  169           super.beginTransaction(context, xid);
  170       }
  171   
  172       public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
  173           trace(new TransactionInfo(context.getConnectionId(), xid, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE));
  174           super.commitTransaction(context, xid, onePhase);
  175       }
  176   
  177       public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
  178           trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET));
  179           super.forgetTransaction(context, xid);
  180       }
  181   
  182       public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
  183           trace(pull);
  184           return super.messagePull(context, pull);
  185       }
  186   
  187       public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
  188           trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE));
  189           return super.prepareTransaction(context, xid);
  190       }
  191   
  192       public void postProcessDispatch(MessageDispatch messageDispatch) {
  193           trace(messageDispatch);
  194           super.postProcessDispatch(messageDispatch);
  195       }
  196   
  197       public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
  198           trace(messageDispatchNotification);
  199           super.processDispatchNotification(messageDispatchNotification);
  200       }
  201   
  202       public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
  203           trace(info.createRemoveCommand());
  204           super.removeConnection(context, info, error);
  205       }
  206   
  207       public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
  208           trace(info.createRemoveCommand());
  209           super.removeConsumer(context, info);
  210       }
  211   
  212       public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
  213           super.removeDestination(context, destination, timeout);
  214       }
  215   
  216       public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
  217           trace(info);
  218           super.removeDestinationInfo(context, info);
  219       }
  220   
  221       public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
  222           trace(info.createRemoveCommand());
  223           super.removeProducer(context, info);
  224       }
  225   
  226       public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
  227           trace(info.createRemoveCommand());
  228           super.removeSession(context, info);
  229       }
  230   
  231       public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
  232           trace(info);
  233           super.removeSubscription(context, info);
  234       }
  235   
  236       public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
  237           trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK));
  238           super.rollbackTransaction(context, xid);
  239       }
  240   
  241       public WireFormat getWireFormat() {
  242           if (wireFormat == null) {
  243               wireFormat = createWireFormat();
  244           }
  245           return wireFormat;
  246       }
  247   
  248       protected WireFormat createWireFormat() {
  249           return getWireFormatFactory().createWireFormat();
  250       }
  251   
  252       public void setWireFormat(WireFormat wireFormat) {
  253           this.wireFormat = wireFormat;
  254       }
  255   
  256       public WireFormatFactory getWireFormatFactory() {
  257           if (wireFormatFactory == null) {
  258               wireFormatFactory = createWireFormatFactory();
  259           }
  260           return wireFormatFactory;
  261       }
  262   
  263       protected OpenWireFormatFactory createWireFormatFactory() {
  264           OpenWireFormatFactory wf = new OpenWireFormatFactory();
  265           wf.setCacheEnabled(false);
  266           wf.setVersion(1);
  267           wf.setTightEncodingEnabled(true);
  268           wf.setSizePrefixDisabled(true);
  269           return wf;
  270       }
  271   
  272       public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
  273           this.wireFormatFactory = wireFormatFactory;
  274       }
  275   
  276       protected SocketAddress createSocketAddress(URI location) throws UnknownHostException {
  277           InetAddress a = InetAddress.getByName(location.getHost());
  278           int port = location.getPort();
  279           return new InetSocketAddress(a, port);
  280       }
  281   
  282       public URI getDestination() {
  283           return destination;
  284       }
  285   
  286       public void setDestination(URI destination) {
  287           this.destination = destination;
  288       }
  289   
  290       public int getMaxTraceDatagramSize() {
  291           return maxTraceDatagramSize;
  292       }
  293   
  294       public void setMaxTraceDatagramSize(int maxTraceDatagramSize) {
  295           this.maxTraceDatagramSize = maxTraceDatagramSize;
  296       }
  297   
  298       public boolean isBroadcast() {
  299           return broadcast;
  300       }
  301   
  302       public void setBroadcast(boolean broadcast) {
  303           this.broadcast = broadcast;
  304       }
  305   
  306       public SocketAddress getAddress() {
  307           return address;
  308       }
  309   
  310       public void setAddress(SocketAddress address) {
  311           this.address = address;
  312       }
  313   
  314   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » util » [javadoc | source]