001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.File; 020import java.io.IOException; 021import java.sql.Connection; 022import java.sql.SQLException; 023import java.util.Collections; 024import java.util.Locale; 025import java.util.Set; 026import java.util.concurrent.ScheduledFuture; 027import java.util.concurrent.ScheduledThreadPoolExecutor; 028import java.util.concurrent.ThreadFactory; 029import java.util.concurrent.TimeUnit; 030 031import javax.sql.DataSource; 032 033import org.apache.activemq.ActiveMQMessageAudit; 034import org.apache.activemq.broker.BrokerService; 035import org.apache.activemq.broker.ConnectionContext; 036import org.apache.activemq.broker.Locker; 037import org.apache.activemq.broker.scheduler.JobSchedulerStore; 038import org.apache.activemq.command.ActiveMQDestination; 039import org.apache.activemq.command.ActiveMQQueue; 040import org.apache.activemq.command.ActiveMQTopic; 041import org.apache.activemq.command.Message; 042import org.apache.activemq.command.MessageAck; 043import org.apache.activemq.command.MessageId; 044import org.apache.activemq.command.ProducerId; 045import org.apache.activemq.openwire.OpenWireFormat; 046import org.apache.activemq.store.MessageStore; 047import org.apache.activemq.store.PersistenceAdapter; 048import org.apache.activemq.store.TopicMessageStore; 049import org.apache.activemq.store.TransactionStore; 050import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; 051import org.apache.activemq.store.memory.MemoryTransactionStore; 052import org.apache.activemq.usage.SystemUsage; 053import org.apache.activemq.util.ByteSequence; 054import org.apache.activemq.util.FactoryFinder; 055import org.apache.activemq.util.IOExceptionSupport; 056import org.apache.activemq.util.LongSequenceGenerator; 057import org.apache.activemq.util.ServiceStopper; 058import org.apache.activemq.util.ThreadPoolUtils; 059import org.apache.activemq.wireformat.WireFormat; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063/** 064 * A {@link PersistenceAdapter} implementation using JDBC for persistence 065 * storage. 066 * 067 * This persistence adapter will correctly remember prepared XA transactions, 068 * but it will not keep track of local transaction commits so that operations 069 * performed against the Message store are done as a single uow. 070 * 071 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter" 072 * 073 */ 074public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter { 075 076 private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class); 077 private static FactoryFinder adapterFactoryFinder = new FactoryFinder( 078 "META-INF/services/org/apache/activemq/store/jdbc/"); 079 private static FactoryFinder lockFactoryFinder = new FactoryFinder( 080 "META-INF/services/org/apache/activemq/store/jdbc/lock/"); 081 082 public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000; 083 084 private WireFormat wireFormat = new OpenWireFormat(); 085 private Statements statements; 086 private JDBCAdapter adapter; 087 private MemoryTransactionStore transactionStore; 088 private ScheduledFuture<?> cleanupTicket; 089 private int cleanupPeriod = 1000 * 60 * 5; 090 private boolean useExternalMessageReferences; 091 private boolean createTablesOnStartup = true; 092 private DataSource lockDataSource; 093 private int transactionIsolation; 094 private File directory; 095 private boolean changeAutoCommitAllowed = true; 096 097 protected int maxProducersToAudit=1024; 098 protected int maxAuditDepth=1000; 099 protected boolean enableAudit=false; 100 protected int auditRecoveryDepth = 1024; 101 protected ActiveMQMessageAudit audit; 102 103 protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 104 protected int maxRows = DefaultJDBCAdapter.MAX_ROWS; 105 106 { 107 setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD); 108 } 109 110 public JDBCPersistenceAdapter() { 111 } 112 113 public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) { 114 super(ds); 115 this.wireFormat = wireFormat; 116 } 117 118 @Override 119 public Set<ActiveMQDestination> getDestinations() { 120 TransactionContext c = null; 121 try { 122 c = getTransactionContext(); 123 return getAdapter().doGetDestinations(c); 124 } catch (IOException e) { 125 return emptyDestinationSet(); 126 } catch (SQLException e) { 127 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 128 return emptyDestinationSet(); 129 } finally { 130 if (c != null) { 131 try { 132 c.close(); 133 } catch (Throwable e) { 134 } 135 } 136 } 137 } 138 139 @SuppressWarnings("unchecked") 140 private Set<ActiveMQDestination> emptyDestinationSet() { 141 return Collections.EMPTY_SET; 142 } 143 144 protected void createMessageAudit() { 145 if (enableAudit && audit == null) { 146 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 147 TransactionContext c = null; 148 149 try { 150 c = getTransactionContext(); 151 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { 152 @Override 153 public void messageId(MessageId id) { 154 audit.isDuplicate(id); 155 } 156 }); 157 } catch (Exception e) { 158 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); 159 } finally { 160 if (c != null) { 161 try { 162 c.close(); 163 } catch (Throwable e) { 164 } 165 } 166 } 167 } 168 } 169 170 public void initSequenceIdGenerator() { 171 TransactionContext c = null; 172 try { 173 c = getTransactionContext(); 174 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { 175 @Override 176 public void messageId(MessageId id) { 177 audit.isDuplicate(id); 178 } 179 }); 180 } catch (Exception e) { 181 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); 182 } finally { 183 if (c != null) { 184 try { 185 c.close(); 186 } catch (Throwable e) { 187 } 188 } 189 } 190 } 191 192 @Override 193 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 194 MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit); 195 if (transactionStore != null) { 196 rc = transactionStore.proxy(rc); 197 } 198 return rc; 199 } 200 201 @Override 202 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 203 TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit); 204 if (transactionStore != null) { 205 rc = transactionStore.proxy(rc); 206 } 207 return rc; 208 } 209 210 /** 211 * Cleanup method to remove any state associated with the given destination 212 * @param destination Destination to forget 213 */ 214 @Override 215 public void removeQueueMessageStore(ActiveMQQueue destination) { 216 if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) { 217 try { 218 removeConsumerDestination(destination); 219 } catch (IOException ioe) { 220 LOG.error("Failed to remove consumer destination: " + destination, ioe); 221 } 222 } 223 } 224 225 private void removeConsumerDestination(ActiveMQQueue destination) throws IOException { 226 TransactionContext c = getTransactionContext(); 227 try { 228 String id = destination.getQualifiedName(); 229 getAdapter().doDeleteSubscription(c, destination, id, id); 230 } catch (SQLException e) { 231 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 232 throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e); 233 } finally { 234 c.close(); 235 } 236 } 237 238 /** 239 * Cleanup method to remove any state associated with the given destination 240 * No state retained.... nothing to do 241 * 242 * @param destination Destination to forget 243 */ 244 @Override 245 public void removeTopicMessageStore(ActiveMQTopic destination) { 246 } 247 248 @Override 249 public TransactionStore createTransactionStore() throws IOException { 250 if (transactionStore == null) { 251 transactionStore = new JdbcMemoryTransactionStore(this); 252 } 253 return this.transactionStore; 254 } 255 256 @Override 257 public long getLastMessageBrokerSequenceId() throws IOException { 258 TransactionContext c = getTransactionContext(); 259 try { 260 long seq = getAdapter().doGetLastMessageStoreSequenceId(c); 261 sequenceGenerator.setLastSequenceId(seq); 262 long brokerSeq = 0; 263 if (seq != 0) { 264 byte[] msg = getAdapter().doGetMessageById(c, seq); 265 if (msg != null) { 266 Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg)); 267 brokerSeq = last.getMessageId().getBrokerSequenceId(); 268 } else { 269 LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!"); 270 } 271 } 272 return brokerSeq; 273 } catch (SQLException e) { 274 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 275 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); 276 } finally { 277 c.close(); 278 } 279 } 280 281 @Override 282 public long getLastProducerSequenceId(ProducerId id) throws IOException { 283 TransactionContext c = getTransactionContext(); 284 try { 285 return getAdapter().doGetLastProducerSequenceId(c, id); 286 } catch (SQLException e) { 287 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 288 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); 289 } finally { 290 c.close(); 291 } 292 } 293 294 @Override 295 public void init() throws Exception { 296 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 297 298 if (isCreateTablesOnStartup()) { 299 TransactionContext transactionContext = getTransactionContext(); 300 transactionContext.getExclusiveConnection(); 301 transactionContext.begin(); 302 try { 303 try { 304 getAdapter().doCreateTables(transactionContext); 305 } catch (SQLException e) { 306 LOG.warn("Cannot create tables due to: " + e); 307 JDBCPersistenceAdapter.log("Failure Details: ", e); 308 } 309 } finally { 310 transactionContext.commit(); 311 } 312 } 313 } 314 315 @Override 316 public void doStart() throws Exception { 317 318 if( brokerService!=null ) { 319 wireFormat.setVersion(brokerService.getStoreOpenWireVersion()); 320 } 321 322 // Cleanup the db periodically. 323 if (cleanupPeriod > 0) { 324 cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() { 325 @Override 326 public void run() { 327 cleanup(); 328 } 329 }, 0, cleanupPeriod, TimeUnit.MILLISECONDS); 330 } 331 createMessageAudit(); 332 } 333 334 @Override 335 public synchronized void doStop(ServiceStopper stopper) throws Exception { 336 if (cleanupTicket != null) { 337 cleanupTicket.cancel(true); 338 cleanupTicket = null; 339 } 340 closeDataSource(getDataSource()); 341 } 342 343 public void cleanup() { 344 TransactionContext c = null; 345 try { 346 LOG.debug("Cleaning up old messages."); 347 c = getTransactionContext(); 348 c.getExclusiveConnection(); 349 getAdapter().doDeleteOldMessages(c); 350 } catch (IOException e) { 351 LOG.warn("Old message cleanup failed due to: " + e, e); 352 } catch (SQLException e) { 353 LOG.warn("Old message cleanup failed due to: " + e); 354 JDBCPersistenceAdapter.log("Failure Details: ", e); 355 } finally { 356 if (c != null) { 357 try { 358 c.close(); 359 } catch (Throwable e) { 360 } 361 } 362 LOG.debug("Cleanup done."); 363 } 364 } 365 366 @Override 367 public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() { 368 if (clockDaemon == null) { 369 clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { 370 @Override 371 public Thread newThread(Runnable runnable) { 372 Thread thread = new Thread(runnable, "ActiveMQ JDBC PA Scheduled Task"); 373 thread.setDaemon(true); 374 return thread; 375 } 376 }); 377 } 378 return clockDaemon; 379 } 380 381 public JDBCAdapter getAdapter() throws IOException { 382 if (adapter == null) { 383 setAdapter(createAdapter()); 384 } 385 return adapter; 386 } 387 388 /** 389 * @deprecated as of 5.7.0, replaced by {@link #getLocker()} 390 */ 391 @Deprecated 392 public Locker getDatabaseLocker() throws IOException { 393 return getLocker(); 394 } 395 396 /** 397 * Sets the database locker strategy to use to lock the database on startup 398 * @throws IOException 399 * 400 * @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)} 401 */ 402 @Deprecated 403 public void setDatabaseLocker(Locker locker) throws IOException { 404 setLocker(locker); 405 } 406 407 public DataSource getLockDataSource() throws IOException { 408 if (lockDataSource == null) { 409 lockDataSource = getDataSource(); 410 if (lockDataSource == null) { 411 throw new IllegalArgumentException( 412 "No dataSource property has been configured"); 413 } 414 } 415 return lockDataSource; 416 } 417 418 public void setLockDataSource(DataSource dataSource) { 419 this.lockDataSource = dataSource; 420 LOG.info("Using a separate dataSource for locking: " 421 + lockDataSource); 422 } 423 424 @Override 425 public BrokerService getBrokerService() { 426 return brokerService; 427 } 428 429 /** 430 * @throws IOException 431 */ 432 protected JDBCAdapter createAdapter() throws IOException { 433 434 adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter"); 435 436 // Use the default JDBC adapter if the 437 // Database type is not recognized. 438 if (adapter == null) { 439 adapter = new DefaultJDBCAdapter(); 440 LOG.debug("Using default JDBC Adapter: " + adapter); 441 } 442 return adapter; 443 } 444 445 private Object loadAdapter(FactoryFinder finder, String kind) throws IOException { 446 Object adapter = null; 447 TransactionContext c = getTransactionContext(); 448 try { 449 try { 450 // Make the filename file system safe. 451 String dirverName = c.getConnection().getMetaData().getDriverName(); 452 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(Locale.ENGLISH); 453 454 try { 455 adapter = finder.newInstance(dirverName); 456 LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass()); 457 } catch (Throwable e) { 458 LOG.info("Database " + kind + " driver override not found for : [" + dirverName 459 + "]. Will use default implementation."); 460 } 461 } catch (SQLException e) { 462 LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: " 463 + e.getMessage()); 464 JDBCPersistenceAdapter.log("Failure Details: ", e); 465 } 466 } finally { 467 c.close(); 468 } 469 return adapter; 470 } 471 472 public void setAdapter(JDBCAdapter adapter) { 473 this.adapter = adapter; 474 this.adapter.setStatements(getStatements()); 475 this.adapter.setMaxRows(getMaxRows()); 476 } 477 478 public WireFormat getWireFormat() { 479 return wireFormat; 480 } 481 482 public void setWireFormat(WireFormat wireFormat) { 483 this.wireFormat = wireFormat; 484 } 485 486 public TransactionContext getTransactionContext(ConnectionContext context) throws IOException { 487 if (context == null) { 488 return getTransactionContext(); 489 } else { 490 TransactionContext answer = (TransactionContext)context.getLongTermStoreContext(); 491 if (answer == null) { 492 answer = getTransactionContext(); 493 context.setLongTermStoreContext(answer); 494 } 495 return answer; 496 } 497 } 498 499 public TransactionContext getTransactionContext() throws IOException { 500 TransactionContext answer = new TransactionContext(this); 501 if (transactionIsolation > 0) { 502 answer.setTransactionIsolation(transactionIsolation); 503 } 504 return answer; 505 } 506 507 @Override 508 public void beginTransaction(ConnectionContext context) throws IOException { 509 TransactionContext transactionContext = getTransactionContext(context); 510 transactionContext.begin(); 511 } 512 513 @Override 514 public void commitTransaction(ConnectionContext context) throws IOException { 515 TransactionContext transactionContext = getTransactionContext(context); 516 transactionContext.commit(); 517 } 518 519 @Override 520 public void rollbackTransaction(ConnectionContext context) throws IOException { 521 TransactionContext transactionContext = getTransactionContext(context); 522 transactionContext.rollback(); 523 } 524 525 public int getCleanupPeriod() { 526 return cleanupPeriod; 527 } 528 529 /** 530 * Sets the number of milliseconds until the database is attempted to be 531 * cleaned up for durable topics 532 */ 533 public void setCleanupPeriod(int cleanupPeriod) { 534 this.cleanupPeriod = cleanupPeriod; 535 } 536 537 public boolean isChangeAutoCommitAllowed() { 538 return changeAutoCommitAllowed; 539 } 540 541 /** 542 * Whether the JDBC driver allows to set the auto commit. 543 * Some drivers does not allow changing the auto commit. The default value is true. 544 * 545 * @param changeAutoCommitAllowed true to change, false to not change. 546 */ 547 public void setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed) { 548 this.changeAutoCommitAllowed = changeAutoCommitAllowed; 549 } 550 551 @Override 552 public void deleteAllMessages() throws IOException { 553 TransactionContext c = getTransactionContext(); 554 c.getExclusiveConnection(); 555 try { 556 getAdapter().doDropTables(c); 557 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 558 getAdapter().doCreateTables(c); 559 LOG.info("Persistence store purged."); 560 } catch (SQLException e) { 561 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 562 throw IOExceptionSupport.create(e); 563 } finally { 564 c.close(); 565 } 566 } 567 568 public boolean isUseExternalMessageReferences() { 569 return useExternalMessageReferences; 570 } 571 572 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 573 this.useExternalMessageReferences = useExternalMessageReferences; 574 } 575 576 public boolean isCreateTablesOnStartup() { 577 return createTablesOnStartup; 578 } 579 580 /** 581 * Sets whether or not tables are created on startup 582 */ 583 public void setCreateTablesOnStartup(boolean createTablesOnStartup) { 584 this.createTablesOnStartup = createTablesOnStartup; 585 } 586 587 /** 588 * @deprecated use {@link #setUseLock(boolean)} instead 589 * 590 * Sets whether or not an exclusive database lock should be used to enable 591 * JDBC Master/Slave. Enabled by default. 592 */ 593 @Deprecated 594 public void setUseDatabaseLock(boolean useDatabaseLock) { 595 setUseLock(useDatabaseLock); 596 } 597 598 public static void log(String msg, SQLException e) { 599 String s = msg + e.getMessage(); 600 while (e.getNextException() != null) { 601 e = e.getNextException(); 602 s += ", due to: " + e.getMessage(); 603 } 604 LOG.warn(s, e); 605 } 606 607 public Statements getStatements() { 608 if (statements == null) { 609 statements = new Statements(); 610 } 611 return statements; 612 } 613 614 public void setStatements(Statements statements) { 615 this.statements = statements; 616 if (adapter != null) { 617 this.adapter.setStatements(getStatements()); 618 } 619 } 620 621 /** 622 * @param usageManager The UsageManager that is controlling the 623 * destination's memory usage. 624 */ 625 @Override 626 public void setUsageManager(SystemUsage usageManager) { 627 } 628 629 @Override 630 public Locker createDefaultLocker() throws IOException { 631 Locker locker = (Locker) loadAdapter(lockFactoryFinder, "lock"); 632 if (locker == null) { 633 locker = new DefaultDatabaseLocker(); 634 LOG.debug("Using default JDBC Locker: " + locker); 635 } 636 locker.configure(this); 637 return locker; 638 } 639 640 @Override 641 public void setBrokerName(String brokerName) { 642 } 643 644 @Override 645 public String toString() { 646 return "JDBCPersistenceAdapter(" + super.toString() + ")"; 647 } 648 649 @Override 650 public void setDirectory(File dir) { 651 this.directory=dir; 652 } 653 654 @Override 655 public File getDirectory(){ 656 if (this.directory==null && brokerService != null){ 657 this.directory=brokerService.getBrokerDataDirectory(); 658 } 659 return this.directory; 660 } 661 662 // interesting bit here is proof that DB is ok 663 @Override 664 public void checkpoint(boolean sync) throws IOException { 665 // by pass TransactionContext to avoid IO Exception handler 666 Connection connection = null; 667 try { 668 connection = getDataSource().getConnection(); 669 if (!connection.isValid(10)) { 670 throw new IOException("isValid(10) failed for: " + connection); 671 } 672 } catch (SQLException e) { 673 LOG.debug("Could not get JDBC connection for checkpoint: " + e); 674 throw IOExceptionSupport.create(e); 675 } finally { 676 if (connection != null) { 677 try { 678 connection.close(); 679 } catch (Throwable ignored) { 680 } 681 } 682 } 683 } 684 685 @Override 686 public long size(){ 687 return 0; 688 } 689 690 /** 691 * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead 692 * 693 * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker 694 * not applied if DataBaseLocker is injected. 695 * 696 */ 697 @Deprecated 698 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException { 699 getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval); 700 } 701 702 /** 703 * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED 704 * This allowable dirty isolation level may not be achievable in clustered DB environments 705 * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ 706 * see isolation level constants in {@link java.sql.Connection} 707 * @param transactionIsolation the isolation level to use 708 */ 709 public void setTransactionIsolation(int transactionIsolation) { 710 this.transactionIsolation = transactionIsolation; 711 } 712 713 public int getMaxProducersToAudit() { 714 return maxProducersToAudit; 715 } 716 717 public void setMaxProducersToAudit(int maxProducersToAudit) { 718 this.maxProducersToAudit = maxProducersToAudit; 719 } 720 721 public int getMaxAuditDepth() { 722 return maxAuditDepth; 723 } 724 725 public void setMaxAuditDepth(int maxAuditDepth) { 726 this.maxAuditDepth = maxAuditDepth; 727 } 728 729 public boolean isEnableAudit() { 730 return enableAudit; 731 } 732 733 public void setEnableAudit(boolean enableAudit) { 734 this.enableAudit = enableAudit; 735 } 736 737 public int getAuditRecoveryDepth() { 738 return auditRecoveryDepth; 739 } 740 741 public void setAuditRecoveryDepth(int auditRecoveryDepth) { 742 this.auditRecoveryDepth = auditRecoveryDepth; 743 } 744 745 public long getNextSequenceId() { 746 return sequenceGenerator.getNextSequenceId(); 747 } 748 749 public int getMaxRows() { 750 return maxRows; 751 } 752 753 /* 754 * the max rows return from queries, with sparse selectors this may need to be increased 755 */ 756 public void setMaxRows(int maxRows) { 757 this.maxRows = maxRows; 758 } 759 760 public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException { 761 TransactionContext c = getTransactionContext(); 762 try { 763 getAdapter().doRecoverPreparedOps(c, jdbcMemoryTransactionStore); 764 } catch (SQLException e) { 765 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 766 throw IOExceptionSupport.create("Failed to recover from: " + jdbcMemoryTransactionStore + ". Reason: " + e,e); 767 } finally { 768 c.close(); 769 } 770 } 771 772 public void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId) throws IOException { 773 TransactionContext c = getTransactionContext(context); 774 try { 775 long sequence = (Long)messageId.getEntryLocator(); 776 getAdapter().doCommitAddOp(c, preparedSequenceId, sequence); 777 } catch (SQLException e) { 778 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 779 throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e); 780 } finally { 781 c.close(); 782 } 783 } 784 785 public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException { 786 TransactionContext c = getTransactionContext(context); 787 try { 788 getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getFutureOrSequenceLong(), null); 789 } catch (SQLException e) { 790 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 791 throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". Reason: " + e,e); 792 } finally { 793 c.close(); 794 } 795 } 796 797 public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException { 798 TransactionContext c = getTransactionContext(context); 799 try { 800 getAdapter().doSetLastAck(c, destination, null, clientId, subName, xidLastAck, priority); 801 } catch (SQLException e) { 802 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 803 throw IOExceptionSupport.create("Failed to commit last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e,e); 804 } finally { 805 c.close(); 806 } 807 } 808 809 public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException { 810 TransactionContext c = getTransactionContext(context); 811 try { 812 byte priority = (byte) store.getCachedStoreSequenceId(c, store.getDestination(), ack.getLastMessageId())[1]; 813 getAdapter().doClearLastAck(c, store.getDestination(), priority, clientId, subName); 814 } catch (SQLException e) { 815 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 816 throw IOExceptionSupport.create("Failed to rollback last ack: " + ack + " on " + store.getDestination() + " for " + subName + ":" + clientId + ". Reason: " + e,e); 817 } finally { 818 c.close(); 819 } 820 } 821 822 // after recovery there is no record of the original messageId for the ack 823 public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException { 824 TransactionContext c = getTransactionContext(context); 825 try { 826 getAdapter().doClearLastAck(c, destination, priority, clientId, subName); 827 } catch (SQLException e) { 828 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 829 throw IOExceptionSupport.create("Failed to rollback last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e, e); 830 } finally { 831 c.close(); 832 } 833 } 834 835 long[] getStoreSequenceIdForMessageId(ConnectionContext context, MessageId messageId, ActiveMQDestination destination) throws IOException { 836 long[] result = new long[]{-1, Byte.MAX_VALUE -1}; 837 TransactionContext c = getTransactionContext(context); 838 try { 839 result = adapter.getStoreSequenceId(c, destination, messageId); 840 } catch (SQLException e) { 841 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 842 throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e); 843 } finally { 844 c.close(); 845 } 846 return result; 847 } 848 849 @Override 850 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 851 throw new UnsupportedOperationException(); 852 } 853}