diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index 10efb526281d..6edf31d13445 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -20,6 +20,8 @@ import io.airlift.units.Duration; import io.trino.plugin.hive.HiveCompressionCodec; +import javax.validation.constraints.DecimalMax; +import javax.validation.constraints.DecimalMin; import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; @@ -58,6 +60,7 @@ public class IcebergConfig // putting schemas in locations with extraneous files), so default to false // to avoid deleting those files if Trino is unable to check. private boolean deleteSchemaLocationsFallback; + private double minimumAssignedSplitWeight = 0.05; public CatalogType getCatalogType() { @@ -271,4 +274,19 @@ public IcebergConfig setDeleteSchemaLocationsFallback(boolean deleteSchemaLocati this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback; return this; } + + @Config("iceberg.minimum-assigned-split-weight") + @ConfigDescription("Minimum weight that a split can be assigned") + public IcebergConfig setMinimumAssignedSplitWeight(double minimumAssignedSplitWeight) + { + this.minimumAssignedSplitWeight = minimumAssignedSplitWeight; + return this; + } + + @DecimalMax("1") + @DecimalMin(value = "0", inclusive = false) + public double getMinimumAssignedSplitWeight() + { + return minimumAssignedSplitWeight; + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java index 39923d934a1f..1adac3626ddb 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java @@ -73,6 +73,7 @@ public final class IcebergSessionProperties private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled"; private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size"; private static final String HIVE_CATALOG_NAME = "hive_catalog_name"; + private static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight"; public static final String EXPIRE_SNAPSHOTS_MIN_RETENTION = "expire_snapshots_min_retention"; public static final String REMOVE_ORPHAN_FILES_MIN_RETENTION = "remove_orphan_files_min_retention"; @@ -229,6 +230,11 @@ public IcebergSessionProperties( // Session-level redirections configuration does not work well with views, as view body is analyzed in context // of a session with properties stripped off. Thus, this property is more of a test-only, or at most POC usefulness. true)) + .add(doubleProperty( + MINIMUM_ASSIGNED_SPLIT_WEIGHT, + "Minimum assigned split weight", + icebergConfig.getMinimumAssignedSplitWeight(), + false)) .add(durationProperty( EXPIRE_SNAPSHOTS_MIN_RETENTION, "Minimal retention period for expire_snapshot procedure", @@ -394,4 +400,9 @@ public static Duration getRemoveOrphanFilesMinRetention(ConnectorSession session { return session.getProperty(REMOVE_ORPHAN_FILES_MIN_RETENTION, Duration.class); } + + public static double getMinimumAssignedSplitWeight(ConnectorSession session) + { + return session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java index a2ab5b7ec1cd..13875a72309c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableMap; import io.trino.plugin.iceberg.delete.TrinoDeleteFile; import io.trino.spi.HostAddress; +import io.trino.spi.SplitWeight; import io.trino.spi.connector.ConnectorSplit; import org.openjdk.jol.info.ClassLayout; @@ -43,6 +44,7 @@ public class IcebergSplit private final String partitionSpecJson; private final String partitionDataJson; private final List deletes; + private final SplitWeight splitWeight; @JsonCreator public IcebergSplit( @@ -55,7 +57,8 @@ public IcebergSplit( @JsonProperty("addresses") List addresses, @JsonProperty("partitionSpecJson") String partitionSpecJson, @JsonProperty("partitionDataJson") String partitionDataJson, - @JsonProperty("deletes") List deletes) + @JsonProperty("deletes") List deletes, + @JsonProperty("splitWeight") SplitWeight splitWeight) { this.path = requireNonNull(path, "path is null"); this.start = start; @@ -67,6 +70,7 @@ public IcebergSplit( this.partitionSpecJson = requireNonNull(partitionSpecJson, "partitionSpecJson is null"); this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null"); this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null")); + this.splitWeight = requireNonNull(splitWeight, "splitWeight is null"); } @Override @@ -136,6 +140,13 @@ public List getDeletes() return deletes; } + @JsonProperty + @Override + public SplitWeight getSplitWeight() + { + return splitWeight; + } + @Override public Object getInfo() { @@ -154,7 +165,8 @@ public long getRetainedSizeInBytes() + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + estimatedSizeOf(partitionSpecJson) + estimatedSizeOf(partitionDataJson) - + estimatedSizeOf(deletes, TrinoDeleteFile::getRetainedSizeInBytes); + + estimatedSizeOf(deletes, TrinoDeleteFile::getRetainedSizeInBytes) + + splitWeight.getRetainedSizeInBytes(); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java index be8e4aab6a6d..b0af1965c900 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java @@ -31,6 +31,7 @@ import javax.inject.Inject; import static io.trino.plugin.iceberg.IcebergSessionProperties.getDynamicFilteringWaitTimeout; +import static io.trino.plugin.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight; import static java.util.Objects.requireNonNull; public class IcebergSplitManager @@ -79,7 +80,8 @@ public ConnectorSplitSource getSplits( dynamicFilteringWaitTimeout, constraint, typeManager, - table.isRecordScannedFiles()); + table.isRecordScannedFiles(), + getMinimumAssignedSplitWeight(session)); return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader()); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index 54dfcca65c2d..38115a8b0aad 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -18,10 +18,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterators; -import com.google.common.collect.Streams; +import com.google.common.io.Closer; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.plugin.iceberg.delete.TrinoDeleteFile; +import io.trino.spi.SplitWeight; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorPartitionHandle; @@ -36,7 +37,6 @@ import io.trino.spi.predicate.ValueSet; import io.trino.spi.type.TypeManager; import org.apache.hadoop.fs.Path; -import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpecParser; @@ -44,7 +44,9 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.TableScanUtil; import javax.annotation.Nullable; @@ -53,8 +55,6 @@ import java.net.URI; import java.net.URLEncoder; import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -100,9 +100,11 @@ public class IcebergSplitSource private final Stopwatch dynamicFilterWaitStopwatch; private final Constraint constraint; private final TypeManager typeManager; + private final Closer closer = Closer.create(); + private final double minimumAssignedSplitWeight; - private CloseableIterable combinedScanIterable; - private Iterator fileScanIterator; + private CloseableIterable fileScanTaskIterable; + private CloseableIterator fileScanTaskIterator; private TupleDomain pushedDownDynamicFilterPredicate; private final boolean recordScannedFiles; @@ -116,7 +118,8 @@ public IcebergSplitSource( Duration dynamicFilteringWaitTimeout, Constraint constraint, TypeManager typeManager, - boolean recordScannedFiles) + boolean recordScannedFiles, + double minimumAssignedSplitWeight) { this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); this.tableScan = requireNonNull(tableScan, "tableScan is null"); @@ -128,6 +131,7 @@ public IcebergSplitSource( this.constraint = requireNonNull(constraint, "constraint is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.recordScannedFiles = recordScannedFiles; + this.minimumAssignedSplitWeight = minimumAssignedSplitWeight; } @Override @@ -140,7 +144,7 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan .completeOnTimeout(EMPTY_BATCH, timeLeft, MILLISECONDS); } - if (combinedScanIterable == null) { + if (fileScanTaskIterable == null) { // Used to avoid duplicating work if the Dynamic Filter was already pushed down to the Iceberg API this.pushedDownDynamicFilterPredicate = dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast); TupleDomain fullPredicate = tableHandle.getUnenforcedPredicate() @@ -161,14 +165,14 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan } Expression filterExpression = toIcebergExpression(effectivePredicate); - this.combinedScanIterable = tableScan - .filter(filterExpression) - .includeColumnStats() - .planTasks(); - this.fileScanIterator = Streams.stream(combinedScanIterable) - .map(CombinedScanTask::files) - .flatMap(Collection::stream) - .iterator(); + this.fileScanTaskIterable = TableScanUtil.splitFiles( + tableScan.filter(filterExpression) + .includeColumnStats() + .planFiles(), + tableScan.targetSplitSize()); + closer.register(fileScanTaskIterable); + this.fileScanTaskIterator = fileScanTaskIterable.iterator(); + closer.register(fileScanTaskIterator); } TupleDomain dynamicFilterPredicate = dynamicFilter.getCurrentPredicate() @@ -178,7 +182,7 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan return completedFuture(NO_MORE_SPLITS_BATCH); } - Iterator fileScanTasks = Iterators.limit(fileScanIterator, maxSize); + Iterator fileScanTasks = Iterators.limit(fileScanTaskIterator, maxSize); ImmutableList.Builder splits = ImmutableList.builder(); while (fileScanTasks.hasNext()) { FileScanTask scanTask = fileScanTasks.next(); @@ -238,14 +242,14 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan private void finish() { close(); - this.combinedScanIterable = CloseableIterable.empty(); - this.fileScanIterator = Collections.emptyIterator(); + this.fileScanTaskIterable = CloseableIterable.empty(); + this.fileScanTaskIterator = CloseableIterator.empty(); } @Override public boolean isFinished() { - return fileScanIterator != null && !fileScanIterator.hasNext(); + return fileScanTaskIterator != null && !fileScanTaskIterator.hasNext(); } @Override @@ -261,13 +265,11 @@ public Optional> getTableExecuteSplitsInfo() @Override public void close() { - if (combinedScanIterable != null) { - try { - combinedScanIterable.close(); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } + try { + closer.close(); + } + catch (IOException e) { + throw new UncheckedIOException(e); } } @@ -376,7 +378,7 @@ static boolean partitionMatchesPredicate( return true; } - private static IcebergSplit toIcebergSplit(FileScanTask task) + private IcebergSplit toIcebergSplit(FileScanTask task) { return new IcebergSplit( hadoopPath(task.file().path().toString()), @@ -390,7 +392,8 @@ private static IcebergSplit toIcebergSplit(FileScanTask task) PartitionData.toJson(task.file().partition()), task.deletes().stream() .map(TrinoDeleteFile::copyOf) - .collect(toImmutableList())); + .collect(toImmutableList()), + SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0))); } private static String hadoopPath(String path) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index 0c4d14aa2ae8..4a694b59972f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -55,7 +55,8 @@ public void testDefaults() .setExpireSnapshotsMinRetention(new Duration(7, DAYS)) .setRemoveOrphanFilesMinRetention(new Duration(7, DAYS)) .setDeleteSchemaLocationsFallback(false) - .setTargetMaxFileSize(DataSize.of(1, GIGABYTE))); + .setTargetMaxFileSize(DataSize.of(1, GIGABYTE)) + .setMinimumAssignedSplitWeight(0.05)); } @Test @@ -77,6 +78,7 @@ public void testExplicitPropertyMappings() .put("iceberg.remove_orphan_files.min-retention", "14h") .put("iceberg.delete-schema-locations-fallback", "true") .put("iceberg.target-max-file-size", "1MB") + .put("iceberg.minimum-assigned-split-weight", "0.01") .buildOrThrow(); IcebergConfig expected = new IcebergConfig() @@ -94,7 +96,8 @@ public void testExplicitPropertyMappings() .setExpireSnapshotsMinRetention(new Duration(13, HOURS)) .setRemoveOrphanFilesMinRetention(new Duration(14, HOURS)) .setDeleteSchemaLocationsFallback(true) - .setTargetMaxFileSize(DataSize.of(1, MEGABYTE)); + .setTargetMaxFileSize(DataSize.of(1, MEGABYTE)) + .setMinimumAssignedSplitWeight(0.01); assertFullMapping(properties, expected); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java index 9a154212d597..266215721217 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java @@ -35,6 +35,7 @@ import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.ParquetWriterConfig; import io.trino.spi.Page; +import io.trino.spi.SplitWeight; import io.trino.spi.block.BlockBuilder; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorPageSource; @@ -161,7 +162,8 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle ImmutableList.of(), PartitionSpecParser.toJson(PartitionSpec.unpartitioned()), PartitionData.toJson(new PartitionData(new Object[] {})), - ImmutableList.of()); + ImmutableList.of(), + SplitWeight.standard()); TableHandle tableHandle = new TableHandle( new CatalogName(ICEBERG_CATALOG_NAME), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index 7d3bbc38be09..d26c488e8a41 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -185,7 +185,8 @@ public TupleDomain getCurrentPredicate() new Duration(2, SECONDS), alwaysTrue(), new TestingTypeManager(), - false); + false, + new IcebergConfig().getMinimumAssignedSplitWeight()); ImmutableList.Builder splits = ImmutableList.builder(); while (!splitSource.isFinished()) {