diff --git a/core/trino-main/src/main/java/io/trino/server/InternalCommunicationHttpClientModule.java b/core/trino-main/src/main/java/io/trino/server/InternalCommunicationHttpClientModule.java index d69c66f622ba..e04b42be7432 100644 --- a/core/trino-main/src/main/java/io/trino/server/InternalCommunicationHttpClientModule.java +++ b/core/trino-main/src/main/java/io/trino/server/InternalCommunicationHttpClientModule.java @@ -72,6 +72,7 @@ protected void setup(Binder binder) static void configureClient(HttpClientConfig httpConfig, InternalCommunicationConfig internalCommunicationConfig) { httpConfig.setHttp2Enabled(internalCommunicationConfig.isHttp2Enabled()); + httpConfig.setUseVirtualThreads(true); if (internalCommunicationConfig.isHttpsRequired() && internalCommunicationConfig.getKeyStorePath() == null && internalCommunicationConfig.getTrustStorePath() == null) { configureClientForAutomaticHttps(httpConfig, internalCommunicationConfig); diff --git a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java index 55bdcab61bc6..efacce7f8580 100644 --- a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java +++ b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java @@ -22,6 +22,7 @@ import com.google.inject.multibindings.ProvidesIntoSet; import io.airlift.concurrent.BoundedExecutor; import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.airlift.http.server.EnableVirtualThreads; import io.airlift.http.server.HttpServerConfig; import io.airlift.slice.Slice; import io.airlift.stats.GcMonitor; @@ -167,6 +168,7 @@ import static com.google.inject.multibindings.Multibinder.newSetBinder; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.concurrent.Threads.virtualThreadsNamed; import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.airlift.configuration.ConfigBinder.configBinder; import static io.airlift.discovery.client.DiscoveryBinder.discoveryBinder; @@ -182,6 +184,7 @@ import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newScheduledThreadPool; +import static java.util.concurrent.Executors.newThreadPerTaskExecutor; import static java.util.concurrent.TimeUnit.SECONDS; import static org.weakref.jmx.guice.ExportBinder.newExporter; @@ -213,6 +216,8 @@ protected void setup(Binder binder) httpServerConfig.setHttp2MaxConcurrentStreams(32 * 1024); // from the default 16K }); + newOptionalBinder(binder, Key.get(boolean.class, EnableVirtualThreads.class)) + .setBinding().toInstance(true); binder.bind(PreparedStatementEncoder.class).in(Scopes.SINGLETON); binder.bind(HttpRequestSessionContextFactory.class).in(Scopes.SINGLETON); install(new InternalCommunicationModule()); @@ -567,7 +572,7 @@ public static Executor createStartupExecutor(ServerConfig config) return directExecutor(); } return new BoundedExecutor( - newCachedThreadPool(daemonThreadsNamed("startup-%s")), + newThreadPerTaskExecutor(virtualThreadsNamed("startup-%s")), Runtime.getRuntime().availableProcessors()); } diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java index 4901dc5f50fc..6c3dd16e52f8 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java @@ -21,10 +21,10 @@ import jakarta.annotation.PreDestroy; import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; -import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.concurrent.Threads.virtualThreadsNamed; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.Executors.newCachedThreadPool; +import static java.util.concurrent.Executors.newThreadPerTaskExecutor; public class GcsFileSystemFactory implements TrinoFileSystemFactory @@ -44,7 +44,7 @@ public GcsFileSystemFactory(GcsFileSystemConfig config, GcsStorageFactory storag this.pageSize = config.getPageSize(); this.batchSize = config.getBatchSize(); this.storageFactory = requireNonNull(storageFactory, "storageFactory is null"); - this.executorService = listeningDecorator(newCachedThreadPool(daemonThreadsNamed("trino-filesystem-gcs-%S"))); + this.executorService = listeningDecorator(newThreadPerTaskExecutor(virtualThreadsNamed("trino-filesystem-gcs-%d"))); } @PreDestroy diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemLoader.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemLoader.java index 384909120e84..869572eee048 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemLoader.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemLoader.java @@ -48,11 +48,11 @@ import java.util.function.Function; import static com.google.common.base.Preconditions.checkState; -import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.concurrent.Threads.virtualThreadsNamed; import static io.trino.filesystem.s3.S3FileSystemConfig.RetryMode.getRetryStrategy; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.Executors.newCachedThreadPool; +import static java.util.concurrent.Executors.newThreadPerTaskExecutor; import static software.amazon.awssdk.core.checksums.ResponseChecksumValidation.WHEN_REQUIRED; final class S3FileSystemLoader @@ -63,7 +63,7 @@ final class S3FileSystemLoader private final S3ClientFactory clientFactory; private final S3Presigner preSigner; private final S3Context context; - private final ExecutorService uploadExecutor = newCachedThreadPool(daemonThreadsNamed("s3-upload-%s")); + private final ExecutorService uploadExecutor = newThreadPerTaskExecutor(virtualThreadsNamed("s3-upload-%d")); private final Map, S3Client> clients = new ConcurrentHashMap<>(); @Inject diff --git a/lib/trino-metastore/src/main/java/io/trino/metastore/cache/SharedHiveMetastoreCache.java b/lib/trino-metastore/src/main/java/io/trino/metastore/cache/SharedHiveMetastoreCache.java index 68adfd506063..e76fa5c94ed3 100644 --- a/lib/trino-metastore/src/main/java/io/trino/metastore/cache/SharedHiveMetastoreCache.java +++ b/lib/trino-metastore/src/main/java/io/trino/metastore/cache/SharedHiveMetastoreCache.java @@ -43,10 +43,10 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Throwables.throwIfInstanceOf; -import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.concurrent.Threads.virtualThreadsNamed; import static io.trino.cache.SafeCaches.buildNonEvictableCache; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.Executors.newCachedThreadPool; +import static java.util.concurrent.Executors.newThreadPerTaskExecutor; import static java.util.concurrent.TimeUnit.MILLISECONDS; public class SharedHiveMetastoreCache @@ -112,7 +112,7 @@ public SharedHiveMetastoreCache( public void start() { if (enabled) { - executorService = newCachedThreadPool(daemonThreadsNamed("hive-metastore-" + catalogName + "-%s")); + executorService = newThreadPerTaskExecutor(virtualThreadsNamed("hive-metastore-" + catalogName + "-%d")); } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java index b16895f6e148..251d1236c84c 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.jdbc; -import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Binder; import com.google.inject.Inject; import com.google.inject.Key; @@ -43,9 +42,11 @@ import static com.google.inject.multibindings.Multibinder.newSetBinder; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static com.google.inject.multibindings.ProvidesIntoOptional.Type.DEFAULT; +import static io.airlift.concurrent.Threads.virtualThreadsNamed; import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.airlift.configuration.ConfigBinder.configBinder; import static io.trino.plugin.base.ClosingBinder.closingBinder; +import static java.util.concurrent.Executors.newThreadPerTaskExecutor; import static org.weakref.jmx.guice.ExportBinder.newExporter; public class JdbcModule @@ -111,8 +112,7 @@ public void setup(Binder binder) newOptionalBinder(binder, Key.get(ExecutorService.class, ForRecordCursor.class)) .setDefault() - .toProvider(MoreExecutors::newDirectExecutorService) - .in(Scopes.SINGLETON); + .toInstance(newThreadPerTaskExecutor(virtualThreadsNamed("jdbc-record-cursor-%s"))); newSetBinder(binder, JdbcQueryEventListener.class); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RemoteQueryCancellationModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RemoteQueryCancellationModule.java index 4ab194a6af53..706961c0bb4d 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RemoteQueryCancellationModule.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RemoteQueryCancellationModule.java @@ -25,11 +25,10 @@ import java.util.concurrent.ExecutorService; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; -import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.concurrent.Threads.virtualThreadsNamed; import static io.airlift.configuration.ConditionalModule.conditionalModule; -import static java.lang.String.format; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.Executors.newCachedThreadPool; +import static java.util.concurrent.Executors.newThreadPerTaskExecutor; public class RemoteQueryCancellationModule extends AbstractConfigurationAwareModule @@ -67,7 +66,7 @@ public RecordCursorExecutorServiceProvider(CatalogName catalogName) @Override public ExecutorService get() { - return newCachedThreadPool(daemonThreadsNamed(format("%s-record-cursor-%%d", catalogName))); + return newThreadPerTaskExecutor(virtualThreadsNamed(catalogName + "-record-cursor-%d")); } } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java index 50209b68b5a9..9672481c8385 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java @@ -36,17 +36,19 @@ import io.trino.spi.procedure.Procedure; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; import static com.google.inject.multibindings.Multibinder.newSetBinder; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; -import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.concurrent.Threads.virtualThreadsNamed; import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.airlift.configuration.ConfigBinder.configBinder; import static io.trino.plugin.base.ClosingBinder.closingBinder; import static io.trino.plugin.base.JdkCompatibilityChecks.verifyConnectorAccessOpened; -import static java.util.concurrent.Executors.newCachedThreadPool; -import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.Executors.newThreadPerTaskExecutor; import static org.weakref.jmx.guice.ExportBinder.newExporter; public class BigQueryConnectorModule @@ -127,9 +129,15 @@ public static BigQueryLabelFactory labelFactory(BigQueryConfig config) @Provides @Singleton - public ListeningExecutorService provideListeningExecutor(BigQueryConfig config) + public ListeningExecutorService provideListeningExecutor(CatalogName catalogName, BigQueryConfig config) { - return listeningDecorator(newFixedThreadPool(config.getMetadataParallelism(), daemonThreadsNamed("big-query-%s"))); // limit parallelism + return listeningDecorator(new ThreadPoolExecutor( + config.getMetadataParallelism(), + config.getMetadataParallelism(), + 0, + TimeUnit.MILLISECONDS, + new LinkedBlockingDeque<>(), + virtualThreadsNamed("bigquery-" + catalogName + "-metadata-%d"))); } @Provides @@ -137,7 +145,7 @@ public ListeningExecutorService provideListeningExecutor(BigQueryConfig config) @ForBigQueryPageSource public ExecutorService provideExecutor(CatalogName catalogName) { - return newCachedThreadPool(daemonThreadsNamed("bigquery-" + catalogName + "-%s")); + return newThreadPerTaskExecutor(virtualThreadsNamed("bigquery-" + catalogName + "-%d")); } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java index f83d2433a5b1..830baf58e897 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java @@ -205,7 +205,6 @@ private List listRemoteSchemaNames(ConnectorSession session) .distinct(); // filter out all the ambiguous schemas to prevent failures if anyone tries to access the listed schemas - return remoteSchemaNames.map(remoteSchema -> client.toRemoteDataset(projectId, remoteSchema.toLowerCase(ENGLISH), () -> datasetIds)) .filter(Optional::isPresent) .map(Optional::get) @@ -488,10 +487,10 @@ protected Stream processInParallel(List list, Function functi return Stream.of(function.apply(list.getFirst())); } - List> futures = list.stream() - .map(element -> executorService.submit(() -> function.apply(element))) - .collect(toImmutableList()); try { + List> futures = list.stream() + .map(element -> executorService.submit(() -> function.apply(element))) + .collect(toImmutableList()); return allAsList(futures).get().stream(); } catch (InterruptedException e) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeExecutorModule.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeExecutorModule.java index a45691509d82..cc5cc4ed88f6 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeExecutorModule.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeExecutorModule.java @@ -22,9 +22,9 @@ import java.util.concurrent.ExecutorService; -import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.concurrent.Threads.virtualThreadsNamed; import static io.trino.plugin.base.ClosingBinder.closingBinder; -import static java.util.concurrent.Executors.newCachedThreadPool; +import static java.util.concurrent.Executors.newThreadPerTaskExecutor; public class DeltaLakeExecutorModule implements Module @@ -41,7 +41,7 @@ public void configure(Binder binder) @ForDeltaLakeMetadata public ExecutorService createMetadataExecutor(CatalogName catalogName) { - return newCachedThreadPool(daemonThreadsNamed("delta-metadata-" + catalogName + "-%s")); + return newThreadPerTaskExecutor(virtualThreadsNamed("delta-metadata-" + catalogName + "-%d")); } @Provides @@ -49,6 +49,6 @@ public ExecutorService createMetadataExecutor(CatalogName catalogName) @ForDeltaLakeSplitManager public ExecutorService createSplitSourceExecutor(CatalogName catalogName) { - return newCachedThreadPool(daemonThreadsNamed("delta-split-source-" + catalogName + "-%s")); + return newThreadPerTaskExecutor(virtualThreadsNamed("delta-split-source-" + catalogName + "-%d")); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeTableMetadataScheduler.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeTableMetadataScheduler.java index d185fc37c979..5e8b8dd78155 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeTableMetadataScheduler.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeTableMetadataScheduler.java @@ -16,8 +16,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; -import io.airlift.concurrent.MoreFutures; import io.airlift.log.Logger; import io.airlift.units.Duration; import io.trino.metastore.Table; @@ -41,23 +41,25 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; +import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; +import static io.airlift.concurrent.AsyncSemaphore.processAll; import static io.airlift.concurrent.Threads.daemonThreadsNamed; -import static io.airlift.concurrent.Threads.threadsNamed; +import static io.airlift.concurrent.Threads.virtualThreadsNamed; import static io.trino.metastore.Table.TABLE_COMMENT; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isStoreTableMetadataInMetastoreEnabled; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode.NONE; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMetadata; +import static java.lang.Math.max; import static java.util.Comparator.comparing; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static java.util.concurrent.Executors.newThreadPerTaskExecutor; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.function.BinaryOperator.maxBy; @@ -77,7 +79,7 @@ public class DeltaLakeTableMetadataScheduler private final boolean enabled; private final Duration scheduleInterval; - private ExecutorService executor; + private ListeningExecutorService executor; private ScheduledExecutorService scheduler; private final AtomicInteger failedCounts = new AtomicInteger(); @@ -117,7 +119,7 @@ public void putAll(Map tableParameters) public void start() { if (enabled) { - executor = storeTableMetadataThreads == 0 ? newDirectExecutorService() : newFixedThreadPool(storeTableMetadataThreads, threadsNamed("store-table-metadata-%s")); + executor = listeningDecorator(newThreadPerTaskExecutor(virtualThreadsNamed("store-table-metadata-%d"))); scheduler = newSingleThreadScheduledExecutor(daemonThreadsNamed("store-table-metadata")); scheduler.scheduleWithFixedDelay(() -> { @@ -161,7 +163,10 @@ public void process() } try { - executor.invokeAll(tasks).forEach(MoreFutures::getDone); + processAll(tasks, executor::submit, max(1, storeTableMetadataThreads), executor).get(); + } + catch (ExecutionException e) { + throw new RuntimeException(e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveExecutorModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveExecutorModule.java index 9e1810300b31..89780a98c45b 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveExecutorModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveExecutorModule.java @@ -24,9 +24,10 @@ import java.util.concurrent.ScheduledExecutorService; import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.concurrent.Threads.virtualThreadsNamed; import static io.trino.plugin.base.ClosingBinder.closingBinder; -import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newScheduledThreadPool; +import static java.util.concurrent.Executors.newThreadPerTaskExecutor; public class HiveExecutorModule implements Module @@ -44,7 +45,7 @@ public void configure(Binder binder) @ForHiveMetadata public ExecutorService createMetadataExecutor(CatalogName catalogName) { - return newCachedThreadPool(daemonThreadsNamed("hive-metadata-" + catalogName + "-%s")); + return newThreadPerTaskExecutor(virtualThreadsNamed("hive-metadata-" + catalogName + "-%d")); } @Provides @@ -52,7 +53,7 @@ public ExecutorService createMetadataExecutor(CatalogName catalogName) @ForHiveSplitManager public ExecutorService createSplitSourceExecutor(CatalogName catalogName) { - return newCachedThreadPool(daemonThreadsNamed("hive-split-source-" + catalogName + "-%s")); + return newThreadPerTaskExecutor(virtualThreadsNamed("hive-split-source-" + catalogName + "-%d")); } @Provides diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/InMemoryGlueCache.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/InMemoryGlueCache.java index dc627293c35e..0e3b6b89e3d6 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/InMemoryGlueCache.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/InMemoryGlueCache.java @@ -55,9 +55,9 @@ import java.util.function.Supplier; import static com.google.common.cache.CacheLoader.asyncReloading; -import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.concurrent.Threads.virtualThreadsNamed; import static io.trino.cache.CacheUtils.invalidateAllIf; -import static java.util.concurrent.Executors.newCachedThreadPool; +import static java.util.concurrent.Executors.newThreadPerTaskExecutor; import static java.util.concurrent.TimeUnit.MILLISECONDS; class InMemoryGlueCache @@ -99,7 +99,7 @@ public InMemoryGlueCache( int maxMetastoreRefreshThreads, long maximumSize) { - this.refreshExecutor = newCachedThreadPool(daemonThreadsNamed("hive-metastore-" + catalogName + "-%s")); + this.refreshExecutor = newThreadPerTaskExecutor(virtualThreadsNamed("hive-metastore-" + catalogName + "-%d")); Executor boundedRefreshExecutor = new ReentrantBoundedExecutor(refreshExecutor, maxMetastoreRefreshThreads); OptionalLong refreshMillis = refreshInterval.stream().mapToLong(Duration::toMillis).findAny(); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergExecutorModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergExecutorModule.java index 67ff7508b367..a67e4adb38d9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergExecutorModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergExecutorModule.java @@ -22,13 +22,15 @@ import io.trino.spi.catalog.CatalogName; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; -import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.concurrent.Threads.virtualThreadsNamed; import static io.trino.plugin.base.ClosingBinder.closingBinder; -import static java.util.concurrent.Executors.newCachedThreadPool; -import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.Executors.newThreadPerTaskExecutor; public class IcebergExecutorModule implements Module @@ -46,7 +48,7 @@ public void configure(Binder binder) @ForIcebergMetadata public ExecutorService createIcebergMetadataExecutor(CatalogName catalogName) { - return newCachedThreadPool(daemonThreadsNamed("iceberg-metadata-" + catalogName + "-%s")); + return newThreadPerTaskExecutor(virtualThreadsNamed("iceberg-metadata-" + catalogName + "-%s")); } @Provides @@ -54,7 +56,7 @@ public ExecutorService createIcebergMetadataExecutor(CatalogName catalogName) @ForIcebergSplitManager public ListeningExecutorService createSplitSourceExecutor(CatalogName catalogName) { - return listeningDecorator(newCachedThreadPool(daemonThreadsNamed("iceberg-split-source-" + catalogName + "-%s"))); + return listeningDecorator(newThreadPerTaskExecutor(virtualThreadsNamed("iceberg-split-source-" + catalogName + "-%s"))); } @Provides @@ -65,8 +67,13 @@ public ExecutorService createScanPlanningExecutor(CatalogName catalogName, Icebe if (config.getSplitManagerThreads() == 0) { return newDirectExecutorService(); } - return newFixedThreadPool( + + return new ThreadPoolExecutor( + config.getSplitManagerThreads(), config.getSplitManagerThreads(), - daemonThreadsNamed("iceberg-split-manager-" + catalogName + "-%s")); + 0, + TimeUnit.MILLISECONDS, + new LinkedBlockingDeque<>(), + virtualThreadsNamed("iceberg-split-manager-" + catalogName + "-%s")); } }