| Method from org.apache.activemq.ActiveMQConnection Detail: |
protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
this.connectionConsumers.add(connectionConsumer);
}
|
public void addDispatcher(ConsumerId consumerId,
ActiveMQDispatcher dispatcher) {
dispatchers.put(consumerId, dispatcher);
}
|
public void addInputStream(ActiveMQInputStream stream) {
inputStreams.add(stream);
}
|
public void addOutputStream(ActiveMQOutputStream stream) {
outputStreams.add(stream);
}
|
public void addProducer(ProducerId producerId,
ActiveMQMessageProducer producer) {
producers.put(producerId, producer);
}
|
protected void addSession(ActiveMQSession session) throws JMSException {
this.sessions.add(session);
if (sessions.size() > 1 || session.isTransacted()) {
optimizedMessageDispatch = false;
}
}
Used internally for adding Sessions to the Connection |
public void addTransportListener(TransportListener transportListener) {
transportListeners.add(transportListener);
}
Adds a transport listener so that a client can be notified of events in
the underlying transport |
public void asyncSendPacket(Command command) throws JMSException {
if (isClosed()) {
throw new ConnectionClosedException();
} else {
doAsyncSendPacket(command);
}
}
send a Packet through the Connection - for internal use only |
public void changeUserInfo(String userName,
String password) throws JMSException {
if (isConnectionInfoSentToBroker) {
throw new IllegalStateException("changeUserInfo used Connection is not allowed");
}
this.info.setUserName(userName);
this.info.setPassword(password);
}
Changes the associated username/password that is associated with this
connection. If the connection has been used, you must called cleanup()
before calling this method. |
public void checkClientIDWasManuallySpecified() throws JMSException {
if (!userSpecifiedClientID) {
throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
}
}
Ensures that the clientID was manually specified and not auto-generated.
If the clientID was not specified this method will throw an exception.
This method is used to ensure that the clientID + durableSubscriber name
are used correctly. |
protected synchronized void checkClosed() throws JMSException {
if (closed.get()) {
throw new ConnectionClosedException();
}
}
simply throws an exception if the Connection is already closed |
protected synchronized void checkClosedOrFailed() throws JMSException {
checkClosed();
if (transportFailed.get()) {
throw new ConnectionFailedException(firstFailureError);
}
}
simply throws an exception if the Connection is already closed or the
Transport has failed |
public void cleanup() throws JMSException {
if (advisoryConsumer != null) {
advisoryConsumer.dispose();
advisoryConsumer = null;
}
for (Iterator< ActiveMQSession > i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.dispose();
}
for (Iterator< ActiveMQConnectionConsumer > i = this.connectionConsumers.iterator(); i.hasNext();) {
ActiveMQConnectionConsumer c = i.next();
c.dispose();
}
for (Iterator< ActiveMQInputStream > i = this.inputStreams.iterator(); i.hasNext();) {
ActiveMQInputStream c = i.next();
c.dispose();
}
for (Iterator< ActiveMQOutputStream > i = this.outputStreams.iterator(); i.hasNext();) {
ActiveMQOutputStream c = i.next();
c.dispose();
}
if (isConnectionInfoSentToBroker) {
if (!transportFailed.get() && !closing.get()) {
asyncSendPacket(info.createRemoveCommand());
}
isConnectionInfoSentToBroker = false;
}
if (userSpecifiedClientID) {
info.setClientId(null);
userSpecifiedClientID = false;
}
clientIDSet = false;
started.set(false);
}
Cleans up this connection so that it's state is as if the connection was
just created. This allows the Resource Adapter to clean up a connection
so that it can be reused without having to close and recreate the
connection. |
public void close() throws JMSException {
try {
// If we were running, lets stop first.
stop();
synchronized (this) {
if (!closed.get()) {
closing.set(true);
if (destinationSource != null) {
destinationSource.stop();
destinationSource = null;
}
if (advisoryConsumer != null) {
advisoryConsumer.dispose();
advisoryConsumer = null;
}
for (Iterator< ActiveMQSession > i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.dispose();
}
for (Iterator< ActiveMQConnectionConsumer > i = this.connectionConsumers.iterator(); i.hasNext();) {
ActiveMQConnectionConsumer c = i.next();
c.dispose();
}
for (Iterator< ActiveMQInputStream > i = this.inputStreams.iterator(); i.hasNext();) {
ActiveMQInputStream c = i.next();
c.dispose();
}
for (Iterator< ActiveMQOutputStream > i = this.outputStreams.iterator(); i.hasNext();) {
ActiveMQOutputStream c = i.next();
c.dispose();
}
if (isConnectionInfoSentToBroker) {
// If we announced ourselfs to the broker.. Try to let
// the broker
// know that the connection is being shutdown.
doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
doAsyncSendPacket(new ShutdownInfo());
}
ServiceSupport.dispose(this.transport);
started.set(false);
// TODO if we move the TaskRunnerFactory to the connection
// factory
// then we may need to call
// factory.onConnectionClose(this);
sessionTaskRunner.shutdown();
if (asyncConnectionThread != null){
asyncConnectionThread.shutdown();
}
closed.set(true);
closing.set(false);
}
}
} finally {
factoryStats.removeConnection(this);
}
}
Closes the connection.
Since a provider typically allocates significant resources outside the
JVM on behalf of a connection, clients should close these resources when
they are not needed. Relying on garbage collection to eventually reclaim
these resources may not be timely enough.
There is no need to close the sessions, producers, and consumers of a
closed connection.
Closing a connection causes all temporary destinations to be deleted.
When this method is invoked, it should not return until message
processing has been shut down in an orderly fashion. This means that all
message listeners that may have been running have returned, and that all
pending receives have returned. A close terminates all pending message
receives on the connection's sessions' consumers. The receives may return
with a message or with null, depending on whether there was a message
available at the time of the close. If one or more of the connection's
sessions' message listeners is processing a message at the time when
connection close is invoked, all the facilities of the
connection and its sessions must remain available to those listeners
until they return control to the JMS provider.
Closing a connection causes any of its sessions' transactions in progress
to be rolled back. In the case where a session's work is coordinated by
an external transaction manager, a session's commit and
rollback methods are not used and the result of a closed
session's work is determined later by the transaction manager. Closing a
connection does NOT force an acknowledgment of client-acknowledged
sessions.
Invoking the acknowledge method of a received message from
a closed connection's session must throw an
IllegalStateException. Closing a closed connection must
NOT throw an exception. |
protected BlobTransferPolicy createBlobTransferPolicy() {
return new BlobTransferPolicy();
}
|
public ConnectionConsumer createConnectionConsumer(Topic topic,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
}
Creates a connection consumer for this connection (optional operation).
This is an expert facility not used by regular JMS clients. |
public ConnectionConsumer createConnectionConsumer(Queue queue,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
}
Creates a connection consumer for this connection (optional operation).
This is an expert facility not used by regular JMS clients. |
public ConnectionConsumer createConnectionConsumer(Destination destination,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
}
Creates a connection consumer for this connection (optional operation).
This is an expert facility not used by regular JMS clients. |
public ConnectionConsumer createConnectionConsumer(Destination destination,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages,
boolean noLocal) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
ConsumerId consumerId = createConsumerId();
ConsumerInfo info = new ConsumerInfo(consumerId);
info.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
info.setSelector(messageSelector);
info.setPrefetchSize(maxMessages);
info.setNoLocal(noLocal);
info.setDispatchAsync(isDispatchAsync());
// Allows the options on the destination to configure the consumerInfo
if (info.getDestination().getOptions() != null) {
Map< String, String > options = new HashMap< String, String >(info.getDestination().getOptions());
IntrospectionSupport.setProperties(info, options, "consumer.");
}
return new ActiveMQConnectionConsumer(this, sessionPool, info);
}
|
public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
String subscriptionName,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
}
Create a durable connection consumer for this connection (optional
operation). This is an expert facility not used by regular JMS clients. |
public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
String subscriptionName,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages,
boolean noLocal) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
SessionId sessionId = new SessionId(info.getConnectionId(), -1);
ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
info.setSubscriptionName(subscriptionName);
info.setSelector(messageSelector);
info.setPrefetchSize(maxMessages);
info.setDispatchAsync(isDispatchAsync());
// Allows the options on the destination to configure the consumerInfo
if (info.getDestination().getOptions() != null) {
Map< String, String > options = new HashMap< String, String >(info.getDestination().getOptions());
IntrospectionSupport.setProperties(this.info, options, "consumer.");
}
return new ActiveMQConnectionConsumer(this, sessionPool, info);
}
Create a durable connection consumer for this connection (optional
operation). This is an expert facility not used by regular JMS clients. |
public InputStream createDurableInputStream(Topic dest,
String name) throws JMSException {
return createInputStream(dest, null, false);
}
|
public InputStream createDurableInputStream(Topic dest,
String name,
String messageSelector) throws JMSException {
return createDurableInputStream(dest, name, messageSelector, false);
}
|
public InputStream createDurableInputStream(Topic dest,
String name,
String messageSelector,
boolean noLocal) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, name);
}
|
public InputStream createInputStream(Destination dest) throws JMSException {
return createInputStream(dest, null);
}
|
public InputStream createInputStream(Destination dest,
String messageSelector) throws JMSException {
return createInputStream(dest, messageSelector, false);
}
|
public InputStream createInputStream(Destination dest,
String messageSelector,
boolean noLocal) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, null);
}
|
public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
}
Creates a non persistent output stream; messages will not be written to
disk |
public OutputStream createOutputStream(Destination dest) throws JMSException {
return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
}
Creates a persistent output stream; individual messages will be written
to disk/database by the broker |
public OutputStream createOutputStream(Destination dest,
Map streamProperties,
int deliveryMode,
int priority,
long timeToLive) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
}
Creates an output stream allowing full control over the delivery mode,
the priority and time to live of the messages and the properties added to
messages on the stream. |
public QueueSession createQueueSession(boolean transacted,
int acknowledgeMode) throws JMSException {
return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
}
Creates a QueueSession object. |
public Session createSession(boolean transacted,
int acknowledgeMode) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
}
Creates a Session object. |
protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
// Check if Destination info is of temporary type.
ActiveMQTempDestination dest;
if (topic) {
dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
} else {
dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
}
DestinationInfo info = new DestinationInfo();
info.setConnectionId(this.info.getConnectionId());
info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
info.setDestination(dest);
syncSendPacket(info);
dest.setConnection(this);
activeTempDestinations.put(dest, dest);
return dest;
}
Create the DestinationInfo object for the temporary destination. |
public TopicSession createTopicSession(boolean transacted,
int acknowledgeMode) throws JMSException {
return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
}
Creates a TopicSession object. |
public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
checkClosedOrFailed();
for (Iterator< ActiveMQSession > i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
if (s.isInUse(destination)) {
throw new JMSException("A consumer is consuming from the temporary destination");
}
}
activeTempDestinations.remove(destination);
DestinationInfo info = new DestinationInfo();
info.setConnectionId(this.info.getConnectionId());
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
info.setDestination(destination);
info.setTimeout(0);
syncSendPacket(info);
}
|
public void destroyDestination(ActiveMQDestination destination) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
DestinationInfo info = new DestinationInfo();
info.setConnectionId(this.info.getConnectionId());
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
info.setDestination(destination);
info.setTimeout(0);
syncSendPacket(info);
}
|
protected synchronized void ensureConnectionInfoSent() throws JMSException {
// Can we skip sending the ConnectionInfo packet??
if (isConnectionInfoSentToBroker || closed.get()) {
return;
}
if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
info.setClientId(clientIdGenerator.generateId());
}
syncSendPacket(info);
this.isConnectionInfoSentToBroker = true;
// Add a temp destination advisory consumer so that
// We know what the valid temporary destinations are on the
// broker without having to do an RPC to the broker.
ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
if (watchTopicAdvisories) {
advisoryConsumer = new AdvisoryConsumer(this, consumerId);
}
}
Send the ConnectionInfo to the Broker |
public BlobTransferPolicy getBlobTransferPolicy() {
if (blobTransferPolicy == null) {
blobTransferPolicy = createBlobTransferPolicy();
}
return blobTransferPolicy;
}
|
public BrokerInfo getBrokerInfo() {
return brokerInfo;
}
Returns the broker information if it is available or null if it is not
available yet. |
public String getBrokerName() {
try {
brokerInfoReceived.await(5, TimeUnit.SECONDS);
if (brokerInfo == null) {
return null;
}
return brokerInfo.getBrokerName();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
Returns the broker name if one is available or null if one is not
available yet. |
public String getClientID() throws JMSException {
checkClosedOrFailed();
return this.info.getClientId();
}
Gets the client identifier for this connection.
This value is specific to the JMS provider. It is either preconfigured by
an administrator in a ConnectionFactory object or assigned
dynamically by the application by calling the setClientID
method. |
public int getCloseTimeout() {
return closeTimeout;
}
|
public ConnectionInfo getConnectionInfo() {
return this.info;
}
|
public JMSConnectionStatsImpl getConnectionStats() {
return stats;
}
|
public DestinationSource getDestinationSource() throws JMSException {
if (destinationSource == null) {
destinationSource = new DestinationSource(this);
destinationSource.start();
}
return destinationSource;
}
Returns the DestinationSource object which can be used to listen to destinations
being created or destroyed or to enquire about the current destinations available on the broker |
public ExceptionListener getExceptionListener() throws JMSException {
checkClosedOrFailed();
return this.exceptionListener;
}
Gets the ExceptionListener object for this connection. Not
every Connection has an ExceptionListener
associated with it. |
public String getInitializedClientID() throws JMSException {
ensureConnectionInfoSent();
return info.getClientId();
}
|
public LongSequenceGenerator getLocalTransactionIdGenerator() {
return localTransactionIdGenerator;
}
|
public ConnectionMetaData getMetaData() throws JMSException {
checkClosedOrFailed();
return ActiveMQConnectionMetaData.INSTANCE;
}
Gets the metadata for this connection. |
protected SessionId getNextSessionId() {
return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
}
|
public ActiveMQPrefetchPolicy getPrefetchPolicy() {
return prefetchPolicy;
}
|
public int getProducerWindowSize() {
return producerWindowSize;
}
|
public int getProtocolVersion() {
return protocolVersion.get();
}
|
public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
return redeliveryPolicy;
}
|
public String getResourceManagerId() throws JMSException {
waitForBrokerInfo();
if (brokerInfo == null) {
throw new JMSException("Connection failed before Broker info was received.");
}
return brokerInfo.getBrokerId().getValue();
}
|
public int getSendTimeout() {
return sendTimeout;
}
|
public TaskRunnerFactory getSessionTaskRunner() {
return sessionTaskRunner;
}
|
public StatsImpl getStats() {
return stats;
}
|
public long getTimeCreated() {
return timeCreated;
}
Returns the time this connection was created |
public MessageTransformer getTransformer() {
return transformer;
}
|
public Transport getTransport() {
return transport;
}
|
public Transport getTransportChannel() {
return transport;
}
|
public long getWarnAboutUnstartedConnectionTimeout() {
return warnAboutUnstartedConnectionTimeout;
}
|
public boolean isAlwaysSessionAsync() {
return alwaysSessionAsync;
}
|
public boolean isAlwaysSyncSend() {
return this.alwaysSyncSend;
}
|
public boolean isClosed() {
return closed.get();
}
Returns true if the connection is closed |
public boolean isClosing() {
return closing.get();
}
Returns true if the connection is in the process of being closed |
public boolean isCopyMessageOnSend() {
return copyMessageOnSend;
}
|
public boolean isDeleted(ActiveMQDestination dest) {
// If we are not watching the advisories.. then
// we will assume that the temp destination does exist.
if (advisoryConsumer == null) {
return false;
}
return !activeTempDestinations.contains(dest);
}
|
public boolean isDisableTimeStampsByDefault() {
return disableTimeStampsByDefault;
}
|
public boolean isDispatchAsync() {
return dispatchAsync;
}
|
protected boolean isDuplicate(ActiveMQDispatcher dispatcher,
Message message) {
return connectionAudit.isDuplicate(dispatcher, message);
}
|
public boolean isExclusiveConsumer() {
return exclusiveConsumer;
}
|
public boolean isNestedMapAndListEnabled() {
return nestedMapAndListEnabled;
}
|
public boolean isObjectMessageSerializationDefered() {
return objectMessageSerializationDefered;
}
|
public boolean isOptimizeAcknowledge() {
return optimizeAcknowledge;
}
|
public boolean isOptimizedMessageDispatch() {
return optimizedMessageDispatch;
}
|
public boolean isStarted() {
return started.get();
}
Returns true if this connection has been started |
public boolean isStatsEnabled() {
return this.stats.isEnabled();
}
|
public boolean isTransportFailed() {
return transportFailed.get();
}
Returns true if the underlying transport has failed |
public boolean isUseAsyncSend() {
return useAsyncSend;
}
|
public boolean isUseCompression() {
return useCompression;
}
|
public boolean isUseRetroactiveConsumer() {
return useRetroactiveConsumer;
}
|
public synchronized boolean isWatchTopicAdvisories() {
return watchTopicAdvisories;
}
|
public static ActiveMQConnection makeConnection() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
return (ActiveMQConnection)factory.createConnection();
}
A static helper method to create a new connection |
public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
return (ActiveMQConnection)factory.createConnection();
}
A static helper method to create a new connection |
public static ActiveMQConnection makeConnection(String user,
String password,
String uri) throws JMSException, URISyntaxException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
return (ActiveMQConnection)factory.createConnection();
}
A static helper method to create a new connection |
public void onAsyncException(Throwable error) {
if (!closed.get() && !closing.get()) {
if (this.exceptionListener != null) {
if (!(error instanceof JMSException)) {
error = JMSExceptionSupport.create(error);
}
final JMSException e = (JMSException)error;
asyncConnectionThread.execute(new Runnable() {
public void run() {
ActiveMQConnection.this.exceptionListener.onException(e);
}
});
} else {
LOG.debug("Async exception with no exception listener: " + error, error);
}
}
}
Used for handling async exceptions |
public void onCommand(Object o) {
final Command command = (Command)o;
if (!closed.get() && command != null) {
try {
command.visit(new CommandVisitorAdapter() {
@Override
public Response processMessageDispatch(MessageDispatch md) throws Exception {
ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
if (dispatcher != null) {
// Copy in case a embedded broker is dispatching via
// vm://
// md.getMessage() == null to signal end of queue
// browse.
Message msg = md.getMessage();
if (msg != null) {
msg = msg.copy();
msg.setReadOnlyBody(true);
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
msg.setConnection(ActiveMQConnection.this);
md.setMessage(msg);
}
dispatcher.dispatch(md);
}
return null;
}
@Override
public Response processProducerAck(ProducerAck pa) throws Exception {
if (pa != null && pa.getProducerId() != null) {
ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
if (producer != null) {
producer.onProducerAck(pa);
}
}
return null;
}
@Override
public Response processBrokerInfo(BrokerInfo info) throws Exception {
brokerInfo = info;
brokerInfoReceived.countDown();
optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
return null;
}
@Override
public Response processConnectionError(final ConnectionError error) throws Exception {
asyncConnectionThread.execute(new Runnable() {
public void run() {
onAsyncException(error.getException());
}
});
return null;
}
@Override
public Response processControlCommand(ControlCommand command) throws Exception {
onControlCommand(command);
return null;
}
@Override
public Response processConnectionControl(ConnectionControl control) throws Exception {
onConnectionControl((ConnectionControl)command);
return null;
}
@Override
public Response processConsumerControl(ConsumerControl control) throws Exception {
onConsumerControl((ConsumerControl)command);
return null;
}
@Override
public Response processWireFormat(WireFormatInfo info) throws Exception {
onWireFormatInfo((WireFormatInfo)command);
return null;
}
});
} catch (Exception e) {
onAsyncException(e);
}
}
for (Iterator< TransportListener > iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onCommand(command);
}
}
|
protected void onConnectionControl(ConnectionControl command) {
if (command.isFaultTolerant()) {
this.optimizeAcknowledge = false;
for (Iterator< ActiveMQSession > i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.setOptimizeAcknowledge(false);
}
}
}
|
protected void onConsumerControl(ConsumerControl command) {
if (command.isClose()) {
for (Iterator< ActiveMQSession > i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.close(command.getConsumerId());
}
} else {
for (Iterator< ActiveMQSession > i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
}
}
}
|
protected void onControlCommand(ControlCommand command) {
String text = command.getCommand();
if (text != null) {
if (text.equals("shutdown")) {
LOG.info("JVM told to shutdown");
System.exit(0);
}
}
}
|
public void onException(IOException error) {
onAsyncException(error);
if (!closing.get() && !closed.get()) {
asyncConnectionThread.execute(new Runnable() {
public void run() {
transportFailed(error);
ServiceSupport.dispose(ActiveMQConnection.this.transport);
brokerInfoReceived.countDown();
for (Iterator< TransportListener > iter = transportListeners
.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onException(error);
}
}
});
}
}
|
protected void onWireFormatInfo(WireFormatInfo info) {
protocolVersion.set(info.getVersion());
}
|
protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
this.connectionConsumers.remove(connectionConsumer);
this.removeDispatcher(connectionConsumer);
}
Remove a ConnectionConsumer |
public void removeDispatcher(ConsumerId consumerId) {
dispatchers.remove(consumerId);
}
|
protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
connectionAudit.removeDispatcher(dispatcher);
}
|
public void removeInputStream(ActiveMQInputStream stream) {
inputStreams.remove(stream);
}
|
public void removeOutputStream(ActiveMQOutputStream stream) {
outputStreams.remove(stream);
}
|
public void removeProducer(ProducerId producerId) {
producers.remove(producerId);
}
|
protected void removeSession(ActiveMQSession session) {
this.sessions.remove(session);
this.removeDispatcher(session);
}
Used interanlly for removing Sessions from a Connection |
public void removeTransportListener(TransportListener transportListener) {
transportListeners.remove(transportListener);
}
|
protected void rollbackDuplicate(ActiveMQDispatcher dispatcher,
Message message) {
connectionAudit.rollbackDuplicate(dispatcher, message);
}
|
void send(ActiveMQDestination destination,
ActiveMQMessage msg,
MessageId messageId,
int deliveryMode,
int priority,
long timeToLive,
boolean async) throws JMSException {
checkClosedOrFailed();
if (destination.isTemporary() && isDeleted(destination)) {
throw new JMSException("Cannot publish to a deleted Destination: " + destination);
}
msg.setJMSDestination(destination);
msg.setJMSDeliveryMode(deliveryMode);
long expiration = 0L;
if (!isDisableTimeStampsByDefault()) {
long timeStamp = System.currentTimeMillis();
msg.setJMSTimestamp(timeStamp);
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
msg.setJMSExpiration(expiration);
msg.setJMSPriority(priority);
msg.setJMSRedelivered(false);
msg.setMessageId(messageId);
msg.onSend();
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isDebugEnabled()) {
LOG.debug("Sending message: " + msg);
}
if (async) {
asyncSendPacket(msg);
} else {
syncSendPacket(msg);
}
}
Internal send method optimized: - It does not copy the message - It can
only handle ActiveMQ messages. - You can specify if the send is async or
sync - Does not allow you to send /w a transaction. |
public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
this.alwaysSessionAsync = alwaysSessionAsync;
}
If this flag is set then a separate thread is not used for dispatching
messages for each Session in the Connection. However, a separate thread
is always used if there is more than one session, or the session isn't in
auto acknowledge or duplicates ok mode |
public void setAlwaysSyncSend(boolean alwaysSyncSend) {
this.alwaysSyncSend = alwaysSyncSend;
}
Set true if always require messages to be sync sent |
public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
this.blobTransferPolicy = blobTransferPolicy;
}
Sets the policy used to describe how out-of-band BLOBs (Binary Large
OBjects) are transferred from producers to brokers to consumers |
public void setClientID(String newClientID) throws JMSException {
checkClosedOrFailed();
if (this.clientIDSet) {
throw new IllegalStateException("The clientID has already been set");
}
if (this.isConnectionInfoSentToBroker) {
throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
}
this.info.setClientId(newClientID);
this.userSpecifiedClientID = true;
ensureConnectionInfoSent();
}
Sets the client identifier for this connection.
The preferred way to assign a JMS client's client identifier is for it to
be configured in a client-specific ConnectionFactory
object and transparently assigned to the Connection object
it creates.
Alternatively, a client can set a connection's client identifier using a
provider-specific value. The facility to set a connection's client
identifier explicitly is not a mechanism for overriding the identifier
that has been administratively configured. It is provided for the case
where no administratively specified identifier exists. If one does exist,
an attempt to change it by setting it must throw an
IllegalStateException. If a client sets the client
identifier explicitly, it must do so immediately after it creates the
connection and before any other action on the connection is taken. After
this point, setting the client identifier is a programming error that
should throw an IllegalStateException.
The purpose of the client identifier is to associate a connection and its
objects with a state maintained on behalf of the client by a provider.
The only such state identified by the JMS API is that required to support
durable subscriptions.
If another connection with the same clientID is already
running when this method is called, the JMS provider should detect the
duplicate ID and throw an InvalidClientIDException. |
public void setCloseTimeout(int closeTimeout) {
this.closeTimeout = closeTimeout;
}
Sets the timeout before a close is considered complete. Normally a
close() on a connection waits for confirmation from the broker; this
allows that operation to timeout to save the client hanging if there is
no broker |
public void setCopyMessageOnSend(boolean copyMessageOnSend) {
this.copyMessageOnSend = copyMessageOnSend;
}
Should a JMS message be copied to a new JMS Message object as part of the
send() method in JMS. This is enabled by default to be compliant with the
JMS specification. You can disable it if you do not mutate JMS messages
after they are sent for a performance boost |
public void setDefaultClientID(String clientID) throws JMSException {
this.info.setClientId(clientID);
this.userSpecifiedClientID = true;
}
Sets the default client id that the connection will use if explicitly not
set with the setClientId() call. |
public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
this.disableTimeStampsByDefault = timeStampsDisableByDefault;
}
Sets whether or not timestamps on messages should be disabled or not. If
you disable them it adds a small performance boost. |
public void setDispatchAsync(boolean asyncDispatch) {
this.dispatchAsync = asyncDispatch;
}
Enables or disables the default setting of whether or not consumers have
their messages dispatched
synchronously or asynchronously by the broker. For non-durable
topics for example we typically dispatch synchronously by default to
minimize context switches which boost performance. However sometimes its
better to go slower to ensure that a single blocked consumer socket does
not block delivery to other consumers. |
public void setExceptionListener(ExceptionListener listener) throws JMSException {
checkClosedOrFailed();
this.exceptionListener = listener;
}
Sets an exception listener for this connection.
If a JMS provider detects a serious problem with a connection, it informs
the connection's ExceptionListener, if one has been
registered. It does this by calling the listener's onException
method, passing it a JMSException object describing the
problem.
An exception listener allows a client to be notified of a problem
asynchronously. Some connections only consume messages, so they would
have no other way to learn their connection has failed.
A connection serializes execution of its ExceptionListener.
A JMS provider should attempt to resolve connection problems itself
before it notifies the client of them. |
public void setExclusiveConsumer(boolean exclusiveConsumer) {
this.exclusiveConsumer = exclusiveConsumer;
}
Enables or disables whether or not queue consumers should be exclusive or
not for example to preserve ordering when not using Message Groups |
public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
this.nestedMapAndListEnabled = structuredMapsEnabled;
}
Enables/disables whether or not Message properties and MapMessage entries
support Nested
Structures of Map and List objects |
public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
this.objectMessageSerializationDefered = objectMessageSerializationDefered;
}
When an object is set on an ObjectMessage, the JMS spec requires the
object to be serialized by that set method. Enabling this flag causes the
object to not get serialized. The object may subsequently get serialized
if the message needs to be sent over a socket or stored to disk. |
public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
this.optimizeAcknowledge = optimizeAcknowledge;
}
Enables an optimised acknowledgement mode where messages are acknowledged
in batches rather than individually |
public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
this.optimizedMessageDispatch = dispatchOptimizedMessage;
}
If this flag is set then an larger prefetch limit is used - only
applicable for durable topic subscribers. |
protected void setPassword(String password) {
this.info.setPassword(password);
}
|
public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
this.prefetchPolicy = prefetchPolicy;
}
|
public void setProducerWindowSize(int producerWindowSize) {
this.producerWindowSize = producerWindowSize;
}
|
public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
this.redeliveryPolicy = redeliveryPolicy;
}
Sets the redelivery policy to be used when messages are rolled back |
public void setSendTimeout(int sendTimeout) {
this.sendTimeout = sendTimeout;
}
|
public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
this.sessionTaskRunner = sessionTaskRunner;
}
|
public void setStatsEnabled(boolean statsEnabled) {
this.stats.setEnabled(statsEnabled);
}
|
public void setTransformer(MessageTransformer transformer) {
this.transformer = transformer;
}
Sets the transformer used to transform messages before they are sent on
to the JMS bus or when they are received from the bus but before they are
delivered to the JMS client |
public void setUseAsyncSend(boolean useAsyncSend) {
this.useAsyncSend = useAsyncSend;
}
Forces the use of Async Sends which
adds a massive performance boost; but means that the send() method will
return immediately whether the message has been sent or not which could
lead to message loss. |
public void setUseCompression(boolean useCompression) {
this.useCompression = useCompression;
}
Enables the use of compression of the message bodies |
public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
this.useRetroactiveConsumer = useRetroactiveConsumer;
}
Sets whether or not retroactive consumers are enabled. Retroactive
consumers allow non-durable topic subscribers to receive old messages
that were published before the non-durable subscriber started. |
protected void setUserName(String userName) {
this.info.setUserName(userName);
}
|
public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
}
Enables the timeout from a connection creation to when a warning is
generated if the connection is not properly started via #start()
and a message is received by a consumer. It is a very common gotcha to
forget to start
the connection so this option makes the default case to create a
warning if the user forgets. To disable the warning just set the value to <
0 (say -1). |
public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
this.watchTopicAdvisories = watchTopicAdvisories;
}
|
public void start() throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
if (started.compareAndSet(false, true)) {
for (Iterator< ActiveMQSession > i = sessions.iterator(); i.hasNext();) {
ActiveMQSession session = i.next();
session.start();
}
}
}
Starts (or restarts) a connection's delivery of incoming messages. A call
to start on a connection that has already been started is
ignored. |
public void stop() throws JMSException {
checkClosedOrFailed();
if (started.compareAndSet(true, false)) {
for (Iterator< ActiveMQSession > i = sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.stop();
}
}
}
Temporarily stops a connection's delivery of incoming messages. Delivery
can be restarted using the connection's start method. When
the connection is stopped, delivery to all the connection's message
consumers is inhibited: synchronous receives block, and messages are not
delivered to message listeners.
This call blocks until receives and/or message listeners in progress have
completed.
Stopping a connection has no effect on its ability to send messages. A
call to stop on a connection that has already been stopped
is ignored.
A call to stop must not return until delivery of messages
has paused. This means that a client can rely on the fact that none of
its message listeners will be called and that all threads of control
waiting for receive calls to return will not return with a
message until the connection is restarted. The receive timers for a
stopped connection continue to advance, so receives may time out while
the connection is stopped.
If message listeners are running when stop is invoked, the
stop call must wait until all of them have returned before
it may return. While these message listeners are completing, they must
have the full services of the connection available to them. |
public Response syncSendPacket(Command command) throws JMSException {
if (isClosed()) {
throw new ConnectionClosedException();
} else {
try {
Response response = (Response)this.transport.request(command);
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
if (er.getException() instanceof JMSException) {
throw (JMSException)er.getException();
} else {
throw JMSExceptionSupport.create(er.getException());
}
}
return response;
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
}
Send a packet through a Connection - for internal use only |
public Response syncSendPacket(Command command,
int timeout) throws JMSException {
if (isClosed() || closing.get()) {
throw new ConnectionClosedException();
} else {
return doSyncSendPacket(command, timeout);
}
}
Send a packet through a Connection - for internal use only |
public String toString() {
return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
}
|
protected void transportFailed(IOException error) {
transportFailed.set(true);
if (firstFailureError == null) {
firstFailureError = error;
}
}
|
public void transportInterupted() {
for (Iterator< ActiveMQSession > i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.clearMessagesInProgress();
}
for (Iterator< TransportListener > iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.transportInterupted();
}
}
|
public void transportResumed() {
for (Iterator< TransportListener > iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.transportResumed();
}
for (Iterator< ActiveMQSession > i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.deliverAcks();
}
}
|
public void unsubscribe(String name) throws JMSException {
checkClosedOrFailed();
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(getConnectionInfo().getConnectionId());
rsi.setSubscriptionName(name);
rsi.setClientId(getConnectionInfo().getClientId());
syncSendPacket(rsi);
}
Unsubscribes a durable subscription that has been created by a client.
This method deletes the state being maintained on behalf of the
subscriber by its provider.
It is erroneous for a client to delete a durable subscription while there
is an active MessageConsumer or
TopicSubscriber for the subscription, or while a consumed
message is part of a pending transaction or has not been acknowledged in
the session. |