Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » journal » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.store.journal;
   18   
   19   import java.io.IOException;
   20   import java.util.HashMap;
   21   import java.util.Iterator;
   22   
   23   import org.apache.activeio.journal.RecordLocation;
   24   import org.apache.activemq.broker.ConnectionContext;
   25   import org.apache.activemq.command.ActiveMQTopic;
   26   import org.apache.activemq.command.JournalTopicAck;
   27   import org.apache.activemq.command.Message;
   28   import org.apache.activemq.command.MessageId;
   29   import org.apache.activemq.command.SubscriptionInfo;
   30   import org.apache.activemq.store.MessageRecoveryListener;
   31   import org.apache.activemq.store.TopicMessageStore;
   32   import org.apache.activemq.transaction.Synchronization;
   33   import org.apache.activemq.util.Callback;
   34   import org.apache.activemq.util.SubscriptionKey;
   35   import org.apache.commons.logging.Log;
   36   import org.apache.commons.logging.LogFactory;
   37   
   38   /**
   39    * A MessageStore that uses a Journal to store it's messages.
   40    * 
   41    * @version $Revision: 1.13 $
   42    */
   43   public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
   44   
   45       private static final Log LOG = LogFactory.getLog(JournalTopicMessageStore.class);
   46   
   47       private TopicMessageStore longTermStore;
   48       private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
   49   
   50       public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore,
   51                                       ActiveMQTopic destinationName) {
   52           super(adapter, checkpointStore, destinationName);
   53           this.longTermStore = checkpointStore;
   54       }
   55   
   56       public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
   57           throws Exception {
   58           this.peristenceAdapter.checkpoint(true, true);
   59           longTermStore.recoverSubscription(clientId, subscriptionName, listener);
   60       }
   61   
   62       public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
   63                                       MessageRecoveryListener listener) throws Exception {
   64           this.peristenceAdapter.checkpoint(true, true);
   65           longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned, listener);
   66   
   67       }
   68   
   69       public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
   70           return longTermStore.lookupSubscription(clientId, subscriptionName);
   71       }
   72   
   73       public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
   74           this.peristenceAdapter.checkpoint(true, true);
   75           longTermStore.addSubsciption(subscriptionInfo, retroactive);
   76       }
   77   
   78       public void addMessage(ConnectionContext context, Message message) throws IOException {
   79           super.addMessage(context, message);
   80       }
   81   
   82       /**
   83        */
   84       public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
   85                               final MessageId messageId) throws IOException {
   86           final boolean debug = LOG.isDebugEnabled();
   87   
   88           JournalTopicAck ack = new JournalTopicAck();
   89           ack.setDestination(destination);
   90           ack.setMessageId(messageId);
   91           ack.setMessageSequenceId(messageId.getBrokerSequenceId());
   92           ack.setSubscritionName(subscriptionName);
   93           ack.setClientId(clientId);
   94           ack.setTransactionId(context.getTransaction() != null
   95               ? context.getTransaction().getTransactionId() : null);
   96           final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
   97   
   98           final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
   99           if (!context.isInTransaction()) {
  100               if (debug) {
  101                   LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
  102               }
  103               acknowledge(messageId, location, key);
  104           } else {
  105               if (debug) {
  106                   LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
  107               }
  108               synchronized (this) {
  109                   inFlightTxLocations.add(location);
  110               }
  111               transactionStore.acknowledge(this, ack, location);
  112               context.getTransaction().addSynchronization(new Synchronization() {
  113                   public void afterCommit() throws Exception {
  114                       if (debug) {
  115                           LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
  116                       }
  117                       synchronized (JournalTopicMessageStore.this) {
  118                           inFlightTxLocations.remove(location);
  119                           acknowledge(messageId, location, key);
  120                       }
  121                   }
  122   
  123                   public void afterRollback() throws Exception {
  124                       if (debug) {
  125                           LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
  126                       }
  127                       synchronized (JournalTopicMessageStore.this) {
  128                           inFlightTxLocations.remove(location);
  129                       }
  130                   }
  131               });
  132           }
  133   
  134       }
  135   
  136       public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName,
  137                                     MessageId messageId) {
  138           try {
  139               SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName);
  140               if (sub != null) {
  141                   longTermStore.acknowledge(context, clientId, subscritionName, messageId);
  142               }
  143           } catch (Throwable e) {
  144               LOG.debug("Could not replay acknowledge for message '" + messageId
  145                         + "'.  Message may have already been acknowledged. reason: " + e);
  146           }
  147       }
  148   
  149       /**
  150        * @param messageId
  151        * @param location
  152        * @param key
  153        */
  154       protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
  155           synchronized (this) {
  156               lastLocation = location;
  157               ackedLastAckLocations.put(key, messageId);
  158           }
  159       }
  160   
  161       public RecordLocation checkpoint() throws IOException {
  162   
  163           final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
  164   
  165           // swap out the hash maps..
  166           synchronized (this) {
  167               cpAckedLastAckLocations = this.ackedLastAckLocations;
  168               this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
  169           }
  170   
  171           return super.checkpoint(new Callback() {
  172               public void execute() throws Exception {
  173   
  174                   // Checkpoint the acknowledged messages.
  175                   Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
  176                   while (iterator.hasNext()) {
  177                       SubscriptionKey subscriptionKey = iterator.next();
  178                       MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
  179                       longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
  180                                                 subscriptionKey.subscriptionName, identity);
  181                   }
  182   
  183               }
  184           });
  185   
  186       }
  187   
  188       /**
  189        * @return Returns the longTermStore.
  190        */
  191       public TopicMessageStore getLongTermTopicMessageStore() {
  192           return longTermStore;
  193       }
  194   
  195       public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
  196           longTermStore.deleteSubscription(clientId, subscriptionName);
  197       }
  198   
  199       public SubscriptionInfo[] getAllSubscriptions() throws IOException {
  200           return longTermStore.getAllSubscriptions();
  201       }
  202   
  203       public int getMessageCount(String clientId, String subscriberName) throws IOException {
  204           this.peristenceAdapter.checkpoint(true, true);
  205           return longTermStore.getMessageCount(clientId, subscriberName);
  206       }
  207   
  208       public void resetBatching(String clientId, String subscriptionName) {
  209           longTermStore.resetBatching(clientId, subscriptionName);
  210       }
  211   
  212   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » journal » [javadoc | source]