001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.sql.SQLException; 021import java.util.Arrays; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.LinkedHashMap; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.locks.ReentrantReadWriteLock; 029 030import org.apache.activemq.ActiveMQMessageAudit; 031import org.apache.activemq.broker.ConnectionContext; 032import org.apache.activemq.command.ActiveMQDestination; 033import org.apache.activemq.command.ActiveMQTopic; 034import org.apache.activemq.command.Message; 035import org.apache.activemq.command.MessageAck; 036import org.apache.activemq.command.MessageId; 037import org.apache.activemq.command.SubscriptionInfo; 038import org.apache.activemq.store.MessageRecoveryListener; 039import org.apache.activemq.store.MessageStoreSubscriptionStatistics; 040import org.apache.activemq.store.TopicMessageStore; 041import org.apache.activemq.util.ByteSequence; 042import org.apache.activemq.util.IOExceptionSupport; 043import org.apache.activemq.wireformat.WireFormat; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * 049 */ 050public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore { 051 052 private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class); 053 private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>(); 054 private Set<String> pendingCompletion = new HashSet<String>(); 055 056 public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE"; 057 private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty( 058 PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10); 059 private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock(); 060 private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() { 061 @Override 062 protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) { 063 return size() > SEQUENCE_ID_CACHE_SIZE; 064 } 065 }; 066 067 068 public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException { 069 super(persistenceAdapter, adapter, wireFormat, topic, audit); 070 } 071 072 @Override 073 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { 074 if (ack != null && ack.isUnmatchedAck()) { 075 if (LOG.isTraceEnabled()) { 076 LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks."); 077 } 078 return; 079 } 080 TransactionContext c = persistenceAdapter.getTransactionContext(context); 081 try { 082 long[] res = getCachedStoreSequenceId(c, destination, messageId); 083 if (this.isPrioritizedMessages()) { 084 adapter.doSetLastAckWithPriority(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]); 085 } else { 086 adapter.doSetLastAck(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]); 087 } 088 if (LOG.isTraceEnabled()) { 089 LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId); 090 } 091 } catch (SQLException e) { 092 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 093 throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e); 094 } finally { 095 c.close(); 096 } 097 } 098 099 public long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException { 100 long[] val = null; 101 sequenceIdCacheSizeLock.readLock().lock(); 102 try { 103 val = sequenceIdCache.get(messageId); 104 } finally { 105 sequenceIdCacheSizeLock.readLock().unlock(); 106 } 107 if (val == null) { 108 val = adapter.getStoreSequenceId(transactionContext, destination, messageId); 109 } 110 return val; 111 } 112 113 /** 114 * @throws Exception 115 */ 116 @Override 117 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { 118 TransactionContext c = persistenceAdapter.getTransactionContext(); 119 try { 120 adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() { 121 @Override 122 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 123 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); 124 msg.getMessageId().setBrokerSequenceId(sequenceId); 125 return listener.recoverMessage(msg); 126 } 127 128 @Override 129 public boolean recoverMessageReference(String reference) throws Exception { 130 return listener.recoverMessageReference(new MessageId(reference)); 131 } 132 133 }); 134 } catch (SQLException e) { 135 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 136 throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e); 137 } finally { 138 c.close(); 139 } 140 } 141 142 private class LastRecovered implements Iterable<LastRecoveredEntry> { 143 LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10]; 144 LastRecovered() { 145 for (int i=0; i<perPriority.length; i++) { 146 perPriority[i] = new LastRecoveredEntry(i); 147 } 148 } 149 150 public void updateStored(long sequence, int priority) { 151 perPriority[priority].stored = sequence; 152 } 153 154 public LastRecoveredEntry defaultPriority() { 155 return perPriority[javax.jms.Message.DEFAULT_PRIORITY]; 156 } 157 158 @Override 159 public String toString() { 160 return Arrays.deepToString(perPriority); 161 } 162 163 @Override 164 public Iterator<LastRecoveredEntry> iterator() { 165 return new PriorityIterator(); 166 } 167 168 class PriorityIterator implements Iterator<LastRecoveredEntry> { 169 int current = 9; 170 @Override 171 public boolean hasNext() { 172 for (int i=current; i>=0; i--) { 173 if (perPriority[i].hasMessages()) { 174 current = i; 175 return true; 176 } 177 } 178 return false; 179 } 180 181 @Override 182 public LastRecoveredEntry next() { 183 return perPriority[current]; 184 } 185 186 @Override 187 public void remove() { 188 throw new RuntimeException("not implemented"); 189 } 190 } 191 } 192 193 private class LastRecoveredEntry { 194 final int priority; 195 long recovered = 0; 196 long stored = Integer.MAX_VALUE; 197 198 public LastRecoveredEntry(int priority) { 199 this.priority = priority; 200 } 201 202 @Override 203 public String toString() { 204 return priority + "-" + stored + ":" + recovered; 205 } 206 207 public void exhausted() { 208 stored = recovered; 209 } 210 211 public boolean hasMessages() { 212 return stored > recovered; 213 } 214 } 215 216 class LastRecoveredAwareListener implements JDBCMessageRecoveryListener { 217 final MessageRecoveryListener delegate; 218 final int maxMessages; 219 LastRecoveredEntry lastRecovered; 220 int recoveredCount; 221 int recoveredMarker; 222 223 public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) { 224 this.delegate = delegate; 225 this.maxMessages = maxMessages; 226 } 227 228 @Override 229 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 230 if (delegate.hasSpace() && recoveredCount < maxMessages) { 231 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); 232 msg.getMessageId().setBrokerSequenceId(sequenceId); 233 lastRecovered.recovered = sequenceId; 234 if (delegate.recoverMessage(msg)) { 235 recoveredCount++; 236 return true; 237 } 238 } 239 return false; 240 } 241 242 @Override 243 public boolean recoverMessageReference(String reference) throws Exception { 244 return delegate.recoverMessageReference(new MessageId(reference)); 245 } 246 247 public void setLastRecovered(LastRecoveredEntry lastRecovered) { 248 this.lastRecovered = lastRecovered; 249 recoveredMarker = recoveredCount; 250 } 251 252 public boolean complete() { 253 return !delegate.hasSpace() || recoveredCount == maxMessages; 254 } 255 256 public boolean stalled() { 257 return recoveredMarker == recoveredCount; 258 } 259 } 260 261 @Override 262 public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) 263 throws Exception { 264 //Duration duration = new Duration("recoverNextMessages"); 265 TransactionContext c = persistenceAdapter.getTransactionContext(); 266 267 String key = getSubscriptionKey(clientId, subscriptionName); 268 if (!subscriberLastRecoveredMap.containsKey(key)) { 269 subscriberLastRecoveredMap.put(key, new LastRecovered()); 270 } 271 final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key); 272 LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned); 273 try { 274 if (LOG.isTraceEnabled()) { 275 LOG.trace(this + ", " + key + " existing last recovered: " + lastRecovered); 276 } 277 if (isPrioritizedMessages()) { 278 Iterator<LastRecoveredEntry> it = lastRecovered.iterator(); 279 for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) { 280 LastRecoveredEntry entry = it.next(); 281 recoveredAwareListener.setLastRecovered(entry); 282 //Duration microDuration = new Duration("recoverNextMessages:loop"); 283 adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName, 284 entry.recovered, entry.priority, maxReturned, recoveredAwareListener); 285 //microDuration.end(new String(entry + " recoveredCount:" + recoveredAwareListener.recoveredCount)); 286 if (recoveredAwareListener.stalled()) { 287 if (recoveredAwareListener.complete()) { 288 break; 289 } else { 290 entry.exhausted(); 291 } 292 } 293 } 294 } else { 295 LastRecoveredEntry last = lastRecovered.defaultPriority(); 296 recoveredAwareListener.setLastRecovered(last); 297 adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, 298 last.recovered, 0, maxReturned, recoveredAwareListener); 299 } 300 if (LOG.isTraceEnabled()) { 301 LOG.trace(key + " last recovered: " + lastRecovered); 302 } 303 //duration.end(); 304 } catch (SQLException e) { 305 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 306 } finally { 307 c.close(); 308 } 309 } 310 311 @Override 312 public void resetBatching(String clientId, String subscriptionName) { 313 String key = getSubscriptionKey(clientId, subscriptionName); 314 if (!pendingCompletion.contains(key)) { 315 subscriberLastRecoveredMap.remove(key); 316 } else { 317 LOG.trace(this + ", skip resetBatch during pending completion for: " + key); 318 } 319 } 320 321 public void pendingCompletion(String clientId, String subscriptionName, long sequenceId, byte priority) { 322 final String key = getSubscriptionKey(clientId, subscriptionName); 323 LastRecovered recovered = new LastRecovered(); 324 recovered.perPriority[isPrioritizedMessages() ? priority : javax.jms.Message.DEFAULT_PRIORITY].recovered = sequenceId; 325 subscriberLastRecoveredMap.put(key, recovered); 326 pendingCompletion.add(key); 327 LOG.trace(this + ", pending completion: " + key + ", last: " + recovered); 328 } 329 330 public void complete(String clientId, String subscriptionName) { 331 pendingCompletion.remove(getSubscriptionKey(clientId, subscriptionName)); 332 LOG.trace(this + ", completion for: " + getSubscriptionKey(clientId, subscriptionName)); 333 } 334 335 @Override 336 protected void onAdd(Message message, long sequenceId, byte priority) { 337 // update last recovered state 338 for (LastRecovered last : subscriberLastRecoveredMap.values()) { 339 last.updateStored(sequenceId, priority); 340 } 341 sequenceIdCacheSizeLock.writeLock().lock(); 342 try { 343 sequenceIdCache.put(message.getMessageId(), new long[]{sequenceId, priority}); 344 } finally { 345 sequenceIdCacheSizeLock.writeLock().unlock(); 346 } 347 } 348 349 @Override 350 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 351 TransactionContext c = persistenceAdapter.getTransactionContext(); 352 try { 353 c = persistenceAdapter.getTransactionContext(); 354 adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages()); 355 } catch (SQLException e) { 356 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 357 throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e); 358 } finally { 359 c.close(); 360 } 361 } 362 363 /** 364 * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String, 365 * String) 366 */ 367 @Override 368 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 369 TransactionContext c = persistenceAdapter.getTransactionContext(); 370 try { 371 return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName); 372 } catch (SQLException e) { 373 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 374 throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e); 375 } finally { 376 c.close(); 377 } 378 } 379 380 @Override 381 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 382 TransactionContext c = persistenceAdapter.getTransactionContext(); 383 try { 384 adapter.doDeleteSubscription(c, destination, clientId, subscriptionName); 385 } catch (SQLException e) { 386 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 387 throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e); 388 } finally { 389 c.close(); 390 resetBatching(clientId, subscriptionName); 391 } 392 } 393 394 @Override 395 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 396 TransactionContext c = persistenceAdapter.getTransactionContext(); 397 try { 398 return adapter.doGetAllSubscriptions(c, destination); 399 } catch (SQLException e) { 400 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 401 throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e); 402 } finally { 403 c.close(); 404 } 405 } 406 407 @Override 408 public int getMessageCount(String clientId, String subscriberName) throws IOException { 409 //Duration duration = new Duration("getMessageCount"); 410 int result = 0; 411 TransactionContext c = persistenceAdapter.getTransactionContext(); 412 try { 413 result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages()); 414 } catch (SQLException e) { 415 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 416 throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e); 417 } finally { 418 c.close(); 419 } 420 if (LOG.isTraceEnabled()) { 421 LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result); 422 } 423 //duration.end(); 424 return result; 425 } 426 427 @Override 428 public long getMessageSize(String clientId, String subscriberName) throws IOException { 429 return 0; 430 } 431 432 protected String getSubscriptionKey(String clientId, String subscriberName) { 433 String result = clientId + ":"; 434 result += subscriberName != null ? subscriberName : "NOT_SET"; 435 return result; 436 } 437 438 private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false); 439 440 @Override 441 public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { 442 return stats; 443 } 444 445}