001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc.adapter; 018 019import static javax.xml.bind.DatatypeConverter.parseBase64Binary; 020import static javax.xml.bind.DatatypeConverter.printBase64Binary; 021 022import java.io.IOException; 023import java.sql.Connection; 024import java.sql.PreparedStatement; 025import java.sql.ResultSet; 026import java.sql.SQLException; 027import java.sql.Statement; 028import java.util.ArrayList; 029import java.util.HashSet; 030import java.util.LinkedList; 031import java.util.Set; 032import java.util.concurrent.locks.ReadWriteLock; 033import java.util.concurrent.locks.ReentrantReadWriteLock; 034 035import org.apache.activemq.command.ActiveMQDestination; 036import org.apache.activemq.command.MessageId; 037import org.apache.activemq.command.ProducerId; 038import org.apache.activemq.command.SubscriptionInfo; 039import org.apache.activemq.command.XATransactionId; 040import org.apache.activemq.store.jdbc.JDBCAdapter; 041import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; 042import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; 043import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 044import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore; 045import org.apache.activemq.store.jdbc.Statements; 046import org.apache.activemq.store.jdbc.TransactionContext; 047import org.apache.activemq.util.DataByteArrayOutputStream; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is 053 * encouraged to override the default implementation of methods to account for differences in JDBC Driver 054 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/> 055 * The databases/JDBC drivers that use this adapter are: 056 * <ul> 057 * <li></li> 058 * </ul> 059 * 060 * @org.apache.xbean.XBean element="defaultJDBCAdapter" 061 * 062 * 063 */ 064public class DefaultJDBCAdapter implements JDBCAdapter { 065 private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class); 066 public static final int MAX_ROWS = org.apache.activemq.ActiveMQPrefetchPolicy.MAX_PREFETCH_SIZE; 067 private static final String FAILURE_MESSAGE = "Failure was: %s Message: %s SQLState: %s Vendor code: %s"; 068 protected Statements statements; 069 private boolean batchStatements = true; 070 //This is deprecated and should be removed in a future release 071 protected boolean batchStatments = true; 072 protected boolean prioritizedMessages; 073 protected int maxRows = MAX_ROWS; 074 075 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { 076 s.setBytes(index, data); 077 } 078 079 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException { 080 return rs.getBytes(index); 081 } 082 083 @Override 084 public void doCreateTables(TransactionContext transactionContext) throws SQLException, IOException { 085 // Check to see if the table already exists. If it does, then don't log warnings during startup. 086 // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version of the table 087 boolean messageTableAlreadyExists = messageTableAlreadyExists(transactionContext); 088 089 for (String createStatement : this.statements.getCreateSchemaStatements()) { 090 // This will fail usually since the tables will be 091 // created already. 092 executeStatement(transactionContext, createStatement, messageTableAlreadyExists); 093 } 094 } 095 096 private boolean messageTableAlreadyExists(TransactionContext transactionContext) { 097 boolean alreadyExists = false; 098 ResultSet rs = null; 099 try { 100 rs = transactionContext.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(), new String[] { "TABLE" }); 101 alreadyExists = rs.next(); 102 } catch (Throwable ignore) { 103 } finally { 104 close(rs); 105 } 106 return alreadyExists; 107 } 108 109 private void executeStatement(TransactionContext transactionContext, String createStatement, boolean ignoreStatementExecutionFailure) throws IOException { 110 Statement statement = null; 111 try { 112 LOG.debug("Executing SQL: " + createStatement); 113 statement = transactionContext.getConnection().createStatement(); 114 statement.execute(createStatement); 115 116 commitIfAutoCommitIsDisabled(transactionContext); 117 } catch (SQLException e) { 118 if (ignoreStatementExecutionFailure) { 119 LOG.debug("Could not create JDBC tables; The message table already existed. " + String.format(FAILURE_MESSAGE, createStatement, e.getMessage(), e.getSQLState(), e.getErrorCode())); 120 } else { 121 LOG.warn("Could not create JDBC tables; they could already exist. " + String.format(FAILURE_MESSAGE, createStatement, e.getMessage(), e.getSQLState(), e.getErrorCode())); 122 JDBCPersistenceAdapter.log("Failure details: ", e); 123 } 124 } finally { 125 closeStatement(statement); 126 } 127 } 128 129 private void closeStatement(Statement statement) { 130 try { 131 if (statement != null) { 132 statement.close(); 133 } 134 } catch (SQLException ignored) {} 135 } 136 137 private void commitIfAutoCommitIsDisabled(TransactionContext c) throws SQLException, IOException { 138 if (!c.getConnection().getAutoCommit()) { 139 c.getConnection().commit(); 140 } 141 } 142 143 @Override 144 public void doDropTables(TransactionContext c) throws SQLException, IOException { 145 Statement s = null; 146 try { 147 s = c.getConnection().createStatement(); 148 String[] dropStatments = this.statements.getDropSchemaStatements(); 149 for (int i = 0; i < dropStatments.length; i++) { 150 // This will fail usually since the tables will be 151 // created already. 152 try { 153 LOG.debug("Executing SQL: " + dropStatments[i]); 154 s.execute(dropStatments[i]); 155 } catch (SQLException e) { 156 LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i] 157 + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: " 158 + e.getErrorCode()); 159 JDBCPersistenceAdapter.log("Failure details: ", e); 160 } 161 } 162 commitIfAutoCommitIsDisabled(c); 163 } finally { 164 try { 165 s.close(); 166 } catch (Throwable e) { 167 } 168 } 169 } 170 171 @Override 172 public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException { 173 PreparedStatement s = null; 174 ResultSet rs = null; 175 try { 176 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); 177 rs = s.executeQuery(); 178 long seq1 = 0; 179 if (rs.next()) { 180 seq1 = rs.getLong(1); 181 } 182 rs.close(); 183 s.close(); 184 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement()); 185 rs = s.executeQuery(); 186 long seq2 = 0; 187 if (rs.next()) { 188 seq2 = rs.getLong(1); 189 } 190 long seq = Math.max(seq1, seq2); 191 return seq; 192 } finally { 193 close(rs); 194 close(s); 195 } 196 } 197 198 @Override 199 public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException { 200 PreparedStatement s = null; 201 ResultSet rs = null; 202 try { 203 s = c.getConnection().prepareStatement( 204 this.statements.getFindMessageByIdStatement()); 205 s.setLong(1, storeSequenceId); 206 rs = s.executeQuery(); 207 if (!rs.next()) { 208 return null; 209 } 210 return getBinaryData(rs, 1); 211 } finally { 212 close(rs); 213 close(s); 214 } 215 } 216 217 218 /** 219 * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome 220 */ 221 @Override 222 public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, 223 long expiration, byte priority, XATransactionId xid) throws SQLException, IOException { 224 PreparedStatement s = c.getAddMessageStatement(); 225 try { 226 if (s == null) { 227 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); 228 if (this.batchStatements) { 229 c.setAddMessageStatement(s); 230 } 231 } 232 s.setLong(1, sequence); 233 s.setString(2, messageID.getProducerId().toString()); 234 s.setLong(3, messageID.getProducerSequenceId()); 235 s.setString(4, destination.getQualifiedName()); 236 s.setLong(5, expiration); 237 s.setLong(6, priority); 238 setBinaryData(s, 7, data); 239 if (xid != null) { 240 byte[] xidVal = xid.getEncodedXidBytes(); 241 xidVal[0] = '+'; 242 String xidString = printBase64Binary(xidVal); 243 s.setString(8, xidString); 244 } else { 245 s.setString(8, null); 246 } 247 if (this.batchStatements) { 248 s.addBatch(); 249 } else if (s.executeUpdate() != 1) { 250 throw new SQLException("Failed add a message"); 251 } 252 } finally { 253 if (!this.batchStatements) { 254 if (s != null) { 255 s.close(); 256 } 257 } 258 } 259 } 260 261 @Override 262 public void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException { 263 PreparedStatement s = null; 264 try { 265 s = c.getConnection().prepareStatement(this.statements.getUpdateMessageStatement()); 266 setBinaryData(s, 1, data); 267 s.setString(2, id.getProducerId().toString()); 268 s.setLong(3, id.getProducerSequenceId()); 269 s.setString(4, destination.getQualifiedName()); 270 if (s.executeUpdate() != 1) { 271 throw new IOException("Could not update message: " + id + " in " + destination); 272 } 273 } finally { 274 close(s); 275 } 276 } 277 278 279 @Override 280 public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, 281 long expirationTime, String messageRef) throws SQLException, IOException { 282 PreparedStatement s = c.getAddMessageStatement(); 283 try { 284 if (s == null) { 285 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); 286 if (this.batchStatements) { 287 c.setAddMessageStatement(s); 288 } 289 } 290 s.setLong(1, messageID.getBrokerSequenceId()); 291 s.setString(2, messageID.getProducerId().toString()); 292 s.setLong(3, messageID.getProducerSequenceId()); 293 s.setString(4, destination.getQualifiedName()); 294 s.setLong(5, expirationTime); 295 s.setString(6, messageRef); 296 if (this.batchStatements) { 297 s.addBatch(); 298 } else if (s.executeUpdate() != 1) { 299 throw new SQLException("Failed add a message"); 300 } 301 } finally { 302 if (!this.batchStatements) { 303 s.close(); 304 } 305 } 306 } 307 308 @Override 309 public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException { 310 PreparedStatement s = null; 311 ResultSet rs = null; 312 try { 313 s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement()); 314 s.setString(1, messageID.getProducerId().toString()); 315 s.setLong(2, messageID.getProducerSequenceId()); 316 s.setString(3, destination.getQualifiedName()); 317 rs = s.executeQuery(); 318 if (!rs.next()) { 319 return new long[]{0,0}; 320 } 321 return new long[]{rs.getLong(1), rs.getLong(2)}; 322 } finally { 323 close(rs); 324 close(s); 325 } 326 } 327 328 @Override 329 public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException { 330 PreparedStatement s = null; 331 ResultSet rs = null; 332 try { 333 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); 334 s.setString(1, id.getProducerId().toString()); 335 s.setLong(2, id.getProducerSequenceId()); 336 rs = s.executeQuery(); 337 if (!rs.next()) { 338 return null; 339 } 340 return getBinaryData(rs, 1); 341 } finally { 342 close(rs); 343 close(s); 344 } 345 } 346 347 @Override 348 public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException { 349 PreparedStatement s = null; 350 ResultSet rs = null; 351 try { 352 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); 353 s.setLong(1, seq); 354 rs = s.executeQuery(); 355 if (!rs.next()) { 356 return null; 357 } 358 return rs.getString(1); 359 } finally { 360 close(rs); 361 close(s); 362 } 363 } 364 365 /** 366 * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome 367 */ 368 @Override 369 public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException { 370 PreparedStatement s = c.getRemovedMessageStatement(); 371 try { 372 if (s == null) { 373 s = c.getConnection().prepareStatement(xid == null ? 374 this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement()); 375 if (this.batchStatements) { 376 c.setRemovedMessageStatement(s); 377 } 378 } 379 if (xid == null) { 380 s.setLong(1, seq); 381 } else { 382 byte[] xidVal = xid.getEncodedXidBytes(); 383 xidVal[0] = '-'; 384 String xidString = printBase64Binary(xidVal); 385 s.setString(1, xidString); 386 s.setLong(2, seq); 387 } 388 if (this.batchStatements) { 389 s.addBatch(); 390 } else if (s.executeUpdate() != 1) { 391 throw new SQLException("Failed to remove message seq: " + seq); 392 } 393 } finally { 394 if (!this.batchStatements && s != null) { 395 s.close(); 396 } 397 } 398 } 399 400 @Override 401 public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) 402 throws Exception { 403 PreparedStatement s = null; 404 ResultSet rs = null; 405 try { 406 s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement()); 407 s.setString(1, destination.getQualifiedName()); 408 rs = s.executeQuery(); 409 if (this.statements.isUseExternalMessageReferences()) { 410 while (rs.next()) { 411 if (!listener.recoverMessageReference(rs.getString(2))) { 412 break; 413 } 414 } 415 } else { 416 while (rs.next()) { 417 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 418 break; 419 } 420 } 421 } 422 } finally { 423 close(rs); 424 close(s); 425 } 426 } 427 428 @Override 429 public void doMessageIdScan(TransactionContext c, int limit, 430 JDBCMessageIdScanListener listener) throws SQLException, IOException { 431 PreparedStatement s = null; 432 ResultSet rs = null; 433 try { 434 s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement()); 435 s.setMaxRows(limit); 436 rs = s.executeQuery(); 437 // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid 438 LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>(); 439 while (rs.next()) { 440 reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3))); 441 } 442 if (LOG.isDebugEnabled()) { 443 LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids"); 444 } 445 for (MessageId id : reverseOrderIds) { 446 listener.messageId(id); 447 } 448 } finally { 449 close(rs); 450 close(s); 451 } 452 } 453 454 @Override 455 public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId, 456 String subscriptionName, long seq, long priority) throws SQLException, IOException { 457 PreparedStatement s = c.getUpdateLastAckStatement(); 458 try { 459 if (s == null) { 460 s = c.getConnection().prepareStatement(xid == null ? 461 this.statements.getUpdateDurableLastAckWithPriorityStatement() : 462 this.statements.getUpdateDurableLastAckWithPriorityInTxStatement()); 463 if (this.batchStatements) { 464 c.setUpdateLastAckStatement(s); 465 } 466 } 467 if (xid != null) { 468 byte[] xidVal = encodeXid(xid, seq, priority); 469 String xidString = printBase64Binary(xidVal); 470 s.setString(1, xidString); 471 } else { 472 s.setLong(1, seq); 473 } 474 s.setString(2, destination.getQualifiedName()); 475 s.setString(3, clientId); 476 s.setString(4, subscriptionName); 477 s.setLong(5, priority); 478 if (this.batchStatements) { 479 s.addBatch(); 480 } else if (s.executeUpdate() != 1) { 481 throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName); 482 } 483 } finally { 484 if (!this.batchStatements) { 485 close(s); 486 } 487 } 488 } 489 490 491 @Override 492 public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId, 493 String subscriptionName, long seq, long priority) throws SQLException, IOException { 494 PreparedStatement s = c.getUpdateLastAckStatement(); 495 try { 496 if (s == null) { 497 s = c.getConnection().prepareStatement(xid == null ? 498 this.statements.getUpdateDurableLastAckStatement() : 499 this.statements.getUpdateDurableLastAckInTxStatement()); 500 if (this.batchStatements) { 501 c.setUpdateLastAckStatement(s); 502 } 503 } 504 if (xid != null) { 505 byte[] xidVal = encodeXid(xid, seq, priority); 506 String xidString = printBase64Binary(xidVal); 507 s.setString(1, xidString); 508 } else { 509 s.setLong(1, seq); 510 } 511 s.setString(2, destination.getQualifiedName()); 512 s.setString(3, clientId); 513 s.setString(4, subscriptionName); 514 515 if (this.batchStatements) { 516 s.addBatch(); 517 } else if (s.executeUpdate() != 1) { 518 throw new IOException("Could not update last ack seq : " 519 + seq + ", for sub: " + subscriptionName); 520 } 521 } finally { 522 if (!this.batchStatements) { 523 close(s); 524 } 525 } 526 } 527 528 private byte[] encodeXid(XATransactionId xid, long seq, long priority) { 529 byte[] xidVal = xid.getEncodedXidBytes(); 530 // encode the update 531 DataByteArrayOutputStream outputStream = xid.internalOutputStream(); 532 outputStream.position(1); 533 outputStream.writeLong(seq); 534 outputStream.writeByte(Long.valueOf(priority).byteValue()); 535 return xidVal; 536 } 537 538 @Override 539 public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException { 540 PreparedStatement s = null; 541 try { 542 s = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement()); 543 s.setString(1, destination.getQualifiedName()); 544 s.setString(2, clientId); 545 s.setString(3, subName); 546 s.setLong(4, priority); 547 if (s.executeUpdate() != 1) { 548 throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName); 549 } 550 } finally { 551 close(s); 552 } 553 } 554 555 @Override 556 public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, 557 String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception { 558 // dumpTables(c, 559 // destination.getQualifiedName(),clientId,subscriptionName); 560 PreparedStatement s = null; 561 ResultSet rs = null; 562 try { 563 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement()); 564 s.setString(1, destination.getQualifiedName()); 565 s.setString(2, clientId); 566 s.setString(3, subscriptionName); 567 rs = s.executeQuery(); 568 if (this.statements.isUseExternalMessageReferences()) { 569 while (rs.next()) { 570 if (!listener.recoverMessageReference(rs.getString(2))) { 571 break; 572 } 573 } 574 } else { 575 while (rs.next()) { 576 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 577 break; 578 } 579 } 580 } 581 } finally { 582 close(rs); 583 close(s); 584 } 585 } 586 587 @Override 588 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, 589 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { 590 591 PreparedStatement s = null; 592 ResultSet rs = null; 593 try { 594 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement()); 595 s.setMaxRows(Math.min(maxReturned * 2, maxRows)); 596 s.setString(1, destination.getQualifiedName()); 597 s.setString(2, clientId); 598 s.setString(3, subscriptionName); 599 s.setLong(4, seq); 600 rs = s.executeQuery(); 601 int count = 0; 602 if (this.statements.isUseExternalMessageReferences()) { 603 while (rs.next() && count < maxReturned) { 604 if (listener.recoverMessageReference(rs.getString(1))) { 605 count++; 606 } 607 } 608 } else { 609 while (rs.next() && count < maxReturned) { 610 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 611 count++; 612 } 613 } 614 } 615 } finally { 616 close(rs); 617 close(s); 618 } 619 } 620 621 @Override 622 public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, 623 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { 624 625 PreparedStatement s = null; 626 ResultSet rs = null; 627 try { 628 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement()); 629 s.setMaxRows(Math.min(maxReturned * 2, maxRows)); 630 s.setString(1, destination.getQualifiedName()); 631 s.setString(2, clientId); 632 s.setString(3, subscriptionName); 633 s.setLong(4, seq); 634 s.setLong(5, priority); 635 rs = s.executeQuery(); 636 int count = 0; 637 if (this.statements.isUseExternalMessageReferences()) { 638 while (rs.next() && count < maxReturned) { 639 if (listener.recoverMessageReference(rs.getString(1))) { 640 count++; 641 } 642 } 643 } else { 644 while (rs.next() && count < maxReturned) { 645 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 646 count++; 647 } 648 } 649 } 650 } finally { 651 close(rs); 652 close(s); 653 } 654 } 655 656 @Override 657 public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination, 658 String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException { 659 PreparedStatement s = null; 660 ResultSet rs = null; 661 int result = 0; 662 try { 663 if (isPrioritizedMessages) { 664 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority()); 665 } else { 666 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement()); 667 } 668 s.setString(1, destination.getQualifiedName()); 669 s.setString(2, clientId); 670 s.setString(3, subscriptionName); 671 rs = s.executeQuery(); 672 if (rs.next()) { 673 result = rs.getInt(1); 674 } 675 } finally { 676 close(rs); 677 close(s); 678 } 679 return result; 680 } 681 682 /** 683 * @param c 684 * @param info 685 * @param retroactive 686 * @throws SQLException 687 * @throws IOException 688 */ 689 @Override 690 public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages) 691 throws SQLException, IOException { 692 // dumpTables(c, destination.getQualifiedName(), clientId, 693 // subscriptionName); 694 PreparedStatement s = null; 695 try { 696 long lastMessageId = -1; 697 if (!retroactive) { 698 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); 699 ResultSet rs = null; 700 try { 701 rs = s.executeQuery(); 702 if (rs.next()) { 703 lastMessageId = rs.getLong(1); 704 } 705 } finally { 706 close(rs); 707 close(s); 708 } 709 } 710 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); 711 int maxPriority = 1; 712 if (isPrioritizedMessages) { 713 maxPriority = 10; 714 } 715 716 for (int priority = 0; priority < maxPriority; priority++) { 717 s.setString(1, info.getDestination().getQualifiedName()); 718 s.setString(2, info.getClientId()); 719 s.setString(3, info.getSubscriptionName()); 720 s.setString(4, info.getSelector()); 721 s.setLong(5, lastMessageId); 722 s.setString(6, info.getSubscribedDestination().getQualifiedName()); 723 s.setLong(7, priority); 724 725 if (s.executeUpdate() != 1) { 726 throw new IOException("Could not create durable subscription for: " + info.getClientId()); 727 } 728 } 729 730 } finally { 731 close(s); 732 } 733 } 734 735 @Override 736 public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, 737 String clientId, String subscriptionName) throws SQLException, IOException { 738 PreparedStatement s = null; 739 ResultSet rs = null; 740 try { 741 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement()); 742 s.setString(1, destination.getQualifiedName()); 743 s.setString(2, clientId); 744 s.setString(3, subscriptionName); 745 rs = s.executeQuery(); 746 if (!rs.next()) { 747 return null; 748 } 749 SubscriptionInfo subscription = new SubscriptionInfo(); 750 subscription.setDestination(destination); 751 subscription.setClientId(clientId); 752 subscription.setSubscriptionName(subscriptionName); 753 subscription.setSelector(rs.getString(1)); 754 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2), 755 ActiveMQDestination.QUEUE_TYPE)); 756 return subscription; 757 } finally { 758 close(rs); 759 close(s); 760 } 761 } 762 763 @Override 764 public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) 765 throws SQLException, IOException { 766 PreparedStatement s = null; 767 ResultSet rs = null; 768 try { 769 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement()); 770 s.setString(1, destination.getQualifiedName()); 771 rs = s.executeQuery(); 772 ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>(); 773 while (rs.next()) { 774 SubscriptionInfo subscription = new SubscriptionInfo(); 775 subscription.setDestination(destination); 776 subscription.setSelector(rs.getString(1)); 777 subscription.setSubscriptionName(rs.getString(2)); 778 subscription.setClientId(rs.getString(3)); 779 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4), 780 ActiveMQDestination.QUEUE_TYPE)); 781 rc.add(subscription); 782 } 783 return rc.toArray(new SubscriptionInfo[rc.size()]); 784 } finally { 785 close(rs); 786 close(s); 787 } 788 } 789 790 @Override 791 public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, 792 IOException { 793 PreparedStatement s = null; 794 try { 795 s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement()); 796 s.setString(1, destinationName.getQualifiedName()); 797 s.executeUpdate(); 798 s.close(); 799 s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement()); 800 s.setString(1, destinationName.getQualifiedName()); 801 s.executeUpdate(); 802 } finally { 803 close(s); 804 } 805 } 806 807 @Override 808 public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, 809 String subscriptionName) throws SQLException, IOException { 810 PreparedStatement s = null; 811 try { 812 s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement()); 813 s.setString(1, destination.getQualifiedName()); 814 s.setString(2, clientId); 815 s.setString(3, subscriptionName); 816 s.executeUpdate(); 817 } finally { 818 close(s); 819 } 820 } 821 822 char priorityIterator = 0; // unsigned 823 @Override 824 public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException { 825 PreparedStatement s = null; 826 try { 827 LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority()); 828 s = c.getExclusiveConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority()); 829 int priority = priorityIterator++%10; 830 s.setInt(1, priority); 831 s.setInt(2, priority); 832 int i = s.executeUpdate(); 833 LOG.debug("Deleted " + i + " old message(s) at priority: " + priority); 834 } finally { 835 close(s); 836 } 837 } 838 839 @Override 840 public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, 841 String clientId, String subscriberName) throws SQLException, IOException { 842 PreparedStatement s = null; 843 ResultSet rs = null; 844 long result = -1; 845 try { 846 s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement()); 847 s.setString(1, destination.getQualifiedName()); 848 s.setString(2, clientId); 849 s.setString(3, subscriberName); 850 rs = s.executeQuery(); 851 if (rs.next()) { 852 result = rs.getLong(1); 853 if (result == 0 && rs.wasNull()) { 854 result = -1; 855 } 856 } 857 } finally { 858 close(rs); 859 close(s); 860 } 861 return result; 862 } 863 864 protected static void close(PreparedStatement s) { 865 try { 866 s.close(); 867 } catch (Throwable e) { 868 } 869 } 870 871 protected static void close(ResultSet rs) { 872 try { 873 rs.close(); 874 } catch (Throwable e) { 875 } 876 } 877 878 @Override 879 public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException { 880 HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 881 PreparedStatement s = null; 882 ResultSet rs = null; 883 try { 884 s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement()); 885 rs = s.executeQuery(); 886 while (rs.next()) { 887 rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE)); 888 } 889 } finally { 890 close(rs); 891 close(s); 892 } 893 return rc; 894 } 895 896 /** 897 * @return true if batchStatements 898 */ 899 public boolean isBatchStatements() { 900 return batchStatements; 901 } 902 903 /** 904 * Set the number of statements to process as a single batch DB update 905 * @param batchStatements 906 */ 907 public void setBatchStatements(boolean batchStatements) { 908 this.batchStatements = batchStatements; 909 // The next lines are deprecated and should be removed in a future release 910 // and is here in case someone created their own 911 // this.batchStatments = batchStatements; 912 } 913 914 // Note - remove batchStatment in future distributions. Here for backward compatibility 915 /** 916 * @return true if batchStements 917 */ 918 public boolean isBatchStatments() { 919 return this.batchStatements; 920 } 921 922 /** 923 * This value batchStatments is deprecated and will be removed in a future release. Use batchStatements instead (Note the 'e' in Statement)" 924 * @deprecated 925 * @param batchStatments 926 */ 927 public void setBatchStatments(boolean batchStatments) { 928 LOG.warn("batchStatments is deprecated and will be removed in a future release. Use batchStatements instead (Note the 'e' in Statement)"); 929 this.batchStatements = batchStatments; 930 this.batchStatments = batchStatments; 931 } 932 933 @Override 934 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 935 this.statements.setUseExternalMessageReferences(useExternalMessageReferences); 936 } 937 938 /** 939 * @return the statements 940 */ 941 public Statements getStatements() { 942 return this.statements; 943 } 944 945 @Override 946 public void setStatements(Statements statements) { 947 this.statements = statements; 948 } 949 950 @Override 951 public int getMaxRows() { 952 return maxRows; 953 } 954 955 /** 956 * the max value for statement maxRows, used to limit jdbc queries 957 */ 958 @Override 959 public void setMaxRows(int maxRows) { 960 this.maxRows = maxRows; 961 } 962 963 @Override 964 public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException { 965 PreparedStatement s = null; 966 try { 967 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); 968 s.setString(1, destination.getQualifiedName()); 969 s.setString(2, destination.getQualifiedName()); 970 s.setString(3, destination.getQualifiedName()); 971 s.setString(4, null); 972 s.setLong(5, 0); 973 s.setString(6, destination.getQualifiedName()); 974 s.setLong(7, 11); // entry out of priority range 975 976 if (s.executeUpdate() != 1) { 977 throw new IOException("Could not create ack record for destination: " + destination); 978 } 979 } finally { 980 close(s); 981 } 982 } 983 984 @Override 985 public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException { 986 PreparedStatement s = null; 987 ResultSet rs = null; 988 try { 989 s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement()); 990 rs = s.executeQuery(); 991 while (rs.next()) { 992 long id = rs.getLong(1); 993 String encodedString = rs.getString(2); 994 byte[] encodedXid = parseBase64Binary(encodedString); 995 if (encodedXid[0] == '+') { 996 jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3)); 997 } else { 998 jdbcMemoryTransactionStore.recoverAck(id, encodedXid, getBinaryData(rs, 3)); 999 } 1000 } 1001 1002 close(rs); 1003 close(s); 1004 1005 s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement()); 1006 rs = s.executeQuery(); 1007 while (rs.next()) { 1008 String encodedString = rs.getString(1); 1009 byte[] encodedXid = parseBase64Binary(encodedString); 1010 String destination = rs.getString(2); 1011 String subName = rs.getString(3); 1012 String subId = rs.getString(4); 1013 jdbcMemoryTransactionStore.recoverLastAck(encodedXid, 1014 ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE), 1015 subName, subId); 1016 } 1017 } finally { 1018 close(rs); 1019 close(s); 1020 } 1021 } 1022 1023 @Override 1024 public void doCommitAddOp(TransactionContext c, long preparedSequence, long sequence) throws SQLException, IOException { 1025 PreparedStatement s = null; 1026 try { 1027 s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement()); 1028 s.setLong(1, sequence); 1029 s.setLong(2, preparedSequence); 1030 if (s.executeUpdate() != 1) { 1031 throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence); 1032 } 1033 } finally { 1034 close(s); 1035 } 1036 } 1037 1038 1039 @Override 1040 public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, 1041 IOException { 1042 PreparedStatement s = null; 1043 ResultSet rs = null; 1044 int result = 0; 1045 try { 1046 s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement()); 1047 s.setString(1, destination.getQualifiedName()); 1048 rs = s.executeQuery(); 1049 if (rs.next()) { 1050 result = rs.getInt(1); 1051 } 1052 } finally { 1053 close(rs); 1054 close(s); 1055 } 1056 return result; 1057 } 1058 1059 @Override 1060 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long[] lastRecoveredEntries, 1061 long maxSeq, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception { 1062 PreparedStatement s = null; 1063 ResultSet rs = null; 1064 try { 1065 if (isPrioritizedMessages) { 1066 s = c.getConnection().prepareStatement(limitQuery(this.statements.getFindNextMessagesByPriorityStatement())); 1067 } else { 1068 s = c.getConnection().prepareStatement(limitQuery(this.statements.getFindNextMessagesStatement())); 1069 } 1070 s.setMaxRows(Math.min(maxReturned, maxRows)); 1071 s.setString(1, destination.getQualifiedName()); 1072 s.setLong(2, maxSeq); 1073 int paramId = 3; 1074 if (isPrioritizedMessages) { 1075 for (int i=9;i>=0;i--) { 1076 s.setLong(paramId++, lastRecoveredEntries[i]); 1077 } 1078 } else { 1079 s.setLong(paramId, lastRecoveredEntries[0]); 1080 } 1081 rs = s.executeQuery(); 1082 int count = 0; 1083 if (this.statements.isUseExternalMessageReferences()) { 1084 while (rs.next() && count < maxReturned) { 1085 if (listener.recoverMessageReference(rs.getString(1))) { 1086 count++; 1087 } else { 1088 LOG.debug("Stopped recover next messages"); 1089 break; 1090 } 1091 } 1092 } else { 1093 while (rs.next() && count < maxReturned) { 1094 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 1095 count++; 1096 } else { 1097 LOG.debug("Stopped recover next messages"); 1098 break; 1099 } 1100 } 1101 } 1102 } catch (Exception e) { 1103 LOG.warn("Exception recovering next messages", e); 1104 } finally { 1105 close(rs); 1106 close(s); 1107 } 1108 } 1109 1110 @Override 1111 public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) 1112 throws SQLException, IOException { 1113 PreparedStatement s = null; 1114 ResultSet rs = null; 1115 try { 1116 s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement()); 1117 s.setString(1, id.toString()); 1118 rs = s.executeQuery(); 1119 long seq = -1; 1120 if (rs.next()) { 1121 seq = rs.getLong(1); 1122 } 1123 return seq; 1124 } finally { 1125 close(rs); 1126 close(s); 1127 } 1128 } 1129 1130 public static void dumpTables(Connection c, String destinationName, String clientId, String 1131 subscriptionName) throws SQLException { 1132 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 1133 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 1134 PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " 1135 + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " 1136 + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 1137 + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 1138 + " ORDER BY M.ID"); 1139 s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName); 1140 printQuery(s,System.out); } 1141 1142 public static void dumpTables(java.sql.Connection c) throws SQLException { 1143 printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_MSGS", System.out); 1144 1145 //printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_ACKS", System.out); 1146 1147 //printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out); 1148 //printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 1149 } 1150 1151 public static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out) 1152 throws SQLException { 1153 printQuery(c.prepareStatement(query), out); 1154 } 1155 1156 public static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out) 1157 throws SQLException { 1158 1159 ResultSet set = null; 1160 try { 1161 set = s.executeQuery(); 1162 java.sql.ResultSetMetaData metaData = set.getMetaData(); 1163 for (int i = 1; i <= metaData.getColumnCount(); i++) { 1164 if (i == 1) 1165 out.print("||"); 1166 out.print(metaData.getColumnName(i) + "||"); 1167 } 1168 out.println(); 1169 while (set.next()) { 1170 for (int i = 1; i <= metaData.getColumnCount(); i++) { 1171 if (i == 1) 1172 out.print("|"); 1173 out.print(set.getString(i) + "|"); 1174 } 1175 out.println(); 1176 } 1177 } finally { 1178 try { 1179 set.close(); 1180 } catch (Throwable ignore) { 1181 } 1182 try { 1183 s.close(); 1184 } catch (Throwable ignore) { 1185 } 1186 } 1187 } 1188 1189 @Override 1190 public String limitQuery(String query) { 1191 return query; 1192 } 1193}