public void send(ProducerBrokerExchange producerExchange,
Message message) throws Exception {
if (message.getTimestamp() > 0
&& (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) {
// timestamp not been disabled and has not passed through a network
long oldExpiration = message.getExpiration();
long newTimeStamp = System.currentTimeMillis();
long timeToLive = zeroExpirationOverride;
if (oldExpiration > 0) {
long oldTimestamp = message.getTimestamp();
timeToLive = oldExpiration - oldTimestamp;
}
if (timeToLive > 0 && ttlCeiling > 0 && timeToLive > ttlCeiling) {
timeToLive = ttlCeiling;
}
long expiration = timeToLive + newTimeStamp;
//In the scenario that the Broker is behind the clients we never want to set the Timestamp and Expiration in the past
if(!futureOnly || (expiration > oldExpiration)) {
if (timeToLive > 0 && expiration > 0) {
message.setExpiration(expiration);
}
message.setTimestamp(newTimeStamp);
}
}
super.send(producerExchange, message);
}
|