Home » activemq-parent-5.3.1-source-release » org.apache » activemq » camel » component » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.camel.component;
   18   
   19   import java.io.File;
   20   import java.io.IOException;
   21   import java.io.InterruptedIOException;
   22   import java.util.concurrent.atomic.AtomicReference;
   23   
   24   import org.apache.activemq.kaha.impl.async.AsyncDataManager;
   25   import org.apache.activemq.kaha.impl.async.Location;
   26   import org.apache.activemq.util.ByteSequence;
   27   import org.apache.camel.CamelExchangeException;
   28   import org.apache.camel.Consumer;
   29   import org.apache.camel.Exchange;
   30   import org.apache.camel.NoTypeConversionAvailableException;
   31   import org.apache.camel.Processor;
   32   import org.apache.camel.Producer;
   33   import org.apache.camel.RuntimeCamelException;
   34   import org.apache.camel.ExchangePattern;
   35   import org.apache.camel.impl.DefaultConsumer;
   36   import org.apache.camel.impl.DefaultEndpoint;
   37   import org.apache.camel.impl.DefaultExchange;
   38   import org.apache.camel.impl.DefaultProducer;
   39   import org.apache.commons.logging.Log;
   40   import org.apache.commons.logging.LogFactory;
   41   
   42   public class JournalEndpoint extends DefaultEndpoint {
   43   
   44       private static final transient Log LOG = LogFactory.getLog(JournalEndpoint.class);
   45   
   46       private final File directory;
   47       private final AtomicReference<DefaultConsumer> consumer = new AtomicReference<DefaultConsumer>();
   48       private final Object activationMutex = new Object();
   49       private int referenceCount;
   50       private AsyncDataManager dataManager;
   51       private Thread thread;
   52       private Location lastReadLocation;
   53       private long idleDelay = 1000;
   54       private boolean syncProduce = true;
   55       private boolean syncConsume;
   56   
   57       public JournalEndpoint(String uri, JournalComponent journalComponent, File directory) {
   58           super(uri, journalComponent.getCamelContext());
   59           this.directory = directory;
   60       }
   61   
   62       public JournalEndpoint(String endpointUri, File directory) {
   63           super(endpointUri);
   64           this.directory = directory;
   65       }
   66   
   67       public boolean isSingleton() {
   68           return true;
   69       }
   70   
   71       public File getDirectory() {
   72           return directory;
   73       }
   74   
   75       public Consumer createConsumer(Processor processor) throws Exception {
   76           return new DefaultConsumer(this, processor) {
   77               @Override
   78               public void start() throws Exception {
   79                   super.start();
   80                   activateConsumer(this);
   81               }
   82   
   83               @Override
   84               public void stop() throws Exception {
   85                   deactivateConsumer(this);
   86                   super.stop();
   87               }
   88           };
   89       }
   90   
   91       protected void decrementReference() throws IOException {
   92           synchronized (activationMutex) {
   93               referenceCount--;
   94               if (referenceCount == 0) {
   95                   LOG.debug("Closing data manager: " + directory);
   96                   LOG.debug("Last mark at: " + lastReadLocation);
   97                   dataManager.close();
   98                   dataManager = null;
   99               }
  100           }
  101       }
  102   
  103       protected void incrementReference() throws IOException {
  104           synchronized (activationMutex) {
  105               referenceCount++;
  106               if (referenceCount == 1) {
  107                   LOG.debug("Opening data manager: " + directory);
  108                   dataManager = new AsyncDataManager();
  109                   dataManager.setDirectory(directory);
  110                   dataManager.start();
  111   
  112                   lastReadLocation = dataManager.getMark();
  113                   LOG.debug("Last mark at: " + lastReadLocation);
  114               }
  115           }
  116       }
  117   
  118       protected void deactivateConsumer(DefaultConsumer consumer) throws IOException {
  119           synchronized (activationMutex) {
  120               if (this.consumer.get() != consumer) {
  121                   throw new RuntimeCamelException("Consumer was not active.");
  122               }
  123               this.consumer.set(null);
  124               try {
  125                   thread.join();
  126               } catch (InterruptedException e) {
  127                   throw new InterruptedIOException();
  128               }
  129               decrementReference();
  130           }
  131       }
  132   
  133       protected void activateConsumer(DefaultConsumer consumer) throws IOException {
  134           synchronized (activationMutex) {
  135               if (this.consumer.get() != null) {
  136                   throw new RuntimeCamelException("Consumer already active: journal endpoints only support 1 active consumer");
  137               }
  138               incrementReference();
  139               this.consumer.set(consumer);
  140               thread = new Thread() {
  141                   @Override
  142                   public void run() {
  143                       dispatchToConsumer();
  144                   }
  145               };
  146               thread.setName("Dipatch thread: " + getEndpointUri());
  147               thread.setDaemon(true);
  148               thread.start();
  149           }
  150       }
  151   
  152       protected void dispatchToConsumer() {
  153           try {
  154               DefaultConsumer consumer;
  155               while ((consumer = this.consumer.get()) != null) {
  156                   // See if there is a new record to process
  157                   Location location = dataManager.getNextLocation(lastReadLocation);
  158                   if (location != null) {
  159   
  160                       // Send it on.
  161                       ByteSequence read = dataManager.read(location);
  162                       Exchange exchange = createExchange();
  163                       exchange.getIn().setBody(read);
  164                       exchange.getIn().setHeader("journal", getEndpointUri());
  165                       exchange.getIn().setHeader("location", location);
  166                       consumer.getProcessor().process(exchange);
  167   
  168                       // Setting the mark makes the data manager forget about
  169                       // everything
  170                       // before that record.
  171                       if (LOG.isDebugEnabled()) {
  172                           LOG.debug("Consumed record at: " + location);
  173                       }
  174                       dataManager.setMark(location, syncConsume);
  175                       lastReadLocation = location;
  176                   } else {
  177                       // Avoid a tight CPU loop if there is no new record to read.
  178                       LOG.debug("Sleeping due to no records being available.");
  179                       Thread.sleep(idleDelay);
  180                   }
  181               }
  182           } catch (Throwable e) {
  183               e.printStackTrace();
  184           }
  185       }
  186   
  187       public Producer createProducer() throws Exception {
  188           return new DefaultProducer(this) {
  189               public void process(Exchange exchange) throws Exception {
  190                   incrementReference();
  191                   try {
  192                       ByteSequence body = exchange.getIn().getBody(ByteSequence.class);
  193                       if (body == null) {
  194                           byte[] bytes = exchange.getIn().getBody(byte[].class);
  195                           if (bytes != null) {
  196                               body = new ByteSequence(bytes);
  197                           }
  198                       }
  199                       if (body == null) {
  200                           throw new CamelExchangeException("In body message could not be converted to a ByteSequence or a byte array.", exchange);
  201                       }
  202                       dataManager.write(body, syncProduce);
  203   
  204                   } finally {
  205                       decrementReference();
  206                   }
  207               }
  208           };
  209       }
  210   
  211       public boolean isSyncConsume() {
  212           return syncConsume;
  213       }
  214   
  215       public void setSyncConsume(boolean syncConsume) {
  216           this.syncConsume = syncConsume;
  217       }
  218   
  219       public boolean isSyncProduce() {
  220           return syncProduce;
  221       }
  222   
  223       public void setSyncProduce(boolean syncProduce) {
  224           this.syncProduce = syncProduce;
  225       }
  226   
  227       boolean isOpen() {
  228           synchronized (activationMutex) {
  229               return referenceCount > 0;
  230           }
  231       }
  232   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » camel » component » [javadoc | source]