Home » activemq-parent-5.3.1-source-release » org.apache » activemq » state » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.state;
   18   
   19   import java.io.IOException;
   20   import java.util.Iterator;
   21   import java.util.LinkedHashMap;
   22   import java.util.Map;
   23   import java.util.Vector;
   24   import java.util.concurrent.ConcurrentHashMap;
   25   
   26   import javax.jms.TransactionRolledBackException;
   27   
   28   import org.apache.activemq.command.Command;
   29   import org.apache.activemq.command.ConnectionId;
   30   import org.apache.activemq.command.ConnectionInfo;
   31   import org.apache.activemq.command.ConsumerId;
   32   import org.apache.activemq.command.ConsumerInfo;
   33   import org.apache.activemq.command.DestinationInfo;
   34   import org.apache.activemq.command.ExceptionResponse;
   35   import org.apache.activemq.command.Message;
   36   import org.apache.activemq.command.MessageId;
   37   import org.apache.activemq.command.ProducerId;
   38   import org.apache.activemq.command.ProducerInfo;
   39   import org.apache.activemq.command.Response;
   40   import org.apache.activemq.command.SessionId;
   41   import org.apache.activemq.command.SessionInfo;
   42   import org.apache.activemq.command.TransactionInfo;
   43   import org.apache.activemq.transport.Transport;
   44   import org.apache.activemq.util.IOExceptionSupport;
   45   import org.apache.commons.logging.Log;
   46   import org.apache.commons.logging.LogFactory;
   47   
   48   /**
   49    * Tracks the state of a connection so a newly established transport can be
   50    * re-initialized to the state that was tracked.
   51    * 
   52    * @version $Revision$
   53    */
   54   public class ConnectionStateTracker extends CommandVisitorAdapter {
   55       private static final Log LOG = LogFactory.getLog(ConnectionStateTracker.class);
   56   
   57       private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
   58   
   59       protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
   60        
   61       private boolean trackTransactions;
   62       private boolean restoreSessions = true;
   63       private boolean restoreConsumers = true;
   64       private boolean restoreProducers = true;
   65       private boolean restoreTransaction = true;
   66       private boolean trackMessages = true;
   67       private boolean trackTransactionProducers = true;
   68       private int maxCacheSize = 128 * 1024;
   69       private int currentCacheSize;
   70       private Map<MessageId,Message> messageCache = new LinkedHashMap<MessageId,Message>(){
   71           protected boolean removeEldestEntry(Map.Entry<MessageId,Message> eldest) {
   72               boolean result = currentCacheSize > maxCacheSize;
   73               if (result) {
   74                   currentCacheSize -= eldest.getValue().getSize();
   75               }
   76               return result;
   77           }
   78       };
   79       
   80       
   81       private class RemoveTransactionAction implements Runnable {
   82           private final TransactionInfo info;
   83   
   84           public RemoveTransactionAction(TransactionInfo info) {
   85               this.info = info;
   86           }
   87   
   88           public void run() {
   89               ConnectionId connectionId = info.getConnectionId();
   90               ConnectionState cs = connectionStates.get(connectionId);
   91               cs.removeTransactionState(info.getTransactionId());
   92           }
   93       }
   94   
   95       /**
   96        * 
   97        * 
   98        * @param command
   99        * @return null if the command is not state tracked.
  100        * @throws IOException
  101        */
  102       public Tracked track(Command command) throws IOException {
  103           try {
  104               return (Tracked)command.visit(this);
  105           } catch (IOException e) {
  106               throw e;
  107           } catch (Throwable e) {
  108               throw IOExceptionSupport.create(e);
  109           }
  110       }
  111       
  112       public void trackBack(Command command) {
  113           if (trackMessages && command != null && command.isMessage()) {
  114               Message message = (Message) command;
  115               if (message.getTransactionId()==null) {
  116                   currentCacheSize = currentCacheSize +  message.getSize();
  117               }
  118           }
  119       }
  120   
  121       public void restore(Transport transport) throws IOException {
  122           // Restore the connections.
  123           for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
  124               ConnectionState connectionState = iter.next();
  125               if (LOG.isDebugEnabled()) {
  126                   LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
  127               }
  128               transport.oneway(connectionState.getInfo());
  129               restoreTempDestinations(transport, connectionState);
  130   
  131               if (restoreSessions) {
  132                   restoreSessions(transport, connectionState);
  133               }
  134   
  135               if (restoreTransaction) {
  136                   restoreTransactions(transport, connectionState);
  137               }
  138           }
  139           //now flush messages
  140           for (Message msg:messageCache.values()) {
  141               transport.oneway(msg);
  142           }
  143       }
  144   
  145       private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
  146           Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
  147           for (TransactionState transactionState : connectionState.getTransactionStates()) {
  148               if (LOG.isDebugEnabled()) {
  149                   LOG.debug("tx: " + transactionState.getId());
  150               }
  151               
  152               // rollback any completed transactions - no way to know if commit got there
  153               // or if reply went missing
  154               //
  155               if (!transactionState.getCommands().isEmpty()) {
  156                   Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
  157                   if (lastCommand instanceof TransactionInfo) {
  158                       TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
  159                       if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
  160                           if (LOG.isDebugEnabled()) {
  161                               LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
  162                           }
  163                           toRollback.add(transactionInfo);
  164                           continue;
  165                       }
  166                   }
  167               }
  168               
  169               // replay short lived producers that may have been involved in the transaction
  170               for (ProducerState producerState : transactionState.getProducerStates().values()) {
  171                   if (LOG.isDebugEnabled()) {
  172                       LOG.debug("tx replay producer :" + producerState.getInfo());
  173                   }
  174                   transport.oneway(producerState.getInfo());
  175               }
  176               
  177               for (Command command : transactionState.getCommands()) {
  178                   if (LOG.isDebugEnabled()) {
  179                       LOG.debug("tx replay: " + command);
  180                   }
  181                   transport.oneway(command);
  182               }
  183               
  184               for (ProducerState producerState : transactionState.getProducerStates().values()) {
  185                   if (LOG.isDebugEnabled()) {
  186                       LOG.debug("tx remove replayed producer :" + producerState.getInfo());
  187                   }
  188                   transport.oneway(producerState.getInfo().createRemoveCommand());
  189               }
  190           }
  191           
  192           for (TransactionInfo command: toRollback) {
  193               // respond to the outstanding commit
  194               ExceptionResponse response = new ExceptionResponse();
  195               response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
  196               response.setCorrelationId(command.getCommandId());
  197               transport.getTransportListener().onCommand(response);
  198           }
  199       }
  200   
  201       /**
  202        * @param transport
  203        * @param connectionState
  204        * @throws IOException
  205        */
  206       protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
  207           // Restore the connection's sessions
  208           for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
  209               SessionState sessionState = (SessionState)iter2.next();
  210               if (LOG.isDebugEnabled()) {
  211                   LOG.debug("session: " + sessionState.getInfo().getSessionId());
  212               }
  213               transport.oneway(sessionState.getInfo());
  214   
  215               if (restoreProducers) {
  216                   restoreProducers(transport, sessionState);
  217               }
  218   
  219               if (restoreConsumers) {
  220                   restoreConsumers(transport, sessionState);
  221               }
  222           }
  223       }
  224   
  225       /**
  226        * @param transport
  227        * @param sessionState
  228        * @throws IOException
  229        */
  230       protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
  231           // Restore the session's consumers
  232           for (Iterator iter3 = sessionState.getConsumerStates().iterator(); iter3.hasNext();) {
  233               ConsumerState consumerState = (ConsumerState)iter3.next();
  234               if (LOG.isDebugEnabled()) {
  235                   LOG.debug("restore consumer: " + consumerState.getInfo().getConsumerId());
  236               }
  237               transport.oneway(consumerState.getInfo());
  238           }
  239       }
  240   
  241       /**
  242        * @param transport
  243        * @param sessionState
  244        * @throws IOException
  245        */
  246       protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
  247           // Restore the session's producers
  248           for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
  249               ProducerState producerState = (ProducerState)iter3.next();
  250               if (LOG.isDebugEnabled()) {
  251                   LOG.debug("producer: " + producerState.getInfo().getProducerId());
  252               }
  253               transport.oneway(producerState.getInfo());
  254           }
  255       }
  256   
  257       /**
  258        * @param transport
  259        * @param connectionState
  260        * @throws IOException
  261        */
  262       protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
  263           throws IOException {
  264           // Restore the connection's temp destinations.
  265           for (Iterator iter2 = connectionState.getTempDesinations().iterator(); iter2.hasNext();) {
  266               transport.oneway((DestinationInfo)iter2.next());
  267           }
  268       }
  269   
  270       public Response processAddDestination(DestinationInfo info) {
  271           if (info != null) {
  272               ConnectionState cs = connectionStates.get(info.getConnectionId());
  273               if (cs != null && info.getDestination().isTemporary()) {
  274                   cs.addTempDestination(info);
  275               }
  276           }
  277           return TRACKED_RESPONSE_MARKER;
  278       }
  279   
  280       public Response processRemoveDestination(DestinationInfo info) {
  281           if (info != null) {
  282               ConnectionState cs = connectionStates.get(info.getConnectionId());
  283               if (cs != null && info.getDestination().isTemporary()) {
  284                   cs.removeTempDestination(info.getDestination());
  285               }
  286           }
  287           return TRACKED_RESPONSE_MARKER;
  288       }
  289   
  290       public Response processAddProducer(ProducerInfo info) {
  291           if (info != null && info.getProducerId() != null) {
  292               SessionId sessionId = info.getProducerId().getParentId();
  293               if (sessionId != null) {
  294                   ConnectionId connectionId = sessionId.getParentId();
  295                   if (connectionId != null) {
  296                       ConnectionState cs = connectionStates.get(connectionId);
  297                       if (cs != null) {
  298                           SessionState ss = cs.getSessionState(sessionId);
  299                           if (ss != null) {
  300                               ss.addProducer(info);
  301                           }
  302                       }
  303                   }
  304               }
  305           }
  306           return TRACKED_RESPONSE_MARKER;
  307       }
  308   
  309       public Response processRemoveProducer(ProducerId id) {
  310           if (id != null) {
  311               SessionId sessionId = id.getParentId();
  312               if (sessionId != null) {
  313                   ConnectionId connectionId = sessionId.getParentId();
  314                   if (connectionId != null) {
  315                       ConnectionState cs = connectionStates.get(connectionId);
  316                       if (cs != null) {
  317                           SessionState ss = cs.getSessionState(sessionId);
  318                           if (ss != null) {
  319                               ss.removeProducer(id);
  320                           }
  321                       }
  322                   }
  323               }
  324           }
  325           return TRACKED_RESPONSE_MARKER;
  326       }
  327   
  328       public Response processAddConsumer(ConsumerInfo info) {
  329           if (info != null) {
  330               SessionId sessionId = info.getConsumerId().getParentId();
  331               if (sessionId != null) {
  332                   ConnectionId connectionId = sessionId.getParentId();
  333                   if (connectionId != null) {
  334                       ConnectionState cs = connectionStates.get(connectionId);
  335                       if (cs != null) {
  336                           SessionState ss = cs.getSessionState(sessionId);
  337                           if (ss != null) {
  338                               ss.addConsumer(info);
  339                           }
  340                       }
  341                   }
  342               }
  343           }
  344           return TRACKED_RESPONSE_MARKER;
  345       }
  346   
  347       public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
  348           if (id != null) {
  349               SessionId sessionId = id.getParentId();
  350               if (sessionId != null) {
  351                   ConnectionId connectionId = sessionId.getParentId();
  352                   if (connectionId != null) {
  353                       ConnectionState cs = connectionStates.get(connectionId);
  354                       if (cs != null) {
  355                           SessionState ss = cs.getSessionState(sessionId);
  356                           if (ss != null) {
  357                               ss.removeConsumer(id);
  358                           }
  359                       }
  360                   }
  361               }
  362           }
  363           return TRACKED_RESPONSE_MARKER;
  364       }
  365   
  366       public Response processAddSession(SessionInfo info) {
  367           if (info != null) {
  368               ConnectionId connectionId = info.getSessionId().getParentId();
  369               if (connectionId != null) {
  370                   ConnectionState cs = connectionStates.get(connectionId);
  371                   if (cs != null) {
  372                       cs.addSession(info);
  373                   }
  374               }
  375           }
  376           return TRACKED_RESPONSE_MARKER;
  377       }
  378   
  379       public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
  380           if (id != null) {
  381               ConnectionId connectionId = id.getParentId();
  382               if (connectionId != null) {
  383                   ConnectionState cs = connectionStates.get(connectionId);
  384                   if (cs != null) {
  385                       cs.removeSession(id);
  386                   }
  387               }
  388           }
  389           return TRACKED_RESPONSE_MARKER;
  390       }
  391   
  392       public Response processAddConnection(ConnectionInfo info) {
  393           if (info != null) {
  394               connectionStates.put(info.getConnectionId(), new ConnectionState(info));
  395           }
  396           return TRACKED_RESPONSE_MARKER;
  397       }
  398   
  399       public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
  400           if (id != null) {
  401               connectionStates.remove(id);
  402           }
  403           return TRACKED_RESPONSE_MARKER;
  404       }
  405   
  406       public Response processMessage(Message send) throws Exception {
  407           if (send != null) {
  408               if (trackTransactions && send.getTransactionId() != null) {
  409                   ProducerId producerId = send.getProducerId();
  410                   ConnectionId connectionId = producerId.getParentId().getParentId();
  411                   if (connectionId != null) {
  412                       ConnectionState cs = connectionStates.get(connectionId);
  413                       if (cs != null) {
  414                           TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
  415                           if (transactionState != null) {
  416                               transactionState.addCommand(send);
  417                               
  418                               if (trackTransactionProducers) {
  419                                   // for jmstemplate, track the producer in case it is closed before commit
  420                                   // and needs to be replayed
  421                                   SessionState ss = cs.getSessionState(producerId.getParentId());
  422                                   ProducerState producerState = ss.getProducerState(producerId);
  423                                   producerState.setTransactionState(transactionState);            
  424                               }
  425                           }
  426                       }
  427                   }
  428                   return TRACKED_RESPONSE_MARKER;
  429               }else if (trackMessages) {
  430                   messageCache.put(send.getMessageId(), send.copy());
  431               }
  432           }
  433           return null;
  434       }
  435   
  436       public Response processBeginTransaction(TransactionInfo info) {
  437           if (trackTransactions && info != null && info.getTransactionId() != null) {
  438               ConnectionId connectionId = info.getConnectionId();
  439               if (connectionId != null) {
  440                   ConnectionState cs = connectionStates.get(connectionId);
  441                   if (cs != null) {
  442                       cs.addTransactionState(info.getTransactionId());
  443                       TransactionState state = cs.getTransactionState(info.getTransactionId());
  444                       state.addCommand(info);
  445                   }
  446               }
  447               return TRACKED_RESPONSE_MARKER;
  448           }
  449           return null;
  450       }
  451   
  452       public Response processPrepareTransaction(TransactionInfo info) throws Exception {
  453           if (trackTransactions && info != null) {
  454               ConnectionId connectionId = info.getConnectionId();
  455               if (connectionId != null) {
  456                   ConnectionState cs = connectionStates.get(connectionId);
  457                   if (cs != null) {
  458                       TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
  459                       if (transactionState != null) {
  460                           transactionState.addCommand(info);
  461                       }
  462                   }
  463               }
  464               return TRACKED_RESPONSE_MARKER;
  465           }
  466           return null;
  467       }
  468   
  469       public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
  470           if (trackTransactions && info != null) {
  471               ConnectionId connectionId = info.getConnectionId();
  472               if (connectionId != null) {
  473                   ConnectionState cs = connectionStates.get(connectionId);
  474                   if (cs != null) {
  475                       TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
  476                       if (transactionState != null) {
  477                           transactionState.addCommand(info);
  478                           return new Tracked(new RemoveTransactionAction(info));
  479                       }
  480                   }
  481               }
  482           }
  483           return null;
  484       }
  485   
  486       public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
  487           if (trackTransactions && info != null) {
  488               ConnectionId connectionId = info.getConnectionId();
  489               if (connectionId != null) {
  490                   ConnectionState cs = connectionStates.get(connectionId);
  491                   if (cs != null) {
  492                       TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
  493                       if (transactionState != null) {
  494                           transactionState.addCommand(info);
  495                           return new Tracked(new RemoveTransactionAction(info));
  496                       }
  497                   }
  498               }
  499           }
  500           return null;
  501       }
  502   
  503       public Response processRollbackTransaction(TransactionInfo info) throws Exception {
  504           if (trackTransactions && info != null) {
  505               ConnectionId connectionId = info.getConnectionId();
  506               if (connectionId != null) {
  507                   ConnectionState cs = connectionStates.get(connectionId);
  508                   if (cs != null) {
  509                       TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
  510                       if (transactionState != null) {
  511                           transactionState.addCommand(info);
  512                           return new Tracked(new RemoveTransactionAction(info));
  513                       }
  514                   }
  515               }
  516           }
  517           return null;
  518       }
  519   
  520       public Response processEndTransaction(TransactionInfo info) throws Exception {
  521           if (trackTransactions && info != null) {
  522               ConnectionId connectionId = info.getConnectionId();
  523               if (connectionId != null) {
  524                   ConnectionState cs = connectionStates.get(connectionId);
  525                   if (cs != null) {
  526                       TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
  527                       if (transactionState != null) {
  528                           transactionState.addCommand(info);
  529                       }
  530                   }
  531               }
  532               return TRACKED_RESPONSE_MARKER;
  533           }
  534           return null;
  535       }
  536   
  537       public boolean isRestoreConsumers() {
  538           return restoreConsumers;
  539       }
  540   
  541       public void setRestoreConsumers(boolean restoreConsumers) {
  542           this.restoreConsumers = restoreConsumers;
  543       }
  544   
  545       public boolean isRestoreProducers() {
  546           return restoreProducers;
  547       }
  548   
  549       public void setRestoreProducers(boolean restoreProducers) {
  550           this.restoreProducers = restoreProducers;
  551       }
  552   
  553       public boolean isRestoreSessions() {
  554           return restoreSessions;
  555       }
  556   
  557       public void setRestoreSessions(boolean restoreSessions) {
  558           this.restoreSessions = restoreSessions;
  559       }
  560   
  561       public boolean isTrackTransactions() {
  562           return trackTransactions;
  563       }
  564   
  565       public void setTrackTransactions(boolean trackTransactions) {
  566           this.trackTransactions = trackTransactions;
  567       }
  568       
  569       public boolean isTrackTransactionProducers() {
  570           return this.trackTransactionProducers;
  571       }
  572   
  573       public void setTrackTransactionProducers(boolean trackTransactionProducers) {
  574           this.trackTransactionProducers = trackTransactionProducers;
  575       }
  576       
  577       public boolean isRestoreTransaction() {
  578           return restoreTransaction;
  579       }
  580   
  581       public void setRestoreTransaction(boolean restoreTransaction) {
  582           this.restoreTransaction = restoreTransaction;
  583       }
  584   
  585       public boolean isTrackMessages() {
  586           return trackMessages;
  587       }
  588   
  589       public void setTrackMessages(boolean trackMessages) {
  590           this.trackMessages = trackMessages;
  591       }
  592   
  593       public int getMaxCacheSize() {
  594           return maxCacheSize;
  595       }
  596   
  597       public void setMaxCacheSize(int maxCacheSize) {
  598           this.maxCacheSize = maxCacheSize;
  599       }
  600   
  601   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » state » [javadoc | source]