001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.Iterator;
022import java.util.List;
023
024import org.apache.activemq.broker.region.MessageReference;
025import org.apache.activemq.broker.region.QueueMessageReference;
026import org.apache.activemq.command.MessageId;
027
028/**
029 * An abstraction that keeps the correct order of messages that need to be dispatched
030 * to consumers, but also hides the fact that there might be redelivered messages that
031 * should be dispatched ahead of any other paged in messages.
032 *
033 * Direct usage of this class is recommended as you can control when redeliveries need
034 * to be added vs regular pending messages (the next set of messages that can be dispatched)
035 *
036 * Created by ceposta
037 * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
038 */
039public class QueueDispatchPendingList implements PendingList {
040
041    private PendingList pagedInPendingDispatch = new OrderedPendingList();
042    private PendingList redeliveredWaitingDispatch = new OrderedPendingList();
043    // when true use one PrioritizedPendingList for everything
044    private boolean prioritized = false;
045
046
047    @Override
048    public boolean isEmpty() {
049        return pagedInPendingDispatch.isEmpty() && redeliveredWaitingDispatch.isEmpty();
050    }
051
052    @Override
053    public void clear() {
054        pagedInPendingDispatch.clear();
055        redeliveredWaitingDispatch.clear();
056    }
057
058    /**
059     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
060     * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery()
061     * method
062     * @param message
063     *      The MessageReference that is to be added to this list.
064     *
065     * @return the pending node.
066     */
067    @Override
068    public PendingNode addMessageFirst(MessageReference message) {
069        return pagedInPendingDispatch.addMessageFirst(message);
070    }
071
072    /**
073     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
074     * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery()
075     * method
076     * @param message
077     *      The MessageReference that is to be added to this list.
078     *
079     * @return the pending node.
080     */
081    @Override
082    public PendingNode addMessageLast(MessageReference message) {
083        return pagedInPendingDispatch.addMessageLast(message);
084    }
085
086    @Override
087    public PendingNode remove(MessageReference message) {
088        if (pagedInPendingDispatch.contains(message)) {
089            return pagedInPendingDispatch.remove(message);
090        }else if (redeliveredWaitingDispatch.contains(message)) {
091            return redeliveredWaitingDispatch.remove(message);
092        }
093        return null;
094    }
095
096    @Override
097    public int size() {
098        return pagedInPendingDispatch.size() + redeliveredWaitingDispatch.size();
099    }
100
101    @Override
102    public long messageSize() {
103        return pagedInPendingDispatch.messageSize() + redeliveredWaitingDispatch.messageSize();
104    }
105
106    @Override
107    public Iterator<MessageReference> iterator() {
108        return new Iterator<MessageReference>() {
109
110            Iterator<MessageReference> redeliveries = redeliveredWaitingDispatch.iterator();
111            Iterator<MessageReference> pendingDispatch = pagedInPendingDispatch.iterator();
112            Iterator<MessageReference> current = redeliveries;
113
114
115            @Override
116            public boolean hasNext() {
117                if (!redeliveries.hasNext() && (current == redeliveries)) {
118                    current = pendingDispatch;
119                }
120                return current.hasNext();
121            }
122
123            @Override
124            public MessageReference next() {
125                return current.next();
126            }
127
128            @Override
129            public void remove() {
130                current.remove();
131            }
132        };
133    }
134
135    @Override
136    public boolean contains(MessageReference message) {
137        return pagedInPendingDispatch.contains(message) || redeliveredWaitingDispatch.contains(message);
138    }
139
140    @Override
141    public Collection<MessageReference> values() {
142        List<MessageReference> messageReferences = new ArrayList<MessageReference>();
143        Iterator<MessageReference> iterator = iterator();
144        while (iterator.hasNext()) {
145            messageReferences.add(iterator.next());
146        }
147        return messageReferences;
148    }
149
150    @Override
151    public void addAll(PendingList pendingList) {
152        pagedInPendingDispatch.addAll(pendingList);
153    }
154
155    @Override
156    public MessageReference get(MessageId messageId) {
157        MessageReference rc = pagedInPendingDispatch.get(messageId);
158        if (rc == null) {
159            return redeliveredWaitingDispatch.get(messageId);
160        }
161        return rc;
162    }
163
164    public void setPrioritizedMessages(boolean prioritizedMessages) {
165        prioritized = prioritizedMessages;
166        if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
167            pagedInPendingDispatch = new PrioritizedPendingList();
168            redeliveredWaitingDispatch = new PrioritizedPendingList();
169        } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
170            pagedInPendingDispatch = new OrderedPendingList();
171            redeliveredWaitingDispatch = new OrderedPendingList();
172        }
173    }
174
175    public void addMessageForRedelivery(QueueMessageReference qmr) {
176        if (prioritized) {
177            pagedInPendingDispatch.addMessageLast(qmr);
178        } else {
179            redeliveredWaitingDispatch.addMessageLast(qmr);
180        }
181    }
182
183    public boolean hasRedeliveries(){
184        return prioritized ? !pagedInPendingDispatch.isEmpty() : !redeliveredWaitingDispatch.isEmpty();
185    }
186}