001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.plugin; 018 019import java.io.File; 020import java.io.FileInputStream; 021import java.io.FileOutputStream; 022import java.io.IOException; 023import java.io.ObjectInputStream; 024import java.io.ObjectOutputStream; 025import java.util.Collections; 026import java.util.HashSet; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import java.util.regex.Matcher; 031import java.util.regex.Pattern; 032 033import javax.management.JMException; 034import javax.management.ObjectName; 035 036import org.apache.activemq.advisory.AdvisorySupport; 037import org.apache.activemq.broker.Broker; 038import org.apache.activemq.broker.BrokerFilter; 039import org.apache.activemq.broker.BrokerService; 040import org.apache.activemq.broker.ConnectionContext; 041import org.apache.activemq.broker.jmx.AnnotatedMBean; 042import org.apache.activemq.broker.jmx.BrokerMBeanSupport; 043import org.apache.activemq.broker.jmx.VirtualDestinationSelectorCacheView; 044import org.apache.activemq.broker.region.Subscription; 045import org.apache.activemq.command.ConsumerInfo; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * A plugin which allows the caching of the selector from a subscription queue. 051 * <p/> 052 * This stops the build-up of unwanted messages, especially when consumers may 053 * disconnect from time to time when using virtual destinations. 054 * <p/> 055 * This is influenced by code snippets developed by Maciej Rakowicz 056 * 057 * Refer to: 058 * https://issues.apache.org/activemq/browse/AMQ-3004 059 * http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E 060 */ 061public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable { 062 private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class); 063 public static final String MATCH_EVERYTHING = "TRUE"; 064 065 /** 066 * The subscription's selector cache. We cache compiled expressions keyed 067 * by the target destination. 068 */ 069 private ConcurrentMap<String, Set<String>> subSelectorCache = new ConcurrentHashMap<String, Set<String>>(); 070 071 private final File persistFile; 072 private boolean singleSelectorPerDestination = false; 073 private boolean ignoreWildcardSelectors = false; 074 private ObjectName objectName; 075 076 private boolean running = true; 077 private final Thread persistThread; 078 private long persistInterval = MAX_PERSIST_INTERVAL; 079 public static final long MAX_PERSIST_INTERVAL = 600000; 080 private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread"; 081 082 /** 083 * Constructor 084 */ 085 public SubQueueSelectorCacheBroker(Broker next, final File persistFile) { 086 super(next); 087 this.persistFile = persistFile; 088 LOG.info("Using persisted selector cache from[{}]", persistFile); 089 090 readCache(); 091 092 persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME); 093 persistThread.start(); 094 enableJmx(); 095 } 096 097 private void enableJmx() { 098 BrokerService broker = getBrokerService(); 099 if (broker.isUseJmx()) { 100 VirtualDestinationSelectorCacheView view = new VirtualDestinationSelectorCacheView(this); 101 try { 102 objectName = BrokerMBeanSupport.createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), "plugin", "virtualDestinationCache"); 103 LOG.trace("virtualDestinationCacheSelector mbean name; " + objectName.toString()); 104 AnnotatedMBean.registerMBean(broker.getManagementContext(), view, objectName); 105 } catch (Exception e) { 106 LOG.warn("JMX is enabled, but when installing the VirtualDestinationSelectorCache, couldn't install the JMX mbeans. Continuing without installing the mbeans."); 107 } 108 } 109 } 110 111 @Override 112 public void stop() throws Exception { 113 running = false; 114 if (persistThread != null) { 115 persistThread.interrupt(); 116 persistThread.join(); 117 } 118 unregisterMBeans(); 119 } 120 121 private void unregisterMBeans() { 122 BrokerService broker = getBrokerService(); 123 if (broker.isUseJmx() && this.objectName != null) { 124 try { 125 broker.getManagementContext().unregisterMBean(objectName); 126 } catch (JMException e) { 127 LOG.warn("Trying uninstall VirtualDestinationSelectorCache; couldn't uninstall mbeans, continuting..."); 128 } 129 } 130 } 131 132 @Override 133 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 134 // don't track selectors for advisory topics or temp destinations 135 if (!AdvisorySupport.isAdvisoryTopic(info.getDestination()) && !info.getDestination().isTemporary()) { 136 String destinationName = info.getDestination().getQualifiedName(); 137 LOG.debug("Caching consumer selector [{}] on '{}'", info.getSelector(), destinationName); 138 139 String selector = info.getSelector() == null ? MATCH_EVERYTHING : info.getSelector(); 140 141 if (!(ignoreWildcardSelectors && hasWildcards(selector))) { 142 143 Set<String> selectors = subSelectorCache.get(destinationName); 144 if (selectors == null) { 145 selectors = Collections.synchronizedSet(new HashSet<String>()); 146 } else if (singleSelectorPerDestination && !MATCH_EVERYTHING.equals(selector)) { 147 // in this case, we allow only ONE selector. But we don't count the catch-all "null/TRUE" selector 148 // here, we always allow that one. But only one true selector. 149 boolean containsMatchEverything = selectors.contains(MATCH_EVERYTHING); 150 selectors.clear(); 151 152 // put back the MATCH_EVERYTHING selector 153 if (containsMatchEverything) { 154 selectors.add(MATCH_EVERYTHING); 155 } 156 } 157 158 LOG.debug("adding new selector: into cache " + selector); 159 selectors.add(selector); 160 LOG.debug("current selectors in cache: " + selectors); 161 subSelectorCache.put(destinationName, selectors); 162 } 163 } 164 165 return super.addConsumer(context, info); 166 } 167 168 static boolean hasWildcards(String selector) { 169 return WildcardFinder.hasWildcards(selector); 170 } 171 172 @Override 173 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 174 if (!AdvisorySupport.isAdvisoryTopic(info.getDestination()) && !info.getDestination().isTemporary()) { 175 if (singleSelectorPerDestination) { 176 String destinationName = info.getDestination().getQualifiedName(); 177 Set<String> selectors = subSelectorCache.get(destinationName); 178 if (info.getSelector() == null && selectors.size() > 1) { 179 boolean removed = selectors.remove(MATCH_EVERYTHING); 180 LOG.debug("A non-selector consumer has dropped. Removing the catchall matching pattern 'TRUE'. Successful? " + removed); 181 } 182 } 183 184 } 185 super.removeConsumer(context, info); 186 } 187 188 @SuppressWarnings("unchecked") 189 private void readCache() { 190 if (persistFile != null && persistFile.exists()) { 191 try { 192 try (FileInputStream fis = new FileInputStream(persistFile);) { 193 ObjectInputStream in = new ObjectInputStream(fis); 194 try { 195 subSelectorCache = (ConcurrentHashMap<String, Set<String>>) in.readObject(); 196 } catch (ClassNotFoundException ex) { 197 LOG.error("Invalid selector cache data found. Please remove file.", ex); 198 } finally { 199 in.close(); 200 } 201 } 202 } catch (IOException ex) { 203 LOG.error("Unable to read persisted selector cache...it will be ignored!", ex); 204 } 205 } 206 } 207 208 /** 209 * Persist the selector cache. 210 */ 211 private void persistCache() { 212 LOG.debug("Persisting selector cache...."); 213 try { 214 FileOutputStream fos = new FileOutputStream(persistFile); 215 try { 216 ObjectOutputStream out = new ObjectOutputStream(fos); 217 try { 218 out.writeObject(subSelectorCache); 219 } finally { 220 out.flush(); 221 out.close(); 222 } 223 } catch (IOException ex) { 224 LOG.error("Unable to persist selector cache", ex); 225 } finally { 226 fos.close(); 227 } 228 } catch (IOException ex) { 229 LOG.error("Unable to access file[{}]", persistFile, ex); 230 } 231 } 232 233 /** 234 * @return The JMS selector for the specified {@code destination} 235 */ 236 public Set<String> getSelector(final String destination) { 237 return subSelectorCache.get(destination); 238 } 239 240 /** 241 * Persist the selector cache every {@code MAX_PERSIST_INTERVAL}ms. 242 * 243 * @see java.lang.Runnable#run() 244 */ 245 @Override 246 public void run() { 247 while (running) { 248 try { 249 Thread.sleep(persistInterval); 250 } catch (InterruptedException ex) { 251 } 252 253 persistCache(); 254 } 255 } 256 257 public boolean isSingleSelectorPerDestination() { 258 return singleSelectorPerDestination; 259 } 260 261 public void setSingleSelectorPerDestination(boolean singleSelectorPerDestination) { 262 this.singleSelectorPerDestination = singleSelectorPerDestination; 263 } 264 265 @SuppressWarnings("unchecked") 266 public Set<String> getSelectorsForDestination(String destinationName) { 267 if (subSelectorCache.containsKey(destinationName)) { 268 return new HashSet<String>(subSelectorCache.get(destinationName)); 269 } 270 271 return Collections.EMPTY_SET; 272 } 273 274 public long getPersistInterval() { 275 return persistInterval; 276 } 277 278 public void setPersistInterval(long persistInterval) { 279 this.persistInterval = persistInterval; 280 } 281 282 public boolean deleteSelectorForDestination(String destinationName, String selector) { 283 if (subSelectorCache.containsKey(destinationName)) { 284 Set<String> cachedSelectors = subSelectorCache.get(destinationName); 285 return cachedSelectors.remove(selector); 286 } 287 288 return false; 289 } 290 291 public boolean deleteAllSelectorsForDestination(String destinationName) { 292 if (subSelectorCache.containsKey(destinationName)) { 293 Set<String> cachedSelectors = subSelectorCache.get(destinationName); 294 cachedSelectors.clear(); 295 } 296 return true; 297 } 298 299 public boolean isIgnoreWildcardSelectors() { 300 return ignoreWildcardSelectors; 301 } 302 303 public void setIgnoreWildcardSelectors(boolean ignoreWildcardSelectors) { 304 this.ignoreWildcardSelectors = ignoreWildcardSelectors; 305 } 306 307 // find wildcards inside like operator arguments 308 static class WildcardFinder { 309 310 private static final Pattern LIKE_PATTERN=Pattern.compile( 311 "\\bLIKE\\s+'(?<like>([^']|'')+)'(\\s+ESCAPE\\s+'(?<escape>.)')?", 312 Pattern.CASE_INSENSITIVE); 313 314 private static final String REGEX_SPECIAL = ".+?*(){}[]\\-"; 315 316 private static String getLike(final Matcher matcher) { 317 return matcher.group("like"); 318 } 319 320 private static boolean hasLikeOperator(final Matcher matcher) { 321 return matcher.find(); 322 } 323 324 private static String getEscape(final Matcher matcher) { 325 String escapeChar = matcher.group("escape"); 326 if (escapeChar == null) { 327 return null; 328 } else if (REGEX_SPECIAL.contains(escapeChar)) { 329 escapeChar = "\\"+escapeChar; 330 } 331 return escapeChar; 332 } 333 334 private static boolean hasWildcardInCurrentMatch(final Matcher matcher) { 335 String wildcards = "[_%]"; 336 if (getEscape(matcher) != null) { 337 wildcards = "(^|[^" + getEscape(matcher) + "])" + wildcards; 338 } 339 return Pattern.compile(wildcards).matcher(getLike(matcher)).find(); 340 } 341 342 public static boolean hasWildcards(String selector) { 343 Matcher matcher = LIKE_PATTERN.matcher(selector); 344 345 while(hasLikeOperator(matcher)) { 346 if (hasWildcardInCurrentMatch(matcher)) { 347 return true; 348 } 349 } 350 return false; 351 } 352 } 353}