001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.journal; 018 019import java.io.File; 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.Set; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.concurrent.CountDownLatch; 029import java.util.concurrent.FutureTask; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.ThreadFactory; 032import java.util.concurrent.ThreadPoolExecutor; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicBoolean; 035 036import org.apache.activeio.journal.InvalidRecordLocationException; 037import org.apache.activeio.journal.Journal; 038import org.apache.activeio.journal.JournalEventListener; 039import org.apache.activeio.journal.RecordLocation; 040import org.apache.activeio.packet.ByteArrayPacket; 041import org.apache.activeio.packet.Packet; 042import org.apache.activemq.broker.BrokerService; 043import org.apache.activemq.broker.BrokerServiceAware; 044import org.apache.activemq.broker.ConnectionContext; 045import org.apache.activemq.broker.scheduler.JobSchedulerStore; 046import org.apache.activemq.command.ActiveMQDestination; 047import org.apache.activemq.command.ActiveMQQueue; 048import org.apache.activemq.command.ActiveMQTopic; 049import org.apache.activemq.command.DataStructure; 050import org.apache.activemq.command.JournalQueueAck; 051import org.apache.activemq.command.JournalTopicAck; 052import org.apache.activemq.command.JournalTrace; 053import org.apache.activemq.command.JournalTransaction; 054import org.apache.activemq.command.Message; 055import org.apache.activemq.command.MessageAck; 056import org.apache.activemq.command.ProducerId; 057import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 058import org.apache.activemq.openwire.OpenWireFormat; 059import org.apache.activemq.store.MessageStore; 060import org.apache.activemq.store.PersistenceAdapter; 061import org.apache.activemq.store.TopicMessageStore; 062import org.apache.activemq.store.TransactionStore; 063import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 064import org.apache.activemq.store.journal.JournalTransactionStore.Tx; 065import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation; 066import org.apache.activemq.thread.Scheduler; 067import org.apache.activemq.thread.Task; 068import org.apache.activemq.thread.TaskRunner; 069import org.apache.activemq.thread.TaskRunnerFactory; 070import org.apache.activemq.usage.SystemUsage; 071import org.apache.activemq.usage.Usage; 072import org.apache.activemq.usage.UsageListener; 073import org.apache.activemq.util.ByteSequence; 074import org.apache.activemq.util.IOExceptionSupport; 075import org.apache.activemq.util.ThreadPoolUtils; 076import org.apache.activemq.wireformat.WireFormat; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080/** 081 * An implementation of {@link PersistenceAdapter} designed for use with a 082 * {@link Journal} and then check pointing asynchronously on a timeout with some 083 * other long term persistent storage. 084 * 085 * @org.apache.xbean.XBean 086 * 087 */ 088public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware { 089 090 private BrokerService brokerService; 091 092 protected Scheduler scheduler; 093 private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class); 094 095 private Journal journal; 096 private PersistenceAdapter longTermPersistence; 097 098 private final WireFormat wireFormat = new OpenWireFormat(); 099 100 private final ConcurrentMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>(); 101 private final ConcurrentMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>(); 102 103 private SystemUsage usageManager; 104 private long checkpointInterval = 1000 * 60 * 5; 105 private long lastCheckpointRequest = System.currentTimeMillis(); 106 private long lastCleanup = System.currentTimeMillis(); 107 private int maxCheckpointWorkers = 10; 108 private int maxCheckpointMessageAddSize = 1024 * 1024; 109 110 private final JournalTransactionStore transactionStore = new JournalTransactionStore(this); 111 private ThreadPoolExecutor checkpointExecutor; 112 113 private TaskRunner checkpointTask; 114 private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); 115 private boolean fullCheckPoint; 116 117 private final AtomicBoolean started = new AtomicBoolean(false); 118 119 private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); 120 121 private TaskRunnerFactory taskRunnerFactory; 122 private File directory; 123 124 public JournalPersistenceAdapter() { 125 } 126 127 public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException { 128 setJournal(journal); 129 setTaskRunnerFactory(taskRunnerFactory); 130 setPersistenceAdapter(longTermPersistence); 131 } 132 133 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 134 this.taskRunnerFactory = taskRunnerFactory; 135 } 136 137 public void setJournal(Journal journal) { 138 this.journal = journal; 139 journal.setJournalEventListener(this); 140 } 141 142 public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) { 143 this.longTermPersistence = longTermPersistence; 144 } 145 146 final Runnable createPeriodicCheckpointTask() { 147 return new Runnable() { 148 @Override 149 public void run() { 150 long lastTime = 0; 151 synchronized (this) { 152 lastTime = lastCheckpointRequest; 153 } 154 if (System.currentTimeMillis() > lastTime + checkpointInterval) { 155 checkpoint(false, true); 156 } 157 } 158 }; 159 } 160 161 /** 162 * @param usageManager The UsageManager that is controlling the 163 * destination's memory usage. 164 */ 165 @Override 166 public void setUsageManager(SystemUsage usageManager) { 167 this.usageManager = usageManager; 168 longTermPersistence.setUsageManager(usageManager); 169 } 170 171 @Override 172 public Set<ActiveMQDestination> getDestinations() { 173 Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations()); 174 destinations.addAll(queues.keySet()); 175 destinations.addAll(topics.keySet()); 176 return destinations; 177 } 178 179 private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { 180 if (destination.isQueue()) { 181 return createQueueMessageStore((ActiveMQQueue)destination); 182 } else { 183 return createTopicMessageStore((ActiveMQTopic)destination); 184 } 185 } 186 187 @Override 188 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 189 JournalMessageStore store = queues.get(destination); 190 if (store == null) { 191 MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination); 192 store = new JournalMessageStore(this, checkpointStore, destination); 193 queues.put(destination, store); 194 } 195 return store; 196 } 197 198 @Override 199 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException { 200 JournalTopicMessageStore store = topics.get(destinationName); 201 if (store == null) { 202 TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName); 203 store = new JournalTopicMessageStore(this, checkpointStore, destinationName); 204 topics.put(destinationName, store); 205 } 206 return store; 207 } 208 209 /** 210 * Cleanup method to remove any state associated with the given destination 211 * 212 * @param destination Destination to forget 213 */ 214 @Override 215 public void removeQueueMessageStore(ActiveMQQueue destination) { 216 queues.remove(destination); 217 } 218 219 /** 220 * Cleanup method to remove any state associated with the given destination 221 * 222 * @param destination Destination to forget 223 */ 224 @Override 225 public void removeTopicMessageStore(ActiveMQTopic destination) { 226 topics.remove(destination); 227 } 228 229 @Override 230 public TransactionStore createTransactionStore() throws IOException { 231 return transactionStore; 232 } 233 234 @Override 235 public long getLastMessageBrokerSequenceId() throws IOException { 236 return longTermPersistence.getLastMessageBrokerSequenceId(); 237 } 238 239 @Override 240 public void beginTransaction(ConnectionContext context) throws IOException { 241 longTermPersistence.beginTransaction(context); 242 } 243 244 @Override 245 public void commitTransaction(ConnectionContext context) throws IOException { 246 longTermPersistence.commitTransaction(context); 247 } 248 249 @Override 250 public void rollbackTransaction(ConnectionContext context) throws IOException { 251 longTermPersistence.rollbackTransaction(context); 252 } 253 254 @Override 255 public synchronized void start() throws Exception { 256 if (!started.compareAndSet(false, true)) { 257 return; 258 } 259 260 if( brokerService!=null ) { 261 wireFormat.setVersion(brokerService.getStoreOpenWireVersion()); 262 } 263 264 checkpointTask = taskRunnerFactory.createTaskRunner(new Task() { 265 @Override 266 public boolean iterate() { 267 return doCheckpoint(); 268 } 269 }, "ActiveMQ Journal Checkpoint Worker"); 270 271 checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 272 @Override 273 public Thread newThread(Runnable runable) { 274 Thread t = new Thread(runable, "Journal checkpoint worker"); 275 t.setPriority(7); 276 return t; 277 } 278 }); 279 // checkpointExecutor.allowCoreThreadTimeOut(true); 280 281 this.usageManager.getMemoryUsage().addUsageListener(this); 282 283 if (longTermPersistence instanceof JDBCPersistenceAdapter) { 284 // Disabled periodic clean up as it deadlocks with the checkpoint 285 // operations. 286 ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0); 287 } 288 289 longTermPersistence.start(); 290 createTransactionStore(); 291 recover(); 292 293 // Do a checkpoint periodically. 294 this.scheduler = new Scheduler("Journal Scheduler"); 295 this.scheduler.start(); 296 this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10); 297 298 } 299 300 @Override 301 public void stop() throws Exception { 302 303 this.usageManager.getMemoryUsage().removeUsageListener(this); 304 if (!started.compareAndSet(true, false)) { 305 return; 306 } 307 308 this.scheduler.cancel(periodicCheckpointTask); 309 this.scheduler.stop(); 310 311 // Take one final checkpoint and stop checkpoint processing. 312 checkpoint(true, true); 313 checkpointTask.shutdown(); 314 ThreadPoolUtils.shutdown(checkpointExecutor); 315 checkpointExecutor = null; 316 317 queues.clear(); 318 topics.clear(); 319 320 IOException firstException = null; 321 try { 322 journal.close(); 323 } catch (Exception e) { 324 firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); 325 } 326 longTermPersistence.stop(); 327 328 if (firstException != null) { 329 throw firstException; 330 } 331 } 332 333 // Properties 334 // ------------------------------------------------------------------------- 335 public PersistenceAdapter getLongTermPersistence() { 336 return longTermPersistence; 337 } 338 339 /** 340 * @return Returns the wireFormat. 341 */ 342 public WireFormat getWireFormat() { 343 return wireFormat; 344 } 345 346 // Implementation methods 347 // ------------------------------------------------------------------------- 348 349 /** 350 * The Journal give us a call back so that we can move old data out of the 351 * journal. Taking a checkpoint does this for us. 352 * 353 * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation) 354 */ 355 @Override 356 public void overflowNotification(RecordLocation safeLocation) { 357 checkpoint(false, true); 358 } 359 360 /** 361 * When we checkpoint we move all the journalled data to long term storage. 362 * 363 */ 364 public void checkpoint(boolean sync, boolean fullCheckpoint) { 365 try { 366 if (journal == null) { 367 throw new IllegalStateException("Journal is closed."); 368 } 369 370 long now = System.currentTimeMillis(); 371 CountDownLatch latch = null; 372 synchronized (this) { 373 latch = nextCheckpointCountDownLatch; 374 lastCheckpointRequest = now; 375 if (fullCheckpoint) { 376 this.fullCheckPoint = true; 377 } 378 } 379 380 checkpointTask.wakeup(); 381 382 if (sync) { 383 LOG.debug("Waking for checkpoint to complete."); 384 latch.await(); 385 } 386 } catch (InterruptedException e) { 387 Thread.currentThread().interrupt(); 388 LOG.warn("Request to start checkpoint failed: " + e, e); 389 } 390 } 391 392 @Override 393 public void checkpoint(boolean sync) { 394 checkpoint(sync, sync); 395 } 396 397 /** 398 * This does the actual checkpoint. 399 * 400 * @return 401 */ 402 public boolean doCheckpoint() { 403 CountDownLatch latch = null; 404 boolean fullCheckpoint; 405 synchronized (this) { 406 latch = nextCheckpointCountDownLatch; 407 nextCheckpointCountDownLatch = new CountDownLatch(1); 408 fullCheckpoint = this.fullCheckPoint; 409 this.fullCheckPoint = false; 410 } 411 try { 412 413 LOG.debug("Checkpoint started."); 414 RecordLocation newMark = null; 415 416 ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size()); 417 418 // 419 // We do many partial checkpoints (fullCheckpoint==false) to move 420 // topic messages 421 // to long term store as soon as possible. 422 // 423 // We want to avoid doing that for queue messages since removes the 424 // come in the same 425 // checkpoint cycle will nullify the previous message add. 426 // Therefore, we only 427 // checkpoint queues on the fullCheckpoint cycles. 428 // 429 if (fullCheckpoint) { 430 Iterator<JournalMessageStore> iterator = queues.values().iterator(); 431 while (iterator.hasNext()) { 432 try { 433 final JournalMessageStore ms = iterator.next(); 434 FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() { 435 @Override 436 public RecordLocation call() throws Exception { 437 return ms.checkpoint(); 438 } 439 }); 440 futureTasks.add(task); 441 checkpointExecutor.execute(task); 442 } catch (Exception e) { 443 LOG.error("Failed to checkpoint a message store: " + e, e); 444 } 445 } 446 } 447 448 Iterator<JournalTopicMessageStore> iterator = topics.values().iterator(); 449 while (iterator.hasNext()) { 450 try { 451 final JournalTopicMessageStore ms = iterator.next(); 452 FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() { 453 @Override 454 public RecordLocation call() throws Exception { 455 return ms.checkpoint(); 456 } 457 }); 458 futureTasks.add(task); 459 checkpointExecutor.execute(task); 460 } catch (Exception e) { 461 LOG.error("Failed to checkpoint a message store: " + e, e); 462 } 463 } 464 465 try { 466 for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) { 467 FutureTask<RecordLocation> ft = iter.next(); 468 RecordLocation mark = ft.get(); 469 // We only set a newMark on full checkpoints. 470 if (fullCheckpoint) { 471 if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { 472 newMark = mark; 473 } 474 } 475 } 476 } catch (Throwable e) { 477 LOG.error("Failed to checkpoint a message store: " + e, e); 478 } 479 480 if (fullCheckpoint) { 481 try { 482 if (newMark != null) { 483 LOG.debug("Marking journal at: " + newMark); 484 journal.setMark(newMark, true); 485 } 486 } catch (Exception e) { 487 LOG.error("Failed to mark the Journal: " + e, e); 488 } 489 490 if (longTermPersistence instanceof JDBCPersistenceAdapter) { 491 // We may be check pointing more often than the 492 // checkpointInterval if under high use 493 // But we don't want to clean up the db that often. 494 long now = System.currentTimeMillis(); 495 if (now > lastCleanup + checkpointInterval) { 496 lastCleanup = now; 497 ((JDBCPersistenceAdapter)longTermPersistence).cleanup(); 498 } 499 } 500 } 501 502 LOG.debug("Checkpoint done."); 503 } finally { 504 latch.countDown(); 505 } 506 synchronized (this) { 507 return this.fullCheckPoint; 508 } 509 510 } 511 512 /** 513 * @param location 514 * @return 515 * @throws IOException 516 */ 517 public DataStructure readCommand(RecordLocation location) throws IOException { 518 try { 519 Packet packet = journal.read(location); 520 return (DataStructure)wireFormat.unmarshal(toByteSequence(packet)); 521 } catch (InvalidRecordLocationException e) { 522 throw createReadException(location, e); 523 } catch (IOException e) { 524 throw createReadException(location, e); 525 } 526 } 527 528 /** 529 * Move all the messages that were in the journal into long term storage. We 530 * just replay and do a checkpoint. 531 * 532 * @throws IOException 533 * @throws IOException 534 * @throws InvalidRecordLocationException 535 * @throws IllegalStateException 536 */ 537 private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException { 538 539 RecordLocation pos = null; 540 int transactionCounter = 0; 541 542 LOG.info("Journal Recovery Started from: " + journal); 543 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); 544 545 // While we have records in the journal. 546 while ((pos = journal.getNextRecordLocation(pos)) != null) { 547 Packet data = journal.read(pos); 548 DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data)); 549 550 if (c instanceof Message) { 551 Message message = (Message)c; 552 JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination()); 553 if (message.isInTransaction()) { 554 transactionStore.addMessage(store, message, pos); 555 } else { 556 store.replayAddMessage(context, message); 557 transactionCounter++; 558 } 559 } else { 560 switch (c.getDataStructureType()) { 561 case JournalQueueAck.DATA_STRUCTURE_TYPE: { 562 JournalQueueAck command = (JournalQueueAck)c; 563 JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination()); 564 if (command.getMessageAck().isInTransaction()) { 565 transactionStore.removeMessage(store, command.getMessageAck(), pos); 566 } else { 567 store.replayRemoveMessage(context, command.getMessageAck()); 568 transactionCounter++; 569 } 570 } 571 break; 572 case JournalTopicAck.DATA_STRUCTURE_TYPE: { 573 JournalTopicAck command = (JournalTopicAck)c; 574 JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination()); 575 if (command.getTransactionId() != null) { 576 transactionStore.acknowledge(store, command, pos); 577 } else { 578 store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()); 579 transactionCounter++; 580 } 581 } 582 break; 583 case JournalTransaction.DATA_STRUCTURE_TYPE: { 584 JournalTransaction command = (JournalTransaction)c; 585 try { 586 // Try to replay the packet. 587 switch (command.getType()) { 588 case JournalTransaction.XA_PREPARE: 589 transactionStore.replayPrepare(command.getTransactionId()); 590 break; 591 case JournalTransaction.XA_COMMIT: 592 case JournalTransaction.LOCAL_COMMIT: 593 Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); 594 if (tx == null) { 595 break; // We may be trying to replay a commit 596 } 597 // that 598 // was already committed. 599 600 // Replay the committed operations. 601 tx.getOperations(); 602 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { 603 TxOperation op = (TxOperation)iter.next(); 604 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { 605 op.store.replayAddMessage(context, (Message)op.data); 606 } 607 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { 608 op.store.replayRemoveMessage(context, (MessageAck)op.data); 609 } 610 if (op.operationType == TxOperation.ACK_OPERATION_TYPE) { 611 JournalTopicAck ack = (JournalTopicAck)op.data; 612 ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()); 613 } 614 } 615 transactionCounter++; 616 break; 617 case JournalTransaction.LOCAL_ROLLBACK: 618 case JournalTransaction.XA_ROLLBACK: 619 transactionStore.replayRollback(command.getTransactionId()); 620 break; 621 default: 622 throw new IOException("Invalid journal command type: " + command.getType()); 623 } 624 } catch (IOException e) { 625 LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e); 626 } 627 } 628 break; 629 case JournalTrace.DATA_STRUCTURE_TYPE: 630 JournalTrace trace = (JournalTrace)c; 631 LOG.debug("TRACE Entry: " + trace.getMessage()); 632 break; 633 default: 634 LOG.error("Unknown type of record in transaction log which will be discarded: " + c); 635 } 636 } 637 } 638 639 RecordLocation location = writeTraceMessage("RECOVERED", true); 640 journal.setMark(location, true); 641 642 LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered."); 643 } 644 645 private IOException createReadException(RecordLocation location, Exception e) { 646 return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e); 647 } 648 649 protected IOException createWriteException(DataStructure packet, Exception e) { 650 return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e); 651 } 652 653 protected IOException createWriteException(String command, Exception e) { 654 return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e); 655 } 656 657 protected IOException createRecoveryFailedException(Exception e) { 658 return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e); 659 } 660 661 /** 662 * @param command 663 * @param sync 664 * @return 665 * @throws IOException 666 */ 667 public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException { 668 if (started.get()) { 669 try { 670 return journal.write(toPacket(wireFormat.marshal(command)), sync); 671 } catch (IOException ioe) { 672 LOG.error("Cannot write to the journal", ioe); 673 brokerService.handleIOException(ioe); 674 throw ioe; 675 } 676 } 677 throw new IOException("closed"); 678 } 679 680 private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException { 681 JournalTrace trace = new JournalTrace(); 682 trace.setMessage(message); 683 return writeCommand(trace, sync); 684 } 685 686 @Override 687 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 688 newPercentUsage = (newPercentUsage / 10) * 10; 689 oldPercentUsage = (oldPercentUsage / 10) * 10; 690 if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) { 691 boolean sync = newPercentUsage >= 90; 692 checkpoint(sync, true); 693 } 694 } 695 696 public JournalTransactionStore getTransactionStore() { 697 return transactionStore; 698 } 699 700 @Override 701 public void deleteAllMessages() throws IOException { 702 try { 703 JournalTrace trace = new JournalTrace(); 704 trace.setMessage("DELETED"); 705 RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false); 706 journal.setMark(location, true); 707 LOG.info("Journal deleted: "); 708 } catch (IOException e) { 709 throw e; 710 } catch (Throwable e) { 711 throw IOExceptionSupport.create(e); 712 } 713 longTermPersistence.deleteAllMessages(); 714 } 715 716 public SystemUsage getUsageManager() { 717 return usageManager; 718 } 719 720 public int getMaxCheckpointMessageAddSize() { 721 return maxCheckpointMessageAddSize; 722 } 723 724 public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) { 725 this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize; 726 } 727 728 public int getMaxCheckpointWorkers() { 729 return maxCheckpointWorkers; 730 } 731 732 public void setMaxCheckpointWorkers(int maxCheckpointWorkers) { 733 this.maxCheckpointWorkers = maxCheckpointWorkers; 734 } 735 736 public long getCheckpointInterval() { 737 return checkpointInterval; 738 } 739 740 public void setCheckpointInterval(long checkpointInterval) { 741 this.checkpointInterval = checkpointInterval; 742 } 743 744 public boolean isUseExternalMessageReferences() { 745 return false; 746 } 747 748 public void setUseExternalMessageReferences(boolean enable) { 749 if (enable) { 750 throw new IllegalArgumentException("The journal does not support message references."); 751 } 752 } 753 754 public Packet toPacket(ByteSequence sequence) { 755 return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length)); 756 } 757 758 public ByteSequence toByteSequence(Packet packet) { 759 org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence(); 760 return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength()); 761 } 762 763 @Override 764 public void setBrokerName(String brokerName) { 765 longTermPersistence.setBrokerName(brokerName); 766 } 767 768 @Override 769 public String toString() { 770 return "JournalPersistenceAdapter(" + longTermPersistence + ")"; 771 } 772 773 @Override 774 public void setDirectory(File dir) { 775 this.directory=dir; 776 } 777 778 @Override 779 public File getDirectory(){ 780 return directory; 781 } 782 783 @Override 784 public long size(){ 785 return 0; 786 } 787 788 @Override 789 public void setBrokerService(BrokerService brokerService) { 790 this.brokerService = brokerService; 791 PersistenceAdapter pa = getLongTermPersistence(); 792 if( pa instanceof BrokerServiceAware ) { 793 ((BrokerServiceAware)pa).setBrokerService(brokerService); 794 } 795 } 796 797 @Override 798 public long getLastProducerSequenceId(ProducerId id) { 799 return -1; 800 } 801 802 @Override 803 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 804 return longTermPersistence.createJobSchedulerStore(); 805 } 806 807}