diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 19b5959b5380..434b5f08a4d7 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -163,7 +163,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; @@ -250,8 +249,6 @@ import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.configurationForNewTable; import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; -import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA; -import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL; import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME; import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE; import static io.trino.plugin.hive.TableType.MANAGED_TABLE; @@ -531,29 +528,14 @@ public LocatedTableHandle getTableHandle(ConnectorSession session, SchemaTableNa String tableLocation = table.get().location(); TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, Optional.empty()); - Map, Object> logEntries; + MetadataEntry metadataEntry = null; + ProtocolEntry protocolEntry = null; try { - logEntries = transactionLogAccess.getTransactionLogEntries( - session, - tableSnapshot, - ImmutableSet.of(METADATA, PROTOCOL), - entryStream -> entryStream - .filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null) - .map(entry -> firstNonNull(entry.getMetaData(), entry.getProtocol()))); + metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, session); + protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, session); } catch (TrinoException e) { - if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) { - return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, e); - } - throw e; - } - MetadataEntry metadataEntry = (MetadataEntry) logEntries.get(MetadataEntry.class); - if (metadataEntry == null) { - return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable())); - } - ProtocolEntry protocolEntry = (ProtocolEntry) logEntries.get(ProtocolEntry.class); - if (protocolEntry == null) { - return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable())); + return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, e); } if (protocolEntry.getMinReaderVersion() > MAX_READER_VERSION) { LOG.debug("Skip %s because the reader version is unsupported: %d", tableName, protocolEntry.getMinReaderVersion()); @@ -761,7 +743,7 @@ public Iterator streamTableColumns(ConnectorSession sessio String tableLocation = metastoreTable.get().location(); TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, table, tableLocation); MetadataEntry metadata = transactionLogAccess.getMetadataEntry(snapshot, session); - ProtocolEntry protocol = transactionLogAccess.getProtocolEntry(session, snapshot); + ProtocolEntry protocol = transactionLogAccess.getProtocolEntry(snapshot, session); Map columnComments = getColumnComments(metadata); Map columnsNullability = getColumnsNullability(metadata); Map columnGenerations = getGeneratedColumnExpressions(metadata); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePropertiesTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePropertiesTable.java index 202b9efffb71..cd9aadd4914a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePropertiesTable.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePropertiesTable.java @@ -85,7 +85,7 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand SchemaTableName baseTableName = new SchemaTableName(tableName.getSchemaName(), DeltaLakeTableName.tableNameFrom(tableName.getTableName())); TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(session, baseTableName, tableLocation); metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, session); - protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot); + protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, session); } catch (IOException e) { throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Unable to load table metadata from location: " + tableLocation, e); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java index c356d45976d3..af9fac0e1393 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java @@ -180,7 +180,7 @@ private void doVacuum( accessControl.checkCanDeleteFromTable(null, tableName); TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), handle.getReadVersion()); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, session); if (protocolEntry.getMinWriterVersion() > MAX_WRITER_VERSION) { throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.getMinWriterVersion())); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java index 831bc6f588da..9ce46cc6b2bb 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java @@ -62,7 +62,8 @@ public class TableSnapshot private final boolean checkpointRowStatisticsWritingEnabled; private final int domainCompactionThreshold; - private Optional cachedMetadata = Optional.empty(); + private Optional cachedMetadata; + private Optional cachedProtocol; private TableSnapshot( SchemaTableName table, @@ -71,7 +72,9 @@ private TableSnapshot( String tableLocation, ParquetReaderOptions parquetReaderOptions, boolean checkpointRowStatisticsWritingEnabled, - int domainCompactionThreshold) + int domainCompactionThreshold, + Optional cachedMetadata, + Optional cachedProtocol) { this.table = requireNonNull(table, "table is null"); this.lastCheckpoint = requireNonNull(lastCheckpoint, "lastCheckpoint is null"); @@ -80,6 +83,8 @@ private TableSnapshot( this.parquetReaderOptions = requireNonNull(parquetReaderOptions, "parquetReaderOptions is null"); this.checkpointRowStatisticsWritingEnabled = checkpointRowStatisticsWritingEnabled; this.domainCompactionThreshold = domainCompactionThreshold; + this.cachedMetadata = cachedMetadata; + this.cachedProtocol = cachedProtocol; } public static TableSnapshot load( @@ -102,7 +107,9 @@ public static TableSnapshot load( tableLocation, parquetReaderOptions, checkpointRowStatisticsWritingEnabled, - domainCompactionThreshold); + domainCompactionThreshold, + transactionLogTail.getMetadataEntry(), + transactionLogTail.getProtocolEntry()); } public Optional getUpdatedSnapshot(TrinoFileSystem fileSystem, Optional toVersion) @@ -136,7 +143,9 @@ public Optional getUpdatedSnapshot(TrinoFileSystem fileSystem, Op tableLocation, parquetReaderOptions, checkpointRowStatisticsWritingEnabled, - domainCompactionThreshold)); + domainCompactionThreshold, + transactionLogTail.getMetadataEntry().or(() -> cachedMetadata), + transactionLogTail.getProtocolEntry().or(() -> cachedProtocol))); } public long getVersion() @@ -154,6 +163,11 @@ public Optional getCachedMetadata() return cachedMetadata; } + public Optional getCachedProtocol() + { + return cachedProtocol; + } + public String getTableLocation() { return tableLocation; @@ -164,6 +178,11 @@ public void setCachedMetadata(Optional cachedMetadata) this.cachedMetadata = cachedMetadata; } + public void setCachedProtocol(Optional cachedProtocol) + { + this.cachedProtocol = cachedProtocol; + } + public List getJsonTransactionLogEntries() { return logTail.getFileEntries(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java index 47b90eba6e20..589f7526c7b0 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java @@ -71,6 +71,7 @@ import java.util.function.Predicate; import java.util.stream.Stream; +import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Predicates.alwaysFalse; import static com.google.common.base.Predicates.alwaysTrue; import static com.google.common.base.Throwables.throwIfUnchecked; @@ -212,21 +213,25 @@ public void invalidateCache(SchemaTableName schemaTableName, Optional ta public MetadataEntry getMetadataEntry(TableSnapshot tableSnapshot, ConnectorSession session) { if (tableSnapshot.getCachedMetadata().isEmpty()) { - try (Stream metadataEntries = getEntries( - tableSnapshot, - METADATA, - entryStream -> entryStream.map(DeltaLakeTransactionLogEntry::getMetaData).filter(Objects::nonNull), - session, - fileSystemFactory.create(session), - fileFormatDataSourceStats)) { - // Get last entry in the stream - tableSnapshot.setCachedMetadata(metadataEntries.reduce((first, second) -> second)); - } + populateMetadataAndProtocol(tableSnapshot, session); } return tableSnapshot.getCachedMetadata() .orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable())); } + private void populateMetadataAndProtocol(TableSnapshot tableSnapshot, ConnectorSession session) + { + Map, Object> logEntries = getTransactionLogEntries( + session, + tableSnapshot, + ImmutableSet.of(METADATA, PROTOCOL), + entryStream -> entryStream + .filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null) + .map(entry -> firstNonNull(entry.getMetaData(), entry.getProtocol()))); + tableSnapshot.setCachedMetadata(Optional.ofNullable((MetadataEntry) logEntries.get(MetadataEntry.class))); + tableSnapshot.setCachedProtocol(Optional.ofNullable((ProtocolEntry) logEntries.get(ProtocolEntry.class))); + } + // Deprecated in favor of the namesake method which allows checkpoint filtering // to be able to perform partition pruning and stats projection on the `add` entries // from the checkpoint. @@ -418,12 +423,13 @@ public Map, Object> getTransactionLogEntries( } } - public ProtocolEntry getProtocolEntry(ConnectorSession session, TableSnapshot tableSnapshot) + public ProtocolEntry getProtocolEntry(TableSnapshot tableSnapshot, ConnectorSession session) { - try (Stream protocolEntries = getProtocolEntries(tableSnapshot, session)) { - return protocolEntries.reduce((first, second) -> second) - .orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol entry not found in transaction log for table " + tableSnapshot.getTable())); + if (tableSnapshot.getCachedProtocol().isEmpty()) { + populateMetadataAndProtocol(tableSnapshot, session); } + return tableSnapshot.getCachedProtocol() + .orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol entry not found in transaction log for table " + tableSnapshot.getTable())); } public Stream getProtocolEntries(TableSnapshot tableSnapshot, ConnectorSession session) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java index 2a7994a3de28..8da4186394e5 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java @@ -18,7 +18,9 @@ import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInputFile; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; +import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.MissingTransactionLogException; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.deltalake.transactionlog.Transaction; import java.io.BufferedReader; @@ -27,6 +29,7 @@ import java.io.InputStreamReader; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; @@ -44,10 +47,15 @@ public class TransactionLogTail private final List entries; private final long version; - private TransactionLogTail(List entries, long version) + private final Optional metadataEntry; + private final Optional protocolEntry; + + private TransactionLogTail(List entries, long version, Optional metadataEntry, Optional protocolEntry) { this.entries = ImmutableList.copyOf(requireNonNull(entries, "entries is null")); this.version = version; + this.metadataEntry = metadataEntry; + this.protocolEntry = protocolEntry; } public static TransactionLogTail loadNewTail( @@ -75,12 +83,19 @@ public static TransactionLogTail loadNewTail( String transactionLogDir = getTransactionLogDir(tableLocation); Optional> results; + MetadataEntry metadataEntry = null; + ProtocolEntry protocolEntry = null; boolean endOfTail = false; while (!endOfTail) { results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem); if (results.isPresent()) { entriesBuilder.add(new Transaction(entryNumber, results.get())); + // There is at most one metadata or protocol entry per file https://github.com/delta-io/delta/blob/d74cc6897730f4effb5d7272c21bd2554bdfacdb/PROTOCOL.md#delta-log-entries-1 + metadataEntry = results.get().stream().map(DeltaLakeTransactionLogEntry::getMetaData) + .filter(Objects::nonNull).findAny().orElse(metadataEntry); + protocolEntry = results.get().stream().map(DeltaLakeTransactionLogEntry::getProtocol) + .filter(Objects::nonNull).findAny().orElse(protocolEntry); version = entryNumber; entryNumber++; } @@ -96,7 +111,7 @@ public static TransactionLogTail loadNewTail( } } - return new TransactionLogTail(entriesBuilder.build(), version); + return new TransactionLogTail(entriesBuilder.build(), version, Optional.ofNullable(metadataEntry), Optional.ofNullable(protocolEntry)); } public Optional getUpdatedTail(TrinoFileSystem fileSystem, String tableLocation, Optional endVersion) @@ -112,7 +127,9 @@ public Optional getUpdatedTail(TrinoFileSystem fileSystem, S .addAll(entries) .addAll(newTail.entries) .build(), - newTail.version)); + newTail.version, + newTail.getMetadataEntry().or(() -> metadataEntry), + newTail.getProtocolEntry().or(() -> protocolEntry))); } public static Optional> getEntriesFromJson(long entryNumber, String transactionLogDir, TrinoFileSystem fileSystem) @@ -152,6 +169,16 @@ public List getTransactions() return entries; } + public Optional getMetadataEntry() + { + return metadataEntry; + } + + public Optional getProtocolEntry() + { + return protocolEntry; + } + public long getVersion() { return version; diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java index bf2467effc31..278e1792bdf7 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java @@ -113,6 +113,62 @@ protected DistributedQueryRunner createQueryRunner() } } + @Test + public void testCheckpointFileOperations() + { + assertUpdate("DROP TABLE IF EXISTS test_checkpoint_file_operations"); + assertUpdate("CREATE TABLE test_checkpoint_file_operations(key varchar, data varchar) with (checkpoint_interval = 2, partitioned_by=ARRAY['key'])"); + assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p1', '1-abc')", 1); + assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p2', '2-xyz')", 1); + assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => 'test_checkpoint_file_operations')"); + trackingFileSystemFactory.reset(); + assertFileSystemAccesses( + "SELECT * FROM test_checkpoint_file_operations", + ImmutableMultiset.builder() + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) + .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1) + .build()); + trackingFileSystemFactory.reset(); + // reads of checkpoint and commits are cached + assertFileSystemAccessesNoMetadataCacheFlush( + getSession(), + "SELECT * FROM test_checkpoint_file_operations", + ImmutableMultiset.builder() + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1) + .build()); + assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p3', '3-xyz')", 1); + trackingFileSystemFactory.reset(); + assertFileSystemAccessesNoMetadataCacheFlush( + getSession(), + "SELECT * FROM test_checkpoint_file_operations", + ImmutableMultiset.builder() + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(DATA, "key=p3/", INPUT_FILE_NEW_STREAM), 1) + .build()); + trackingFileSystemFactory.reset(); + assertFileSystemAccessesNoMetadataCacheFlush( + getSession(), + "SELECT * FROM test_checkpoint_file_operations", + ImmutableMultiset.builder() + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(DATA, "key=p3/", INPUT_FILE_NEW_STREAM), 1) + .build()); + } + @Test public void testCreateTableAsSelect() { @@ -775,7 +831,11 @@ private void assertFileSystemAccesses(@Language("SQL") String query, Multiset expectedAccesses) { assertUpdate("CALL system.flush_metadata_cache()"); + assertFileSystemAccessesNoMetadataCacheFlush(session, query, expectedAccesses); + } + private void assertFileSystemAccessesNoMetadataCacheFlush(Session session, @Language("SQL") String query, Multiset expectedAccesses) + { trackingFileSystemFactory.reset(); getDistributedQueryRunner().executeWithQueryId(session, query); assertMultisetsEqual(getOperations(), expectedAccesses); @@ -809,6 +869,9 @@ public static FileOperation create(String path, OperationType operationType) if (path.matches(".*/_delta_log/\\d+\\.json")) { return new FileOperation(TRANSACTION_LOG_JSON, fileName, operationType); } + if (path.matches(".*/_delta_log/\\d+\\.checkpoint.parquet")) { + return new FileOperation(CHECKPOINT, fileName, operationType); + } if (path.matches(".*/_delta_log/_trino_meta/extended_stats.json")) { return new FileOperation(TRINO_EXTENDED_STATS_JSON, fileName, operationType); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index eb8d720b74d4..a079855ed364 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -202,7 +202,7 @@ public void testGetActiveAddEntries() setupTransactionLogAccessFromResources("person", "databricks73/person"); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); List addFileEntries; try (Stream addFileEntriesStream = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION)) { addFileEntries = addFileEntriesStream.collect(toImmutableList()); @@ -248,7 +248,7 @@ public void testAddFileEntryUppercase() setupTransactionLogAccessFromResources("uppercase_columns", "databricks73/uppercase_columns"); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); try (Stream addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION)) { AddFileEntry addFileEntry = addFileEntries @@ -273,7 +273,7 @@ public void testAddEntryPruning() // - Added in a JSON commit and removed in a later JSON commit setupTransactionLogAccessFromResources("person_test_pruning", "databricks73/person_test_pruning"); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); try (Stream addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION)) { Set paths = addFileEntries .map(AddFileEntry::getPath) @@ -290,7 +290,7 @@ public void testAddEntryOverrides() { setupTransactionLogAccessFromResources("person_test_pruning", "databricks73/person_test_pruning"); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); List addFileEntries; try (Stream addFileEntryStream = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION)) { addFileEntries = addFileEntryStream.collect(toImmutableList()); @@ -316,7 +316,7 @@ public void testAddRemoveAdd() { setupTransactionLogAccessFromResources("person_test_pruning", "databricks73/person_test_pruning"); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); try (Stream addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION)) { // Test data contains an entry added by the parquet checkpoint, removed by a JSON action, and then added back by a later JSON action List activeEntries = addFileEntries @@ -400,7 +400,7 @@ private void testAllGetActiveAddEntries(String tableName, String resourcePath) { setupTransactionLogAccessFromResources(tableName, resourcePath); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); try (Stream addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION)) { Set paths = addFileEntries .map(AddFileEntry::getPath) @@ -497,7 +497,7 @@ public void testUpdatingTailEntriesNoCheckpoint() copyTransactionLogEntry(0, 7, resourceDir, transactionLogDir); setupTransactionLogAccess(tableName, tableDir.toURI().toString()); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); try (Stream activeDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION)) { Set dataFiles = ImmutableSet.of( "age=42/part-00000-b82d8859-84a0-4f05-872c-206b07dd54f0.c000.snappy.parquet", @@ -541,7 +541,7 @@ public void testLoadingTailEntriesPastCheckpoint() setupTransactionLogAccess(tableName, tableDir.toURI().toString()); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); try (Stream activeDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION)) { Set dataFiles = ImmutableSet.of( "age=21/part-00000-3d546786-bedc-407f-b9f7-e97aa12cce0f.c000.snappy.parquet", @@ -590,7 +590,7 @@ public void testIncrementalCacheUpdates() copyTransactionLogEntry(0, 12, resourceDir, transactionLogDir); Files.copy(new File(resourceDir, LAST_CHECKPOINT_FILENAME).toPath(), new File(transactionLogDir, LAST_CHECKPOINT_FILENAME).toPath()); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); Set originalDataFiles = ImmutableSet.of( "age=42/part-00000-b26c891a-7288-4d96-9d3b-bef648f12a34.c000.snappy.parquet", @@ -654,7 +654,7 @@ public void testSnapshotsAreConsistent() setupTransactionLogAccess(tableName, tableDir.toURI().toString()); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); List expectedDataFiles; try (Stream addFileEntryStream = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION)) { expectedDataFiles = addFileEntryStream.collect(toImmutableList()); @@ -679,7 +679,7 @@ public void testSnapshotsAreConsistent() } assertThat(expectedDataFiles.size()).isEqualTo(dataFilesWithFixedVersion.size()); - List columns = extractColumnMetadata(transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION), transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot), TESTING_TYPE_MANAGER); + List columns = extractColumnMetadata(transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION), transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION), TESTING_TYPE_MANAGER); for (int i = 0; i < expectedDataFiles.size(); i++) { AddFileEntry expected = expectedDataFiles.get(i); AddFileEntry actual = dataFilesWithFixedVersion.get(i); @@ -741,7 +741,7 @@ public void testParquetStructStatistics() setupTransactionLogAccess(tableName, getClass().getClassLoader().getResource("databricks73/pruning/" + tableName).toURI().toString()); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); List addFileEntries; try (Stream addFileEntryStream = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION)) { addFileEntries = addFileEntryStream.collect(toImmutableList()); @@ -824,7 +824,7 @@ public void testTableSnapshotsActiveDataFilesCache() DeltaLakeConfig shortLivedActiveDataFilesCacheConfig = new DeltaLakeConfig(); shortLivedActiveDataFilesCacheConfig.setDataFileCacheTtl(new Duration(10, TimeUnit.MINUTES)); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); assertFileSystemAccesses( () -> { @@ -864,7 +864,7 @@ public void testFlushSnapshotAndActiveFileCache() DeltaLakeConfig shortLivedActiveDataFilesCacheConfig = new DeltaLakeConfig(); shortLivedActiveDataFilesCacheConfig.setDataFileCacheTtl(new Duration(10, TimeUnit.MINUTES)); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); assertFileSystemAccesses( () -> { @@ -914,7 +914,7 @@ public void testTableSnapshotsActiveDataFilesCacheDisabled() DeltaLakeConfig shortLivedActiveDataFilesCacheConfig = new DeltaLakeConfig(); shortLivedActiveDataFilesCacheConfig.setDataFileCacheTtl(new Duration(0, TimeUnit.SECONDS)); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); assertFileSystemAccesses( () -> { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java index e2c1b5ba7bd6..3cd627b7048c 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java @@ -57,7 +57,7 @@ public static List getTableActiveFiles(TransactionLogAccess transa TableSnapshot snapshot = transactionLogAccess.loadSnapshot(SESSION, dummyTable, tableLocation); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(snapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, snapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(snapshot, SESSION); try (Stream addFileEntries = transactionLogAccess.getActiveFiles(snapshot, metadataEntry, protocolEntry, SESSION)) { return addFileEntries.collect(toImmutableList()); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java index dbc19f558368..4f77dc792c9f 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java @@ -140,7 +140,7 @@ public void readsCheckpointFile() trackingFileSystemFactory, new ParquetReaderConfig()); MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(tableSnapshot, SESSION); tableSnapshot.setCachedMetadata(Optional.of(metadataEntry)); try (Stream stream = tableSnapshot.getCheckpointTransactionLogEntries( SESSION,