001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.File;
020import java.io.IOException;
021import java.sql.Connection;
022import java.sql.SQLException;
023import java.util.Collections;
024import java.util.Locale;
025import java.util.Set;
026import java.util.concurrent.ScheduledFuture;
027import java.util.concurrent.ScheduledThreadPoolExecutor;
028import java.util.concurrent.ThreadFactory;
029import java.util.concurrent.TimeUnit;
030
031import javax.sql.DataSource;
032
033import org.apache.activemq.ActiveMQMessageAudit;
034import org.apache.activemq.broker.BrokerService;
035import org.apache.activemq.broker.ConnectionContext;
036import org.apache.activemq.broker.Locker;
037import org.apache.activemq.broker.scheduler.JobSchedulerStore;
038import org.apache.activemq.command.ActiveMQDestination;
039import org.apache.activemq.command.ActiveMQQueue;
040import org.apache.activemq.command.ActiveMQTopic;
041import org.apache.activemq.command.Message;
042import org.apache.activemq.command.MessageAck;
043import org.apache.activemq.command.MessageId;
044import org.apache.activemq.command.ProducerId;
045import org.apache.activemq.openwire.OpenWireFormat;
046import org.apache.activemq.store.MessageStore;
047import org.apache.activemq.store.PersistenceAdapter;
048import org.apache.activemq.store.TopicMessageStore;
049import org.apache.activemq.store.TransactionStore;
050import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
051import org.apache.activemq.store.memory.MemoryTransactionStore;
052import org.apache.activemq.usage.SystemUsage;
053import org.apache.activemq.util.ByteSequence;
054import org.apache.activemq.util.FactoryFinder;
055import org.apache.activemq.util.IOExceptionSupport;
056import org.apache.activemq.util.LongSequenceGenerator;
057import org.apache.activemq.util.ServiceStopper;
058import org.apache.activemq.util.ThreadPoolUtils;
059import org.apache.activemq.wireformat.WireFormat;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063/**
064 * A {@link PersistenceAdapter} implementation using JDBC for persistence
065 * storage.
066 *
067 * This persistence adapter will correctly remember prepared XA transactions,
068 * but it will not keep track of local transaction commits so that operations
 * performed against the Message store are done as a single unit of work (UOW).
070 *
071 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
072 *
073 */
074public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter {
075
    /** Logger used by this adapter and by the static {@link #log(String, SQLException)} helper. */
    private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
    // Finder consulted by createAdapter() (via loadAdapter) for a driver-specific JDBCAdapter.
    private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
        "META-INF/services/org/apache/activemq/store/jdbc/");
    // Finder consulted by createDefaultLocker() (via loadAdapter) for a driver-specific Locker.
    private static FactoryFinder lockFactoryFinder = new FactoryFinder(
        "META-INF/services/org/apache/activemq/store/jdbc/lock/");

    /** Default lock keep-alive period applied by the instance initializer below (presumably milliseconds, i.e. 30s -- confirm against setLockKeepAlivePeriod). */
    public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000;

    // Wire format used to unmarshal stored messages and handed to the message stores created here.
    private WireFormat wireFormat = new OpenWireFormat();
    // SQL statement holder; created lazily by getStatements().
    private Statements statements;
    // JDBC adapter; created lazily by getAdapter().
    private JDBCAdapter adapter;
    // Created on demand by createTransactionStore(); when non-null, new message stores are proxied through it.
    private MemoryTransactionStore transactionStore;
    // Handle of the periodic cleanup task scheduled in doStart() and cancelled in doStop().
    private ScheduledFuture<?> cleanupTicket;
    // Delay between cleanup runs in milliseconds (5 minutes); values <= 0 disable scheduling in doStart().
    private int cleanupPeriod = 1000 * 60 * 5;
    // Forwarded to the adapter in init() and deleteAllMessages().
    private boolean useExternalMessageReferences;
    // When true, init() asks the adapter to create the tables.
    private boolean createTablesOnStartup = true;
    // Optional separate data source for locking; getLockDataSource() falls back to getDataSource().
    private DataSource lockDataSource;
    // Isolation level applied to new TransactionContexts, only when greater than zero.
    private int transactionIsolation;
    // Store directory; getDirectory() falls back to the broker data directory when unset.
    private File directory;
    // Exposed through isChangeAutoCommitAllowed()/setChangeAutoCommitAllowed(); not otherwise read in this class.
    private boolean changeAutoCommitAllowed = true;

    // Message-audit configuration: these feed the ActiveMQMessageAudit built in createMessageAudit().
    protected int maxProducersToAudit=1024;
    protected int maxAuditDepth=1000;
    // Auditing is off by default; createMessageAudit() does nothing unless this is true.
    protected boolean enableAudit=false;
    // Depth passed to the adapter's message-id scan when seeding the audit.
    protected int auditRecoveryDepth = 1024;
    // Remains null unless createMessageAudit() runs with auditing enabled.
    protected ActiveMQMessageAudit audit;

    // Source of store sequence ids; re-seeded by getLastMessageBrokerSequenceId().
    protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
    // Row limit pushed to the adapter in setAdapter().
    protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;

    // Instance initializer: runs for every constructor and applies the default keep-alive period.
    {
        setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD);
    }
109
    /**
     * Creates an adapter without supplying a data source or wire format; only
     * the field initializers and the instance initializer block take effect.
     */
    public JDBCPersistenceAdapter() {
    }
112
    /**
     * Creates an adapter bound to the given data source and wire format.
     *
     * @param ds the data source handed to the superclass constructor
     * @param wireFormat replaces the default {@link OpenWireFormat} instance
     */
    public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
        super(ds);
        this.wireFormat = wireFormat;
    }
117
118    @Override
119    public Set<ActiveMQDestination> getDestinations() {
120        TransactionContext c = null;
121        try {
122            c = getTransactionContext();
123            return getAdapter().doGetDestinations(c);
124        } catch (IOException e) {
125            return emptyDestinationSet();
126        } catch (SQLException e) {
127            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
128            return emptyDestinationSet();
129        } finally {
130            if (c != null) {
131                try {
132                    c.close();
133                } catch (Throwable e) {
134                }
135            }
136        }
137    }
138
139    @SuppressWarnings("unchecked")
140    private Set<ActiveMQDestination> emptyDestinationSet() {
141        return Collections.EMPTY_SET;
142    }
143
144    protected void createMessageAudit() {
145        if (enableAudit && audit == null) {
146            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
147            TransactionContext c = null;
148
149            try {
150                c = getTransactionContext();
151                getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
152                    @Override
153                    public void messageId(MessageId id) {
154                        audit.isDuplicate(id);
155                    }
156                });
157            } catch (Exception e) {
158                LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
159            } finally {
160                if (c != null) {
161                    try {
162                        c.close();
163                    } catch (Throwable e) {
164                    }
165                }
166            }
167        }
168    }
169
170    public void initSequenceIdGenerator() {
171        TransactionContext c = null;
172        try {
173            c = getTransactionContext();
174            getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
175                @Override
176                public void messageId(MessageId id) {
177                    audit.isDuplicate(id);
178                }
179            });
180        } catch (Exception e) {
181            LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
182        } finally {
183            if (c != null) {
184                try {
185                    c.close();
186                } catch (Throwable e) {
187                }
188            }
189        }
190    }
191
192    @Override
193    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
194        MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
195        if (transactionStore != null) {
196            rc = transactionStore.proxy(rc);
197        }
198        return rc;
199    }
200
201    @Override
202    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
203        TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
204        if (transactionStore != null) {
205            rc = transactionStore.proxy(rc);
206        }
207        return rc;
208    }
209
210    /**
211     * Cleanup method to remove any state associated with the given destination
212     * @param destination Destination to forget
213     */
214    @Override
215    public void removeQueueMessageStore(ActiveMQQueue destination) {
216        if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) {
217            try {
218                removeConsumerDestination(destination);
219            } catch (IOException ioe) {
220                LOG.error("Failed to remove consumer destination: " + destination, ioe);
221            }
222        }
223    }
224
225    private void removeConsumerDestination(ActiveMQQueue destination) throws IOException {
226        TransactionContext c = getTransactionContext();
227        try {
228            String id = destination.getQualifiedName();
229            getAdapter().doDeleteSubscription(c, destination, id, id);
230        } catch (SQLException e) {
231            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
232            throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e);
233        } finally {
234            c.close();
235        }
236    }
237
    /**
     * Cleanup method to remove any state associated with the given destination.
     * No state is retained for topics by this adapter, so the body is
     * intentionally empty.
     *
     * @param destination Destination to forget
     */
    @Override
    public void removeTopicMessageStore(ActiveMQTopic destination) {
    }
247
248    @Override
249    public TransactionStore createTransactionStore() throws IOException {
250        if (transactionStore == null) {
251            transactionStore = new JdbcMemoryTransactionStore(this);
252        }
253        return this.transactionStore;
254    }
255
256    @Override
257    public long getLastMessageBrokerSequenceId() throws IOException {
258        TransactionContext c = getTransactionContext();
259        try {
260            long seq =  getAdapter().doGetLastMessageStoreSequenceId(c);
261            sequenceGenerator.setLastSequenceId(seq);
262            long brokerSeq = 0;
263            if (seq != 0) {
264                byte[] msg = getAdapter().doGetMessageById(c, seq);
265                if (msg != null) {
266                    Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
267                    brokerSeq = last.getMessageId().getBrokerSequenceId();
268                } else {
269                   LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
270                }
271            }
272            return brokerSeq;
273        } catch (SQLException e) {
274            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
275            throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
276        } finally {
277            c.close();
278        }
279    }
280
281    @Override
282    public long getLastProducerSequenceId(ProducerId id) throws IOException {
283        TransactionContext c = getTransactionContext();
284        try {
285            return getAdapter().doGetLastProducerSequenceId(c, id);
286        } catch (SQLException e) {
287            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
288            throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
289        } finally {
290            c.close();
291        }
292    }
293
294    @Override
295    public void init() throws Exception {
296        getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
297
298        if (isCreateTablesOnStartup()) {
299            TransactionContext transactionContext = getTransactionContext();
300            transactionContext.getExclusiveConnection();
301            transactionContext.begin();
302            try {
303                try {
304                    getAdapter().doCreateTables(transactionContext);
305                } catch (SQLException e) {
306                    LOG.warn("Cannot create tables due to: " + e);
307                    JDBCPersistenceAdapter.log("Failure Details: ", e);
308                }
309            } finally {
310                transactionContext.commit();
311            }
312        }
313    }
314
315    @Override
316    public void doStart() throws Exception {
317
318        if( brokerService!=null ) {
319          wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
320        }
321
322        // Cleanup the db periodically.
323        if (cleanupPeriod > 0) {
324            cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
325                @Override
326                public void run() {
327                    cleanup();
328                }
329            }, 0, cleanupPeriod, TimeUnit.MILLISECONDS);
330        }
331        createMessageAudit();
332    }
333
334    @Override
335    public synchronized void doStop(ServiceStopper stopper) throws Exception {
336        if (cleanupTicket != null) {
337            cleanupTicket.cancel(true);
338            cleanupTicket = null;
339        }
340        closeDataSource(getDataSource());
341    }
342
343    public void cleanup() {
344        TransactionContext c = null;
345        try {
346            LOG.debug("Cleaning up old messages.");
347            c = getTransactionContext();
348            c.getExclusiveConnection();
349            getAdapter().doDeleteOldMessages(c);
350        } catch (IOException e) {
351            LOG.warn("Old message cleanup failed due to: " + e, e);
352        } catch (SQLException e) {
353            LOG.warn("Old message cleanup failed due to: " + e);
354            JDBCPersistenceAdapter.log("Failure Details: ", e);
355        } finally {
356            if (c != null) {
357                try {
358                    c.close();
359                } catch (Throwable e) {
360                }
361            }
362            LOG.debug("Cleanup done.");
363        }
364    }
365
366    @Override
367    public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
368        if (clockDaemon == null) {
369            clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
370                @Override
371                public Thread newThread(Runnable runnable) {
372                    Thread thread = new Thread(runnable, "ActiveMQ JDBC PA Scheduled Task");
373                    thread.setDaemon(true);
374                    return thread;
375                }
376            });
377        }
378        return clockDaemon;
379    }
380
381    public JDBCAdapter getAdapter() throws IOException {
382        if (adapter == null) {
383            setAdapter(createAdapter());
384        }
385        return adapter;
386    }
387
    /**
     * Returns the locker by delegating to {@link #getLocker()}.
     *
     * @return whatever {@link #getLocker()} returns
     * @throws IOException if {@link #getLocker()} fails
     * @deprecated as of 5.7.0, replaced by {@link #getLocker()}
     */
    @Deprecated
    public Locker getDatabaseLocker() throws IOException {
        return getLocker();
    }
395
    /**
     * Sets the database locker strategy to use to lock the database on startup
     * by delegating to {@link #setLocker(org.apache.activemq.broker.Locker)}.
     *
     * @param locker the locker to install
     * @throws IOException if the delegate call fails
     *
     * @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)}
     */
    @Deprecated
    public void setDatabaseLocker(Locker locker) throws IOException {
        setLocker(locker);
    }
406
407    public DataSource getLockDataSource() throws IOException {
408        if (lockDataSource == null) {
409            lockDataSource = getDataSource();
410            if (lockDataSource == null) {
411                throw new IllegalArgumentException(
412                        "No dataSource property has been configured");
413            }
414        }
415        return lockDataSource;
416    }
417
418    public void setLockDataSource(DataSource dataSource) {
419        this.lockDataSource = dataSource;
420        LOG.info("Using a separate dataSource for locking: "
421                            + lockDataSource);
422    }
423
    /**
     * @return the broker service field; may be null (callers in this class
     *         null-check it in doStart() and getDirectory())
     */
    @Override
    public BrokerService getBrokerService() {
        return brokerService;
    }
428
429    /**
430     * @throws IOException
431     */
432    protected JDBCAdapter createAdapter() throws IOException {
433
434        adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
435
436        // Use the default JDBC adapter if the
437        // Database type is not recognized.
438        if (adapter == null) {
439            adapter = new DefaultJDBCAdapter();
440            LOG.debug("Using default JDBC Adapter: " + adapter);
441        }
442        return adapter;
443    }
444
445    private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
446        Object adapter = null;
447        TransactionContext c = getTransactionContext();
448        try {
449            try {
450                // Make the filename file system safe.
451                String dirverName = c.getConnection().getMetaData().getDriverName();
452                dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(Locale.ENGLISH);
453
454                try {
455                    adapter = finder.newInstance(dirverName);
456                    LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
457                } catch (Throwable e) {
458                    LOG.info("Database " + kind + " driver override not found for : [" + dirverName
459                             + "].  Will use default implementation.");
460                }
461            } catch (SQLException e) {
462                LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
463                          + e.getMessage());
464                JDBCPersistenceAdapter.log("Failure Details: ", e);
465            }
466        } finally {
467            c.close();
468        }
469        return adapter;
470    }
471
472    public void setAdapter(JDBCAdapter adapter) {
473        this.adapter = adapter;
474        this.adapter.setStatements(getStatements());
475        this.adapter.setMaxRows(getMaxRows());
476    }
477
    /**
     * @return the wire format used to unmarshal stored messages and passed to
     *         the message stores created by this adapter
     */
    public WireFormat getWireFormat() {
        return wireFormat;
    }
481
    /**
     * @param wireFormat replaces the wire format used by this adapter
     */
    public void setWireFormat(WireFormat wireFormat) {
        this.wireFormat = wireFormat;
    }
485
486    public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
487        if (context == null) {
488            return getTransactionContext();
489        } else {
490            TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
491            if (answer == null) {
492                answer = getTransactionContext();
493                context.setLongTermStoreContext(answer);
494            }
495            return answer;
496        }
497    }
498
499    public TransactionContext getTransactionContext() throws IOException {
500        TransactionContext answer = new TransactionContext(this);
501        if (transactionIsolation > 0) {
502            answer.setTransactionIsolation(transactionIsolation);
503        }
504        return answer;
505    }
506
507    @Override
508    public void beginTransaction(ConnectionContext context) throws IOException {
509        TransactionContext transactionContext = getTransactionContext(context);
510        transactionContext.begin();
511    }
512
513    @Override
514    public void commitTransaction(ConnectionContext context) throws IOException {
515        TransactionContext transactionContext = getTransactionContext(context);
516        transactionContext.commit();
517    }
518
519    @Override
520    public void rollbackTransaction(ConnectionContext context) throws IOException {
521        TransactionContext transactionContext = getTransactionContext(context);
522        transactionContext.rollback();
523    }
524
    /**
     * @return the delay in milliseconds between periodic cleanup runs
     */
    public int getCleanupPeriod() {
        return cleanupPeriod;
    }
528
    /**
     * Sets the number of milliseconds until the database is attempted to be
     * cleaned up for durable topics. The value is read when
     * {@link #doStart()} schedules the cleanup task; a value of zero or less
     * means no cleanup task is scheduled.
     *
     * @param cleanupPeriod delay between cleanup runs, in milliseconds
     */
    public void setCleanupPeriod(int cleanupPeriod) {
        this.cleanupPeriod = cleanupPeriod;
    }
536
    /**
     * @return whether changing the auto-commit setting is allowed (true by default)
     */
    public boolean isChangeAutoCommitAllowed() {
        return changeAutoCommitAllowed;
    }
540
    /**
     * Whether the JDBC driver allows setting the auto commit.
     * Some drivers do not allow changing the auto commit. The default value is true.
     *
     * @param changeAutoCommitAllowed true to change, false to not change.
     */
    public void setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed) {
        this.changeAutoCommitAllowed = changeAutoCommitAllowed;
    }
550
551    @Override
552    public void deleteAllMessages() throws IOException {
553        TransactionContext c = getTransactionContext();
554        c.getExclusiveConnection();
555        try {
556            getAdapter().doDropTables(c);
557            getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
558            getAdapter().doCreateTables(c);
559            LOG.info("Persistence store purged.");
560        } catch (SQLException e) {
561            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
562            throw IOExceptionSupport.create(e);
563        } finally {
564            c.close();
565        }
566    }
567
    /**
     * @return the flag forwarded to the JDBC adapter by {@link #init()} and
     *         {@link #deleteAllMessages()}
     */
    public boolean isUseExternalMessageReferences() {
        return useExternalMessageReferences;
    }
571
    /**
     * Stores the flag only; it reaches the JDBC adapter on the next
     * {@link #init()} or {@link #deleteAllMessages()} call.
     *
     * @param useExternalMessageReferences the new flag value
     */
    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
        this.useExternalMessageReferences = useExternalMessageReferences;
    }
575
    /**
     * @return whether {@link #init()} asks the adapter to create the tables
     *         (true by default)
     */
    public boolean isCreateTablesOnStartup() {
        return createTablesOnStartup;
    }
579
    /**
     * Sets whether or not tables are created on startup; the flag is checked
     * by {@link #init()}.
     *
     * @param createTablesOnStartup true to create the tables during init
     */
    public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
        this.createTablesOnStartup = createTablesOnStartup;
    }
586
    /**
     * Sets whether or not an exclusive database lock should be used to enable
     * JDBC Master/Slave, by delegating to {@link #setUseLock(boolean)}.
     * The original note states this is enabled by default; that default is
     * not visible in this class.
     *
     * @param useDatabaseLock passed straight to {@link #setUseLock(boolean)}
     * @deprecated use {@link #setUseLock(boolean)} instead
     */
    @Deprecated
    public void setUseDatabaseLock(boolean useDatabaseLock) {
        setUseLock(useDatabaseLock);
    }
597
598    public static void log(String msg, SQLException e) {
599        String s = msg + e.getMessage();
600        while (e.getNextException() != null) {
601            e = e.getNextException();
602            s += ", due to: " + e.getMessage();
603        }
604        LOG.warn(s, e);
605    }
606
607    public Statements getStatements() {
608        if (statements == null) {
609            statements = new Statements();
610        }
611        return statements;
612    }
613
614    public void setStatements(Statements statements) {
615        this.statements = statements;
616        if (adapter != null) {
617            this.adapter.setStatements(getStatements());
618        }
619    }
620
    /**
     * Ignored by this adapter: the body is intentionally empty.
     *
     * @param usageManager The UsageManager that is controlling the
     *                destination's memory usage.
     */
    @Override
    public void setUsageManager(SystemUsage usageManager) {
    }
628
629    @Override
630    public Locker createDefaultLocker() throws IOException {
631        Locker locker = (Locker) loadAdapter(lockFactoryFinder, "lock");
632        if (locker == null) {
633            locker = new DefaultDatabaseLocker();
634            LOG.debug("Using default JDBC Locker: " + locker);
635        }
636        locker.configure(this);
637        return locker;
638    }
639
    /**
     * Ignored by this adapter: the body is intentionally empty.
     *
     * @param brokerName not used
     */
    @Override
    public void setBrokerName(String brokerName) {
    }
643
    /**
     * @return the superclass description wrapped in {@code JDBCPersistenceAdapter(...)}
     */
    @Override
    public String toString() {
        return "JDBCPersistenceAdapter(" + super.toString() + ")";
    }
648
    /**
     * @param dir the directory reported by {@link #getDirectory()}
     */
    @Override
    public void setDirectory(File dir) {
        this.directory=dir;
    }
653
654    @Override
655    public File getDirectory(){
656        if (this.directory==null && brokerService != null){
657            this.directory=brokerService.getBrokerDataDirectory();
658        }
659        return this.directory;
660    }
661
662    // interesting bit here is proof that DB is ok
663    @Override
664    public void checkpoint(boolean sync) throws IOException {
665        // by pass TransactionContext to avoid IO Exception handler
666        Connection connection = null;
667        try {
668            connection = getDataSource().getConnection();
669            if (!connection.isValid(10)) {
670                throw new IOException("isValid(10) failed for: " + connection);
671            }
672        } catch (SQLException e) {
673            LOG.debug("Could not get JDBC connection for checkpoint: " + e);
674            throw IOExceptionSupport.create(e);
675        } finally {
676            if (connection != null) {
677                try {
678                    connection.close();
679                } catch (Throwable ignored) {
680                }
681            }
682        }
683    }
684
    /**
     * @return always 0; this adapter does not compute a store size
     */
    @Override
    public long size(){
        return 0;
    }
689
    /**
     * Millisecond interval between lock acquire attempts, applied to a newly
     * created DefaultDatabaseLocker; not applied if a DataBaseLocker is
     * injected. The value is forwarded to the locker returned by
     * {@code getLocker()}.
     *
     * @param lockAcquireSleepInterval the interval in milliseconds
     * @throws IOException if the locker cannot be obtained
     * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
     */
    @Deprecated
    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException {
        getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval);
    }
701
    /**
     * Set the transaction isolation level to something other than TRANSACTION_READ_UNCOMMITTED.
     * This allowable dirty isolation level may not be achievable in clustered DB environments
     * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ;
     * see the isolation level constants in {@link java.sql.Connection}.
     * <p>
     * The value is applied to transaction contexts created afterwards, and
     * only when it is greater than zero.
     *
     * @param transactionIsolation the isolation level to use
     */
    public void setTransactionIsolation(int transactionIsolation) {
        this.transactionIsolation = transactionIsolation;
    }
712
    /**
     * @return the producer limit passed to the message audit when it is created
     */
    public int getMaxProducersToAudit() {
        return maxProducersToAudit;
    }
716
    /**
     * @param maxProducersToAudit producer limit used the next time the message
     *        audit is created
     */
    public void setMaxProducersToAudit(int maxProducersToAudit) {
        this.maxProducersToAudit = maxProducersToAudit;
    }
720
    /**
     * @return the audit depth passed to the message audit when it is created
     */
    public int getMaxAuditDepth() {
        return maxAuditDepth;
    }
724
    /**
     * @param maxAuditDepth audit depth used the next time the message audit is
     *        created
     */
    public void setMaxAuditDepth(int maxAuditDepth) {
        this.maxAuditDepth = maxAuditDepth;
    }
728
    /**
     * @return whether a message audit is created by createMessageAudit()
     *         (false by default)
     */
    public boolean isEnableAudit() {
        return enableAudit;
    }
732
    /**
     * @param enableAudit true to have createMessageAudit() build and seed a
     *        message audit
     */
    public void setEnableAudit(boolean enableAudit) {
        this.enableAudit = enableAudit;
    }
736
    /**
     * @return the depth passed to the adapter's message-id scan
     */
    public int getAuditRecoveryDepth() {
        return auditRecoveryDepth;
    }
740
    /**
     * @param auditRecoveryDepth depth passed to the adapter's message-id scan
     */
    public void setAuditRecoveryDepth(int auditRecoveryDepth) {
        this.auditRecoveryDepth = auditRecoveryDepth;
    }
744
    /**
     * @return the next id handed out by the store sequence generator
     */
    public long getNextSequenceId() {
        return sequenceGenerator.getNextSequenceId();
    }
748
    /**
     * @return the row limit pushed to the JDBC adapter by
     *         {@link #setAdapter(JDBCAdapter)}
     */
    public int getMaxRows() {
        return maxRows;
    }
752
    /*
     * The max rows returned from queries; with sparse selectors this may need
     * to be increased. The value reaches the JDBC adapter only through
     * setAdapter(), so it is not applied to an adapter that is already installed.
     */
    public void setMaxRows(int maxRows) {
        this.maxRows = maxRows;
    }
759
760    public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException {
761        TransactionContext c = getTransactionContext();
762        try {
763            getAdapter().doRecoverPreparedOps(c, jdbcMemoryTransactionStore);
764        } catch (SQLException e) {
765            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
766            throw IOExceptionSupport.create("Failed to recover from: " + jdbcMemoryTransactionStore + ". Reason: " + e,e);
767        } finally {
768            c.close();
769        }
770    }
771
772    public void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId) throws IOException {
773        TransactionContext c = getTransactionContext(context);
774        try {
775            long sequence = (Long)messageId.getEntryLocator();
776            getAdapter().doCommitAddOp(c, preparedSequenceId, sequence);
777        } catch (SQLException e) {
778            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
779            throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e);
780        } finally {
781            c.close();
782        }
783    }
784
785    public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException {
786        TransactionContext c = getTransactionContext(context);
787        try {
788            getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getFutureOrSequenceLong(), null);
789        } catch (SQLException e) {
790            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
791            throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". Reason: " + e,e);
792        } finally {
793            c.close();
794        }
795    }
796
797    public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
798        TransactionContext c = getTransactionContext(context);
799        try {
800            getAdapter().doSetLastAck(c, destination, null, clientId, subName, xidLastAck, priority);
801        } catch (SQLException e) {
802            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
803            throw IOExceptionSupport.create("Failed to commit last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e,e);
804        } finally {
805            c.close();
806        }
807    }
808
809    public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException {
810        TransactionContext c = getTransactionContext(context);
811        try {
812            byte priority = (byte) store.getCachedStoreSequenceId(c, store.getDestination(), ack.getLastMessageId())[1];
813            getAdapter().doClearLastAck(c, store.getDestination(), priority, clientId, subName);
814        } catch (SQLException e) {
815            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
816            throw IOExceptionSupport.create("Failed to rollback last ack: " + ack + " on " +  store.getDestination() + " for " + subName + ":" + clientId + ". Reason: " + e,e);
817        } finally {
818            c.close();
819        }
820    }
821
822    // after recovery there is no record of the original messageId for the ack
823    public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
824        TransactionContext c = getTransactionContext(context);
825        try {
826            getAdapter().doClearLastAck(c, destination, priority, clientId, subName);
827        } catch (SQLException e) {
828            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
829            throw IOExceptionSupport.create("Failed to rollback last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e, e);
830        } finally {
831            c.close();
832        }
833    }
834
835    long[] getStoreSequenceIdForMessageId(ConnectionContext context, MessageId messageId, ActiveMQDestination destination) throws IOException {
836        long[] result = new long[]{-1, Byte.MAX_VALUE -1};
837        TransactionContext c = getTransactionContext(context);
838        try {
839            result = adapter.getStoreSequenceId(c, destination, messageId);
840        } catch (SQLException e) {
841            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
842            throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
843        } finally {
844            c.close();
845        }
846        return result;
847    }
848
    /**
     * Not supported by this adapter.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
        throw new UnsupportedOperationException();
    }
853}