001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.HashMap; 022import java.util.Iterator; 023import org.apache.activemq.broker.ConnectionContext; 024import org.apache.activemq.command.ActiveMQDestination; 025import org.apache.activemq.command.Message; 026import org.apache.activemq.command.MessageAck; 027import org.apache.activemq.command.MessageId; 028import org.apache.activemq.command.TransactionId; 029import org.apache.activemq.command.XATransactionId; 030import org.apache.activemq.store.IndexListener; 031import org.apache.activemq.store.MessageStore; 032import org.apache.activemq.store.ProxyMessageStore; 033import org.apache.activemq.store.ProxyTopicMessageStore; 034import org.apache.activemq.store.TopicMessageStore; 035import org.apache.activemq.store.TransactionRecoveryListener; 036import org.apache.activemq.store.memory.MemoryTransactionStore; 037import org.apache.activemq.util.ByteSequence; 038import org.apache.activemq.util.DataByteArrayInputStream; 039 040/** 041 * respect 2pc prepare 042 * uses local transactions to maintain prepared state 043 * xid column provides transaction flag for additions and removals 044 * a commit clears that context and completes the work 045 * a rollback clears the flag and removes the additions 046 * Essentially a prepare is an insert &| update transaction 047 * commit|rollback is an update &| remove 048 */ 049public class JdbcMemoryTransactionStore extends MemoryTransactionStore { 050 051 052 private HashMap<ActiveMQDestination, MessageStore> topicStores = new HashMap<ActiveMQDestination, MessageStore>(); 053 private HashMap<ActiveMQDestination, MessageStore> queueStores = new HashMap<ActiveMQDestination, MessageStore>(); 054 055 public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) { 056 super(jdbcPersistenceAdapter); 057 } 058 059 @Override 060 public void prepare(TransactionId txid) throws IOException { 061 Tx tx = inflightTransactions.remove(txid); 062 if (tx == null) { 063 return; 064 } 065 066 ConnectionContext ctx = new ConnectionContext(); 067 // setting the xid modifies the add/remove to be pending transaction outcome 068 ctx.setXid((XATransactionId) txid); 069 persistenceAdapter.beginTransaction(ctx); 070 try { 071 072 // Do all the message adds. 073 for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext();) { 074 AddMessageCommand cmd = iter.next(); 075 cmd.run(ctx); 076 } 077 // And removes.. 078 for (Iterator<RemoveMessageCommand> iter = tx.acks.iterator(); iter.hasNext();) { 079 RemoveMessageCommand cmd = iter.next(); 080 cmd.run(ctx); 081 } 082 083 } catch ( IOException e ) { 084 persistenceAdapter.rollbackTransaction(ctx); 085 throw e; 086 } 087 persistenceAdapter.commitTransaction(ctx); 088 089 ctx.setXid(null); 090 // setup for commit outcome 091 ArrayList<AddMessageCommand> updateFromPreparedStateCommands = new ArrayList<AddMessageCommand>(); 092 for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext();) { 093 final AddMessageCommand addMessageCommand = iter.next(); 094 updateFromPreparedStateCommands.add(new CommitAddOutcome(addMessageCommand)); 095 } 096 tx.messages = updateFromPreparedStateCommands; 097 preparedTransactions.put(txid, tx); 098 099 } 100 101 102 class CommitAddOutcome implements AddMessageCommand { 103 final Message message; 104 JDBCMessageStore jdbcMessageStore; 105 106 public CommitAddOutcome(JDBCMessageStore jdbcMessageStore, Message message) { 107 this.jdbcMessageStore = jdbcMessageStore; 108 this.message = message; 109 } 110 111 public CommitAddOutcome(AddMessageCommand addMessageCommand) { 112 this((JDBCMessageStore)addMessageCommand.getMessageStore(), addMessageCommand.getMessage()); 113 } 114 115 @Override 116 public Message getMessage() { 117 return message; 118 } 119 120 @Override 121 public MessageStore getMessageStore() { 122 return jdbcMessageStore; 123 } 124 125 @Override 126 public void run(final ConnectionContext context) throws IOException { 127 JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) persistenceAdapter; 128 final Long preparedEntrySequence = (Long) message.getMessageId().getEntryLocator(); 129 TransactionContext c = jdbcPersistenceAdapter.getTransactionContext(context); 130 131 synchronized (jdbcMessageStore.pendingAdditions) { 132 message.getMessageId().setEntryLocator(jdbcPersistenceAdapter.getNextSequenceId()); 133 134 c.onCompletion(new Runnable() { 135 @Override 136 public void run() { 137 message.getMessageId().setFutureOrSequenceLong(message.getMessageId().getEntryLocator()); 138 } 139 }); 140 141 if (jdbcMessageStore.getIndexListener() != null) { 142 jdbcMessageStore.getIndexListener().onAdd(new IndexListener.MessageContext(context, message, null)); 143 } 144 } 145 146 jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), preparedEntrySequence); 147 jdbcMessageStore.onAdd(message, (Long)message.getMessageId().getEntryLocator(), message.getPriority()); 148 } 149 150 @Override 151 public void setMessageStore(MessageStore messageStore) { 152 jdbcMessageStore = (JDBCMessageStore) messageStore; 153 } 154 } 155 156 @Override 157 public void rollback(TransactionId txid) throws IOException { 158 159 Tx tx = inflightTransactions.remove(txid); 160 if (tx == null) { 161 tx = preparedTransactions.remove(txid); 162 if (tx != null) { 163 // undo prepare work 164 ConnectionContext ctx = new ConnectionContext(); 165 persistenceAdapter.beginTransaction(ctx); 166 try { 167 168 for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext(); ) { 169 final Message message = iter.next().getMessage(); 170 // need to delete the row 171 ((JDBCPersistenceAdapter) persistenceAdapter).commitRemove(ctx, new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, 1)); 172 } 173 174 for (Iterator<RemoveMessageCommand> iter = tx.acks.iterator(); iter.hasNext(); ) { 175 RemoveMessageCommand removeMessageCommand = iter.next(); 176 if (removeMessageCommand instanceof LastAckCommand ) { 177 ((LastAckCommand)removeMessageCommand).rollback(ctx); 178 } else { 179 MessageId messageId = removeMessageCommand.getMessageAck().getLastMessageId(); 180 // need to unset the txid flag on the existing row 181 ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, messageId, (Long)messageId.getEntryLocator()); 182 } 183 } 184 } catch (IOException e) { 185 persistenceAdapter.rollbackTransaction(ctx); 186 throw e; 187 } 188 persistenceAdapter.commitTransaction(ctx); 189 } 190 } 191 } 192 193 @Override 194 public void recover(TransactionRecoveryListener listener) throws IOException { 195 ((JDBCPersistenceAdapter)persistenceAdapter).recover(this); 196 super.recover(listener); 197 } 198 199 public void recoverAdd(long id, byte[] messageBytes) throws IOException { 200 final Message message = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(messageBytes)); 201 message.getMessageId().setFutureOrSequenceLong(id); 202 message.getMessageId().setEntryLocator(id); 203 Tx tx = getPreparedTx(message.getTransactionId()); 204 tx.add(new CommitAddOutcome(null, message)); 205 } 206 207 public void recoverAck(long id, byte[] xid, byte[] message) throws IOException { 208 Message msg = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(message)); 209 msg.getMessageId().setFutureOrSequenceLong(id); 210 msg.getMessageId().setEntryLocator(id); 211 Tx tx = getPreparedTx(new XATransactionId(xid)); 212 final MessageAck ack = new MessageAck(msg, MessageAck.STANDARD_ACK_TYPE, 1); 213 tx.add(new RemoveMessageCommand() { 214 @Override 215 public MessageAck getMessageAck() { 216 return ack; 217 } 218 219 @Override 220 public void run(ConnectionContext context) throws IOException { 221 ((JDBCPersistenceAdapter)persistenceAdapter).commitRemove(context, ack); 222 } 223 224 @Override 225 public MessageStore getMessageStore() { 226 return null; 227 } 228 229 }); 230 231 } 232 233 interface LastAckCommand extends RemoveMessageCommand { 234 void rollback(ConnectionContext context) throws IOException; 235 236 String getClientId(); 237 238 String getSubName(); 239 240 long getSequence(); 241 242 byte getPriority(); 243 244 void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore); 245 } 246 247 public void recoverLastAck(byte[] encodedXid, final ActiveMQDestination destination, final String subName, final String clientId) throws IOException { 248 Tx tx = getPreparedTx(new XATransactionId(encodedXid)); 249 DataByteArrayInputStream inputStream = new DataByteArrayInputStream(encodedXid); 250 inputStream.skipBytes(1); // +|- 251 final long lastAck = inputStream.readLong(); 252 final byte priority = inputStream.readByte(); 253 final MessageAck ack = new MessageAck(); 254 ack.setDestination(destination); 255 tx.add(new LastAckCommand() { 256 JDBCTopicMessageStore jdbcTopicMessageStore; 257 258 @Override 259 public MessageAck getMessageAck() { 260 return ack; 261 } 262 263 @Override 264 public MessageStore getMessageStore() { 265 return jdbcTopicMessageStore; 266 } 267 268 @Override 269 public void run(ConnectionContext context) throws IOException { 270 ((JDBCPersistenceAdapter)persistenceAdapter).commitLastAck(context, lastAck, priority, destination, subName, clientId); 271 jdbcTopicMessageStore.complete(clientId, subName); 272 } 273 274 @Override 275 public void rollback(ConnectionContext context) throws IOException { 276 ((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, priority, jdbcTopicMessageStore.getDestination(), subName, clientId); 277 jdbcTopicMessageStore.complete(clientId, subName); 278 } 279 280 @Override 281 public String getClientId() { 282 return clientId; 283 } 284 285 @Override 286 public String getSubName() { 287 return subName; 288 } 289 290 @Override 291 public long getSequence() { 292 return lastAck; 293 } 294 295 @Override 296 public byte getPriority() { 297 return priority; 298 } 299 300 @Override 301 public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) { 302 this.jdbcTopicMessageStore = jdbcTopicMessageStore; 303 } 304 }); 305 306 } 307 308 @Override 309 protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) { 310 topicStores.put(proxyTopicMessageStore.getDestination(), proxyTopicMessageStore.getDelegate()); 311 } 312 313 @Override 314 protected void onProxyQueueStore(ProxyMessageStore proxyQueueMessageStore) { 315 queueStores.put(proxyQueueMessageStore.getDestination(), proxyQueueMessageStore.getDelegate()); 316 } 317 318 @Override 319 protected void onRecovered(Tx tx) { 320 for (RemoveMessageCommand removeMessageCommand: tx.acks) { 321 if (removeMessageCommand instanceof LastAckCommand) { 322 LastAckCommand lastAckCommand = (LastAckCommand) removeMessageCommand; 323 JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicStores.get(lastAckCommand.getMessageAck().getDestination()); 324 jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority()); 325 lastAckCommand.setMessageStore(jdbcTopicMessageStore); 326 } else { 327 // when reading the store we ignore messages with non null XIDs but should include those with XIDS starting in - (pending acks in an xa transaction), 328 // but the sql is non portable to match BLOB with LIKE etc 329 // so we make up for it when we recover the ack 330 ((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment(); 331 } 332 } 333 for (AddMessageCommand addMessageCommand : tx.messages) { 334 ActiveMQDestination destination = addMessageCommand.getMessage().getDestination(); 335 addMessageCommand.setMessageStore(destination.isQueue() ? queueStores.get(destination) : topicStores.get(destination)); 336 } 337 } 338 339 @Override 340 public void acknowledge(final TopicMessageStore topicMessageStore, final String clientId, final String subscriptionName, 341 final MessageId messageId, final MessageAck ack) throws IOException { 342 343 if (ack.isInTransaction()) { 344 Tx tx = getTx(ack.getTransactionId()); 345 tx.add(new LastAckCommand() { 346 public MessageAck getMessageAck() { 347 return ack; 348 } 349 350 public void run(ConnectionContext ctx) throws IOException { 351 topicMessageStore.acknowledge(ctx, clientId, subscriptionName, messageId, ack); 352 } 353 354 @Override 355 public MessageStore getMessageStore() { 356 return topicMessageStore; 357 } 358 359 @Override 360 public void rollback(ConnectionContext context) throws IOException { 361 JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore)topicMessageStore; 362 ((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, 363 jdbcTopicMessageStore, 364 ack, 365 subscriptionName, clientId); 366 jdbcTopicMessageStore.complete(clientId, subscriptionName); 367 } 368 369 370 @Override 371 public String getClientId() { 372 return clientId; 373 } 374 375 @Override 376 public String getSubName() { 377 return subscriptionName; 378 } 379 380 @Override 381 public long getSequence() { 382 throw new IllegalStateException("Sequence id must be inferred from ack"); 383 } 384 385 @Override 386 public byte getPriority() { 387 throw new IllegalStateException("Priority must be inferred from ack or row"); 388 } 389 390 @Override 391 public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) { 392 throw new IllegalStateException("message store already known!"); 393 } 394 }); 395 } else { 396 topicMessageStore.acknowledge(null, clientId, subscriptionName, messageId, ack); 397 } 398 } 399 400}