001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.Set; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.FutureTask; 034import java.util.concurrent.LinkedBlockingQueue; 035import java.util.concurrent.Semaphore; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.atomic.AtomicInteger; 042import java.util.concurrent.atomic.AtomicReference; 043 044import org.apache.activemq.broker.ConnectionContext; 045import org.apache.activemq.broker.region.BaseDestination; 046import org.apache.activemq.broker.scheduler.JobSchedulerStore; 047import org.apache.activemq.command.ActiveMQDestination; 048import org.apache.activemq.command.ActiveMQQueue; 049import org.apache.activemq.command.ActiveMQTempQueue; 050import org.apache.activemq.command.ActiveMQTempTopic; 051import org.apache.activemq.command.ActiveMQTopic; 052import org.apache.activemq.command.Message; 053import org.apache.activemq.command.MessageAck; 054import org.apache.activemq.command.MessageId; 055import org.apache.activemq.command.ProducerId; 056import org.apache.activemq.command.SubscriptionInfo; 057import org.apache.activemq.command.TransactionId; 058import org.apache.activemq.openwire.OpenWireFormat; 059import org.apache.activemq.protobuf.Buffer; 060import org.apache.activemq.store.AbstractMessageStore; 061import org.apache.activemq.store.IndexListener; 062import org.apache.activemq.store.ListenableFuture; 063import org.apache.activemq.store.MessageRecoveryListener; 064import org.apache.activemq.store.MessageStore; 065import org.apache.activemq.store.MessageStoreStatistics; 066import org.apache.activemq.store.MessageStoreSubscriptionStatistics; 067import org.apache.activemq.store.NoLocalSubscriptionAware; 068import org.apache.activemq.store.PersistenceAdapter; 069import org.apache.activemq.store.TopicMessageStore; 070import org.apache.activemq.store.TransactionIdTransformer; 071import org.apache.activemq.store.TransactionStore; 072import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 073import org.apache.activemq.store.kahadb.data.KahaDestination; 074import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 075import org.apache.activemq.store.kahadb.data.KahaLocation; 076import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 077import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 078import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 079import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 080import org.apache.activemq.store.kahadb.disk.journal.Location; 081import org.apache.activemq.store.kahadb.disk.page.Transaction; 082import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; 083import org.apache.activemq.usage.MemoryUsage; 084import org.apache.activemq.usage.SystemUsage; 085import org.apache.activemq.util.ServiceStopper; 086import org.apache.activemq.util.ThreadPoolUtils; 087import org.apache.activemq.wireformat.WireFormat; 088import org.slf4j.Logger; 089import org.slf4j.LoggerFactory; 090 091public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, NoLocalSubscriptionAware { 092 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); 093 private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH; 094 095 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC"; 096 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty( 097 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); 098 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; 099 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( 100 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);; 101 102 protected ExecutorService queueExecutor; 103 protected ExecutorService topicExecutor; 104 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 105 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 106 final WireFormat wireFormat = new OpenWireFormat(); 107 private SystemUsage usageManager; 108 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue; 109 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue; 110 Semaphore globalQueueSemaphore; 111 Semaphore globalTopicSemaphore; 112 private boolean concurrentStoreAndDispatchQueues = true; 113 // when true, message order may be compromised when cache is exhausted if store is out 114 // or order w.r.t cache 115 private boolean concurrentStoreAndDispatchTopics = false; 116 private final boolean concurrentStoreAndDispatchTransactions = false; 117 private int maxAsyncJobs = MAX_ASYNC_JOBS; 118 private final KahaDBTransactionStore transactionStore; 119 private TransactionIdTransformer transactionIdTransformer; 120 121 public KahaDBStore() { 122 this.transactionStore = new KahaDBTransactionStore(this); 123 this.transactionIdTransformer = new TransactionIdTransformer() { 124 @Override 125 public TransactionId transform(TransactionId txid) { 126 return txid; 127 } 128 }; 129 } 130 131 @Override 132 public String toString() { 133 return "KahaDB:[" + directory.getAbsolutePath() + "]"; 134 } 135 136 @Override 137 public void setBrokerName(String brokerName) { 138 } 139 140 @Override 141 public void setUsageManager(SystemUsage usageManager) { 142 this.usageManager = usageManager; 143 } 144 145 public SystemUsage getUsageManager() { 146 return this.usageManager; 147 } 148 149 /** 150 * @return the concurrentStoreAndDispatch 151 */ 152 public boolean isConcurrentStoreAndDispatchQueues() { 153 return this.concurrentStoreAndDispatchQueues; 154 } 155 156 /** 157 * @param concurrentStoreAndDispatch 158 * the concurrentStoreAndDispatch to set 159 */ 160 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 161 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; 162 } 163 164 /** 165 * @return the concurrentStoreAndDispatch 166 */ 167 public boolean isConcurrentStoreAndDispatchTopics() { 168 return this.concurrentStoreAndDispatchTopics; 169 } 170 171 /** 172 * @param concurrentStoreAndDispatch 173 * the concurrentStoreAndDispatch to set 174 */ 175 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 176 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; 177 } 178 179 public boolean isConcurrentStoreAndDispatchTransactions() { 180 return this.concurrentStoreAndDispatchTransactions; 181 } 182 183 /** 184 * @return the maxAsyncJobs 185 */ 186 public int getMaxAsyncJobs() { 187 return this.maxAsyncJobs; 188 } 189 190 /** 191 * @param maxAsyncJobs 192 * the maxAsyncJobs to set 193 */ 194 public void setMaxAsyncJobs(int maxAsyncJobs) { 195 this.maxAsyncJobs = maxAsyncJobs; 196 } 197 198 199 @Override 200 protected void configureMetadata() { 201 if (brokerService != null) { 202 metadata.openwireVersion = brokerService.getStoreOpenWireVersion(); 203 wireFormat.setVersion(metadata.openwireVersion); 204 205 if (LOG.isDebugEnabled()) { 206 LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion); 207 } 208 209 } 210 } 211 212 @Override 213 public void doStart() throws Exception { 214 //configure the metadata before start, right now 215 //this is just the open wire version 216 configureMetadata(); 217 218 super.doStart(); 219 220 if (brokerService != null) { 221 // In case the recovered store used a different OpenWire version log a warning 222 // to assist in determining why journal reads fail. 223 if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { 224 LOG.warn("Existing Store uses a different OpenWire version[{}] " + 225 "than the version configured[{}] reverting to the version " + 226 "used by this store, some newer broker features may not work" + 227 "as expected.", 228 metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); 229 230 // Update the broker service instance to the actual version in use. 231 wireFormat.setVersion(metadata.openwireVersion); 232 brokerService.setStoreOpenWireVersion(metadata.openwireVersion); 233 } 234 } 235 236 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); 237 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); 238 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 239 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 240 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 241 asyncQueueJobQueue, new ThreadFactory() { 242 @Override 243 public Thread newThread(Runnable runnable) { 244 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); 245 thread.setDaemon(true); 246 return thread; 247 } 248 }); 249 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 250 asyncTopicJobQueue, new ThreadFactory() { 251 @Override 252 public Thread newThread(Runnable runnable) { 253 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); 254 thread.setDaemon(true); 255 return thread; 256 } 257 }); 258 } 259 260 @Override 261 public void doStop(ServiceStopper stopper) throws Exception { 262 // drain down async jobs 263 LOG.info("Stopping async queue tasks"); 264 if (this.globalQueueSemaphore != null) { 265 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 266 } 267 synchronized (this.asyncQueueMaps) { 268 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) { 269 synchronized (m) { 270 for (StoreTask task : m.values()) { 271 task.cancel(); 272 } 273 } 274 } 275 this.asyncQueueMaps.clear(); 276 } 277 LOG.info("Stopping async topic tasks"); 278 if (this.globalTopicSemaphore != null) { 279 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 280 } 281 synchronized (this.asyncTopicMaps) { 282 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) { 283 synchronized (m) { 284 for (StoreTask task : m.values()) { 285 task.cancel(); 286 } 287 } 288 } 289 this.asyncTopicMaps.clear(); 290 } 291 if (this.globalQueueSemaphore != null) { 292 this.globalQueueSemaphore.drainPermits(); 293 } 294 if (this.globalTopicSemaphore != null) { 295 this.globalTopicSemaphore.drainPermits(); 296 } 297 if (this.queueExecutor != null) { 298 ThreadPoolUtils.shutdownNow(queueExecutor); 299 queueExecutor = null; 300 } 301 if (this.topicExecutor != null) { 302 ThreadPoolUtils.shutdownNow(topicExecutor); 303 topicExecutor = null; 304 } 305 LOG.info("Stopped KahaDB"); 306 super.doStop(stopper); 307 } 308 309 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { 310 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { 311 @Override 312 public Location execute(Transaction tx) throws IOException { 313 StoredDestination sd = getStoredDestination(destination, tx); 314 Long sequence = sd.messageIdIndex.get(tx, key); 315 if (sequence == null) { 316 return null; 317 } 318 return sd.orderIndex.get(tx, sequence).location; 319 } 320 }); 321 } 322 323 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { 324 StoreQueueTask task = null; 325 synchronized (store.asyncTaskMap) { 326 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 327 } 328 return task; 329 } 330 331 // with asyncTaskMap locked 332 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { 333 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 334 this.queueExecutor.execute(task); 335 } 336 337 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { 338 StoreTopicTask task = null; 339 synchronized (store.asyncTaskMap) { 340 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 341 } 342 return task; 343 } 344 345 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { 346 synchronized (store.asyncTaskMap) { 347 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 348 } 349 this.topicExecutor.execute(task); 350 } 351 352 @Override 353 public TransactionStore createTransactionStore() throws IOException { 354 return this.transactionStore; 355 } 356 357 public boolean getForceRecoverIndex() { 358 return this.forceRecoverIndex; 359 } 360 361 public void setForceRecoverIndex(boolean forceRecoverIndex) { 362 this.forceRecoverIndex = forceRecoverIndex; 363 } 364 365 public class KahaDBMessageStore extends AbstractMessageStore { 366 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); 367 protected KahaDestination dest; 368 private final int maxAsyncJobs; 369 private final Semaphore localDestinationSemaphore; 370 371 double doneTasks, canceledTasks = 0; 372 373 public KahaDBMessageStore(ActiveMQDestination destination) { 374 super(destination); 375 this.dest = convert(destination); 376 this.maxAsyncJobs = getMaxAsyncJobs(); 377 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); 378 } 379 380 @Override 381 public ActiveMQDestination getDestination() { 382 return destination; 383 } 384 385 @Override 386 public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) 387 throws IOException { 388 if (isConcurrentStoreAndDispatchQueues()) { 389 message.beforeMarshall(wireFormat); 390 StoreQueueTask result = new StoreQueueTask(this, context, message); 391 ListenableFuture<Object> future = result.getFuture(); 392 message.getMessageId().setFutureOrSequenceLong(future); 393 message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch 394 result.aquireLocks(); 395 synchronized (asyncTaskMap) { 396 addQueueTask(this, result); 397 if (indexListener != null) { 398 indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 399 } 400 } 401 return future; 402 } else { 403 return super.asyncAddQueueMessage(context, message); 404 } 405 } 406 407 @Override 408 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 409 if (isConcurrentStoreAndDispatchQueues()) { 410 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); 411 StoreQueueTask task = null; 412 synchronized (asyncTaskMap) { 413 task = (StoreQueueTask) asyncTaskMap.get(key); 414 } 415 if (task != null) { 416 if (ack.isInTransaction() || !task.cancel()) { 417 try { 418 task.future.get(); 419 } catch (InterruptedException e) { 420 throw new InterruptedIOException(e.toString()); 421 } catch (Exception ignored) { 422 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored); 423 } 424 removeMessage(context, ack); 425 } else { 426 indexLock.writeLock().lock(); 427 try { 428 metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId()); 429 } finally { 430 indexLock.writeLock().unlock(); 431 } 432 synchronized (asyncTaskMap) { 433 asyncTaskMap.remove(key); 434 } 435 } 436 } else { 437 removeMessage(context, ack); 438 } 439 } else { 440 removeMessage(context, ack); 441 } 442 } 443 444 @Override 445 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 446 final KahaAddMessageCommand command = new KahaAddMessageCommand(); 447 command.setDestination(dest); 448 command.setMessageId(message.getMessageId().toProducerKey()); 449 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId()))); 450 command.setPriority(message.getPriority()); 451 command.setPrioritySupported(isPrioritizedMessages()); 452 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 453 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 454 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { 455 // sync add? (for async, future present from getFutureOrSequenceLong) 456 Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); 457 458 @Override 459 public void sequenceAssignedWithIndexLocked(final long sequence) { 460 message.getMessageId().setFutureOrSequenceLong(sequence); 461 if (indexListener != null) { 462 if (possibleFuture == null) { 463 trackPendingAdd(dest, sequence); 464 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 465 @Override 466 public void run() { 467 trackPendingAddComplete(dest, sequence); 468 } 469 })); 470 } 471 } 472 } 473 }, null); 474 } 475 476 @Override 477 public void updateMessage(Message message) throws IOException { 478 if (LOG.isTraceEnabled()) { 479 LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); 480 } 481 KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); 482 KahaAddMessageCommand command = new KahaAddMessageCommand(); 483 command.setDestination(dest); 484 command.setMessageId(message.getMessageId().toProducerKey()); 485 command.setPriority(message.getPriority()); 486 command.setPrioritySupported(prioritizedMessages); 487 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 488 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 489 updateMessageCommand.setMessage(command); 490 store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null); 491 } 492 493 @Override 494 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 495 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 496 command.setDestination(dest); 497 command.setMessageId(ack.getLastMessageId().toProducerKey()); 498 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId()))); 499 500 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 501 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 502 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); 503 } 504 505 @Override 506 public void removeAllMessages(ConnectionContext context) throws IOException { 507 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 508 command.setDestination(dest); 509 store(command, true, null, null); 510 } 511 512 @Override 513 public Message getMessage(MessageId identity) throws IOException { 514 final String key = identity.toProducerKey(); 515 516 // Hopefully one day the page file supports concurrent read 517 // operations... but for now we must 518 // externally synchronize... 519 Location location; 520 indexLock.writeLock().lock(); 521 try { 522 location = findMessageLocation(key, dest); 523 } finally { 524 indexLock.writeLock().unlock(); 525 } 526 if (location == null) { 527 return null; 528 } 529 530 return loadMessage(location); 531 } 532 533 @Override 534 public boolean isEmpty() throws IOException { 535 indexLock.writeLock().lock(); 536 try { 537 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { 538 @Override 539 public Boolean execute(Transaction tx) throws IOException { 540 // Iterate through all index entries to get a count of 541 // messages in the destination. 542 StoredDestination sd = getStoredDestination(dest, tx); 543 return sd.locationIndex.isEmpty(tx); 544 } 545 }); 546 } finally { 547 indexLock.writeLock().unlock(); 548 } 549 } 550 551 @Override 552 public void recover(final MessageRecoveryListener listener) throws Exception { 553 // recovery may involve expiry which will modify 554 indexLock.writeLock().lock(); 555 try { 556 pageFile.tx().execute(new Transaction.Closure<Exception>() { 557 @Override 558 public void execute(Transaction tx) throws Exception { 559 StoredDestination sd = getStoredDestination(dest, tx); 560 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 561 sd.orderIndex.resetCursorPosition(); 562 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator 563 .hasNext(); ) { 564 Entry<Long, MessageKeys> entry = iterator.next(); 565 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 566 continue; 567 } 568 Message msg = loadMessage(entry.getValue().location); 569 listener.recoverMessage(msg); 570 } 571 } 572 }); 573 } finally { 574 indexLock.writeLock().unlock(); 575 } 576 } 577 578 @Override 579 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 580 indexLock.writeLock().lock(); 581 try { 582 pageFile.tx().execute(new Transaction.Closure<Exception>() { 583 @Override 584 public void execute(Transaction tx) throws Exception { 585 StoredDestination sd = getStoredDestination(dest, tx); 586 Entry<Long, MessageKeys> entry = null; 587 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 588 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { 589 entry = iterator.next(); 590 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 591 continue; 592 } 593 Message msg = loadMessage(entry.getValue().location); 594 msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); 595 listener.recoverMessage(msg); 596 counter++; 597 if (counter >= maxReturned) { 598 break; 599 } 600 } 601 sd.orderIndex.stoppedIterating(); 602 } 603 }); 604 } finally { 605 indexLock.writeLock().unlock(); 606 } 607 } 608 609 protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { 610 int counter = 0; 611 String id; 612 for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) { 613 id = iterator.next(); 614 iterator.remove(); 615 Long sequence = sd.messageIdIndex.get(tx, id); 616 if (sequence != null) { 617 if (sd.orderIndex.alreadyDispatched(sequence)) { 618 listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location)); 619 counter++; 620 if (counter >= maxReturned) { 621 break; 622 } 623 } else { 624 LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); 625 } 626 } else { 627 LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd); 628 } 629 } 630 return counter; 631 } 632 633 634 @Override 635 public void resetBatching() { 636 if (pageFile.isLoaded()) { 637 indexLock.writeLock().lock(); 638 try { 639 pageFile.tx().execute(new Transaction.Closure<Exception>() { 640 @Override 641 public void execute(Transaction tx) throws Exception { 642 StoredDestination sd = getExistingStoredDestination(dest, tx); 643 if (sd != null) { 644 sd.orderIndex.resetCursorPosition();} 645 } 646 }); 647 } catch (Exception e) { 648 LOG.error("Failed to reset batching",e); 649 } finally { 650 indexLock.writeLock().unlock(); 651 } 652 } 653 } 654 655 @Override 656 public void setBatch(final MessageId identity) throws IOException { 657 indexLock.writeLock().lock(); 658 try { 659 pageFile.tx().execute(new Transaction.Closure<IOException>() { 660 @Override 661 public void execute(Transaction tx) throws IOException { 662 StoredDestination sd = getStoredDestination(dest, tx); 663 Long location = (Long) identity.getFutureOrSequenceLong(); 664 Long pending = sd.orderIndex.minPendingAdd(); 665 if (pending != null) { 666 location = Math.min(location, pending-1); 667 } 668 sd.orderIndex.setBatch(tx, location); 669 } 670 }); 671 } finally { 672 indexLock.writeLock().unlock(); 673 } 674 } 675 676 @Override 677 public void setMemoryUsage(MemoryUsage memoryUsage) { 678 } 679 @Override 680 public void start() throws Exception { 681 super.start(); 682 } 683 @Override 684 public void stop() throws Exception { 685 super.stop(); 686 } 687 688 protected void lockAsyncJobQueue() { 689 try { 690 if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) { 691 throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore); 692 } 693 } catch (Exception e) { 694 LOG.error("Failed to lock async jobs for " + this.destination, e); 695 } 696 } 697 698 protected void unlockAsyncJobQueue() { 699 this.localDestinationSemaphore.release(this.maxAsyncJobs); 700 } 701 702 protected void acquireLocalAsyncLock() { 703 try { 704 this.localDestinationSemaphore.acquire(); 705 } catch (InterruptedException e) { 706 LOG.error("Failed to aquire async lock for " + this.destination, e); 707 } 708 } 709 710 protected void releaseLocalAsyncLock() { 711 this.localDestinationSemaphore.release(); 712 } 713 714 @Override 715 public String toString(){ 716 return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest)); 717 } 718 719 @Override 720 protected void recoverMessageStoreStatistics() throws IOException { 721 try { 722 MessageStoreStatistics recoveredStatistics; 723 lockAsyncJobQueue(); 724 indexLock.writeLock().lock(); 725 try { 726 recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() { 727 @Override 728 public MessageStoreStatistics execute(Transaction tx) throws IOException { 729 MessageStoreStatistics statistics = new MessageStoreStatistics(); 730 731 // Iterate through all index entries to get the size of each message 732 StoredDestination sd = getStoredDestination(dest, tx); 733 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { 734 int locationSize = iterator.next().getKey().getSize(); 735 statistics.getMessageCount().increment(); 736 statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); 737 } 738 return statistics; 739 } 740 }); 741 getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount()); 742 getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize()); 743 } finally { 744 indexLock.writeLock().unlock(); 745 } 746 } finally { 747 unlockAsyncJobQueue(); 748 } 749 } 750 } 751 752 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 753 private final AtomicInteger subscriptionCount = new AtomicInteger(); 754 protected final MessageStoreSubscriptionStatistics messageStoreSubStats = 755 new MessageStoreSubscriptionStatistics(isEnableSubscriptionStatistics()); 756 757 public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { 758 super(destination); 759 this.subscriptionCount.set(getAllSubscriptions().length); 760 if (isConcurrentStoreAndDispatchTopics()) { 761 asyncTopicMaps.add(asyncTaskMap); 762 } 763 } 764 765 @Override 766 protected void recoverMessageStoreStatistics() throws IOException { 767 super.recoverMessageStoreStatistics(); 768 this.recoverMessageStoreSubMetrics(); 769 } 770 771 @Override 772 public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) 773 throws IOException { 774 if (isConcurrentStoreAndDispatchTopics()) { 775 message.beforeMarshall(wireFormat); 776 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); 777 result.aquireLocks(); 778 addTopicTask(this, result); 779 return result.getFuture(); 780 } else { 781 return super.asyncAddTopicMessage(context, message); 782 } 783 } 784 785 @Override 786 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 787 MessageId messageId, MessageAck ack) throws IOException { 788 String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); 789 if (isConcurrentStoreAndDispatchTopics()) { 790 AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); 791 StoreTopicTask task = null; 792 synchronized (asyncTaskMap) { 793 task = (StoreTopicTask) asyncTaskMap.get(key); 794 } 795 if (task != null) { 796 if (task.addSubscriptionKey(subscriptionKey)) { 797 removeTopicTask(this, messageId); 798 if (task.cancel()) { 799 synchronized (asyncTaskMap) { 800 asyncTaskMap.remove(key); 801 } 802 } 803 } 804 } else { 805 doAcknowledge(context, subscriptionKey, messageId, ack); 806 } 807 } else { 808 doAcknowledge(context, subscriptionKey, messageId, ack); 809 } 810 } 811 812 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) 813 throws IOException { 814 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 815 command.setDestination(dest); 816 command.setSubscriptionKey(subscriptionKey); 817 command.setMessageId(messageId.toProducerKey()); 818 command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null); 819 if (ack != null && ack.isUnmatchedAck()) { 820 command.setAck(UNMATCHED); 821 } else { 822 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 823 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 824 } 825 store(command, false, null, null); 826 } 827 828 @Override 829 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 830 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo 831 .getSubscriptionName()); 832 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 833 command.setDestination(dest); 834 command.setSubscriptionKey(subscriptionKey.toString()); 835 command.setRetroactive(retroactive); 836 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 837 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 838 store(command, isEnableJournalDiskSyncs() && true, null, null); 839 this.subscriptionCount.incrementAndGet(); 840 } 841 842 @Override 843 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 844 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 845 command.setDestination(dest); 846 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); 847 store(command, isEnableJournalDiskSyncs() && true, null, null); 848 this.subscriptionCount.decrementAndGet(); 849 } 850 851 @Override 852 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 853 854 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 855 indexLock.writeLock().lock(); 856 try { 857 pageFile.tx().execute(new Transaction.Closure<IOException>() { 858 @Override 859 public void execute(Transaction tx) throws IOException { 860 StoredDestination sd = getStoredDestination(dest, tx); 861 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator 862 .hasNext();) { 863 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 864 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry 865 .getValue().getSubscriptionInfo().newInput())); 866 subscriptions.add(info); 867 868 } 869 } 870 }); 871 } finally { 872 indexLock.writeLock().unlock(); 873 } 874 875 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; 876 subscriptions.toArray(rc); 877 return rc; 878 } 879 880 @Override 881 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 882 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 883 indexLock.writeLock().lock(); 884 try { 885 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { 886 @Override 887 public SubscriptionInfo execute(Transaction tx) throws IOException { 888 StoredDestination sd = getStoredDestination(dest, tx); 889 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 890 if (command == null) { 891 return null; 892 } 893 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command 894 .getSubscriptionInfo().newInput())); 895 } 896 }); 897 } finally { 898 indexLock.writeLock().unlock(); 899 } 900 } 901 902 @Override 903 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 904 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 905 906 if (isEnableSubscriptionStatistics()) { 907 return (int)this.messageStoreSubStats.getMessageCount(subscriptionKey).getCount(); 908 } else { 909 910 indexLock.writeLock().lock(); 911 try { 912 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 913 @Override 914 public Integer execute(Transaction tx) throws IOException { 915 StoredDestination sd = getStoredDestination(dest, tx); 916 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 917 if (cursorPos == null) { 918 // The subscription might not exist. 919 return 0; 920 } 921 922 return (int) getStoredMessageCount(tx, sd, subscriptionKey); 923 } 924 }); 925 } finally { 926 indexLock.writeLock().unlock(); 927 } 928 } 929 } 930 931 932 @Override 933 public long getMessageSize(String clientId, String subscriptionName) throws IOException { 934 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 935 if (isEnableSubscriptionStatistics()) { 936 return this.messageStoreSubStats.getMessageSize(subscriptionKey).getTotalSize(); 937 } else { 938 indexLock.writeLock().lock(); 939 try { 940 return pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() { 941 @Override 942 public Long execute(Transaction tx) throws IOException { 943 StoredDestination sd = getStoredDestination(dest, tx); 944 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 945 if (cursorPos == null) { 946 // The subscription might not exist. 947 return 0l; 948 } 949 950 return getStoredMessageSize(tx, sd, subscriptionKey); 951 } 952 }); 953 } finally { 954 indexLock.writeLock().unlock(); 955 } 956 } 957 } 958 959 protected void recoverMessageStoreSubMetrics() throws IOException { 960 if (isEnableSubscriptionStatistics()) { 961 962 final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics(); 963 indexLock.writeLock().lock(); 964 try { 965 pageFile.tx().execute(new Transaction.Closure<IOException>() { 966 @Override 967 public void execute(Transaction tx) throws IOException { 968 StoredDestination sd = getStoredDestination(dest, tx); 969 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions 970 .iterator(tx); iterator.hasNext();) { 971 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 972 973 String subscriptionKey = entry.getKey(); 974 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 975 if (cursorPos != null) { 976 long size = getStoredMessageSize(tx, sd, subscriptionKey); 977 statistics.getMessageCount(subscriptionKey) 978 .setCount(getStoredMessageCount(tx, sd, subscriptionKey)); 979 statistics.getMessageSize(subscriptionKey).addSize(size > 0 ? size : 0); 980 } 981 } 982 } 983 }); 984 } finally { 985 indexLock.writeLock().unlock(); 986 } 987 } 988 } 989 990 @Override 991 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) 992 throws Exception { 993 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 994 @SuppressWarnings("unused") 995 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 996 indexLock.writeLock().lock(); 997 try { 998 pageFile.tx().execute(new Transaction.Closure<Exception>() { 999 @Override 1000 public void execute(Transaction tx) throws Exception { 1001 StoredDestination sd = getStoredDestination(dest, tx); 1002 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 1003 sd.orderIndex.setBatch(tx, cursorPos); 1004 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 1005 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator 1006 .hasNext();) { 1007 Entry<Long, MessageKeys> entry = iterator.next(); 1008 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 1009 continue; 1010 } 1011 listener.recoverMessage(loadMessage(entry.getValue().location)); 1012 } 1013 sd.orderIndex.resetCursorPosition(); 1014 } 1015 }); 1016 } finally { 1017 indexLock.writeLock().unlock(); 1018 } 1019 } 1020 1021 @Override 1022 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, 1023 final MessageRecoveryListener listener) throws Exception { 1024 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1025 @SuppressWarnings("unused") 1026 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 1027 indexLock.writeLock().lock(); 1028 try { 1029 pageFile.tx().execute(new Transaction.Closure<Exception>() { 1030 @Override 1031 public void execute(Transaction tx) throws Exception { 1032 StoredDestination sd = getStoredDestination(dest, tx); 1033 sd.orderIndex.resetCursorPosition(); 1034 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); 1035 if (moc == null) { 1036 LastAck pos = getLastAck(tx, sd, subscriptionKey); 1037 if (pos == null) { 1038 // sub deleted 1039 return; 1040 } 1041 sd.orderIndex.setBatch(tx, pos); 1042 moc = sd.orderIndex.cursor; 1043 } else { 1044 sd.orderIndex.cursor.sync(moc); 1045 } 1046 1047 Entry<Long, MessageKeys> entry = null; 1048 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 1049 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator 1050 .hasNext();) { 1051 entry = iterator.next(); 1052 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 1053 continue; 1054 } 1055 if (listener.recoverMessage(loadMessage(entry.getValue().location))) { 1056 counter++; 1057 } 1058 if (counter >= maxReturned || listener.hasSpace() == false) { 1059 break; 1060 } 1061 } 1062 sd.orderIndex.stoppedIterating(); 1063 if (entry != null) { 1064 MessageOrderCursor copy = sd.orderIndex.cursor.copy(); 1065 sd.subscriptionCursors.put(subscriptionKey, copy); 1066 } 1067 } 1068 }); 1069 } finally { 1070 indexLock.writeLock().unlock(); 1071 } 1072 } 1073 1074 @Override 1075 public void resetBatching(String clientId, String subscriptionName) { 1076 try { 1077 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1078 indexLock.writeLock().lock(); 1079 try { 1080 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1081 @Override 1082 public void execute(Transaction tx) throws IOException { 1083 StoredDestination sd = getStoredDestination(dest, tx); 1084 sd.subscriptionCursors.remove(subscriptionKey); 1085 } 1086 }); 1087 }finally { 1088 indexLock.writeLock().unlock(); 1089 } 1090 } catch (IOException e) { 1091 throw new RuntimeException(e); 1092 } 1093 } 1094 1095 @Override 1096 public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { 1097 return messageStoreSubStats; 1098 } 1099 } 1100 1101 String subscriptionKey(String clientId, String subscriptionName) { 1102 return clientId + ":" + subscriptionName; 1103 } 1104 1105 @Override 1106 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 1107 String key = key(convert(destination)); 1108 MessageStore store = storeCache.get(key(convert(destination))); 1109 if (store == null) { 1110 final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination)); 1111 store = storeCache.putIfAbsent(key, queueStore); 1112 if (store == null) { 1113 store = queueStore; 1114 } 1115 } 1116 1117 return store; 1118 } 1119 1120 @Override 1121 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 1122 String key = key(convert(destination)); 1123 MessageStore store = storeCache.get(key(convert(destination))); 1124 if (store == null) { 1125 final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); 1126 store = storeCache.putIfAbsent(key, topicStore); 1127 if (store == null) { 1128 store = topicStore; 1129 } 1130 } 1131 1132 return (TopicMessageStore) store; 1133 } 1134 1135 /** 1136 * Cleanup method to remove any state associated with the given destination. 1137 * This method does not stop the message store (it might not be cached). 1138 * 1139 * @param destination 1140 * Destination to forget 1141 */ 1142 @Override 1143 public void removeQueueMessageStore(ActiveMQQueue destination) { 1144 } 1145 1146 /** 1147 * Cleanup method to remove any state associated with the given destination 1148 * This method does not stop the message store (it might not be cached). 1149 * 1150 * @param destination 1151 * Destination to forget 1152 */ 1153 @Override 1154 public void removeTopicMessageStore(ActiveMQTopic destination) { 1155 } 1156 1157 @Override 1158 public void deleteAllMessages() throws IOException { 1159 deleteAllMessages = true; 1160 } 1161 1162 @Override 1163 public Set<ActiveMQDestination> getDestinations() { 1164 try { 1165 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 1166 indexLock.writeLock().lock(); 1167 try { 1168 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1169 @Override 1170 public void execute(Transaction tx) throws IOException { 1171 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator 1172 .hasNext();) { 1173 Entry<String, StoredDestination> entry = iterator.next(); 1174 //Removing isEmpty topic check - see AMQ-5875 1175 rc.add(convert(entry.getKey())); 1176 } 1177 } 1178 }); 1179 }finally { 1180 indexLock.writeLock().unlock(); 1181 } 1182 return rc; 1183 } catch (IOException e) { 1184 throw new RuntimeException(e); 1185 } 1186 } 1187 1188 @Override 1189 public long getLastMessageBrokerSequenceId() throws IOException { 1190 return 0; 1191 } 1192 1193 @Override 1194 public long getLastProducerSequenceId(ProducerId id) { 1195 indexLock.writeLock().lock(); 1196 try { 1197 return metadata.producerSequenceIdTracker.getLastSeqId(id); 1198 } finally { 1199 indexLock.writeLock().unlock(); 1200 } 1201 } 1202 1203 @Override 1204 public long size() { 1205 try { 1206 return journalSize.get() + getPageFile().getDiskSize(); 1207 } catch (IOException e) { 1208 throw new RuntimeException(e); 1209 } 1210 } 1211 1212 @Override 1213 public void beginTransaction(ConnectionContext context) throws IOException { 1214 throw new IOException("Not yet implemented."); 1215 } 1216 @Override 1217 public void commitTransaction(ConnectionContext context) throws IOException { 1218 throw new IOException("Not yet implemented."); 1219 } 1220 @Override 1221 public void rollbackTransaction(ConnectionContext context) throws IOException { 1222 throw new IOException("Not yet implemented."); 1223 } 1224 1225 @Override 1226 public void checkpoint(boolean sync) throws IOException { 1227 super.checkpointCleanup(sync); 1228 } 1229 1230 // ///////////////////////////////////////////////////////////////// 1231 // Internal helper methods. 1232 // ///////////////////////////////////////////////////////////////// 1233 1234 /** 1235 * @param location 1236 * @return 1237 * @throws IOException 1238 */ 1239 Message loadMessage(Location location) throws IOException { 1240 try { 1241 JournalCommand<?> command = load(location); 1242 KahaAddMessageCommand addMessage = null; 1243 switch (command.type()) { 1244 case KAHA_UPDATE_MESSAGE_COMMAND: 1245 addMessage = ((KahaUpdateMessageCommand) command).getMessage(); 1246 break; 1247 default: 1248 addMessage = (KahaAddMessageCommand) command; 1249 } 1250 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); 1251 return msg; 1252 } catch (IOException ioe) { 1253 LOG.error("Failed to load message at: {}", location , ioe); 1254 brokerService.handleIOException(ioe); 1255 throw ioe; 1256 } 1257 } 1258 1259 // ///////////////////////////////////////////////////////////////// 1260 // Internal conversion methods. 1261 // ///////////////////////////////////////////////////////////////// 1262 1263 KahaLocation convert(Location location) { 1264 KahaLocation rc = new KahaLocation(); 1265 rc.setLogId(location.getDataFileId()); 1266 rc.setOffset(location.getOffset()); 1267 return rc; 1268 } 1269 1270 KahaDestination convert(ActiveMQDestination dest) { 1271 KahaDestination rc = new KahaDestination(); 1272 rc.setName(dest.getPhysicalName()); 1273 switch (dest.getDestinationType()) { 1274 case ActiveMQDestination.QUEUE_TYPE: 1275 rc.setType(DestinationType.QUEUE); 1276 return rc; 1277 case ActiveMQDestination.TOPIC_TYPE: 1278 rc.setType(DestinationType.TOPIC); 1279 return rc; 1280 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1281 rc.setType(DestinationType.TEMP_QUEUE); 1282 return rc; 1283 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1284 rc.setType(DestinationType.TEMP_TOPIC); 1285 return rc; 1286 default: 1287 return null; 1288 } 1289 } 1290 1291 ActiveMQDestination convert(String dest) { 1292 int p = dest.indexOf(":"); 1293 if (p < 0) { 1294 throw new IllegalArgumentException("Not in the valid destination format"); 1295 } 1296 int type = Integer.parseInt(dest.substring(0, p)); 1297 String name = dest.substring(p + 1); 1298 return convert(type, name); 1299 } 1300 1301 private ActiveMQDestination convert(KahaDestination commandDestination) { 1302 return convert(commandDestination.getType().getNumber(), commandDestination.getName()); 1303 } 1304 1305 private ActiveMQDestination convert(int type, String name) { 1306 switch (KahaDestination.DestinationType.valueOf(type)) { 1307 case QUEUE: 1308 return new ActiveMQQueue(name); 1309 case TOPIC: 1310 return new ActiveMQTopic(name); 1311 case TEMP_QUEUE: 1312 return new ActiveMQTempQueue(name); 1313 case TEMP_TOPIC: 1314 return new ActiveMQTempTopic(name); 1315 default: 1316 throw new IllegalArgumentException("Not in the valid destination format"); 1317 } 1318 } 1319 1320 public TransactionIdTransformer getTransactionIdTransformer() { 1321 return transactionIdTransformer; 1322 } 1323 1324 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 1325 this.transactionIdTransformer = transactionIdTransformer; 1326 } 1327 1328 static class AsyncJobKey { 1329 MessageId id; 1330 ActiveMQDestination destination; 1331 1332 AsyncJobKey(MessageId id, ActiveMQDestination destination) { 1333 this.id = id; 1334 this.destination = destination; 1335 } 1336 1337 @Override 1338 public boolean equals(Object obj) { 1339 if (obj == this) { 1340 return true; 1341 } 1342 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id) 1343 && destination.equals(((AsyncJobKey) obj).destination); 1344 } 1345 1346 @Override 1347 public int hashCode() { 1348 return id.hashCode() + destination.hashCode(); 1349 } 1350 1351 @Override 1352 public String toString() { 1353 return destination.getPhysicalName() + "-" + id; 1354 } 1355 } 1356 1357 public interface StoreTask { 1358 public boolean cancel(); 1359 1360 public void aquireLocks(); 1361 1362 public void releaseLocks(); 1363 } 1364 1365 class StoreQueueTask implements Runnable, StoreTask { 1366 protected final Message message; 1367 protected final ConnectionContext context; 1368 protected final KahaDBMessageStore store; 1369 protected final InnerFutureTask future; 1370 protected final AtomicBoolean done = new AtomicBoolean(); 1371 protected final AtomicBoolean locked = new AtomicBoolean(); 1372 1373 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) { 1374 this.store = store; 1375 this.context = context; 1376 this.message = message; 1377 this.future = new InnerFutureTask(this); 1378 } 1379 1380 public ListenableFuture<Object> getFuture() { 1381 return this.future; 1382 } 1383 1384 @Override 1385 public boolean cancel() { 1386 if (this.done.compareAndSet(false, true)) { 1387 return this.future.cancel(false); 1388 } 1389 return false; 1390 } 1391 1392 @Override 1393 public void aquireLocks() { 1394 if (this.locked.compareAndSet(false, true)) { 1395 try { 1396 globalQueueSemaphore.acquire(); 1397 store.acquireLocalAsyncLock(); 1398 message.incrementReferenceCount(); 1399 } catch (InterruptedException e) { 1400 LOG.warn("Failed to aquire lock", e); 1401 } 1402 } 1403 1404 } 1405 1406 @Override 1407 public void releaseLocks() { 1408 if (this.locked.compareAndSet(true, false)) { 1409 store.releaseLocalAsyncLock(); 1410 globalQueueSemaphore.release(); 1411 message.decrementReferenceCount(); 1412 } 1413 } 1414 1415 @Override 1416 public void run() { 1417 this.store.doneTasks++; 1418 try { 1419 if (this.done.compareAndSet(false, true)) { 1420 this.store.addMessage(context, message); 1421 removeQueueTask(this.store, this.message.getMessageId()); 1422 this.future.complete(); 1423 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1424 System.err.println(this.store.dest.getName() + " cancelled: " 1425 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1426 this.store.canceledTasks = this.store.doneTasks = 0; 1427 } 1428 } catch (Exception e) { 1429 this.future.setException(e); 1430 } 1431 } 1432 1433 protected Message getMessage() { 1434 return this.message; 1435 } 1436 1437 private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> { 1438 1439 private final AtomicReference<Runnable> listenerRef = new AtomicReference<>(); 1440 1441 public InnerFutureTask(Runnable runnable) { 1442 super(runnable, null); 1443 } 1444 1445 public void setException(final Exception e) { 1446 super.setException(e); 1447 } 1448 1449 public void complete() { 1450 super.set(null); 1451 } 1452 1453 @Override 1454 public void done() { 1455 fireListener(); 1456 } 1457 1458 @Override 1459 public void addListener(Runnable listener) { 1460 this.listenerRef.set(listener); 1461 if (isDone()) { 1462 fireListener(); 1463 } 1464 } 1465 1466 private void fireListener() { 1467 Runnable listener = listenerRef.getAndSet(null); 1468 if (listener != null) { 1469 try { 1470 listener.run(); 1471 } catch (Exception ignored) { 1472 LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored); 1473 } 1474 } 1475 } 1476 } 1477 } 1478 1479 class StoreTopicTask extends StoreQueueTask { 1480 private final int subscriptionCount; 1481 private final List<String> subscriptionKeys = new ArrayList<String>(1); 1482 private final KahaDBTopicMessageStore topicStore; 1483 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, 1484 int subscriptionCount) { 1485 super(store, context, message); 1486 this.topicStore = store; 1487 this.subscriptionCount = subscriptionCount; 1488 1489 } 1490 1491 @Override 1492 public void aquireLocks() { 1493 if (this.locked.compareAndSet(false, true)) { 1494 try { 1495 globalTopicSemaphore.acquire(); 1496 store.acquireLocalAsyncLock(); 1497 message.incrementReferenceCount(); 1498 } catch (InterruptedException e) { 1499 LOG.warn("Failed to aquire lock", e); 1500 } 1501 } 1502 } 1503 1504 @Override 1505 public void releaseLocks() { 1506 if (this.locked.compareAndSet(true, false)) { 1507 message.decrementReferenceCount(); 1508 store.releaseLocalAsyncLock(); 1509 globalTopicSemaphore.release(); 1510 } 1511 } 1512 1513 /** 1514 * add a key 1515 * 1516 * @param key 1517 * @return true if all acknowledgements received 1518 */ 1519 public boolean addSubscriptionKey(String key) { 1520 synchronized (this.subscriptionKeys) { 1521 this.subscriptionKeys.add(key); 1522 } 1523 return this.subscriptionKeys.size() >= this.subscriptionCount; 1524 } 1525 1526 @Override 1527 public void run() { 1528 this.store.doneTasks++; 1529 try { 1530 if (this.done.compareAndSet(false, true)) { 1531 this.topicStore.addMessage(context, message); 1532 // apply any acks we have 1533 synchronized (this.subscriptionKeys) { 1534 for (String key : this.subscriptionKeys) { 1535 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); 1536 1537 } 1538 } 1539 removeTopicTask(this.topicStore, this.message.getMessageId()); 1540 this.future.complete(); 1541 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1542 System.err.println(this.store.dest.getName() + " cancelled: " 1543 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1544 this.store.canceledTasks = this.store.doneTasks = 0; 1545 } 1546 } catch (Exception e) { 1547 this.future.setException(e); 1548 } 1549 } 1550 } 1551 1552 public class StoreTaskExecutor extends ThreadPoolExecutor { 1553 1554 public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) { 1555 super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory); 1556 } 1557 1558 @Override 1559 protected void afterExecute(Runnable runnable, Throwable throwable) { 1560 super.afterExecute(runnable, throwable); 1561 1562 if (runnable instanceof StoreTask) { 1563 ((StoreTask)runnable).releaseLocks(); 1564 } 1565 } 1566 } 1567 1568 @Override 1569 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 1570 return new JobSchedulerStoreImpl(); 1571 } 1572 1573 /* (non-Javadoc) 1574 * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal() 1575 */ 1576 @Override 1577 public boolean isPersistNoLocal() { 1578 // Prior to v11 the broker did not store the noLocal value for durable subs. 1579 return brokerService.getStoreOpenWireVersion() >= 11; 1580 } 1581}