Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
Expand Down Expand Up @@ -250,8 +249,6 @@
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.configurationForNewTable;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
import static io.trino.plugin.hive.TableType.MANAGED_TABLE;
Expand Down Expand Up @@ -531,29 +528,14 @@ public LocatedTableHandle getTableHandle(ConnectorSession session, SchemaTableNa

String tableLocation = table.get().location();
TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, Optional.empty());
Map<Class<?>, Object> logEntries;
MetadataEntry metadataEntry = null;
ProtocolEntry protocolEntry = null;
try {
logEntries = transactionLogAccess.getTransactionLogEntries(
session,
tableSnapshot,
ImmutableSet.of(METADATA, PROTOCOL),
entryStream -> entryStream
.filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null)
.map(entry -> firstNonNull(entry.getMetaData(), entry.getProtocol())));
metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, session);
protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, session);
}
catch (TrinoException e) {
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, e);
}
throw e;
}
MetadataEntry metadataEntry = (MetadataEntry) logEntries.get(MetadataEntry.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are these checks being done now?

if (metadataEntry == null) {
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable()));
}
ProtocolEntry protocolEntry = (ProtocolEntry) logEntries.get(ProtocolEntry.class);
if (protocolEntry == null) {
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable()));
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, e);
}
if (protocolEntry.getMinReaderVersion() > MAX_READER_VERSION) {
LOG.debug("Skip %s because the reader version is unsupported: %d", tableName, protocolEntry.getMinReaderVersion());
Expand Down Expand Up @@ -761,7 +743,7 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
String tableLocation = metastoreTable.get().location();
TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, table, tableLocation);
MetadataEntry metadata = transactionLogAccess.getMetadataEntry(snapshot, session);
ProtocolEntry protocol = transactionLogAccess.getProtocolEntry(session, snapshot);
ProtocolEntry protocol = transactionLogAccess.getProtocolEntry(snapshot, session);
Map<String, String> columnComments = getColumnComments(metadata);
Map<String, Boolean> columnsNullability = getColumnsNullability(metadata);
Map<String, String> columnGenerations = getGeneratedColumnExpressions(metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
SchemaTableName baseTableName = new SchemaTableName(tableName.getSchemaName(), DeltaLakeTableName.tableNameFrom(tableName.getTableName()));
TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(session, baseTableName, tableLocation);
metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, session);
protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, session);
}
catch (IOException e) {
throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Unable to load table metadata from location: " + tableLocation, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private void doVacuum(
accessControl.checkCanDeleteFromTable(null, tableName);

TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), handle.getReadVersion());
ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, session);
if (protocolEntry.getMinWriterVersion() > MAX_WRITER_VERSION) {
throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.getMinWriterVersion()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public class TableSnapshot
private final boolean checkpointRowStatisticsWritingEnabled;
private final int domainCompactionThreshold;

private Optional<MetadataEntry> cachedMetadata = Optional.empty();
private Optional<MetadataEntry> cachedMetadata;
private Optional<ProtocolEntry> cachedProtocol;

private TableSnapshot(
SchemaTableName table,
Expand All @@ -71,7 +72,9 @@ private TableSnapshot(
String tableLocation,
ParquetReaderOptions parquetReaderOptions,
boolean checkpointRowStatisticsWritingEnabled,
int domainCompactionThreshold)
int domainCompactionThreshold,
Optional<MetadataEntry> cachedMetadata,
Optional<ProtocolEntry> cachedProtocol)
Comment on lines +76 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the code changes I see that the metadata & protocol are always obtained from the logTail.
Why, then, do we add those two parameters to the constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We try to get them from the log tail every time, but most of the time they'll be empty, so we need the cached version somehow.

{
this.table = requireNonNull(table, "table is null");
this.lastCheckpoint = requireNonNull(lastCheckpoint, "lastCheckpoint is null");
Expand All @@ -80,6 +83,8 @@ private TableSnapshot(
this.parquetReaderOptions = requireNonNull(parquetReaderOptions, "parquetReaderOptions is null");
this.checkpointRowStatisticsWritingEnabled = checkpointRowStatisticsWritingEnabled;
this.domainCompactionThreshold = domainCompactionThreshold;
this.cachedMetadata = cachedMetadata;
this.cachedProtocol = cachedProtocol;
}

public static TableSnapshot load(
Expand All @@ -102,7 +107,9 @@ public static TableSnapshot load(
tableLocation,
parquetReaderOptions,
checkpointRowStatisticsWritingEnabled,
domainCompactionThreshold);
domainCompactionThreshold,
transactionLogTail.getMetadataEntry(),
transactionLogTail.getProtocolEntry());
}

public Optional<TableSnapshot> getUpdatedSnapshot(TrinoFileSystem fileSystem, Optional<Long> toVersion)
Expand Down Expand Up @@ -136,7 +143,9 @@ public Optional<TableSnapshot> getUpdatedSnapshot(TrinoFileSystem fileSystem, Op
tableLocation,
parquetReaderOptions,
checkpointRowStatisticsWritingEnabled,
domainCompactionThreshold));
domainCompactionThreshold,
transactionLogTail.getMetadataEntry().or(() -> cachedMetadata),
transactionLogTail.getProtocolEntry().or(() -> cachedProtocol)));
Comment on lines +147 to +148
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite get what is happening here.
It feels like we may end up, down the road, with the TableSnapshot in an unwanted state.
I'm not comfortable with the setters for the "cached" metadata & protocol.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log tail only contains metadata or protocol entries if needed; most of the time this will be an empty optional. The current Trino implementation is to read the transaction log back until we get the latest version; instead this PR brings in caching so that we do not have to re-read the transaction log, yet still get the updated versions when a new one appears in the log tail 👍

cc @jkylling to double-check if I misrepresented something 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To arrive at a snapshot of a Delta table we do one of:

  1. Read checkpoint + transactions tail since checkpoint commit.
  2. Use existing snapshot + transaction tail since snapshot version.

As we don't know what the snapshot will be used for, we don't eagerly load the data which is part of a snapshot, like the metadata entry, protocol entry, or add actions. However, almost every query will need the protocol entry and metadata entry. Almost all the time these entries are in the checkpoint, so we should remember these entries to avoid reading the checkpoint all the time.

To get the metadata entry for a snapshot we can do one of:

  1. Read metadata entry from checkpoint + metadata entries in transaction tail since checkpoint commit. Use the last metadata entry.
  2. Use metadata entry from existing snapshot + metadata entries in transaction tail since snapshot version. Use the last metadata entry.

Currently Trino does 1., while this PR does 2. The highlighted snippet does step 2:
It tries to use the last metadata entry in the tail, if it's present, and then uses the metadata entry from existing snapshot if there was no new entry in the tail.

}

public long getVersion()
Expand All @@ -154,6 +163,11 @@ public Optional<MetadataEntry> getCachedMetadata()
return cachedMetadata;
}

/**
 * Returns the protocol entry currently stored in the {@code cachedProtocol} field.
 * The field is assigned from the constructor argument and can later be replaced
 * through {@code setCachedProtocol}.
 */
public Optional<ProtocolEntry> getCachedProtocol()
{
return cachedProtocol;
}

public String getTableLocation()
{
return tableLocation;
Expand All @@ -164,6 +178,11 @@ public void setCachedMetadata(Optional<MetadataEntry> cachedMetadata)
this.cachedMetadata = cachedMetadata;
}

/**
 * Replaces the protocol entry stored in the {@code cachedProtocol} field.
 *
 * @param cachedProtocol the value to store; it is assigned as-is, without a null check
 */
public void setCachedProtocol(Optional<ProtocolEntry> cachedProtocol)
{
this.cachedProtocol = cachedProtocol;
}

public List<DeltaLakeTransactionLogEntry> getJsonTransactionLogEntries()
{
return logTail.getFileEntries();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.function.Predicate;
import java.util.stream.Stream;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Predicates.alwaysFalse;
import static com.google.common.base.Predicates.alwaysTrue;
import static com.google.common.base.Throwables.throwIfUnchecked;
Expand Down Expand Up @@ -212,21 +213,25 @@ public void invalidateCache(SchemaTableName schemaTableName, Optional<String> ta
public MetadataEntry getMetadataEntry(TableSnapshot tableSnapshot, ConnectorSession session)
{
if (tableSnapshot.getCachedMetadata().isEmpty()) {
try (Stream<MetadataEntry> metadataEntries = getEntries(
tableSnapshot,
METADATA,
entryStream -> entryStream.map(DeltaLakeTransactionLogEntry::getMetaData).filter(Objects::nonNull),
session,
fileSystemFactory.create(session),
fileFormatDataSourceStats)) {
// Get last entry in the stream
tableSnapshot.setCachedMetadata(metadataEntries.reduce((first, second) -> second));
}
populateMetadataAndProtocol(tableSnapshot, session);
}
return tableSnapshot.getCachedMetadata()
.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable()));
}

/**
 * Fetches the metadata and protocol entries for the given snapshot with a single
 * call to {@code getTransactionLogEntries} and stores both on the snapshot.
 * A cached value is set to an empty Optional when the returned map holds no
 * value for the corresponding entry class.
 *
 * @param tableSnapshot snapshot whose cached metadata and protocol are overwritten
 * @param session session forwarded to {@code getTransactionLogEntries}
 */
private void populateMetadataAndProtocol(TableSnapshot tableSnapshot, ConnectorSession session)
{
    Map<Class<?>, Object> entriesByType = getTransactionLogEntries(
            session,
            tableSnapshot,
            ImmutableSet.of(METADATA, PROTOCOL),
            entries -> entries
                    // keep only log entries that carry a metadata or a protocol action
                    .filter(logEntry -> logEntry.getMetaData() != null || logEntry.getProtocol() != null)
                    .map(logEntry -> firstNonNull(logEntry.getMetaData(), logEntry.getProtocol())));

    MetadataEntry metadata = (MetadataEntry) entriesByType.get(MetadataEntry.class);
    tableSnapshot.setCachedMetadata(Optional.ofNullable(metadata));

    ProtocolEntry protocol = (ProtocolEntry) entriesByType.get(ProtocolEntry.class);
    tableSnapshot.setCachedProtocol(Optional.ofNullable(protocol));
}

// Deprecated in favor of the namesake method which allows checkpoint filtering
// to be able to perform partition pruning and stats projection on the `add` entries
// from the checkpoint.
Expand Down Expand Up @@ -418,12 +423,13 @@ public Map<Class<?>, Object> getTransactionLogEntries(
}
}

public ProtocolEntry getProtocolEntry(ConnectorSession session, TableSnapshot tableSnapshot)
public ProtocolEntry getProtocolEntry(TableSnapshot tableSnapshot, ConnectorSession session)
{
try (Stream<ProtocolEntry> protocolEntries = getProtocolEntries(tableSnapshot, session)) {
return protocolEntries.reduce((first, second) -> second)
.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol entry not found in transaction log for table " + tableSnapshot.getTable()));
if (tableSnapshot.getCachedProtocol().isEmpty()) {
populateMetadataAndProtocol(tableSnapshot, session);
}
return tableSnapshot.getCachedProtocol()
.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol entry not found in transaction log for table " + tableSnapshot.getTable()));
}

public Stream<ProtocolEntry> getProtocolEntries(TableSnapshot tableSnapshot, ConnectorSession session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.MissingTransactionLogException;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.Transaction;

import java.io.BufferedReader;
Expand All @@ -27,6 +29,7 @@
import java.io.InputStreamReader;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -44,10 +47,15 @@ public class TransactionLogTail
private final List<Transaction> entries;
private final long version;

private TransactionLogTail(List<Transaction> entries, long version)
private final Optional<MetadataEntry> metadataEntry;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should this class know about metadataEntry & protocolEntry?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not need to. The getMetadataEntry and getProtocolEntry methods below can be rewritten to get this on the fly from entries.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Pluies should we change this to compute metadataEntry and protocolEntry on the fly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! On it 👍

private final Optional<ProtocolEntry> protocolEntry;

/**
 * Creates a tail from already-loaded transactions.
 *
 * @param entries transactions in this tail; copied into an immutable list
 * @param version version number stored for this tail
 * @param metadataEntry metadata entry associated with this tail, or empty if there is none
 * @param protocolEntry protocol entry associated with this tail, or empty if there is none
 * @throws NullPointerException if {@code entries}, {@code metadataEntry} or {@code protocolEntry} is null
 */
private TransactionLogTail(List<Transaction> entries, long version, Optional<MetadataEntry> metadataEntry, Optional<ProtocolEntry> protocolEntry)
{
    this.entries = ImmutableList.copyOf(requireNonNull(entries, "entries is null"));
    this.version = version;
    // Fail fast on a null Optional so the getters never hand out null
    this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
    this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
}

public static TransactionLogTail loadNewTail(
Expand Down Expand Up @@ -75,12 +83,19 @@ public static TransactionLogTail loadNewTail(

String transactionLogDir = getTransactionLogDir(tableLocation);
Optional<List<DeltaLakeTransactionLogEntry>> results;
MetadataEntry metadataEntry = null;
ProtocolEntry protocolEntry = null;

boolean endOfTail = false;
while (!endOfTail) {
results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem);
if (results.isPresent()) {
entriesBuilder.add(new Transaction(entryNumber, results.get()));
// There is at most one metadata or protocol entry per file https://github.com/delta-io/delta/blob/d74cc6897730f4effb5d7272c21bd2554bdfacdb/PROTOCOL.md#delta-log-entries-1
metadataEntry = results.get().stream().map(DeltaLakeTransactionLogEntry::getMetaData)
.filter(Objects::nonNull).findAny().orElse(metadataEntry);
protocolEntry = results.get().stream().map(DeltaLakeTransactionLogEntry::getProtocol)
.filter(Objects::nonNull).findAny().orElse(protocolEntry);
version = entryNumber;
entryNumber++;
}
Expand All @@ -96,7 +111,7 @@ public static TransactionLogTail loadNewTail(
}
}

return new TransactionLogTail(entriesBuilder.build(), version);
return new TransactionLogTail(entriesBuilder.build(), version, Optional.ofNullable(metadataEntry), Optional.ofNullable(protocolEntry));
}

public Optional<TransactionLogTail> getUpdatedTail(TrinoFileSystem fileSystem, String tableLocation, Optional<Long> endVersion)
Expand All @@ -112,7 +127,9 @@ public Optional<TransactionLogTail> getUpdatedTail(TrinoFileSystem fileSystem, S
.addAll(entries)
.addAll(newTail.entries)
.build(),
newTail.version));
newTail.version,
newTail.getMetadataEntry().or(() -> metadataEntry),
newTail.getProtocolEntry().or(() -> protocolEntry)));
}

public static Optional<List<DeltaLakeTransactionLogEntry>> getEntriesFromJson(long entryNumber, String transactionLogDir, TrinoFileSystem fileSystem)
Expand Down Expand Up @@ -152,6 +169,16 @@ public List<Transaction> getTransactions()
return entries;
}

/**
 * Returns the metadata entry this tail was constructed with.
 */
public Optional<MetadataEntry> getMetadataEntry()
{
return metadataEntry;
}

/**
 * Returns the protocol entry this tail was constructed with.
 */
public Optional<ProtocolEntry> getProtocolEntry()
{
return protocolEntry;
}

public long getVersion()
{
return version;
Expand Down
Loading