hold pending messages in a linked list (messages awaiting disptach to a
consumer) cursor
Method from org.apache.activemq.broker.region.cursors.VMPendingMessageCursor Detail: |
public synchronized void addMessageFirst(MessageReference node) {
node.incrementReferenceCount();
list.addFirst(node);
}
add message to await dispatch |
public synchronized void addMessageLast(MessageReference node) {
node.incrementReferenceCount();
list.addLast(node);
}
add message to await dispatch |
public synchronized void clear() {
for (Iterator< MessageReference > i = list.iterator(); i.hasNext();) {
MessageReference ref = i.next();
ref.decrementReferenceCount();
}
list.clear();
}
clear all pending messages |
public void destroy() throws Exception {
super.destroy();
clear();
}
|
public synchronized boolean hasNext() {
return iter.hasNext();
}
|
public synchronized boolean isEmpty() {
if (list.isEmpty()) {
return true;
} else {
for (Iterator< MessageReference > iterator = list.iterator(); iterator.hasNext();) {
MessageReference node = iterator.next();
if (node == QueueMessageReference.NULL_MESSAGE) {
continue;
}
if (!node.isDropped()) {
return false;
}
// We can remove dropped references.
iterator.remove();
}
return true;
}
}
|
public boolean isTransient() {
return true;
}
|
public synchronized MessageReference next() {
last = iter.next();
if (last != null) {
last.incrementReferenceCount();
}
return last;
}
|
public LinkedList<MessageReference> pageInList(int maxItems) {
LinkedList< MessageReference > result = new LinkedList< MessageReference >();
for (MessageReference ref: list) {
ref.incrementReferenceCount();
result.add(ref);
if (result.size() >= maxItems) {
break;
}
}
return result;
}
Page in a restricted number of messages |
public synchronized void remove() {
if (last != null) {
last.decrementReferenceCount();
}
iter.remove();
}
remove the message at the cursor position |
public synchronized void remove(MessageReference node) {
for (Iterator< MessageReference > i = list.iterator(); i.hasNext();) {
MessageReference ref = i.next();
if (node.getMessageId().equals(ref.getMessageId())) {
ref.decrementReferenceCount();
i.remove();
break;
}
}
}
|
public synchronized List<MessageReference> remove(ConnectionContext context,
Destination destination) throws Exception {
List< MessageReference > rc = new ArrayList< MessageReference >();
for (Iterator< MessageReference > iterator = list.iterator(); iterator.hasNext();) {
MessageReference r = iterator.next();
if (r.getRegionDestination() == destination) {
r.decrementReferenceCount();
rc.add(r);
iterator.remove();
}
}
return rc;
}
|
public synchronized void reset() {
iter = list.listIterator();
last = null;
}
|
public synchronized int size() {
return list.size();
}
|