1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.state; 18 19 import java.io.IOException; 20 import java.util.Iterator; 21 import java.util.LinkedHashMap; 22 import java.util.Map; 23 import java.util.Vector; 24 import java.util.concurrent.ConcurrentHashMap; 25 26 import javax.jms.TransactionRolledBackException; 27 28 import org.apache.activemq.command.Command; 29 import org.apache.activemq.command.ConnectionId; 30 import org.apache.activemq.command.ConnectionInfo; 31 import org.apache.activemq.command.ConsumerId; 32 import org.apache.activemq.command.ConsumerInfo; 33 import org.apache.activemq.command.DestinationInfo; 34 import org.apache.activemq.command.ExceptionResponse; 35 import org.apache.activemq.command.Message; 36 import org.apache.activemq.command.MessageId; 37 import org.apache.activemq.command.ProducerId; 38 import org.apache.activemq.command.ProducerInfo; 39 import org.apache.activemq.command.Response; 40 import org.apache.activemq.command.SessionId; 41 import org.apache.activemq.command.SessionInfo; 42 import org.apache.activemq.command.TransactionInfo; 43 import org.apache.activemq.transport.Transport; 44 import org.apache.activemq.util.IOExceptionSupport; 45 import org.apache.commons.logging.Log; 46 import org.apache.commons.logging.LogFactory; 47 48 /** 49 * Tracks the state of a connection so a newly established transport can be 50 * re-initialized to the state that was tracked. 51 * 52 * @version $Revision$ 53 */ 54 public class ConnectionStateTracker extends CommandVisitorAdapter { 55 private static final Log LOG = LogFactory.getLog(ConnectionStateTracker.class); 56 57 private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); 58 59 protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); 60 61 private boolean trackTransactions; 62 private boolean restoreSessions = true; 63 private boolean restoreConsumers = true; 64 private boolean restoreProducers = true; 65 private boolean restoreTransaction = true; 66 private boolean trackMessages = true; 67 private boolean trackTransactionProducers = true; 68 private int maxCacheSize = 128 * 1024; 69 private int currentCacheSize; 70 private Map<MessageId,Message> messageCache = new LinkedHashMap<MessageId,Message>(){ 71 protected boolean removeEldestEntry(Map.Entry<MessageId,Message> eldest) { 72 boolean result = currentCacheSize > maxCacheSize; 73 if (result) { 74 currentCacheSize -= eldest.getValue().getSize(); 75 } 76 return result; 77 } 78 }; 79 80 81 private class RemoveTransactionAction implements Runnable { 82 private final TransactionInfo info; 83 84 public RemoveTransactionAction(TransactionInfo info) { 85 this.info = info; 86 } 87 88 public void run() { 89 ConnectionId connectionId = info.getConnectionId(); 90 ConnectionState cs = connectionStates.get(connectionId); 91 cs.removeTransactionState(info.getTransactionId()); 92 } 93 } 94 95 /** 96 * 97 * 98 * @param command 99 * @return null if the command is not state tracked. 100 * @throws IOException 101 */ 102 public Tracked track(Command command) throws IOException { 103 try { 104 return (Tracked)command.visit(this); 105 } catch (IOException e) { 106 throw e; 107 } catch (Throwable e) { 108 throw IOExceptionSupport.create(e); 109 } 110 } 111 112 public void trackBack(Command command) { 113 if (trackMessages && command != null && command.isMessage()) { 114 Message message = (Message) command; 115 if (message.getTransactionId()==null) { 116 currentCacheSize = currentCacheSize + message.getSize(); 117 } 118 } 119 } 120 121 public void restore(Transport transport) throws IOException { 122 // Restore the connections. 123 for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) { 124 ConnectionState connectionState = iter.next(); 125 if (LOG.isDebugEnabled()) { 126 LOG.debug("conn: " + connectionState.getInfo().getConnectionId()); 127 } 128 transport.oneway(connectionState.getInfo()); 129 restoreTempDestinations(transport, connectionState); 130 131 if (restoreSessions) { 132 restoreSessions(transport, connectionState); 133 } 134 135 if (restoreTransaction) { 136 restoreTransactions(transport, connectionState); 137 } 138 } 139 //now flush messages 140 for (Message msg:messageCache.values()) { 141 transport.oneway(msg); 142 } 143 } 144 145 private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException { 146 Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>(); 147 for (TransactionState transactionState : connectionState.getTransactionStates()) { 148 if (LOG.isDebugEnabled()) { 149 LOG.debug("tx: " + transactionState.getId()); 150 } 151 152 // rollback any completed transactions - no way to know if commit got there 153 // or if reply went missing 154 // 155 if (!transactionState.getCommands().isEmpty()) { 156 Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1); 157 if (lastCommand instanceof TransactionInfo) { 158 TransactionInfo transactionInfo = (TransactionInfo) lastCommand; 159 if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) { 160 if (LOG.isDebugEnabled()) { 161 LOG.debug("rolling back potentially completed tx: " + transactionState.getId()); 162 } 163 toRollback.add(transactionInfo); 164 continue; 165 } 166 } 167 } 168 169 // replay short lived producers that may have been involved in the transaction 170 for (ProducerState producerState : transactionState.getProducerStates().values()) { 171 if (LOG.isDebugEnabled()) { 172 LOG.debug("tx replay producer :" + producerState.getInfo()); 173 } 174 transport.oneway(producerState.getInfo()); 175 } 176 177 for (Command command : transactionState.getCommands()) { 178 if (LOG.isDebugEnabled()) { 179 LOG.debug("tx replay: " + command); 180 } 181 transport.oneway(command); 182 } 183 184 for (ProducerState producerState : transactionState.getProducerStates().values()) { 185 if (LOG.isDebugEnabled()) { 186 LOG.debug("tx remove replayed producer :" + producerState.getInfo()); 187 } 188 transport.oneway(producerState.getInfo().createRemoveCommand()); 189 } 190 } 191 192 for (TransactionInfo command: toRollback) { 193 // respond to the outstanding commit 194 ExceptionResponse response = new ExceptionResponse(); 195 response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId())); 196 response.setCorrelationId(command.getCommandId()); 197 transport.getTransportListener().onCommand(response); 198 } 199 } 200 201 /** 202 * @param transport 203 * @param connectionState 204 * @throws IOException 205 */ 206 protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException { 207 // Restore the connection's sessions 208 for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) { 209 SessionState sessionState = (SessionState)iter2.next(); 210 if (LOG.isDebugEnabled()) { 211 LOG.debug("session: " + sessionState.getInfo().getSessionId()); 212 } 213 transport.oneway(sessionState.getInfo()); 214 215 if (restoreProducers) { 216 restoreProducers(transport, sessionState); 217 } 218 219 if (restoreConsumers) { 220 restoreConsumers(transport, sessionState); 221 } 222 } 223 } 224 225 /** 226 * @param transport 227 * @param sessionState 228 * @throws IOException 229 */ 230 protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException { 231 // Restore the session's consumers 232 for (Iterator iter3 = sessionState.getConsumerStates().iterator(); iter3.hasNext();) { 233 ConsumerState consumerState = (ConsumerState)iter3.next(); 234 if (LOG.isDebugEnabled()) { 235 LOG.debug("restore consumer: " + consumerState.getInfo().getConsumerId()); 236 } 237 transport.oneway(consumerState.getInfo()); 238 } 239 } 240 241 /** 242 * @param transport 243 * @param sessionState 244 * @throws IOException 245 */ 246 protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException { 247 // Restore the session's producers 248 for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) { 249 ProducerState producerState = (ProducerState)iter3.next(); 250 if (LOG.isDebugEnabled()) { 251 LOG.debug("producer: " + producerState.getInfo().getProducerId()); 252 } 253 transport.oneway(producerState.getInfo()); 254 } 255 } 256 257 /** 258 * @param transport 259 * @param connectionState 260 * @throws IOException 261 */ 262 protected void restoreTempDestinations(Transport transport, ConnectionState connectionState) 263 throws IOException { 264 // Restore the connection's temp destinations. 265 for (Iterator iter2 = connectionState.getTempDesinations().iterator(); iter2.hasNext();) { 266 transport.oneway((DestinationInfo)iter2.next()); 267 } 268 } 269 270 public Response processAddDestination(DestinationInfo info) { 271 if (info != null) { 272 ConnectionState cs = connectionStates.get(info.getConnectionId()); 273 if (cs != null && info.getDestination().isTemporary()) { 274 cs.addTempDestination(info); 275 } 276 } 277 return TRACKED_RESPONSE_MARKER; 278 } 279 280 public Response processRemoveDestination(DestinationInfo info) { 281 if (info != null) { 282 ConnectionState cs = connectionStates.get(info.getConnectionId()); 283 if (cs != null && info.getDestination().isTemporary()) { 284 cs.removeTempDestination(info.getDestination()); 285 } 286 } 287 return TRACKED_RESPONSE_MARKER; 288 } 289 290 public Response processAddProducer(ProducerInfo info) { 291 if (info != null && info.getProducerId() != null) { 292 SessionId sessionId = info.getProducerId().getParentId(); 293 if (sessionId != null) { 294 ConnectionId connectionId = sessionId.getParentId(); 295 if (connectionId != null) { 296 ConnectionState cs = connectionStates.get(connectionId); 297 if (cs != null) { 298 SessionState ss = cs.getSessionState(sessionId); 299 if (ss != null) { 300 ss.addProducer(info); 301 } 302 } 303 } 304 } 305 } 306 return TRACKED_RESPONSE_MARKER; 307 } 308 309 public Response processRemoveProducer(ProducerId id) { 310 if (id != null) { 311 SessionId sessionId = id.getParentId(); 312 if (sessionId != null) { 313 ConnectionId connectionId = sessionId.getParentId(); 314 if (connectionId != null) { 315 ConnectionState cs = connectionStates.get(connectionId); 316 if (cs != null) { 317 SessionState ss = cs.getSessionState(sessionId); 318 if (ss != null) { 319 ss.removeProducer(id); 320 } 321 } 322 } 323 } 324 } 325 return TRACKED_RESPONSE_MARKER; 326 } 327 328 public Response processAddConsumer(ConsumerInfo info) { 329 if (info != null) { 330 SessionId sessionId = info.getConsumerId().getParentId(); 331 if (sessionId != null) { 332 ConnectionId connectionId = sessionId.getParentId(); 333 if (connectionId != null) { 334 ConnectionState cs = connectionStates.get(connectionId); 335 if (cs != null) { 336 SessionState ss = cs.getSessionState(sessionId); 337 if (ss != null) { 338 ss.addConsumer(info); 339 } 340 } 341 } 342 } 343 } 344 return TRACKED_RESPONSE_MARKER; 345 } 346 347 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) { 348 if (id != null) { 349 SessionId sessionId = id.getParentId(); 350 if (sessionId != null) { 351 ConnectionId connectionId = sessionId.getParentId(); 352 if (connectionId != null) { 353 ConnectionState cs = connectionStates.get(connectionId); 354 if (cs != null) { 355 SessionState ss = cs.getSessionState(sessionId); 356 if (ss != null) { 357 ss.removeConsumer(id); 358 } 359 } 360 } 361 } 362 } 363 return TRACKED_RESPONSE_MARKER; 364 } 365 366 public Response processAddSession(SessionInfo info) { 367 if (info != null) { 368 ConnectionId connectionId = info.getSessionId().getParentId(); 369 if (connectionId != null) { 370 ConnectionState cs = connectionStates.get(connectionId); 371 if (cs != null) { 372 cs.addSession(info); 373 } 374 } 375 } 376 return TRACKED_RESPONSE_MARKER; 377 } 378 379 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) { 380 if (id != null) { 381 ConnectionId connectionId = id.getParentId(); 382 if (connectionId != null) { 383 ConnectionState cs = connectionStates.get(connectionId); 384 if (cs != null) { 385 cs.removeSession(id); 386 } 387 } 388 } 389 return TRACKED_RESPONSE_MARKER; 390 } 391 392 public Response processAddConnection(ConnectionInfo info) { 393 if (info != null) { 394 connectionStates.put(info.getConnectionId(), new ConnectionState(info)); 395 } 396 return TRACKED_RESPONSE_MARKER; 397 } 398 399 public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception { 400 if (id != null) { 401 connectionStates.remove(id); 402 } 403 return TRACKED_RESPONSE_MARKER; 404 } 405 406 public Response processMessage(Message send) throws Exception { 407 if (send != null) { 408 if (trackTransactions && send.getTransactionId() != null) { 409 ProducerId producerId = send.getProducerId(); 410 ConnectionId connectionId = producerId.getParentId().getParentId(); 411 if (connectionId != null) { 412 ConnectionState cs = connectionStates.get(connectionId); 413 if (cs != null) { 414 TransactionState transactionState = cs.getTransactionState(send.getTransactionId()); 415 if (transactionState != null) { 416 transactionState.addCommand(send); 417 418 if (trackTransactionProducers) { 419 // for jmstemplate, track the producer in case it is closed before commit 420 // and needs to be replayed 421 SessionState ss = cs.getSessionState(producerId.getParentId()); 422 ProducerState producerState = ss.getProducerState(producerId); 423 producerState.setTransactionState(transactionState); 424 } 425 } 426 } 427 } 428 return TRACKED_RESPONSE_MARKER; 429 }else if (trackMessages) { 430 messageCache.put(send.getMessageId(), send.copy()); 431 } 432 } 433 return null; 434 } 435 436 public Response processBeginTransaction(TransactionInfo info) { 437 if (trackTransactions && info != null && info.getTransactionId() != null) { 438 ConnectionId connectionId = info.getConnectionId(); 439 if (connectionId != null) { 440 ConnectionState cs = connectionStates.get(connectionId); 441 if (cs != null) { 442 cs.addTransactionState(info.getTransactionId()); 443 TransactionState state = cs.getTransactionState(info.getTransactionId()); 444 state.addCommand(info); 445 } 446 } 447 return TRACKED_RESPONSE_MARKER; 448 } 449 return null; 450 } 451 452 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 453 if (trackTransactions && info != null) { 454 ConnectionId connectionId = info.getConnectionId(); 455 if (connectionId != null) { 456 ConnectionState cs = connectionStates.get(connectionId); 457 if (cs != null) { 458 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 459 if (transactionState != null) { 460 transactionState.addCommand(info); 461 } 462 } 463 } 464 return TRACKED_RESPONSE_MARKER; 465 } 466 return null; 467 } 468 469 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 470 if (trackTransactions && info != null) { 471 ConnectionId connectionId = info.getConnectionId(); 472 if (connectionId != null) { 473 ConnectionState cs = connectionStates.get(connectionId); 474 if (cs != null) { 475 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 476 if (transactionState != null) { 477 transactionState.addCommand(info); 478 return new Tracked(new RemoveTransactionAction(info)); 479 } 480 } 481 } 482 } 483 return null; 484 } 485 486 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 487 if (trackTransactions && info != null) { 488 ConnectionId connectionId = info.getConnectionId(); 489 if (connectionId != null) { 490 ConnectionState cs = connectionStates.get(connectionId); 491 if (cs != null) { 492 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 493 if (transactionState != null) { 494 transactionState.addCommand(info); 495 return new Tracked(new RemoveTransactionAction(info)); 496 } 497 } 498 } 499 } 500 return null; 501 } 502 503 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 504 if (trackTransactions && info != null) { 505 ConnectionId connectionId = info.getConnectionId(); 506 if (connectionId != null) { 507 ConnectionState cs = connectionStates.get(connectionId); 508 if (cs != null) { 509 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 510 if (transactionState != null) { 511 transactionState.addCommand(info); 512 return new Tracked(new RemoveTransactionAction(info)); 513 } 514 } 515 } 516 } 517 return null; 518 } 519 520 public Response processEndTransaction(TransactionInfo info) throws Exception { 521 if (trackTransactions && info != null) { 522 ConnectionId connectionId = info.getConnectionId(); 523 if (connectionId != null) { 524 ConnectionState cs = connectionStates.get(connectionId); 525 if (cs != null) { 526 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 527 if (transactionState != null) { 528 transactionState.addCommand(info); 529 } 530 } 531 } 532 return TRACKED_RESPONSE_MARKER; 533 } 534 return null; 535 } 536 537 public boolean isRestoreConsumers() { 538 return restoreConsumers; 539 } 540 541 public void setRestoreConsumers(boolean restoreConsumers) { 542 this.restoreConsumers = restoreConsumers; 543 } 544 545 public boolean isRestoreProducers() { 546 return restoreProducers; 547 } 548 549 public void setRestoreProducers(boolean restoreProducers) { 550 this.restoreProducers = restoreProducers; 551 } 552 553 public boolean isRestoreSessions() { 554 return restoreSessions; 555 } 556 557 public void setRestoreSessions(boolean restoreSessions) { 558 this.restoreSessions = restoreSessions; 559 } 560 561 public boolean isTrackTransactions() { 562 return trackTransactions; 563 } 564 565 public void setTrackTransactions(boolean trackTransactions) { 566 this.trackTransactions = trackTransactions; 567 } 568 569 public boolean isTrackTransactionProducers() { 570 return this.trackTransactionProducers; 571 } 572 573 public void setTrackTransactionProducers(boolean trackTransactionProducers) { 574 this.trackTransactionProducers = trackTransactionProducers; 575 } 576 577 public boolean isRestoreTransaction() { 578 return restoreTransaction; 579 } 580 581 public void setRestoreTransaction(boolean restoreTransaction) { 582 this.restoreTransaction = restoreTransaction; 583 } 584 585 public boolean isTrackMessages() { 586 return trackMessages; 587 } 588 589 public void setTrackMessages(boolean trackMessages) { 590 this.trackMessages = trackMessages; 591 } 592 593 public int getMaxCacheSize() { 594 return maxCacheSize; 595 } 596 597 public void setMaxCacheSize(int maxCacheSize) { 598 this.maxCacheSize = maxCacheSize; 599 } 600 601 }