/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.cursors;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;

/**
 * Cursor that holds pending messages (messages awaiting dispatch to a
 * consumer) in an in-memory {@link PendingList}.
 * <p>
 * Every method that reads or modifies the backing list is
 * {@code synchronized} on this cursor instance.
 * <p>
 * {@link #reset()} must be called before the first use of
 * {@link #hasNext()}, {@link #next()} or {@link #remove()}: the internal
 * iterator is only created by {@code reset()}, so calling those methods
 * earlier fails with a {@link NullPointerException}.
 */
public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
    private final PendingList list;
    private Iterator<MessageReference> iter;

    /**
     * Creates the cursor and selects the backing list implementation.
     *
     * @param prioritizedMessages if {@code true} a
     *            {@link PrioritizedPendingList} backs the cursor, otherwise an
     *            {@link OrderedPendingList}
     */
    public VMPendingMessageCursor(boolean prioritizedMessages) {
        super(prioritizedMessages);
        if (this.prioritizedMessages) {
            this.list = new PrioritizedPendingList();
        } else {
            this.list = new OrderedPendingList();
        }
    }

    /**
     * Removes every pending reference that belongs to the given destination.
     * The reference count of each removed reference is decremented.
     *
     * @param context not used by this implementation
     * @param destination the destination to match; compared by identity
     *            ({@code ==}) against {@code getRegionDestination()}
     * @return the references that were removed, in iteration order; never
     *         {@code null}
     * @throws Exception declared by the overridden method; this
     *             implementation does not throw it itself
     */
    @Override
    public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
        throws Exception {
        List<MessageReference> rc = new ArrayList<MessageReference>();
        for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
            MessageReference r = iterator.next();
            if (r.getRegionDestination() == destination) {
                r.decrementReferenceCount();
                rc.add(r);
                iterator.remove();
            }
        }
        return rc;
    }

    /**
     * Checks whether any live message is pending.
     * <p>
     * {@link QueueMessageReference#NULL_MESSAGE} entries are skipped (and left
     * in the list). Dropped references encountered before the first live
     * reference are removed from the list as a side effect.
     *
     * @return {@code true} if there are no pending messages other than
     *         {@code NULL_MESSAGE} placeholders and dropped references
     */
    @Override
    public synchronized boolean isEmpty() {
        if (list.isEmpty()) {
            return true;
        }
        for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
            MessageReference node = iterator.next();
            if (node == QueueMessageReference.NULL_MESSAGE) {
                continue;
            }
            if (!node.isDropped()) {
                return false;
            }
            // We can remove dropped references.
            iterator.remove();
        }
        return true;
    }

    /**
     * Resets the cursor: a fresh iterator over the pending list is created and
     * the last returned reference is forgotten.
     */
    @Override
    public synchronized void reset() {
        iter = list.iterator();
        last = null;
    }

    /**
     * Adds a message to the end of the pending list and increments its
     * reference count.
     *
     * @param node the message reference to add
     * @param maxWait not used by this implementation; the add never waits
     * @return always {@code true}
     */
    @Override
    public synchronized boolean tryAddMessageLast(MessageReference node, long maxWait) {
        node.incrementReferenceCount();
        list.addMessageLast(node);
        return true;
    }

    /**
     * Adds a message to the front of the pending list and increments its
     * reference count.
     *
     * @param node the message reference to add
     */
    @Override
    public synchronized void addMessageFirst(MessageReference node) {
        node.incrementReferenceCount();
        list.addMessageFirst(node);
    }

    /**
     * Requires a prior call to {@link #reset()}.
     *
     * @return {@code true} if the current iteration has more pending messages
     */
    @Override
    public synchronized boolean hasNext() {
        return iter.hasNext();
    }

    /**
     * Advances the cursor and remembers the returned reference so that
     * {@link #remove()} can act on it. A non-null reference has its reference
     * count incremented before it is returned. Requires a prior call to
     * {@link #reset()}.
     *
     * @return the next pending message
     */
    @Override
    public synchronized MessageReference next() {
        last = iter.next();
        if (last != null) {
            last.incrementReferenceCount();
        }
        return last;
    }

    /**
     * Removes the message at the cursor position, i.e. the one most recently
     * returned by {@link #next()}, decrementing its reference count if it is
     * non-null. Requires a prior call to {@link #reset()}.
     */
    @Override
    public synchronized void remove() {
        if (last != null) {
            last.decrementReferenceCount();
        }
        iter.remove();
    }

    /**
     * @return the number of pending messages as reported by the backing list
     */
    @Override
    public synchronized int size() {
        return list.size();
    }

    /**
     * @return the message size as reported by the backing list
     */
    @Override
    public synchronized long messageSize() {
        return list.messageSize();
    }

    /**
     * Clears all pending messages. The reference count of every entry is
     * decremented before the list is emptied.
     */
    @Override
    public synchronized void clear() {
        for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
            MessageReference ref = i.next();
            ref.decrementReferenceCount();
        }
        list.clear();
    }

    /**
     * Removes the given reference from the pending list and decrements its
     * reference count.
     *
     * @param node the message reference to remove
     */
    @Override
    public synchronized void remove(MessageReference node) {
        list.remove(node);
        node.decrementReferenceCount();
    }

    /**
     * Pages in a restricted number of messages. The messages stay in the
     * pending list; each returned reference has its reference count
     * incremented.
     * <p>
     * The method holds the cursor monitor, like every other method that
     * touches the pending list, so the iteration cannot interleave with a
     * concurrent add, remove or clear.
     *
     * @param maxItems the maximum number of messages to return; a value of
     *            zero or less yields an empty list
     * @return a new list with at most {@code maxItems} paged-in messages, in
     *         iteration order; never {@code null}
     */
    @Override
    public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
        // Check the bound before taking an element so that a non-positive
        // maxItems does not page in (and reference-count) a message.
        for (Iterator<MessageReference> i = list.iterator(); result.size() < maxItems && i.hasNext();) {
            MessageReference ref = i.next();
            ref.incrementReferenceCount();
            result.add(ref);
        }
        return result;
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean isTransient() {
        return true;
    }

    /**
     * Destroys the cursor: delegates to the superclass and then clears all
     * pending messages.
     *
     * @throws Exception if the superclass {@code destroy()} fails; in that
     *             case the pending list is not cleared
     */
    @Override
    public void destroy() throws Exception {
        super.destroy();
        clear();
    }
}