001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.Iterator;
022import java.util.List;
023
024import org.apache.activemq.broker.region.MessageReference;
025import org.apache.activemq.command.MessageId;
026
027/**
028 * An abstraction that keeps the correct order of messages that need to be dispatched
029 * to consumers, but also hides the fact that there might be redelivered messages that
030 * should be dispatched ahead of any other paged in messages.
031 *
032 * Direct usage of this class is recommended as you can control when redeliveries need
033 * to be added vs regular pending messages (the next set of messages that can be dispatched)
034 *
035 * Created by ceposta
036 * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
037 */
038public class QueueDispatchPendingList implements PendingList {
039
040    private PendingList pagedInPendingDispatch = new OrderedPendingList();
041    private PendingList redeliveredWaitingDispatch = new OrderedPendingList();
042    private boolean prioritized = false;
043
044
045    @Override
046    public boolean isEmpty() {
047        return pagedInPendingDispatch.isEmpty() && redeliveredWaitingDispatch.isEmpty();
048    }
049
050    @Override
051    public void clear() {
052        pagedInPendingDispatch.clear();
053        redeliveredWaitingDispatch.clear();
054    }
055
056    /**
057     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
058     * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery()
059     * method
060     * @param message
061     *      The MessageReference that is to be added to this list.
062     *
063     * @return the pending node.
064     */
065    @Override
066    public PendingNode addMessageFirst(MessageReference message) {
067        return pagedInPendingDispatch.addMessageFirst(message);
068    }
069
070    /**
071     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
072     * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery()
073     * method
074     * @param message
075     *      The MessageReference that is to be added to this list.
076     *
077     * @return the pending node.
078     */
079    @Override
080    public PendingNode addMessageLast(MessageReference message) {
081        return pagedInPendingDispatch.addMessageLast(message);
082    }
083
084    @Override
085    public PendingNode remove(MessageReference message) {
086        if (pagedInPendingDispatch.contains(message)) {
087            return pagedInPendingDispatch.remove(message);
088        } else if (redeliveredWaitingDispatch.contains(message)) {
089            return redeliveredWaitingDispatch.remove(message);
090        }
091        return null;
092    }
093
094    @Override
095    public int size() {
096        return pagedInPendingDispatch.size() + redeliveredWaitingDispatch.size();
097    }
098
099    @Override
100    public long messageSize() {
101        return pagedInPendingDispatch.messageSize() + redeliveredWaitingDispatch.messageSize();
102    }
103
104    @Override
105    public Iterator<MessageReference> iterator() {
106        if (prioritized && hasRedeliveries()) {
107            final QueueDispatchPendingList delegate = this;
108            final PrioritizedPendingList  priorityOrderedRedeliveredAndPending = new PrioritizedPendingList();
109            priorityOrderedRedeliveredAndPending.addAll(redeliveredWaitingDispatch);
110            priorityOrderedRedeliveredAndPending.addAll(pagedInPendingDispatch);
111
112            return new Iterator<MessageReference>() {
113
114                Iterator<MessageReference> combinedIterator = priorityOrderedRedeliveredAndPending.iterator();
115                MessageReference current = null;
116
117                @Override
118                public boolean hasNext() {
119                    return combinedIterator.hasNext();
120                }
121
122                @Override
123                public MessageReference next() {
124                    current = combinedIterator.next();
125                    return current;
126                }
127
128                @Override
129                public void remove() {
130                    if (current!=null) {
131                        delegate.remove(current);
132                    }
133                }
134            };
135
136        } else {
137
138            return new Iterator<MessageReference>() {
139
140                Iterator<MessageReference> redeliveries = redeliveredWaitingDispatch.iterator();
141                Iterator<MessageReference> pendingDispatch = pagedInPendingDispatch.iterator();
142                Iterator<MessageReference> current = redeliveries;
143
144
145                @Override
146                public boolean hasNext() {
147                    if (!redeliveries.hasNext() && (current == redeliveries)) {
148                        current = pendingDispatch;
149                    }
150                    return current.hasNext();
151                }
152
153                @Override
154                public MessageReference next() {
155                    return current.next();
156                }
157
158                @Override
159                public void remove() {
160                    current.remove();
161                }
162            };
163        }
164    }
165
166    @Override
167    public boolean contains(MessageReference message) {
168        return pagedInPendingDispatch.contains(message) || redeliveredWaitingDispatch.contains(message);
169    }
170
171    @Override
172    public Collection<MessageReference> values() {
173        List<MessageReference> messageReferences = new ArrayList<MessageReference>();
174        Iterator<MessageReference> iterator = iterator();
175        while (iterator.hasNext()) {
176            messageReferences.add(iterator.next());
177        }
178        return messageReferences;
179    }
180
181    @Override
182    public void addAll(PendingList pendingList) {
183        pagedInPendingDispatch.addAll(pendingList);
184    }
185
186    @Override
187    public MessageReference get(MessageId messageId) {
188        MessageReference rc = pagedInPendingDispatch.get(messageId);
189        if (rc == null) {
190            return redeliveredWaitingDispatch.get(messageId);
191        }
192        return rc;
193    }
194
195    public void setPrioritizedMessages(boolean prioritizedMessages) {
196        prioritized = prioritizedMessages;
197        if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
198            pagedInPendingDispatch = new PrioritizedPendingList();
199            redeliveredWaitingDispatch = new PrioritizedPendingList();
200        } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
201            pagedInPendingDispatch = new OrderedPendingList();
202            redeliveredWaitingDispatch = new OrderedPendingList();
203        }
204    }
205
206    public boolean hasRedeliveries(){
207        return !redeliveredWaitingDispatch.isEmpty();
208    }
209
210    public void addForRedelivery(List<MessageReference> list, boolean noConsumers) {
211        if (noConsumers && redeliveredWaitingDispatch instanceof OrderedPendingList && willBeInOrder(list)) {
212            // a single consumer can expect repeatable redelivery order irrespective
213            // of transaction or prefetch boundaries
214            ((OrderedPendingList)redeliveredWaitingDispatch).insertAtHead(list);
215        } else {
216            for (MessageReference ref : list) {
217                redeliveredWaitingDispatch.addMessageLast(ref);
218            }
219        }
220    }
221
222    private boolean willBeInOrder(List<MessageReference> list) {
223        // for a single consumer inserting at head will be in order w.r.t brokerSequence but
224        // will not be if there were multiple consumers in the mix even if this is the last
225        // consumer to close (noConsumers==true)
226        return !redeliveredWaitingDispatch.isEmpty() && list != null && !list.isEmpty() &&
227            redeliveredWaitingDispatch.iterator().next().getMessageId().getBrokerSequenceId() > list.get(list.size() - 1).getMessageId().getBrokerSequenceId();
228    }
229}