001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.command;
018
019import org.apache.activemq.ActiveMQConnectionFactory;
020import org.apache.activemq.command.ActiveMQDestination;
021import org.apache.activemq.util.ConsumerThread;
022import org.slf4j.Logger;
023import org.slf4j.LoggerFactory;
024
025import javax.jms.Connection;
026import javax.jms.Session;
027import java.util.List;
028import java.util.concurrent.CountDownLatch;
029
030public class ConsumerCommand extends AbstractCommand {
031    private static final Logger LOG = LoggerFactory.getLogger(ConsumerCommand.class);
032
033    String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
034    String user = ActiveMQConnectionFactory.DEFAULT_USER;
035    String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
036    String destination = "queue://TEST";
037    int messageCount = 1000;
038    int sleep;
039    boolean transacted;
040    private boolean durable;
041    private String clientId;
042    int batchSize = 10;
043    int ackMode = Session.AUTO_ACKNOWLEDGE;
044    int parallelThreads = 1;
045    boolean bytesAsText;
046
047    @Override
048    protected void runTask(List<String> tokens) throws Exception {
049        LOG.info("Connecting to URL: " + brokerUrl + " (" + user + ":" + password + ")");
050        LOG.info("Consuming " + destination);
051        LOG.info("Sleeping between receives " + sleep + " ms");
052        LOG.info("Running " + parallelThreads + " parallel threads");
053
054        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
055        Connection conn = null;
056        try {
057            conn = factory.createConnection(user, password);
058            if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
059                conn.setClientID(clientId);
060            }
061            conn.start();
062
063
064            CountDownLatch active = new CountDownLatch(parallelThreads);
065
066            for (int i = 1; i <= parallelThreads; i++) {
067                Session sess;
068                if (transacted) {
069                   sess = conn.createSession(true, Session.SESSION_TRANSACTED);
070                } else {
071                   sess = conn.createSession(false, ackMode);
072                }
073                ConsumerThread consumer = new ConsumerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
074                consumer.setName("consumer-" + i);
075                consumer.setDurable(durable);
076                consumer.setBreakOnNull(false);
077                consumer.setMessageCount(messageCount);
078                consumer.setSleep(sleep);
079                consumer.setBatchSize(batchSize);
080                consumer.setFinished(active);
081                consumer.setBytesAsText(bytesAsText);
082                consumer.start();
083            }
084
085            active.await();
086        } finally {
087            if (conn != null) {
088                conn.close();
089            }
090        }
091    }
092
093    public String getBrokerUrl() {
094        return brokerUrl;
095    }
096
097    public void setBrokerUrl(String brokerUrl) {
098        this.brokerUrl = brokerUrl;
099    }
100
101    public String getUser() {
102        return user;
103    }
104
105    public void setUser(String user) {
106        this.user = user;
107    }
108
109    public String getPassword() {
110        return password;
111    }
112
113    public void setPassword(String password) {
114        this.password = password;
115    }
116
117    public String getDestination() {
118        return destination;
119    }
120
121    public void setDestination(String destination) {
122        this.destination = destination;
123    }
124
125    public int getMessageCount() {
126        return messageCount;
127    }
128
129    public void setMessageCount(int messageCount) {
130        this.messageCount = messageCount;
131    }
132
133    public int getSleep() {
134        return sleep;
135    }
136
137    public void setSleep(int sleep) {
138        this.sleep = sleep;
139    }
140
141    public int getBatchSize() {
142        return batchSize;
143    }
144
145    public void setBatchSize(int batchSize) {
146        this.batchSize = batchSize;
147    }
148
149    public int getParallelThreads() {
150        return parallelThreads;
151    }
152
153    public void setParallelThreads(int parallelThreads) {
154        this.parallelThreads = parallelThreads;
155    }
156
157    public boolean isBytesAsText() {
158        return bytesAsText;
159    }
160
161    public void setBytesAsText(boolean bytesAsText) {
162        this.bytesAsText = bytesAsText;
163    }
164
165    public boolean isTransacted() {
166        return transacted;
167    }
168
169    public void setTransacted(boolean transacted) {
170        this.transacted = transacted;
171    }
172
173    public int getAckMode() {
174        return ackMode;
175    }
176
177    public void setAckMode(String ackMode) {
178        if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
179            this.ackMode = Session.CLIENT_ACKNOWLEDGE;
180        }
181        if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
182            this.ackMode = Session.AUTO_ACKNOWLEDGE;
183        }
184        if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
185            this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
186        }
187    }
188
189    public boolean isDurable() {
190        return durable;
191    }
192
193    public void setDurable(boolean durable) {
194        this.durable = durable;
195    }
196
197    public String getClientId() {
198        return clientId;
199    }
200
201    public void setClientId(String clientId) {
202        this.clientId = clientId;
203    }
204
205    @Override
206    protected void printHelp() {
207        printHelpFromFile();
208    }
209
210    @Override
211    public String getName() {
212        return "consumer";
213    }
214
215    @Override
216    public String getOneLineDescription() {
217        return "Receives messages from the broker";
218    }
219}