| Method from org.jboss.resource.adapter.jms.inflow.JmsActivation Detail: |
public JmsActivationSpec getActivationSpec() {
return spec;
}
|
public Connection getConnection() {
return connection;
}
|
public DLQHandler getDLQHandler() {
return dlqHandler;
}
|
public Destination getDestination() {
return destination;
}
|
public MessageEndpointFactory getMessageEndpointFactory() {
return endpointFactory;
}
|
public JMSProviderAdapter getProviderAdapter() {
return adapter;
}
|
public TransactionManager getTransactionManager() {
if (tm == null)
tm = TransactionManagerLocator.locateTransactionManager();
return tm;
}
|
public WorkManager getWorkManager() {
return ra.getWorkManager();
}
|
public void handleFailure(Throwable failure) {
log.warn("Failure in jms activation " + spec, failure);
int reconnectCount = 0;
// Only enter the failure loop once
if (inFailure.getAndSet(true))
return;
try
{
while (deliveryActive.get() && reconnectCount < spec.getReconnectAttempts())
{
teardown();
sendNotification(FAILURE_NOTIFICATION, failure);
try
{
Thread.sleep(spec.getReconnectIntervalLong());
}
catch (InterruptedException e)
{
log.debug("Interrupted trying to reconnect " + spec, e);
break;
}
log.info("Attempting to reconnect " + spec);
try
{
setup();
log.info("Reconnected with messaging provider.");
break;
}
catch (Throwable t)
{
log.error("Unable to reconnect " + spec, t);
}
++reconnectCount;
}
}
finally
{
// Leaving failure recovery loop
inFailure.set(false);
}
}
Handles any failure by trying to reconnect |
public boolean isDeliveryTransacted() {
return isDeliveryTransacted;
}
|
public void onException(JMSException exception) {
handleFailure(exception);
}
|
protected void sendNotification(String event,
Object userData) {
if (emitter == null)
return;
try
{
Notification notif = new Notification(event, spec, emitter.nextNotificationSequenceNumber());
notif.setUserData(userData);
emitter.sendNotification(notif);
}
catch (Throwable t)
{
log.warn("Error sending notification: " + event, t);
}
}
|
protected void setup() throws Exception {
log.debug("Setting up " + spec);
sendNotification(CONNECTING_NOTIFICATION, null);
setupJMSProviderAdapter();
Context ctx = adapter.getInitialContext();
log.debug("Using context " + ctx.getEnvironment() + " for " + spec);
try
{
setupDLQ(ctx);
setupDestination(ctx);
setupConnection(ctx);
}
finally
{
ctx.close();
}
setupSessionPool();
log.debug("Setup complete " + this);
sendNotification(CONNECTED_NOTIFICATION, null);
}
|
protected void setupConnection(Context ctx) throws Exception {
log.debug("setup connection " + this);
String user = spec.getUser();
String pass = spec.getPassword();
String clientID = spec.getClientId();
if (spec.isTopic())
connection = setupTopicConnection(ctx, user, pass, clientID);
else
connection = setupQueueConnection(ctx, user, pass, clientID);
log.debug("established connection " + this);
}
|
protected void setupDLQ(Context ctx) throws Exception {
if (spec.isUseDLQ())
{
Class< ? > clazz = Thread.currentThread().getContextClassLoader().loadClass(spec.getDLQHandler());
dlqHandler = (DLQHandler) clazz.newInstance();
dlqHandler.setup(this, ctx);
}
log.debug("Setup DLQ " + this);
}
|
protected void setupDestination(Context ctx) throws Exception {
Class< ? > destinationType;
if (spec.isTopic())
destinationType = Topic.class;
else
destinationType = Queue.class;
String destinationName = spec.getDestination();
log.debug("Retrieving destination " + destinationName + " of type " + destinationType.getName());
destination = (Destination) Util.lookup(ctx, destinationName, destinationType);
log.debug("Got destination " + destination + " from " + destinationName);
}
|
protected void setupJMSProviderAdapter() throws Exception {
String providerAdapterJNDI = spec.getProviderAdapterJNDI();
if (providerAdapterJNDI.startsWith("java:") == false)
providerAdapterJNDI = "java:" + providerAdapterJNDI;
log.debug("Retrieving the jms provider adapter " + providerAdapterJNDI + " for " + this);
adapter = (JMSProviderAdapter) Util.lookup(providerAdapterJNDI, JMSProviderAdapter.class);
log.debug("Using jms provider adapter " + adapter + " for " + this);
}
|
protected QueueConnection setupQueueConnection(Context ctx,
String user,
String pass,
String clientID) throws Exception {
String queueFactoryRef = adapter.getQueueFactoryRef();
log.debug("Attempting to lookup queue connection factory " + queueFactoryRef);
QueueConnectionFactory qcf = (QueueConnectionFactory) Util.lookup(ctx, queueFactoryRef, QueueConnectionFactory.class);
log.debug("Got queue connection factory " + qcf + " from " + queueFactoryRef);
log.debug("Attempting to create queue connection with user " + user);
QueueConnection result;
if (qcf instanceof XAQueueConnectionFactory && isDeliveryTransacted)
{
XAQueueConnectionFactory xaqcf = (XAQueueConnectionFactory) qcf;
if (user != null)
result = xaqcf.createXAQueueConnection(user, pass);
else
result = xaqcf.createXAQueueConnection();
}
else
{
if (user != null)
result = qcf.createQueueConnection(user, pass);
else
result = qcf.createQueueConnection();
}
try
{
if (clientID != null)
result.setClientID(clientID);
result.setExceptionListener(this);
log.debug("Using queue connection " + result);
return result;
}
catch (Throwable t)
{
try
{
result.close();
}
catch (Exception e)
{
log.trace("Ignored error closing connection", e);
}
if (t instanceof Exception)
throw (Exception) t;
throw new RuntimeException("Error configuring connection", t);
}
}
|
protected void setupSessionPool() throws Exception {
pool = new JmsServerSessionPool(this);
log.debug("Created session pool " + pool);
log.debug("Starting session pool " + pool);
pool.start();
log.debug("Started session pool " + pool);
log.debug("Starting delivery " + connection);
connection.start();
log.debug("Started delivery " + connection);
}
Setup the server session pool |
protected TopicConnection setupTopicConnection(Context ctx,
String user,
String pass,
String clientID) throws Exception {
String topicFactoryRef = adapter.getTopicFactoryRef();
log.debug("Attempting to lookup topic connection factory " + topicFactoryRef);
TopicConnectionFactory tcf = (TopicConnectionFactory) Util.lookup(ctx, topicFactoryRef, TopicConnectionFactory.class);
log.debug("Got topic connection factory " + tcf + " from " + topicFactoryRef);
log.debug("Attempting to create topic connection with user " + user);
TopicConnection result;
if (tcf instanceof XATopicConnectionFactory && isDeliveryTransacted)
{
XATopicConnectionFactory xatcf = (XATopicConnectionFactory) tcf;
if (user != null)
result = xatcf.createXATopicConnection(user, pass);
else
result = xatcf.createXATopicConnection();
}
else
{
if (user != null)
result = tcf.createTopicConnection(user, pass);
else
result = tcf.createTopicConnection();
}
try
{
if (clientID != null)
result.setClientID(clientID);
result.setExceptionListener(this);
log.debug("Using topic connection " + result);
return result;
}
catch (Throwable t)
{
try
{
result.close();
}
catch (Exception e)
{
log.trace("Ignored error closing connection", e);
}
if (t instanceof Exception)
throw (Exception) t;
throw new RuntimeException("Error configuring connection", t);
}
}
|
public void start() throws ResourceException {
deliveryActive.set(true);
ra.getWorkManager().scheduleWork(new SetupActivation());
}
|
public void stop() {
deliveryActive.set(false);
teardown();
}
|
protected void teardown() {
log.debug("Tearing down " + spec);
sendNotification(DISCONNECTING_NOTIFICATION, null);
teardownSessionPool();
teardownConnection();
teardownDestination();
teardownDLQ();
log.debug("Tearing down complete " + this);
sendNotification(DISCONNECTED_NOTIFICATION, null);
}
|
protected void teardownConnection() {
try
{
if (connection != null)
{
log.debug("Closing the " + connection);
connection.close();
}
}
catch (Throwable t)
{
log.debug("Error closing the connection " + connection, t);
}
connection = null;
}
|
protected void teardownDLQ() {
log.debug("Removing DLQ " + this);
try
{
if (dlqHandler != null)
dlqHandler.teardown();
}
catch (Throwable t)
{
log.debug("Error tearing down the DLQ " + dlqHandler, t);
}
dlqHandler = null;
}
|
protected void teardownDestination() {
destination = null;
}
|
protected void teardownSessionPool() {
try
{
if (connection != null)
{
log.debug("Stopping delivery " + connection);
connection.stop();
}
}
catch (Throwable t)
{
log.debug("Error stopping delivery " + connection, t);
}
try
{
if (pool != null)
{
log.debug("Stopping the session pool " + pool);
pool.stop();
}
}
catch (Throwable t)
{
log.debug("Error clearing the pool " + pool, t);
}
pool = null;
}
Teardown the server session pool |
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append(Strings.defaultToString(this)).append('(");
buffer.append("spec=").append(Strings.defaultToString(spec));
buffer.append(" mepf=").append(Strings.defaultToString(endpointFactory));
buffer.append(" active=").append(deliveryActive.get());
if (destination != null)
buffer.append(" destination=").append(destination);
if (connection != null)
buffer.append(" connection=").append(connection);
if (pool != null)
buffer.append(" pool=").append(Strings.defaultToString(pool));
if (dlqHandler != null)
buffer.append(" dlq=").append(Strings.defaultToString(dlqHandler));
buffer.append(" transacted=").append(isDeliveryTransacted);
buffer.append(')");
return buffer.toString();
}
|