001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt.strategy;
018
019import java.io.IOException;
020import java.util.List;
021
022import org.apache.activemq.ActiveMQPrefetchPolicy;
023import org.apache.activemq.command.ActiveMQDestination;
024import org.apache.activemq.command.ActiveMQTopic;
025import org.apache.activemq.command.ConsumerInfo;
026import org.apache.activemq.command.RemoveSubscriptionInfo;
027import org.apache.activemq.command.Response;
028import org.apache.activemq.command.SubscriptionInfo;
029import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
030import org.apache.activemq.transport.mqtt.MQTTProtocolException;
031import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
032import org.apache.activemq.transport.mqtt.MQTTSubscription;
033import org.apache.activemq.transport.mqtt.ResponseHandler;
034import org.fusesource.mqtt.client.QoS;
035import org.fusesource.mqtt.codec.CONNECT;
036
037/**
038 * Default implementation that uses unmapped topic subscriptions.
039 */
040public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
041
042    @Override
043    public void onConnect(CONNECT connect) throws MQTTProtocolException {
044        List<SubscriptionInfo> subs = lookupSubscription(protocol.getClientId());
045
046        if (connect.cleanSession()) {
047            deleteDurableSubs(subs);
048        } else {
049            restoreDurableSubs(subs);
050        }
051    }
052
053    @Override
054    public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
055        ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName));
056
057        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
058        consumerInfo.setDestination(destination);
059        consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
060        consumerInfo.setRetroactive(true);
061        consumerInfo.setDispatchAsync(true);
062        // create durable subscriptions only when clean session is false
063        if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
064            consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
065            consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH);
066        }
067
068        if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
069            consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
070        }
071
072        return doSubscribe(consumerInfo, topicName, requestedQoS);
073    }
074
075    @Override
076    public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
077
078        ActiveMQDestination destination = mqttSubscription.getDestination();
079
080        // check whether the Topic has been recovered in restoreDurableSubs
081        // mark subscription available for recovery for duplicate subscription
082        if (restoredDurableSubs.remove(destination.getPhysicalName())) {
083            return;
084        }
085
086        super.onReSubscribe(mqttSubscription);
087    }
088
089    @Override
090    public void onUnSubscribe(String topicName) throws MQTTProtocolException {
091        MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
092        if (subscription != null) {
093            doUnSubscribe(subscription);
094
095            // check if the durable sub also needs to be removed
096            if (subscription.getConsumerInfo().getSubscriptionName() != null) {
097                // also remove it from restored durable subscriptions set
098                restoredDurableSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
099
100                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
101                rsi.setConnectionId(protocol.getConnectionId());
102                rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName());
103                rsi.setClientId(protocol.getClientId());
104                protocol.sendToActiveMQ(rsi, new ResponseHandler() {
105                    @Override
106                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
107                        // ignore failures..
108                    }
109                });
110            }
111        }
112    }
113}