diff --git a/core/trino-main/src/main/java/io/trino/split/SplitManager.java b/core/trino-main/src/main/java/io/trino/split/SplitManager.java index 70b388235b2c..4ec43a8670a7 100644 --- a/core/trino-main/src/main/java/io/trino/split/SplitManager.java +++ b/core/trino-main/src/main/java/io/trino/split/SplitManager.java @@ -30,9 +30,11 @@ import io.trino.spi.connector.Constraint; import io.trino.spi.connector.DynamicFilter; import io.trino.tracing.TrinoAttributes; +import jakarta.annotation.PreDestroy; import java.util.Optional; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.trino.SystemSessionProperties.isAllowPushdownIntoConnectors; @@ -44,6 +46,7 @@ public class SplitManager private final CatalogServiceProvider splitManagerProvider; private final Tracer tracer; private final int minScheduleSplitBatchSize; + private final ExecutorService executorService; private final Executor executor; @Inject @@ -52,7 +55,14 @@ public SplitManager(CatalogServiceProvider splitManagerPr this.splitManagerProvider = requireNonNull(splitManagerProvider, "splitManagerProvider is null"); this.tracer = requireNonNull(tracer, "tracer is null"); this.minScheduleSplitBatchSize = config.getMinScheduleSplitBatchSize(); - this.executor = new BoundedExecutor(newCachedThreadPool(daemonThreadsNamed("splits-manager-callback-%s")), config.getMaxSplitManagerCallbackThreads()); + this.executorService = newCachedThreadPool(daemonThreadsNamed("splits-manager-callback-%s")); + this.executor = new BoundedExecutor(executorService, config.getMaxSplitManagerCallbackThreads()); + } + + @PreDestroy + public void shutdown() + { + executorService.shutdown(); } public SplitSource getSplits( diff --git a/core/trino-main/src/test/java/io/trino/split/TestBufferingSplitSource.java b/core/trino-main/src/test/java/io/trino/split/TestBufferingSplitSource.java index 9356c5d4447e..8497ffb14adf 100644 --- a/core/trino-main/src/test/java/io/trino/split/TestBufferingSplitSource.java +++ b/core/trino-main/src/test/java/io/trino/split/TestBufferingSplitSource.java @@ -17,9 +17,14 @@ import com.google.common.util.concurrent.ListenableFuture; import io.airlift.concurrent.BoundedExecutor; import io.trino.split.SplitSource.SplitBatch; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.getFutureValue; @@ -29,13 +34,22 @@ import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newCachedThreadPool; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Execution(ExecutionMode.CONCURRENT) public class TestBufferingSplitSource { - private static final Executor executor = new BoundedExecutor(newCachedThreadPool(daemonThreadsNamed(TestBufferingSplitSource.class.getSimpleName() + "-%s")), 10); + private final ExecutorService executorService = newCachedThreadPool(daemonThreadsNamed(TestBufferingSplitSource.class.getSimpleName() + "-%s")); + private final Executor executor = new BoundedExecutor(executorService, 10); + + @AfterAll + public void cleanup() + { + executorService.shutdown(); + } @Test public void testSlowSource()