001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.IOException;
020import java.util.Collections;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.Message;
028import org.apache.activemq.command.MessageAck;
029import org.apache.activemq.command.MessageId;
030import org.apache.activemq.command.SubscriptionInfo;
031import org.apache.activemq.store.MessageRecoveryListener;
032import org.apache.activemq.store.MessageStoreStatistics;
033import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
034import org.apache.activemq.store.TopicMessageStore;
035import org.apache.activemq.util.LRUCache;
036import org.apache.activemq.util.SubscriptionKey;
037
038public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
039
040    private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
041    private Map<SubscriptionKey, MemoryTopicSub> topicSubMap;
042    private final Map<MessageId, Message> originalMessageTable;
043
044    public MemoryTopicMessageStore(ActiveMQDestination destination) {
045        this(destination, new MemoryTopicMessageStoreLRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap());
046
047        // Set the messageStoreStatistics after the super class is initialized
048        // so that the stats can be properly updated on cache eviction
049        MemoryTopicMessageStoreLRUCache cache = (MemoryTopicMessageStoreLRUCache) originalMessageTable;
050        cache.setMessageStoreStatistics(messageStoreStatistics);
051    }
052
053    public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable,
054        Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
055        super(destination, messageTable);
056        this.subscriberDatabase = subscriberDatabase;
057        this.topicSubMap = makeSubMap();
058        // this is only necessary so that messageStoreStatistics can be set if
059        // necessary We need the original reference since messageTable is wrapped
060        // in a synchronized map in the parent class
061        this.originalMessageTable = messageTable;
062    }
063
064    protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
065        return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>());
066    }
067
068    protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
069        return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
070    }
071
072    @Override
073    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
074        super.addMessage(context, message);
075        for (MemoryTopicSub sub : topicSubMap.values()) {
076            sub.addMessage(message.getMessageId(), message);
077        }
078    }
079
080    @Override
081    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
082        super.removeMessage(messageId);
083        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
084        MemoryTopicSub sub = topicSubMap.get(key);
085        if (sub != null) {
086            sub.removeMessage(messageId);
087        }
088    }
089
090    @Override
091    public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
092        return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
093    }
094
095    @Override
096    public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive) throws IOException {
097        SubscriptionKey key = new SubscriptionKey(info);
098        MemoryTopicSub sub = new MemoryTopicSub(key);
099        topicSubMap.put(key, sub);
100        if (retroactive) {
101            for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
102                sub.addMessage(entry.getKey(), entry.getValue());
103            }
104        }
105        subscriberDatabase.put(key, info);
106    }
107
108    @Override
109    public synchronized void deleteSubscription(String clientId, String subscriptionName) {
110        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
111        subscriberDatabase.remove(key);
112        MemoryTopicSub subscription = topicSubMap.get(key);
113        if (subscription != null) {
114            List<Message> storedMessages = subscription.getStoredMessages();
115            for (Message message : storedMessages) {
116                try {
117                    acknowledge(null, key.getClientId(), key.getSubscriptionName(), message.getMessageId(), null);
118                } catch (IOException e) {
119                }
120            }
121        }
122
123        subscriberDatabase.remove(key);
124        topicSubMap.remove(key);
125    }
126
127    @Override
128    public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
129        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
130        if (sub != null) {
131            sub.recoverSubscription(listener);
132        }
133    }
134
135    @Override
136    public synchronized void delete() {
137        super.delete();
138        subscriberDatabase.clear();
139        topicSubMap.clear();
140    }
141
142    @Override
143    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
144        return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
145    }
146
147    @Override
148    public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
149        int result = 0;
150        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
151        if (sub != null) {
152            result = sub.size();
153        }
154        return result;
155    }
156
157    @Override
158    public synchronized long getMessageSize(String clientId, String subscriberName) throws IOException {
159        long result = 0;
160        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
161        if (sub != null) {
162            result = sub.messageSize();
163        }
164        return result;
165    }
166
167    @Override
168    public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
169        MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
170        if (sub != null) {
171            sub.recoverNextMessages(maxReturned, listener);
172        }
173    }
174
175    @Override
176    public void resetBatching(String clientId, String subscriptionName) {
177        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
178        if (sub != null) {
179            sub.resetBatching();
180        }
181    }
182
183    // Disabled for the memory store, can be enabled later if necessary
184    private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false);
185
186    @Override
187    public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
188        return stats;
189    }
190
191    /**
192     * Since we initialize the store with a LRUCache in some cases, we need to
193     * account for cache evictions when computing the message store statistics.
194     *
195     */
196    private static class MemoryTopicMessageStoreLRUCache extends LRUCache<MessageId, Message> {
197        private static final long serialVersionUID = -342098639681884413L;
198        private MessageStoreStatistics messageStoreStatistics;
199
200        public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize, float loadFactor, boolean accessOrder) {
201            super(initialCapacity, maximumCacheSize, loadFactor, accessOrder);
202        }
203
204        public void setMessageStoreStatistics(MessageStoreStatistics messageStoreStatistics) {
205            this.messageStoreStatistics = messageStoreStatistics;
206        }
207
208        @Override
209        protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) {
210            decMessageStoreStatistics(messageStoreStatistics, eldest.getValue());
211
212            // We aren't tracking this anymore so remove our reference to it.
213            eldest.getValue().decrementReferenceCount();
214        }
215    }
216}