/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.cursors;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;

/**
 * Abstract method holder for pending message (messages awaiting dispatch to a
 * consumer) cursor.
 * <p>
 * Most cursor operations are empty or return a fixed default here; the only
 * state this class manages itself is the started flag, the memory usage
 * settings and the duplicate-detection audit.
 *
 * @version $Revision: 882100 $
 */
public class AbstractPendingMessageCursor implements PendingMessageCursor {

    /** Percentage of memory usage below which {@link #hasSpace()} reports space. */
    protected int memoryUsageHighWaterMark = 70;
    /** Maximum batch size; defaults to {@link BaseDestination#MAX_PAGE_SIZE}. */
    protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
    /** Usage tracker consulted by {@link #hasSpace()} and {@link #isFull()}; may be null. */
    protected SystemUsage systemUsage;
    /** Producer count handed to the audit when it is created or reconfigured. */
    protected int maxProducersToAudit = 1024;
    /** Audit depth handed to the audit when it is created or reconfigured. */
    protected int maxAuditDepth = 1000;
    /** Whether an audit should be created and consulted. */
    protected boolean enableAudit = true;
    /** Duplicate-detection audit; null until created or injected, and after {@link #stop()}. */
    protected ActiveMQMessageAudit audit;
    protected boolean useCache = true;
    private boolean started;
    /** Not referenced by this class; presumably maintained by subclasses. */
    protected MessageReference last;

    /**
     * Marks the cursor as started. When it was not already started, auditing
     * is enabled and no audit is present, a new audit is created from the
     * current depth and producer limits.
     *
     * @throws Exception declared for subclasses; not thrown by this implementation
     */
    public synchronized void start() throws Exception {
        if (!started && enableAudit && audit == null) {
            audit = new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
        }
        started = true;
    }

    /**
     * Marks the cursor as stopped, discards the audit (including one supplied
     * through {@link #setMessageAudit(ActiveMQMessageAudit)}) and calls
     * {@link #gc()}.
     *
     * @throws Exception declared for subclasses; not thrown by this implementation
     */
    public synchronized void stop() throws Exception {
        started = false;
        audit = null;
        gc();
    }

    /**
     * Does nothing in this base class.
     *
     * @param context the connection context
     * @param destination the destination being added
     * @throws Exception declared for subclasses
     */
    public void add(ConnectionContext context, Destination destination) throws Exception {
    }

    /**
     * Base implementation holds no messages for any destination.
     *
     * @param context the connection context
     * @param destination the destination being removed
     * @return the shared immutable empty list
     * @throws Exception declared for subclasses
     */
    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
        // Type-safe equivalent of Collections.EMPTY_LIST; no unchecked warning to suppress.
        return Collections.emptyList();
    }

    /**
     * @return always true in this base class
     */
    public boolean isRecoveryRequired() {
        return true;
    }

    /**
     * Does nothing in this base class.
     *
     * @param node the message reference
     * @throws Exception declared for subclasses
     */
    public void addMessageFirst(MessageReference node) throws Exception {
    }

    /**
     * Does nothing in this base class.
     *
     * @param node the message reference
     * @throws Exception declared for subclasses
     */
    public void addMessageLast(MessageReference node) throws Exception {
    }

    /**
     * Delegates to {@link #addMessageLast(MessageReference)}.
     *
     * @param node the recovered message reference
     * @throws Exception if {@code addMessageLast} throws
     */
    public void addRecoveredMessage(MessageReference node) throws Exception {
        addMessageLast(node);
    }

    /** Does nothing in this base class. */
    public void clear() {
    }

    /**
     * @return always false in this base class
     */
    public boolean hasNext() {
        return false;
    }

    /**
     * @return always false in this base class
     */
    public boolean isEmpty() {
        return false;
    }

    /**
     * Ignores the destination and delegates to {@link #isEmpty()}.
     *
     * @param destination the destination of interest (unused here)
     * @return the result of {@link #isEmpty()}
     */
    public boolean isEmpty(Destination destination) {
        return isEmpty();
    }

    /**
     * @return always null in this base class
     */
    public MessageReference next() {
        return null;
    }

    /** Does nothing in this base class. */
    public void remove() {
    }

    /** Does nothing in this base class. */
    public void reset() {
    }

    /**
     * @return always 0 in this base class
     */
    public int size() {
        return 0;
    }

    /**
     * @return the maxBatchSize
     */
    public int getMaxBatchSize() {
        return maxBatchSize;
    }

    /**
     * @param maxBatchSize the maxBatchSize to set
     */
    public void setMaxBatchSize(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Does nothing in this base class.
     *
     * @throws Exception declared for subclasses
     */
    protected void fillBatch() throws Exception {
    }

    /** Delegates to {@link #reset()}. */
    public void resetForGC() {
        reset();
    }

    /**
     * Does nothing in this base class.
     *
     * @param node the message reference to remove
     */
    public void remove(MessageReference node) {
    }

    /** Does nothing in this base class; invoked by {@link #stop()}. */
    public void gc() {
    }

    /**
     * @param usageManager the usage tracker to consult; may be null
     */
    public void setSystemUsage(SystemUsage usageManager) {
        this.systemUsage = usageManager;
    }

    /**
     * @return true when no system usage is set, or when the memory usage
     *         percentage is strictly below the high water mark
     */
    public boolean hasSpace() {
        return systemUsage == null
                || systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark;
    }

    /**
     * @return false when no system usage is set, otherwise whether the memory
     *         usage reports itself as full
     */
    public boolean isFull() {
        return systemUsage != null && systemUsage.getMemoryUsage().isFull();
    }

    /** Does nothing in this base class. */
    public void release() {
    }

    /**
     * @return always false in this base class
     */
    public boolean hasMessagesBufferedToDeliver() {
        return false;
    }

    /**
     * @return the memoryUsageHighWaterMark
     */
    public int getMemoryUsageHighWaterMark() {
        return memoryUsageHighWaterMark;
    }

    /**
     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
     */
    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
    }

    /**
     * @return the usageManager
     */
    public SystemUsage getSystemUsage() {
        return this.systemUsage;
    }

    /**
     * destroy the cursor; this implementation simply calls {@link #stop()}
     *
     * @throws Exception if {@code stop} throws
     */
    public void destroy() throws Exception {
        stop();
    }

    /**
     * Page in a restricted number of messages
     *
     * @param maxItems maximum number of messages to return
     * @return a list of paged in messages
     * @throws UnsupportedOperationException always, in this base class
     */
    public LinkedList<MessageReference> pageInList(int maxItems) {
        // UnsupportedOperationException extends RuntimeException, so callers that
        // caught the previous bare RuntimeException keep working.
        throw new UnsupportedOperationException("Not supported");
    }

    /**
     * @return the maxProducersToAudit
     */
    public int getMaxProducersToAudit() {
        return maxProducersToAudit;
    }

    /**
     * Stores the new limit and, when an audit exists, applies it there too.
     *
     * @param maxProducersToAudit the maxProducersToAudit to set
     */
    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
        this.maxProducersToAudit = maxProducersToAudit;
        if (audit != null) {
            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
        }
    }

    /**
     * @return the maxAuditDepth
     */
    public int getMaxAuditDepth() {
        return maxAuditDepth;
    }

    /**
     * Stores the new depth and, when an audit exists, applies it there too.
     *
     * @param maxAuditDepth the maxAuditDepth to set
     */
    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
        this.maxAuditDepth = maxAuditDepth;
        if (audit != null) {
            audit.setAuditDepth(maxAuditDepth);
        }
    }

    /**
     * @return the enableAudit
     */
    public boolean isEnableAudit() {
        return enableAudit;
    }

    /**
     * Stores the flag. When auditing is being enabled on an already started
     * cursor that has no audit, one is created immediately. Disabling does not
     * discard an existing audit.
     *
     * @param enableAudit the enableAudit to set
     */
    public synchronized void setEnableAudit(boolean enableAudit) {
        this.enableAudit = enableAudit;
        if (enableAudit && started && audit == null) {
            audit = new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
        }
    }

    /**
     * @return always false in this base class
     */
    public boolean isTransient() {
        return false;
    }

    /**
     * set the audit
     * <p>
     * Unlike the other methods that touch the audit, this one is not
     * synchronized.
     *
     * @param audit new audit component
     */
    public void setMessageAudit(ActiveMQMessageAudit audit) {
        this.audit = audit;
    }

    /**
     * @return the audit, or null when none is present
     */
    public ActiveMQMessageAudit getMessageAudit() {
        return audit;
    }

    /**
     * @return the useCache
     */
    public boolean isUseCache() {
        return useCache;
    }

    /**
     * @param useCache the useCache to set
     */
    public void setUseCache(boolean useCache) {
        this.useCache = useCache;
    }

    /**
     * Checks an id via {@link #recordUniqueId(MessageId)} and then calls
     * {@link #rollback(MessageId)} for the same id.
     *
     * @param messageId the id to test
     * @return true when {@code recordUniqueId} reported the id as not unique
     */
    public synchronized boolean isDuplicate(MessageId messageId) {
        boolean unique = recordUniqueId(messageId);
        // NOTE(review): rollback runs even when the id was already known; whether
        // that disturbs the earlier record depends on ActiveMQMessageAudit.rollback
        // - confirm before relying on repeated calls.
        rollback(messageId);
        return !unique;
    }

    /**
     * records a message id and checks if it is a duplicate
     * <p>
     * When auditing is disabled or no audit is present, every id is reported
     * as unique.
     *
     * @param messageId the id to check
     * @return true if id is unique, false otherwise.
     */
    public synchronized boolean recordUniqueId(MessageId messageId) {
        if (!enableAudit || audit == null) {
            return true;
        }
        return !audit.isDuplicate(messageId);
    }

    /**
     * Forwards the rollback to the audit when one is present, regardless of
     * the enableAudit flag.
     *
     * @param id the message id to roll back
     */
    public synchronized void rollback(MessageId id) {
        if (audit != null) {
            audit.rollback(id);
        }
    }

    /**
     * @return true between a call to {@link #start()} and the next {@link #stop()}
     */
    protected synchronized boolean isStarted() {
        return started;
    }
}