From 5c635b779be755dc329d9c1c088da549e29205dc Mon Sep 17 00:00:00 2001 From: wangd Date: Sun, 19 Oct 2025 23:42:00 +0800 Subject: [PATCH] refactor: Avoid List and Array to track row number column in ORC format --- .../presto/hive/orc/OrcBatchPageSource.java | 22 +++++++++---------- .../hive/orc/OrcBatchPageSourceFactory.java | 5 ++--- .../iceberg/IcebergPageSourceProvider.java | 15 +++++++++---- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSource.java index 9573c25b3cb40..7fd753773077d 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSource.java @@ -32,12 +32,12 @@ import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.PrestoException; import com.google.common.collect.ImmutableList; -import com.google.common.primitives.Booleans; import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; import java.util.Optional; +import java.util.OptionalInt; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA; @@ -72,15 +72,16 @@ public class OrcBatchPageSource private final RuntimeStats runtimeStats; - private final boolean[] isRowNumberList; + private final OptionalInt rowNumberColumnIndex; private final RowIDCoercer coercer; /** * @param columns an ordered list of the fields to read - * @param isRowNumberList list of indices of columns. If true, then the column then the column - * at the same position in {@code columns} is a row number. If false, it isn't. - * This should have the same length as {@code columns}. + * @param rowNumberColumnIndex specifies the index of the row number column. Its value should + * be less than the length of {@code columns}. Set to OptionalInt.empty() if no row number + * column is present. + * * #throws IllegalArgumentException if columns and isRowNumberList do not have the same size */ // TODO(elharo) HiveColumnHandle should know whether it's a row number or not. Alternatively, @@ -93,8 +94,7 @@ public OrcBatchPageSource( OrcAggregatedMemoryContext systemMemoryContext, FileFormatDataSourceStats stats, RuntimeStats runtimeStats, - // TODO avoid conversion; just pass a boolean array here - List isRowNumberList, + OptionalInt rowNumberColumnIndex, byte[] rowIDPartitionComponent, String rowGroupId) { @@ -105,9 +105,9 @@ public OrcBatchPageSource( this.stats = requireNonNull(stats, "stats is null"); this.runtimeStats = requireNonNull(runtimeStats, "runtimeStats is null"); - requireNonNull(isRowNumberList, "isRowNumberList is null"); - checkArgument(isRowNumberList.size() == numColumns, "row number list size %s does not match columns size %s", isRowNumberList.size(), columns.size()); - this.isRowNumberList = Booleans.toArray(isRowNumberList); + checkArgument(rowNumberColumnIndex.isEmpty() || + (rowNumberColumnIndex.getAsInt() >= 0 && rowNumberColumnIndex.getAsInt() < numColumns), "row number column index is incorrect"); + this.rowNumberColumnIndex = rowNumberColumnIndex; this.coercer = new RowIDCoercer(rowIDPartitionComponent, rowGroupId); this.constantBlocks = new Block[numColumns]; @@ -264,7 +264,7 @@ protected void closeWithSuppression(Throwable throwable) private boolean isRowPositionColumn(int column) { - return isRowNumberList[column]; + return rowNumberColumnIndex.isPresent() && rowNumberColumnIndex.getAsInt() == column; } private boolean isRowIDColumn(int column) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java index f1b892d4495b2..35669ab368b07 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java @@ -55,6 +55,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxMergeDistance; @@ -72,7 +73,6 @@ import static com.facebook.presto.orc.OrcEncoding.ORC; import static com.facebook.presto.orc.OrcReader.INITIAL_BATCH_SIZE; import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Collections.nCopies; import static java.util.Objects.requireNonNull; public class OrcBatchPageSourceFactory @@ -241,7 +241,6 @@ public static ConnectorPageSource createOrcPageSource( String rowGroupID = path.getName(); // none of the columns are row numbers - List isRowNumberList = nCopies(physicalColumns.size(), false); return new OrcBatchPageSource( recordReader, reader.getOrcDataSource(), @@ -250,7 +249,7 @@ public static ConnectorPageSource createOrcPageSource( systemMemoryUsage, stats, hiveFileContext.getStats(), - isRowNumberList, + OptionalInt.empty(), partitionID, rowGroupID); } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java index 2a2460238edff..c99e5b8034c3a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java @@ -116,6 +116,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Set; import java.util.function.Function; @@ -172,6 +173,7 @@ import static com.facebook.presto.parquet.predicate.PredicateUtils.predicateMatches; import static com.facebook.presto.parquet.reader.ColumnIndexFilterUtils.getColumnIndexStore; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Predicates.not; import static com.google.common.base.Suppliers.memoize; import static com.google.common.base.Verify.verify; @@ -524,8 +526,9 @@ private static ConnectorPageSourceWithRowPositions createBatchOrcPageSource( Map fileOrcColumnsByName = uniqueIndex(fileOrcColumns, orcColumn -> orcColumn.getColumnName().toLowerCase(ENGLISH)); int nextMissingColumnIndex = fileOrcColumnsByName.size(); - List isRowPositionList = new ArrayList<>(); - for (IcebergColumnHandle column : regularColumns) { + OptionalInt rowPositionColumnIndex = OptionalInt.empty(); + for (int idx = 0; idx < regularColumns.size(); idx++) { + IcebergColumnHandle column = regularColumns.get(idx); IcebergOrcColumn icebergOrcColumn; if (fileOrcColumnByIcebergId.isEmpty()) { @@ -562,7 +565,11 @@ private static ConnectorPageSourceWithRowPositions createBatchOrcPageSource( column.getRequiredSubfields(), Optional.empty())); } - isRowPositionList.add(column.isRowPositionColumn()); + + if (column.isRowPositionColumn()) { + checkArgument(rowPositionColumnIndex.isEmpty(), "Requesting more than 1 row number columns is not allowed."); + rowPositionColumnIndex = OptionalInt.of(idx); + } } // Skip the time type columns in predicate, converted on page source level @@ -619,7 +626,7 @@ private static ConnectorPageSourceWithRowPositions createBatchOrcPageSource( systemMemoryUsage, stats, runtimeStats, - isRowPositionList, + rowPositionColumnIndex, // Iceberg doesn't support row IDs new byte[0], ""),