Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Booleans;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
Expand Down Expand Up @@ -72,15 +72,16 @@ public class OrcBatchPageSource

private final RuntimeStats runtimeStats;

private final boolean[] isRowNumberList;
private final OptionalInt rowNumberColumnIndex;

private final RowIDCoercer coercer;

/**
* @param columns an ordered list of the fields to read
* @param isRowNumberList list of indices of columns. If true, then the column then the column
* at the same position in {@code columns} is a row number. If false, it isn't.
* This should have the same length as {@code columns}.
* @param rowNumberColumnIndex specifies the index of the row number column. Its value should
* be less than the length of {@code columns}. Set to OptionalInt.empty() if no row number
* column is present.
*
* #throws IllegalArgumentException if columns and isRowNumberList do not have the same size
*/
// TODO(elharo) HiveColumnHandle should know whether it's a row number or not. Alternatively,
Expand All @@ -93,8 +94,7 @@ public OrcBatchPageSource(
OrcAggregatedMemoryContext systemMemoryContext,
FileFormatDataSourceStats stats,
RuntimeStats runtimeStats,
// TODO avoid conversion; just pass a boolean array here
List<Boolean> isRowNumberList,
OptionalInt rowNumberColumnIndex,
byte[] rowIDPartitionComponent,
String rowGroupId)
{
Expand All @@ -105,9 +105,9 @@ public OrcBatchPageSource(

this.stats = requireNonNull(stats, "stats is null");
this.runtimeStats = requireNonNull(runtimeStats, "runtimeStats is null");
requireNonNull(isRowNumberList, "isRowNumberList is null");
checkArgument(isRowNumberList.size() == numColumns, "row number list size %s does not match columns size %s", isRowNumberList.size(), columns.size());
this.isRowNumberList = Booleans.toArray(isRowNumberList);
checkArgument(rowNumberColumnIndex.isEmpty() ||
(rowNumberColumnIndex.getAsInt() >= 0 && rowNumberColumnIndex.getAsInt() < numColumns), "row number column index is incorrect");
this.rowNumberColumnIndex = rowNumberColumnIndex;
this.coercer = new RowIDCoercer(rowIDPartitionComponent, rowGroupId);

this.constantBlocks = new Block[numColumns];
Expand Down Expand Up @@ -264,7 +264,7 @@ protected void closeWithSuppression(Throwable throwable)

private boolean isRowPositionColumn(int column)
{
return isRowNumberList[column];
return rowNumberColumnIndex.isPresent() && rowNumberColumnIndex.getAsInt() == column;
}

private boolean isRowIDColumn(int column)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxMergeDistance;
Expand All @@ -72,7 +73,6 @@
import static com.facebook.presto.orc.OrcEncoding.ORC;
import static com.facebook.presto.orc.OrcReader.INITIAL_BATCH_SIZE;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Collections.nCopies;
import static java.util.Objects.requireNonNull;

public class OrcBatchPageSourceFactory
Expand Down Expand Up @@ -241,7 +241,6 @@ public static ConnectorPageSource createOrcPageSource(
String rowGroupID = path.getName();

// none of the columns are row numbers
List<Boolean> isRowNumberList = nCopies(physicalColumns.size(), false);
return new OrcBatchPageSource(
recordReader,
reader.getOrcDataSource(),
Expand All @@ -250,7 +249,7 @@ public static ConnectorPageSource createOrcPageSource(
systemMemoryUsage,
stats,
hiveFileContext.getStats(),
isRowNumberList,
OptionalInt.empty(),
partitionID,
rowGroupID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -172,6 +173,7 @@
import static com.facebook.presto.parquet.predicate.PredicateUtils.predicateMatches;
import static com.facebook.presto.parquet.reader.ColumnIndexFilterUtils.getColumnIndexStore;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Predicates.not;
import static com.google.common.base.Suppliers.memoize;
import static com.google.common.base.Verify.verify;
Expand Down Expand Up @@ -524,8 +526,9 @@ private static ConnectorPageSourceWithRowPositions createBatchOrcPageSource(
Map<String, IcebergOrcColumn> fileOrcColumnsByName = uniqueIndex(fileOrcColumns, orcColumn -> orcColumn.getColumnName().toLowerCase(ENGLISH));

int nextMissingColumnIndex = fileOrcColumnsByName.size();
List<Boolean> isRowPositionList = new ArrayList<>();
for (IcebergColumnHandle column : regularColumns) {
OptionalInt rowPositionColumnIndex = OptionalInt.empty();
for (int idx = 0; idx < regularColumns.size(); idx++) {
IcebergColumnHandle column = regularColumns.get(idx);
IcebergOrcColumn icebergOrcColumn;

if (fileOrcColumnByIcebergId.isEmpty()) {
Expand Down Expand Up @@ -562,7 +565,11 @@ private static ConnectorPageSourceWithRowPositions createBatchOrcPageSource(
column.getRequiredSubfields(),
Optional.empty()));
}
isRowPositionList.add(column.isRowPositionColumn());

if (column.isRowPositionColumn()) {
checkArgument(rowPositionColumnIndex.isEmpty(), "Requesting more than 1 row number columns is not allowed.");
rowPositionColumnIndex = OptionalInt.of(idx);
}
}

// Skip the time type columns in predicate, converted on page source level
Expand Down Expand Up @@ -619,7 +626,7 @@ private static ConnectorPageSourceWithRowPositions createBatchOrcPageSource(
systemMemoryUsage,
stats,
runtimeStats,
isRowPositionList,
rowPositionColumnIndex,
// Iceberg doesn't support row IDs
new byte[0],
""),
Expand Down
Loading