Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,7 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
}
checkUnsupportedWriterFeatures(protocolEntry);

if (!newColumnMetadata.isNullable() && !transactionLogAccess.getActiveFiles(getSnapshot(session, handle), session).isEmpty()) {
if (!newColumnMetadata.isNullable() && !transactionLogAccess.getActiveFiles(getSnapshot(session, handle), handle.getMetadataEntry(), handle.getProtocolEntry(), session).isEmpty()) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to add NOT NULL column '%s' for non-empty table: %s.%s", newColumnMetadata.getName(), handle.getSchemaName(), handle.getTableName()));
}

Expand Down Expand Up @@ -3141,7 +3141,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
private void generateMissingFileStatistics(ConnectorSession session, DeltaLakeTableHandle tableHandle, Collection<ComputedStatistics> computedStatistics)
{
Map<String, AddFileEntry> addFileEntriesWithNoStats = transactionLogAccess.getActiveFiles(
getSnapshot(session, tableHandle), session)
getSnapshot(session, tableHandle), tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session)
.stream()
.filter(addFileEntry -> addFileEntry.getStats().isEmpty()
|| addFileEntry.getStats().get().getNumRecords().isEmpty()
Expand Down Expand Up @@ -3491,7 +3491,7 @@ private OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandl
private List<AddFileEntry> getAddFileEntriesMatchingEnforcedPartitionConstraint(ConnectorSession session, DeltaLakeTableHandle tableHandle)
{
TableSnapshot tableSnapshot = getSnapshot(session, tableHandle);
List<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, session);
List<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session);
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
if (enforcedPartitionConstraint.isAll()) {
return validDataFiles;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private Stream<DeltaLakeSplit> getSplits(
catch (IOException e) {
throw new RuntimeException(e);
}
List<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, session);
List<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session);
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint = tableHandle.getNonPartitionConstraint();
Domain pathDomain = getPathDomain(nonPartitionConstraint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private void doVacuum(
// Any remaining file are not live, and not needed to read any "recent" snapshot.
List<Long> recentVersions = transactionLogAccess.getPastTableVersions(fileSystem, transactionLogDir, threshold, tableSnapshot.getVersion());
Set<String> retainedPaths = Stream.concat(
transactionLogAccess.getActiveFiles(tableSnapshot, session).stream()
transactionLogAccess.getActiveFiles(tableSnapshot, handle.getMetadataEntry(), handle.getProtocolEntry(), session).stream()
.map(AddFileEntry::getPath),
transactionLogAccess.getJsonEntries(
fileSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
.filter(column -> predicatedColumnNames.contains(column.getName()))
.collect(toImmutableList());

for (AddFileEntry addEntry : transactionLogAccess.getActiveFiles(tableSnapshot, session)) {
for (AddFileEntry addEntry : transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session)) {
Optional<? extends DeltaLakeFileStatistics> fileStatistics = addEntry.getStats();
if (fileStatistics.isEmpty()) {
// Open source Delta Lake does not collect stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.plugin.deltalake.transactionlog;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
Expand All @@ -37,14 +36,12 @@
import java.util.Set;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Streams.stream;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readLastCheckpoint;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -180,7 +177,8 @@ public Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
CheckpointSchemaManager checkpointSchemaManager,
TypeManager typeManager,
TrinoFileSystem fileSystem,
FileFormatDataSourceStats stats)
FileFormatDataSourceStats stats,
Optional<MetadataAndProtocolEntry> metadataAndProtocol)
throws IOException
{
if (lastCheckpoint.isEmpty()) {
Expand All @@ -190,15 +188,8 @@ public Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
LastCheckpoint checkpoint = lastCheckpoint.get();
// Add entries contain statistics. When struct statistics are used the format of the Parquet file depends on the schema. It is important to use the schema at the time
// of the Checkpoint creation, in case the schema has evolved since it was written.
Optional<MetadataAndProtocolEntry> metadataAndProtocol = Optional.empty();
if (entryTypes.contains(ADD)) {
metadataAndProtocol = Optional.of(getCheckpointMetadataAndProtocolEntries(
session,
checkpointSchemaManager,
typeManager,
fileSystem,
stats,
checkpoint));
checkState(metadataAndProtocol.isPresent(), "metadata and protocol information is needed to process the add log entries");
}

Stream<DeltaLakeTransactionLogEntry> resultStream = Stream.empty();
Expand Down Expand Up @@ -259,51 +250,9 @@ private Iterator<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntrie
domainCompactionThreshold);
}

private MetadataAndProtocolEntry getCheckpointMetadataAndProtocolEntries(
ConnectorSession session,
CheckpointSchemaManager checkpointSchemaManager,
TypeManager typeManager,
TrinoFileSystem fileSystem,
FileFormatDataSourceStats stats,
LastCheckpoint checkpoint)
throws IOException
{
MetadataEntry metadata = null;
ProtocolEntry protocol = null;
for (Location checkpointPath : getCheckpointPartPaths(checkpoint)) {
TrinoInputFile checkpointFile = fileSystem.newInputFile(checkpointPath);
Iterator<DeltaLakeTransactionLogEntry> entries = getCheckpointTransactionLogEntries(
session,
ImmutableSet.of(METADATA, PROTOCOL),
Optional.empty(),
Optional.empty(),
checkpointSchemaManager,
typeManager,
stats,
checkpoint,
checkpointFile);
while (entries.hasNext()) {
DeltaLakeTransactionLogEntry entry = entries.next();
if (metadata == null && entry.getMetaData() != null) {
metadata = entry.getMetaData();
}
if (protocol == null && entry.getProtocol() != null) {
protocol = entry.getProtocol();
}
if (metadata != null && protocol != null) {
break;
}
}
}
if (metadata == null || protocol == null) {
throw new TrinoException(DELTA_LAKE_BAD_DATA, "Checkpoint found without metadata and protocol entry: " + checkpoint);
}
return new MetadataAndProtocolEntry(metadata, protocol);
}

private record MetadataAndProtocolEntry(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
public record MetadataAndProtocolEntry(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
private MetadataAndProtocolEntry
public MetadataAndProtocolEntry
{
requireNonNull(metadataEntry, "metadataEntry is null");
requireNonNull(protocolEntry, "protocolEntry is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.parquet.ParquetReaderOptions;
import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot.MetadataAndProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager;
import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint;
Expand Down Expand Up @@ -255,7 +256,7 @@ public MetadataEntry getMetadataEntry(TableSnapshot tableSnapshot, ConnectorSess
.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable()));
}

public List<AddFileEntry> getActiveFiles(TableSnapshot tableSnapshot, ConnectorSession session)
public List<AddFileEntry> getActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, ConnectorSession session)
{
try {
TableVersion tableVersion = new TableVersion(new TableLocation(tableSnapshot.getTable(), tableSnapshot.getTableLocation()), tableSnapshot.getVersion());
Expand Down Expand Up @@ -285,7 +286,7 @@ public List<AddFileEntry> getActiveFiles(TableSnapshot tableSnapshot, ConnectorS
}
}

List<AddFileEntry> activeFiles = loadActiveFiles(tableSnapshot, session);
List<AddFileEntry> activeFiles = loadActiveFiles(tableSnapshot, metadataEntry, protocolEntry, session);
return new DeltaLakeDataFileCacheEntry(tableSnapshot.getVersion(), activeFiles);
});
return cacheEntry.getActiveFiles();
Expand All @@ -295,17 +296,22 @@ public List<AddFileEntry> getActiveFiles(TableSnapshot tableSnapshot, ConnectorS
}
}

private List<AddFileEntry> loadActiveFiles(TableSnapshot tableSnapshot, ConnectorSession session)
private List<AddFileEntry> loadActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, ConnectorSession session)
{
try (Stream<AddFileEntry> entries = getEntries(
tableSnapshot,
ImmutableSet.of(ADD),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

if (entryTypes.contains(ADD)) {
            metadataAndProtocol = Optional.of(getCheckpointMetadataAndProtocolEntries(
                    session,
                    checkpointSchemaManager,
                    typeManager,
                    fileSystem,
                    stats,
                    checkpoint));

from io.trino.plugin.deltalake.transactionlog.TableSnapshot#getCheckpointTransactionLogEntries

it's now dead code

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this:

snapshot.getCheckpointTransactionLogEntries(
session,
ImmutableSet.of(PROTOCOL, TRANSACTION, ADD, REMOVE, COMMIT),
checkpointSchemaManager,
typeManager,
fileSystem,
fileFormatDataSourceStats)
.forEach(checkpointBuilder::addLogEntry);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the linked code place already reads metadata entry.
let it read protocol as well and pass both

this::activeAddEntries,
List<Transaction> transactions = tableSnapshot.getTransactions();
try (Stream<DeltaLakeTransactionLogEntry> checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries(
session,
ImmutableSet.of(ADD),
checkpointSchemaManager,
typeManager,
fileSystemFactory.create(session),
fileFormatDataSourceStats)) {
List<AddFileEntry> activeFiles = entries.collect(toImmutableList());
return activeFiles;
fileFormatDataSourceStats,
Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)))) {
return activeAddEntries(checkpointEntries, transactions)
.collect(toImmutableList());
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error reading transaction log for " + tableSnapshot.getTable(), e);
}
}

Expand Down Expand Up @@ -439,7 +445,7 @@ private <T> Stream<T> getEntries(
try {
List<Transaction> transactions = tableSnapshot.getTransactions();
Stream<DeltaLakeTransactionLogEntry> checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries(
session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats);
session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats, Optional.empty());

return entryMapper.apply(
checkpointEntries,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,24 @@
import io.trino.filesystem.TrinoOutputFile;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot.MetadataAndProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.NodeVersion;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.TypeManager;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.MoreCollectors.toOptional;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.LAST_CHECKPOINT_FILENAME;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD;
Expand Down Expand Up @@ -92,34 +95,47 @@ public void writeCheckpoint(ConnectorSession session, TableSnapshot snapshot)
CheckpointBuilder checkpointBuilder = new CheckpointBuilder();

TrinoFileSystem fileSystem = fileSystemFactory.create(session);
Optional<DeltaLakeTransactionLogEntry> checkpointMetadataLogEntry = snapshot
List<DeltaLakeTransactionLogEntry> checkpointLogEntries = snapshot
.getCheckpointTransactionLogEntries(
session,
ImmutableSet.of(METADATA),
ImmutableSet.of(METADATA, PROTOCOL),
checkpointSchemaManager,
typeManager,
fileSystem,
fileFormatDataSourceStats)
.collect(toOptional());
if (checkpointMetadataLogEntry.isPresent()) {
fileFormatDataSourceStats,
Optional.empty())
.filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null)
.collect(toImmutableList());

if (!checkpointLogEntries.isEmpty()) {
// TODO HACK: this call is required only to ensure that cachedMetadataEntry is set in snapshot (https://github.com/trinodb/trino/issues/12032),
// so we can read add entries below this should be reworked so we pass metadata entry explicitly to getCheckpointTransactionLogEntries,
// and we should get rid of `setCachedMetadata` in TableSnapshot to make it immutable.
// Also more proper would be to use metadata entry obtained above in snapshot.getCheckpointTransactionLogEntries to read other checkpoint entries, but using newer one should not do harm.
transactionLogAccess.getMetadataEntry(snapshot, session);

// register metadata entry in writer
checkState(checkpointMetadataLogEntry.get().getMetaData() != null, "metaData not present in log entry");
checkpointBuilder.addLogEntry(checkpointMetadataLogEntry.get());
DeltaLakeTransactionLogEntry metadataLogEntry = checkpointLogEntries.stream()
.filter(logEntry -> logEntry.getMetaData() != null)
.findFirst()
.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + snapshot.getTable()));
DeltaLakeTransactionLogEntry protocolLogEntry = checkpointLogEntries.stream()
.filter(logEntry -> logEntry.getProtocol() != null)
.findFirst()
.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + snapshot.getTable()));

checkpointBuilder.addLogEntry(metadataLogEntry);
checkpointBuilder.addLogEntry(protocolLogEntry);

// read remaining entries from checkpoint register them in writer
snapshot.getCheckpointTransactionLogEntries(
session,
ImmutableSet.of(PROTOCOL, TRANSACTION, ADD, REMOVE, COMMIT),
ImmutableSet.of(TRANSACTION, ADD, REMOVE, COMMIT),
checkpointSchemaManager,
typeManager,
fileSystem,
fileFormatDataSourceStats)
fileFormatDataSourceStats,
Optional.of(new MetadataAndProtocolEntry(metadataLogEntry.getMetaData(), protocolLogEntry.getProtocol())))
.forEach(checkpointBuilder::addLogEntry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,22 @@ public void testStatsWithMinMaxValuesAsNulls()
""");
}

/**
* @see deltalake.multipart_checkpoint
*/
@Test
public void testReadMultipartCheckpoint()
throws Exception
{
String tableName = "test_multipart_checkpoint_" + randomNameSuffix();
Path tableLocation = Files.createTempFile(tableName, null);
copyDirectoryContents(new File(Resources.getResource("deltalake/multipart_checkpoint").toURI()).toPath(), tableLocation);

assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri()));
assertThat(query("DESCRIBE " + tableName)).projected("Column", "Type").skippingTypesCheck().matches("VALUES ('c', 'integer')");
assertThat(query("SELECT * FROM " + tableName)).matches("VALUES 1, 2, 3, 4, 5, 6, 7");
}

private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocation)
throws IOException
{
Expand Down
Loading