-
Notifications
You must be signed in to change notification settings - Fork 369
NoSQL persistence: add Java/Vert.X executor abstraction layer #2527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| <!-- | ||
| Licensed to the Apache Software Foundation (ASF) under one | ||
| or more contributor license agreements. See the NOTICE file | ||
| distributed with this work for additional information | ||
| regarding copyright ownership. The ASF licenses this file | ||
| to you under the Apache License, Version 2.0 (the | ||
| "License"); you may not use this file except in compliance | ||
| with the License. You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, | ||
| software distributed under the License is distributed on an | ||
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| KIND, either express or implied. See the License for the | ||
| specific language governing permissions and limitations | ||
| under the License. | ||
| --> | ||
|
|
||
| # Async execution API | ||
|
|
||
| Provides an abstraction to submit asynchronous tasks, optionally with a delay or delay + repetition and implementations | ||
| based on Java's `ThreadPoolExecutor` and Vert.X. | ||
|
|
||
| ## Code structure | ||
|
|
||
| The code is structured into multiple modules. Consuming code should almost always pull in only the API module. | ||
|
|
||
| * `polaris-async-api` provides the necessary Java interfaces and immutable types. | ||
| * `polaris-async-java` implementation leveraging `CompletableFuture.delayedExecutor` for delayed/scheduled invocations. | ||
| * `polaris-async-vertx` implementation leveraging Vert.X for delayed/scheduled invocations. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| plugins { | ||
| id("org.kordamp.gradle.jandex") | ||
| id("polaris-server") | ||
| } | ||
|
|
||
| description = "Polaris async execution API" | ||
|
|
||
| dependencies { | ||
| compileOnly(libs.jakarta.annotation.api) | ||
| compileOnly(libs.jakarta.validation.api) | ||
| compileOnly(libs.jakarta.inject.api) | ||
| compileOnly(libs.jakarta.enterprise.cdi.api) | ||
|
|
||
| compileOnly(libs.smallrye.config.core) | ||
| compileOnly(platform(libs.quarkus.bom)) | ||
| compileOnly("io.quarkus:quarkus-core") | ||
|
|
||
| compileOnly(project(":polaris-immutables")) | ||
| annotationProcessor(project(":polaris-immutables", configuration = "processor")) | ||
|
|
||
| implementation(platform(libs.jackson.bom)) | ||
| implementation("com.fasterxml.jackson.core:jackson-databind") | ||
|
|
||
| testFixturesCompileOnly(platform(libs.jackson.bom)) | ||
| testFixturesCompileOnly("com.fasterxml.jackson.core:jackson-databind") | ||
|
|
||
| testFixturesApi(libs.jakarta.annotation.api) | ||
| testFixturesApi(libs.jakarta.validation.api) | ||
| testFixturesApi(libs.jakarta.inject.api) | ||
| testFixturesApi(libs.jakarta.enterprise.cdi.api) | ||
|
|
||
| testFixturesApi(project(":polaris-idgen-api")) | ||
|
|
||
| testFixturesImplementation(libs.weld.se.core) | ||
| testFixturesImplementation(libs.weld.junit5) | ||
| testFixturesImplementation(libs.guava) | ||
| testFixturesRuntimeOnly(libs.smallrye.jandex) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.polaris.nosql.async; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonFormat; | ||
| import com.fasterxml.jackson.annotation.JsonInclude; | ||
| import com.fasterxml.jackson.databind.annotation.JsonDeserialize; | ||
| import com.fasterxml.jackson.databind.annotation.JsonSerialize; | ||
| import io.smallrye.config.ConfigMapping; | ||
| import io.smallrye.config.WithDefault; | ||
| import java.time.Duration; | ||
| import java.util.Optional; | ||
| import java.util.OptionalInt; | ||
| import org.apache.polaris.immutables.PolarisImmutable; | ||
|
|
||
| /** Advanced configuration options to tune async activities. */ | ||
| @PolarisImmutable | ||
| @ConfigMapping(prefix = "polaris.async") | ||
| @JsonSerialize(as = ImmutableAsyncConfiguration.class) | ||
| @JsonDeserialize(as = ImmutableAsyncConfiguration.class) | ||
| public interface AsyncConfiguration { | ||
|
|
||
| String DEFAULT_THREAD_KEEP_ALIVE_STRING = "PT1S"; | ||
| Duration DEFAULT_THREAD_KEEP_ALIVE = Duration.parse(DEFAULT_THREAD_KEEP_ALIVE_STRING); | ||
|
|
||
| String DEFAULT_MAX_THREADS_STRING = "256"; | ||
| int DEFAULT_MAX_THREADS = Integer.parseInt(DEFAULT_MAX_THREADS_STRING); | ||
|
|
||
| /** Duration to keep idle threads alive. */ | ||
| @WithDefault(DEFAULT_THREAD_KEEP_ALIVE_STRING) | ||
| @JsonInclude(JsonInclude.Include.NON_ABSENT) | ||
| @JsonFormat(shape = JsonFormat.Shape.STRING) | ||
| Optional<Duration> threadKeepAlive(); | ||
|
|
||
| /** Maximum number of threads available for asynchronous execution. Default is 256. */ | ||
| @JsonInclude(JsonInclude.Include.NON_ABSENT) | ||
| OptionalInt maxThreads(); | ||
|
|
||
| static ImmutableAsyncConfiguration.Builder builder() { | ||
| return ImmutableAsyncConfiguration.builder(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.polaris.nosql.async; | ||
|
|
||
| import static java.util.concurrent.Executors.callable; | ||
|
|
||
| import jakarta.enterprise.context.ApplicationScoped; | ||
| import java.time.Duration; | ||
| import java.util.concurrent.Callable; | ||
|
|
||
| /** | ||
| * Abstraction for platform/environment-specific scheduler implementations for delayed and | ||
| * optionally repeated executions. | ||
| * | ||
| * <p>Quarkus production systems use Vert.x, tests usually use Java executors. | ||
| * | ||
| * <p>Implementations, like Java executors or Vert.X, are usually {@link | ||
| * ApplicationScoped @ApplicationScoped} in CDI environments. | ||
| */ | ||
| public interface AsyncExec { | ||
|
|
||
| default <R> Cancelable<R> submit(Callable<R> callable) { | ||
| return schedule(callable, Duration.ZERO); | ||
| } | ||
|
|
||
| /** | ||
| * Asynchronously run the given {@linkplain Callable callable} after the provided {@linkplain | ||
| * Duration delay}. If the delay is not positive, the function is scheduled for immediate | ||
| * execution. | ||
| * | ||
| * @param callable the callable to execute | ||
| * @param delay the execution delay, zero and negative values mean immediate scheduling | ||
| * @param <R> return type of the callable propagated through the returned cancelable | ||
| * @return the cancelable for the scheduled task | ||
| */ | ||
| <R> Cancelable<R> schedule(Callable<R> callable, Duration delay); | ||
|
|
||
| /** | ||
| * This is a convenience function for {@link #schedule(Callable, Duration)} with a void result, | ||
| * using a {@link Runnable}. | ||
| */ | ||
| default Cancelable<Void> schedule(Runnable runnable, Duration delay) { | ||
| return schedule(callable(runnable, null), delay); | ||
| } | ||
|
|
||
| /** | ||
| * Schedules a runnable to be executed repeatedly using the given initial delay. This is | ||
| * equivalent to calling {@link #schedulePeriodic(Runnable, Duration, Duration)} with the {@code | ||
| * initialDelay} and {@code delay} having the same values. | ||
| * | ||
| * <p>There is intentionally no variant of {@code schedulePeriodic()} that takes a {@link Callable | ||
| * Callable<R>} because there are multiple invocations of the runnable. | ||
| */ | ||
| default Cancelable<Void> schedulePeriodic(Runnable runnable, Duration delay) { | ||
| return schedulePeriodic(runnable, delay, delay); | ||
| } | ||
|
|
||
| /** | ||
| * Schedules a runnable to be executed repeatedly, starting after the given initial delay. | ||
| * | ||
| * <p>There is intentionally no variant of {@code schedulePeriodic()} that takes a {@link Callable | ||
| * Callable<R>} because there are multiple invocations of the runnable. | ||
| * | ||
| * @param runnable the runnable to execute | ||
| * @param initialDelay initial delay, zero and negative values mean immediate scheduling | ||
| * @param delay repetition delay, zero and negative cause an {@link IllegalArgumentException} | ||
| * @return cancelable instance | ||
| */ | ||
| Cancelable<Void> schedulePeriodic(Runnable runnable, Duration initialDelay, Duration delay); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.polaris.nosql.async; | ||
|
|
||
| import java.util.concurrent.CompletionStage; | ||
|
|
||
| /** | ||
| * Implementation agnostic interface for asynchronous delayed and optionally repeated executions. | ||
| * | ||
| * <p>Implementations may use JVM-local backends, like Java executors or Vert.X. Vert.X's API is not | ||
| * based on Java's {@code (Completable)Future} or {@code CompletionStage}. The cancellation | ||
| * semantics/guarantees are different for Vert.X and Java. | ||
| * | ||
| * @param <R> | ||
| */ | ||
| public interface Cancelable<R> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we have some javadoc comments explaining the difference between this and Can callers still get a future via |
||
| /** | ||
| * Attempt to cancel the delayed execution of a callable. Already running callables are not | ||
| * interrupted. A callable may still be invoked after calling this function, because of side | ||
| * effects and race conditions. | ||
| * | ||
| * <p>After cancellation, the result of this instance's might be either in state "completed | ||
| * exceptionally" ({@link java.util.concurrent.CancellationException}) or successfully completed. | ||
| */ | ||
| void cancel(); | ||
|
|
||
| /** | ||
| * Retrieve the {@link CompletionStage} associated with this {@link Cancelable} for the submitted | ||
| * async and potentially periodic execution. | ||
| */ | ||
| CompletionStage<R> completionStage(); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| <!-- | ||
| ~ Licensed to the Apache Software Foundation (ASF) under one | ||
| ~ or more contributor license agreements. See the NOTICE file | ||
| ~ distributed with this work for additional information | ||
| ~ regarding copyright ownership. The ASF licenses this file | ||
| ~ to you under the Apache License, Version 2.0 (the | ||
| ~ "License"); you may not use this file except in compliance | ||
| ~ with the License. You may obtain a copy of the License at | ||
| ~ | ||
| ~ http://www.apache.org/licenses/LICENSE-2.0 | ||
| ~ | ||
| ~ Unless required by applicable law or agreed to in writing, | ||
| ~ software distributed under the License is distributed on an | ||
| ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| ~ KIND, either express or implied. See the License for the | ||
| ~ specific language governing permissions and limitations | ||
| ~ under the License. | ||
| --> | ||
|
|
||
| <beans xmlns="https://jakarta.ee/xml/ns/jakartaee" | ||
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
| xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd"> | ||
| <!-- File required by Weld (used for testing), not by Quarkus --> | ||
| </beans> |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.polaris.nosql.async; | ||
|
|
||
| import jakarta.enterprise.context.ApplicationScoped; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| /** | ||
| * Async-API-specific tests check utility, only used to verify that invocations into CDI beans work, | ||
| * <em>not</em> for "general reuse". | ||
| */ | ||
| @ApplicationScoped | ||
| class AppScopedChecker { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could use a javadoc for how this is supposed to be used in tests. Does it make sense to use this across different usage domains? Like if we had some persistence and some events-handling features all using this package, and an integration test exercises both, is there a semantic to sharing a global singleton AtomicInteger between them via injection? Is the CDI usage important here compared to just declaring a static final AtomicInteger within the test class itself? |
||
| static final AtomicInteger COUNTER = new AtomicInteger(); | ||
|
|
||
| int getAndIncrement() { | ||
| return COUNTER.getAndIncrement(); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even if not using Java executors, can't we still use Java interfaces like
ScheduledExecutorService? Is Vertx fundamentally incompatible with ScheduledFutures that are returned inScheduledExecutorService?