001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.Collections;
020import java.util.LinkedList;
021import java.util.List;
022import java.util.Set;
023import org.apache.activemq.ActiveMQMessageAudit;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.BaseDestination;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.MessageReference;
029import org.apache.activemq.broker.region.Subscription;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.usage.SystemUsage;
032
033/**
034 * Abstract method holder for pending message (messages awaiting disptach to a
035 * consumer) cursor
036 *
037 *
038 */
039public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
040    protected int memoryUsageHighWaterMark = 70;
041    protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
042    protected SystemUsage systemUsage;
043    protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT;
044    protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH;
045    protected boolean enableAudit=true;
046    protected ActiveMQMessageAudit audit;
047    protected boolean useCache=true;
048    private boolean cacheEnabled=true;
049    private boolean started=false;
050    protected MessageReference last = null;
051    protected final boolean prioritizedMessages;
052
053    public AbstractPendingMessageCursor(boolean prioritizedMessages) {
054        this.prioritizedMessages=prioritizedMessages;
055    }
056
057
058    @Override
059    public synchronized void start() throws Exception  {
060        if (!started && enableAudit && audit==null) {
061            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
062        }
063        started=true;
064    }
065
066    @Override
067    public synchronized void stop() throws Exception  {
068        started=false;
069        gc();
070    }
071
072    @Override
073    public void add(ConnectionContext context, Destination destination) throws Exception {
074    }
075
076    @Override
077    @SuppressWarnings("unchecked")
078    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
079        return Collections.EMPTY_LIST;
080    }
081
082    @Override
083    public boolean isRecoveryRequired() {
084        return true;
085    }
086
087    @Override
088    public void addMessageFirst(MessageReference node) throws Exception {
089    }
090
091    @Override
092    public boolean addMessageLast(MessageReference node) throws Exception {
093        return tryAddMessageLast(node, INFINITE_WAIT);
094    }
095
096    @Override
097    public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
098        return true;
099    }
100
101    @Override
102    public void addRecoveredMessage(MessageReference node) throws Exception {
103        addMessageLast(node);
104    }
105
106    @Override
107    public void clear() {
108    }
109
110    @Override
111    public boolean hasNext() {
112        return false;
113    }
114
115    @Override
116    public boolean isEmpty() {
117        return false;
118    }
119
120    @Override
121    public boolean isEmpty(Destination destination) {
122        return isEmpty();
123    }
124
125    @Override
126    public MessageReference next() {
127        return null;
128    }
129
130    @Override
131    public void remove() {
132    }
133
134    @Override
135    public void reset() {
136    }
137
138    @Override
139    public int size() {
140        return 0;
141    }
142
143    @Override
144    public int getMaxBatchSize() {
145        return maxBatchSize;
146    }
147
148    @Override
149    public void setMaxBatchSize(int maxBatchSize) {
150        this.maxBatchSize = maxBatchSize;
151    }
152
153    protected void fillBatch() throws Exception {
154    }
155
156    @Override
157    public void resetForGC() {
158        reset();
159    }
160
161    @Override
162    public void remove(MessageReference node) {
163    }
164
165    @Override
166    public void gc() {
167    }
168
169    @Override
170    public void setSystemUsage(SystemUsage usageManager) {
171        this.systemUsage = usageManager;
172    }
173
174    @Override
175    public boolean hasSpace() {
176        return systemUsage != null ? (!systemUsage.getMemoryUsage().isFull(memoryUsageHighWaterMark)) : true;
177    }
178
179    @Override
180    public boolean isFull() {
181        return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
182    }
183
184    @Override
185    public void release() {
186    }
187
188    @Override
189    public boolean hasMessagesBufferedToDeliver() {
190        return false;
191    }
192
193    /**
194     * @return the memoryUsageHighWaterMark
195     */
196    @Override
197    public int getMemoryUsageHighWaterMark() {
198        return memoryUsageHighWaterMark;
199    }
200
201    /**
202     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
203     */
204    @Override
205    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
206        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
207    }
208
209    /**
210     * @return the usageManager
211     */
212    @Override
213    public SystemUsage getSystemUsage() {
214        return this.systemUsage;
215    }
216
217    /**
218     * destroy the cursor
219     *
220     * @throws Exception
221     */
222    @Override
223    public void destroy() throws Exception {
224        stop();
225    }
226
227    /**
228     * Page in a restricted number of messages
229     *
230     * @param maxItems maximum number of messages to return
231     * @return a list of paged in messages
232     */
233    @Override
234    public LinkedList<MessageReference> pageInList(int maxItems) {
235        throw new RuntimeException("Not supported");
236    }
237
238    /**
239     * @return the maxProducersToAudit
240     */
241    @Override
242    public int getMaxProducersToAudit() {
243        return maxProducersToAudit;
244    }
245
246    /**
247     * @param maxProducersToAudit the maxProducersToAudit to set
248     */
249    @Override
250    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
251        this.maxProducersToAudit = maxProducersToAudit;
252        if (audit != null) {
253            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
254        }
255    }
256
257    /**
258     * @return the maxAuditDepth
259     */
260    @Override
261    public int getMaxAuditDepth() {
262        return maxAuditDepth;
263    }
264
265
266    /**
267     * @param maxAuditDepth the maxAuditDepth to set
268     */
269    @Override
270    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
271        this.maxAuditDepth = maxAuditDepth;
272        if (audit != null) {
273            audit.setAuditDepth(maxAuditDepth);
274        }
275    }
276
277
278    /**
279     * @return the enableAudit
280     */
281    @Override
282    public boolean isEnableAudit() {
283        return enableAudit;
284    }
285
286    /**
287     * @param enableAudit the enableAudit to set
288     */
289    @Override
290    public synchronized void setEnableAudit(boolean enableAudit) {
291        this.enableAudit = enableAudit;
292        if (enableAudit && started && audit==null) {
293            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
294        }
295    }
296
297    @Override
298    public boolean isTransient() {
299        return false;
300    }
301
302
303    /**
304     * set the audit
305     * @param audit new audit component
306     */
307    @Override
308    public void setMessageAudit(ActiveMQMessageAudit audit) {
309        this.audit=audit;
310    }
311
312
313    /**
314     * @return the audit
315     */
316    @Override
317    public ActiveMQMessageAudit getMessageAudit() {
318        return audit;
319    }
320
321    @Override
322    public boolean isUseCache() {
323        return useCache;
324    }
325
326    @Override
327    public void setUseCache(boolean useCache) {
328        this.useCache = useCache;
329    }
330
331    public synchronized boolean isDuplicate(MessageId messageId) {
332        boolean unique = recordUniqueId(messageId);
333        rollback(messageId);
334        return !unique;
335    }
336
337    /**
338     * records a message id and checks if it is a duplicate
339     * @param messageId
340     * @return true if id is unique, false otherwise.
341     */
342    public synchronized boolean recordUniqueId(MessageId messageId) {
343        if (!enableAudit || audit==null) {
344            return true;
345        }
346        return !audit.isDuplicate(messageId);
347    }
348
349    @Override
350    public synchronized void rollback(MessageId id) {
351        if (audit != null) {
352            audit.rollback(id);
353        }
354    }
355
356    public synchronized boolean isStarted() {
357        return started;
358    }
359
360    public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
361        boolean result = false;
362        Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
363        if (destinations != null) {
364            for (Destination dest:destinations) {
365                if (dest.isPrioritizedMessages()) {
366                    result = true;
367                    break;
368                }
369            }
370        }
371        return result;
372
373    }
374
375    @Override
376    public synchronized boolean isCacheEnabled() {
377        return cacheEnabled;
378    }
379
380    public synchronized void setCacheEnabled(boolean val) {
381        cacheEnabled = val;
382    }
383
384    @Override
385    public void rebase() {
386    }
387}