001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.sql.SQLException; 021import java.util.Arrays; 022import java.util.LinkedList; 023 024import org.apache.activemq.ActiveMQMessageAudit; 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.command.ActiveMQDestination; 027import org.apache.activemq.command.Message; 028import org.apache.activemq.command.MessageAck; 029import org.apache.activemq.command.MessageId; 030import org.apache.activemq.command.XATransactionId; 031import org.apache.activemq.store.AbstractMessageStore; 032import org.apache.activemq.store.IndexListener; 033import org.apache.activemq.store.MessageRecoveryListener; 034import org.apache.activemq.util.ByteSequence; 035import org.apache.activemq.util.ByteSequenceData; 036import org.apache.activemq.util.IOExceptionSupport; 037import org.apache.activemq.wireformat.WireFormat; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * 043 */ 044public class JDBCMessageStore extends AbstractMessageStore { 045 046 class Duration { 047 static final int LIMIT = 100; 048 final long start = System.currentTimeMillis(); 049 final String name; 050 051 Duration(String name) { 052 this.name = name; 053 } 054 void end() { 055 end(null); 056 } 057 void end(Object o) { 058 long duration = System.currentTimeMillis() - start; 059 060 if (duration > LIMIT) { 061 System.err.println(name + " took a long time: " + duration + "ms " + o); 062 } 063 } 064 } 065 private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class); 066 protected final WireFormat wireFormat; 067 protected final JDBCAdapter adapter; 068 protected final JDBCPersistenceAdapter persistenceAdapter; 069 protected ActiveMQMessageAudit audit; 070 protected final LinkedList<Long> pendingAdditions = new LinkedList<Long>(); 071 final long[] perPriorityLastRecovered = new long[10]; 072 073 public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException { 074 super(destination); 075 this.persistenceAdapter = persistenceAdapter; 076 this.adapter = adapter; 077 this.wireFormat = wireFormat; 078 this.audit = audit; 079 080 if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) { 081 recordDestinationCreation(destination); 082 } 083 resetBatching(); 084 } 085 086 private void recordDestinationCreation(ActiveMQDestination destination) throws IOException { 087 TransactionContext c = persistenceAdapter.getTransactionContext(); 088 try { 089 c = persistenceAdapter.getTransactionContext(); 090 if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) { 091 adapter.doRecordDestination(c, destination); 092 } 093 } catch (SQLException e) { 094 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 095 throw IOExceptionSupport.create("Failed to record destination: " + destination + ". Reason: " + e, e); 096 } finally { 097 c.close(); 098 } 099 } 100 101 @Override 102 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 103 MessageId messageId = message.getMessageId(); 104 if (audit != null && audit.isDuplicate(message)) { 105 if (LOG.isDebugEnabled()) { 106 LOG.debug(destination.getPhysicalName() 107 + " ignoring duplicated (add) message, already stored: " 108 + messageId); 109 } 110 return; 111 } 112 113 // if xaXid present - this is a prepare - so we don't yet have an outcome 114 final XATransactionId xaXid = context != null ? context.getXid() : null; 115 116 // Serialize the Message.. 117 byte data[]; 118 try { 119 ByteSequence packet = wireFormat.marshal(message); 120 data = ByteSequenceData.toByteArray(packet); 121 } catch (IOException e) { 122 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 123 } 124 125 // Get a connection and insert the message into the DB. 126 TransactionContext c = persistenceAdapter.getTransactionContext(context); 127 long sequenceId; 128 synchronized (pendingAdditions) { 129 sequenceId = persistenceAdapter.getNextSequenceId(); 130 final long sequence = sequenceId; 131 message.getMessageId().setEntryLocator(sequence); 132 133 if (xaXid == null) { 134 pendingAdditions.add(sequence); 135 136 c.onCompletion(new Runnable() { 137 @Override 138 public void run() { 139 // jdbc close or jms commit - while futureOrSequenceLong==null ordered 140 // work will remain pending on the Queue 141 message.getMessageId().setFutureOrSequenceLong(sequence); 142 } 143 }); 144 145 if (indexListener != null) { 146 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 147 @Override 148 public void run() { 149 // cursor add complete 150 synchronized (pendingAdditions) { pendingAdditions.remove(sequence); } 151 } 152 })); 153 } else { 154 pendingAdditions.remove(sequence); 155 } 156 } 157 } 158 try { 159 adapter.doAddMessage(c, sequenceId, messageId, destination, data, message.getExpiration(), 160 this.isPrioritizedMessages() ? message.getPriority() : 0, xaXid); 161 } catch (SQLException e) { 162 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 163 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 164 } finally { 165 c.close(); 166 } 167 if (xaXid == null) { 168 onAdd(message, sequenceId, message.getPriority()); 169 } 170 } 171 172 // jdbc commit order is random with concurrent connections - limit scan to lowest pending 173 private long minPendingSequeunceId() { 174 synchronized (pendingAdditions) { 175 if (!pendingAdditions.isEmpty()) { 176 return pendingAdditions.get(0); 177 } else { 178 // nothing pending, ensure scan is limited to current state 179 return persistenceAdapter.sequenceGenerator.getLastSequenceId() + 1; 180 } 181 } 182 } 183 184 @Override 185 public void updateMessage(Message message) throws IOException { 186 TransactionContext c = persistenceAdapter.getTransactionContext(); 187 try { 188 adapter.doUpdateMessage(c, destination, message.getMessageId(), ByteSequenceData.toByteArray(wireFormat.marshal(message))); 189 } catch (SQLException e) { 190 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 191 throw IOExceptionSupport.create("Failed to update message: " + message.getMessageId() + " in container: " + e, e); 192 } finally { 193 c.close(); 194 } 195 } 196 197 protected void onAdd(Message message, long sequenceId, byte priority) {} 198 199 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { 200 // Get a connection and insert the message into the DB. 201 TransactionContext c = persistenceAdapter.getTransactionContext(context); 202 try { 203 adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef); 204 } catch (SQLException e) { 205 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 206 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 207 } finally { 208 c.close(); 209 } 210 } 211 212 @Override 213 public Message getMessage(MessageId messageId) throws IOException { 214 // Get a connection and pull the message out of the DB 215 TransactionContext c = persistenceAdapter.getTransactionContext(); 216 try { 217 byte data[] = adapter.doGetMessage(c, messageId); 218 if (data == null) { 219 return null; 220 } 221 222 Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data)); 223 return answer; 224 } catch (IOException e) { 225 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 226 } catch (SQLException e) { 227 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 228 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 229 } finally { 230 c.close(); 231 } 232 } 233 234 public String getMessageReference(MessageId messageId) throws IOException { 235 long id = messageId.getBrokerSequenceId(); 236 237 // Get a connection and pull the message out of the DB 238 TransactionContext c = persistenceAdapter.getTransactionContext(); 239 try { 240 return adapter.doGetMessageReference(c, id); 241 } catch (IOException e) { 242 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 243 } catch (SQLException e) { 244 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 245 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 246 } finally { 247 c.close(); 248 } 249 } 250 251 @Override 252 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 253 254 long seq = ack.getLastMessageId().getFutureOrSequenceLong() != null ? 255 (Long) ack.getLastMessageId().getFutureOrSequenceLong() : 256 persistenceAdapter.getStoreSequenceIdForMessageId(context, ack.getLastMessageId(), destination)[0]; 257 258 // Get a connection and remove the message from the DB 259 TransactionContext c = persistenceAdapter.getTransactionContext(context); 260 try { 261 adapter.doRemoveMessage(c, seq, context != null ? context.getXid() : null); 262 } catch (SQLException e) { 263 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 264 throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e); 265 } finally { 266 c.close(); 267 } 268 } 269 270 @Override 271 public void recover(final MessageRecoveryListener listener) throws Exception { 272 273 // Get all the Message ids out of the database. 274 TransactionContext c = persistenceAdapter.getTransactionContext(); 275 try { 276 c = persistenceAdapter.getTransactionContext(); 277 adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() { 278 @Override 279 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 280 if (listener.hasSpace()) { 281 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); 282 msg.getMessageId().setBrokerSequenceId(sequenceId); 283 return listener.recoverMessage(msg); 284 } else { 285 if (LOG.isTraceEnabled()) { 286 LOG.trace("Message recovery limit reached for MessageRecoveryListener"); 287 } 288 return false; 289 } 290 } 291 292 @Override 293 public boolean recoverMessageReference(String reference) throws Exception { 294 if (listener.hasSpace()) { 295 return listener.recoverMessageReference(new MessageId(reference)); 296 } else { 297 if (LOG.isTraceEnabled()) { 298 LOG.trace("Message recovery limit reached for MessageRecoveryListener"); 299 } 300 return false; 301 } 302 } 303 }); 304 } catch (SQLException e) { 305 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 306 throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e); 307 } finally { 308 c.close(); 309 } 310 } 311 312 /** 313 * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) 314 */ 315 @Override 316 public void removeAllMessages(ConnectionContext context) throws IOException { 317 // Get a connection and remove the message from the DB 318 TransactionContext c = persistenceAdapter.getTransactionContext(context); 319 try { 320 adapter.doRemoveAllMessages(c, destination); 321 } catch (SQLException e) { 322 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 323 throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e); 324 } finally { 325 c.close(); 326 } 327 } 328 329 @Override 330 public int getMessageCount() throws IOException { 331 int result = 0; 332 TransactionContext c = persistenceAdapter.getTransactionContext(); 333 try { 334 335 result = adapter.doGetMessageCount(c, destination); 336 337 } catch (SQLException e) { 338 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 339 throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e); 340 } finally { 341 c.close(); 342 } 343 return result; 344 } 345 346 /** 347 * @param maxReturned 348 * @param listener 349 * @throws Exception 350 * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, 351 * org.apache.activemq.store.MessageRecoveryListener) 352 */ 353 @Override 354 public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception { 355 TransactionContext c = persistenceAdapter.getTransactionContext(); 356 try { 357 if (LOG.isTraceEnabled()) { 358 LOG.trace(this + " recoverNext lastRecovered:" + Arrays.toString(perPriorityLastRecovered) + ", minPending:" + minPendingSequeunceId()); 359 } 360 adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(), 361 maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() { 362 363 @Override 364 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 365 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); 366 msg.getMessageId().setBrokerSequenceId(sequenceId); 367 msg.getMessageId().setFutureOrSequenceLong(sequenceId); 368 listener.recoverMessage(msg); 369 trackLastRecovered(sequenceId, msg.getPriority()); 370 return true; 371 } 372 373 @Override 374 public boolean recoverMessageReference(String reference) throws Exception { 375 if (listener.hasSpace()) { 376 listener.recoverMessageReference(new MessageId(reference)); 377 return true; 378 } 379 return false; 380 } 381 382 }); 383 } catch (SQLException e) { 384 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 385 } finally { 386 c.close(); 387 } 388 389 } 390 391 private void trackLastRecovered(long sequenceId, int priority) { 392 perPriorityLastRecovered[isPrioritizedMessages() ? priority : 0] = sequenceId; 393 } 394 395 /** 396 * @see org.apache.activemq.store.MessageStore#resetBatching() 397 */ 398 @Override 399 public void resetBatching() { 400 if (LOG.isTraceEnabled()) { 401 LOG.trace(this + " resetBatching. last recovered: " + Arrays.toString(perPriorityLastRecovered)); 402 } 403 setLastRecovered(-1); 404 } 405 406 private void setLastRecovered(long val) { 407 for (int i=0;i<perPriorityLastRecovered.length;i++) { 408 perPriorityLastRecovered[i] = val; 409 } 410 } 411 412 413 @Override 414 public void setBatch(MessageId messageId) { 415 if (LOG.isTraceEnabled()) { 416 LOG.trace(this + " setBatch: last recovered: " + Arrays.toString(perPriorityLastRecovered)); 417 } 418 try { 419 long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(null, messageId, destination); 420 setLastRecovered(storedValues[0]); 421 } catch (IOException ignoredAsAlreadyLogged) { 422 resetBatching(); 423 } 424 if (LOG.isTraceEnabled()) { 425 LOG.trace(this + " setBatch: new last recovered: " + Arrays.toString(perPriorityLastRecovered)); 426 } 427 } 428 429 430 @Override 431 public void setPrioritizedMessages(boolean prioritizedMessages) { 432 super.setPrioritizedMessages(prioritizedMessages); 433 } 434 435 @Override 436 public String toString() { 437 return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size(); 438 } 439 440}