| Method Detail from org.jboss.mq.Connection: |
/**
 * Registers a consumer with this connection and subscribes it on the server.
 *
 * A fresh subscription id is taken from the connection-wide counter, the
 * consumer is recorded both by subscription id and by destination, and the
 * subscription is then sent to the server through serverIL.subscribe.
 *
 * @param consumer the consumer whose subscription is to be registered
 * @throws JMSException if the connection is closed or the subscribe call fails;
 *         a JMSSecurityException is rethrown unchanged after the consumer has
 *         been unregistered again
 */
void addConsumer(SpyConsumer consumer) throws JMSException {
    checkClosed();

    final Subscription subscription = consumer.getSubscription();
    // The id counter is guarded by its own lock.
    synchronized (subCountLock) {
        subscription.subscriptionId = subscriptionCounter++;
    }
    subscription.connectionToken = connectionToken;

    if (trace) {
        log.trace("addConsumer sub=" + subscription);
    }

    try {
        synchronized (subscriptions) {
            subscriptions.put(new Integer(subscription.subscriptionId), consumer);

            // Get-or-create the list of consumers attached to this destination.
            LinkedList consumersOnDestination = (LinkedList) destinationSubscriptions.get(subscription.destination);
            if (consumersOnDestination == null) {
                consumersOnDestination = new LinkedList();
                destinationSubscriptions.put(subscription.destination, consumersOnDestination);
            }
            consumersOnDestination.add(consumer);
        }
        serverIL.subscribe(connectionToken, subscription);
    } catch (JMSSecurityException securityFailure) {
        // The consumer is not allowed to exist: undo the local registration.
        removeConsumerInternal(consumer);
        throw securityFailure;
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot subscribe to this Destination: ", failure);
    }
}
A new Consumer has been created.
We have to handle security issues: a consumer may actually not be allowed
to be created. |
/**
 * Obtains a client id from the server via serverIL.getID, unless one is already set.
 *
 * @throws JMSException if the request to the server fails
 */
protected void askForAnID() throws JMSException {
    if (trace) {
        log.trace("Ask for an id " + this);
    }

    try {
        // An id that is already present is left untouched.
        if (clientID != null) {
            return;
        }
        clientID = serverIL.getID();
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot get a client ID", failure);
    }
}
|
/**
 * Checks the user with the server (serverIL.checkUser) and adopts the client id
 * the call returns, if it returns one.
 *
 * @param userName the user name passed to the server
 * @param password the password passed to the server
 * @throws JMSException if the check fails
 */
protected void askForAnID(String userName,
        String password) throws JMSException {
    if (trace) {
        log.trace("Ask for an id user=" + userName + " " + this);
    }

    try {
        final String idFromServer = serverIL.checkUser(userName, password);
        // A null result leaves the current clientID as it is.
        if (idFromServer == null) {
            return;
        }
        clientID = idFromServer;
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot get a client ID", failure);
    }
}
|
/**
 * Notification that the connection has been closed from the other side.
 *
 * When this connection is not already closing, the close is reported as a
 * failure through asynchFailure.
 */
public void asynchClose() {
    // A close we initiated ourselves is not an error.
    if (closing.get()) {
        return;
    }
    asynchFailure("Asynchronous close from server.", new IOException("Close request from the server or transport layer."));
}
Notification from the server that the connection is closed |
/**
 * Deletes a temporary destination, reporting any error through asynchFailure
 * instead of throwing.
 *
 * @param dest the temporary destination to delete
 */
public void asynchDeleteTemporaryDestination(SpyDestination dest) {
    if (trace) {
        log.trace("Deleting temporary destination " + dest);
    }

    try {
        deleteTemporaryDestination(dest);
    } catch (Throwable failure) {
        asynchFailure("Error deleting temporary destination " + dest, failure);
    }
}
Called by a TemporaryDestination which is going to be deleted |
/**
 * Hands asynchronously received messages to the consumers they are addressed to.
 *
 * Each request is matched to a consumer by subscription id. A message whose
 * subscription is unknown is negatively acknowledged back to the server.
 * Nothing is delivered while the connection is closing, and any error is
 * reported through asynchFailure.
 *
 * @param requests the receive requests to process
 */
public void asynchDeliver(ReceiveRequest[] requests) {
    // If we are closing the connection, the server will nack the messages
    if (closing.get()) {
        return;
    }

    if (trace) {
        log.trace("Async deliver requests=" + Arrays.asList(requests) + " " + this);
    }

    try {
        for (int index = 0; index < requests.length; index++) {
            final ReceiveRequest request = requests[index];
            if (trace) {
                log.trace("Processing request=" + request + " " + this);
            }

            final SpyConsumer target = (SpyConsumer) subscriptions.get(request.subscriptionId);
            // The acknowledgement request is needed on both paths: delivery and NACK.
            request.message.createAcknowledgementRequest(request.subscriptionId.intValue());

            if (target != null) {
                if (trace) {
                    log.trace("Delivering messageid=" + request.message.header.messageId + " to consumer=" + target);
                }
                target.addMessage(request.message);
            } else {
                send(request.message.getAcknowledgementRequest(false));
                log.debug("WARNING: NACK issued due to non existent subscription " + request.message.header.messageId);
            }
        }
    } catch (Throwable failure) {
        asynchFailure("Error during async delivery", failure);
    }
}
Delivers messages received asynchronously from the server to the consumers that are subscribed to them. |
/**
 * Notification of a failure on this connection.
 *
 * The failure is converted to a JMSException and, if an exception listener is
 * registered and no listener thread is currently recorded, handed to the
 * listener on a newly started thread. Otherwise the failure is only logged.
 * Failures that arrive while the connection is closing are ignored.
 *
 * @param reason a description of the failure
 * @param t the underlying error
 */
public void asynchFailure(String reason,
        Throwable t) {
    if (trace)
        log.trace("Notified of failure reason=" + reason + " " + this, t);

    // Exceptions due to closing will be ignored.
    if (closing.get())
        return;

    JMSException excep = SpyJMSException.getAsJMSException(reason, t);

    synchronized (elLock)
    {
        ExceptionListener el = exceptionListener;
        if (el != null && elThread == null)
        {
            try
            {
                Runnable run = new ExceptionListenerRunnable(el, excep);
                elThread = new Thread(getThreadGroup(), run, "ExceptionListener " + this);
                elThread.setDaemon(false);
                elThread.start();
            }
            catch (Throwable t1)
            {
                // The listener thread never ran, so nothing else will clear the
                // marker. Reset it here; otherwise every later failure would be
                // reported as "already in the exception listener" and the
                // listener would never be notified again.
                elThread = null;
                log.warn("Connection failure: ", excep);
                log.warn("Unable to start exception listener thread: ", t1);
            }
        }
        else if (elThread != null)
            log.warn("Connection failure, already in the exception listener", excep);
        else
            log.warn("Connection failure, use javax.jms.Connection.setExceptionListener() to handle this error and reconnect", excep);
    }
}
Notification of a failure on this connection |
/**
 * Records that a pong has arrived by setting the ponged flag.
 *
 * @param serverTime the time value carried by the pong; only used for tracing
 */
public void asynchPong(long serverTime) {
    if (trace) {
        log.trace("PONG serverTime=" + serverTime + " " + this);
    }

    ponged = true;
}
Invoked when the server pongs us |
/**
 * Authenticates the user with the server and stores the returned session id.
 *
 * @param userName the user name passed to serverIL.authenticate
 * @param password the password passed to serverIL.authenticate
 * @throws JMSException if the authentication call fails
 */
protected void authenticate(String userName,
        String password) throws JMSException {
    if (trace) {
        log.trace("Authenticating user " + userName + " " + this);
    }

    try {
        sessionId = serverIL.authenticate(userName, password);
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot authenticate user", failure);
    }
}
|
/**
 * Asks the server for the messages on a queue that match a selector.
 *
 * @param queue the queue to browse
 * @param selector the message selector passed to the server
 * @return the messages returned by serverIL.browse
 * @throws JMSException if the connection is closed or the browse call fails
 */
SpyMessage[] browse(Queue queue,
        String selector) throws JMSException {
    checkClosed();

    if (trace) {
        log.trace("Browsing queue=" + queue + " selector=" + selector + " " + this);
    }

    try {
        final SpyMessage[] browsed = serverIL.browse(connectionToken, queue, selector);
        return browsed;
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot browse the Queue", failure);
        // Presumably never reached because the call above throws; it keeps the
        // compiler satisfied that every path returns or throws.
        throw new UnreachableStatementException();
    }
}
|
/**
 * Makes sure a client id exists, requesting one from the server if necessary.
 *
 * Only the first call does any work: it clears setClientIdAllowed, and every
 * later call returns immediately.
 *
 * @throws JMSException if no client id could be obtained
 */
protected synchronized void checkClientID() throws JMSException {
    if (!setClientIdAllowed) {
        return;
    }
    setClientIdAllowed = false;

    if (trace) {
        log.trace("Checking clientID=" + clientID + " " + this);
    }

    // An id that was set explicitly needs nothing more.
    if (clientID != null) {
        return;
    }

    askForAnID();//Request a random one
    if (clientID == null) {
        throw new JMSException("Could not get a clientID");
    }
    connectionToken.setClientID(clientID);

    if (trace) {
        log.trace("ClientID established " + this);
    }
}
Check that a clientID exists. If not, get one from the server.
Also sets setClientIdAllowed to false.
Check clientId; must be called by all public methods on the
javax.jms.Connection interface and its children. |
/**
 * Rejects use of a closed connection.
 *
 * @throws IllegalStateException if the closed flag is set
 */
protected void checkClosed() throws IllegalStateException {
    final boolean alreadyClosed = closed.get();
    if (alreadyClosed) {
        throw new IllegalStateException("The connection is closed");
    }
}
Check whether we are closed |
/**
 * Verifies that a temporary destination is one recorded in this connection's
 * temps collection. Non-temporary destinations are accepted without a check.
 *
 * @param destination the destination a consumer is being created for
 * @throws JMSException if the destination is temporary and not found in temps
 */
void checkTemporary(Destination destination) throws JMSException {
    final boolean temporary = destination instanceof TemporaryQueue || destination instanceof TemporaryTopic;
    if (!temporary) {
        return;
    }

    synchronized (temps) {
        if (!temps.contains(destination)) {
            throw new JMSException("Cannot create a consumer for a temporary destination from a different session. " + destination);
        }
    }
}
Check a temporary destination |
/**
 * Closes this connection.
 *
 * The shutdown runs in a fixed order: stop delivery, close every session,
 * tell the server the connection is closing, stop the ping thread, stop the
 * client IL service, and finally set the closed flag. Each step is attempted
 * even if an earlier one failed. Only a failure to stop the ping thread is
 * remembered and rethrown at the end; the other failures are traced and ignored.
 * Calling close on an already closed connection does nothing.
 *
 * @throws JMSException if the ping thread could not be stopped
 */
public synchronized void close() throws JMSException {
// Already closed: nothing to do.
if (closed.get())
return;
if (trace)
log.trace("Closing connection " + this);
// From here on asynchFailure() and asynchDeliver() ignore incoming events.
closing.set(true);
// We don't want to notify the exception listener
exceptionListener = null;
// The first exception
JMSException exception = null;
// Step 1: stop delivery. A failure here is only traced.
try
{
doStop();
}
catch (Throwable t)
{
log.trace("Error during stop", t);
}
if (trace)
log.trace("Closing sessions " + this);
// Step 2: close the sessions. Work on a snapshot taken under the lock so the
// sessions can be closed without holding the lock on createdSessions.
Object[] vect = null;
synchronized (createdSessions)
{
vect = createdSessions.toArray();
}
for (int i = 0; i < vect.length; i++)
{
SpySession session = (SpySession) vect[i];
try
{
session.close();
}
catch (Throwable t)
{
// One failing session must not prevent the others from being closed.
if (trace)
log.trace("Error closing session " + session, t);
}
}
if (trace)
log.trace("Closed sessions " + this);
if (trace)
log.trace("Notifying the server of close " + this);
// Step 3: tell the server. A failure here is only traced.
try
{
serverIL.connectionClosing(connectionToken);
}
catch (Throwable t)
{
log.trace("Cannot close properly the connection", t);
}
if (trace)
log.trace("Stopping ping thread " + this);
// Step 4: stop the ping thread. This is the only step whose failure is
// kept and reported to the caller.
try
{
stopPingThread();
}
catch (Throwable t)
{
if (exception == null)
exception = SpyJMSException.getAsJMSException("Cannot stop the ping thread", t);
}
if (trace)
log.trace("Stopping the ClientIL service " + this);
// Step 5: stop the client IL service. A failure here is only traced.
try
{
stopILService();
}
catch (Throwable t)
{
log.trace("Cannot stop the client il service", t);
}
// Only set the closed flag after all the objects that depend
// on this connection have been closed.
closed.set(true);
if (trace)
log.trace("Disconnected from server " + this);
// Throw the first exception
if (exception != null)
throw exception;
}
|
/**
 * Deletes a temporary destination on the server and then removes it from this
 * connection's local records (destinationSubscriptions and temps).
 *
 * @param dest the temporary destination to delete
 * @throws JMSException if the connection is closed or the deletion fails
 */
public void deleteTemporaryDestination(SpyDestination dest) throws JMSException {
    checkClosed();

    if (trace) {
        log.trace("DeleteDestination dest=" + dest + " " + this);
    }

    try {
        // The server goes first; local state is only touched if that succeeds.
        serverIL.deleteTemporaryDestination(connectionToken, dest);

        // Drop the per-destination consumer list.
        synchronized (subscriptions) {
            destinationSubscriptions.remove(dest);
        }

        // Drop it from the known temporary destinations.
        synchronized (temps) {
            temps.remove(dest);
        }
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot delete the TemporaryDestination", failure);
    }
}
Called by a TemporaryDestination which is going to be deleted |
/**
 * Puts the connection into stopped mode and disables it on the server.
 * Does nothing if the connection is already in stopped mode.
 *
 * @throws JMSException if the server call fails
 */
public void doStop() throws JMSException {
    if (modeStop) {
        return;
    }
    // The flag is flipped before the server is contacted.
    modeStop = true;

    if (trace) {
        log.trace("Stopping connection " + this);
    }

    try {
        serverIL.setEnabled(connectionToken, false);
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot disable the connection with the JMS server", failure);
    }
}
|
/**
 * Returns the client id currently held by this connection.
 *
 * @return the client id, which may be null if none has been set or requested yet
 * @throws JMSException if the connection is closed
 */
public String getClientID() throws JMSException {
    checkClosed();

    final String currentClientID = clientID;
    return currentClientID;
}
|
/**
 * Returns the registered exception listener.
 *
 * @return the current listener, or null if none is set
 * @throws JMSException if the connection is closed or no client id can be obtained
 */
public ExceptionListener getExceptionListener() throws JMSException {
    checkClosed();
    checkClientID();

    final ExceptionListener currentListener = exceptionListener;
    return currentListener;
}
|
/**
 * Returns a new metadata object for this connection.
 *
 * @return a freshly created SpyConnectionMetaData
 * @throws JMSException if the connection is closed or no client id can be obtained
 */
public ConnectionMetaData getMetaData() throws JMSException {
    checkClosed();
    checkClientID();

    final ConnectionMetaData metaData = new SpyConnectionMetaData();
    return metaData;
}
|
/**
 * Builds the next message id for this connection.
 *
 * The id has the form clientID, a '-', the current time in milliseconds in
 * decimal, immediately followed by the incremented message counter in decimal.
 * There is no separator between the time and the counter. The digits are
 * produced by hand into the shared charStack buffer, and the whole routine
 * runs while holding the lock on the shared buffer sb.
 *
 * @return the newly built message id
 * @throws JMSException if the connection is closed
 */
String getNewMessageID() throws JMSException {
checkClosed();
synchronized (sb)
{
// Reuse the shared buffer: clear it, then write "clientID-".
sb.setLength(0);
sb.append(clientID);
sb.append('-');
long time = System.currentTimeMillis();
int count = 0;
// Peel off decimal digits least-significant first. The do/while ensures
// that a value of 0 still yields one digit.
do
{
charStack[count] = (char) ('0' + (time % 10));
time = time / 10;
++count;
}
while (time != 0);
--count;
// Append the digits in reverse so that the most significant comes first.
for (; count >= 0; --count)
{
sb.append(charStack[count]);
}
++lastMessageID;
// Avoid having to deal with negative numbers: after an int overflow the
// counter restarts at 0.
if (lastMessageID < 0)
{
lastMessageID = 0;
}
int id = lastMessageID;
count = 0;
// Same digit-by-digit conversion as above, this time for the counter.
do
{
charStack[count] = (char) ('0' + (id % 10));
id = id / 10;
++count;
}
while (id != 0);
--count;
for (; count >= 0; --count)
{
sb.append(charStack[count]);
}
return sb.toString();
}
}
Get the next message id
All longs are less than 22 digits long
Note that in this routine we assume that System.currentTimeMillis() is
always non-negative, and lastMessageID is kept non-negative as well (so
don't set lastMessageID to a negative value for a start). |
/**
 * Gets the ServerIL attribute of the Connection object.
 *
 * @return the serverIL field of this connection
 */
public ServerIL getServerIL() {
    final ServerIL currentServerIL = serverIL;
    return currentServerIL;
}
Gets the ServerIL attribute of the Connection object |
/**
 * Returns the shared thread group, creating a new one if the current group
 * has been destroyed.
 *
 * @return the thread group held in the static threadGroup field
 */
public static ThreadGroup getThreadGroup() {
    final boolean needsNewGroup = threadGroup.isDestroyed();
    if (needsNewGroup) {
        threadGroup = new ThreadGroup("JBossMQ Client Threads");
    }
    return threadGroup;
}
|
/**
 * Sends a ping to the server.
 *
 * The cached trace flag is refreshed from the logger on every call.
 *
 * @param clientTime the time value sent with the ping
 * @throws JMSException if the connection is closed or the ping fails
 */
void pingServer(long clientTime) throws JMSException {
    checkClosed();

    // Re-read the log level so that a change to it is picked up.
    trace = log.isTraceEnabled();
    if (trace) {
        log.trace("PING " + clientTime + " " + this);
    }

    try {
        serverIL.ping(connectionToken, clientTime);
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot ping the JMS server", failure);
    }
}
|
/**
 * Requests a message for a subscription from the server.
 *
 * @param sub the subscription to receive for
 * @param wait the wait value passed through to serverIL.receive
 * @return the received message with its acknowledgement request created, or
 *         null if the server returned no message
 * @throws JMSException if the connection is closed or the receive call fails
 */
SpyMessage receive(Subscription sub,
        long wait) throws JMSException {
    checkClosed();

    if (trace) {
        log.trace("Receive subscription=" + sub + " wait=" + wait);
    }

    try {
        final SpyMessage received = serverIL.receive(connectionToken, sub.subscriptionId, wait);
        if (received == null) {
            return null;
        }
        received.createAcknowledgementRequest(sub.subscriptionId);
        return received;
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot receive ", failure);
        // Presumably never reached because the call above throws; it keeps the
        // compiler satisfied that every path returns or throws.
        throw new UnreachableStatementException();
    }
}
|
/**
 * Asks the server for transaction ids to recover.
 *
 * Recovery is only possible when serverIL implements Recoverable. Otherwise a
 * warning is logged and an empty array is returned.
 *
 * @param flags the flags passed through to Recoverable.recover
 * @return the Xids returned by the server, or an empty array
 * @throws JMSException if the connection is closed or the recover call fails
 */
protected Xid[] recover(int flags) throws JMSException {
    checkClosed();

    if (trace) {
        log.trace("Recover flags=" + flags + " " + this);
    }

    try {
        if (serverIL instanceof Recoverable) {
            return ((Recoverable) serverIL).recover(connectionToken, flags);
        }
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot recover", failure);
    }

    log.warn(serverIL + " does not implement " + Recoverable.class.getName());
    return new Xid[0];
}
|
/**
 * Unsubscribes a consumer on the server and then removes it from this
 * connection's local records.
 *
 * @param consumer the consumer to remove
 * @throws JMSException if the connection is closed or the unsubscribe fails
 */
void removeConsumer(SpyConsumer consumer) throws JMSException {
    checkClosed();

    final Subscription subscription = consumer.getSubscription();
    if (trace) {
        log.trace("removeConsumer req=" + subscription);
    }

    try {
        // The server goes first; local removal only happens if that succeeds.
        serverIL.unsubscribe(connectionToken, subscription.subscriptionId);
        removeConsumerInternal(consumer);
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot unsubscribe to this destination", failure);
    }
}
|
/**
 * Sends an acknowledgement (or negative acknowledgement) to the server.
 *
 * @param item the acknowledgement request to forward to serverIL.acknowledge
 * @throws JMSException if the connection is closed or the call fails
 */
protected void send(AcknowledgementRequest item) throws JMSException {
    checkClosed();

    if (trace) {
        log.trace("Acknowledge item=" + item + " " + this);
    }

    try {
        serverIL.acknowledge(connectionToken, item);
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot acknowlege a message", failure);
    }
}
Acknowledge/Nack a message |
/**
 * Sends a transaction request to the server.
 *
 * @param transaction the request to forward to serverIL.transact
 * @throws JMSException if the connection is closed or the call fails
 */
protected void send(TransactionRequest transaction) throws JMSException {
    checkClosed();

    if (trace) {
        log.trace("Transact request=" + transaction + " " + this);
    }

    try {
        serverIL.transact(connectionToken, transaction);
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot process a transaction", failure);
    }
}
|
/**
 * Sends a message to the server.
 *
 * @param mes the message to pass to serverIL.addMessage
 * @throws JMSException if the connection is closed or the call fails
 */
void sendToServer(SpyMessage mes) throws JMSException {
    checkClosed();

    if (trace) {
        log.trace("SendToServer message=" + mes.header.jmsMessageID + " " + this);
    }

    try {
        serverIL.addMessage(connectionToken, mes);
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot send a message to the JMS server", failure);
    }
}
Send a message to the server |
/**
 * Removes a session that is closing from the set of sessions created by this
 * connection.
 *
 * @param who the session that is closing
 */
void sessionClosing(SpySession who) {
    if (trace) {
        log.trace("Closing session " + who);
    }

    synchronized (createdSessions) {
        createdSessions.remove(who);
    }

    // This session should not be in the "destinations" object anymore.
    // We could check this, though
}
|
/**
 * Sets the client id of this connection after having the server check it.
 *
 * The id can only be set while none is present and while setClientIdAllowed
 * is still true.
 *
 * @param cID the client id to use
 * @throws JMSException if the connection is closed, an id is already present,
 *         setting an id is no longer allowed, or the server check fails
 */
public void setClientID(String cID) throws JMSException {
    checkClosed();

    if (clientID != null)
        throw new IllegalStateException("The connection has already a clientID");
    if (setClientIdAllowed == false)
        throw new IllegalStateException("SetClientID was not called emediately after creation of connection");

    // Log the requested id. The clientID field is always null at this point
    // because a non-null value was rejected above, so logging it says nothing.
    if (trace)
        log.trace("SetClientID clientID=" + cID + " " + this);

    try
    {
        serverIL.checkID(cID);
    }
    catch (Throwable t)
    {
        SpyJMSException.rethrowAsJMSException("Cannot connect to the JMSServer", t);
    }

    // Only adopt the id once the server check has passed.
    clientID = cID;
    connectionToken.setClientID(clientID);
}
|
/**
 * Registers the listener to be notified of failures on this connection.
 *
 * @param listener the new listener; replaces any previous one
 * @throws JMSException if the connection is closed or no client id can be obtained
 */
public void setExceptionListener(ExceptionListener listener) throws JMSException {
    checkClosed();
    checkClientID();

    this.exceptionListener = listener;
}
|
/**
 * Takes the connection out of stopped mode and enables it on the server.
 * Does nothing if the connection is not in stopped mode.
 *
 * @throws JMSException if the connection is closed, no client id can be
 *         obtained, or the server call fails
 */
public void start() throws JMSException {
    checkClosed();
    checkClientID();

    if (!modeStop) {
        return;
    }
    // The flag is flipped before the server is contacted.
    modeStop = false;

    if (trace) {
        log.trace("Starting connection " + this);
    }

    try {
        serverIL.setEnabled(connectionToken, true);
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot enable the connection with the JMS server", failure);
    }
}
|
/**
 * Creates and starts the client IL service, then builds the connection token
 * from the client id, the client IL and the session id, and hands it to serverIL.
 *
 * @throws JMSException if any of these steps fails
 */
protected void startILService() throws JMSException {
    if (trace) {
        log.trace("Starting the client il " + this);
    }

    try {
        clientILService = genericConnectionFactory.createClientILService(this);
        clientILService.start();

        if (trace) {
            log.trace("Using client id " + clientILService + " " + this);
        }

        // The token is created only after the service is up and is then
        // registered with the server IL.
        connectionToken = new ConnectionToken(clientID, clientILService.getClientIL(), sessionId);
        serverIL.setConnectionToken(connectionToken);
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot start a the client IL service", failure);
    }
}
|
/**
 * Stops this connection: after the closed and client id checks, the work is
 * delegated to doStop.
 *
 * @throws JMSException if the connection is closed, no client id can be
 *         obtained, or doStop fails
 */
public void stop() throws JMSException {
    checkClosed();
    checkClientID();

    doStop();
}
|
/**
 * Stops the client IL service.
 *
 * @throws JMSException if stopping the service fails
 */
protected void stopILService() throws JMSException {
    try {
        clientILService.stop();
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot stop a the client IL service", failure);
    }
}
|
/**
 * Describes this connection for log output.
 *
 * The text shows the identity hash code, the connection token (or the client
 * id while no token exists), an optional CLOSED or CLOSING marker, and the
 * receive state.
 *
 * @return a string of the form Connection@hash[token=... rcvstate=STARTED]
 */
public String toString() {
    final StringBuffer text = new StringBuffer();
    text.append("Connection@").append(System.identityHashCode(this)).append('[');

    // The token is shown once it exists; until then the bare client id is shown.
    if (connectionToken == null) {
        text.append("clientID=").append(clientID);
    } else {
        text.append("token=").append(connectionToken);
    }

    // CLOSED takes precedence over CLOSING.
    if (closed.get()) {
        text.append(" CLOSED");
    } else if (closing.get()) {
        text.append(" CLOSING");
    }

    text.append(" rcvstate=").append(modeStop ? "STOPPED" : "STARTED").append(']');
    return text.toString();
}
|
/**
 * Asks the server to destroy a durable subscription.
 *
 * @param id the durable subscription to destroy
 * @throws JMSException if the server call fails
 */
void unsubscribe(DurableSubscriptionID id) throws JMSException {
    if (trace) {
        log.trace("Unsubscribe id=" + id + " " + this);
    }

    try {
        serverIL.destroySubscription(connectionToken, id);
    } catch (Throwable failure) {
        SpyJMSException.rethrowAsJMSException("Cannot destroy durable subscription " + id, failure);
    }
}
|