001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.virtual;
018
019import org.apache.activemq.broker.Broker;
020import org.apache.activemq.broker.region.Destination;
021import org.apache.activemq.broker.region.Subscription;
022import org.apache.activemq.broker.region.Topic;
023import org.apache.activemq.command.Message;
024import org.apache.activemq.filter.BooleanExpression;
025import org.apache.activemq.filter.MessageEvaluationContext;
026import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
027import org.apache.activemq.plugin.SubQueueSelectorCacheBroker;
028import org.apache.activemq.selector.SelectorParser;
029import org.apache.activemq.util.LRUCache;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import java.io.IOException;
034import java.util.List;
035import java.util.Set;
036
037public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
038    private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class);
039    LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();
040    private final SubQueueSelectorCacheBroker selectorCachePlugin;
041
042    public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
043        super(next, virtualTopic);
044        selectorCachePlugin = (SubQueueSelectorCacheBroker)
045                ((Topic)next).createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class);
046    }
047
048    /**
049     * Respect the selectors of the subscriptions to ensure only matched messages are dispatched to
050     * the virtual queues, hence there is no build up of unmatched messages on these destinations
051     */
052    @Override
053    protected boolean shouldDispatch(final Broker broker, Message message, Destination dest) throws IOException {
054        //first validate that the prefix matches in the super class
055        if (super.shouldDispatch(broker, message, dest)) {
056            boolean matches = false;
057            MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
058            msgContext.setDestination(dest.getActiveMQDestination());
059            msgContext.setMessageReference(message);
060            List<Subscription> subs = dest.getConsumers();
061            for (Subscription sub : subs) {
062                if (sub.matches(message, msgContext)) {
063                    matches = true;
064                    break;
065                }
066            }
067            if (matches == false) {
068                matches = tryMatchingCachedSubs(broker, dest, msgContext);
069            }
070            return matches;
071        }
072        return false;
073    }
074
075    private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) {
076        boolean matches = false;
077        LOG.debug("No active consumer match found. Will try cache if configured...");
078
079        if (selectorCachePlugin != null) {
080            final Set<String> selectors = selectorCachePlugin.getSelector(dest.getActiveMQDestination().getQualifiedName());
081            if (selectors != null) {
082                for (String selector : selectors) {
083                    try {
084                        final BooleanExpression expression = getExpression(selector);
085                        matches = expression.matches(msgContext);
086                        if (matches) {
087                            return true;
088                        }
089                    } catch (Exception e) {
090                        LOG.error(e.getMessage(), e);
091                    }
092                }
093            }
094        }
095        return matches;
096    }
097
098    private BooleanExpression getExpression(String selector) throws Exception{
099        BooleanExpression result;
100        synchronized(expressionCache){
101            result = expressionCache.get(selector);
102            if (result == null){
103                result = compileSelector(selector);
104                expressionCache.put(selector,result);
105            }
106        }
107        return result;
108    }
109
110    /**
111     * Pre-compile the JMS selector.
112     *
113     * @param selectorExpression The non-null JMS selector expression.
114     */
115    private BooleanExpression compileSelector(final String selectorExpression) throws Exception {
116        return SelectorParser.parse(selectorExpression);
117    }
118}