diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index baca992dee1d0..8eeb18e1085a5 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -247,6 +247,8 @@ Property Name Description metadata optimization is skipped. Set to ``0`` to disable metadata optimization. + +``iceberg.split-manager-threads`` Number of threads to use for generating Iceberg splits. ``Number of available processors`` ======================================================= ============================================================= ============ Table Properties diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ForIcebergSplitManager.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ForIcebergSplitManager.java new file mode 100644 index 0000000000000..d032e5173002e --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ForIcebergSplitManager.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@BindingAnnotation +public @interface ForIcebergSplitManager {} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java index 185249c305b26..6d1476433e5f0 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java @@ -88,6 +88,7 @@ import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.facebook.airlift.configuration.ConfigBinder.configBinder; import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder; +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static com.google.inject.multibindings.Multibinder.newSetBinder; import static java.lang.Math.toIntExact; import static java.util.concurrent.Executors.newFixedThreadPool; @@ -132,6 +133,7 @@ public void setup(Binder binder) binder.bind(IcebergTableProperties.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(IcebergSplitManager.class).in(Scopes.SINGLETON); + newExporter(binder).export(ConnectorSplitManager.class).as(generatedNameOf(IcebergSplitManager.class, connectorId)); binder.bind(ConnectorPageSourceProvider.class).to(IcebergPageSourceProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorPageSinkProvider.class).to(IcebergPageSinkProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorNodePartitioningProvider.class).to(HiveNodePartitioningProvider.class).in(Scopes.SINGLETON); @@ -175,6 +177,19 @@ public ExecutorService createCachingHiveMetastoreExecutor(MetastoreClientConfig daemonThreadsNamed("hive-metastore-iceberg-%s")); } + @Provides + @Singleton + @ForIcebergSplitManager + public ExecutorService createSplitManagerExecutor(IcebergConfig config) + { + if (config.getSplitManagerThreads() == 0) { + return newDirectExecutorService(); + } + return newFixedThreadPool( + config.getSplitManagerThreads(), + daemonThreadsNamed("iceberg-split-manager-" + connectorId + "-%s")); + } + @Singleton @Provides public OrcFileTailSource createOrcFileTailSource(OrcCacheConfig orcCacheConfig, MBeanExporter exporter) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java index d2efa5a5becd4..e49832a64ec9b 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java @@ -60,6 +60,7 @@ public class IcebergConfig private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT; private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT; private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT; + private int splitManagerThreads = Runtime.getRuntime().availableProcessors(); @NotNull public FileFormat getFileFormat() @@ -334,4 +335,18 @@ public IcebergConfig setManifestCacheMaxContentLength(long manifestCacheMaxConte this.manifestCacheMaxContentLength = manifestCacheMaxContentLength; return this; } + + @Min(0) + public int getSplitManagerThreads() + { + return splitManagerThreads; + } + + @Config("iceberg.split-manager-threads") + @ConfigDescription("Number of threads to use for generating splits") + public IcebergConfig setSplitManagerThreads(int splitManagerThreads) + { + this.splitManagerThreads = splitManagerThreads; + return this; + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java index 360ef046d69ac..0a2bc13de0d99 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.iceberg; +import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean; import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.iceberg.changelog.ChangelogSplitSource; @@ -31,9 +32,14 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.TableScanUtil; +import org.weakref.jmx.Managed; +import org.weakref.jmx.Nested; import javax.inject.Inject; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight; import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG; @@ -46,13 +52,19 @@ public class IcebergSplitManager { private final IcebergTransactionManager transactionManager; private final TypeManager typeManager; + private final ExecutorService executor; + private final ThreadPoolExecutorMBean executorServiceMBean; @Inject - public IcebergSplitManager(IcebergTransactionManager transactionManager, - TypeManager typeManager) + public IcebergSplitManager( + IcebergTransactionManager transactionManager, + TypeManager typeManager, + @ForIcebergSplitManager ExecutorService executor) { this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.executor = requireNonNull(executor, "executor is null"); + this.executorServiceMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) executor); } @Override @@ -93,7 +105,8 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) { else { TableScan tableScan = icebergTable.newScan() .filter(toIcebergExpression(predicate)) - .useSnapshot(table.getIcebergTableName().getSnapshotId().get()); + .useSnapshot(table.getIcebergTableName().getSnapshotId().get()) + .planWith(executor); // TODO Use residual. Right now there is no way to propagate residual to presto but at least we can // propagate it at split level so the parquet pushdown can leverage it. @@ -105,4 +118,11 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) { return splitSource; } } + + @Managed + @Nested + public ThreadPoolExecutorMBean getExecutor() + { + return executorServiceMBean; + } } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java index d1d6db239e358..1bf74d006d8c5 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java @@ -59,7 +59,8 @@ public void testDefaults() .setFileIOImpl(HadoopFileIO.class.getName()) .setMaxManifestCacheSize(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT) .setManifestCacheExpireDuration(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT) - .setManifestCacheMaxContentLength(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT)); + .setManifestCacheMaxContentLength(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT) + .setSplitManagerThreads(Runtime.getRuntime().availableProcessors())); } @Test @@ -86,6 +87,7 @@ public void testExplicitPropertyMappings() .put("iceberg.io.manifest.cache.max-total-bytes", "1048576000") .put("iceberg.io.manifest.cache.expiration-interval-ms", "600000") .put("iceberg.io.manifest.cache.max-content-length", "10485760") + .put("iceberg.split-manager-threads", "42") .build(); IcebergConfig expected = new IcebergConfig() @@ -108,7 +110,8 @@ public void testExplicitPropertyMappings() .setFileIOImpl("com.facebook.presto.iceberg.HdfsFileIO") .setMaxManifestCacheSize(1048576000) .setManifestCacheExpireDuration(600000) - .setManifestCacheMaxContentLength(10485760); + .setManifestCacheMaxContentLength(10485760) + .setSplitManagerThreads(42); assertFullMapping(properties, expected); }