Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ Property Name Description
metadata optimization is skipped.

Set to ``0`` to disable metadata optimization.

``iceberg.split-manager-threads`` Number of threads to use for generating Iceberg splits. ``Number of available processors``
======================================================= ============================================================= ============

Table Properties
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@BindingAnnotation
public @interface ForIcebergSplitManager {}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static java.lang.Math.toIntExact;
import static java.util.concurrent.Executors.newFixedThreadPool;
Expand Down Expand Up @@ -132,6 +133,7 @@ public void setup(Binder binder)
binder.bind(IcebergTableProperties.class).in(Scopes.SINGLETON);

binder.bind(ConnectorSplitManager.class).to(IcebergSplitManager.class).in(Scopes.SINGLETON);
newExporter(binder).export(ConnectorSplitManager.class).as(generatedNameOf(IcebergSplitManager.class, connectorId));
binder.bind(ConnectorPageSourceProvider.class).to(IcebergPageSourceProvider.class).in(Scopes.SINGLETON);
binder.bind(ConnectorPageSinkProvider.class).to(IcebergPageSinkProvider.class).in(Scopes.SINGLETON);
binder.bind(ConnectorNodePartitioningProvider.class).to(HiveNodePartitioningProvider.class).in(Scopes.SINGLETON);
Expand Down Expand Up @@ -175,6 +177,19 @@ public ExecutorService createCachingHiveMetastoreExecutor(MetastoreClientConfig
daemonThreadsNamed("hive-metastore-iceberg-%s"));
}

@Provides
@Singleton
@ForIcebergSplitManager
public ExecutorService createSplitManagerExecutor(IcebergConfig config)
{
if (config.getSplitManagerThreads() == 0) {
return newDirectExecutorService();
}
return newFixedThreadPool(
config.getSplitManagerThreads(),
daemonThreadsNamed("iceberg-split-manager-" + connectorId + "-%s"));
}

@Singleton
@Provides
public OrcFileTailSource createOrcFileTailSource(OrcCacheConfig orcCacheConfig, MBeanExporter exporter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class IcebergConfig
private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
private int splitManagerThreads = Runtime.getRuntime().availableProcessors();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that they used Runtime.getRuntime().availableProcessors() * 2 as the default value, probably because each processor core commonly supports 2 threads (hyper-threading). Are there any specific reasons for your setting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This default value is to be consistent with the default value of Iceberg's iceberg.worker.num-threads attribute:https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/SystemConfigs.java#L38-L43

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. Thanks for the clarification.


@NotNull
public FileFormat getFileFormat()
Expand Down Expand Up @@ -334,4 +335,18 @@ public IcebergConfig setManifestCacheMaxContentLength(long manifestCacheMaxConte
this.manifestCacheMaxContentLength = manifestCacheMaxContentLength;
return this;
}

@Min(0)
public int getSplitManagerThreads()
{
return splitManagerThreads;
}

@Config("iceberg.split-manager-threads")
@ConfigDescription("Number of threads to use for generating splits")
public IcebergConfig setSplitManagerThreads(int splitManagerThreads)
{
this.splitManagerThreads = splitManagerThreads;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.iceberg;

import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.iceberg.changelog.ChangelogSplitSource;
Expand All @@ -31,9 +32,14 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.inject.Inject;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
Expand All @@ -46,13 +52,19 @@ public class IcebergSplitManager
{
private final IcebergTransactionManager transactionManager;
private final TypeManager typeManager;
private final ExecutorService executor;
private final ThreadPoolExecutorMBean executorServiceMBean;

@Inject
public IcebergSplitManager(IcebergTransactionManager transactionManager,
TypeManager typeManager)
public IcebergSplitManager(
IcebergTransactionManager transactionManager,
TypeManager typeManager,
@ForIcebergSplitManager ExecutorService executor)
{
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.executor = requireNonNull(executor, "executor is null");
this.executorServiceMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) executor);
}

@Override
Expand Down Expand Up @@ -93,7 +105,8 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
else {
TableScan tableScan = icebergTable.newScan()
.filter(toIcebergExpression(predicate))
.useSnapshot(table.getIcebergTableName().getSnapshotId().get());
.useSnapshot(table.getIcebergTableName().getSnapshotId().get())
.planWith(executor);

// TODO Use residual. Right now there is no way to propagate residual to presto but at least we can
// propagate it at split level so the parquet pushdown can leverage it.
Expand All @@ -105,4 +118,11 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
return splitSource;
}
}

@Managed
@Nested
public ThreadPoolExecutorMBean getExecutor()
{
return executorServiceMBean;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public void testDefaults()
.setFileIOImpl(HadoopFileIO.class.getName())
.setMaxManifestCacheSize(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT)
.setManifestCacheExpireDuration(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
.setManifestCacheMaxContentLength(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT));
.setManifestCacheMaxContentLength(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT)
.setSplitManagerThreads(Runtime.getRuntime().availableProcessors()));
}

@Test
Expand All @@ -86,6 +87,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.io.manifest.cache.max-total-bytes", "1048576000")
.put("iceberg.io.manifest.cache.expiration-interval-ms", "600000")
.put("iceberg.io.manifest.cache.max-content-length", "10485760")
.put("iceberg.split-manager-threads", "42")
.build();

IcebergConfig expected = new IcebergConfig()
Expand All @@ -108,7 +110,8 @@ public void testExplicitPropertyMappings()
.setFileIOImpl("com.facebook.presto.iceberg.HdfsFileIO")
.setMaxManifestCacheSize(1048576000)
.setManifestCacheExpireDuration(600000)
.setManifestCacheMaxContentLength(10485760);
.setManifestCacheMaxContentLength(10485760)
.setSplitManagerThreads(42);

assertFullMapping(properties, expected);
}
Expand Down