001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.journal; 018 019import java.io.IOException; 020import java.util.HashMap; 021import java.util.Iterator; 022 023import org.apache.activeio.journal.RecordLocation; 024import org.apache.activemq.broker.ConnectionContext; 025import org.apache.activemq.command.ActiveMQTopic; 026import org.apache.activemq.command.JournalTopicAck; 027import org.apache.activemq.command.Message; 028import org.apache.activemq.command.MessageAck; 029import org.apache.activemq.command.MessageId; 030import org.apache.activemq.command.SubscriptionInfo; 031import org.apache.activemq.store.MessageRecoveryListener; 032import org.apache.activemq.store.MessageStoreSubscriptionStatistics; 033import org.apache.activemq.store.TopicMessageStore; 034import org.apache.activemq.transaction.Synchronization; 035import org.apache.activemq.util.Callback; 036import org.apache.activemq.util.SubscriptionKey; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * A MessageStore that uses a Journal to store it's messages. 042 * 043 * 044 */ 045public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore { 046 047 private static final Logger LOG = LoggerFactory.getLogger(JournalTopicMessageStore.class); 048 049 private TopicMessageStore longTermStore; 050 private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>(); 051 052 public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore, 053 ActiveMQTopic destinationName) { 054 super(adapter, checkpointStore, destinationName); 055 this.longTermStore = checkpointStore; 056 } 057 058 @Override 059 public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) 060 throws Exception { 061 this.peristenceAdapter.checkpoint(true, true); 062 longTermStore.recoverSubscription(clientId, subscriptionName, listener); 063 } 064 065 @Override 066 public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, 067 MessageRecoveryListener listener) throws Exception { 068 this.peristenceAdapter.checkpoint(true, true); 069 longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned, listener); 070 071 } 072 073 @Override 074 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 075 return longTermStore.lookupSubscription(clientId, subscriptionName); 076 } 077 078 @Override 079 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 080 this.peristenceAdapter.checkpoint(true, true); 081 longTermStore.addSubscription(subscriptionInfo, retroactive); 082 } 083 084 @Override 085 public void addMessage(ConnectionContext context, Message message) throws IOException { 086 super.addMessage(context, message); 087 } 088 089 /** 090 */ 091 @Override 092 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 093 final MessageId messageId, MessageAck originalAck) throws IOException { 094 final boolean debug = LOG.isDebugEnabled(); 095 096 JournalTopicAck ack = new JournalTopicAck(); 097 ack.setDestination(destination); 098 ack.setMessageId(messageId); 099 ack.setMessageSequenceId(messageId.getBrokerSequenceId()); 100 ack.setSubscritionName(subscriptionName); 101 ack.setClientId(clientId); 102 ack.setTransactionId(context.getTransaction() != null 103 ? context.getTransaction().getTransactionId() : null); 104 final RecordLocation location = peristenceAdapter.writeCommand(ack, false); 105 106 final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 107 if (!context.isInTransaction()) { 108 if (debug) { 109 LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location); 110 } 111 acknowledge(messageId, location, key); 112 } else { 113 if (debug) { 114 LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location); 115 } 116 synchronized (this) { 117 inFlightTxLocations.add(location); 118 } 119 transactionStore.acknowledge(this, ack, location); 120 context.getTransaction().addSynchronization(new Synchronization() { 121 @Override 122 public void afterCommit() throws Exception { 123 if (debug) { 124 LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location); 125 } 126 synchronized (JournalTopicMessageStore.this) { 127 inFlightTxLocations.remove(location); 128 acknowledge(messageId, location, key); 129 } 130 } 131 132 @Override 133 public void afterRollback() throws Exception { 134 if (debug) { 135 LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location); 136 } 137 synchronized (JournalTopicMessageStore.this) { 138 inFlightTxLocations.remove(location); 139 } 140 } 141 }); 142 } 143 144 } 145 146 public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, 147 MessageId messageId) { 148 try { 149 SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName); 150 if (sub != null) { 151 longTermStore.acknowledge(context, clientId, subscritionName, messageId, null); 152 } 153 } catch (Throwable e) { 154 LOG.debug("Could not replay acknowledge for message '" + messageId 155 + "'. Message may have already been acknowledged. reason: " + e); 156 } 157 } 158 159 /** 160 * @param messageId 161 * @param location 162 * @param key 163 */ 164 protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) { 165 synchronized (this) { 166 lastLocation = location; 167 ackedLastAckLocations.put(key, messageId); 168 } 169 } 170 171 @Override 172 public RecordLocation checkpoint() throws IOException { 173 174 final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations; 175 176 // swap out the hash maps.. 177 synchronized (this) { 178 cpAckedLastAckLocations = this.ackedLastAckLocations; 179 this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>(); 180 } 181 182 return super.checkpoint(new Callback() { 183 @Override 184 public void execute() throws Exception { 185 186 // Checkpoint the acknowledged messages. 187 Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator(); 188 while (iterator.hasNext()) { 189 SubscriptionKey subscriptionKey = iterator.next(); 190 MessageId identity = cpAckedLastAckLocations.get(subscriptionKey); 191 longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, 192 subscriptionKey.subscriptionName, identity, null); 193 } 194 195 } 196 }); 197 198 } 199 200 /** 201 * @return Returns the longTermStore. 202 */ 203 public TopicMessageStore getLongTermTopicMessageStore() { 204 return longTermStore; 205 } 206 207 @Override 208 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 209 longTermStore.deleteSubscription(clientId, subscriptionName); 210 } 211 212 @Override 213 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 214 return longTermStore.getAllSubscriptions(); 215 } 216 217 @Override 218 public int getMessageCount(String clientId, String subscriberName) throws IOException { 219 this.peristenceAdapter.checkpoint(true, true); 220 return longTermStore.getMessageCount(clientId, subscriberName); 221 } 222 223 @Override 224 public long getMessageSize(String clientId, String subscriberName) throws IOException { 225 this.peristenceAdapter.checkpoint(true, true); 226 return longTermStore.getMessageSize(clientId, subscriberName); 227 } 228 229 @Override 230 public void resetBatching(String clientId, String subscriptionName) { 231 longTermStore.resetBatching(clientId, subscriptionName); 232 } 233 234 private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false); 235 236 @Override 237 public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { 238 return stats; 239 } 240}