Method from org.apache.activemq.broker.ft.MasterBroker Detail: |
/**
 * Replicates a consumer acknowledgement to the slave and then applies it
 * on this broker.
 *
 * @param consumerExchange exchange handed through to the superclass
 * @param ack the acknowledgement to replicate and process
 * @throws Exception propagated from the superclass call
 */
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
    // The slave is told first; local processing follows.
    sendToSlave(ack);
    super.acknowledge(consumerExchange, ack);
}
|
/**
 * Registers a new connection locally, then forwards the connection info
 * to the slave without waiting for a reply.
 *
 * @param context the connection context passed to the superclass
 * @param info the connection being added
 * @throws Exception propagated from the superclass call
 */
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
    // Local registration happens before the slave is notified.
    super.addConnection(context, info);
    sendAsyncToSlave(info);
}
A client is establishing a connection with the broker. |
/**
 * Replicates a consumer registration to the slave (waiting for its reply),
 * records the consumer id in {@code consumers}, and then adds the consumer
 * on this broker.
 *
 * @param context the connection context passed to the superclass
 * @param info the consumer being added
 * @return the subscription produced by the superclass
 * @throws Exception propagated from the superclass call
 */
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
    sendSyncToSlave(info);
    // The id is stored as both key and value; preProcessDispatch() only checks the key.
    consumers.put(info.getConsumerId(), info.getConsumerId());
    Subscription subscription = super.addConsumer(context, info);
    return subscription;
}
|
/**
 * Adds a destination locally and, when the destination is temporary,
 * forwards the destination info to the slave without waiting for a reply.
 *
 * @param context the connection context passed to the superclass
 * @param info the destination being added
 * @throws Exception propagated from the superclass call
 */
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
    super.addDestinationInfo(context, info);
    // Only temporary destinations are replicated from here.
    boolean temporary = info.getDestination().isTemporary();
    if (temporary) {
        sendAsyncToSlave(info);
    }
}
|
/**
 * Registers a producer locally, then forwards the producer info to the
 * slave without waiting for a reply.
 *
 * @param context the connection context passed to the superclass
 * @param info the producer being added
 * @throws Exception propagated from the superclass call
 */
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
    // Local registration happens before the slave is notified.
    super.addProducer(context, info);
    sendAsyncToSlave(info);
}
|
/**
 * Registers a session locally, then forwards the session info to the
 * slave without waiting for a reply.
 *
 * @param context the connection context passed to the superclass
 * @param info the session being added
 * @throws Exception propagated from the superclass call
 */
public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
    // Local registration happens before the slave is notified.
    super.addSession(context, info);
    sendAsyncToSlave(info);
}
|
/**
 * Tells the slave (without waiting for a reply) that a transaction is
 * beginning, then begins it on this broker.
 *
 * @param context the connection context; its connection id is placed in the slave command
 * @param xid the transaction being started
 * @throws Exception propagated from the superclass call
 */
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
    // The BEGIN command reaches the slave before the local broker starts the transaction.
    sendAsyncToSlave(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN));
    super.beginTransaction(context, xid);
}
|
/**
 * Replicates a transaction commit to the slave (waiting for its reply)
 * and then commits on this broker.
 *
 * The command type sent to the slave follows {@code onePhase}, so the
 * slave receives the same kind of commit that is applied locally.
 *
 * @param context the connection context; its connection id is placed in the slave command
 * @param xid the transaction being committed
 * @param onePhase {@code true} for a one-phase commit, {@code false} for a two-phase commit
 * @throws Exception propagated from the superclass call
 */
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
    // Previously COMMIT_ONE_PHASE was always sent, ignoring the onePhase flag.
    TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid,
            onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
    sendSyncToSlave(info);
    super.commitTransaction(context, xid, onePhase);
}
|
/**
 * Tells the slave (without waiting for a reply) to forget a transaction,
 * then forgets it on this broker.
 *
 * @param context the connection context; its connection id is placed in the slave command
 * @param xid the transaction to forget
 * @throws Exception propagated from the superclass call
 */
public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
    // The FORGET command reaches the slave before the local broker acts.
    sendAsyncToSlave(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET));
    super.forgetTransaction(context, xid);
}
|
/**
 * Reports that this broker is part of a fault-tolerant configuration.
 *
 * @return always {@code true}
 */
public boolean isFaultTolerantConfiguration() {
    return true;
}
|
/**
 * Builds a dispatch notification for the message about to be dispatched
 * and sends it to the slave (waiting for its reply).
 *
 * The notification is sent only when the dispatch carries a message and
 * the target consumer id is present in {@code consumers}.
 *
 * @param messageDispatch the dispatch about to be delivered
 */
public void preProcessDispatch(MessageDispatch messageDispatch) {
    super.preProcessDispatch(messageDispatch);

    MessageDispatchNotification notification = new MessageDispatchNotification();
    notification.setConsumerId(messageDispatch.getConsumerId());
    notification.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
    notification.setDestination(messageDispatch.getDestination());

    Message dispatched = messageDispatch.getMessage();
    if (dispatched == null) {
        // Nothing to report when the dispatch carries no message.
        return;
    }
    notification.setMessageId(dispatched.getMessageId());

    if (!consumers.containsKey(messageDispatch.getConsumerId())) {
        // Consumer id is not tracked in the consumers map, so the slave is not notified.
        return;
    }
    sendSyncToSlave(notification);
}
Notify the Broker that a dispatch will happen
Do in 'pre' so that slave will avoid getting ack before dispatch
similar logic to send() below. |
/**
 * Replicates a transaction prepare to the slave (waiting for its reply)
 * and then prepares the transaction on this broker.
 *
 * @param context the connection context; its connection id is placed in the slave command
 * @param xid the transaction being prepared
 * @return the value returned by the superclass prepare call
 * @throws Exception propagated from the superclass call
 */
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
    TransactionInfo prepare = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE);
    sendSyncToSlave(prepare);
    return super.prepareTransaction(context, xid);
}
Prepares a transaction. Only valid for xa transactions. |
public void removeConnection(ConnectionContext context,
ConnectionInfo info,
Throwable error) throws Exception {
super.removeConnection(context, info, error);
sendAsyncToSlave(new RemoveInfo(info.getConnectionId()));
}
A client is disconnecting from the broker. |
/**
 * Removes a consumer locally, drops its id from {@code consumers}, and
 * sends the slave a remove command for the consumer id (waiting for its
 * reply).
 *
 * @param context the connection context passed to the superclass
 * @param info the consumer being removed
 * @throws Exception propagated from the superclass call
 */
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
    super.removeConsumer(context, info);
    // Stop tracking the consumer before the slave is told about the removal.
    consumers.remove(info.getConsumerId());
    RemoveInfo removal = new RemoveInfo(info.getConsumerId());
    sendSyncToSlave(removal);
}
|
/**
 * Removes a destination locally and, when the destination is temporary,
 * forwards the destination info to the slave without waiting for a reply.
 *
 * @param context the connection context passed to the superclass
 * @param info the destination being removed
 * @throws Exception propagated from the superclass call
 */
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
    super.removeDestinationInfo(context, info);
    // Only temporary destinations are replicated from here.
    boolean temporary = info.getDestination().isTemporary();
    if (temporary) {
        sendAsyncToSlave(info);
    }
}
|
/**
 * Removes a producer locally, then sends the slave a remove command for
 * the producer id without waiting for a reply.
 *
 * @param context the connection context passed to the superclass
 * @param info the producer being removed
 * @throws Exception propagated from the superclass call
 */
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
    super.removeProducer(context, info);
    RemoveInfo removal = new RemoveInfo(info.getProducerId());
    sendAsyncToSlave(removal);
}
|
/**
 * Removes a session locally, then sends the slave a remove command for
 * the session id without waiting for a reply.
 *
 * @param context the connection context passed to the superclass
 * @param info the session being removed
 * @throws Exception propagated from the superclass call
 */
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
    super.removeSession(context, info);
    RemoveInfo removal = new RemoveInfo(info.getSessionId());
    sendAsyncToSlave(removal);
}
|
/**
 * Removes a subscription locally, then forwards the same command to the
 * slave without waiting for a reply.
 *
 * @param context the connection context passed to the superclass
 * @param info the subscription removal command
 * @throws Exception propagated from the superclass call
 */
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
    // Local removal happens before the slave is notified.
    super.removeSubscription(context, info);
    sendAsyncToSlave(info);
}
|
/**
 * Tells the slave (without waiting for a reply) to roll back a
 * transaction, then rolls it back on this broker.
 *
 * @param context the connection context; its connection id is placed in the slave command
 * @param xid the transaction being rolled back
 * @throws Exception propagated from the superclass call
 */
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
    // The ROLLBACK command reaches the slave before the local broker acts.
    sendAsyncToSlave(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK));
    super.rollbackTransaction(context, xid);
}
|
/**
 * Replicates a produced message to the slave (waiting for its reply)
 * and then sends it on this broker.
 *
 * @param producerExchange exchange handed through to the superclass
 * @param message the message to replicate and send
 * @throws Exception propagated from the superclass call
 */
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
    // A message may be dispatched before super.send() returns, so the slave
    // is given the message first; otherwise it could receive acks for
    // messages it has not yet seen.
    sendSyncToSlave(message);
    super.send(producerExchange, message);
}
|
/**
 * Sends a command to the slave with {@code slave.oneway}, not waiting
 * for a response.
 *
 * Any failure is logged and causes {@link #stopProcessing()} to be
 * called; nothing is rethrown to the caller.
 *
 * @param command the command to forward to the slave
 */
protected void sendAsyncToSlave(Command command) {
    try {
        slave.oneway(command);
    } catch (Throwable failure) {
        LOG.error("Slave Failed", failure);
        stopProcessing();
    }
}
|
/**
 * Sends a command to the slave with {@code slave.request} and inspects
 * the response.
 *
 * An exception response, or any failure while making the request, is
 * logged; nothing is rethrown and {@code stopProcessing()} is not called
 * from here.
 *
 * @param command the command to forward to the slave
 */
protected void sendSyncToSlave(Command command) {
    try {
        Response reply = (Response)slave.request(command);
        if (!reply.isException()) {
            return;
        }
        ExceptionResponse failed = (ExceptionResponse)reply;
        LOG.error("Slave Failed", failed.getException());
    } catch (Throwable failure) {
        LOG.error("Slave Failed", failure);
    }
}
|
/**
 * Forwards a message to the slave, waiting for the slave's reply only
 * when the message itself requires a response.
 *
 * @param message the message to forward
 */
protected void sendToSlave(Message message) {
    if (!message.isResponseRequired()) {
        sendAsyncToSlave(message);
        return;
    }
    sendSyncToSlave(message);
}
|
/**
 * Forwards an acknowledgement to the slave, waiting for the slave's
 * reply only when the ack itself requires a response.
 *
 * This matches the {@code sendToSlave(Message)} overload: commands that
 * require a response are sent synchronously, all others asynchronously.
 *
 * @param ack the acknowledgement to forward
 */
protected void sendToSlave(MessageAck ack) {
    // The branches were previously swapped, so response-required acks went async.
    if (ack.isResponseRequired()) {
        sendSyncToSlave(ack);
    } else {
        sendAsyncToSlave(ack);
    }
}
|
/**
 * Marks this broker as started and dispatches a {@code ConnectionControl}
 * command with the fault-tolerant flag set to every client connection
 * that is both active and manageable.
 *
 * Any exception raised while doing so is logged and not rethrown.
 */
public void startProcessing() {
    started.set(true);
    try {
        Connection[] clients = getClients();
        ConnectionControl control = new ConnectionControl();
        control.setFaultTolerant(true);
        if (clients == null) {
            return;
        }
        for (Connection client : clients) {
            if (client.isActive() && client.isManageable()) {
                client.dispatchAsync(control);
            }
        }
    } catch (Exception e) {
        LOG.error("Failed to get Connections", e);
    }
}
start processing this broker |
/**
 * Stops this broker by delegating to {@link #stopProcessing()}.
 *
 * @throws Exception declared by the signature; nothing in this body throws a checked exception
 */
public void stop() throws Exception {
    stopProcessing();
}
|
/**
 * Stops processing: if the {@code started} flag can be flipped from
 * {@code true} to {@code false}, calls {@code remove()}.
 *
 * Calls made while the flag is already {@code false} do nothing.
 */
public void stopProcessing() {
    if (!started.compareAndSet(true, false)) {
        // Flag was not true, so there is nothing to remove.
        return;
    }
    remove();
}
stop processing this broker |