@@ -163,7 +163,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
@@ -250,8 +249,6 @@
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.configurationForNewTable;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
import static io.trino.plugin.hive.TableType.MANAGED_TABLE;
@@ -531,29 +528,14 @@ public LocatedTableHandle getTableHandle(ConnectorSession session, SchemaTableNa

String tableLocation = table.get().location();
TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, Optional.empty());
Map<Class<?>, Object> logEntries;
MetadataEntry metadataEntry = null;
ProtocolEntry protocolEntry = null;
try {
logEntries = transactionLogAccess.getTransactionLogEntries(
session,
tableSnapshot,
ImmutableSet.of(METADATA, PROTOCOL),
entryStream -> entryStream
.filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null)
.map(entry -> firstNonNull(entry.getMetaData(), entry.getProtocol())));
metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, session);
protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
}
catch (TrinoException e) {
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, e);
}
throw e;
}
MetadataEntry metadataEntry = (MetadataEntry) logEntries.get(MetadataEntry.class);
Contributor: Where are these checks being done now?

if (metadataEntry == null) {
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable()));
}
ProtocolEntry protocolEntry = (ProtocolEntry) logEntries.get(ProtocolEntry.class);
if (protocolEntry == null) {
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable()));
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, e);
}
if (protocolEntry.getMinReaderVersion() > MAX_READER_VERSION) {
LOG.debug("Skip %s because the reader version is unsupported: %d", tableName, protocolEntry.getMinReaderVersion());
@@ -62,7 +62,8 @@ public class TableSnapshot
private final boolean checkpointRowStatisticsWritingEnabled;
private final int domainCompactionThreshold;

private Optional<MetadataEntry> cachedMetadata = Optional.empty();
private Optional<MetadataEntry> cachedMetadata;
private Optional<ProtocolEntry> cachedProtocol;

private TableSnapshot(
SchemaTableName table,
@@ -71,7 +72,9 @@
String tableLocation,
ParquetReaderOptions parquetReaderOptions,
boolean checkpointRowStatisticsWritingEnabled,
int domainCompactionThreshold)
int domainCompactionThreshold,
Optional<MetadataEntry> cachedMetadata,
Optional<ProtocolEntry> cachedProtocol)
Comment on lines +76 to +77
Contributor: From the code changes I see that the metadata & protocol are always obtained from the logTail. Why then do we add those two parameters to the constructor?

Contributor (author): We try to get them from the log tail every time, but most of the time they'll be empty, so we need the cached version somehow.

{
this.table = requireNonNull(table, "table is null");
this.lastCheckpoint = requireNonNull(lastCheckpoint, "lastCheckpoint is null");
@@ -80,6 +83,8 @@
this.parquetReaderOptions = requireNonNull(parquetReaderOptions, "parquetReaderOptions is null");
this.checkpointRowStatisticsWritingEnabled = checkpointRowStatisticsWritingEnabled;
this.domainCompactionThreshold = domainCompactionThreshold;
this.cachedMetadata = cachedMetadata;
this.cachedProtocol = cachedProtocol;
}

public static TableSnapshot load(
@@ -102,7 +107,9 @@ public static TableSnapshot load(
tableLocation,
parquetReaderOptions,
checkpointRowStatisticsWritingEnabled,
domainCompactionThreshold);
domainCompactionThreshold,
transactionLogTail.getMetadataEntry(),
transactionLogTail.getProtocolEntry());
}

public Optional<TableSnapshot> getUpdatedSnapshot(TrinoFileSystem fileSystem, Optional<Long> toVersion)
@@ -136,7 +143,9 @@ public Optional<TableSnapshot> getUpdatedSnapshot(TrinoFileSystem fileSystem, Op
tableLocation,
parquetReaderOptions,
checkpointRowStatisticsWritingEnabled,
domainCompactionThreshold));
domainCompactionThreshold,
transactionLogTail.getMetadataEntry().or(() -> cachedMetadata),
transactionLogTail.getProtocolEntry().or(() -> cachedProtocol)));
Comment on lines +147 to +148
Contributor: I don't quite get what is happening here. It feels like we may end up down the road with an unwanted state of the TableSnapshot. I'm not comfortable with the setters for the "cached" metadata & protocol.

Contributor (author): The log tail only contains metadata or protocol entries if needed; most of the time these will be empty optionals. The current Trino implementation reads the transaction log back until it reaches the latest version; this PR instead brings in caching so that we don't have to re-read the transaction log, yet still pick up updated versions when a new one appears in the log tail 👍

cc @jkylling to double-check if I misrepresented something 😄

Contributor: To arrive at a snapshot of a Delta table we do one of:

  1. Read checkpoint + transactions tail since checkpoint commit.
  2. Use existing snapshot + transaction tail since snapshot version.

As we don't know what the snapshot will be used for, we don't eagerly load the data which is part of a snapshot, like the metadata entry, protocol entry, or add actions. However, almost every query will need the protocol entry and metadata entry. Almost all the time these entries are in the checkpoint, so we should remember these entries to avoid reading the checkpoint all the time.

To get the metadata entry for a snapshot we can do one of:

  1. Read metadata entry from checkpoint + metadata entries in transaction tail since checkpoint commit. Use the last metadata entry.
  2. Use metadata entry from existing snapshot + metadata entries in transaction tail since snapshot version. Use the last metadata entry.

Currently Trino does 1, while this PR does 2. The highlighted snippet implements option 2: it uses the last metadata entry in the tail if one is present, and otherwise falls back to the metadata entry from the existing snapshot.
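
In code, that fallback is just a couple of lines. A minimal sketch using the names from this diff (transactionLogTail, cachedMetadata, cachedProtocol), not necessarily the PR's exact shape:

// Option 2: prefer entries from the new transaction tail; if the tail
// carries none (the common case), fall back to the existing snapshot's cache.
Optional<MetadataEntry> newMetadata = transactionLogTail.getMetadataEntry()
        .or(() -> cachedMetadata);
Optional<ProtocolEntry> newProtocol = transactionLogTail.getProtocolEntry()
        .or(() -> cachedProtocol);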

}

public long getVersion()
@@ -154,6 +163,11 @@ public Optional<MetadataEntry> getCachedMetadata()
return cachedMetadata;
}

public Optional<ProtocolEntry> getCachedProtocol()
{
return cachedProtocol;
}

public String getTableLocation()
{
return tableLocation;
@@ -164,6 +178,11 @@ public void setCachedMetadata(Optional<MetadataEntry> cachedMetadata)
this.cachedMetadata = cachedMetadata;
}

public void setCachedProtocol(Optional<ProtocolEntry> cachedProtocol)
{
this.cachedProtocol = cachedProtocol;
}

public List<DeltaLakeTransactionLogEntry> getJsonTransactionLogEntries()
{
return logTail.getFileEntries();
@@ -71,6 +71,7 @@
import java.util.function.Predicate;
import java.util.stream.Stream;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Predicates.alwaysFalse;
import static com.google.common.base.Predicates.alwaysTrue;
import static com.google.common.base.Throwables.throwIfUnchecked;
@@ -212,21 +213,25 @@ public void invalidateCache(SchemaTableName schemaTableName, Optional<String> ta
public MetadataEntry getMetadataEntry(TableSnapshot tableSnapshot, ConnectorSession session)
{
if (tableSnapshot.getCachedMetadata().isEmpty()) {
try (Stream<MetadataEntry> metadataEntries = getEntries(
tableSnapshot,
METADATA,
entryStream -> entryStream.map(DeltaLakeTransactionLogEntry::getMetaData).filter(Objects::nonNull),
session,
fileSystemFactory.create(session),
fileFormatDataSourceStats)) {
// Get last entry in the stream
tableSnapshot.setCachedMetadata(metadataEntries.reduce((first, second) -> second));
}
populateMetadataAndProtocol(tableSnapshot, session);
}
return tableSnapshot.getCachedMetadata()
.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable()));
}

private void populateMetadataAndProtocol(TableSnapshot tableSnapshot, ConnectorSession session)
{
Map<Class<?>, Object> logEntries = getTransactionLogEntries(
session,
tableSnapshot,
ImmutableSet.of(METADATA, PROTOCOL),
entryStream -> entryStream
.filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null)
.map(entry -> firstNonNull(entry.getMetaData(), entry.getProtocol())));
tableSnapshot.setCachedMetadata(Optional.ofNullable((MetadataEntry) logEntries.get(MetadataEntry.class)));
tableSnapshot.setCachedProtocol(Optional.ofNullable((ProtocolEntry) logEntries.get(ProtocolEntry.class)));
}

// Deprecated in favor of the namesake method which allows checkpoint filtering
// to be able to perform partition pruning and stats projection on the `add` entries
// from the checkpoint.
@@ -420,10 +425,11 @@ public Map<Class<?>, Object> getTransactionLogEntries(

public ProtocolEntry getProtocolEntry(ConnectorSession session, TableSnapshot tableSnapshot)
{
try (Stream<ProtocolEntry> protocolEntries = getProtocolEntries(tableSnapshot, session)) {
return protocolEntries.reduce((first, second) -> second)
.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol entry not found in transaction log for table " + tableSnapshot.getTable()));
if (tableSnapshot.getCachedProtocol().isEmpty()) {
populateMetadataAndProtocol(tableSnapshot, session);
}
return tableSnapshot.getCachedProtocol()
.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol entry not found in transaction log for table " + tableSnapshot.getTable()));
}

public Stream<ProtocolEntry> getProtocolEntries(TableSnapshot tableSnapshot, ConnectorSession session)
@@ -18,7 +18,9 @@
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.MissingTransactionLogException;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.Transaction;

import java.io.BufferedReader;
@@ -27,6 +29,7 @@
import java.io.InputStreamReader;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
@@ -44,10 +47,15 @@ public class TransactionLogTail
private final List<Transaction> entries;
private final long version;

private TransactionLogTail(List<Transaction> entries, long version)
private final Optional<MetadataEntry> metadataEntry;
Contributor: Why should this class know about metadataEntry & protocolEntry?

Contributor: It does not need to. The getMetadataEntry and getProtocolEntry methods below can be rewritten to get this on the fly from entries.

Contributor: @Pluies should we change this to compute metadataEntry and protocolEntry on the fly?

Contributor (author): Sounds good! On it 👍
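
A sketch of that suggestion, assuming Transaction exposes its log entries via a transactionEntries() accessor (the real accessor name may differ); the last matching entry in the tail wins:

// Compute the latest metadata entry lazily from the tail's transactions
// instead of caching it in a field. Illustrative only.
public Optional<MetadataEntry> getMetadataEntry()
{
    return entries.stream()
            .flatMap(transaction -> transaction.transactionEntries().stream())
            .map(DeltaLakeTransactionLogEntry::getMetaData)
            .filter(Objects::nonNull)
            .reduce((first, second) -> second); // keep the newest entry
}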

private final Optional<ProtocolEntry> protocolEntry;

private TransactionLogTail(List<Transaction> entries, long version, Optional<MetadataEntry> metadataEntry, Optional<ProtocolEntry> protocolEntry)
{
this.entries = ImmutableList.copyOf(requireNonNull(entries, "entries is null"));
this.version = version;
this.metadataEntry = metadataEntry;
this.protocolEntry = protocolEntry;
}

public static TransactionLogTail loadNewTail(
@@ -75,12 +83,19 @@ public static TransactionLogTail loadNewTail(

String transactionLogDir = getTransactionLogDir(tableLocation);
Optional<List<DeltaLakeTransactionLogEntry>> results;
MetadataEntry metadataEntry = null;
ProtocolEntry protocolEntry = null;

boolean endOfTail = false;
while (!endOfTail) {
results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem);
if (results.isPresent()) {
entriesBuilder.add(new Transaction(entryNumber, results.get()));
// There is at most one metadata or protocol entry per file https://github.com/delta-io/delta/blob/d74cc6897730f4effb5d7272c21bd2554bdfacdb/PROTOCOL.md#delta-log-entries-1
metadataEntry = results.get().stream().map(DeltaLakeTransactionLogEntry::getMetaData)
.filter(Objects::nonNull).findAny().orElse(metadataEntry);
protocolEntry = results.get().stream().map(DeltaLakeTransactionLogEntry::getProtocol)
.filter(Objects::nonNull).findAny().orElse(protocolEntry);
version = entryNumber;
entryNumber++;
}
@@ -96,7 +111,7 @@
}
}

return new TransactionLogTail(entriesBuilder.build(), version);
return new TransactionLogTail(entriesBuilder.build(), version, Optional.ofNullable(metadataEntry), Optional.ofNullable(protocolEntry));
}

public Optional<TransactionLogTail> getUpdatedTail(TrinoFileSystem fileSystem, String tableLocation, Optional<Long> endVersion)
@@ -112,7 +127,9 @@ public Optional<TransactionLogTail> getUpdatedTail(TrinoFileSystem fileSystem, S
.addAll(entries)
.addAll(newTail.entries)
.build(),
newTail.version));
newTail.version,
newTail.getMetadataEntry().or(() -> metadataEntry),
newTail.getProtocolEntry().or(() -> protocolEntry)));
}

public static Optional<List<DeltaLakeTransactionLogEntry>> getEntriesFromJson(long entryNumber, String transactionLogDir, TrinoFileSystem fileSystem)
@@ -152,6 +169,16 @@ public List<Transaction> getTransactions()
return entries;
}

public Optional<MetadataEntry> getMetadataEntry()
{
return metadataEntry;
}

public Optional<ProtocolEntry> getProtocolEntry()
{
return protocolEntry;
}

public long getVersion()
{
return version;
@@ -113,6 +113,62 @@ protected DistributedQueryRunner createQueryRunner()
}
}

@Test
public void testCheckpointFileOperations()
Contributor: Can you add the test in a preparatory commit so that the "gains" are more easily visible where you add caching?

Contributor (author): Fair call 👍

{
assertUpdate("DROP TABLE IF EXISTS test_checkpoint_file_operations");
assertUpdate("CREATE TABLE test_checkpoint_file_operations(key varchar, data varchar) with (checkpoint_interval = 2, partitioned_by=ARRAY['key'])");
assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p1', '1-abc')", 1);
assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p2', '2-xyz')", 1);
assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => 'test_checkpoint_file_operations')");
trackingFileSystemFactory.reset();
assertFileSystemAccesses(
"SELECT * FROM test_checkpoint_file_operations",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2)
.addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4)
.addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1)
.build());
trackingFileSystemFactory.reset();
// reads of checkpoint and commits are cached
assertFileSystemAccessesNoMetadataCacheFlush(
getSession(),
"SELECT * FROM test_checkpoint_file_operations",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1)
.build());
assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p3', '3-xyz')", 1);
trackingFileSystemFactory.reset();
assertFileSystemAccessesNoMetadataCacheFlush(
getSession(),
"SELECT * FROM test_checkpoint_file_operations",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(DATA, "key=p3/", INPUT_FILE_NEW_STREAM), 1)
.build());
trackingFileSystemFactory.reset();
assertFileSystemAccessesNoMetadataCacheFlush(
getSession(),
"SELECT * FROM test_checkpoint_file_operations",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(DATA, "key=p3/", INPUT_FILE_NEW_STREAM), 1)
.build());
}

@Test
public void testCreateTableAsSelect()
{
@@ -775,7 +831,11 @@ private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<Fi
private void assertFileSystemAccesses(Session session, @Language("SQL") String query, Multiset<FileOperation> expectedAccesses)
{
assertUpdate("CALL system.flush_metadata_cache()");
assertFileSystemAccessesNoMetadataCacheFlush(session, query, expectedAccesses);
}

private void assertFileSystemAccessesNoMetadataCacheFlush(Session session, @Language("SQL") String query, Multiset<FileOperation> expectedAccesses)
{
trackingFileSystemFactory.reset();
getDistributedQueryRunner().executeWithQueryId(session, query);
assertMultisetsEqual(getOperations(), expectedAccesses);
@@ -809,6 +869,9 @@ public static FileOperation create(String path, OperationType operationType)
if (path.matches(".*/_delta_log/\\d+\\.json")) {
return new FileOperation(TRANSACTION_LOG_JSON, fileName, operationType);
}
if (path.matches(".*/_delta_log/\\d+\\.checkpoint.parquet")) {
return new FileOperation(CHECKPOINT, fileName, operationType);
}
if (path.matches(".*/_delta_log/_trino_meta/extended_stats.json")) {
return new FileOperation(TRINO_EXTENDED_STATS_JSON, fileName, operationType);
}