diff --git a/src/main/java/org/knowm/sundial/InterruptingJob.java b/src/main/java/org/knowm/sundial/InterruptingJob.java new file mode 100644 index 0000000..2bda8c2 --- /dev/null +++ b/src/main/java/org/knowm/sundial/InterruptingJob.java @@ -0,0 +1,32 @@ +package org.knowm.sundial; + +import org.quartz.exceptions.UnableToInterruptJobException; + +/** Base class that interrupts blocking operations when the job is stopped. + * + * This class can be used as the base class for jobs that should interrupt + * blocking operations when the job is stopped, as opposed to completing them + * first. + * + * @see Thread#interrupt() + */ +public abstract class InterruptingJob extends Job { + private Thread executingThread; + + public synchronized void setup() { + executingThread = Thread.currentThread(); + } + + public synchronized void cleanup() { + executingThread = null; + } + + public void interrupt() throws UnableToInterruptJobException { + synchronized (this) { + if (executingThread != null) { + executingThread.interrupt(); + } + } + super.interrupt(); + } +} diff --git a/src/main/java/org/knowm/sundial/Job.java b/src/main/java/org/knowm/sundial/Job.java index 70ca507..0d50619 100644 --- a/src/main/java/org/knowm/sundial/Job.java +++ b/src/main/java/org/knowm/sundial/Job.java @@ -53,6 +53,7 @@ public final void execute(JobExecutionContext jobExecutionContext) throws JobExe initContextContainer(jobExecutionContext); + setup(); doRun(); } catch (RequiredParameterException e) { @@ -73,7 +74,14 @@ public void interrupt() throws UnableToInterruptJobException { logger.info("Interrupt called!"); } - /** + /** + * Override and place any code in here that should be called before doRun + */ + public void setup() { + + } + + /** * Override and place any code in here that should be called no matter what after the Job runs or throws an exception. */ public void cleanup() { diff --git a/src/main/java/org/quartz/jobs/InterruptableJob.java b/src/main/java/org/quartz/jobs/InterruptableJob.java index b5851c8..cde1d0b 100644 --- a/src/main/java/org/quartz/jobs/InterruptableJob.java +++ b/src/main/java/org/quartz/jobs/InterruptableJob.java @@ -36,16 +36,11 @@ * execute(..) signals that it has noticed the set flag. *

*

- * If the Job performs some form of blocking I/O or similar functions, you may want to consider having the Job.execute(..) method store a - * reference to the calling Thread as a member variable. Then the Implementation of this interfaces interrupt() method can - * call interrupt() on that Thread. Before attempting this, make sure that you fully understand what - * java.lang.Thread.interrupt() does and doesn't do. Also make sure that you clear the Job's member reference to the Thread when the - * execute(..) method exits (preferably in a finally block. + * If the Job performs some form of blocking I/O or similar functions, you may want to consider extending InterruptingJob. Its + * interrupt() method calls interrupt() on the thread that is executing the job. Before attempting this, make sure that you + * fully understand what java.lang.Thread.interrupt() does and doesn't do. *

- *

- * See Example 7 (org.quartz.examples.example7.DumbInterruptableJob) for a simple implementation demonstration. - *

- * + * * @see Job * @see StatefulJob * @see Scheduler#interrupt(JobKey) diff --git a/src/test/java/org/knowm/sundial/InterruptingJobTest.java b/src/test/java/org/knowm/sundial/InterruptingJobTest.java new file mode 100644 index 0000000..87a0943 --- /dev/null +++ b/src/test/java/org/knowm/sundial/InterruptingJobTest.java @@ -0,0 +1,74 @@ +package org.knowm.sundial; + +import java.util.concurrent.Semaphore; + +import org.junit.Test; +import org.quartz.core.JobExecutionContext; +import org.quartz.core.JobExecutionContextImpl; +import org.quartz.core.TriggerFiredBundle; +import org.quartz.jobs.JobDetail; +import org.quartz.jobs.JobDetailImpl; +import org.quartz.triggers.OperableTrigger; +import org.quartz.triggers.SimpleTriggerImpl; + +public class InterruptingJobTest { + @Test + public void shouldInterruptThread() throws Exception { + final Semaphore sem = new Semaphore(0); + final WaitingJob job = new WaitingJob(sem); + + final JobDetail detail = new JobDetailImpl(); + final OperableTrigger trigger = new SimpleTriggerImpl(); + final TriggerFiredBundle bundle = new TriggerFiredBundle(detail, trigger, null, true, null, null, null, null); + final JobExecutionContext context = new JobExecutionContextImpl(null, bundle, job); + + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + job.execute(context); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + t.start(); + + synchronized (job) { + while (!job.aboutToSleep) { + job.wait(); + } + } + while (!sem.hasQueuedThreads()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + job.interrupt(); + + t.join(); + } + + static class WaitingJob extends InterruptingJob { + public boolean aboutToSleep = false; + final Semaphore sem; + public WaitingJob(Semaphore sem) { + this.sem = sem; + } + @Override + public void doRun() { + synchronized (this) { + aboutToSleep = true; + notify(); + } + try { + sem.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } +}