public void onMessage(Message msg) {
boolean trace = log.isTraceEnabled();
if( trace )
log.trace("onMessage running (pool, session, xaSession, useLocalTX): " +
", " + session + ", " + xaSession + ", " + useLocalTX);
// Used if run with useLocalTX if true
Xid localXid = null;
boolean localRollbackFlag=false;
// Used if run with useLocalTX if false
Transaction trans = null;
try
{
if (useLocalTX)
{
// Use JBossMQ One Phase Commit to commit the TX
localXid = xidFactory.newXid();//new XidImpl();
XAResource res = xaSession.getXAResource();
res.start(localXid, XAResource.TMNOFLAGS);
if( trace )
log.trace("Using optimized 1p commit to control TX.");
}
else
{
// Use the TM to control the TX
tm.begin();
trans = tm.getTransaction();
if (xaSession != null)
{
XAResource res = xaSession.getXAResource();
if (!trans.enlistResource(res))
{
throw new JMSException("could not enlist resource");
}
if( trace )
log.trace("XAResource '" + res + "' enlisted.");
}
}
//currentTransactionId = connection.spyXAResourceManager.startTx();
// run the session
//session.run();
// Call delegate listener
delegateListener.onMessage(msg);
}
catch (Exception e)
{
log.error("session failed to run; setting rollback only", e);
if (useLocalTX)
{
// Use JBossMQ One Phase Commit to commit the TX
localRollbackFlag = true;
}
else
{
// Mark for tollback TX via TM
try
{
// The transaction will be rolledback in the finally
if( trace )
log.trace("Using TM to mark TX for rollback.");
trans.setRollbackOnly();
}
catch (Exception x)
{
log.error("failed to set rollback only", x);
}
}
}
finally
{
try
{
if (useLocalTX)
{
if( localRollbackFlag == true )
{
if( trace )
log.trace("Using optimized 1p commit to rollback TX.");
XAResource res = xaSession.getXAResource();
res.end(localXid, XAResource.TMSUCCESS);
res.rollback(localXid);
}
else
{
if( trace )
log.trace("Using optimized 1p commit to commit TX.");
XAResource res = xaSession.getXAResource();
res.end(localXid, XAResource.TMSUCCESS);
res.commit(localXid, true);
}
}
else
{
// Use the TM to commit the Tx (assert the correct association)
Transaction currentTx = tm.getTransaction();
if (trans.equals(currentTx) == false)
throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
// Marked rollback
if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
{
if( trace )
log.trace("Rolling back JMS transaction");
// actually roll it back
tm.rollback();
// NO XASession? then manually rollback.
// This is not so good but
// it's the best we can do if we have no XASession.
if (xaSession == null && serverSessionPool.isTransacted())
{
session.rollback();
}
}
else if (trans.getStatus() == Status.STATUS_ACTIVE)
{
// Commit tx
// This will happen if
// a) everything goes well
// b) app. exception was thrown
if( trace )
log.trace("Commiting the JMS transaction");
tm.commit();
// NO XASession? then manually commit. This is not so good but
// it's the best we can do if we have no XASession.
if (xaSession == null && serverSessionPool.isTransacted())
{
session.commit();
}
}
}
}
catch (Exception e)
{
log.error("failed to commit/rollback", e);
}
}
if( trace )
log.trace("onMessage done");
}
Will get called from session for each message stuffed into it.
Starts a transaction with the TransactionManager
and enlists the XAResource of the JMS XASession if a XASession was
available. A good JMS implementation should provide the XASession for use
in the ASF. So we optimize for the case where we have an XASession. So,
for the case where we do not have an XASession and the bean is not
transacted, we have the unneeded overhead of creating a Transaction. I'm
leaving it this way since it keeps the code simpler and that case should
not be too common (JBossMQ provides XASessions). |