Skip to content

Commit 12008c2

Browse files
committed
Quartz: introduce Nonconcurrent
- fixes quarkusio#44048
1 parent 2245430 commit 12008c2

File tree

16 files changed

+474
-80
lines changed

16 files changed

+474
-80
lines changed

extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.sql.Connection;
66
import java.util.ArrayList;
77
import java.util.HashMap;
8+
import java.util.HashSet;
89
import java.util.List;
910
import java.util.Map;
1011
import java.util.Optional;
@@ -55,6 +56,7 @@
5556
import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem;
5657
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
5758
import io.quarkus.deployment.logging.LogCleanupFilterBuildItem;
59+
import io.quarkus.quartz.Nonconcurrent;
5860
import io.quarkus.quartz.runtime.QuarkusQuartzConnectionPoolProvider;
5961
import io.quarkus.quartz.runtime.QuartzBuildTimeConfig;
6062
import io.quarkus.quartz.runtime.QuartzExtensionPointConfig;
@@ -69,6 +71,7 @@
6971
import io.quarkus.quartz.runtime.jdbc.QuarkusStdJDBCDelegate;
7072
import io.quarkus.runtime.configuration.ConfigurationException;
7173
import io.quarkus.scheduler.Scheduled;
74+
import io.quarkus.scheduler.deployment.ScheduledBusinessMethodItem;
7275
import io.quarkus.scheduler.deployment.SchedulerImplementationBuildItem;
7376

7477
public class QuartzProcessor {
@@ -79,6 +82,7 @@ public class QuartzProcessor {
7982
private static final DotName DELEGATE_HSQLDB = DotName.createSimple(QuarkusHSQLDBDelegate.class.getName());
8083
private static final DotName DELEGATE_MSSQL = DotName.createSimple(QuarkusMSSQLDelegate.class.getName());
8184
private static final DotName DELEGATE_STDJDBC = DotName.createSimple(QuarkusStdJDBCDelegate.class.getName());
85+
private static final DotName NONCONCURRENT = DotName.createSimple(Nonconcurrent.class);
8286

8387
@BuildStep
8488
FeatureBuildItem feature() {
@@ -313,12 +317,23 @@ public void start(BuildProducer<ServiceStartBuildItem> serviceStart,
313317
@Record(RUNTIME_INIT)
314318
public void quartzSupportBean(QuartzRuntimeConfig runtimeConfig, QuartzBuildTimeConfig buildTimeConfig,
315319
QuartzRecorder recorder,
316-
BuildProducer<SyntheticBeanBuildItem> syntheticBeanBuildItemBuildProducer,
317-
QuartzJDBCDriverDialectBuildItem driverDialect) {
320+
QuartzJDBCDriverDialectBuildItem driverDialect,
321+
List<ScheduledBusinessMethodItem> scheduledMethods,
322+
BuildProducer<SyntheticBeanBuildItem> syntheticBeanBuildItemBuildProducer) {
323+
324+
Set<String> nonconcurrentMethods = new HashSet<>();
325+
for (ScheduledBusinessMethodItem m : scheduledMethods) {
326+
if (m.getMethod().hasAnnotation(NONCONCURRENT)) {
327+
nonconcurrentMethods.add(m.getMethod().declaringClass().name() + "#" + m.getMethod().name());
328+
}
329+
}
318330

319331
syntheticBeanBuildItemBuildProducer.produce(SyntheticBeanBuildItem.configure(QuartzSupport.class)
320332
.scope(Singleton.class) // this should be @ApplicationScoped but it fails for some reason
321333
.setRuntimeInit()
322-
.supplier(recorder.quartzSupportSupplier(runtimeConfig, buildTimeConfig, driverDialect.getDriver())).done());
334+
.supplier(recorder.quartzSupportSupplier(runtimeConfig, buildTimeConfig, driverDialect.getDriver(),
335+
nonconcurrentMethods))
336+
.done());
323337
}
338+
324339
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package io.quarkus.quartz.test;
2+
3+
import static org.junit.jupiter.api.Assertions.assertTrue;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import jakarta.inject.Inject;
10+
11+
import org.junit.jupiter.api.Test;
12+
import org.junit.jupiter.api.extension.RegisterExtension;
13+
14+
import io.quarkus.quartz.QuartzScheduler;
15+
import io.quarkus.scheduler.Scheduled;
16+
import io.quarkus.test.QuarkusUnitTest;
17+
18+
public class NonconcurrentJobDefinitionTest {
19+
20+
@RegisterExtension
21+
static final QuarkusUnitTest test = new QuarkusUnitTest()
22+
.withApplicationRoot(root -> root.addClasses(Jobs.class))
23+
.overrideConfigKey("quarkus.scheduler.start-mode", "forced")
24+
.overrideConfigKey("quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread",
25+
"true");
26+
27+
@Inject
28+
QuartzScheduler scheduler;
29+
30+
@Test
31+
public void testExecution() throws InterruptedException {
32+
scheduler.newJob("foo")
33+
.setTask(se -> {
34+
Jobs.NONCONCURRENT_COUNTER.incrementAndGet();
35+
try {
36+
if (!Jobs.CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) {
37+
throw new IllegalStateException("nonconcurrent() execution blocked too long...");
38+
}
39+
} catch (InterruptedException e) {
40+
throw new IllegalStateException(e);
41+
}
42+
if (Jobs.NONCONCURRENT_COUNTER.get() == 1) {
43+
// concurrent() executed >= 5x and nonconcurrent() 1x
44+
Jobs.NONCONCURRENT_LATCH.countDown();
45+
}
46+
})
47+
.setInterval("1s")
48+
.setNonconcurrent()
49+
.schedule();
50+
51+
assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS),
52+
String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get()));
53+
}
54+
55+
static class Jobs {
56+
57+
static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1);
58+
static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5);
59+
60+
static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0);
61+
62+
@Scheduled(identity = "bar", every = "1s")
63+
void concurrent() throws InterruptedException {
64+
CONCURRENT_LATCH.countDown();
65+
}
66+
67+
}
68+
69+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package io.quarkus.quartz.test;
2+
3+
import static org.junit.jupiter.api.Assertions.assertTrue;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import jakarta.inject.Inject;
10+
11+
import org.junit.jupiter.api.Test;
12+
import org.junit.jupiter.api.extension.RegisterExtension;
13+
import org.quartz.DisallowConcurrentExecution;
14+
import org.quartz.Job;
15+
import org.quartz.JobBuilder;
16+
import org.quartz.JobDetail;
17+
import org.quartz.JobExecutionContext;
18+
import org.quartz.JobExecutionException;
19+
import org.quartz.SchedulerException;
20+
import org.quartz.SimpleScheduleBuilder;
21+
import org.quartz.Trigger;
22+
import org.quartz.TriggerBuilder;
23+
24+
import io.quarkus.quartz.QuartzScheduler;
25+
import io.quarkus.scheduler.Scheduled;
26+
import io.quarkus.scheduler.Scheduler;
27+
import io.quarkus.test.QuarkusUnitTest;
28+
29+
public class NonconcurrentProgrammaticTest {
30+
31+
@RegisterExtension
32+
static final QuarkusUnitTest test = new QuarkusUnitTest()
33+
.withApplicationRoot(root -> root
34+
.addClasses(Jobs.class))
35+
.overrideConfigKey("quarkus.scheduler.start-mode", "halted");
36+
37+
@Inject
38+
QuartzScheduler scheduler;
39+
40+
@Test
41+
public void testExecution() throws SchedulerException, InterruptedException {
42+
JobDetail job = JobBuilder.newJob(Jobs.class)
43+
.withIdentity("foo", Scheduler.class.getName())
44+
.build();
45+
Trigger trigger = TriggerBuilder.newTrigger()
46+
.withIdentity("foo", Scheduler.class.getName())
47+
.startNow()
48+
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
49+
.withIntervalInSeconds(1)
50+
.repeatForever())
51+
.build();
52+
scheduler.getScheduler().scheduleJob(job, trigger);
53+
54+
scheduler.resume();
55+
56+
assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS),
57+
String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get()));
58+
}
59+
60+
@DisallowConcurrentExecution
61+
static class Jobs implements Job {
62+
63+
static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1);
64+
static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5);
65+
66+
static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0);
67+
68+
@Scheduled(identity = "bar", every = "1s")
69+
void concurrent() throws InterruptedException {
70+
CONCURRENT_LATCH.countDown();
71+
}
72+
73+
@Override
74+
public void execute(JobExecutionContext context) throws JobExecutionException {
75+
Jobs.NONCONCURRENT_COUNTER.incrementAndGet();
76+
try {
77+
if (!Jobs.CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) {
78+
throw new IllegalStateException("nonconcurrent() execution blocked too long...");
79+
}
80+
} catch (InterruptedException e) {
81+
throw new IllegalStateException(e);
82+
}
83+
if (Jobs.NONCONCURRENT_COUNTER.get() == 1) {
84+
// concurrent() executed >= 5x and nonconcurrent() 1x
85+
Jobs.NONCONCURRENT_LATCH.countDown();
86+
}
87+
}
88+
89+
}
90+
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.quarkus.quartz.test;
2+
3+
import static org.junit.jupiter.api.Assertions.assertTrue;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import org.junit.jupiter.api.Test;
10+
import org.junit.jupiter.api.extension.RegisterExtension;
11+
12+
import io.quarkus.quartz.Nonconcurrent;
13+
import io.quarkus.scheduler.Scheduled;
14+
import io.quarkus.test.QuarkusUnitTest;
15+
16+
public class NonconcurrentTest {
17+
18+
@RegisterExtension
19+
static final QuarkusUnitTest test = new QuarkusUnitTest()
20+
.withApplicationRoot(root -> root.addClasses(Jobs.class))
21+
.overrideConfigKey("quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread",
22+
"true");
23+
24+
@Test
25+
public void testExecution() throws InterruptedException {
26+
assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS),
27+
String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get()));
28+
}
29+
30+
static class Jobs {
31+
32+
static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1);
33+
static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5);
34+
35+
static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0);
36+
37+
@Nonconcurrent
38+
@Scheduled(identity = "foo", every = "1s")
39+
void nonconcurrent() throws InterruptedException {
40+
NONCONCURRENT_COUNTER.incrementAndGet();
41+
if (!CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) {
42+
throw new IllegalStateException("nonconcurrent() execution blocked too long...");
43+
}
44+
if (NONCONCURRENT_COUNTER.get() == 1) {
45+
// concurrent() executed >= 5x and nonconcurrent() 1x
46+
NONCONCURRENT_LATCH.countDown();
47+
}
48+
}
49+
50+
@Scheduled(identity = "bar", every = "1s")
51+
void concurrent() throws InterruptedException {
52+
CONCURRENT_LATCH.countDown();
53+
}
54+
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.quarkus.quartz.test;
2+
3+
import static org.junit.jupiter.api.Assertions.fail;
4+
5+
import org.junit.jupiter.api.Test;
6+
import org.junit.jupiter.api.extension.RegisterExtension;
7+
8+
import io.quarkus.quartz.Nonconcurrent;
9+
import io.quarkus.scheduler.Scheduled;
10+
import io.quarkus.test.QuarkusUnitTest;
11+
12+
public class NonconcurrentValidationFailureTest {
13+
14+
@RegisterExtension
15+
static final QuarkusUnitTest test = new QuarkusUnitTest()
16+
.withApplicationRoot(root -> root
17+
.addClasses(Jobs.class))
18+
.setExpectedException(IllegalStateException.class, true)
19+
.overrideConfigKey("quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread",
20+
"false");
21+
22+
@Test
23+
public void test() throws InterruptedException {
24+
fail();
25+
}
26+
27+
static class Jobs {
28+
29+
@Nonconcurrent
30+
@Scheduled(identity = "foo", every = "1s")
31+
void nonconcurrent() throws InterruptedException {
32+
}
33+
34+
}
35+
}

extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/ProgrammaticJobsTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void testJobs() throws InterruptedException {
6969
.setSkipPredicate(AlwaysSkipPredicate.class)
7070
.schedule();
7171

72-
Scheduler.JobDefinition job1 = scheduler.newJob("foo")
72+
Scheduler.JobDefinition<?> job1 = scheduler.newJob("foo")
7373
.setInterval("1s")
7474
.setTask(ec -> {
7575
assertTrue(Arc.container().requestContext().isActive());
@@ -79,7 +79,7 @@ public void testJobs() throws InterruptedException {
7979
assertEquals("Sync task was already set",
8080
assertThrows(IllegalStateException.class, () -> job1.setAsyncTask(ec -> null)).getMessage());
8181

82-
Scheduler.JobDefinition job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?");
82+
Scheduler.JobDefinition<?> job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?");
8383
assertEquals("Either sync or async task must be set",
8484
assertThrows(IllegalStateException.class, () -> job2.schedule()).getMessage());
8585
job2.setTask(ec -> {
@@ -117,7 +117,7 @@ public void testJobs() throws InterruptedException {
117117
@Test
118118
public void testAsyncJob() throws InterruptedException, SchedulerException {
119119
String identity = "fooAsync";
120-
JobDefinition asyncJob = scheduler.newJob(identity)
120+
JobDefinition<?> asyncJob = scheduler.newJob(identity)
121121
.setInterval("1s")
122122
.setAsyncTask(ec -> {
123123
assertTrue(Context.isOnEventLoopThread() && VertxContext.isOnDuplicatedContext());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.quarkus.quartz;
2+
3+
import static java.lang.annotation.ElementType.METHOD;
4+
import static java.lang.annotation.RetentionPolicy.RUNTIME;
5+
6+
import java.lang.annotation.Retention;
7+
import java.lang.annotation.Target;
8+
9+
import org.quartz.DisallowConcurrentExecution;
10+
import org.quartz.Job;
11+
12+
import io.quarkus.scheduler.Scheduled;
13+
import io.quarkus.scheduler.SkippedExecution;
14+
15+
/**
16+
* Annotated scheduled method may not be executed concurrently. The behavior is identical to a {@link Job} class annotated with
17+
* {@link DisallowConcurrentExecution}.
18+
* <p>
19+
* This annotation can be only used if {@code quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread} is set to
20+
* {@code true}, otherwise the application startup fails.
21+
* <p>
22+
* Unlike with {@link Scheduled.ConcurrentExecution#SKIP} the {@link SkippedExecution} event is never fired if a method
23+
* execution is skipped by Quartz.
24+
*
25+
* @see DisallowConcurrentExecution
26+
*/
27+
@Target(METHOD)
28+
@Retention(RUNTIME)
29+
public @interface Nonconcurrent {
30+
31+
}

extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java

+14
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,18 @@ public interface QuartzScheduler extends Scheduler {
1313
*/
1414
org.quartz.Scheduler getScheduler();
1515

16+
@Override
17+
QuartzJobDefinition newJob(String identity);
18+
19+
interface QuartzJobDefinition extends JobDefinition<QuartzJobDefinition> {
20+
21+
/**
22+
*
23+
* @return self
24+
* @see Nonconcurrent
25+
*/
26+
QuartzJobDefinition setNonconcurrent();
27+
28+
}
29+
1630
}

0 commit comments

Comments
 (0)