Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ private ReaderPageSource createParquetPageSource(Location path)
dataColumns.stream()
.map(DeltaLakeColumnHandle::toHiveColumnHandle)
.collect(toImmutableList()),
TupleDomain.all(),
ImmutableList.of(TupleDomain.all()),
true,
parquetDateTimeZone,
new FileFormatDataSourceStats(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public ConnectorPageSource createPageSource(
split.getStart(),
split.getLength(),
hiveColumnHandles.build(),
parquetPredicate,
ImmutableList.of(parquetPredicate),
true,
parquetDateTimeZone,
fileFormatDataSourceStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private static DeltaLakePageSource createDeltaLakePageSource(
0,
split.fileSize(),
splitColumns.stream().filter(column -> column.getColumnType() == REGULAR).map(DeltaLakeColumnHandle::toHiveColumnHandle).collect(toImmutableList()),
TupleDomain.all(), // TODO add predicate pushdown https://github.com/trinodb/trino/issues/16990
ImmutableList.of(TupleDomain.all()), // TODO add predicate pushdown https://github.com/trinodb/trino/issues/16990
true,
parquetDateTimeZone,
fileFormatDataSourceStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;

import java.util.Objects;

import static java.util.Objects.requireNonNull;

public class DeltaLakeTransactionLogEntry
Expand Down Expand Up @@ -156,6 +158,31 @@ public DeltaLakeTransactionLogEntry withCommitInfo(CommitInfoEntry commitInfo)
return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdcEntry);
}

@Override
public boolean equals(Object o)
{
    // Identity short-circuit, then strict same-class comparison (getClass, not
    // instanceof) so subclasses are never considered equal to this type.
    if (o == this) {
        return true;
    }
    if (o == null || o.getClass() != getClass()) {
        return false;
    }
    DeltaLakeTransactionLogEntry other = (DeltaLakeTransactionLogEntry) o;
    // Null-safe field-by-field comparison over the same fields hashed in hashCode()
    return Objects.equals(this.txn, other.txn)
            && Objects.equals(this.add, other.add)
            && Objects.equals(this.remove, other.remove)
            && Objects.equals(this.metaData, other.metaData)
            && Objects.equals(this.protocol, other.protocol)
            && Objects.equals(this.commitInfo, other.commitInfo)
            && Objects.equals(this.cdcEntry, other.cdcEntry);
}

@Override
public int hashCode()
{
    // Hashes exactly the fields compared in equals(), keeping the
    // equals/hashCode contract (equal entries produce equal hashes).
    return Objects.hash(txn, add, remove, metaData, protocol, commitInfo, cdcEntry);
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
Expand Down Expand Up @@ -187,20 +186,20 @@ public CheckpointEntryIterator(
this.columnsWithMinMaxStats = columnsWithStats(schema, this.metadataEntry.getOriginalPartitionColumns());
}

List<HiveColumnHandle> columns = fields.stream()
.map(field -> buildColumnHandle(field, checkpointSchemaManager, this.metadataEntry, this.protocolEntry).toHiveColumnHandle())
.collect(toImmutableList());

TupleDomain<HiveColumnHandle> tupleDomain = columns.size() > 1 ?
TupleDomain.all() :
buildTupleDomainColumnHandle(getOnlyElement(fields), getOnlyElement(columns));
ImmutableList.Builder<HiveColumnHandle> columnsBuilder = ImmutableList.builderWithExpectedSize(fields.size());
ImmutableList.Builder<TupleDomain<HiveColumnHandle>> disjunctDomainsBuilder = ImmutableList.builderWithExpectedSize(fields.size());
for (EntryType field : fields) {
HiveColumnHandle column = buildColumnHandle(field, checkpointSchemaManager, this.metadataEntry, this.protocolEntry).toHiveColumnHandle();
columnsBuilder.add(column);
disjunctDomainsBuilder.add(buildTupleDomainColumnHandle(field, column));
}

ReaderPageSource pageSource = ParquetPageSourceFactory.createPageSource(
checkpoint,
0,
fileSize,
columns,
tupleDomain,
columnsBuilder.build(),
disjunctDomainsBuilder.build(), // OR-ed condition
true,
DateTimeZone.UTC,
stats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

@TestInstance(PER_CLASS)
Expand All @@ -82,6 +83,16 @@ public void tearDown()
checkpointSchemaManager = null;
}

@Test
public void testReadNoEntries()
        throws Exception
{
    // Requesting an iterator over an empty set of entry types is a caller error:
    // construction must fail fast rather than produce an empty iterator.
    URI checkpointUri = getResource(TEST_CHECKPOINT).toURI();
    assertThatThrownBy(() -> createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(), Optional.empty(), Optional.empty()))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("fields is empty");
}

@Test
public void testReadMetadataEntry()
throws Exception
Expand Down Expand Up @@ -134,6 +145,49 @@ public void testReadProtocolEntries()
Optional.empty()));
}

@Test
public void testReadMetadataAndProtocolEntry()
        throws Exception
{
    // Reads a checkpoint asking for two entry types at once (METADATA + PROTOCOL);
    // exercises the multi-field path where the reader OR-s per-field predicates
    // instead of falling back to TupleDomain.all().
    URI checkpointUri = getResource(TEST_CHECKPOINT).toURI();
    CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(METADATA, PROTOCOL), Optional.empty(), Optional.empty());
    List<DeltaLakeTransactionLogEntry> entries = ImmutableList.copyOf(checkpointEntryIterator);

    assertThat(entries).hasSize(2);
    // Order of entries in the checkpoint file is not guaranteed, hence containsExactlyInAnyOrder
    assertThat(entries).containsExactlyInAnyOrder(
            DeltaLakeTransactionLogEntry.metadataEntry(new MetadataEntry(
                    "b6aeffad-da73-4dde-b68e-937e468b1fde",
                    null,
                    null,
                    new MetadataEntry.Format("parquet", Map.of()),
                    // Expected table schema JSON, verbatim from the test checkpoint fixture
                    "{\"type\":\"struct\",\"fields\":[" +
                            "{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," +
                            "{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}," +
                            "{\"name\":\"married\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}}," +

                            "{\"name\":\"phones\",\"type\":{\"type\":\"array\",\"elementType\":{\"type\":\"struct\",\"fields\":[" +
                            "{\"name\":\"number\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," +
                            "{\"name\":\"label\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}," +
                            "\"containsNull\":true},\"nullable\":true,\"metadata\":{}}," +

                            "{\"name\":\"address\",\"type\":{\"type\":\"struct\",\"fields\":[" +
                            "{\"name\":\"street\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," +
                            "{\"name\":\"city\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," +
                            "{\"name\":\"state\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," +
                            "{\"name\":\"zip\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}," +

                            "{\"name\":\"income\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}",
                    List.of("age"),
                    Map.of(),
                    1579190100722L)),
            DeltaLakeTransactionLogEntry.protocolEntry(
                    new ProtocolEntry(
                            1,
                            2,
                            Optional.empty(),
                            Optional.empty())));
}

@Test
public void testReadAddEntries()
throws Exception
Expand Down Expand Up @@ -309,6 +363,8 @@ public void testSkipRemoveEntries()
targetFile.delete(); // file must not exist when writer is called
writer.write(entries, createOutputFile(targetPath));

CheckpointEntryIterator metadataAndProtocolEntryIterator =
createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(METADATA, PROTOCOL), Optional.empty(), Optional.empty());
CheckpointEntryIterator addEntryIterator = createCheckpointEntryIterator(
URI.create(targetPath),
ImmutableSet.of(ADD),
Expand All @@ -319,10 +375,12 @@ public void testSkipRemoveEntries()
CheckpointEntryIterator txnEntryIterator =
createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(TRANSACTION), Optional.empty(), Optional.empty());

assertThat(Iterators.size(metadataAndProtocolEntryIterator)).isEqualTo(2);
assertThat(Iterators.size(addEntryIterator)).isEqualTo(1);
assertThat(Iterators.size(removeEntryIterator)).isEqualTo(numRemoveEntries);
assertThat(Iterators.size(txnEntryIterator)).isEqualTo(0);

assertThat(metadataAndProtocolEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(3L);
assertThat(addEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(2L);
assertThat(removeEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(100L);
assertThat(txnEntryIterator.getCompletedPositions().orElseThrow()).isEqualTo(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public Optional<ReaderPageSource> createPageSource(
start,
length,
columns,
effectivePredicate,
ImmutableList.of(effectivePredicate),
isUseParquetColumnNames(session),
timeZone,
stats,
Expand All @@ -210,7 +210,7 @@ public static ReaderPageSource createPageSource(
long start,
long length,
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
List<TupleDomain<HiveColumnHandle>> disjunctTupleDomains,
boolean useColumnNames,
DateTimeZone timeZone,
FileFormatDataSourceStats stats,
Expand All @@ -237,35 +237,51 @@ public static ReaderPageSource createPageSource(
messageColumn = getColumnIO(fileSchema, requestedSchema);

Map<List<String>, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema);
TupleDomain<ColumnDescriptor> parquetTupleDomain = options.isIgnoreStatistics()
? TupleDomain.all()
: getParquetTupleDomain(descriptorsByPath, effectivePredicate, fileSchema, useColumnNames);

TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, timeZone);
List<TupleDomain<ColumnDescriptor>> parquetTupleDomains;
List<TupleDomainParquetPredicate> parquetPredicates;
if (options.isIgnoreStatistics()) {
parquetTupleDomains = ImmutableList.of();
parquetPredicates = ImmutableList.of();
}
else {
ImmutableList.Builder<TupleDomain<ColumnDescriptor>> parquetTupleDomainsBuilder = ImmutableList.builderWithExpectedSize(disjunctTupleDomains.size());
ImmutableList.Builder<TupleDomainParquetPredicate> parquetPredicatesBuilder = ImmutableList.builderWithExpectedSize(disjunctTupleDomains.size());
for (TupleDomain<HiveColumnHandle> tupleDomain : disjunctTupleDomains) {
TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, tupleDomain, fileSchema, useColumnNames);
parquetTupleDomainsBuilder.add(parquetTupleDomain);
parquetPredicatesBuilder.add(buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, timeZone));
}
parquetTupleDomains = parquetTupleDomainsBuilder.build();
parquetPredicates = parquetPredicatesBuilder.build();
}

long nextStart = 0;
ImmutableList.Builder<BlockMetaData> blocks = ImmutableList.builder();
ImmutableList.Builder<Long> blockStarts = ImmutableList.builder();
ImmutableList.Builder<Optional<ColumnIndexStore>> columnIndexes = ImmutableList.builder();
for (BlockMetaData block : parquetMetadata.getBlocks()) {
long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
Optional<ColumnIndexStore> columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options);
Optional<BloomFilterStore> bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options);

if (start <= firstDataPage && firstDataPage < start + length
&& predicateMatches(
parquetPredicate,
block,
dataSource,
descriptorsByPath,
parquetTupleDomain,
columnIndex,
bloomFilterStore,
timeZone,
domainCompactionThreshold)) {
blocks.add(block);
blockStarts.add(nextStart);
columnIndexes.add(columnIndex);
for (int i = 0; i < disjunctTupleDomains.size(); i++) {
TupleDomain<ColumnDescriptor> parquetTupleDomain = parquetTupleDomains.get(i);
TupleDomainParquetPredicate parquetPredicate = parquetPredicates.get(i);
Optional<ColumnIndexStore> columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options);
Optional<BloomFilterStore> bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options);
if (start <= firstDataPage && firstDataPage < start + length
&& predicateMatches(
parquetPredicate,
block,
dataSource,
descriptorsByPath,
parquetTupleDomain,
columnIndex,
bloomFilterStore,
timeZone,
domainCompactionThreshold)) {
blocks.add(block);
blockStarts.add(nextStart);
columnIndexes.add(columnIndex);
break;
}
}
nextStart += block.getRowCount();
}
Expand All @@ -289,7 +305,9 @@ && predicateMatches(
memoryContext,
options,
exception -> handleException(dataSourceId, exception),
Optional.of(parquetPredicate),
// We avoid using disjuncts of parquetPredicate for page pruning in ParquetReader as currently column indexes
// are not present in the Parquet files which are read with disjunct predicates.
parquetPredicates.size() == 1 ? Optional.of(parquetPredicates.get(0)) : Optional.empty(),
columnIndexes.build(),
parquetWriteValidation);
ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider);
Expand Down