Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Method from org.apache.activemq.camel.component.JournalEndpoint Detail: |
protected void activateConsumer(DefaultConsumer consumer) throws IOException {
synchronized (activationMutex) {
if (this.consumer.get() != null) {
throw new RuntimeCamelException("Consumer already active: journal endpoints only support 1 active consumer");
}
incrementReference();
this.consumer.set(consumer);
thread = new Thread() {
@Override
public void run() {
dispatchToConsumer();
}
};
thread.setName("Dipatch thread: " + getEndpointUri());
thread.setDaemon(true);
thread.start();
}
}
|
public Consumer createConsumer(Processor processor) throws Exception {
return new DefaultConsumer(this, processor) {
@Override
public void start() throws Exception {
super.start();
activateConsumer(this);
}
@Override
public void stop() throws Exception {
deactivateConsumer(this);
super.stop();
}
};
}
|
public Producer createProducer() throws Exception {
return new DefaultProducer(this) {
public void process(Exchange exchange) throws Exception {
incrementReference();
try {
ByteSequence body = exchange.getIn().getBody(ByteSequence.class);
if (body == null) {
byte[] bytes = exchange.getIn().getBody(byte[].class);
if (bytes != null) {
body = new ByteSequence(bytes);
}
}
if (body == null) {
throw new CamelExchangeException("In body message could not be converted to a ByteSequence or a byte array.", exchange);
}
dataManager.write(body, syncProduce);
} finally {
decrementReference();
}
}
};
}
|
protected void deactivateConsumer(DefaultConsumer consumer) throws IOException {
synchronized (activationMutex) {
if (this.consumer.get() != consumer) {
throw new RuntimeCamelException("Consumer was not active.");
}
this.consumer.set(null);
try {
thread.join();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
decrementReference();
}
}
|
protected void decrementReference() throws IOException {
synchronized (activationMutex) {
referenceCount--;
if (referenceCount == 0) {
LOG.debug("Closing data manager: " + directory);
LOG.debug("Last mark at: " + lastReadLocation);
dataManager.close();
dataManager = null;
}
}
}
|
protected void dispatchToConsumer() {
try {
DefaultConsumer consumer;
while ((consumer = this.consumer.get()) != null) {
// See if there is a new record to process
Location location = dataManager.getNextLocation(lastReadLocation);
if (location != null) {
// Send it on.
ByteSequence read = dataManager.read(location);
Exchange exchange = createExchange();
exchange.getIn().setBody(read);
exchange.getIn().setHeader("journal", getEndpointUri());
exchange.getIn().setHeader("location", location);
consumer.getProcessor().process(exchange);
// Setting the mark makes the data manager forget about
// everything
// before that record.
if (LOG.isDebugEnabled()) {
LOG.debug("Consumed record at: " + location);
}
dataManager.setMark(location, syncConsume);
lastReadLocation = location;
} else {
// Avoid a tight CPU loop if there is no new record to read.
LOG.debug("Sleeping due to no records being available.");
Thread.sleep(idleDelay);
}
}
} catch (Throwable e) {
e.printStackTrace();
}
}
|
public File getDirectory() {
return directory;
}
|
protected void incrementReference() throws IOException {
synchronized (activationMutex) {
referenceCount++;
if (referenceCount == 1) {
LOG.debug("Opening data manager: " + directory);
dataManager = new AsyncDataManager();
dataManager.setDirectory(directory);
dataManager.start();
lastReadLocation = dataManager.getMark();
LOG.debug("Last mark at: " + lastReadLocation);
}
}
}
|
boolean isOpen() {
synchronized (activationMutex) {
return referenceCount > 0;
}
}
|
public boolean isSingleton() {
return true;
}
|
public boolean isSyncConsume() {
return syncConsume;
}
|
public boolean isSyncProduce() {
return syncProduce;
}
|
public void setSyncConsume(boolean syncConsume) {
this.syncConsume = syncConsume;
}
|
public void setSyncProduce(boolean syncProduce) {
this.syncProduce = syncProduce;
}
|