Home » activemq-parent-5.3.1-source-release » org.apache » activemq » camel » component » [javadoc | source]

    1   /**
    2    *
    3    * Licensed to the Apache Software Foundation (ASF) under one or more
    4    * contributor license agreements.  See the NOTICE file distributed with
    5    * this work for additional information regarding copyright ownership.
    6    * The ASF licenses this file to You under the Apache License, Version 2.0
    7    * (the "License"); you may not use this file except in compliance with
    8    * the License.  You may obtain a copy of the License at
    9    *
   10    * http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing, software
   13    * distributed under the License is distributed on an "AS IS" BASIS,
   14    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   15    * See the License for the specific language governing permissions and
   16    * limitations under the License.
   17    */
   18   package org.apache.activemq.camel.component;
   19   
   20   import java.util.Set;
   21   
   22   import javax.annotation.PostConstruct;
   23   import javax.annotation.PreDestroy;
   24   import javax.jms.Connection;
   25   import javax.jms.ConnectionFactory;
   26   
   27   import org.apache.activemq.EnhancedConnection;
   28   import org.apache.activemq.advisory.DestinationEvent;
   29   import org.apache.activemq.advisory.DestinationListener;
   30   import org.apache.activemq.advisory.DestinationSource;
   31   import org.apache.activemq.command.ActiveMQDestination;
   32   import org.apache.activemq.command.ActiveMQQueue;
   33   import org.apache.activemq.command.ActiveMQTopic;
   34   import org.apache.camel.CamelContext;
   35   import org.apache.camel.CamelContextAware;
   36   import org.apache.camel.Endpoint;
   37   import org.apache.camel.component.jms.JmsEndpoint;
   38   import org.apache.camel.component.jms.JmsQueueEndpoint;
   39   import org.apache.camel.util.ObjectHelper;
   40   import org.apache.commons.logging.Log;
   41   import org.apache.commons.logging.LogFactory;
   42   
   43   /**
   44    * A helper bean which populates a {@link CamelContext} with ActiveMQ Queue endpoints
   45    *
   46    * @version $Revision: 1.1 $
   47    * @org.apache.xbean.XBean
   48    */
   49   public class CamelEndpointLoader implements CamelContextAware {
   50       private static final transient Log LOG = LogFactory.getLog(CamelEndpointLoader.class);
   51       private CamelContext camelContext;
   52       private EnhancedConnection connection;
   53       private ConnectionFactory connectionFactory;
   54       private ActiveMQComponent component;
   55   
   56       public CamelEndpointLoader() {
   57       }
   58   
   59       public CamelEndpointLoader(CamelContext camelContext) {
   60           this.camelContext = camelContext;
   61       }
   62   
   63       /**
   64        *
   65        * @throws Exception
   66        * @org.apache.xbean.InitMethod
   67        */
   68       @PostConstruct
   69       public void afterPropertiesSet() throws Exception {
   70           ObjectHelper.notNull(camelContext, "camelContext");
   71           if (connection == null) {
   72               Connection value = getConnectionFactory().createConnection();
   73               if (value instanceof EnhancedConnection) {
   74                   connection = (EnhancedConnection) value;
   75               }
   76               else {
   77                   throw new IllegalArgumentException("Created JMS Connection is not an EnhancedConnection: " + value);
   78               }
   79           }
   80           connection.start();
   81           DestinationSource source = connection.getDestinationSource();
   82           source.setDestinationListener(new DestinationListener() {
   83               public void onDestinationEvent(DestinationEvent event) {
   84                   try {
   85                       ActiveMQDestination destination = event.getDestination();
   86                       if (destination instanceof ActiveMQQueue) {
   87                           ActiveMQQueue queue = (ActiveMQQueue) destination;
   88                           if (event.isAddOperation()) {
   89                               addQueue(queue);
   90                           }
   91                           else {
   92                               removeQueue(queue);
   93                           }
   94                       }
   95                       else if (destination instanceof ActiveMQTopic) {
   96                         ActiveMQTopic topic = (ActiveMQTopic) destination;
   97                         if (event.isAddOperation()) {
   98                             addTopic(topic);
   99                         }
  100                         else {
  101                             removeTopic(topic);
  102                         }
  103                       }
  104                   }
  105                   catch (Exception e) {
  106                       LOG.warn("Caught: " + e, e);
  107                   }
  108               }
  109           });
  110   
  111           Set<ActiveMQQueue> queues = source.getQueues();
  112           for (ActiveMQQueue queue : queues) {
  113               addQueue(queue);
  114           }
  115   
  116           Set<ActiveMQTopic> topics = source.getTopics();
  117           for (ActiveMQTopic topic : topics) {
  118               addTopic(topic);
  119           }
  120       }
  121   
  122   
  123       /**
  124        *
  125        * @throws Exception
  126        * @org.apache.xbean.DestroyMethod
  127        */
  128       @PreDestroy
  129       public void destroy() throws Exception {
  130           if (connection != null) {
  131               connection.close();
  132               connection = null;
  133           }
  134       }
  135   
  136       // Properties
  137       //-------------------------------------------------------------------------
  138       public CamelContext getCamelContext() {
  139           return camelContext;
  140       }
  141   
  142       public void setCamelContext(CamelContext camelContext) {
  143           this.camelContext = camelContext;
  144       }
  145   
  146       public EnhancedConnection getConnection() {
  147           return connection;
  148       }
  149   
  150       public ConnectionFactory getConnectionFactory() {
  151           if (connectionFactory == null
  152                   && getComponent().getConfiguration() instanceof ActiveMQConfiguration) {
  153               connectionFactory = ((ActiveMQConfiguration) getComponent()
  154                       .getConfiguration()).createConnectionFactory();
  155           }
  156           return connectionFactory;
  157       }
  158   
  159       public void setConnectionFactory(ConnectionFactory connectionFactory) {
  160           this.connectionFactory = connectionFactory;
  161       }
  162   
  163       public ActiveMQComponent getComponent() {
  164           if (component == null) {
  165               component = camelContext.getComponent("activemq", ActiveMQComponent.class);
  166           }
  167           return component;
  168       }
  169   
  170       public void setComponent(ActiveMQComponent component) {
  171           this.component = component;
  172       }
  173   
  174       // Implementation methods
  175       //-------------------------------------------------------------------------
  176   
  177       protected void addQueue(ActiveMQQueue queue) throws Exception {
  178           String queueUri = getQueueUri(queue);
  179           ActiveMQComponent jmsComponent = getComponent();
  180           Endpoint endpoint = new JmsQueueEndpoint(queueUri, jmsComponent, queue.getPhysicalName(), jmsComponent.getConfiguration());
  181           camelContext.addEndpoint(queueUri, endpoint);
  182       }
  183   
  184       protected String getQueueUri(ActiveMQQueue queue) {
  185           return "activemq:" + queue.getPhysicalName();
  186       }
  187   
  188       protected void removeQueue(ActiveMQQueue queue) throws Exception {
  189           String queueUri = getQueueUri(queue);
  190           camelContext.removeEndpoints(queueUri);
  191       }
  192       
  193       protected void addTopic(ActiveMQTopic topic) throws Exception {
  194           String topicUri = getTopicUri(topic);
  195           ActiveMQComponent jmsComponent = getComponent();
  196           Endpoint endpoint = new JmsEndpoint(topicUri, jmsComponent, topic.getPhysicalName(), true, jmsComponent.getConfiguration());
  197           camelContext.addEndpoint(topicUri, endpoint);
  198       }
  199   
  200       protected String getTopicUri(ActiveMQTopic topic) {
  201           return "activemq:topic:" + topic.getPhysicalName();
  202       }
  203   
  204       protected void removeTopic(ActiveMQTopic topic) throws Exception {
  205           String topicUri = getTopicUri(topic);
  206           camelContext.removeEndpoints(topicUri);
  207       }
  208   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » camel » component » [javadoc | source]