1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.camel.component; 18 19 import java.io.File; 20 import java.io.IOException; 21 import java.io.InterruptedIOException; 22 import java.util.concurrent.atomic.AtomicReference; 23 24 import org.apache.activemq.kaha.impl.async.AsyncDataManager; 25 import org.apache.activemq.kaha.impl.async.Location; 26 import org.apache.activemq.util.ByteSequence; 27 import org.apache.camel.CamelExchangeException; 28 import org.apache.camel.Consumer; 29 import org.apache.camel.Exchange; 30 import org.apache.camel.NoTypeConversionAvailableException; 31 import org.apache.camel.Processor; 32 import org.apache.camel.Producer; 33 import org.apache.camel.RuntimeCamelException; 34 import org.apache.camel.ExchangePattern; 35 import org.apache.camel.impl.DefaultConsumer; 36 import org.apache.camel.impl.DefaultEndpoint; 37 import org.apache.camel.impl.DefaultExchange; 38 import org.apache.camel.impl.DefaultProducer; 39 import org.apache.commons.logging.Log; 40 import org.apache.commons.logging.LogFactory; 41 42 public class JournalEndpoint extends DefaultEndpoint { 43 44 private static final transient Log LOG = LogFactory.getLog(JournalEndpoint.class); 45 46 private final File directory; 47 private final AtomicReference<DefaultConsumer> consumer = new AtomicReference<DefaultConsumer>(); 48 private final Object activationMutex = new Object(); 49 private int referenceCount; 50 private AsyncDataManager dataManager; 51 private Thread thread; 52 private Location lastReadLocation; 53 private long idleDelay = 1000; 54 private boolean syncProduce = true; 55 private boolean syncConsume; 56 57 public JournalEndpoint(String uri, JournalComponent journalComponent, File directory) { 58 super(uri, journalComponent.getCamelContext()); 59 this.directory = directory; 60 } 61 62 public JournalEndpoint(String endpointUri, File directory) { 63 super(endpointUri); 64 this.directory = directory; 65 } 66 67 public boolean isSingleton() { 68 return true; 69 } 70 71 public File getDirectory() { 72 return directory; 73 } 74 75 public Consumer createConsumer(Processor processor) throws Exception { 76 return new DefaultConsumer(this, processor) { 77 @Override 78 public void start() throws Exception { 79 super.start(); 80 activateConsumer(this); 81 } 82 83 @Override 84 public void stop() throws Exception { 85 deactivateConsumer(this); 86 super.stop(); 87 } 88 }; 89 } 90 91 protected void decrementReference() throws IOException { 92 synchronized (activationMutex) { 93 referenceCount--; 94 if (referenceCount == 0) { 95 LOG.debug("Closing data manager: " + directory); 96 LOG.debug("Last mark at: " + lastReadLocation); 97 dataManager.close(); 98 dataManager = null; 99 } 100 } 101 } 102 103 protected void incrementReference() throws IOException { 104 synchronized (activationMutex) { 105 referenceCount++; 106 if (referenceCount == 1) { 107 LOG.debug("Opening data manager: " + directory); 108 dataManager = new AsyncDataManager(); 109 dataManager.setDirectory(directory); 110 dataManager.start(); 111 112 lastReadLocation = dataManager.getMark(); 113 LOG.debug("Last mark at: " + lastReadLocation); 114 } 115 } 116 } 117 118 protected void deactivateConsumer(DefaultConsumer consumer) throws IOException { 119 synchronized (activationMutex) { 120 if (this.consumer.get() != consumer) { 121 throw new RuntimeCamelException("Consumer was not active."); 122 } 123 this.consumer.set(null); 124 try { 125 thread.join(); 126 } catch (InterruptedException e) { 127 throw new InterruptedIOException(); 128 } 129 decrementReference(); 130 } 131 } 132 133 protected void activateConsumer(DefaultConsumer consumer) throws IOException { 134 synchronized (activationMutex) { 135 if (this.consumer.get() != null) { 136 throw new RuntimeCamelException("Consumer already active: journal endpoints only support 1 active consumer"); 137 } 138 incrementReference(); 139 this.consumer.set(consumer); 140 thread = new Thread() { 141 @Override 142 public void run() { 143 dispatchToConsumer(); 144 } 145 }; 146 thread.setName("Dipatch thread: " + getEndpointUri()); 147 thread.setDaemon(true); 148 thread.start(); 149 } 150 } 151 152 protected void dispatchToConsumer() { 153 try { 154 DefaultConsumer consumer; 155 while ((consumer = this.consumer.get()) != null) { 156 // See if there is a new record to process 157 Location location = dataManager.getNextLocation(lastReadLocation); 158 if (location != null) { 159 160 // Send it on. 161 ByteSequence read = dataManager.read(location); 162 Exchange exchange = createExchange(); 163 exchange.getIn().setBody(read); 164 exchange.getIn().setHeader("journal", getEndpointUri()); 165 exchange.getIn().setHeader("location", location); 166 consumer.getProcessor().process(exchange); 167 168 // Setting the mark makes the data manager forget about 169 // everything 170 // before that record. 171 if (LOG.isDebugEnabled()) { 172 LOG.debug("Consumed record at: " + location); 173 } 174 dataManager.setMark(location, syncConsume); 175 lastReadLocation = location; 176 } else { 177 // Avoid a tight CPU loop if there is no new record to read. 178 LOG.debug("Sleeping due to no records being available."); 179 Thread.sleep(idleDelay); 180 } 181 } 182 } catch (Throwable e) { 183 e.printStackTrace(); 184 } 185 } 186 187 public Producer createProducer() throws Exception { 188 return new DefaultProducer(this) { 189 public void process(Exchange exchange) throws Exception { 190 incrementReference(); 191 try { 192 ByteSequence body = exchange.getIn().getBody(ByteSequence.class); 193 if (body == null) { 194 byte[] bytes = exchange.getIn().getBody(byte[].class); 195 if (bytes != null) { 196 body = new ByteSequence(bytes); 197 } 198 } 199 if (body == null) { 200 throw new CamelExchangeException("In body message could not be converted to a ByteSequence or a byte array.", exchange); 201 } 202 dataManager.write(body, syncProduce); 203 204 } finally { 205 decrementReference(); 206 } 207 } 208 }; 209 } 210 211 public boolean isSyncConsume() { 212 return syncConsume; 213 } 214 215 public void setSyncConsume(boolean syncConsume) { 216 this.syncConsume = syncConsume; 217 } 218 219 public boolean isSyncProduce() { 220 return syncProduce; 221 } 222 223 public void setSyncProduce(boolean syncProduce) { 224 this.syncProduce = syncProduce; 225 } 226 227 boolean isOpen() { 228 synchronized (activationMutex) { 229 return referenceCount > 0; 230 } 231 } 232 }