001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.Arrays;
022import java.util.LinkedList;
023
024import org.apache.activemq.ActiveMQMessageAudit;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.Message;
028import org.apache.activemq.command.MessageAck;
029import org.apache.activemq.command.MessageId;
030import org.apache.activemq.command.XATransactionId;
031import org.apache.activemq.store.AbstractMessageStore;
032import org.apache.activemq.store.IndexListener;
033import org.apache.activemq.store.MessageRecoveryListener;
034import org.apache.activemq.util.ByteSequence;
035import org.apache.activemq.util.ByteSequenceData;
036import org.apache.activemq.util.IOExceptionSupport;
037import org.apache.activemq.wireformat.WireFormat;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 *
043 */
044public class JDBCMessageStore extends AbstractMessageStore {
045
046    class Duration {
047        static final int LIMIT = 100;
048        final long start = System.currentTimeMillis();
049        final String name;
050
051        Duration(String name) {
052            this.name = name;
053        }
054        void end() {
055            end(null);
056        }
057        void end(Object o) {
058            long duration = System.currentTimeMillis() - start;
059
060            if (duration > LIMIT) {
061                System.err.println(name + " took a long time: " + duration + "ms " + o);
062            }
063        }
064    }
065    private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);
066    protected final WireFormat wireFormat;
067    protected final JDBCAdapter adapter;
068    protected final JDBCPersistenceAdapter persistenceAdapter;
069    protected ActiveMQMessageAudit audit;
070    protected final LinkedList<Long> pendingAdditions = new LinkedList<Long>();
071    final long[] perPriorityLastRecovered = new long[10];
072
073    public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
074        super(destination);
075        this.persistenceAdapter = persistenceAdapter;
076        this.adapter = adapter;
077        this.wireFormat = wireFormat;
078        this.audit = audit;
079
080        if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) {
081            recordDestinationCreation(destination);
082        }
083        resetBatching();
084    }
085
086    private void recordDestinationCreation(ActiveMQDestination destination) throws IOException {
087        TransactionContext c = persistenceAdapter.getTransactionContext();
088        try {
089            c = persistenceAdapter.getTransactionContext();
090            if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) {
091                adapter.doRecordDestination(c, destination);
092            }
093        } catch (SQLException e) {
094            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
095            throw IOExceptionSupport.create("Failed to record destination: " + destination + ". Reason: " + e, e);
096        } finally {
097            c.close();
098        }
099    }
100
101    @Override
102    public void addMessage(final ConnectionContext context, final Message message) throws IOException {
103        MessageId messageId = message.getMessageId();
104        if (audit != null && audit.isDuplicate(message)) {
105            if (LOG.isDebugEnabled()) {
106                LOG.debug(destination.getPhysicalName()
107                    + " ignoring duplicated (add) message, already stored: "
108                    + messageId);
109            }
110            return;
111        }
112
113        // if xaXid present - this is a prepare - so we don't yet have an outcome
114        final XATransactionId xaXid =  context != null ? context.getXid() : null;
115
116        // Serialize the Message..
117        byte data[];
118        try {
119            ByteSequence packet = wireFormat.marshal(message);
120            data = ByteSequenceData.toByteArray(packet);
121        } catch (IOException e) {
122            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
123        }
124
125        // Get a connection and insert the message into the DB.
126        TransactionContext c = persistenceAdapter.getTransactionContext(context);
127        long sequenceId;
128        synchronized (pendingAdditions) {
129            sequenceId = persistenceAdapter.getNextSequenceId();
130            final long sequence = sequenceId;
131            message.getMessageId().setEntryLocator(sequence);
132
133            if (xaXid == null) {
134                pendingAdditions.add(sequence);
135
136                c.onCompletion(new Runnable() {
137                    @Override
138                    public void run() {
139                        // jdbc close or jms commit - while futureOrSequenceLong==null ordered
140                        // work will remain pending on the Queue
141                        message.getMessageId().setFutureOrSequenceLong(sequence);
142                    }
143                });
144
145                if (indexListener != null) {
146                    indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
147                        @Override
148                        public void run() {
149                            // cursor add complete
150                            synchronized (pendingAdditions) { pendingAdditions.remove(sequence); }
151                        }
152                    }));
153                } else {
154                    pendingAdditions.remove(sequence);
155                }
156            }
157        }
158        try {
159            adapter.doAddMessage(c, sequenceId, messageId, destination, data, message.getExpiration(),
160                    this.isPrioritizedMessages() ? message.getPriority() : 0, xaXid);
161        } catch (SQLException e) {
162            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
163            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
164        } finally {
165            c.close();
166        }
167        if (xaXid == null) {
168            onAdd(message, sequenceId, message.getPriority());
169        }
170    }
171
172    // jdbc commit order is random with concurrent connections - limit scan to lowest pending
173    private long minPendingSequeunceId() {
174        synchronized (pendingAdditions) {
175            if (!pendingAdditions.isEmpty()) {
176                return pendingAdditions.get(0);
177            } else {
178                // nothing pending, ensure scan is limited to current state
179                return persistenceAdapter.sequenceGenerator.getLastSequenceId() + 1;
180            }
181        }
182    }
183
184    @Override
185    public void updateMessage(Message message) throws IOException {
186        TransactionContext c = persistenceAdapter.getTransactionContext();
187        try {
188            adapter.doUpdateMessage(c, destination, message.getMessageId(), ByteSequenceData.toByteArray(wireFormat.marshal(message)));
189        } catch (SQLException e) {
190            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
191            throw IOExceptionSupport.create("Failed to update message: " + message.getMessageId() + " in container: " + e, e);
192        } finally {
193            c.close();
194        }
195    }
196
197    protected void onAdd(Message message, long sequenceId, byte priority) {}
198
199    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
200        // Get a connection and insert the message into the DB.
201        TransactionContext c = persistenceAdapter.getTransactionContext(context);
202        try {
203            adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef);
204        } catch (SQLException e) {
205            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
206            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
207        } finally {
208            c.close();
209        }
210    }
211
212    @Override
213    public Message getMessage(MessageId messageId) throws IOException {
214        // Get a connection and pull the message out of the DB
215        TransactionContext c = persistenceAdapter.getTransactionContext();
216        try {
217            byte data[] = adapter.doGetMessage(c, messageId);
218            if (data == null) {
219                return null;
220            }
221
222            Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data));
223            return answer;
224        } catch (IOException e) {
225            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
226        } catch (SQLException e) {
227            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
228            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
229        } finally {
230            c.close();
231        }
232    }
233
234    public String getMessageReference(MessageId messageId) throws IOException {
235        long id = messageId.getBrokerSequenceId();
236
237        // Get a connection and pull the message out of the DB
238        TransactionContext c = persistenceAdapter.getTransactionContext();
239        try {
240            return adapter.doGetMessageReference(c, id);
241        } catch (IOException e) {
242            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
243        } catch (SQLException e) {
244            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
245            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
246        } finally {
247            c.close();
248        }
249    }
250
251    @Override
252    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
253
254        long seq = ack.getLastMessageId().getFutureOrSequenceLong() != null ?
255                (Long) ack.getLastMessageId().getFutureOrSequenceLong() :
256                persistenceAdapter.getStoreSequenceIdForMessageId(context, ack.getLastMessageId(), destination)[0];
257
258        // Get a connection and remove the message from the DB
259        TransactionContext c = persistenceAdapter.getTransactionContext(context);
260        try {
261            adapter.doRemoveMessage(c, seq, context != null ? context.getXid() : null);
262        } catch (SQLException e) {
263            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
264            throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
265        } finally {
266            c.close();
267        }
268    }
269
270    @Override
271    public void recover(final MessageRecoveryListener listener) throws Exception {
272
273        // Get all the Message ids out of the database.
274        TransactionContext c = persistenceAdapter.getTransactionContext();
275        try {
276            c = persistenceAdapter.getTransactionContext();
277            adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
278                @Override
279                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
280                    if (listener.hasSpace()) {
281                        Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
282                        msg.getMessageId().setBrokerSequenceId(sequenceId);
283                        return listener.recoverMessage(msg);
284                    } else {
285                        if (LOG.isTraceEnabled()) {
286                            LOG.trace("Message recovery limit reached for MessageRecoveryListener");
287                        }
288                        return false;
289                    }
290                }
291
292                @Override
293                public boolean recoverMessageReference(String reference) throws Exception {
294                    if (listener.hasSpace()) {
295                        return listener.recoverMessageReference(new MessageId(reference));
296                    } else {
297                        if (LOG.isTraceEnabled()) {
298                            LOG.trace("Message recovery limit reached for MessageRecoveryListener");
299                        }
300                        return false;
301                    }
302                }
303            });
304        } catch (SQLException e) {
305            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
306            throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
307        } finally {
308            c.close();
309        }
310    }
311
312    /**
313     * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
314     */
315    @Override
316    public void removeAllMessages(ConnectionContext context) throws IOException {
317        // Get a connection and remove the message from the DB
318        TransactionContext c = persistenceAdapter.getTransactionContext(context);
319        try {
320            adapter.doRemoveAllMessages(c, destination);
321        } catch (SQLException e) {
322            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
323            throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
324        } finally {
325            c.close();
326        }
327    }
328
329    @Override
330    public int getMessageCount() throws IOException {
331        int result = 0;
332        TransactionContext c = persistenceAdapter.getTransactionContext();
333        try {
334
335            result = adapter.doGetMessageCount(c, destination);
336
337        } catch (SQLException e) {
338            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
339            throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
340        } finally {
341            c.close();
342        }
343        return result;
344    }
345
346    /**
347     * @param maxReturned
348     * @param listener
349     * @throws Exception
350     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
351     *      org.apache.activemq.store.MessageRecoveryListener)
352     */
353    @Override
354    public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
355        TransactionContext c = persistenceAdapter.getTransactionContext();
356        try {
357            if (LOG.isTraceEnabled()) {
358                LOG.trace(this + " recoverNext lastRecovered:" + Arrays.toString(perPriorityLastRecovered) + ", minPending:" + minPendingSequeunceId());
359            }
360            adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(),
361                    maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
362
363                @Override
364                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
365                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
366                        msg.getMessageId().setBrokerSequenceId(sequenceId);
367                        msg.getMessageId().setFutureOrSequenceLong(sequenceId);
368                        listener.recoverMessage(msg);
369                        trackLastRecovered(sequenceId, msg.getPriority());
370                        return true;
371                }
372
373                @Override
374                public boolean recoverMessageReference(String reference) throws Exception {
375                    if (listener.hasSpace()) {
376                        listener.recoverMessageReference(new MessageId(reference));
377                        return true;
378                    }
379                    return false;
380                }
381
382            });
383        } catch (SQLException e) {
384            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
385        } finally {
386            c.close();
387        }
388
389    }
390
391    private void trackLastRecovered(long sequenceId, int priority) {
392        perPriorityLastRecovered[isPrioritizedMessages() ? priority : 0] = sequenceId;
393    }
394
395    /**
396     * @see org.apache.activemq.store.MessageStore#resetBatching()
397     */
398    @Override
399    public void resetBatching() {
400        if (LOG.isTraceEnabled()) {
401            LOG.trace(this + " resetBatching. last recovered: " + Arrays.toString(perPriorityLastRecovered));
402        }
403        setLastRecovered(-1);
404    }
405
406    private void setLastRecovered(long val) {
407        for (int i=0;i<perPriorityLastRecovered.length;i++) {
408            perPriorityLastRecovered[i] = val;
409        }
410    }
411
412
413    @Override
414    public void setBatch(MessageId messageId) {
415        if (LOG.isTraceEnabled()) {
416            LOG.trace(this + " setBatch: last recovered: " + Arrays.toString(perPriorityLastRecovered));
417        }
418        try {
419            long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(null, messageId, destination);
420            setLastRecovered(storedValues[0]);
421        } catch (IOException ignoredAsAlreadyLogged) {
422            resetBatching();
423        }
424        if (LOG.isTraceEnabled()) {
425            LOG.trace(this + " setBatch: new last recovered: " + Arrays.toString(perPriorityLastRecovered));
426        }
427    }
428
429
430    @Override
431    public void setPrioritizedMessages(boolean prioritizedMessages) {
432        super.setPrioritizedMessages(prioritizedMessages);
433    }
434
435    @Override
436    public String toString() {
437        return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size();
438    }
439
440}