A pending-message cursor that persists pending messages (messages awaiting
dispatch to a consumer).
Method from org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor Detail: |
/**
 * Registers a destination with this cursor by creating a
 * {@link TopicStorePrefetch} for it, configured from this cursor's current
 * settings. Nothing happens when the destination is {@code null} or is an
 * advisory topic.
 *
 * @param context the connection context (not used by this method)
 * @param destination the destination to add; cast to {@code Topic}
 * @throws Exception if starting the newly created prefetch fails
 */
public synchronized void add(ConnectionContext context,
        Destination destination) throws Exception {
    if (destination == null) {
        return;
    }
    if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
        return;
    }

    TopicStorePrefetch prefetch =
            new TopicStorePrefetch(this.subscription, (Topic) destination, clientId, subscriberName);

    // Copy this cursor's configuration onto the new per-destination prefetch.
    prefetch.setMaxBatchSize(getMaxBatchSize());
    prefetch.setSystemUsage(systemUsage);
    prefetch.setEnableAudit(isEnableAudit());
    prefetch.setMaxAuditDepth(getMaxAuditDepth());
    prefetch.setMaxProducersToAudit(getMaxProducersToAudit());
    prefetch.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());

    topics.put(destination, prefetch);
    storePrefetches.add(prefetch);

    // A prefetch added after this cursor was started must be started here.
    if (isStarted()) {
        prefetch.start();
    }
}
|
/**
 * Appends a message reference to the matching underlying cursor.
 * <p>
 * Persistent messages go to the {@link TopicStorePrefetch} registered for
 * the message's region destination, if there is one. Non-persistent
 * messages go to the {@code nonPersistent} cursor, but only while this
 * cursor is started. A {@code null} node is ignored.
 *
 * @param node the message reference to append; may be {@code null}
 * @throws Exception if the underlying cursor fails to accept the message
 */
public synchronized void addMessageLast(MessageReference node) throws Exception {
    if (node == null) {
        return;
    }
    Message message = node.getMessage();
    if (message.isPersistent()) {
        TopicStorePrefetch prefetch = topics.get(message.getRegionDestination());
        if (prefetch != null) {
            prefetch.addMessageLast(node);
        }
    } else if (isStarted()) {
        nonPersistent.addMessageLast(node);
    }
}
|
/**
 * Appends a recovered message reference to the {@code nonPersistent} cursor.
 *
 * @param node the recovered message reference
 * @throws Exception if the non-persistent cursor rejects the message
 */
public synchronized void addRecoveredMessage(MessageReference node)
        throws Exception {
    nonPersistent.addMessageLast(node);
}
|
/**
 * Clears every cursor held in {@code storePrefetches}.
 */
public synchronized void clear() {
    for (PendingMessageCursor cursor : storePrefetches) {
        cursor.clear();
    }
}
|
/**
 * Forwards the {@code gc()} request to every cursor held in
 * {@code storePrefetches}.
 */
public synchronized void gc() {
    for (PendingMessageCursor cursor : storePrefetches) {
        cursor.gc();
    }
}
|
/**
 * Selects the cursor from which the next message should be read.
 * <p>
 * The current cursor is kept while it is non-empty. Otherwise the first
 * cursor in {@code storePrefetches} that reports {@code hasNext()} becomes
 * current, and the head of the list is then moved to the tail so that a
 * later scan starts from a different cursor.
 *
 * @return the selected cursor, or {@code null} if no cursor has a next message
 * @throws Exception declared for callers; thrown only if an underlying call does
 */
protected synchronized PendingMessageCursor getNextCursor() throws Exception {
    if (currentCursor != null && !currentCursor.isEmpty()) {
        return currentCursor;
    }

    // Forget the exhausted cursor before scanning for a replacement.
    currentCursor = null;
    for (PendingMessageCursor candidate : storePrefetches) {
        if (candidate.hasNext()) {
            currentCursor = candidate;
            break;
        }
    }

    // round-robin: rotate the head of the list to the tail
    if (storePrefetches.size() > 1) {
        storePrefetches.add(storePrefetches.remove(0));
    }
    return currentCursor;
}
|
/**
 * Refreshes the current cursor via {@code getNextCursor()} and reports
 * whether it has a next message.
 *
 * @return {@code true} if a current cursor exists and has a next message
 * @throws RuntimeException wrapping any exception raised while selecting
 *         the cursor (the failure is logged first)
 */
public synchronized boolean hasNext() {
    try {
        currentCursor = getNextCursor();
    } catch (Exception e) {
        LOG.error("Failed to get current cursor ", e);
        throw new RuntimeException(e);
    }
    if (currentCursor == null) {
        return false;
    }
    return currentCursor.hasNext();
}
|
/**
 * Reports whether every cursor in {@code storePrefetches} is empty.
 *
 * @return {@code false} as soon as one non-empty cursor is found,
 *         {@code true} otherwise (including when there are no cursors)
 */
public synchronized boolean isEmpty() {
    for (PendingMessageCursor cursor : storePrefetches) {
        boolean hasContent = !cursor.isEmpty();
        if (hasContent) {
            return false;
        }
    }
    return true;
}
|
/**
 * Reports whether the cursor registered for the given destination is empty.
 *
 * @param destination the destination to look up in {@code topics}
 * @return the registered prefetch's {@code isEmpty()} result, or
 *         {@code true} when no prefetch is registered for the destination
 */
public synchronized boolean isEmpty(Destination destination) {
    TopicStorePrefetch prefetch = topics.get(destination);
    if (prefetch == null) {
        return true;
    }
    return prefetch.isEmpty();
}
|
/**
 * Reports whether recovery is required; this implementation always
 * answers {@code false}.
 *
 * @return {@code false}, always
 */
public boolean isRecoveryRequired() {
    return false;
}
Informs the Broker if the subscription needs intervention to recover
its state, e.g. a DurableTopicSubscriber may do. |
/**
 * Returns the next message reference from the current cursor.
 * The current cursor is not re-selected here.
 *
 * @return the current cursor's next message reference, or {@code null}
 *         when there is no current cursor
 */
public synchronized MessageReference next() {
    if (currentCursor == null) {
        return null;
    }
    return currentCursor.next();
}
|
/**
 * Forwards the {@code release()} call to every cursor held in
 * {@code storePrefetches}.
 */
public synchronized void release() {
    for (PendingMessageCursor cursor : storePrefetches) {
        cursor.release();
    }
}
|
/**
 * Delegates {@code remove()} to the current cursor; does nothing when
 * there is no current cursor.
 */
public synchronized void remove() {
    if (currentCursor == null) {
        return;
    }
    currentCursor.remove();
}
|
/**
 * Delegates removal of the given message reference to the current cursor.
 * Only the current cursor is consulted; without one the call is a no-op.
 *
 * @param node the message reference to remove
 */
public synchronized void remove(MessageReference node) {
    if (currentCursor == null) {
        return;
    }
    currentCursor.remove(node);
}
|
/**
 * Unregisters a destination from this cursor: its prefetch is removed from
 * {@code topics} and, if one was registered, from {@code storePrefetches}.
 *
 * @param context the connection context (not used by this method)
 * @param destination the destination whose prefetch should be dropped
 * @return always an empty, immutable list
 * @throws Exception declared for callers; not thrown by this implementation
 */
public synchronized List<MessageReference> remove(ConnectionContext context,
        Destination destination) throws Exception {
    PendingMessageCursor removed = topics.remove(destination);
    if (removed != null) {
        storePrefetches.remove(removed);
    }
    // Typed factory instead of the raw Collections.EMPTY_LIST constant, which
    // forces an unchecked conversion to List<MessageReference>.
    return Collections.<MessageReference>emptyList();
}
|
/**
 * Forwards the {@code reset()} call to every cursor held in
 * {@code storePrefetches}.
 */
public synchronized void reset() {
    for (PendingMessageCursor cursor : storePrefetches) {
        cursor.reset();
    }
}
|
/**
 * Sets the audit flag on this cursor and then on every cursor in
 * {@code storePrefetches}.
 *
 * @param enableAudit the flag value to apply
 */
public void setEnableAudit(boolean enableAudit) {
    super.setEnableAudit(enableAudit);
    for (PendingMessageCursor prefetch : storePrefetches) {
        prefetch.setEnableAudit(enableAudit);
    }
}
|
/**
 * Sets the maximum audit depth on this cursor and then on every cursor in
 * {@code storePrefetches}.
 *
 * @param maxAuditDepth the audit depth to apply
 */
public void setMaxAuditDepth(int maxAuditDepth) {
    super.setMaxAuditDepth(maxAuditDepth);
    for (PendingMessageCursor prefetch : storePrefetches) {
        prefetch.setMaxAuditDepth(maxAuditDepth);
    }
}
|
/**
 * Sets the maximum batch size on every cursor in {@code storePrefetches}
 * and then on this cursor itself.
 *
 * @param maxBatchSize the batch size to apply
 */
public void setMaxBatchSize(int maxBatchSize) {
    // The per-destination cursors are updated before the superclass state.
    for (PendingMessageCursor prefetch : storePrefetches) {
        prefetch.setMaxBatchSize(maxBatchSize);
    }
    super.setMaxBatchSize(maxBatchSize);
}
|
/**
 * Sets the maximum number of producers to audit on this cursor and then on
 * every cursor in {@code storePrefetches}.
 *
 * @param maxProducersToAudit the producer limit to apply
 */
public void setMaxProducersToAudit(int maxProducersToAudit) {
    super.setMaxProducersToAudit(maxProducersToAudit);
    for (PendingMessageCursor cursor : storePrefetches) {
        // Forward the producer limit itself. The loop previously called
        // setMaxAuditDepth(maxAuditDepth), so the new limit never reached
        // the per-destination cursors.
        cursor.setMaxProducersToAudit(maxProducersToAudit);
    }
}
|
/**
 * Sets the memory-usage high-water mark on this cursor and then on every
 * cursor in {@code storePrefetches}.
 *
 * @param memoryUsageHighWaterMark the high-water mark value to apply
 */
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
    super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
    for (PendingMessageCursor prefetch : storePrefetches) {
        prefetch.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
    }
}
|
/**
 * Sets the system usage manager on this cursor and then on every cursor in
 * {@code storePrefetches}.
 *
 * @param usageManager the usage manager to apply
 */
public void setSystemUsage(SystemUsage usageManager) {
    super.setSystemUsage(usageManager);
    for (PendingMessageCursor prefetch : storePrefetches) {
        prefetch.setSystemUsage(usageManager);
    }
}
|
/**
 * Sets the use-cache flag on this cursor and then on every cursor in
 * {@code storePrefetches}.
 *
 * @param useCache the flag value to apply
 */
public void setUseCache(boolean useCache) {
    super.setUseCache(useCache);
    for (PendingMessageCursor prefetch : storePrefetches) {
        prefetch.setUseCache(useCache);
    }
}
|
/**
 * Sums the sizes reported by every cursor in {@code storePrefetches}.
 *
 * @return the total of the per-cursor sizes; {@code 0} when there are no cursors
 */
public synchronized int size() {
    int total = 0;
    for (PendingMessageCursor cursor : storePrefetches) {
        total = total + cursor.size();
    }
    return total;
}
|
/**
 * Starts this cursor and every cursor in {@code storePrefetches}.
 * Each one is given this cursor's message audit before it is started.
 * Does nothing when this cursor is already started.
 *
 * @throws Exception if the superclass or any underlying cursor fails to start
 */
public synchronized void start() throws Exception {
    if (isStarted()) {
        return;
    }
    super.start();
    for (PendingMessageCursor cursor : storePrefetches) {
        cursor.setMessageAudit(getMessageAudit());
        cursor.start();
    }
}
|
/**
 * Stops this cursor and then every cursor in {@code storePrefetches}.
 * Does nothing when this cursor is not started.
 *
 * @throws Exception if the superclass or any underlying cursor fails to stop
 */
public synchronized void stop() throws Exception {
    if (!isStarted()) {
        return;
    }
    super.stop();
    for (PendingMessageCursor cursor : storePrefetches) {
        cursor.stop();
    }
}
|
/**
 * Describes this cursor by its client id and subscriber name.
 *
 * @return a string of the form {@code StoreDurableSubscriber(clientId:subscriberName)}
 */
public String toString() {
    StringBuilder text = new StringBuilder("StoreDurableSubscriber(");
    text.append(clientId);
    text.append(":");
    text.append(subscriberName);
    text.append(")");
    return text.toString();
}
|