1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.broker.util; 18 19 import org.apache.activemq.broker.BrokerPluginSupport; 20 import org.apache.activemq.broker.ProducerBrokerExchange; 21 import org.apache.activemq.command.Message; 22 import org.apache.commons.logging.Log; 23 import org.apache.commons.logging.LogFactory; 24 25 /** 26 * A Broker interceptor which updates a JMS Client's timestamp on the message 27 * with a broker timestamp. Useful when the clocks on client machines are known 28 * to not be correct and you can only trust the time set on the broker machines. 29 * 30 * Enabling this plugin will break JMS compliance since the timestamp that the 31 * producer sees on the messages after as send() will be different from the 32 * timestamp the consumer will observe when he receives the message. This plugin 33 * is not enabled in the default ActiveMQ configuration. 34 * 35 * 2 new attributes have been added which will allow the administrator some override control 36 * over the expiration time for incoming messages: 37 * 38 * Attribute 'zeroExpirationOverride' can be used to apply an expiration 39 * time to incoming messages with no expiration defined (messages that would never expire) 40 * 41 * Attribute 'ttlCeiling' can be used to apply a limit to the expiration time 42 * 43 * @org.apache.xbean.XBean element="timeStampingBrokerPlugin" 44 * 45 * @version $Revision$ 46 */ 47 public class TimeStampingBrokerPlugin extends BrokerPluginSupport { 48 49 /** 50 * variable which (when non-zero) is used to override 51 * the expiration date for messages that arrive with 52 * no expiration date set (in Milliseconds). 53 */ 54 long zeroExpirationOverride = 0; 55 56 /** 57 * variable which (when non-zero) is used to limit 58 * the expiration date (in Milliseconds). 59 */ 60 long ttlCeiling = 0; 61 62 /** 63 * If true, the plugin will not update timestamp to past values 64 * False by default 65 */ 66 boolean futureOnly = false; 67 68 /** 69 * setter method for zeroExpirationOverride 70 */ 71 public void setZeroExpirationOverride(long ttl) 72 { 73 this.zeroExpirationOverride = ttl; 74 } 75 76 /** 77 * setter method for ttlCeiling 78 */ 79 public void setTtlCeiling(long ttlCeiling) 80 { 81 this.ttlCeiling = ttlCeiling; 82 } 83 84 public void setFutureOnly(boolean futureOnly) { 85 this.futureOnly = futureOnly; 86 } 87 88 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 89 if (message.getTimestamp() > 0 90 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) { 91 // timestamp not been disabled and has not passed through a network 92 long oldExpiration = message.getExpiration(); 93 long newTimeStamp = System.currentTimeMillis(); 94 long timeToLive = zeroExpirationOverride; 95 if (oldExpiration > 0) { 96 long oldTimestamp = message.getTimestamp(); 97 timeToLive = oldExpiration - oldTimestamp; 98 } 99 if (timeToLive > 0 && ttlCeiling > 0 && timeToLive > ttlCeiling) { 100 timeToLive = ttlCeiling; 101 } 102 long expiration = timeToLive + newTimeStamp; 103 //In the scenario that the Broker is behind the clients we never want to set the Timestamp and Expiration in the past 104 if(!futureOnly || (expiration > oldExpiration)) { 105 if (timeToLive > 0 && expiration > 0) { 106 message.setExpiration(expiration); 107 } 108 message.setTimestamp(newTimeStamp); 109 } 110 } 111 super.send(producerExchange, message); 112 } 113 }