8 changes: 8 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -428,6 +428,14 @@ Property Name Description
``iceberg.target_split_size`` Overrides the target split size for all tables in a query in bytes.
Set to 0 to use the value in each Iceberg table's
``read.split.target-size`` property.
``iceberg.affinity_scheduling_file_section_size`` When the ``node_selection_strategy`` or
``hive.node-selection-strategy`` property is set to ``SOFT_AFFINITY``,
this property changes the size of the file section that is hashed to a
particular node when determining which worker to assign a split to.
Splits that read data from the same section of the same file hash to
the same node. A smaller section size increases the probability that
splits are distributed evenly across the cluster, but reduces locality.
===================================================== ======================================================================

Member:

Should the property's name be ``iceberg.node-selection-strategy``?

Contributor Author:

The way we register the config, I believe it is still ``hive.node-selection-strategy``. The config comes from HiveCommonClientConfig.java, which is bound in HiveCommonModule.java. The injector doesn't register a prefix with the config, so it uses the same value as in the ``*Config`` class, which is ``hive.node-selection-strategy``.

Member:

Oh yes, you are right. Perhaps in the future we should consider binding separate prefixes to the configs in presto-hive-common in each lakehouse connector's own Module.
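As a rough illustration of the trade-off described above, the sketch below derives the affinity key for two splits under the default 256 MB section size. The class, path, and offsets are hypothetical; the real key construction lives in IcebergSplit#getPreferredNodes further down in this diff.

import io.airlift.units.DataSize;

import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class AffinityKeyExample
{
    public static void main(String[] args)
    {
        // Default section size introduced by this change: 256 MB.
        long sectionSize = new DataSize(256, MEGABYTE).toBytes();

        String path = "/warehouse/db/table/data/part-00000.parquet"; // hypothetical file
        long startA = 0;                 // first split: section 0
        long startB = 64L * 1024 * 1024; // second split, 64 MB in: still section 0

        // Splits in the same section share a key, so they hash to the same worker.
        System.out.println(path + "#" + (startA / sectionSize)); // ...part-00000.parquet#0
        System.out.println(path + "#" + (startB / sectionSize)); // ...part-00000.parquet#0

        // With a smaller section size (say 64 MB), startB would fall in section 1,
        // spreading the reads across workers at the cost of cache locality.
    }
}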

Caching Support
HiveCommonClientConfig.java
@@ -46,6 +46,7 @@ public class HiveCommonClientConfig
private boolean readNullMaskedParquetEncryptedValueEnabled;
private boolean useParquetColumnNames;
private boolean zstdJniDecompressionEnabled;
private DataSize affinitySchedulingFileSectionSize = new DataSize(256, MEGABYTE);

public NodeSelectionStrategy getNodeSelectionStrategy()
{
@@ -284,4 +285,17 @@ public HiveCommonClientConfig setZstdJniDecompressionEnabled(boolean zstdJniDeco
this.zstdJniDecompressionEnabled = zstdJniDecompressionEnabled;
return this;
}

@NotNull
public DataSize getAffinitySchedulingFileSectionSize()
{
return affinitySchedulingFileSectionSize;
}

@Config("hive.affinity-scheduling-file-section-size")
public HiveCommonClientConfig setAffinitySchedulingFileSectionSize(DataSize affinitySchedulingFileSectionSize)
{
this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize;
return this;
}
}
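As the review thread above explains, HiveCommonClientConfig keeps its hive.* property names because HiveCommonModule binds the class without a prefix. A minimal sketch of such an unprefixed binding, assuming airlift's ConfigBinder API (the real module carries many more bindings):

import com.google.inject.Binder;
import com.google.inject.Module;

import static com.facebook.airlift.configuration.ConfigBinder.configBinder;

public class HiveCommonModuleSketch
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        // No prefix is registered, so properties keep the names declared on the
        // @Config annotations, e.g. "hive.affinity-scheduling-file-section-size",
        // even when this config class is reused by the Iceberg connector.
        configBinder(binder).bindConfig(HiveCommonClientConfig.class);
    }
}

Binding the config under a per-connector prefix, as the reviewer suggests, would presumably use one of ConfigBinder's prefixed bindConfig overloads instead.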
HiveCommonSessionProperties.java
@@ -44,7 +44,7 @@ public class HiveCommonSessionProperties
@VisibleForTesting
public static final String PARQUET_BATCH_READ_OPTIMIZATION_ENABLED = "parquet_batch_read_optimization_enabled";

-    private static final String NODE_SELECTION_STRATEGY = "node_selection_strategy";
+    public static final String NODE_SELECTION_STRATEGY = "node_selection_strategy";
private static final String ORC_BLOOM_FILTERS_ENABLED = "orc_bloom_filters_enabled";
private static final String ORC_LAZY_READ_SMALL_RANGES = "orc_lazy_read_small_ranges";
private static final String ORC_MAX_BUFFER_SIZE = "orc_max_buffer_size";
@@ -61,6 +61,7 @@ public class HiveCommonSessionProperties
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
private static final String PARQUET_USE_COLUMN_NAMES = "parquet_use_column_names";
public static final String READ_MASKED_VALUE_ENABLED = "read_null_masked_parquet_encrypted_value_enabled";
public static final String AFFINITY_SCHEDULING_FILE_SECTION_SIZE = "affinity_scheduling_file_section_size";
private final List<PropertyMetadata<?>> sessionProperties;

@Inject
@@ -177,6 +178,11 @@ public HiveCommonSessionProperties(HiveCommonClientConfig hiveCommonClientConfig
READ_MASKED_VALUE_ENABLED,
"Return null when access is denied for an encrypted parquet column",
hiveCommonClientConfig.getReadNullMaskedParquetEncryptedValue(),
false),
dataSizeSessionProperty(
AFFINITY_SCHEDULING_FILE_SECTION_SIZE,
"Size of file section for affinity scheduling",
hiveCommonClientConfig.getAffinitySchedulingFileSectionSize(),
false));
}

@@ -299,4 +305,9 @@ public static PropertyMetadata<DataSize> dataSizeSessionProperty(String name, St
value -> DataSize.valueOf((String) value),
DataSize::toString);
}

public static DataSize getAffinitySchedulingFileSectionSize(ConnectorSession session)
{
return session.getProperty(AFFINITY_SCHEDULING_FILE_SECTION_SIZE, DataSize.class);
}
}
TestHiveCommonClientConfig.java
@@ -23,6 +23,7 @@
import java.util.Map;

import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.HARD_AFFINITY;
import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class TestHiveCommonClientConfig
{
@@ -47,7 +48,8 @@ public void testDefaults()
.setZstdJniDecompressionEnabled(false)
.setParquetBatchReaderVerificationEnabled(false)
.setParquetBatchReadOptimizationEnabled(false)
-                .setReadNullMaskedParquetEncryptedValue(false));
+                .setReadNullMaskedParquetEncryptedValue(false)
+                .setAffinitySchedulingFileSectionSize(new DataSize(256, MEGABYTE)));
}

@Test
@@ -72,6 +74,7 @@ public void testExplicitPropertyMappings()
.put("hive.enable-parquet-batch-reader-verification", "true")
.put("hive.parquet-batch-read-optimization-enabled", "true")
.put("hive.read-null-masked-parquet-encrypted-value-enabled", "true")
.put("hive.affinity-scheduling-file-section-size", "512MB")
.build();

HiveCommonClientConfig expected = new HiveCommonClientConfig()
@@ -92,7 +95,8 @@ public void testExplicitPropertyMappings()
.setZstdJniDecompressionEnabled(true)
.setParquetBatchReaderVerificationEnabled(true)
.setParquetBatchReadOptimizationEnabled(true)
-                .setReadNullMaskedParquetEncryptedValue(true);
+                .setReadNullMaskedParquetEncryptedValue(true)
+                .setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE));

ConfigAssertions.assertFullMapping(properties, expected);
}
HiveClientConfig.java
@@ -220,7 +220,6 @@ public class HiveClientConfig
private Duration parquetQuickStatsFileMetadataFetchTimeout = new Duration(60, TimeUnit.SECONDS);
private int parquetQuickStatsMaxConcurrentCalls = 500;
private int quickStatsMaxConcurrentCalls = 100;
-    private DataSize affinitySchedulingFileSectionSize = new DataSize(256, MEGABYTE);
private boolean legacyTimestampBucketing;

@Min(0)
@@ -1793,19 +1792,6 @@ public int getMaxParallelParsingConcurrency()
return this.maxParallelParsingConcurrency;
}

-    @NotNull
-    public DataSize getAffinitySchedulingFileSectionSize()
-    {
-        return affinitySchedulingFileSectionSize;
-    }
-
-    @Config("hive.affinity-scheduling-file-section-size")
-    public HiveClientConfig setAffinitySchedulingFileSectionSize(DataSize affinitySchedulingFileSectionSize)
-    {
-        this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize;
-        return this;
-    }

@Config("hive.skip-empty-files")
@ConfigDescription("Enables skip of empty files avoiding output error")
public HiveClientConfig setSkipEmptyFilesEnabled(boolean skipEmptyFiles)
HiveSessionProperties.java
@@ -131,7 +131,6 @@ public final class HiveSessionProperties
public static final String QUICK_STATS_INLINE_BUILD_TIMEOUT = "quick_stats_inline_build_timeout";
public static final String QUICK_STATS_BACKGROUND_BUILD_TIMEOUT = "quick_stats_background_build_timeout";
public static final String DYNAMIC_SPLIT_SIZES_ENABLED = "dynamic_split_sizes_enabled";
-    public static final String AFFINITY_SCHEDULING_FILE_SECTION_SIZE = "affinity_scheduling_file_section_size";
public static final String SKIP_EMPTY_FILES = "skip_empty_files";
public static final String LEGACY_TIMESTAMP_BUCKETING = "legacy_timestamp_bucketing";

@@ -639,11 +638,6 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
false,
value -> Duration.valueOf((String) value),
Duration::toString),
-                dataSizeSessionProperty(
-                        AFFINITY_SCHEDULING_FILE_SECTION_SIZE,
-                        "Size of file section for affinity scheduling",
-                        hiveClientConfig.getAffinitySchedulingFileSectionSize(),
-                        false),
booleanProperty(
SKIP_EMPTY_FILES,
"If it is required empty files will be skipped",
@@ -1126,11 +1120,6 @@ public static Duration getQuickStatsBackgroundBuildTimeout(ConnectorSession sess
return session.getProperty(QUICK_STATS_BACKGROUND_BUILD_TIMEOUT, Duration.class);
}

-    public static DataSize getAffinitySchedulingFileSectionSize(ConnectorSession session)
-    {
-        return session.getProperty(AFFINITY_SCHEDULING_FILE_SECTION_SIZE, DataSize.class);
-    }

public static boolean isSkipEmptyFilesEnabled(ConnectorSession session)
{
return session.getProperty(SKIP_EMPTY_FILES, Boolean.class);
@@ -52,10 +52,10 @@

import static com.facebook.airlift.concurrent.MoreFutures.failedFuture;
import static com.facebook.airlift.concurrent.MoreFutures.toCompletableFuture;
+import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
-import static com.facebook.presto.hive.HiveSessionProperties.getAffinitySchedulingFileSectionSize;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.getMinimumAssignedSplitWeight;
@@ -165,7 +165,6 @@ public void testDefaults()
.setMaxConcurrentParquetQuickStatsCalls(500)
.setCteVirtualBucketCount(128)
.setSkipEmptyFilesEnabled(false)
-                .setAffinitySchedulingFileSectionSize(new DataSize(256, MEGABYTE))
.setLegacyTimestampBucketing(false));
}

@@ -290,7 +289,6 @@ public void testExplicitPropertyMappings()
.put("hive.quick-stats.parquet.max-concurrent-calls", "399")
.put("hive.quick-stats.max-concurrent-calls", "101")
.put("hive.cte-virtual-bucket-count", "256")
.put("hive.affinity-scheduling-file-section-size", "512MB")
.put("hive.skip-empty-files", "true")
.put("hive.legacy-timestamp-bucketing", "true")
.build();
@@ -411,10 +409,8 @@ public void testExplicitPropertyMappings()
.setParquetQuickStatsFileMetadataFetchTimeout(new Duration(30, TimeUnit.SECONDS))
.setMaxConcurrentParquetQuickStatsCalls(399)
.setMaxConcurrentQuickStatsCalls(101)
-                .setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE))
.setSkipEmptyFilesEnabled(true)
.setCteVirtualBucketCount(256)
-                .setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE))
.setLegacyTimestampBucketing(true);

ConfigAssertions.assertFullMapping(properties, expected);
@@ -42,7 +42,7 @@
import static com.facebook.presto.hive.CacheQuotaScope.GLOBAL;
import static com.facebook.presto.hive.CacheQuotaScope.PARTITION;
import static com.facebook.presto.hive.CacheQuotaScope.TABLE;
-import static com.facebook.presto.hive.HiveSessionProperties.getAffinitySchedulingFileSectionSize;
+import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static com.facebook.presto.hive.HiveTestUtils.SESSION;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
IcebergSplit.java
@@ -51,6 +51,8 @@ public class IcebergSplit
private final List<DeleteFile> deletes;
private final Optional<ChangelogSplitInfo> changelogSplitInfo;
private final long dataSequenceNumber;
private final long affinitySchedulingFileSectionSize;
private final long affinitySchedulingFileSectionIndex;

@JsonCreator
public IcebergSplit(
@@ -66,7 +68,8 @@ public IcebergSplit(
@JsonProperty("splitWeight") SplitWeight splitWeight,
@JsonProperty("deletes") List<DeleteFile> deletes,
@JsonProperty("changelogSplitInfo") Optional<ChangelogSplitInfo> changelogSplitInfo,
@JsonProperty("dataSequenceNumber") long dataSequenceNumber)
@JsonProperty("dataSequenceNumber") long dataSequenceNumber,
@JsonProperty("affinitySchedulingSectionSize") long affinitySchedulingFileSectionSize)
{
requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null");
this.path = requireNonNull(path, "path is null");
@@ -82,6 +85,8 @@ public IcebergSplit(
this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null"));
this.changelogSplitInfo = requireNonNull(changelogSplitInfo, "changelogSplitInfo is null");
this.dataSequenceNumber = dataSequenceNumber;
this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize;
this.affinitySchedulingFileSectionIndex = start / affinitySchedulingFileSectionSize;
}

@JsonProperty
@@ -143,7 +148,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
if (getNodeSelectionStrategy() == SOFT_AFFINITY) {
-            return nodeProvider.get(path);
+            return nodeProvider.get(path + "#" + affinitySchedulingFileSectionIndex);
}
return addresses;
}
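To make the effect of the new key concrete, here is a toy stand-in for a consistent NodeProvider lookup. The class, worker names, and modulo hashing are hypothetical; the real provider uses Presto's node-selection machinery, and all SOFT_AFFINITY needs is that a stable key maps to a stable worker.

import java.util.List;

public class ToyAffinityLookup
{
    private static final List<String> WORKERS = List.of("worker-1", "worker-2", "worker-3");

    // Simplified stand-in for NodeProvider#get: a stable key always picks the same worker.
    static String pick(String affinityKey)
    {
        return WORKERS.get(Math.floorMod(affinityKey.hashCode(), WORKERS.size()));
    }

    public static void main(String[] args)
    {
        String path = "/data/part-00000.parquet"; // hypothetical
        System.out.println(pick(path + "#0"));    // some worker
        System.out.println(pick(path + "#0"));    // always the same worker as above
        System.out.println(pick(path + "#1"));    // possibly a different worker
    }
}

Previously the key was the bare path, so every split of a large file hashed to a single worker; adding the section index spreads sections across the cluster while keeping splits within a section co-located.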
@@ -173,6 +178,12 @@ public long getDataSequenceNumber()
return dataSequenceNumber;
}

@JsonProperty
public long getAffinitySchedulingFileSectionSize()
{
return affinitySchedulingFileSectionSize;
}

@Override
public Object getInfo()
{
IcebergSplitSource.java
@@ -37,6 +37,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
@@ -60,6 +61,7 @@ public class IcebergSplitSource
private final double minimumAssignedSplitWeight;
private final long targetSplitSize;
private final NodeSelectionStrategy nodeSelectionStrategy;
private final long affinitySchedulingFileSectionSize;

private final TupleDomain<IcebergColumnHandle> metadataColumnConstraints;

@@ -73,11 +75,12 @@ public IcebergSplitSource(
this.targetSplitSize = getTargetSplitSize(session, tableScan).toBytes();
this.minimumAssignedSplitWeight = getMinimumAssignedSplitWeight(session);
this.nodeSelectionStrategy = getNodeSelectionStrategy(session);
this.affinitySchedulingFileSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes();
this.fileScanTaskIterator = closer.register(
splitFiles(
closer.register(tableScan.planFiles()),
targetSplitSize)
                        .iterator());
}

@Override
@@ -139,6 +142,7 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / targetSplitSize, minimumAssignedSplitWeight), 1.0)),
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
Optional.empty(),
-                getDataSequenceNumber(task.file()));
+                getDataSequenceNumber(task.file()),
+                affinitySchedulingFileSectionSize);
}
}
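A side note on the SplitWeight.fromProportion argument in toIcebergSplit above: it clamps task.length() / targetSplitSize into the range [minimumAssignedSplitWeight, 1.0]. A small worked example with assumed numbers (these are not connector defaults):

public class SplitWeightExample
{
    // Mirrors the clamp in toIcebergSplit.
    static double weight(long taskLength, double targetSplitSize, double minimumWeight)
    {
        return Math.min(Math.max(taskLength / targetSplitSize, minimumWeight), 1.0);
    }

    public static void main(String[] args)
    {
        double target = 128.0 * 1024 * 1024; // assumed 128 MB target split size
        double minimum = 0.05;               // assumed minimum assigned split weight

        System.out.println(weight(1024 * 1024, target, minimum));        // 1 MB task   -> 0.05 (clamped up)
        System.out.println(weight(64L * 1024 * 1024, target, minimum));  // 64 MB task  -> 0.5
        System.out.println(weight(512L * 1024 * 1024, target, minimum)); // 512 MB task -> 1.0 (clamped down)
    }
}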
ChangelogSplitSource.java
@@ -47,6 +47,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
@@ -71,6 +72,7 @@ public class ChangelogSplitSource
private final long targetSplitSize;
private final List<IcebergColumnHandle> columnHandles;
private final NodeSelectionStrategy nodeSelectionStrategy;
private final long affinitySchedulingSectionSize;

public ChangelogSplitSource(
ConnectorSession session,
@@ -86,6 +88,7 @@ public ChangelogSplitSource(
this.nodeSelectionStrategy = getNodeSelectionStrategy(session);
this.fileScanTaskIterable = closer.register(tableScan.planFiles());
this.fileScanTaskIterator = closer.register(fileScanTaskIterable.iterator());
this.affinitySchedulingSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes();
}

@Override
@@ -153,6 +156,7 @@ private IcebergSplit splitFromContentScanTask(ContentScanTask<DataFile> task, Ch
changeTask.changeOrdinal(),
changeTask.commitSnapshotId(),
columnHandles)),
-                getDataSequenceNumber(task.file()));
+                getDataSequenceNumber(task.file()),
+                affinitySchedulingSectionSize);
}
}
EqualityDeletesSplitSource.java
@@ -36,6 +36,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.iceberg.FileContent.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.FileContent.fromIcebergFileContent;
@@ -52,6 +53,7 @@ public class EqualityDeletesSplitSource
{
private final ConnectorSession session;
private final Map<Integer, PartitionSpec> specById;
private final long affinitySchedulingSectionSize;
private CloseableIterator<DeleteFile> deleteFiles;

public EqualityDeletesSplitSource(
@@ -64,6 +66,7 @@ public EqualityDeletesSplitSource(
requireNonNull(deleteFiles, "deleteFiles is null");
this.specById = table.specs();
this.deleteFiles = CloseableIterable.filter(deleteFiles, deleteFile -> fromIcebergFileContent(deleteFile.content()) == EQUALITY_DELETES).iterator();
this.affinitySchedulingSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes();
}

@Override
@@ -121,6 +124,7 @@ private IcebergSplit splitFromDeleteFile(DeleteFile deleteFile)
SplitWeight.standard(),
ImmutableList.of(),
Optional.empty(),
-                IcebergUtil.getDataSequenceNumber(deleteFile));
+                IcebergUtil.getDataSequenceNumber(deleteFile),
+                affinitySchedulingSectionSize);
}
}