diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index 4a19b06c00..90281d65f1 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -36,6 +36,14 @@ dependencies { api(project(":polaris-version")) api(project(":polaris-persistence-nosql-varint")) + api(project(":polaris-async-api")) + api(project(":polaris-async-java")) + api(project(":polaris-async-vertx")) + + api(project(":polaris-idgen-api")) + api(project(":polaris-idgen-impl")) + api(project(":polaris-idgen-spi")) + api(project(":polaris-config-docs-annotations")) api(project(":polaris-config-docs-generator")) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 087d11b9a4..5a5af7c8b0 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -99,12 +99,16 @@ s3mock-testcontainers = { module = "com.adobe.testing:s3mock-testcontainers", ve slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } smallrye-common-annotation = { module = "io.smallrye.common:smallrye-common-annotation", version = "2.13.9" } smallrye-config-core = { module = "io.smallrye.config:smallrye-config-core", version = "3.14.0" } +smallrye-jandex = { module = "io.smallrye:jandex", version = "3.4.0" } spark35-sql-scala212 = { module = "org.apache.spark:spark-sql_2.12", version.ref = "spark35" } swagger-annotations = { module = "io.swagger:swagger-annotations", version.ref = "swagger" } swagger-jaxrs = { module = "io.swagger:swagger-jaxrs", version.ref = "swagger" } testcontainers-bom = { module = "org.testcontainers:testcontainers-bom", version = "1.21.3" } testcontainers-keycloak = { module = "com.github.dasniko:testcontainers-keycloak", version = "3.8.0" } threeten-extra = { module = "org.threeten:threeten-extra", version = "1.8.0" } +vertx-core = { module = "io.vertx:vertx-core", version = "5.0.4" } +weld-se-core = { module = "org.jboss.weld.se:weld-se-core", version = "6.0.3.Final" } +weld-junit5 = { module = "org.jboss.weld:weld-junit5", version = "5.0.1.Final" } [plugins] jcstress = { id = "io.github.reyerizo.gradle.jcstress", version = "0.8.15" } diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 2f43aa2518..f6c46f8be6 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -48,6 +48,10 @@ polaris-config-docs-annotations=tools/config-docs/annotations polaris-config-docs-generator=tools/config-docs/generator polaris-config-docs-site=tools/config-docs/site +# executor abstraction +polaris-async-api=persistence/nosql/async/api +polaris-async-java=persistence/nosql/async/java +polaris-async-vertx=persistence/nosql/async/vertx # id generation polaris-idgen-api=persistence/nosql/idgen/api polaris-idgen-impl=persistence/nosql/idgen/impl diff --git a/persistence/nosql/async/README.md b/persistence/nosql/async/README.md new file mode 100644 index 0000000000..e35e697f9a --- /dev/null +++ b/persistence/nosql/async/README.md @@ -0,0 +1,31 @@ + + +# Async execution API + +Provides an abstraction to submit asynchronous tasks, optionally with a delay or delay + repetition and implementations +based on Java's `ThreadPoolExecutor` and Vert.X. + +## Code structure + +The code is structured into multiple modules. Consuming code should almost always pull in only the API module. + +* `polaris-async-api` provides the necessary Java interfaces and immutable types. +* `polaris-async-java` implementation leveraging `CompletableFuture.delayedExecutor` for delayed/scheduled invocations. +* `polaris-async-vertx` implementation leveraging Vert.X for delayed/scheduled invocations. diff --git a/persistence/nosql/async/api/build.gradle.kts b/persistence/nosql/async/api/build.gradle.kts new file mode 100644 index 0000000000..913dd86ea2 --- /dev/null +++ b/persistence/nosql/async/api/build.gradle.kts @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("org.kordamp.gradle.jandex") + id("polaris-server") +} + +description = "Polaris async execution API" + +dependencies { + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.inject.api) + compileOnly(libs.jakarta.enterprise.cdi.api) + + compileOnly(libs.smallrye.config.core) + compileOnly(platform(libs.quarkus.bom)) + compileOnly("io.quarkus:quarkus-core") + + compileOnly(project(":polaris-immutables")) + annotationProcessor(project(":polaris-immutables", configuration = "processor")) + + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-databind") + + testFixturesCompileOnly(platform(libs.jackson.bom)) + testFixturesCompileOnly("com.fasterxml.jackson.core:jackson-databind") + + testFixturesApi(libs.jakarta.annotation.api) + testFixturesApi(libs.jakarta.validation.api) + testFixturesApi(libs.jakarta.inject.api) + testFixturesApi(libs.jakarta.enterprise.cdi.api) + + testFixturesApi(project(":polaris-idgen-api")) + + testFixturesImplementation(libs.weld.se.core) + testFixturesImplementation(libs.weld.junit5) + testFixturesImplementation(libs.guava) + testFixturesRuntimeOnly(libs.smallrye.jandex) +} diff --git a/persistence/nosql/async/api/src/main/java/org/apache/polaris/nosql/async/AsyncConfiguration.java b/persistence/nosql/async/api/src/main/java/org/apache/polaris/nosql/async/AsyncConfiguration.java new file mode 100644 index 0000000000..3b71db1dcf --- /dev/null +++ b/persistence/nosql/async/api/src/main/java/org/apache/polaris/nosql/async/AsyncConfiguration.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.nosql.async; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import java.time.Duration; +import java.util.Optional; +import java.util.OptionalInt; +import org.apache.polaris.immutables.PolarisImmutable; + +/** Advanced configuration options to tune async activities. */ +@PolarisImmutable +@ConfigMapping(prefix = "polaris.async") +@JsonSerialize(as = ImmutableAsyncConfiguration.class) +@JsonDeserialize(as = ImmutableAsyncConfiguration.class) +public interface AsyncConfiguration { + + String DEFAULT_THREAD_KEEP_ALIVE_STRING = "PT1S"; + Duration DEFAULT_THREAD_KEEP_ALIVE = Duration.parse(DEFAULT_THREAD_KEEP_ALIVE_STRING); + + String DEFAULT_MAX_THREADS_STRING = "256"; + int DEFAULT_MAX_THREADS = Integer.parseInt(DEFAULT_MAX_THREADS_STRING); + + /** Duration to keep idle threads alive. */ + @WithDefault(DEFAULT_THREAD_KEEP_ALIVE_STRING) + @JsonInclude(JsonInclude.Include.NON_ABSENT) + @JsonFormat(shape = JsonFormat.Shape.STRING) + Optional threadKeepAlive(); + + /** Maximum number of threads available for asynchronous execution. Default is 256. */ + @JsonInclude(JsonInclude.Include.NON_ABSENT) + OptionalInt maxThreads(); + + static ImmutableAsyncConfiguration.Builder builder() { + return ImmutableAsyncConfiguration.builder(); + } +} diff --git a/persistence/nosql/async/api/src/main/java/org/apache/polaris/nosql/async/AsyncExec.java b/persistence/nosql/async/api/src/main/java/org/apache/polaris/nosql/async/AsyncExec.java new file mode 100644 index 0000000000..c8b778878c --- /dev/null +++ b/persistence/nosql/async/api/src/main/java/org/apache/polaris/nosql/async/AsyncExec.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.nosql.async; + +import static java.util.concurrent.Executors.callable; + +import jakarta.enterprise.context.ApplicationScoped; +import java.time.Duration; +import java.util.concurrent.Callable; + +/** + * Abstraction for platform/environment-specific scheduler implementations for delayed and + * optionally repeated executions. + * + *

Quarkus production systems use Vert.x, tests usually use Java executors. + * + *

Implementations, like Java executors or Vert.X, are usually {@link + * ApplicationScoped @ApplicationScoped} in CDI environments. + */ +public interface AsyncExec { + + default Cancelable submit(Callable callable) { + return schedule(callable, Duration.ZERO); + } + + /** + * Asynchronously run the given {@linkplain Callable callable} after the provided {@linkplain + * Duration delay}. If the delay is not positive, the function is scheduled for immediate + * execution. + * + * @param callable the callable to execute + * @param delay the execution delay, zero and negative values mean immediate scheduling + * @param return type of the callable propagated through the returned cancelable + * @return the cancelable for the scheduled task + */ + Cancelable schedule(Callable callable, Duration delay); + + /** + * This is a convenience function for {@link #schedule(Callable, Duration)} with a void result, + * using a {@link Runnable}. + */ + default Cancelable schedule(Runnable runnable, Duration delay) { + return schedule(callable(runnable, null), delay); + } + + /** + * Schedules a runnable to be executed repeatedly using the given initial delay. This is + * equivalent to calling {@link #schedulePeriodic(Runnable, Duration, Duration)} with the {@code + * initialDelay} and {@code delay} having the same values. + * + *

There is intentionally no variant of {@code schedulePeriodic()} that takes a {@link Callable + * Callable} because there are multiple invocations of the runnable. + */ + default Cancelable schedulePeriodic(Runnable runnable, Duration delay) { + return schedulePeriodic(runnable, delay, delay); + } + + /** + * Schedules a runnable to be executed repeatedly, starting after the given initial delay. + * + *

There is intentionally no variant of {@code schedulePeriodic()} that takes a {@link Callable + * Callable} because there are multiple invocations of the runnable. + * + * @param runnable the runnable to execute + * @param initialDelay initial delay, zero and negative values mean immediate scheduling + * @param delay repetition delay, zero and negative cause an {@link IllegalArgumentException} + * @return cancelable instance + */ + Cancelable schedulePeriodic(Runnable runnable, Duration initialDelay, Duration delay); +} diff --git a/persistence/nosql/async/api/src/main/java/org/apache/polaris/nosql/async/Cancelable.java b/persistence/nosql/async/api/src/main/java/org/apache/polaris/nosql/async/Cancelable.java new file mode 100644 index 0000000000..814d2204b5 --- /dev/null +++ b/persistence/nosql/async/api/src/main/java/org/apache/polaris/nosql/async/Cancelable.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.nosql.async; + +import java.util.concurrent.CompletionStage; + +/** + * Implementation agnostic interface for asynchronous delayed and optionally repeated executions. + * + *

Implementations may use JVM-local backends, like Java executors or Vert.X. Vert.X's API is not + * based on Java's {@code (Completable)Future} or {@code CompletionStage}. The cancellation + * semantics/guarantees are different for Vert.X and Java. + * + * @param + */ +public interface Cancelable { + /** + * Attempt to cancel the delayed execution of a callable. Already running callables are not + * interrupted. A callable may still be invoked after calling this function, because of side + * effects and race conditions. + * + *

After cancellation, the result of this instance's might be either in state "completed + * exceptionally" ({@link java.util.concurrent.CancellationException}) or successfully completed. + */ + void cancel(); + + /** + * Retrieve the {@link CompletionStage} associated with this {@link Cancelable} for the submitted + * async and potentially periodic execution. + */ + CompletionStage completionStage(); +} diff --git a/persistence/nosql/async/api/src/main/resources/META-INF/beans.xml b/persistence/nosql/async/api/src/main/resources/META-INF/beans.xml new file mode 100644 index 0000000000..a297f1aa53 --- /dev/null +++ b/persistence/nosql/async/api/src/main/resources/META-INF/beans.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/persistence/nosql/async/api/src/testFixtures/java/org/apache/polaris/nosql/async/AppScopedChecker.java b/persistence/nosql/async/api/src/testFixtures/java/org/apache/polaris/nosql/async/AppScopedChecker.java new file mode 100644 index 0000000000..782ce1c2ac --- /dev/null +++ b/persistence/nosql/async/api/src/testFixtures/java/org/apache/polaris/nosql/async/AppScopedChecker.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.nosql.async; + +import jakarta.enterprise.context.ApplicationScoped; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Async-API-specific tests check utility, only used to verify that invocations into CDI beans work, + * not for "general reuse". + */ +@ApplicationScoped +class AppScopedChecker { + static final AtomicInteger COUNTER = new AtomicInteger(); + + int getAndIncrement() { + return COUNTER.getAndIncrement(); + } +} diff --git a/persistence/nosql/async/api/src/testFixtures/java/org/apache/polaris/nosql/async/AsyncExecTestBase.java b/persistence/nosql/async/api/src/testFixtures/java/org/apache/polaris/nosql/async/AsyncExecTestBase.java new file mode 100644 index 0000000000..fcc330d04e --- /dev/null +++ b/persistence/nosql/async/api/src/testFixtures/java/org/apache/polaris/nosql/async/AsyncExecTestBase.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.nosql.async; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.assertj.core.api.Assertions.assertThat; + +import jakarta.inject.Inject; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; +import org.assertj.core.api.CompletableFutureAssert; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.jboss.weld.junit5.EnableWeld; +import org.jboss.weld.junit5.WeldInitiator; +import org.jboss.weld.junit5.WeldSetup; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@SuppressWarnings("CdiInjectionPointsInspection") +@EnableWeld +@ExtendWith(SoftAssertionsExtension.class) +public abstract class AsyncExecTestBase { + @InjectSoftAssertions protected SoftAssertions soft; + + @WeldSetup WeldInitiator weld = WeldInitiator.performDefaultDiscovery(); + + @Inject protected AsyncExec executor; + + protected final Duration asyncTimeout = Duration.ofMinutes(5); + + @Inject AppScopedChecker appScopedChecker; + + @Test + public void simpleTests() throws Exception { + soft.assertThat(executor.submit(() -> "foo").completionStage()) + .succeedsWithin(asyncTimeout) + .isEqualTo("foo"); + soft.assertThat(executor.schedule(() -> "foo", Duration.ZERO).completionStage()) + .succeedsWithin(asyncTimeout) + .isEqualTo("foo"); + soft.assertThat(executor.schedule(() -> "foo", Duration.ofMillis(1)).completionStage()) + .succeedsWithin(asyncTimeout) + .isEqualTo("foo"); + + var done = new AtomicBoolean(); + soft.assertThat(executor.schedule(() -> done.set(true), Duration.ZERO).completionStage()) + .succeedsWithin(asyncTimeout); + soft.assertThat(done.get()).isTrue(); + done.set(false); + soft.assertThat(executor.schedule(() -> done.set(true), Duration.ofMillis(1)).completionStage()) + .succeedsWithin(asyncTimeout); + soft.assertThat(done.get()).isTrue(); + + // .cancel() after first execution + for (var initialDelay : List.of(Duration.ZERO, Duration.ofMillis(1))) { + var sem = new Semaphore(0); + var cancelable = executor.schedulePeriodic(sem::release, initialDelay, Duration.ofMillis(1)); + soft.assertThat(sem.tryAcquire(asyncTimeout.toMillis(), MILLISECONDS)) + .describedAs("initialDelay %s", initialDelay) + .isTrue(); + cancelable.cancel(); + cancelledAssert(soft.assertThat(cancelable.completionStage())); + } + + // .cancel() before execution + var cancelable = executor.schedulePeriodic(() -> {}, Duration.ofHours(12)); + cancelable.cancel(); + cancelledAssert(soft.assertThat(cancelable.completionStage())); + + cancelable = executor.schedule(() -> {}, Duration.ofHours(12)); + cancelable.cancel(); + cancelledAssert(soft.assertThat(cancelable.completionStage())); + } + + private void cancelledAssert(CompletableFutureAssert assertion) { + assertion.satisfiesAnyOf( + completionStage -> + assertThat(completionStage) + .failsWithin(asyncTimeout) + .withThrowableThat() + .isInstanceOf(CancellationException.class), + completionStage -> assertThat(completionStage).succeedsWithin(asyncTimeout)); + } + + @Test + public void applicationScopedInvocation() { + var expect = AppScopedChecker.COUNTER.get(); + soft.assertThat(executor.submit(() -> appScopedChecker.getAndIncrement()).completionStage()) + .succeedsWithin(asyncTimeout) + .isEqualTo(expect); + } + + @Test + public void submitMany() { + int numTasks = 50; + var sem = new Semaphore(0); + var completableFutures = + IntStream.range(0, numTasks) + .mapToObj( + i -> + executor.submit( + () -> { + soft.assertThat(sem.tryAcquire(asyncTimeout.toMillis(), MILLISECONDS)) + .isTrue(); + threadAssertion(); + return "foo"; + })) + .map(Cancelable::completionStage) + .map(CompletionStage::toCompletableFuture) + .toList(); + + soft.assertThat(completableFutures).noneMatch(CompletableFuture::isDone); + + sem.release(numTasks); + + soft.assertThat(completableFutures) + .allSatisfy(cf -> assertThat(cf).succeedsWithin(asyncTimeout).isEqualTo("foo")); + } + + @Test + public void submitManyFailing() { + int numTasks = 50; + var sem = new Semaphore(0); + var completableFutures = + IntStream.range(0, numTasks) + .mapToObj( + i -> + executor.submit( + () -> { + soft.assertThat(sem.tryAcquire(asyncTimeout.toMillis(), MILLISECONDS)) + .isTrue(); + threadAssertion(); + throw new RuntimeException("FAILED"); + })) + .map(Cancelable::completionStage) + .map(CompletionStage::toCompletableFuture) + .toList(); + + soft.assertThat(completableFutures).noneMatch(CompletableFuture::isDone); + + sem.release(numTasks); + + soft.assertThat(completableFutures) + .allSatisfy( + cf -> + assertThat(cf) + .failsWithin(asyncTimeout) + .withThrowableThat() + .isInstanceOf(ExecutionException.class) + .havingCause() + .isInstanceOf(RuntimeException.class) + .withMessage("FAILED")); + } + + @Test + public void scheduleMany() { + var numTasks = 50; + var sem = new Semaphore(0); + var completableFutures = + IntStream.range(0, numTasks) + .mapToObj( + i -> + executor.schedule( + () -> { + soft.assertThat(sem.tryAcquire(asyncTimeout.toMillis(), MILLISECONDS)) + .isTrue(); + threadAssertion(); + return "foo"; + }, + Duration.ofMillis(1))) + .map(Cancelable::completionStage) + .map(CompletionStage::toCompletableFuture) + .toList(); + + soft.assertThat(completableFutures).noneMatch(CompletableFuture::isDone); + + sem.release(numTasks); + + soft.assertThat(completableFutures) + .allSatisfy(cf -> assertThat(cf).succeedsWithin(asyncTimeout).isEqualTo("foo")); + } + + @SuppressWarnings({"InnerClassMayBeStatic", "ClassCanBeStatic"}) + final class PeriodicPerTask { + final Semaphore before; + final Semaphore after; + final AtomicInteger runs; + volatile Cancelable c; + volatile CompletableFuture f; + + PeriodicPerTask() { + before = new Semaphore(0); + after = new Semaphore(0); + runs = new AtomicInteger(0); + } + } + + @ParameterizedTest + @ValueSource(ints = {Integer.MAX_VALUE, 0, 1, 3, 5}) + public void periodic(int failAfter) { + var dontFail = failAfter == Integer.MAX_VALUE; + var numTasks = 50; + var stop = new AtomicBoolean(); + var completableFutures = + IntStream.range(0, numTasks) + .mapToObj( + i -> { + var task = new PeriodicPerTask(); + task.c = + executor.schedulePeriodic( + () -> { + threadAssertion(); + try { + soft.assertThat( + task.before.tryAcquire(asyncTimeout.toMillis(), MILLISECONDS)) + .isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try { + if (stop.get()) { + return; + } + var iter = task.runs.getAndIncrement(); + if (iter == failAfter) { + throw new RuntimeException("FAILED"); + } + } finally { + task.after.release(1); + } + }, + Duration.ofMillis(1)); + task.f = task.c.completionStage().toCompletableFuture(); + return task; + }) + .toList(); + + var iterations = dontFail ? 10 : (failAfter + 1); + for (int i = 0; i < iterations; i++) { + soft.assertThat(completableFutures).noneMatch(c -> c.f.isDone()); + + completableFutures.forEach(t -> t.before.release()); + + assertThat(completableFutures) + .describedAs("iteration %d", i) + .allSatisfy( + t -> assertThat(t.after.tryAcquire(asyncTimeout.toMillis(), MILLISECONDS)).isTrue()); + } + + stop.set(true); + + if (dontFail) { + completableFutures.forEach(t -> t.c.cancel()); + } + + soft.assertThat(completableFutures).allMatch(t -> t.runs.get() == iterations); + + // Just in case the periodic tasks get called again, let those run + completableFutures.forEach(s -> s.before.release(1000)); + + soft.assertThat(completableFutures) + .allSatisfy( + t -> { + if (dontFail) { + cancelledAssert(assertThat(t.f)); + } else { + assertThat(t.f) + .completesExceptionallyWithin(asyncTimeout) + .withThrowableThat() + .isInstanceOf(ExecutionException.class) + .havingCause() + .isInstanceOf(RuntimeException.class) + .withMessage("FAILED"); + } + }); + + soft.assertThat(completableFutures).allMatch(t -> t.f.isDone()); + if (dontFail) { + soft.assertThat(completableFutures).allMatch(t -> t.f.isCancelled()); + } else { + soft.assertThat(completableFutures).noneMatch(t -> t.f.isCancelled()); + } + // cancellation is still an exceptional completion + soft.assertThat(completableFutures).allMatch(t -> t.f.isCompletedExceptionally()); + } + + protected abstract void threadAssertion(); +} diff --git a/persistence/nosql/async/api/src/testFixtures/java/org/apache/polaris/nosql/async/AsyncTestConfigProvider.java b/persistence/nosql/async/api/src/testFixtures/java/org/apache/polaris/nosql/async/AsyncTestConfigProvider.java new file mode 100644 index 0000000000..cbe583e7db --- /dev/null +++ b/persistence/nosql/async/api/src/testFixtures/java/org/apache/polaris/nosql/async/AsyncTestConfigProvider.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.nosql.async; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; + +@ApplicationScoped +public class AsyncTestConfigProvider { + @Produces + AsyncConfiguration asyncConfiguration() { + return AsyncConfiguration.builder().build(); + } +} diff --git a/persistence/nosql/async/api/src/testFixtures/java/org/apache/polaris/nosql/async/MockAsyncExec.java b/persistence/nosql/async/api/src/testFixtures/java/org/apache/polaris/nosql/async/MockAsyncExec.java new file mode 100644 index 0000000000..092fbc4692 --- /dev/null +++ b/persistence/nosql/async/api/src/testFixtures/java/org/apache/polaris/nosql/async/MockAsyncExec.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.nosql.async; + +import static com.google.common.base.Preconditions.checkState; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import org.apache.polaris.ids.api.MonotonicClock; + +/** + * An {@link AsyncExec} implementation used in tests that need to verify the interaction with {@link + * AsyncExec}. Execution of submitted tasks needs to be manually triggered by the tests that use + * this class. + * + *

This implementation is best used in combination with {@code MutableMonotonicClock} to allow + * assertions on scheduled/execution timestamps. + */ +public class MockAsyncExec implements AsyncExec { + + public MockAsyncExec(MonotonicClock clock) { + this.clock = clock; + } + + public record CallResult(boolean called, T result, Throwable failure) {} + + public final class Task { + private final Callable callable; + private final Duration initialDelay; + private final Duration delay; + public Instant nextExecution; + public final MockCancelable cancelable; + + Task(Callable callable, Duration initialDelay, Duration delay) { + this.callable = callable; + this.initialDelay = initialDelay; + this.delay = delay; + this.nextExecution = clock.currentInstant().plus(initialDelay); + this.cancelable = new MockCancelable<>(); + } + + public Duration initialDelay() { + return initialDelay; + } + + public Duration delay() { + return delay; + } + + public Instant nextExecution() { + return nextExecution; + } + + public boolean ready() { + return ready(clock.currentInstant()); + } + + public boolean ready(Instant at) { + return nextExecution.compareTo(at) <= 0; + } + + public CallResult call() { + switch (cancelable.future.state()) { + case CANCELLED -> throw new CancellationException(); + case FAILED -> + throw new IllegalStateException("Already completed", cancelable.future.exceptionNow()); + case SUCCESS -> throw new IllegalStateException("Already completed"); + case RUNNING -> {} + default -> throw new IllegalStateException("Unknown state " + cancelable.future.state()); + } + try { + var r = callable.call(); + if (delay == null) { + cancelable.future.complete(r); + done(); + } else { + nextExecution = clock.currentInstant().plus(delay); + } + return new CallResult<>(true, r, null); + } catch (Exception e) { + cancelable.future.completeExceptionally(e); + return new CallResult<>(true, null, e); + } + } + + private void done() { + checkState(tasks.remove(Task.this)); + } + + public class MockCancelable implements Cancelable { + private final CompletableFuture future = new CompletableFuture<>(); + + @Override + public CompletionStage completionStage() { + return future; + } + + @Override + public void cancel() { + future.cancel(true); + done(); + } + } + } + + private final MonotonicClock clock; + private final List> tasks = new ArrayList<>(); + + public List> tasks() { + return tasks; + } + + public Optional> nextReady() { + return nextReadyAt(clock.currentInstant()); + } + + public Optional> nextReadyAt(Instant at) { + return tasks.stream().filter(t -> t.ready(at)).min(Comparator.comparing(Task::nextExecution)); + } + + public long readyCount() { + return readyCount(clock.currentInstant()); + } + + public long readyCount(Instant at) { + return tasks.stream().filter(t -> t.ready(at)).count(); + } + + public List> readyCallables() { + var ready = new ArrayList>(); + for (var iter = tasks.iterator(); iter.hasNext(); ) { + var task = iter.next(); + if (task.ready()) { + ready.add(task); + iter.remove(); + } + } + return ready; + } + + @Override + public Cancelable schedulePeriodic( + Runnable runnable, Duration initialDelay, Duration delay) { + var scheduled = + new Task( + () -> { + runnable.run(); + return null; + }, + initialDelay, + delay); + this.tasks.add(scheduled); + return scheduled.cancelable; + } + + @Override + public Cancelable schedule(Callable callable, Duration delay) { + var scheduled = new Task<>(callable, delay, null); + this.tasks.add(scheduled); + return scheduled.cancelable; + } +} diff --git a/persistence/nosql/async/api/src/testFixtures/resources/META-INF/beans.xml b/persistence/nosql/async/api/src/testFixtures/resources/META-INF/beans.xml new file mode 100644 index 0000000000..a297f1aa53 --- /dev/null +++ b/persistence/nosql/async/api/src/testFixtures/resources/META-INF/beans.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/persistence/nosql/async/java/build.gradle.kts b/persistence/nosql/async/java/build.gradle.kts new file mode 100644 index 0000000000..72482d3ab9 --- /dev/null +++ b/persistence/nosql/async/java/build.gradle.kts @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("org.kordamp.gradle.jandex") + id("polaris-server") +} + +description = "Polaris async execution - Java thread pool" + +dependencies { + implementation(project(":polaris-async-api")) + + implementation(libs.slf4j.api) + implementation(libs.guava) + + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.inject.api) + compileOnly(libs.jakarta.enterprise.cdi.api) + + compileOnly(platform(libs.jackson.bom)) + compileOnly("com.fasterxml.jackson.core:jackson-databind") + + testImplementation(testFixtures(project(":polaris-async-api"))) +} + +tasks.withType { isFailOnError = false } diff --git a/persistence/nosql/async/java/src/main/java/org/apache/polaris/nosql/async/java/JavaPoolAsyncExec.java b/persistence/nosql/async/java/src/main/java/org/apache/polaris/nosql/async/java/JavaPoolAsyncExec.java new file mode 100644 index 0000000000..97f1665a9e --- /dev/null +++ b/persistence/nosql/async/java/src/main/java/org/apache/polaris/nosql/async/java/JavaPoolAsyncExec.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.nosql.async.java; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.polaris.nosql.async.AsyncConfiguration.DEFAULT_MAX_THREADS; +import static org.apache.polaris.nosql.async.AsyncConfiguration.DEFAULT_THREAD_KEEP_ALIVE; + +import com.google.common.annotations.VisibleForTesting; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.polaris.nosql.async.AsyncConfiguration; +import org.apache.polaris.nosql.async.AsyncExec; +import org.apache.polaris.nosql.async.Cancelable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * CDI {@link ApplicationScoped} Java executor service based {@link AsyncExec} implementation using + * {@link ScheduledThreadPoolExecutor} for the timers, backed by a Java thread pool to execute + * blocking tasks. + */ +@ApplicationScoped +public class JavaPoolAsyncExec implements AsyncExec, AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(JavaPoolAsyncExec.class.getName()); + private static final Duration MAX_DURATION = Duration.ofDays(7); + + public static final String EXECUTOR_THREAD_NAME_PREFIX = "JavaPoolTaskExecutor#"; + public static final String SCHEDULER_THREAD_NAME_PREFIX = "JavaPoolTaskScheduler#"; + + private final ThreadPoolExecutor executorService; + private final ScheduledThreadPoolExecutor scheduler; + + /** Pool-ID generator, useful for debugging when multiple pools are created for multiple tests. */ + private static final AtomicInteger POOL_ID = new AtomicInteger(); + + /** Pool-ID, useful for debugging when multiple pools are created for multiple tests. */ + private final int poolId = POOL_ID.incrementAndGet(); + + private final AtomicInteger executorThreadId = new AtomicInteger(); + private final AtomicInteger schedulerThreadId = new AtomicInteger(); + private volatile boolean shutdown; + // Track live tasks for prompt cancellation on shutdown + private final Set> tasks = ConcurrentHashMap.newKeySet(); + + @VisibleForTesting + public JavaPoolAsyncExec() { + this(AsyncConfiguration.builder().build()); + } + + @Inject + JavaPoolAsyncExec(Instance asyncConfiguration) { + this( + asyncConfiguration.isResolvable() + ? asyncConfiguration.get() + : AsyncConfiguration.builder().build()); + } + + JavaPoolAsyncExec(AsyncConfiguration asyncConfiguration) { + RejectedExecutionHandler rejectedExecutionHandler = + (r, executor) -> { + Future future = null; + if (r instanceof CancelableFuture cancelable) { + future = cancelable.completable; + } else if (r instanceof Future f) { + future = f; + } + if (future != null && future.isDone()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(rejectedMessage(r, executor)); + } + return; + } + var msg = rejectedMessage(r, executor); + var ex = new RejectedExecutionException(msg); + LOGGER.error(msg); + throw ex; + }; + executorService = + new ThreadPoolExecutor( + // core pool size + 0, + // max pool size + asyncConfiguration.maxThreads().orElse(DEFAULT_MAX_THREADS), + // keep-alive time + asyncConfiguration.threadKeepAlive().orElse(DEFAULT_THREAD_KEEP_ALIVE).toMillis(), + MILLISECONDS, + new SynchronousQueue<>(), + // thread factory + r -> { + var t = + new Thread( + r, + EXECUTOR_THREAD_NAME_PREFIX + + poolId + + "-" + + executorThreadId.incrementAndGet()); + t.setDaemon(true); + return t; + }, + rejectedExecutionHandler); + executorService.allowCoreThreadTimeOut(true); + scheduler = + new ScheduledThreadPoolExecutor( + 2, + r -> { + var t = + new Thread( + r, + SCHEDULER_THREAD_NAME_PREFIX + + poolId + + "-" + + schedulerThreadId.incrementAndGet()); + t.setDaemon(true); + return t; + }, + rejectedExecutionHandler); + scheduler.setRemoveOnCancelPolicy(true); + LOGGER.debug("JavaPoolAsyncExec initialized with pool ID {}", poolId); + } + + private String rejectedMessage(Runnable r, ThreadPoolExecutor executor) { + return format("Runnable '%s' rejected against pool ID %s / '%s'", r, poolId, executor); + } + + @PreDestroy + @VisibleForTesting + @Override + public void close() { + shutdown = true; + LOGGER.debug("Shutting down JavaPoolAsyncExec {} / '{}'", poolId, executorService); + try { + // Proactively cancel all tasks and timers + tasks.forEach(CancelableFuture::cancel); + tasks.clear(); + + var remaining = scheduler.shutdownNow(); + LOGGER.debug("Scheduler shut down, {} dangling tasks", remaining.size()); + scheduler.awaitTermination(10, TimeUnit.SECONDS); + scheduler.close(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } finally { + executorService.shutdownNow(); + try { + executorService.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + executorService.close(); + } + } + + @Override + public Cancelable schedule(Callable callable, Duration delay) { + checkState(!shutdown, "Must not schedule new tasks after shutdown of pool %s", poolId); + checkArgument(delay.compareTo(MAX_DURATION) < 0, "Delay is limited to %s", MAX_DURATION); + + var delayMillis = Math.max(delay.toMillis(), 0L); + var cf = new CancelableFuture<>(callable); + tasks.add(cf); + + if (delayMillis > 0) { + delayed(cf, delayMillis); + } else { + immediate(cf); + } + + return cf; + } + + @Override + public Cancelable schedulePeriodic( + Runnable runnable, Duration initialDelay, Duration delay) { + checkState(!shutdown, "Must not schedule new tasks after shutdown of pool %s", poolId); + checkArgument(delay.isPositive(), "Delay must not be zero or negative"); + checkArgument( + initialDelay.compareTo(MAX_DURATION) < 0, "Initial delay is limited to %s", MAX_DURATION); + checkArgument(delay.compareTo(MAX_DURATION) < 0, "Delay is limited to %s", MAX_DURATION); + + var initialMillis = initialDelay.toMillis(); + var delayMillis = Math.max(delay.toMillis(), 1L); + + var cf = new CancelableFuture(runnable, delayMillis); + tasks.add(cf); + + if (initialMillis > 0) { + delayed(cf, initialMillis); + } else { + immediate(cf); + } + + return cf; + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void immediate(CancelableFuture cancelable) { + executorService.submit(cancelable); + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void delayed(CancelableFuture cancelable, long delayMillis) { + cancelable.setScheduledFuture( + scheduler.schedule(() -> immediate(cancelable), delayMillis, MILLISECONDS)); + } + + private final class CancelableFuture implements Cancelable, Runnable { + private final CompletableFuture completable = new CompletableFuture<>(); + private final Runnable runnable; + private final Callable callable; + private final long repeatMillis; + private final AtomicReference> scheduledFuture = new AtomicReference<>(); + + CancelableFuture(Runnable runnable, long repeatMillis) { + this.runnable = requireNonNull(runnable, "Runnable must not be null"); + this.callable = this::runnable; + this.repeatMillis = repeatMillis; + } + + CancelableFuture(Callable callable) { + this.runnable = null; + this.callable = requireNonNull(callable, "Callable must not be null"); + this.repeatMillis = -1L; + } + + void setScheduledFuture(ScheduledFuture scheduledFuture) { + var previous = this.scheduledFuture.getAndSet(scheduledFuture); + if (previous != null) { + previous.cancel(false); + } + } + + @SuppressWarnings("DataFlowIssue") + private R runnable() { + if (cancelledOrShutdown()) { + completable.cancel(false); + } else { + runnable.run(); + } + return null; + } + + @Override + public void run() { + if (cancelledOrShutdown()) { + completable.cancel(false); + tasks.remove(this); + return; + } + try { + var r = callable.call(); + if (repeatMillis >= 0L) { + if (!cancelledOrShutdown()) { + delayed(this, repeatMillis); + } + } else { + completable.complete(r); + // One-shot tasks can be released + tasks.remove(this); + } + } catch (Throwable t) { + completable.completeExceptionally(new CompletionException(t)); + tasks.remove(this); + } finally { + // Avoid ThreadLocal/MDC leakage across logically independent tasks + MDC.clear(); + } + } + + private boolean cancelledOrShutdown() { + return completable.isCancelled() || shutdown; + } + + @Override + public CompletionStage completionStage() { + return completable; + } + + @Override + public void cancel() { + var previous = this.scheduledFuture.getAndSet(null); + if (previous != null) { + previous.cancel(false); + } + completable.cancel(false); + tasks.remove(this); + } + + @Override + public String toString() { + if (runnable != null) { + return "JavaPoolAsyncExec.Completable: Runnable " + runnable; + } else { + return "JavaPoolAsyncExec.Completable: Callable " + callable; + } + } + } + + @Override + public String toString() { + return "JavaAsyncExec with pool ID " + poolId + " backed by " + executorService; + } +} diff --git a/persistence/nosql/async/java/src/main/resources/META-INF/beans.xml b/persistence/nosql/async/java/src/main/resources/META-INF/beans.xml new file mode 100644 index 0000000000..a297f1aa53 --- /dev/null +++ b/persistence/nosql/async/java/src/main/resources/META-INF/beans.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/persistence/nosql/async/java/src/test/java/org/apache/polaris/nosql/async/java/TestJavaPoolAsyncExec.java b/persistence/nosql/async/java/src/test/java/org/apache/polaris/nosql/async/java/TestJavaPoolAsyncExec.java new file mode 100644 index 0000000000..68d1faf856 --- /dev/null +++ b/persistence/nosql/async/java/src/test/java/org/apache/polaris/nosql/async/java/TestJavaPoolAsyncExec.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.nosql.async.java; + +import org.apache.polaris.nosql.async.AsyncExecTestBase; + +public class TestJavaPoolAsyncExec extends AsyncExecTestBase { + @Override + protected void threadAssertion() { + var t = Thread.currentThread(); + soft.assertThat(t.getName()).startsWith(JavaPoolAsyncExec.EXECUTOR_THREAD_NAME_PREFIX); + } +} diff --git a/persistence/nosql/async/java/src/test/resources/logback-test.xml b/persistence/nosql/async/java/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..4a4d9a629d --- /dev/null +++ b/persistence/nosql/async/java/src/test/resources/logback-test.xml @@ -0,0 +1,32 @@ + + + + + + + %date{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + diff --git a/persistence/nosql/async/java/src/test/resources/weld.properties b/persistence/nosql/async/java/src/test/resources/weld.properties new file mode 100644 index 0000000000..c26169e0e1 --- /dev/null +++ b/persistence/nosql/async/java/src/test/resources/weld.properties @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://bugs.openjdk.org/browse/JDK-8349545 +org.jboss.weld.bootstrap.concurrentDeployment=false diff --git a/persistence/nosql/async/vertx/build.gradle.kts b/persistence/nosql/async/vertx/build.gradle.kts new file mode 100644 index 0000000000..97d69d7262 --- /dev/null +++ b/persistence/nosql/async/vertx/build.gradle.kts @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("org.kordamp.gradle.jandex") + id("polaris-server") +} + +description = "Polaris async execution - Vert.x" + +dependencies { + implementation(project(":polaris-async-api")) + + implementation(libs.slf4j.api) + implementation(libs.guava) + implementation(libs.vertx.core) + + compileOnly(platform(libs.jackson.bom)) + compileOnly("com.fasterxml.jackson.core:jackson-databind") + + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.inject.api) + compileOnly(libs.jakarta.enterprise.cdi.api) + + testFixturesApi(libs.jakarta.annotation.api) + testFixturesApi(libs.jakarta.validation.api) + testFixturesApi(libs.jakarta.inject.api) + testFixturesApi(libs.jakarta.enterprise.cdi.api) + + testFixturesApi(libs.vertx.core) + + testImplementation(testFixtures(project(":polaris-async-api"))) +} + +tasks.withType { isFailOnError = false } diff --git a/persistence/nosql/async/vertx/src/main/java/org/apache/polaris/nosql/async/vertx/VertxAsyncExec.java b/persistence/nosql/async/vertx/src/main/java/org/apache/polaris/nosql/async/vertx/VertxAsyncExec.java new file mode 100644 index 0000000000..4d92cf67a8 --- /dev/null +++ b/persistence/nosql/async/vertx/src/main/java/org/apache/polaris/nosql/async/vertx/VertxAsyncExec.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.nosql.async.vertx; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.polaris.nosql.async.AsyncConfiguration.DEFAULT_MAX_THREADS; +import static org.apache.polaris.nosql.async.AsyncConfiguration.DEFAULT_THREAD_KEEP_ALIVE; + +import io.vertx.core.Vertx; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.polaris.nosql.async.AsyncConfiguration; +import org.apache.polaris.nosql.async.AsyncExec; +import org.apache.polaris.nosql.async.Cancelable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * CDI {@link ApplicationScoped} {@link AsyncExec} implementation that uses Vert.X timers, backed by + * a Java thread pool to execute blocking tasks. + */ +@ApplicationScoped +class VertxAsyncExec implements AsyncExec, AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(VertxAsyncExec.class.getName()); + private static final Duration MAX_DURATION = Duration.ofDays(7); + + public static final String EXECUTOR_THREAD_NAME_PREFIX = "VertxAsyncExec#"; + + private final ThreadPoolExecutor executorService; + private final Vertx vertx; + + /** Pool-ID generator, useful for debugging when multiple pools are created for multiple tests. */ + private static final AtomicInteger POOL_ID = new AtomicInteger(); + + /** Pool-ID, useful for debugging when multiple pools are created for multiple tests. */ + private final int poolId = POOL_ID.incrementAndGet(); + + private final AtomicInteger executorThreadId = new AtomicInteger(); + private volatile boolean shutdown; + // Track live tasks for prompt cancellation on shutdown + private final Set> tasks = ConcurrentHashMap.newKeySet(); + + @SuppressWarnings("CdiInjectionPointsInspection") + @Inject + VertxAsyncExec(Vertx vertx, AsyncConfiguration asyncConfiguration) { + this.vertx = vertx; + + executorService = + new ThreadPoolExecutor( + // core pool size + 0, + // max pool size + asyncConfiguration.maxThreads().orElse(DEFAULT_MAX_THREADS), + // keep-alive time + asyncConfiguration.threadKeepAlive().orElse(DEFAULT_THREAD_KEEP_ALIVE).toMillis(), + MILLISECONDS, + new SynchronousQueue<>(), + // thread factory + r -> { + var t = + new Thread( + r, + EXECUTOR_THREAD_NAME_PREFIX + + poolId + + "-" + + executorThreadId.incrementAndGet()); + t.setDaemon(true); + return t; + }, + // rejected execution handler + (r, executor) -> { + Future future = null; + if (r instanceof CancelableFuture cancelable) { + cancelable.cancelTimer(); + future = cancelable.completable; + } else if (r instanceof Future f) { + future = f; + } + if (future != null && future.isDone()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(rejectedMessage(r, executor)); + } + return; + } + var msg = rejectedMessage(r, executor); + var ex = new RejectedExecutionException(msg); + LOGGER.error(msg); + throw ex; + }); + executorService.allowCoreThreadTimeOut(true); + LOGGER.debug("VertxAsyncExec initialized with pool ID {}", poolId); + } + + private String rejectedMessage(Runnable r, ThreadPoolExecutor executor) { + return format("Runnable '%s' rejected against pool ID %s / '%s'", r, poolId, executor); + } + + @PreDestroy + @Override + public void close() { + shutdown = true; + LOGGER.debug("Shutting down VertxAsyncExec {} / '{}'", poolId, executorService); + // Proactively cancel all timers/tasks + tasks.forEach(CancelableFuture::cancel); + tasks.clear(); + // Terminate the executor promptly + executorService.shutdownNow(); + try { + executorService.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + @Override + public Cancelable schedule(Callable callable, Duration delay) { + checkState(!shutdown, "Must not schedule new tasks after shutdown of pool %s", poolId); + checkArgument(delay.compareTo(MAX_DURATION) < 0, "Delay is limited to %s", MAX_DURATION); + var cf = new CancelableFuture(callable, Math.max(delay.toMillis(), 0L)); + tasks.add(cf); + return cf; + } + + @Override + public Cancelable schedulePeriodic( + Runnable runnable, Duration initialDelay, Duration delay) { + checkState(!shutdown, "Must not schedule new tasks after shutdown of pool %s", poolId); + checkArgument(delay.isPositive(), "Delay must not be zero or negative"); + checkArgument( + initialDelay.compareTo(MAX_DURATION) < 0, "Initial delay is limited to %s", MAX_DURATION); + checkArgument(delay.compareTo(MAX_DURATION) < 0, "Delay is limited to %s", MAX_DURATION); + + var cf = + new CancelableFuture( + runnable, Math.max(initialDelay.toMillis(), 0L), Math.max(delay.toMillis(), 1L)); + tasks.add(cf); + return cf; + } + + private final class CancelableFuture implements Cancelable, Runnable { + private final long timerId; + + /** Flag whether {@link #timerId} is valid and the Vert.X timer needs to be canceled. */ + private final boolean hasTimerId; + + private final CompletableFuture completable = new CompletableFuture<>(); + private final Runnable runnable; + private final Callable callable; + private final boolean periodic; + + // Prevent overlapping executions for periodic tasks + private final AtomicBoolean running = new AtomicBoolean(false); + // Track in-flight submission to allow cancelling/interrupting + private final AtomicReference> runningFuture = new AtomicReference<>(); + + CancelableFuture(Runnable runnable, long initialMillis, long delayMillis) { + initialMillis = Math.max(initialMillis, 1L); + delayMillis = Math.max(delayMillis, 1L); + this.runnable = requireNonNull(runnable, "Runnable must not be null"); + this.callable = this::runnable; + this.timerId = vertx.setPeriodic(initialMillis, delayMillis, this::execAsync); + this.hasTimerId = true; + this.periodic = true; + } + + CancelableFuture(Callable callable, long delayMillis) { + this.runnable = null; + this.callable = requireNonNull(callable, "Callable must not be null"); + this.periodic = false; + if (delayMillis > 0) { + this.hasTimerId = true; + this.timerId = vertx.setTimer(delayMillis, this::execAsync); + } else { + // Execute immediately + this.hasTimerId = false; + this.timerId = 0L; + execAsync(-1L); + } + } + + private R runnable() { + runnable.run(); + return null; + } + + @SuppressWarnings("FutureReturnValueIgnored") + void execAsync(long unused) { + if (cancelledOrShutdown()) { + cancel(); + return; + } + // Skip overlapping periodic executions + if (periodic && running.get()) { + return; + } + try { + var f = executorService.submit(this); + runningFuture.set(f); + } catch (RejectedExecutionException rex) { + completable.completeExceptionally(rex); + cancel(); + } + } + + @Override + public void run() { + if (cancelledOrShutdown()) { + cancel(); + return; + } + if (periodic && !running.compareAndSet(false, true)) { + // Defensive: in case a race got us here while running + return; + } + try { + var r = callable.call(); + if (!periodic) { + completable.complete(r); + // One-shot tasks can be released + tasks.remove(this); + } + } catch (Throwable e) { + completable.completeExceptionally(e); + cancelTimer(); + tasks.remove(this); + } finally { + runningFuture.set(null); + if (periodic) { + running.set(false); + } + // Avoid ThreadLocal/MDC leakage across logically independent tasks + MDC.clear(); + } + } + + private void cancelTimer() { + if (hasTimerId) { + vertx.cancelTimer(timerId); + } + } + + private boolean cancelledOrShutdown() { + return completable.isCancelled() || shutdown; + } + + @Override + public CompletionStage completionStage() { + return completable; + } + + @Override + public void cancel() { + cancelTimer(); + completable.cancel(false); + var f = runningFuture.getAndSet(null); + if (f != null && !f.isDone()) { + f.cancel(true); + } + tasks.remove(this); + } + + @Override + public String toString() { + if (runnable != null) { + return "VertxAsyncExec.Completable: Runnable " + runnable; + } else { + return "VertxAsyncExec.Completable: Callable " + callable; + } + } + } + + @Override + public String toString() { + return "VertxAsyncExec with pool ID " + poolId + " backed by " + executorService; + } +} diff --git a/persistence/nosql/async/vertx/src/main/resources/META-INF/beans.xml b/persistence/nosql/async/vertx/src/main/resources/META-INF/beans.xml new file mode 100644 index 0000000000..a297f1aa53 --- /dev/null +++ b/persistence/nosql/async/vertx/src/main/resources/META-INF/beans.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/persistence/nosql/async/vertx/src/test/java/org/apache/polaris/nosql/async/vertx/TestVertxAsyncExec.java b/persistence/nosql/async/vertx/src/test/java/org/apache/polaris/nosql/async/vertx/TestVertxAsyncExec.java new file mode 100644 index 0000000000..3f1307542c --- /dev/null +++ b/persistence/nosql/async/vertx/src/test/java/org/apache/polaris/nosql/async/vertx/TestVertxAsyncExec.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.nosql.async.vertx; + +import org.apache.polaris.nosql.async.AsyncExecTestBase; + +public class TestVertxAsyncExec extends AsyncExecTestBase { + @Override + protected void threadAssertion() { + var t = Thread.currentThread(); + soft.assertThat(t.getName()).startsWith(VertxAsyncExec.EXECUTOR_THREAD_NAME_PREFIX); + } +} diff --git a/persistence/nosql/async/vertx/src/test/resources/logback-test.xml b/persistence/nosql/async/vertx/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..4a4d9a629d --- /dev/null +++ b/persistence/nosql/async/vertx/src/test/resources/logback-test.xml @@ -0,0 +1,32 @@ + + + + + + + %date{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + diff --git a/persistence/nosql/async/vertx/src/test/resources/weld.properties b/persistence/nosql/async/vertx/src/test/resources/weld.properties new file mode 100644 index 0000000000..c26169e0e1 --- /dev/null +++ b/persistence/nosql/async/vertx/src/test/resources/weld.properties @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://bugs.openjdk.org/browse/JDK-8349545 +org.jboss.weld.bootstrap.concurrentDeployment=false diff --git a/persistence/nosql/async/vertx/src/testFixtures/java/org/apache/polaris/nosql/async/vertx/VertxProvider.java b/persistence/nosql/async/vertx/src/testFixtures/java/org/apache/polaris/nosql/async/vertx/VertxProvider.java new file mode 100644 index 0000000000..4caed78f14 --- /dev/null +++ b/persistence/nosql/async/vertx/src/testFixtures/java/org/apache/polaris/nosql/async/vertx/VertxProvider.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.nosql.async.vertx; + +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Disposes; +import jakarta.enterprise.inject.Produces; +import java.util.concurrent.TimeUnit; + +@ApplicationScoped +public class VertxProvider { + @Produces + Vertx vertx() { + return Vertx.vertx( + new VertxOptions() + .setInternalBlockingPoolSize(16) + .setEventLoopPoolSize(16) + .setWorkerPoolSize(32)); + } + + void dispose(@Disposes Vertx vertx) throws Exception { + vertx.close().toCompletionStage().toCompletableFuture().get(5, TimeUnit.MINUTES); + } +} diff --git a/persistence/nosql/async/vertx/src/testFixtures/resources/META-INF/beans.xml b/persistence/nosql/async/vertx/src/testFixtures/resources/META-INF/beans.xml new file mode 100644 index 0000000000..a297f1aa53 --- /dev/null +++ b/persistence/nosql/async/vertx/src/testFixtures/resources/META-INF/beans.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/tools/config-docs/site/build.gradle.kts b/tools/config-docs/site/build.gradle.kts index dafe953b1c..93817963f3 100644 --- a/tools/config-docs/site/build.gradle.kts +++ b/tools/config-docs/site/build.gradle.kts @@ -25,6 +25,7 @@ plugins { description = "Polaris site - reference docs" val genProjectPaths = listOf( + ":polaris-async-api", ":polaris-runtime-service", ":polaris-eclipselink", )