001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.scheduler;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.atomic.AtomicBoolean;
028
029import javax.jms.MessageFormatException;
030
031import org.apache.activemq.broker.scheduler.CronParser;
032import org.apache.activemq.broker.scheduler.Job;
033import org.apache.activemq.broker.scheduler.JobListener;
034import org.apache.activemq.broker.scheduler.JobScheduler;
035import org.apache.activemq.protobuf.Buffer;
036import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
037import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
038import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
039import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
040import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
041import org.apache.activemq.store.kahadb.disk.journal.Location;
042import org.apache.activemq.store.kahadb.disk.page.Transaction;
043import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
044import org.apache.activemq.util.ByteSequence;
045import org.apache.activemq.util.IdGenerator;
046import org.apache.activemq.util.ServiceStopper;
047import org.apache.activemq.util.ServiceSupport;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler {
052
    /** Logger shared by every instance of this scheduler. */
    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);
    /** Store backing this scheduler: provides the index locks and page file and receives the scheduler commands. */
    private final JobSchedulerStoreImpl store;
    /** Flag consulted by {@code run()} and the main loop to decide whether the scheduler keeps working. */
    private final AtomicBoolean running = new AtomicBoolean();
    /** Scheduler name; it is stamped on every command handed to the store. */
    private String name;
    /** Scheduled jobs keyed by execution time; each key holds the list of jobs due at that time. */
    private BTreeIndex<Long, List<JobLocation>> index;
    // NOTE(review): presumably the thread that executes run(); where it is assigned is not visible in this part of the file.
    private Thread thread;
    // NOTE(review): not referenced in the visible part of the file; confirm how it relates to 'running'.
    private final AtomicBoolean started = new AtomicBoolean(false);
    /** Listeners registered through addListener and dropped through removeListener. */
    private final List<JobListener> jobListeners = new CopyOnWriteArrayList<>();
    // NOTE(review): usage is not visible in this part of the file.
    private static final IdGenerator ID_GENERATOR = new IdGenerator();
    /** Told about new or rescheduled jobs via newJob(); the main loop also clears it and sets its wait time. */
    private final ScheduleTime scheduleTime = new ScheduleTime();
063
064    JobSchedulerImpl(JobSchedulerStoreImpl store) {
065        this.store = store;
066    }
067
068    public void setName(String name) {
069        this.name = name;
070    }
071
072    @Override
073    public String getName() {
074        return this.name;
075    }
076
077    @Override
078    public void addListener(JobListener l) {
079        this.jobListeners.add(l);
080    }
081
082    @Override
083    public void removeListener(JobListener l) {
084        this.jobListeners.remove(l);
085    }
086
087    @Override
088    public void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException {
089        doSchedule(jobId, payload, "", 0, delay, 0);
090    }
091
092    @Override
093    public void schedule(final String jobId, final ByteSequence payload, final String cronEntry) throws Exception {
094        doSchedule(jobId, payload, cronEntry, 0, 0, 0);
095    }
096
097    @Override
098    public void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay, final long period, final int repeat) throws IOException {
099        doSchedule(jobId, payload, cronEntry, delay, period, repeat);
100    }
101
102    @Override
103    public void remove(final long time) throws IOException {
104        doRemoveRange(time, time);
105    }
106
107    @Override
108    public void remove(final String jobId) throws IOException {
109        doRemove(-1, jobId);
110    }
111
112    @Override
113    public void removeAllJobs() throws IOException {
114        doRemoveRange(0, Long.MAX_VALUE);
115    }
116
117    @Override
118    public void removeAllJobs(final long start, final long finish) throws IOException {
119        doRemoveRange(start, finish);
120    }
121
122    @Override
123    public long getNextScheduleTime() throws IOException {
124        this.store.readLockIndex();
125        try {
126            Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
127            return first != null ? first.getKey() : -1l;
128        } finally {
129            this.store.readUnlockIndex();
130        }
131    }
132
133    @Override
134    public List<Job> getNextScheduleJobs() throws IOException {
135        final List<Job> result = new ArrayList<>();
136        this.store.readLockIndex();
137        try {
138            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
139                @Override
140                public void execute(Transaction tx) throws IOException {
141                    Map.Entry<Long, List<JobLocation>> first = index.getFirst(tx);
142                    if (first != null) {
143                        for (JobLocation jl : first.getValue()) {
144                            ByteSequence bs = getPayload(jl.getLocation());
145                            Job job = new JobImpl(jl, bs);
146                            result.add(job);
147                        }
148                    }
149                }
150            });
151        } finally {
152            this.store.readUnlockIndex();
153        }
154        return result;
155    }
156
157    private Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException {
158        this.store.readLockIndex();
159        try {
160            if (!this.store.isStopped() && !this.store.isStopping()) {
161                Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
162                return first;
163            }
164        } finally {
165            this.store.readUnlockIndex();
166        }
167        return null;
168    }
169
170    @Override
171    public List<Job> getAllJobs() throws IOException {
172        final List<Job> result = new ArrayList<>();
173        this.store.readLockIndex();
174        try {
175            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
176                @Override
177                public void execute(Transaction tx) throws IOException {
178                    Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx());
179                    while (iter.hasNext()) {
180                        Map.Entry<Long, List<JobLocation>> next = iter.next();
181                        if (next != null) {
182                            for (JobLocation jl : next.getValue()) {
183                                ByteSequence bs = getPayload(jl.getLocation());
184                                Job job = new JobImpl(jl, bs);
185                                result.add(job);
186                            }
187                        } else {
188                            break;
189                        }
190                    }
191                }
192            });
193        } finally {
194            this.store.readUnlockIndex();
195        }
196        return result;
197    }
198
199    @Override
200    public List<Job> getAllJobs(final long start, final long finish) throws IOException {
201        final List<Job> result = new ArrayList<>();
202        this.store.readLockIndex();
203        try {
204            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
205                @Override
206                public void execute(Transaction tx) throws IOException {
207                    Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(tx, start);
208                    while (iter.hasNext()) {
209                        Map.Entry<Long, List<JobLocation>> next = iter.next();
210                        if (next != null && next.getKey().longValue() <= finish) {
211                            for (JobLocation jl : next.getValue()) {
212                                ByteSequence bs = getPayload(jl.getLocation());
213                                Job job = new JobImpl(jl, bs);
214                                result.add(job);
215                            }
216                        } else {
217                            break;
218                        }
219                    }
220                }
221            });
222        } finally {
223            this.store.readUnlockIndex();
224        }
225        return result;
226    }
227
228    private void doSchedule(final String jobId, final ByteSequence payload, final String cronEntry, long delay, long period, int repeat) throws IOException {
229        long startTime = System.currentTimeMillis();
230        // round startTime - so we can schedule more jobs at the same time
231        startTime = ((startTime + 500) / 500) * 500;
232
233        long time = 0;
234        if (cronEntry != null && cronEntry.length() > 0) {
235            try {
236                time = CronParser.getNextScheduledTime(cronEntry, startTime);
237            } catch (MessageFormatException e) {
238                throw new IOException(e.getMessage());
239            }
240        }
241
242        if (time == 0) {
243            // start time not set by CRON - so it it to the current time
244            time = startTime;
245        }
246
247        if (delay > 0) {
248            time += delay;
249        } else {
250            time += period;
251        }
252
253        KahaAddScheduledJobCommand newJob = new KahaAddScheduledJobCommand();
254        newJob.setScheduler(name);
255        newJob.setJobId(jobId);
256        newJob.setStartTime(startTime);
257        newJob.setCronEntry(cronEntry);
258        newJob.setDelay(delay);
259        newJob.setPeriod(period);
260        newJob.setRepeat(repeat);
261        newJob.setNextExecutionTime(time);
262        newJob.setPayload(new Buffer(payload.getData(), payload.getOffset(), payload.getLength()));
263
264        this.store.store(newJob);
265    }
266
267    private void doReschedule(final String jobId, long executionTime, long nextExecutionTime, int rescheduledCount) throws IOException {
268        KahaRescheduleJobCommand update = new KahaRescheduleJobCommand();
269        update.setScheduler(name);
270        update.setJobId(jobId);
271        update.setExecutionTime(executionTime);
272        update.setNextExecutionTime(nextExecutionTime);
273        update.setRescheduledCount(rescheduledCount);
274        this.store.store(update);
275    }
276
277    private void doRemove(final long executionTime, final List<JobLocation> jobs) throws IOException {
278        for (JobLocation job : jobs) {
279            doRemove(executionTime, job.getJobId());
280        }
281    }
282
283    private void doRemove(long executionTime, final String jobId) throws IOException {
284        KahaRemoveScheduledJobCommand remove = new KahaRemoveScheduledJobCommand();
285        remove.setScheduler(name);
286        remove.setJobId(jobId);
287        remove.setNextExecutionTime(executionTime);
288        this.store.store(remove);
289    }
290
291    private void doRemoveRange(long start, long end) throws IOException {
292        KahaRemoveScheduledJobsCommand destroy = new KahaRemoveScheduledJobsCommand();
293        destroy.setScheduler(name);
294        destroy.setStartTime(start);
295        destroy.setEndTime(end);
296        this.store.store(destroy);
297    }
298
299    /**
300     * Adds a new Scheduled job to the index.  Must be called under index lock.
301     *
302     * This method must ensure that a duplicate add is not processed into the scheduler.  On index
303     * recover some adds may be replayed and we don't allow more than one instance of a JobId to
304     * exist at any given scheduled time, so filter these out to ensure idempotence.
305     *
306     * @param tx
307     *      Transaction in which the update is performed.
308     * @param command
309     *      The new scheduled job command to process.
310     * @param location
311     *      The location where the add command is stored in the journal.
312     *
313     * @throws IOException if an error occurs updating the index.
314     */
315    protected void process(final Transaction tx, final KahaAddScheduledJobCommand command, Location location) throws IOException {
316        JobLocation jobLocation = new JobLocation(location);
317        jobLocation.setJobId(command.getJobId());
318        jobLocation.setStartTime(command.getStartTime());
319        jobLocation.setCronEntry(command.getCronEntry());
320        jobLocation.setDelay(command.getDelay());
321        jobLocation.setPeriod(command.getPeriod());
322        jobLocation.setRepeat(command.getRepeat());
323
324        long nextExecutionTime = command.getNextExecutionTime();
325
326        List<JobLocation> values = null;
327        jobLocation.setNextTime(nextExecutionTime);
328        if (this.index.containsKey(tx, nextExecutionTime)) {
329            values = this.index.remove(tx, nextExecutionTime);
330        }
331        if (values == null) {
332            values = new ArrayList<>();
333        }
334
335        // There can never be more than one instance of the same JobId scheduled at any
336        // given time, when it happens its probably the result of index recovery and this
337        // method must be idempotent so check for it first.
338        if (!values.contains(jobLocation)) {
339            values.add(jobLocation);
340
341            // Reference the log file where the add command is stored to prevent GC.
342            this.store.incrementJournalCount(tx, location);
343            this.index.put(tx, nextExecutionTime, values);
344            this.scheduleTime.newJob();
345        } else {
346            this.index.put(tx, nextExecutionTime, values);
347            LOG.trace("Job {} already in scheduler at this time {}",
348                      jobLocation.getJobId(), jobLocation.getNextTime());
349        }
350    }
351
352    /**
353     * Reschedules a Job after it has be fired.
354     *
355     * For jobs that are repeating this method updates the job in the index by adding it to the
356     * jobs list for the new execution time.  If the job is not a cron type job then this method
357     * will reduce the repeat counter if the job has a fixed number of repeats set.  The Job will
358     * be removed from the jobs list it just executed on.
359     *
360     * This method must also update the value of the last update location in the JobLocation
361     * instance so that the checkpoint worker doesn't drop the log file in which that command lives.
362     *
363     * This method must ensure that an reschedule command that references a job that doesn't exist
364     * does not cause an error since it's possible that on recover the original add might be gone
365     * and so the job should not reappear in the scheduler.
366     *
367     * @param tx
368     *      The TX under which the index is updated.
369     * @param command
370     *      The reschedule command to process.
371     * @param location
372     *      The location in the index where the reschedule command was stored.
373     *
374     * @throws IOException if an error occurs during the reschedule.
375     */
376    protected void process(final Transaction tx, final KahaRescheduleJobCommand command, Location location) throws IOException {
377        JobLocation result = null;
378        final List<JobLocation> current = this.index.remove(tx, command.getExecutionTime());
379        if (current != null) {
380            for (int i = 0; i < current.size(); i++) {
381                JobLocation jl = current.get(i);
382                if (jl.getJobId().equals(command.getJobId())) {
383                    current.remove(i);
384                    if (!current.isEmpty()) {
385                        this.index.put(tx, command.getExecutionTime(), current);
386                    }
387                    result = jl;
388                    break;
389                }
390            }
391        } else {
392            LOG.debug("Process reschedule command for job {} non-existent executime time {}.",
393                      command.getJobId(), command.getExecutionTime());
394        }
395
396        if (result != null) {
397            Location previousUpdate = result.getLastUpdate();
398
399            List<JobLocation> target = null;
400            result.setNextTime(command.getNextExecutionTime());
401            result.setLastUpdate(location);
402            result.setRescheduledCount(command.getRescheduledCount());
403            if (!result.isCron() && result.getRepeat() > 0) {
404                result.setRepeat(result.getRepeat() - 1);
405            }
406            if (this.index.containsKey(tx, command.getNextExecutionTime())) {
407                target = this.index.remove(tx, command.getNextExecutionTime());
408            }
409            if (target == null) {
410                target = new ArrayList<>();
411            }
412            target.add(result);
413
414            // Track the location of the last reschedule command and release the log file
415            // reference for the previous one if there was one.
416            this.store.incrementJournalCount(tx, location);
417            if (previousUpdate != null) {
418                this.store.decrementJournalCount(tx, previousUpdate);
419            }
420
421            this.index.put(tx, command.getNextExecutionTime(), target);
422            this.scheduleTime.newJob();
423        } else {
424            LOG.debug("Process reschedule command for non-scheduled job {} at executime time {}.",
425                      command.getJobId(), command.getExecutionTime());
426        }
427    }
428
429    /**
430     * Removes a scheduled job from the scheduler.
431     *
432     * The remove operation can be of two forms.  The first is that there is a job Id but no set time
433     * (-1) in which case the jobs index is searched until the target job Id is located.  The alternate
434     * form is that a job Id and execution time are both set in which case the given time is checked
435     * for a job matching that Id.  In either case once an execution time is identified the job is
436     * removed and the index updated.
437     *
438     * This method should ensure that if the matching job is not found that no error results as it
439     * is possible that on a recover the initial add command could be lost so the job may not be
440     * rescheduled.
441     *
442     * @param tx
443     *      The transaction under which the index is updated.
444     * @param command
445     *      The remove command to process.
446     * @param location
447     *      The location of the remove command in the Journal.
448     *
449     * @throws IOException if an error occurs while updating the scheduler index.
450     */
451    void process(final Transaction tx, final KahaRemoveScheduledJobCommand command, Location location) throws IOException {
452
453        // Case 1: JobId and no time value means find the job and remove it.
454        // Case 2: JobId and a time value means find exactly this scheduled job.
455
456        Long executionTime = command.getNextExecutionTime();
457
458        List<JobLocation> values = null;
459
460        if (executionTime == -1) {
461            for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
462                Map.Entry<Long, List<JobLocation>> entry = i.next();
463                List<JobLocation> candidates = entry.getValue();
464                if (candidates != null) {
465                    for (JobLocation jl : candidates) {
466                        if (jl.getJobId().equals(command.getJobId())) {
467                            LOG.trace("Entry {} contains the remove target: {}", entry.getKey(), command.getJobId());
468                            executionTime = entry.getKey();
469                            values = this.index.remove(tx, executionTime);
470                            break;
471                        }
472                    }
473                }
474            }
475        } else {
476            values = this.index.remove(tx, executionTime);
477        }
478
479        JobLocation removed = null;
480
481        // Remove the job and update the index if there are any other jobs scheduled at this time.
482        if (values != null) {
483            for (JobLocation job : values) {
484                if (job.getJobId().equals(command.getJobId())) {
485                    removed = job;
486                    values.remove(removed);
487                    break;
488                }
489            }
490
491            if (!values.isEmpty()) {
492                this.index.put(tx, executionTime, values);
493            }
494        }
495
496        if (removed != null) {
497            LOG.trace("{} removed from scheduler {}", removed, this);
498
499            // Remove the references for add and reschedule commands for this job
500            // so that those logs can be GC'd when free.
501            this.store.decrementJournalCount(tx, removed.getLocation());
502            if (removed.getLastUpdate() != null) {
503                this.store.decrementJournalCount(tx, removed.getLastUpdate());
504            }
505
506            // now that the job is removed from the index we can store the remove info and
507            // then dereference the log files that hold the initial add command and the most
508            // recent update command.  If the remove and the add that created the job are in
509            // the same file we don't need to track it and just let a normal GC of the logs
510            // remove it when the log is unreferenced.
511            if (removed.getLocation().getDataFileId() != location.getDataFileId()) {
512                this.store.referenceRemovedLocation(tx, location, removed);
513            }
514        }
515    }
516
517    /**
518     * Removes all scheduled jobs within a given time range.
519     *
520     * The method can be used to clear the entire scheduler index by specifying a range that
521     * encompasses all time [0...Long.MAX_VALUE] or a single execution time can be removed by
522     * setting start and end time to the same value.
523     *
524     * @param tx
525     *      The transaction under which the index is updated.
526     * @param command
527     *      The remove command to process.
528     * @param location
529     *      The location of the remove command in the Journal.
530     *
531     * @throws IOException if an error occurs while updating the scheduler index.
532     */
533    protected void process(final Transaction tx, final KahaRemoveScheduledJobsCommand command, Location location) throws IOException {
534        removeInRange(tx, command.getStartTime(), command.getEndTime(), location);
535    }
536
537    /**
538     * Removes all jobs from the schedulers index.  Must be called with the index locked.
539     *
540     * @param tx
541     *      The transaction under which the index entries for this scheduler are removed.
542     *
543     * @throws IOException if an error occurs removing the jobs from the scheduler index.
544     */
545    protected void removeAll(Transaction tx) throws IOException {
546        this.removeInRange(tx, 0, Long.MAX_VALUE, null);
547    }
548
549    /**
550     * Removes all scheduled jobs within the target range.
551     *
552     * This method can be used to remove all the stored jobs by passing a range of [0...Long.MAX_VALUE]
553     * or it can be used to remove all jobs at a given scheduled time by passing the same time value
554     * for both start and end.  If the optional location parameter is set then this method will update
555     * the store's remove location tracker with the location value and the Jobs that are being removed.
556     *
557     * This method must be called with the store index locked for writes.
558     *
559     * @param tx
560     *      The transaction under which the index is to be updated.
561     * @param start
562     *      The start time for the remove operation.
563     * @param finish
564     *      The end time for the remove operation.
565     * @param location (optional)
566     *      The location of the remove command that triggered this remove.
567     *
568     * @throws IOException if an error occurs during the remove operation.
569     */
570    protected void removeInRange(Transaction tx, long start, long finish, Location location) throws IOException {
571        List<Long> keys = new ArrayList<>();
572        for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx, start); i.hasNext();) {
573            Map.Entry<Long, List<JobLocation>> entry = i.next();
574            if (entry.getKey().longValue() <= finish) {
575                keys.add(entry.getKey());
576            } else {
577                break;
578            }
579        }
580
581        for (Long executionTime : keys) {
582            List<JobLocation> values = this.index.remove(tx, executionTime);
583            if (location != null) {
584                for (JobLocation job : values) {
585                    LOG.trace("Removing {} scheduled at: {}", job, executionTime);
586
587                    // Remove the references for add and reschedule commands for this job
588                    // so that those logs can be GC'd when free.
589                    this.store.decrementJournalCount(tx, job.getLocation());
590                    if (job.getLastUpdate() != null) {
591                        this.store.decrementJournalCount(tx, job.getLastUpdate());
592                    }
593
594                    // now that the job is removed from the index we can store the remove info and
595                    // then dereference the log files that hold the initial add command and the most
596                    // recent update command.  If the remove and the add that created the job are in
597                    // the same file we don't need to track it and just let a normal GC of the logs
598                    // remove it when the log is unreferenced.
599                    if (job.getLocation().getDataFileId() != location.getDataFileId()) {
600                        this.store.referenceRemovedLocation(tx, location, job);
601                    }
602                }
603            }
604        }
605    }
606
607    /**
608     * Removes a Job from the index using it's Id value and the time it is currently set to
609     * be executed.  This method will only remove the Job if it is found at the given execution
610     * time.
611     *
612     * This method must be called under index lock.
613     *
614     * @param tx
615     *        the transaction under which this method is being executed.
616     * @param jobId
617     *        the target Job Id to remove.
618     * @param executionTime
619     *        the scheduled time that for the Job Id that is being removed.
620     *
621     * @returns true if the Job was removed or false if not found at the given time.
622     *
623     * @throws IOException if an error occurs while removing the Job.
624     */
625    protected boolean removeJobAtTime(Transaction tx, String jobId, long executionTime) throws IOException {
626        boolean result = false;
627
628        List<JobLocation> jobs = this.index.remove(tx, executionTime);
629        Iterator<JobLocation> jobsIter = jobs.iterator();
630        while (jobsIter.hasNext()) {
631            JobLocation job = jobsIter.next();
632            if (job.getJobId().equals(jobId)) {
633                jobsIter.remove();
634                // Remove the references for add and reschedule commands for this job
635                // so that those logs can be GC'd when free.
636                this.store.decrementJournalCount(tx, job.getLocation());
637                if (job.getLastUpdate() != null) {
638                    this.store.decrementJournalCount(tx, job.getLastUpdate());
639                }
640                result = true;
641                break;
642            }
643        }
644
645        // Return the list to the index modified or unmodified.
646        this.index.put(tx, executionTime, jobs);
647
648        return result;
649    }
650
651    /**
652     * Walks the Scheduled Job Tree and collects the add location and last update location
653     * for all scheduled jobs.
654     *
655     * This method must be called with the index locked.
656     *
657     * @param tx
658     *        the transaction under which this operation was invoked.
659     *
660     * @return a list of all referenced Location values for this JobSchedulerImpl
661     *
662     * @throws IOException if an error occurs walking the scheduler tree.
663     */
664    protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
665        List<JobLocation> references = new ArrayList<>();
666
667        for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
668            Map.Entry<Long, List<JobLocation>> entry = i.next();
669            List<JobLocation> scheduled = entry.getValue();
670            for (JobLocation job : scheduled) {
671                references.add(job);
672            }
673        }
674
675        return references;
676    }
677
678    @Override
679    public void run() {
680        try {
681            mainLoop();
682        } catch (Throwable e) {
683            if (this.running.get() && isStarted()) {
684                LOG.error("{} Caught exception in mainloop", this, e);
685            }
686        } finally {
687            if (running.get()) {
688                try {
689                    stop();
690                } catch (Exception e) {
691                    LOG.error("Failed to stop {}", this);
692                }
693            }
694        }
695    }
696
697    @Override
698    public String toString() {
699        return "JobScheduler: " + this.name;
700    }
701
    /**
     * Dispatch loop, invoked from {@link #run()}; keeps iterating while the
     * {@code running} flag is set.
     * <p>
     * Each pass asks {@code getNextToSchedule()} for an entry whose key is an
     * execution time and whose value is the list of jobs stored under that time.
     * If that time has been reached, every job in the entry is fired and/or
     * rescheduled, or collected for removal; otherwise the wait time is set to the
     * gap until that time. The pass then ends by sleeping in
     * {@code scheduleTime.pause()}.
     * <p>
     * Any exception raised during a pass is logged and the backing store is
     * stopped; the loop itself keeps going for as long as {@code running} is set.
     */
    protected void mainLoop() {
        while (this.running.get()) {
            // Drop the "new job" marker from the previous pass; while it is set,
            // ScheduleTime.setWaitTime() ignores the values computed below.
            this.scheduleTime.clearNewJob();
            try {
                long currentTime = System.currentTimeMillis();

                // Read the list of scheduled events, fire the jobs that are due and
                // reschedule repeating jobs as needed.
                Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
                if (first != null) {
                    // Work on a copy of the entry's job list.
                    // NOTE(review): presumably because doReschedule/doRemove change the
                    // stored list while we iterate - confirm against those methods.
                    List<JobLocation> list = new ArrayList<>(first.getValue());
                    List<JobLocation> toRemove = new ArrayList<>(list.size());
                    final long executionTime = first.getKey();
                    long nextExecutionTime = 0;
                    if (executionTime <= currentTime) {
                        for (final JobLocation job : list) {

                            // Stop processing promptly once the scheduler is shut down.
                            if (!running.get()) {
                                break;
                            }

                            int repeat = job.getRepeat();
                            nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat);
                            long waitTime = nextExecutionTime - currentTime;
                            // A non-positive value is replaced by the default wait
                            // inside setWaitTime().
                            this.scheduleTime.setWaitTime(waitTime);
                            if (!job.isCron()) {
                                // Plain job: always fire, then either move it to its
                                // next execution time or mark it for removal.
                                fireJob(job);
                                if (repeat != 0) {
                                    // Reschedule for the next time, the scheduler will take care of
                                    // updating the repeat counter on the update.
                                    doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1);
                                } else {
                                    toRemove.add(job);
                                }
                            } else {
                                if (repeat == 0) {
                                    // This is a non-repeating Cron entry so we can fire and forget it.
                                    fireJob(job);
                                }

                                if (nextExecutionTime > currentTime) {
                                    // Reschedule the cron job as a new event, if the cron entry signals
                                    // a repeat then it will be stored separately and fired as a normal
                                    // event with decrementing repeat.
                                    doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1);

                                    if (repeat != 0) {
                                        // We have a separate schedule to run at this time, so the
                                        // cron job is used to set off a separate schedule. Hence we
                                        // do not fire the original cron job to the listeners, but we
                                        // do start a new schedule under a freshly generated id that
                                        // carries the same payload and the job's delay, period and
                                        // repeat values (the empty string is presumably the cron
                                        // entry - verify against schedule()).
                                        String jobId = ID_GENERATOR.generateId();
                                        ByteSequence payload = getPayload(job.getLocation());
                                        schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
                                        // Wake up for the new schedule: use its delay, or its
                                        // period when no delay is configured.
                                        waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod();
                                        this.scheduleTime.setWaitTime(waitTime);
                                    }
                                } else {
                                    // The cron entry yielded no later execution time.
                                    toRemove.add(job);
                                }
                            }
                        }

                        // now remove all jobs that have not been rescheduled from this execution
                        // time, if there are no more entries in that time it will be removed.
                        doRemove(executionTime, toRemove);

                        // If there is a job that should fire before the currently set wait time
                        // we need to reset wait time otherwise we'll miss it.
                        Map.Entry<Long, List<JobLocation>> nextUp = getNextToSchedule();
                        if (nextUp != null) {
                            final long timeUntilNextScheduled = nextUp.getKey() - currentTime;
                            if (timeUntilNextScheduled < this.scheduleTime.getWaitTime()) {
                                this.scheduleTime.setWaitTime(timeUntilNextScheduled);
                            }
                        }
                    } else {
                        // Nothing is due yet: sleep until the returned entry's execution time.
                        this.scheduleTime.setWaitTime(executionTime - currentTime);
                    }
                }

                // Sleep for the computed wait time; wakeup() or newJob() on the
                // ScheduleTime ends the sleep early.
                this.scheduleTime.pause();
            } catch (Exception ioe) {
                // Any failure while scheduling stops the backing store; a failure of
                // that stop is only logged.
                LOG.error("{} Failed to schedule job", this.name, ioe);
                try {
                    this.store.stop();
                } catch (Exception e) {
                    LOG.error("{} Failed to shutdown JobSchedulerStore", this.name, e);
                }
            }
        }
    }
794
795    void fireJob(JobLocation job) throws IllegalStateException, IOException {
796        LOG.debug("Firing: {}", job);
797        ByteSequence bs = this.store.getPayload(job.getLocation());
798        for (JobListener l : jobListeners) {
799            l.scheduledJob(job.getJobId(), bs);
800        }
801    }
802
803    @Override
804    public void startDispatching() throws Exception {
805        if (!this.running.get()) {
806            return;
807        }
808
809        if (started.compareAndSet(false, true)) {
810            this.thread = new Thread(this, "JobScheduler:" + this.name);
811            this.thread.setDaemon(true);
812            this.thread.start();
813        }
814    }
815
816    @Override
817    public void stopDispatching() throws Exception {
818        if (started.compareAndSet(true, false)) {
819            this.scheduleTime.wakeup();
820            Thread t = this.thread;
821            this.thread = null;
822            if (t != null) {
823                t.join(3000);
824            }
825        }
826    }
827
828    @Override
829    protected void doStart() throws Exception {
830        this.running.set(true);
831    }
832
833    @Override
834    protected void doStop(ServiceStopper stopper) throws Exception {
835        this.running.set(false);
836        stopDispatching();
837    }
838
839    private ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
840        return this.store.getPayload(location);
841    }
842
843    long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException {
844        long result = currentTime;
845        String cron = job.getCronEntry();
846        if (cron != null && cron.length() > 0) {
847            result = CronParser.getNextScheduledTime(cron, result);
848        } else if (job.getRepeat() != 0) {
849            result += job.getPeriod();
850        }
851        return result;
852    }
853
854    void createIndexes(Transaction tx) throws IOException {
855        this.index = new BTreeIndex<>(this.store.getPageFile(), tx.allocate().getPageId());
856    }
857
858    void load(Transaction tx) throws IOException {
859        this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
860        this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
861        this.index.load(tx);
862    }
863
864    void read(DataInput in) throws IOException {
865        this.name = in.readUTF();
866        this.index = new BTreeIndex<>(this.store.getPageFile(), in.readLong());
867        this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
868        this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
869    }
870
871    public void write(DataOutput out) throws IOException {
872        out.writeUTF(name);
873        out.writeLong(this.index.getPageId());
874    }
875
876    static class ScheduleTime {
877        private final int DEFAULT_WAIT = 500;
878        private final int DEFAULT_NEW_JOB_WAIT = 100;
879        private boolean newJob;
880        private long waitTime = DEFAULT_WAIT;
881        private final Object mutex = new Object();
882
883        /**
884         * @return the waitTime
885         */
886        long getWaitTime() {
887            return this.waitTime;
888        }
889
890        /**
891         * @param waitTime
892         *            the waitTime to set
893         */
894        void setWaitTime(long waitTime) {
895            if (!this.newJob) {
896                this.waitTime = waitTime > 0 ? waitTime : DEFAULT_WAIT;
897            }
898        }
899
900        void pause() {
901            synchronized (mutex) {
902                try {
903                    mutex.wait(this.waitTime);
904                } catch (InterruptedException e) {
905                }
906            }
907        }
908
909        void newJob() {
910            this.newJob = true;
911            this.waitTime = DEFAULT_NEW_JOB_WAIT;
912            wakeup();
913        }
914
915        void clearNewJob() {
916            this.newJob = false;
917        }
918
919        void wakeup() {
920            synchronized (this.mutex) {
921                mutex.notifyAll();
922            }
923        }
924    }
925}