001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.journal; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.LinkedHashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.Set; 028 029import org.apache.activeio.journal.RecordLocation; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.command.ActiveMQDestination; 032import org.apache.activemq.command.JournalQueueAck; 033import org.apache.activemq.command.Message; 034import org.apache.activemq.command.MessageAck; 035import org.apache.activemq.command.MessageId; 036import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 037import org.apache.activemq.store.IndexListener; 038import org.apache.activemq.store.MessageRecoveryListener; 039import org.apache.activemq.store.MessageStore; 040import org.apache.activemq.store.PersistenceAdapter; 041import org.apache.activemq.store.AbstractMessageStore; 042import org.apache.activemq.transaction.Synchronization; 043import org.apache.activemq.usage.MemoryUsage; 044import org.apache.activemq.util.Callback; 045import org.apache.activemq.util.TransactionTemplate; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * A MessageStore that uses a Journal to store it's messages. 051 * 052 * 053 */ 054public class JournalMessageStore extends AbstractMessageStore { 055 056 private static final Logger LOG = LoggerFactory.getLogger(JournalMessageStore.class); 057 058 protected final JournalPersistenceAdapter peristenceAdapter; 059 protected final JournalTransactionStore transactionStore; 060 protected final MessageStore longTermStore; 061 protected final TransactionTemplate transactionTemplate; 062 protected RecordLocation lastLocation; 063 protected Set<RecordLocation> inFlightTxLocations = new HashSet<RecordLocation>(); 064 065 private Map<MessageId, Message> messages = new LinkedHashMap<MessageId, Message>(); 066 private List<MessageAck> messageAcks = new ArrayList<MessageAck>(); 067 068 /** A MessageStore that we can use to retrieve messages quickly. */ 069 private Map<MessageId, Message> cpAddedMessageIds; 070 071 072 private MemoryUsage memoryUsage; 073 074 public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) { 075 super(destination); 076 this.peristenceAdapter = adapter; 077 this.transactionStore = adapter.getTransactionStore(); 078 this.longTermStore = checkpointStore; 079 this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext())); 080 } 081 082 083 public void setMemoryUsage(MemoryUsage memoryUsage) { 084 this.memoryUsage=memoryUsage; 085 longTermStore.setMemoryUsage(memoryUsage); 086 } 087 088 /** 089 * Not synchronized since the Journal has better throughput if you increase 090 * the number of concurrent writes that it is doing. 091 */ 092 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 093 094 final MessageId id = message.getMessageId(); 095 096 final boolean debug = LOG.isDebugEnabled(); 097 message.incrementReferenceCount(); 098 099 final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); 100 if (!context.isInTransaction()) { 101 if (debug) { 102 LOG.debug("Journalled message add for: " + id + ", at: " + location); 103 } 104 addMessage(context, message, location); 105 } else { 106 if (debug) { 107 LOG.debug("Journalled transacted message add for: " + id + ", at: " + location); 108 } 109 synchronized (this) { 110 inFlightTxLocations.add(location); 111 } 112 transactionStore.addMessage(this, message, location); 113 context.getTransaction().addSynchronization(new Synchronization() { 114 public void afterCommit() throws Exception { 115 if (debug) { 116 LOG.debug("Transacted message add commit for: " + id + ", at: " + location); 117 } 118 synchronized (JournalMessageStore.this) { 119 inFlightTxLocations.remove(location); 120 addMessage(context, message, location); 121 } 122 } 123 124 public void afterRollback() throws Exception { 125 if (debug) { 126 LOG.debug("Transacted message add rollback for: " + id + ", at: " + location); 127 } 128 synchronized (JournalMessageStore.this) { 129 inFlightTxLocations.remove(location); 130 } 131 message.decrementReferenceCount(); 132 } 133 }); 134 } 135 } 136 137 void addMessage(ConnectionContext context, final Message message, final RecordLocation location) { 138 synchronized (this) { 139 lastLocation = location; 140 MessageId id = message.getMessageId(); 141 messages.put(id, message); 142 message.getMessageId().setFutureOrSequenceLong(0l); 143 if (indexListener != null) { 144 indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 145 } 146 } 147 } 148 149 public void replayAddMessage(ConnectionContext context, Message message) { 150 try { 151 // Only add the message if it has not already been added. 152 Message t = longTermStore.getMessage(message.getMessageId()); 153 if (t == null) { 154 longTermStore.addMessage(context, message); 155 } 156 } catch (Throwable e) { 157 LOG.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e); 158 } 159 } 160 161 /** 162 */ 163 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 164 final boolean debug = LOG.isDebugEnabled(); 165 JournalQueueAck remove = new JournalQueueAck(); 166 remove.setDestination(destination); 167 remove.setMessageAck(ack); 168 169 final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired()); 170 if (!context.isInTransaction()) { 171 if (debug) { 172 LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location); 173 } 174 removeMessage(ack, location); 175 } else { 176 if (debug) { 177 LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location); 178 } 179 synchronized (this) { 180 inFlightTxLocations.add(location); 181 } 182 transactionStore.removeMessage(this, ack, location); 183 context.getTransaction().addSynchronization(new Synchronization() { 184 public void afterCommit() throws Exception { 185 if (debug) { 186 LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location); 187 } 188 synchronized (JournalMessageStore.this) { 189 inFlightTxLocations.remove(location); 190 removeMessage(ack, location); 191 } 192 } 193 194 public void afterRollback() throws Exception { 195 if (debug) { 196 LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location); 197 } 198 synchronized (JournalMessageStore.this) { 199 inFlightTxLocations.remove(location); 200 } 201 } 202 }); 203 204 } 205 } 206 207 final void removeMessage(final MessageAck ack, final RecordLocation location) { 208 synchronized (this) { 209 lastLocation = location; 210 MessageId id = ack.getLastMessageId(); 211 Message message = messages.remove(id); 212 if (message == null) { 213 messageAcks.add(ack); 214 } else { 215 message.decrementReferenceCount(); 216 } 217 } 218 } 219 220 public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) { 221 try { 222 // Only remove the message if it has not already been removed. 223 Message t = longTermStore.getMessage(messageAck.getLastMessageId()); 224 if (t != null) { 225 longTermStore.removeMessage(context, messageAck); 226 } 227 } catch (Throwable e) { 228 LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e); 229 } 230 } 231 232 /** 233 * @return 234 * @throws IOException 235 */ 236 public RecordLocation checkpoint() throws IOException { 237 return checkpoint(null); 238 } 239 240 /** 241 * @return 242 * @throws IOException 243 */ 244 @SuppressWarnings("unchecked") 245 public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException { 246 247 final List<MessageAck> cpRemovedMessageLocations; 248 final List<RecordLocation> cpActiveJournalLocations; 249 final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize(); 250 251 // swap out the message hash maps.. 252 synchronized (this) { 253 cpAddedMessageIds = this.messages; 254 cpRemovedMessageLocations = this.messageAcks; 255 256 cpActiveJournalLocations = new ArrayList<RecordLocation>(inFlightTxLocations); 257 258 this.messages = new LinkedHashMap<MessageId, Message>(); 259 this.messageAcks = new ArrayList<MessageAck>(); 260 } 261 262 transactionTemplate.run(new Callback() { 263 public void execute() throws Exception { 264 265 int size = 0; 266 267 PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter(); 268 ConnectionContext context = transactionTemplate.getContext(); 269 270 // Checkpoint the added messages. 271 synchronized (JournalMessageStore.this) { 272 Iterator<Message> iterator = cpAddedMessageIds.values().iterator(); 273 while (iterator.hasNext()) { 274 Message message = iterator.next(); 275 try { 276 longTermStore.addMessage(context, message); 277 } catch (Throwable e) { 278 LOG.warn("Message could not be added to long term store: " + e.getMessage(), e); 279 } 280 size += message.getSize(); 281 message.decrementReferenceCount(); 282 // Commit the batch if it's getting too big 283 if (size >= maxCheckpointMessageAddSize) { 284 persitanceAdapter.commitTransaction(context); 285 persitanceAdapter.beginTransaction(context); 286 size = 0; 287 } 288 } 289 } 290 291 persitanceAdapter.commitTransaction(context); 292 persitanceAdapter.beginTransaction(context); 293 294 // Checkpoint the removed messages. 295 Iterator<MessageAck> iterator = cpRemovedMessageLocations.iterator(); 296 while (iterator.hasNext()) { 297 try { 298 MessageAck ack = iterator.next(); 299 longTermStore.removeMessage(transactionTemplate.getContext(), ack); 300 } catch (Throwable e) { 301 LOG.debug("Message could not be removed from long term store: " + e.getMessage(), e); 302 } 303 } 304 305 if (postCheckpointTest != null) { 306 postCheckpointTest.execute(); 307 } 308 } 309 310 }); 311 312 synchronized (this) { 313 cpAddedMessageIds = null; 314 } 315 316 if (cpActiveJournalLocations.size() > 0) { 317 Collections.sort(cpActiveJournalLocations); 318 return cpActiveJournalLocations.get(0); 319 } 320 synchronized (this) { 321 return lastLocation; 322 } 323 } 324 325 /** 326 * 327 */ 328 public Message getMessage(MessageId identity) throws IOException { 329 Message answer = null; 330 331 synchronized (this) { 332 // Do we have a still have it in the journal? 333 answer = messages.get(identity); 334 if (answer == null && cpAddedMessageIds != null) { 335 answer = cpAddedMessageIds.get(identity); 336 } 337 } 338 339 if (answer != null) { 340 return answer; 341 } 342 343 // If all else fails try the long term message store. 344 return longTermStore.getMessage(identity); 345 } 346 347 /** 348 * Replays the checkpointStore first as those messages are the oldest ones, 349 * then messages are replayed from the transaction log and then the cache is 350 * updated. 351 * 352 * @param listener 353 * @throws Exception 354 */ 355 public void recover(final MessageRecoveryListener listener) throws Exception { 356 peristenceAdapter.checkpoint(true, true); 357 longTermStore.recover(listener); 358 } 359 360 public void start() throws Exception { 361 if (this.memoryUsage != null) { 362 this.memoryUsage.addUsageListener(peristenceAdapter); 363 } 364 longTermStore.start(); 365 } 366 367 public void stop() throws Exception { 368 longTermStore.stop(); 369 if (this.memoryUsage != null) { 370 this.memoryUsage.removeUsageListener(peristenceAdapter); 371 } 372 } 373 374 /** 375 * @return Returns the longTermStore. 376 */ 377 public MessageStore getLongTermMessageStore() { 378 return longTermStore; 379 } 380 381 /** 382 * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) 383 */ 384 public void removeAllMessages(ConnectionContext context) throws IOException { 385 peristenceAdapter.checkpoint(true, true); 386 longTermStore.removeAllMessages(context); 387 } 388 389 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { 390 throw new IOException("The journal does not support message references."); 391 } 392 393 public String getMessageReference(MessageId identity) throws IOException { 394 throw new IOException("The journal does not support message references."); 395 } 396 397 /** 398 * @return 399 * @throws IOException 400 * @see org.apache.activemq.store.MessageStore#getMessageCount() 401 */ 402 public int getMessageCount() throws IOException { 403 peristenceAdapter.checkpoint(true, true); 404 return longTermStore.getMessageCount(); 405 } 406 407 public long getMessageSize() throws IOException { 408 peristenceAdapter.checkpoint(true, true); 409 return longTermStore.getMessageSize(); 410 } 411 412 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { 413 peristenceAdapter.checkpoint(true, true); 414 longTermStore.recoverNextMessages(maxReturned, listener); 415 416 } 417 418 public void resetBatching() { 419 longTermStore.resetBatching(); 420 421 } 422 423 @Override 424 public void setBatch(MessageId messageId) throws Exception { 425 peristenceAdapter.checkpoint(true, true); 426 longTermStore.setBatch(messageId); 427 } 428 429}