001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.journal;
018
019import java.io.File;
020import java.io.IOException;
021
022import org.apache.activeio.journal.Journal;
023import org.apache.activeio.journal.active.JournalImpl;
024import org.apache.activeio.journal.active.JournalLockedException;
025import org.apache.activemq.broker.Locker;
026import org.apache.activemq.store.PersistenceAdapter;
027import org.apache.activemq.store.PersistenceAdapterFactory;
028import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
029import org.apache.activemq.store.jdbc.JDBCAdapter;
030import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
031import org.apache.activemq.store.jdbc.Statements;
032import org.apache.activemq.thread.TaskRunnerFactory;
033import org.apache.activemq.util.ServiceStopper;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * Factory class that can create PersistenceAdapter objects.
039 * 
040 * @org.apache.xbean.XBean
041 * 
042 */
043public class JournalPersistenceAdapterFactory extends DataSourceServiceSupport implements PersistenceAdapterFactory {
044
045    private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
046
047    private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapterFactory.class);
048
049    private long checkpointInterval = 1000 * 60 * 5;
050    private int journalLogFileSize = 1024 * 1024 * 20;
051    private int journalLogFiles = 2;
052    private TaskRunnerFactory taskRunnerFactory;
053    private Journal journal;
054    private boolean useJournal = true;
055    private boolean useQuickJournal;
056    private File journalArchiveDirectory;
057    private boolean failIfJournalIsLocked;
058    private int journalThreadPriority = Thread.MAX_PRIORITY;
059    private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
060    private boolean useDedicatedTaskRunner;
061
062    public PersistenceAdapter createPersistenceAdapter() throws IOException {
063        jdbcPersistenceAdapter.setDataSource(getDataSource());
064
065        if (!useJournal) {
066            return jdbcPersistenceAdapter;
067        }
068        JournalPersistenceAdapter result =  new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
069        result.setDirectory(getDataDirectoryFile());
070        result.setCheckpointInterval(getCheckpointInterval());
071        return result;
072
073    }
074
075    public int getJournalLogFiles() {
076        return journalLogFiles;
077    }
078
079    /**
080     * Sets the number of journal log files to use
081     */
082    public void setJournalLogFiles(int journalLogFiles) {
083        this.journalLogFiles = journalLogFiles;
084    }
085
086    public int getJournalLogFileSize() {
087        return journalLogFileSize;
088    }
089
090    /**
091     * Sets the size of the journal log files
092     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
093     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
094     */
095    public void setJournalLogFileSize(int journalLogFileSize) {
096        this.journalLogFileSize = journalLogFileSize;
097    }
098
099    public JDBCPersistenceAdapter getJdbcAdapter() {
100        return jdbcPersistenceAdapter;
101    }
102
103    public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
104        this.jdbcPersistenceAdapter = jdbcAdapter;
105    }
106
107    public boolean isUseJournal() {
108        return useJournal;
109    }
110
111    public long getCheckpointInterval() {
112        return checkpointInterval;
113    }
114
115    public void setCheckpointInterval(long checkpointInterval) {
116        this.checkpointInterval = checkpointInterval;
117    }
118
119    /**
120     * Enables or disables the use of the journal. The default is to use the
121     * journal
122     * 
123     * @param useJournal
124     */
125    public void setUseJournal(boolean useJournal) {
126        this.useJournal = useJournal;
127    }
128
129    public boolean isUseDedicatedTaskRunner() {
130        return useDedicatedTaskRunner;
131    }
132    
133    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
134        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
135    }
136    
137    public TaskRunnerFactory getTaskRunnerFactory() {
138        if (taskRunnerFactory == null) {
139            taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
140                                                      true, 1000, isUseDedicatedTaskRunner());
141        }
142        return taskRunnerFactory;
143    }
144
145    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
146        this.taskRunnerFactory = taskRunnerFactory;
147    }
148
149    public Journal getJournal() throws IOException {
150        if (journal == null) {
151            createJournal();
152        }
153        return journal;
154    }
155
156    public void setJournal(Journal journal) {
157        this.journal = journal;
158    }
159
160    public File getJournalArchiveDirectory() {
161        if (journalArchiveDirectory == null && useQuickJournal) {
162            journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
163        }
164        return journalArchiveDirectory;
165    }
166
167    public void setJournalArchiveDirectory(File journalArchiveDirectory) {
168        this.journalArchiveDirectory = journalArchiveDirectory;
169    }
170
171    public boolean isUseQuickJournal() {
172        return useQuickJournal;
173    }
174
175    /**
176     * Enables or disables the use of quick journal, which keeps messages in the
177     * journal and just stores a reference to the messages in JDBC. Defaults to
178     * false so that messages actually reside long term in the JDBC database.
179     */
180    public void setUseQuickJournal(boolean useQuickJournal) {
181        this.useQuickJournal = useQuickJournal;
182    }
183
184    public JDBCAdapter getAdapter() throws IOException {
185        return jdbcPersistenceAdapter.getAdapter();
186    }
187
188    public void setAdapter(JDBCAdapter adapter) {
189        jdbcPersistenceAdapter.setAdapter(adapter);
190    }
191
192    public Statements getStatements() {
193        return jdbcPersistenceAdapter.getStatements();
194    }
195
196    public void setStatements(Statements statements) {
197        jdbcPersistenceAdapter.setStatements(statements);
198    }
199
200    /**
201     * Sets whether or not an exclusive database lock should be used to enable
202     * JDBC Master/Slave. Enabled by default.
203     */
204    public void setUseDatabaseLock(boolean useDatabaseLock) {
205        jdbcPersistenceAdapter.setUseLock(useDatabaseLock);
206    }
207
208    public boolean isCreateTablesOnStartup() {
209        return jdbcPersistenceAdapter.isCreateTablesOnStartup();
210    }
211
212    /**
213     * Sets whether or not tables are created on startup
214     */
215    public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
216        jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
217    }
218
219    public int getJournalThreadPriority() {
220        return journalThreadPriority;
221    }
222
223    /**
224     * Sets the thread priority of the journal thread
225     */
226    public void setJournalThreadPriority(int journalThreadPriority) {
227        this.journalThreadPriority = journalThreadPriority;
228    }
229
230    /**
231     * @throws IOException
232     */
233    protected void createJournal() throws IOException {
234        File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
235        if (failIfJournalIsLocked) {
236            journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
237                                      getJournalArchiveDirectory());
238        } else {
239            while (true) {
240                try {
241                    journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
242                                              getJournalArchiveDirectory());
243                    break;
244                } catch (JournalLockedException e) {
245                    LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000)
246                             + " seconds for the journal to be unlocked.");
247                    try {
248                        Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
249                    } catch (InterruptedException e1) {
250                    }
251                }
252            }
253        }
254    }
255
256    @Override
257    public Locker createDefaultLocker() throws IOException {
258        return null;
259    }
260
261    @Override
262    public void init() throws Exception {
263    }
264
265    @Override
266    protected void doStop(ServiceStopper stopper) throws Exception {}
267
268    @Override
269    protected void doStart() throws Exception {}
270}