Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » journal » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   
   18   package org.apache.activemq.store.journal;
   19   
   20   import java.io.IOException;
   21   import java.util.ArrayList;
   22   import java.util.Iterator;
   23   import java.util.LinkedHashMap;
   24   import java.util.Map;
   25   
   26   import javax.transaction.xa.XAException;
   27   
   28   import org.apache.activeio.journal.RecordLocation;
   29   import org.apache.activemq.command.JournalTopicAck;
   30   import org.apache.activemq.command.JournalTransaction;
   31   import org.apache.activemq.command.Message;
   32   import org.apache.activemq.command.MessageAck;
   33   import org.apache.activemq.command.TransactionId;
   34   import org.apache.activemq.command.XATransactionId;
   35   import org.apache.activemq.store.TransactionRecoveryListener;
   36   import org.apache.activemq.store.TransactionStore;
   37   
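/**
 * A {@link TransactionStore} that writes transaction prepare, commit and rollback commands to
 * the journal through a {@link JournalPersistenceAdapter}, and keeps the operations of
 * in-flight and prepared transactions in memory until they complete.
 */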
   40   public class JournalTransactionStore implements TransactionStore {
   41   
   42       private final JournalPersistenceAdapter peristenceAdapter;
   43       private Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>();
   44       private Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
   45       private boolean doingRecover;
   46   
   47       public static class TxOperation {
   48   
   49           static final byte ADD_OPERATION_TYPE = 0;
   50           static final byte REMOVE_OPERATION_TYPE = 1;
   51           static final byte ACK_OPERATION_TYPE = 3;
   52   
   53           public byte operationType;
   54           public JournalMessageStore store;
   55           public Object data;
   56   
   57           public TxOperation(byte operationType, JournalMessageStore store, Object data) {
   58               this.operationType = operationType;
   59               this.store = store;
   60               this.data = data;
   61           }
   62   
   63       }
   64   
   65       /**
   66        * Operations
   67        * 
   68        * @version $Revision: 1.6 $
   69        */
   70       public static class Tx {
   71   
   72           private final RecordLocation location;
   73           private ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
   74   
   75           public Tx(RecordLocation location) {
   76               this.location = location;
   77           }
   78   
   79           public void add(JournalMessageStore store, Message msg) {
   80               operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
   81           }
   82   
   83           public void add(JournalMessageStore store, MessageAck ack) {
   84               operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
   85           }
   86   
   87           public void add(JournalTopicMessageStore store, JournalTopicAck ack) {
   88               operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
   89           }
   90   
   91           public Message[] getMessages() {
   92               ArrayList<Object> list = new ArrayList<Object>();
   93               for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
   94                   TxOperation op = iter.next();
   95                   if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
   96                       list.add(op.data);
   97                   }
   98               }
   99               Message rc[] = new Message[list.size()];
  100               list.toArray(rc);
  101               return rc;
  102           }
  103   
  104           public MessageAck[] getAcks() {
  105               ArrayList<Object> list = new ArrayList<Object>();
  106               for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
  107                   TxOperation op = iter.next();
  108                   if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
  109                       list.add(op.data);
  110                   }
  111               }
  112               MessageAck rc[] = new MessageAck[list.size()];
  113               list.toArray(rc);
  114               return rc;
  115           }
  116   
  117           public ArrayList<TxOperation> getOperations() {
  118               return operations;
  119           }
  120   
  121       }
  122   
  123       public JournalTransactionStore(JournalPersistenceAdapter adapter) {
  124           this.peristenceAdapter = adapter;
  125       }
  126   
  127       /**
  128        * @throws IOException
  129        * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
  130        */
  131       public void prepare(TransactionId txid) throws IOException {
  132           Tx tx = null;
  133           synchronized (inflightTransactions) {
  134               tx = inflightTransactions.remove(txid);
  135           }
  136           if (tx == null) {
  137               return;
  138           }
  139           peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false),
  140                                          true);
  141           synchronized (preparedTransactions) {
  142               preparedTransactions.put(txid, tx);
  143           }
  144       }
  145   
  146       /**
  147        * @throws IOException
  148        * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
  149        */
  150       public void replayPrepare(TransactionId txid) throws IOException {
  151           Tx tx = null;
  152           synchronized (inflightTransactions) {
  153               tx = inflightTransactions.remove(txid);
  154           }
  155           if (tx == null) {
  156               return;
  157           }
  158           synchronized (preparedTransactions) {
  159               preparedTransactions.put(txid, tx);
  160           }
  161       }
  162   
  163       public Tx getTx(Object txid, RecordLocation location) {
  164           Tx tx = null;
  165           synchronized (inflightTransactions) {
  166               tx = inflightTransactions.get(txid);
  167           }
  168           if (tx == null) {
  169               tx = new Tx(location);
  170               inflightTransactions.put(txid, tx);
  171           }
  172           return tx;
  173       }
  174   
  175       /**
  176        * @throws XAException
  177        * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
  178        */
  179       public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
  180           Tx tx;
  181           if (wasPrepared) {
  182               synchronized (preparedTransactions) {
  183                   tx = preparedTransactions.remove(txid);
  184               }
  185           } else {
  186               synchronized (inflightTransactions) {
  187                   tx = inflightTransactions.remove(txid);
  188               }
  189           }
  190           if (tx == null) {
  191               return;
  192           }
  193           if (txid.isXATransaction()) {
  194               peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid,
  195                                                                     wasPrepared), true);
  196           } else {
  197               peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
  198                                                                     wasPrepared), true);
  199           }
  200       }
  201   
  202       /**
  203        * @throws XAException
  204        * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
  205        */
  206       public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
  207           if (wasPrepared) {
  208               synchronized (preparedTransactions) {
  209                   return preparedTransactions.remove(txid);
  210               }
  211           } else {
  212               synchronized (inflightTransactions) {
  213                   return inflightTransactions.remove(txid);
  214               }
  215           }
  216       }
  217   
  218       /**
  219        * @throws IOException
  220        * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
  221        */
  222       public void rollback(TransactionId txid) throws IOException {
  223           Tx tx = null;
  224           synchronized (inflightTransactions) {
  225               tx = inflightTransactions.remove(txid);
  226           }
  227           if (tx != null) {
  228               synchronized (preparedTransactions) {
  229                   tx = preparedTransactions.remove(txid);
  230               }
  231           }
  232           if (tx != null) {
  233               if (txid.isXATransaction()) {
  234                   peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid,
  235                                                                         false), true);
  236               } else {
  237                   peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,
  238                                                                         txid, false), true);
  239               }
  240           }
  241       }
  242   
  243       /**
  244        * @throws IOException
  245        * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
  246        */
  247       public void replayRollback(TransactionId txid) throws IOException {
  248           boolean inflight = false;
  249           synchronized (inflightTransactions) {
  250               inflight = inflightTransactions.remove(txid) != null;
  251           }
  252           if (inflight) {
  253               synchronized (preparedTransactions) {
  254                   preparedTransactions.remove(txid);
  255               }
  256           }
  257       }
  258   
  259       public void start() throws Exception {
  260       }
  261   
  262       public void stop() throws Exception {
  263       }
  264   
  265       public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
  266           // All the in-flight transactions get rolled back..
  267           synchronized (inflightTransactions) {
  268               inflightTransactions.clear();
  269           }
  270           this.doingRecover = true;
  271           try {
  272               Map<TransactionId, Tx> txs = null;
  273               synchronized (preparedTransactions) {
  274                   txs = new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
  275               }
  276               for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) {
  277                   Object txid = iter.next();
  278                   Tx tx = txs.get(txid);
  279                   listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
  280               }
  281           } finally {
  282               this.doingRecover = false;
  283           }
  284       }
  285   
  286       /**
  287        * @param message
  288        * @throws IOException
  289        */
  290       void addMessage(JournalMessageStore store, Message message, RecordLocation location) throws IOException {
  291           Tx tx = getTx(message.getTransactionId(), location);
  292           tx.add(store, message);
  293       }
  294   
  295       /**
  296        * @param ack
  297        * @throws IOException
  298        */
  299       public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location)
  300           throws IOException {
  301           Tx tx = getTx(ack.getTransactionId(), location);
  302           tx.add(store, ack);
  303       }
  304   
  305       public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) {
  306           Tx tx = getTx(ack.getTransactionId(), location);
  307           tx.add(store, ack);
  308       }
  309   
  310       public RecordLocation checkpoint() throws IOException {
  311           // Nothing really to checkpoint.. since, we don't
  312           // checkpoint tx operations in to long term store until they are
  313           // committed.
  314           // But we keep track of the first location of an operation
  315           // that was associated with an active tx. The journal can not
  316           // roll over active tx records.
  317           RecordLocation rc = null;
  318           synchronized (inflightTransactions) {
  319               for (Iterator<Tx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
  320                   Tx tx = iter.next();
  321                   RecordLocation location = tx.location;
  322                   if (rc == null || rc.compareTo(location) < 0) {
  323                       rc = location;
  324                   }
  325               }
  326           }
  327           synchronized (preparedTransactions) {
  328               for (Iterator<Tx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
  329                   Tx tx = iter.next();
  330                   RecordLocation location = tx.location;
  331                   if (rc == null || rc.compareTo(location) < 0) {
  332                       rc = location;
  333                   }
  334               }
  335               return rc;
  336           }
  337       }
  338   
  339       public boolean isDoingRecover() {
  340           return doingRecover;
  341       }
  342   
  343   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » journal » [javadoc | source]