Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transaction » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transaction;
   18   
   19   import java.io.IOException;
   20   
   21   import javax.transaction.xa.XAException;
   22   
   23   import org.apache.activemq.broker.ConnectionContext;
   24   import org.apache.activemq.command.LocalTransactionId;
   25   import org.apache.activemq.command.TransactionId;
   26   import org.apache.activemq.store.TransactionStore;
   27   import org.apache.commons.logging.Log;
   28   import org.apache.commons.logging.LogFactory;
   29   
   30   /**
   31    * @version $Revision: 1.3 $
   32    */
   33   public class LocalTransaction extends Transaction {
   34   
   35       private static final Log LOG = LogFactory.getLog(LocalTransaction.class);
   36   
   37       private final TransactionStore transactionStore;
   38       private final LocalTransactionId xid;
   39       private final ConnectionContext context;
   40   
   41       public LocalTransaction(TransactionStore transactionStore, LocalTransactionId xid, ConnectionContext context) {
   42           this.transactionStore = transactionStore;
   43           this.xid = xid;
   44           this.context = context;
   45       }
   46   
   47       public void commit(boolean onePhase) throws XAException, IOException {
   48           if (LOG.isDebugEnabled()) {
   49               LOG.debug("commit: "  + xid
   50                       + " syncCount: " + size());
   51           }
   52           
   53           // Get ready for commit.
   54           try {
   55               prePrepare();
   56           } catch (XAException e) {
   57               throw e;
   58           } catch (Throwable e) {
   59               LOG.warn("COMMIT FAILED: ", e);
   60               rollback();
   61               // Let them know we rolled back.
   62               XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
   63               xae.errorCode = XAException.XA_RBOTHER;
   64               xae.initCause(e);
   65               throw xae;
   66           }
   67   
   68           setState(Transaction.FINISHED_STATE);
   69           context.getTransactions().remove(xid);
   70           // Sync on transaction store to avoid out of order messages in the cursor
   71           // https://issues.apache.org/activemq/browse/AMQ-2594
   72           synchronized (transactionStore) {
   73               transactionStore.commit(getTransactionId(), false);
   74   
   75               try {
   76                   fireAfterCommit();
   77               } catch (Throwable e) {
   78                   // I guess this could happen. Post commit task failed
   79                   // to execute properly.
   80                   LOG.warn("POST COMMIT FAILED: ", e);
   81                   XAException xae = new XAException("POST COMMIT FAILED");
   82                   xae.errorCode = XAException.XAER_RMERR;
   83                   xae.initCause(e);
   84                   throw xae;
   85               }
   86           }
   87       }
   88   
   89       public void rollback() throws XAException, IOException {
   90   
   91           if (LOG.isDebugEnabled()) {
   92               LOG.debug("rollback: "  + xid
   93                       + " syncCount: " + size());
   94           }
   95           setState(Transaction.FINISHED_STATE);
   96           context.getTransactions().remove(xid);
   97           // Sync on transaction store to avoid out of order messages in the cursor
   98           // https://issues.apache.org/activemq/browse/AMQ-2594
   99           synchronized (transactionStore) {
  100              transactionStore.rollback(getTransactionId());
  101   
  102               try {
  103                   fireAfterRollback();
  104               } catch (Throwable e) {
  105                   LOG.warn("POST ROLLBACK FAILED: ", e);
  106                   XAException xae = new XAException("POST ROLLBACK FAILED");
  107                   xae.errorCode = XAException.XAER_RMERR;
  108                   xae.initCause(e);
  109                   throw xae;
  110               }
  111           }
  112       }
  113   
  114       public int prepare() throws XAException {
  115           XAException xae = new XAException("Prepare not implemented on Local Transactions.");
  116           xae.errorCode = XAException.XAER_RMERR;
  117           throw xae;
  118       }
  119   
  120       public TransactionId getTransactionId() {
  121           return xid;
  122       }
  123   
  124   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transaction » [javadoc | source]