diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/BaseTransactionsTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/BaseTransactionsTable.java index f33fb4e84a0..387d1f0d088 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/BaseTransactionsTable.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/BaseTransactionsTable.java @@ -14,12 +14,13 @@ package io.trino.plugin.deltalake; import com.google.common.collect.ImmutableList; +import io.airlift.units.DataSize; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.TableSnapshot; import io.trino.plugin.deltalake.transactionlog.Transaction; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; +import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries; import io.trino.plugin.deltalake.util.PageListBuilder; import io.trino.spi.Page; import io.trino.spi.TrinoException; @@ -144,7 +145,7 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata); try { List transactions = loadNewTailBackward(fileSystem, tableLocation, startVersionExclusive, endVersionInclusive.get()).reversed(); - return new FixedPageSource(buildPages(session, pagesBuilder, transactions)); + return new FixedPageSource(buildPages(session, pagesBuilder, transactions, fileSystem)); } catch (TrinoException e) { throw e; @@ -170,7 +171,7 @@ private static List loadNewTailBackward( boolean endOfHead = false; while (!endOfHead) { - Optional> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem); + Optional results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DataSize.of(0, DataSize.Unit.BYTE)); if (results.isPresent()) { transactionsBuilder.add(new Transaction(version, results.get())); version = entryNumber; @@ -187,5 +188,6 @@ private static List loadNewTailBackward( return transactionsBuilder.build(); } - protected abstract List buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List transactions); + protected abstract List buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List transactions, TrinoFileSystem fileSystem) + throws IOException; } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeCommitSummary.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeCommitSummary.java index 30d8423c9bc..fddc6afac65 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeCommitSummary.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeCommitSummary.java @@ -15,15 +15,19 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import io.trino.filesystem.TrinoFileSystem; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; +import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Stream; import static 
io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.canonicalizePartitionValues; import static java.util.Objects.requireNonNull; @@ -38,7 +42,7 @@ public class DeltaLakeCommitSummary private final Set>> addedFilesCanonicalPartitionValues; private final Optional isBlindAppend; - public DeltaLakeCommitSummary(long version, List transactionLogEntries) + public DeltaLakeCommitSummary(long version, TransactionLogEntries transactionLogEntries, TrinoFileSystem fileSystem) { requireNonNull(transactionLogEntries, "transactionLogEntries is null"); ImmutableList.Builder metadataUpdatesBuilder = ImmutableList.builder(); @@ -48,26 +52,29 @@ public DeltaLakeCommitSummary(long version, List t ImmutableSet.Builder>> removedFilesCanonicalPartitionValuesBuilder = ImmutableSet.builder(); boolean containsRemoveFileWithoutPartitionValues = false; - for (DeltaLakeTransactionLogEntry transactionLogEntry : transactionLogEntries) { - if (transactionLogEntry.getMetaData() != null) { - metadataUpdatesBuilder.add(transactionLogEntry.getMetaData()); - } - else if (transactionLogEntry.getProtocol() != null) { - optionalProtocol = Optional.of(transactionLogEntry.getProtocol()); - } - else if (transactionLogEntry.getCommitInfo() != null) { - optionalCommitInfo = Optional.of(transactionLogEntry.getCommitInfo()); - } - else if (transactionLogEntry.getAdd() != null) { - addedFilesCanonicalPartitionValuesBuilder.add(transactionLogEntry.getAdd().getCanonicalPartitionValues()); - } - else if (transactionLogEntry.getRemove() != null) { - Map partitionValues = transactionLogEntry.getRemove().partitionValues(); - if (partitionValues == null) { - containsRemoveFileWithoutPartitionValues = true; + try (Stream logEntryStream = transactionLogEntries.getEntries(fileSystem)) { + for (Iterator it = logEntryStream.iterator(); it.hasNext(); ) { + DeltaLakeTransactionLogEntry transactionLogEntry = it.next(); + if (transactionLogEntry.getMetaData() != null) { + metadataUpdatesBuilder.add(transactionLogEntry.getMetaData()); + } + else if (transactionLogEntry.getProtocol() != null) { + optionalProtocol = Optional.of(transactionLogEntry.getProtocol()); + } + else if (transactionLogEntry.getCommitInfo() != null) { + optionalCommitInfo = Optional.of(transactionLogEntry.getCommitInfo()); + } + else if (transactionLogEntry.getAdd() != null) { + addedFilesCanonicalPartitionValuesBuilder.add(transactionLogEntry.getAdd().getCanonicalPartitionValues()); } - else { - removedFilesCanonicalPartitionValuesBuilder.add(canonicalizePartitionValues(partitionValues)); + else if (transactionLogEntry.getRemove() != null) { + Map partitionValues = transactionLogEntry.getRemove().partitionValues(); + if (partitionValues == null) { + containsRemoveFileWithoutPartitionValues = true; + } + else { + removedFilesCanonicalPartitionValuesBuilder.add(canonicalizePartitionValues(partitionValues)); + } } } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index 870703998c4..e02a6f5d8cc 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -50,6 +50,7 @@ public class DeltaLakeConfig { public static final String EXTENDED_STATISTICS_ENABLED = "delta.extended-statistics.enabled"; public static final String VACUUM_MIN_RETENTION = "delta.vacuum.min-retention"; + public static final DataSize 
DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE = DataSize.of(16, MEGABYTE); // Runtime.getRuntime().maxMemory() is not 100% stable and may return slightly different value over JVM lifetime. We use // constant so default configuration for cache size is stable. @@ -60,6 +61,7 @@ public class DeltaLakeConfig private Duration metadataCacheTtl = new Duration(30, TimeUnit.MINUTES); private DataSize metadataCacheMaxRetainedSize = DEFAULT_METADATA_CACHE_MAX_RETAINED_SIZE; + private DataSize transactionLogMaxCachedFileSize = DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE; private DataSize dataFileCacheSize = DEFAULT_DATA_FILE_CACHE_SIZE; private Duration dataFileCacheTtl = new Duration(30, TimeUnit.MINUTES); private int domainCompactionThreshold = 1000; @@ -121,6 +123,19 @@ public DeltaLakeConfig setMetadataCacheMaxRetainedSize(DataSize metadataCacheMax return this; } + public DataSize getTransactionLogMaxCachedFileSize() + { + return transactionLogMaxCachedFileSize; + } + + @Config("delta.transaction-log.max-cached-file-size") + @ConfigDescription("Maximum size of delta transaction log file that will be cached in memory") + public DeltaLakeConfig setTransactionLogMaxCachedFileSize(DataSize transactionLogMaxCachedFileSize) + { + this.transactionLogMaxCachedFileSize = transactionLogMaxCachedFileSize; + return this; + } + public DataSize getDataFileCacheSize() { return dataFileCacheSize; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java index 3c5050dc1bf..7f048eb4d1c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java @@ -14,6 +14,7 @@ package io.trino.plugin.deltalake; import com.google.common.collect.ImmutableList; +import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; @@ -30,6 +31,7 @@ import java.util.List; import java.util.Objects; +import java.util.stream.Stream; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.spi.type.BigintType.BIGINT; @@ -74,13 +76,15 @@ public DeltaLakeHistoryTable( } @Override - protected List buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List transactions) + protected List buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List transactions, TrinoFileSystem fileSystem) { - List commitInfoEntries = transactions.stream() - .flatMap(transaction -> transaction.transactionEntries().stream()) + List commitInfoEntries; + try (Stream commitStream = transactions.stream() + .flatMap(transaction -> transaction.transactionEntries().getEntries(fileSystem)) .map(DeltaLakeTransactionLogEntry::getCommitInfo) - .filter(Objects::nonNull) - .collect(toImmutableList()); + .filter(Objects::nonNull)) { + commitInfoEntries = commitStream.collect(toImmutableList()); + } TimeZoneKey timeZoneKey = session.getTimeZoneKey(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 0fd44423d42..658ecec123c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ 
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -69,13 +69,14 @@ import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.UnsupportedTypeException; -import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry; import io.trino.plugin.deltalake.transactionlog.TableSnapshot; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; +import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager; +import io.trino.plugin.deltalake.transactionlog.checkpoint.MetadataAndProtocolEntries; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics; import io.trino.plugin.deltalake.transactionlog.writer.TransactionConflictException; @@ -197,6 +198,7 @@ import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.Sets.difference; import static com.google.common.primitives.Ints.max; +import static io.airlift.units.DataSize.Unit.BYTE; import static io.trino.filesystem.Locations.appendPath; import static io.trino.filesystem.Locations.areDirectoryLocationsEquivalent; import static io.trino.hive.formats.HiveClassNames.HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS; @@ -290,8 +292,6 @@ import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath; -import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA; -import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL; import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson; import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME; import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE; @@ -630,15 +630,9 @@ public LocatedTableHandle getTableHandle( TrinoFileSystem fileSystem = fileSystemFactory.create(session); TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, endVersion.map(version -> getVersion(fileSystem, tableLocation, version))); - Map, Object> logEntries; + MetadataAndProtocolEntries logEntries; try { - logEntries = transactionLogAccess.getTransactionLogEntries( - session, - tableSnapshot, - ImmutableSet.of(METADATA, PROTOCOL), - entryStream -> entryStream - .filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null) - .map(entry -> firstNonNull(entry.getMetaData(), entry.getProtocol()))); + logEntries = transactionLogAccess.getMetadataAndProtocolEntry(session, tableSnapshot); } catch (TrinoException e) { if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) { @@ -646,11 +640,11 @@ public LocatedTableHandle getTableHandle( } throw e; } - MetadataEntry metadataEntry = (MetadataEntry) logEntries.get(MetadataEntry.class); + 
MetadataEntry metadataEntry = logEntries.metadata().orElse(null); if (metadataEntry == null) { return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable())); } - ProtocolEntry protocolEntry = (ProtocolEntry) logEntries.get(ProtocolEntry.class); + ProtocolEntry protocolEntry = logEntries.protocol().orElse(null); if (protocolEntry == null) { return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable())); } @@ -2267,16 +2261,16 @@ private void checkForConcurrentTransactionConflicts( if (currentVersion > readVersionValue) { String transactionLogDirectory = getTransactionLogDir(tableLocation); for (long version = readVersionValue + 1; version <= currentVersion; version++) { - List transactionLogEntries; + TransactionLogEntries transactionLogEntries; try { long finalVersion = version; - transactionLogEntries = getEntriesFromJson(version, transactionLogDirectory, fileSystem) + transactionLogEntries = getEntriesFromJson(version, transactionLogDirectory, fileSystem, DataSize.of(0, BYTE)) .orElseThrow(() -> new TrinoException(DELTA_LAKE_BAD_DATA, "Delta Lake log entries are missing for version " + finalVersion)); } catch (IOException e) { throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Failed to access table metadata", e); } - DeltaLakeCommitSummary commitSummary = new DeltaLakeCommitSummary(version, transactionLogEntries); + DeltaLakeCommitSummary commitSummary = new DeltaLakeCommitSummary(version, transactionLogEntries, fileSystem); checkNoMetadataUpdates(commitSummary); checkNoProtocolUpdates(commitSummary); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTransactionsTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTransactionsTable.java index b35ce607461..194481f9abe 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTransactionsTable.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTransactionsTable.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import io.airlift.json.JsonCodec; +import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.Transaction; @@ -62,12 +63,13 @@ public DeltaLakeTransactionsTable( } @Override - protected List buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List transactions) + protected List buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List transactions, TrinoFileSystem fileSystem) { for (Transaction transaction : transactions) { pagesBuilder.beginRow(); pagesBuilder.appendBigint(transaction.transactionId()); - pagesBuilder.appendVarchar(TRANSACTION_LOG_ENTRIES_CODEC.toJson(transaction.transactionEntries())); + pagesBuilder.appendVarchar(TRANSACTION_LOG_ENTRIES_CODEC.toJson( + transaction.transactionEntries().getEntriesList(fileSystem))); pagesBuilder.endRow(); } return pagesBuilder.build(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java 
index 084fe36f0f4..e9056e42ec8 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java @@ -39,6 +39,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.plugin.deltalake.DeltaLakeConfig.DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR; import static io.trino.plugin.deltalake.functions.tablechanges.TableChangesFileType.CDF_FILE; @@ -74,11 +75,9 @@ private Stream prepareSplits(long currentVersion, long tableRead .boxed() .flatMap(version -> { try { - List entries = getEntriesFromJson(version, transactionLogDir, fileSystem) - .orElseThrow(() -> new TrinoException(DELTA_LAKE_BAD_DATA, "Delta Lake log entries are missing for version " + version)); - if (entries.isEmpty()) { - return ImmutableList.of().stream(); - } + List entries = getEntriesFromJson(version, transactionLogDir, fileSystem, DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE) + .orElseThrow(() -> new TrinoException(DELTA_LAKE_BAD_DATA, "Delta Lake log entries are missing for version " + version)) + .getEntriesList(fileSystem); List commitInfoEntries = entries.stream() .map(DeltaLakeTransactionLogEntry::getCommitInfo) .filter(Objects::nonNull) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java index bc27669c87f..e84a5d700c2 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java @@ -190,8 +190,7 @@ private void doVacuum( checkUnsupportedUniversalFormat(handle.getMetadataEntry()); - TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), Optional.of(handle.getReadVersion())); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot); + ProtocolEntry protocolEntry = handle.getProtocolEntry(); if (protocolEntry.minWriterVersion() > MAX_WRITER_VERSION) { throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.minWriterVersion())); } @@ -205,6 +204,7 @@ private void doVacuum( throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %s writer features".formatted(DELETION_VECTORS_FEATURE_NAME)); } + TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), Optional.of(handle.getReadVersion())); String tableLocation = tableSnapshot.getTableLocation(); String transactionLogDir = getTransactionLogDir(tableLocation); TrinoFileSystem fileSystem = fileSystemFactory.create(session); @@ -222,24 +222,26 @@ private void doVacuum( handle.getProtocolEntry(), TupleDomain.all(), alwaysFalse())) { - retainedPaths = Stream.concat( - activeAddEntries - // paths can be absolute as well in case of shallow-cloned tables, and they shouldn't be deleted as part of vacuum because according to - // delta-protocol absolute paths are inherited from base table and the vacuum procedure should only list and delete local file 
references - .map(AddFileEntry::getPath), - transactionLogAccess.getJsonEntries( - fileSystem, - transactionLogDir, - // discard oldest "recent" snapshot, since we take RemoveFileEntry only, to identify files that are no longer - // active files, but still needed to read a "recent" snapshot - recentVersions.stream().sorted(naturalOrder()) - .skip(1) - .collect(toImmutableList())) - .map(DeltaLakeTransactionLogEntry::getRemove) - .filter(Objects::nonNull) - .map(RemoveFileEntry::path)) - .peek(path -> checkState(!path.startsWith(tableLocation), "Unexpected absolute path in transaction log: %s", path)) - .collect(toImmutableSet()); + try (Stream pathEntries = Stream.concat( + activeAddEntries + // paths can be absolute as well in case of shallow-cloned tables, and they shouldn't be deleted as part of vacuum because according to + // delta-protocol absolute paths are inherited from base table and the vacuum procedure should only list and delete local file references + .map(AddFileEntry::getPath), + transactionLogAccess.getJsonEntries( + fileSystem, + transactionLogDir, + // discard oldest "recent" snapshot, since we take RemoveFileEntry only, to identify files that are no longer + // active files, but still needed to read a "recent" snapshot + recentVersions.stream().sorted(naturalOrder()) + .skip(1) + .collect(toImmutableList())) + .map(DeltaLakeTransactionLogEntry::getRemove) + .filter(Objects::nonNull) + .map(RemoveFileEntry::path))) { + retainedPaths = pathEntries + .peek(path -> checkState(!path.startsWith(tableLocation), "Unexpected absolute path in transaction log: %s", path)) + .collect(toImmutableSet()); + } } log.debug( diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java index 203861cfe01..16e5bd1d29e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import io.airlift.units.DataSize; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInputFile; @@ -70,10 +71,12 @@ public class TableSnapshot private final TransactionLogTail logTail; private final String tableLocation; private final ParquetReaderOptions parquetReaderOptions; + private final DataSize transactionLogMaxCachedFileSize; private final boolean checkpointRowStatisticsWritingEnabled; private final int domainCompactionThreshold; private Optional cachedMetadata = Optional.empty(); + private Optional cachedProtocol = Optional.empty(); private TableSnapshot( SchemaTableName table, @@ -82,7 +85,8 @@ private TableSnapshot( String tableLocation, ParquetReaderOptions parquetReaderOptions, boolean checkpointRowStatisticsWritingEnabled, - int domainCompactionThreshold) + int domainCompactionThreshold, + DataSize transactionLogMaxCachedFileSize) { this.table = requireNonNull(table, "table is null"); this.lastCheckpoint = requireNonNull(lastCheckpoint, "lastCheckpoint is null"); @@ -91,6 +95,7 @@ private TableSnapshot( this.parquetReaderOptions = requireNonNull(parquetReaderOptions, "parquetReaderOptions is null"); this.checkpointRowStatisticsWritingEnabled = checkpointRowStatisticsWritingEnabled; 
this.domainCompactionThreshold = domainCompactionThreshold; + this.transactionLogMaxCachedFileSize = requireNonNull(transactionLogMaxCachedFileSize, "transactionLogMaxCachedFileSize is null"); } public static TableSnapshot load( @@ -101,11 +106,12 @@ public static TableSnapshot load( ParquetReaderOptions parquetReaderOptions, boolean checkpointRowStatisticsWritingEnabled, int domainCompactionThreshold, + DataSize transactionLogMaxCachedFileSize, Optional endVersion) throws IOException { Optional lastCheckpointVersion = lastCheckpoint.map(LastCheckpoint::version); - TransactionLogTail transactionLogTail = TransactionLogTail.loadNewTail(fileSystem, tableLocation, lastCheckpointVersion, endVersion); + TransactionLogTail transactionLogTail = TransactionLogTail.loadNewTail(fileSystem, tableLocation, lastCheckpointVersion, endVersion, transactionLogMaxCachedFileSize); return new TableSnapshot( table, @@ -114,7 +120,8 @@ public static TableSnapshot load( tableLocation, parquetReaderOptions, checkpointRowStatisticsWritingEnabled, - domainCompactionThreshold); + domainCompactionThreshold, + transactionLogMaxCachedFileSize); } public Optional getUpdatedSnapshot(TrinoFileSystem fileSystem, Optional toVersion) @@ -136,12 +143,13 @@ public Optional getUpdatedSnapshot(TrinoFileSystem fileSystem, Op parquetReaderOptions, checkpointRowStatisticsWritingEnabled, domainCompactionThreshold, + transactionLogMaxCachedFileSize, Optional.empty())); } } } - Optional updatedLogTail = logTail.getUpdatedTail(fileSystem, tableLocation, toVersion); + Optional updatedLogTail = logTail.getUpdatedTail(fileSystem, tableLocation, toVersion, transactionLogMaxCachedFileSize); return updatedLogTail.map(transactionLogTail -> new TableSnapshot( table, lastCheckpoint, @@ -149,7 +157,8 @@ public Optional getUpdatedSnapshot(TrinoFileSystem fileSystem, Op tableLocation, parquetReaderOptions, checkpointRowStatisticsWritingEnabled, - domainCompactionThreshold)); + domainCompactionThreshold, + transactionLogMaxCachedFileSize)); } public long getVersion() @@ -167,6 +176,11 @@ public Optional getCachedMetadata() return cachedMetadata; } + public Optional getCachedProtocol() + { + return cachedProtocol; + } + public String getTableLocation() { return tableLocation; @@ -177,9 +191,14 @@ public void setCachedMetadata(Optional cachedMetadata) this.cachedMetadata = cachedMetadata; } - public List getJsonTransactionLogEntries() + public void setCachedProtocol(Optional cachedProtocol) + { + this.cachedProtocol = cachedProtocol; + } + + public List getJsonTransactionLogEntries(TrinoFileSystem fileSystem) { - return logTail.getFileEntries(); + return logTail.getFileEntries(fileSystem); } public List getTransactions() @@ -194,7 +213,8 @@ public long getRetainedSizeInBytes() + table.getRetainedSizeInBytes() + logTail.getRetainedSizeInBytes() + estimatedSizeOf(tableLocation) - + sizeOf(cachedMetadata, MetadataEntry::getRetainedSizeInBytes); + + sizeOf(cachedMetadata, MetadataEntry::getRetainedSizeInBytes) + + sizeOf(cachedProtocol, ProtocolEntry::getRetainedSizeInBytes); } public Stream getCheckpointTransactionLogEntries( @@ -360,7 +380,9 @@ private Stream getV2CheckpointEntries( { if (checkpointFile.location().fileName().endsWith(".json")) { try { - return getEntriesFromJson(checkpoint.version(), checkpointFile).stream().flatMap(List::stream); + return getEntriesFromJson(checkpoint.version(), checkpointFile, transactionLogMaxCachedFileSize) + .stream() + .flatMap(logEntries -> logEntries.getEntries(fileSystem)); } catch (IOException e) { throw new 
TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, format("Unexpected IO exception occurred while reading the entries of the file: %s for the table %s", checkpoint, table), e); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/Transaction.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/Transaction.java index ebfa2b305d4..d5c3b0b8dcf 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/Transaction.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/Transaction.java @@ -13,30 +13,25 @@ */ package io.trino.plugin.deltalake.transactionlog; -import com.google.common.collect.ImmutableList; - -import java.util.List; - import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.slice.SizeOf.SIZE_OF_LONG; -import static io.airlift.slice.SizeOf.estimatedSizeOf; import static io.airlift.slice.SizeOf.instanceSize; import static java.util.Objects.requireNonNull; -public record Transaction(long transactionId, List transactionEntries) +public record Transaction(long transactionId, TransactionLogEntries transactionEntries) { private static final int INSTANCE_SIZE = instanceSize(Transaction.class); public Transaction { checkArgument(transactionId >= 0, "transactionId must be >= 0"); - transactionEntries = ImmutableList.copyOf(requireNonNull(transactionEntries, "transactionEntries is null")); + requireNonNull(transactionEntries, "transactionEntries is null"); } public long getRetainedSizeInBytes() { return INSTANCE_SIZE + SIZE_OF_LONG - + estimatedSizeOf(transactionEntries, DeltaLakeTransactionLogEntry::getRetainedSizeInBytes); + + transactionEntries.getRetainedSizeInBytes(); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java index a2b76fa3e4d..19a730e9efe 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java @@ -20,6 +20,7 @@ import com.google.common.primitives.Ints; import com.google.common.util.concurrent.UncheckedExecutionException; import com.google.inject.Inject; +import io.airlift.units.DataSize; import io.trino.cache.CacheStatsMBean; import io.trino.cache.EvictableCacheBuilder; import io.trino.filesystem.FileEntry; @@ -37,6 +38,7 @@ import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager; import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint; +import io.trino.plugin.deltalake.transactionlog.checkpoint.MetadataAndProtocolEntries; import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail; import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.spi.TrinoException; @@ -56,7 +58,6 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.time.Instant; -import java.util.Collection; import java.util.Comparator; import java.util.HashSet; import java.util.LinkedHashMap; @@ -92,7 +93,6 @@ import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA; 
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL; -import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.REMOVE; import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -110,6 +110,7 @@ public class TransactionLogAccess private final ParquetReaderOptions parquetReaderOptions; private final boolean checkpointRowStatisticsWritingEnabled; private final int domainCompactionThreshold; + private final DataSize transactionLogMaxCachedFileSize; private final Cache tableSnapshots; private final Cache activeDataFileCache; @@ -130,6 +131,7 @@ public TransactionLogAccess( this.parquetReaderOptions = parquetReaderConfig.toParquetReaderOptions().withBloomFilter(false); this.checkpointRowStatisticsWritingEnabled = deltaLakeConfig.isCheckpointRowStatisticsWritingEnabled(); this.domainCompactionThreshold = deltaLakeConfig.getDomainCompactionThreshold(); + this.transactionLogMaxCachedFileSize = deltaLakeConfig.getTransactionLogMaxCachedFileSize(); tableSnapshots = EvictableCacheBuilder.newBuilder() .weigher((Weigher) (key, value) -> Ints.saturatedCast(key.getRetainedSizeInBytes() + value.getRetainedSizeInBytes())) @@ -184,6 +186,7 @@ public TableSnapshot loadSnapshot(ConnectorSession session, SchemaTableName tabl parquetReaderOptions, checkpointRowStatisticsWritingEnabled, domainCompactionThreshold, + transactionLogMaxCachedFileSize, endVersion)); } catch (UncheckedExecutionException | ExecutionException e) { @@ -215,6 +218,7 @@ private TableSnapshot loadSnapshotForTimeTravel(TrinoFileSystem fileSystem, Sche parquetReaderOptions, checkpointRowStatisticsWritingEnabled, domainCompactionThreshold, + transactionLogMaxCachedFileSize, Optional.of(endVersion)); } @@ -299,12 +303,21 @@ public void invalidateCache(SchemaTableName schemaTableName, Optional ta public MetadataEntry getMetadataEntry(ConnectorSession session, TableSnapshot tableSnapshot) { if (tableSnapshot.getCachedMetadata().isEmpty()) { + TrinoFileSystem fileSystem = fileSystemFactory.create(session); try (Stream metadataEntries = getEntries( session, tableSnapshot, - METADATA, - entryStream -> entryStream.map(DeltaLakeTransactionLogEntry::getMetaData).filter(Objects::nonNull), - fileSystemFactory.create(session), + ImmutableSet.of(METADATA), + (checkpointStream, jsonTransactions) -> + Stream.concat( + checkpointStream + .map(DeltaLakeTransactionLogEntry::getMetaData) + .filter(Objects::nonNull), + jsonTransactions.stream() + .map(transaction -> transaction.transactionEntries().getMetadataAndProtocol(fileSystem)) + .filter(entry -> entry.metadata().isPresent()) + .map(entry -> entry.metadata().get())), + fileSystem, fileFormatDataSourceStats)) { // Get last entry in the stream tableSnapshot.setCachedMetadata(metadataEntries.reduce((first, second) -> second)); @@ -391,17 +404,18 @@ public Stream loadActiveFiles( Predicate addStatsMinMaxColumnFilter) { List transactions = tableSnapshot.getTransactions(); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); try (Stream checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries( session, ImmutableSet.of(ADD), checkpointSchemaManager, typeManager, - fileSystemFactory.create(session), + fileSystem, fileFormatDataSourceStats, Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)), partitionConstraint, 
Optional.of(addStatsMinMaxColumnFilter))) { - return activeAddEntries(checkpointEntries, transactions) + return activeAddEntries(checkpointEntries, transactions, fileSystem) .filter(partitionConstraint.isAll() ? addAction -> true : addAction -> partitionMatchesPredicate(addAction.getCanonicalPartitionValues(), partitionConstraint.getDomains().orElseThrow())); @@ -427,7 +441,7 @@ public static ImmutableList columnsWithStats(List activeAddEntries(Stream checkpointEntries, List transactions) + private Stream activeAddEntries(Stream checkpointEntries, List transactions, TrinoFileSystem fileSystem) { Map activeJsonEntries = new LinkedHashMap<>(); HashSet removedFiles = new HashSet<>(); @@ -438,16 +452,18 @@ private Stream activeAddEntries(Stream { Map addFilesInTransaction = new LinkedHashMap<>(); Set removedFilesInTransaction = new HashSet<>(); - transaction.transactionEntries().forEach(deltaLakeTransactionLogEntry -> { - if (deltaLakeTransactionLogEntry.getAdd() != null) { - AddFileEntry add = deltaLakeTransactionLogEntry.getAdd(); - addFilesInTransaction.put(new FileEntryKey(add.getPath(), add.getDeletionVector().map(DeletionVectorEntry::uniqueId)), add); - } - else if (deltaLakeTransactionLogEntry.getRemove() != null) { - RemoveFileEntry remove = deltaLakeTransactionLogEntry.getRemove(); - removedFilesInTransaction.add(new FileEntryKey(remove.path(), remove.deletionVector().map(DeletionVectorEntry::uniqueId))); - } - }); + try (Stream entries = transaction.transactionEntries().getEntries(fileSystem)) { + entries.forEach(deltaLakeTransactionLogEntry -> { + if (deltaLakeTransactionLogEntry.getAdd() != null) { + AddFileEntry add = deltaLakeTransactionLogEntry.getAdd(); + addFilesInTransaction.put(new FileEntryKey(add.getPath(), add.getDeletionVector().map(DeletionVectorEntry::uniqueId)), add); + } + else if (deltaLakeTransactionLogEntry.getRemove() != null) { + RemoveFileEntry remove = deltaLakeTransactionLogEntry.getRemove(); + removedFilesInTransaction.add(new FileEntryKey(remove.path(), remove.deletionVector().map(DeletionVectorEntry::uniqueId))); + } + }); + } // Process 'remove' entries first because deletion vectors register both 'add' and 'remove' entries and the 'add' entry should be kept removedFiles.addAll(removedFilesInTransaction); @@ -468,50 +484,62 @@ else if (deltaLakeTransactionLogEntry.getRemove() != null) { private record FileEntryKey(String path, Optional deletionVectorId) {} - public Stream getRemoveEntries(ConnectorSession session, TableSnapshot tableSnapshot) - { - return getEntries( - session, - tableSnapshot, - REMOVE, - entryStream -> entryStream.map(DeltaLakeTransactionLogEntry::getRemove).filter(Objects::nonNull), - fileSystemFactory.create(session), - fileFormatDataSourceStats); - } - - public Map, Object> getTransactionLogEntries( - ConnectorSession session, - TableSnapshot tableSnapshot, - Set entryTypes, - Function, Stream> entryMapper) + public MetadataAndProtocolEntries getMetadataAndProtocolEntry(ConnectorSession session, TableSnapshot tableSnapshot) { - try (Stream entries = getEntries( - session, - tableSnapshot, - entryTypes, - (checkpointStream, jsonStream) -> entryMapper.apply(Stream.concat(checkpointStream, jsonStream.stream().map(Transaction::transactionEntries).flatMap(Collection::stream))), - fileSystemFactory.create(session), - fileFormatDataSourceStats)) { - return entries.collect(toImmutableMap(Object::getClass, Function.identity(), (first, second) -> second)); + if (tableSnapshot.getCachedMetadata().isEmpty() || 
tableSnapshot.getCachedProtocol().isEmpty()) { + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + try (Stream entries = getEntries( + session, + tableSnapshot, + ImmutableSet.of(METADATA, PROTOCOL), + (checkpointStream, jsonTransactions) -> + Stream.concat( + checkpointStream + .filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null) + .map(entry -> new MetadataAndProtocolEntries(entry.getMetaData(), entry.getProtocol())), + jsonTransactions.stream() + .map(transaction -> transaction.transactionEntries().getMetadataAndProtocol(fileSystem))), + fileSystem, + fileFormatDataSourceStats)) { + Map, Object> logEntries = entries + .flatMap(MetadataAndProtocolEntries::stream) + .collect(toImmutableMap(Object::getClass, Function.identity(), (_, second) -> second)); + tableSnapshot.setCachedMetadata(Optional.ofNullable((MetadataEntry) logEntries.get(MetadataEntry.class))); + tableSnapshot.setCachedProtocol(Optional.ofNullable((ProtocolEntry) logEntries.get(ProtocolEntry.class))); + } } + return new MetadataAndProtocolEntries(tableSnapshot.getCachedMetadata(), tableSnapshot.getCachedProtocol()); } public ProtocolEntry getProtocolEntry(ConnectorSession session, TableSnapshot tableSnapshot) { - try (Stream protocolEntries = getProtocolEntries(session, tableSnapshot)) { - return protocolEntries.reduce((first, second) -> second) - .orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol entry not found in transaction log for table " + tableSnapshot.getTable())); + if (tableSnapshot.getCachedProtocol().isEmpty()) { + try (Stream protocolEntries = getProtocolEntries(session, tableSnapshot)) { + // Get last entry in the stream + tableSnapshot.setCachedProtocol(protocolEntries.reduce((first, second) -> second)); + } } + return tableSnapshot.getCachedProtocol() + .orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol entry not found in transaction log for table " + tableSnapshot.getTable())); } public Stream getProtocolEntries(ConnectorSession session, TableSnapshot tableSnapshot) { + TrinoFileSystem fileSystem = fileSystemFactory.create(session); return getEntries( session, tableSnapshot, - PROTOCOL, - entryStream -> entryStream.map(DeltaLakeTransactionLogEntry::getProtocol).filter(Objects::nonNull), - fileSystemFactory.create(session), + ImmutableSet.of(PROTOCOL), + (checkpointStream, jsonTransactions) -> + Stream.concat( + checkpointStream + .map(DeltaLakeTransactionLogEntry::getProtocol) + .filter(Objects::nonNull), + jsonTransactions.stream() + .map(transaction -> transaction.transactionEntries().getMetadataAndProtocol(fileSystem)) + .filter(entry -> entry.protocol().isPresent()) + .map(entry -> entry.protocol().get())), + fileSystem, fileFormatDataSourceStats); } @@ -550,34 +578,13 @@ private Stream getEntries( } } - /** - * Convenience method for accessors which don't need to separate out the checkpoint entries from the json entries. 
- */ - private Stream getEntries( - ConnectorSession session, - TableSnapshot tableSnapshot, - CheckpointEntryIterator.EntryType entryType, - Function, Stream> entryMapper, - TrinoFileSystem fileSystem, - FileFormatDataSourceStats stats) - { - return getEntries( - session, - tableSnapshot, - ImmutableSet.of(entryType), - (checkpointStream, jsonStream) -> entryMapper.apply(Stream.concat(checkpointStream, jsonStream.stream().map(Transaction::transactionEntries).flatMap(Collection::stream))), - fileSystem, - stats); - } - public Stream getJsonEntries(TrinoFileSystem fileSystem, String transactionLogDir, List forVersions) { return forVersions.stream() .flatMap(version -> { try { - Optional> entriesFromJson = getEntriesFromJson(version, transactionLogDir, fileSystem); - //noinspection SimplifyOptionalCallChains - return entriesFromJson.map(List::stream) + Optional entriesFromJson = getEntriesFromJson(version, transactionLogDir, fileSystem, transactionLogMaxCachedFileSize); + return entriesFromJson.map(entries -> entries.getEntries(fileSystem)) // transaction log does not exist. Might have been expired. .orElseGet(Stream::of); } @@ -614,17 +621,17 @@ public List getPastTableVersions(TrinoFileSystem fileSystem, String transa return result.build(); } - private static List getJsonEntries(long startVersion, long endVersion, TableSnapshot tableSnapshot, TrinoFileSystem fileSystem) + private List getJsonEntries(long startVersion, long endVersion, TableSnapshot tableSnapshot, TrinoFileSystem fileSystem) throws IOException { Optional lastCheckpointVersion = tableSnapshot.getLastCheckpointVersion(); if (lastCheckpointVersion.isPresent() && startVersion < lastCheckpointVersion.get()) { return ImmutableList.builder() - .addAll(TransactionLogTail.loadNewTail(fileSystem, tableSnapshot.getTableLocation(), Optional.of(startVersion), lastCheckpointVersion).getFileEntries()) - .addAll(tableSnapshot.getJsonTransactionLogEntries()) + .addAll(TransactionLogTail.loadNewTail(fileSystem, tableSnapshot.getTableLocation(), Optional.of(startVersion), lastCheckpointVersion, transactionLogMaxCachedFileSize).getFileEntries(fileSystem)) + .addAll(tableSnapshot.getJsonTransactionLogEntries(fileSystem)) .build(); } - return TransactionLogTail.loadNewTail(fileSystem, tableSnapshot.getTableLocation(), Optional.of(startVersion), Optional.of(endVersion)).getFileEntries(); + return TransactionLogTail.loadNewTail(fileSystem, tableSnapshot.getTableLocation(), Optional.of(startVersion), Optional.of(endVersion), transactionLogMaxCachedFileSize).getFileEntries(fileSystem); } public static String canonicalizeColumnName(String columnName) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogEntries.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogEntries.java new file mode 100644 index 00000000000..55038dbe9c5 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogEntries.java @@ -0,0 +1,176 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.transactionlog; + +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.ImmutableList; +import io.airlift.units.DataSize; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; +import io.trino.plugin.deltalake.transactionlog.checkpoint.MetadataAndProtocolEntries; +import io.trino.spi.TrinoException; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Streams.stream; +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.instanceSize; +import static io.airlift.slice.SizeOf.sizeOf; +import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.parseJson; +import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static java.nio.charset.StandardCharsets.UTF_8; + +public final class TransactionLogEntries +{ + private static final int INSTANCE_SIZE = instanceSize(TransactionLogEntries.class); + private static final int JSON_LOG_ENTRY_READ_BUFFER_SIZE = 1024 * 1024; + + private final long entryNumber; + private final Location transactionLogFilePath; + + private final Optional<List<DeltaLakeTransactionLogEntry>> cachedEntries; + + public TransactionLogEntries(long entryNumber, TrinoInputFile inputFile, DataSize maxCachedFileSize) + { + this.entryNumber = entryNumber; + this.transactionLogFilePath = inputFile.location(); + try { + if (inputFile.length() > maxCachedFileSize.toBytes()) { + this.cachedEntries = Optional.empty(); + } + else { + this.cachedEntries = Optional.of(ImmutableList.copyOf(new TransactionLogEntryIterator(entryNumber, inputFile))); + } + } + catch (IOException e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Error while reading from transaction entry iterator for the file %s".formatted(transactionLogFilePath)); + } + } + + /** + * Returns a stream of DeltaLakeTransactionLogEntry. + * The caller is responsible for closing this stream as it potentially holds an open file. + */ + public Stream<DeltaLakeTransactionLogEntry> getEntries(TrinoFileSystem fileSystem) + { + if (cachedEntries.isPresent()) { + return cachedEntries.get().stream(); + } + TransactionLogEntryIterator iterator = new TransactionLogEntryIterator(entryNumber, fileSystem.newInputFile(transactionLogFilePath)); + return stream(iterator).onClose(iterator::close); + } + + public List<DeltaLakeTransactionLogEntry> getEntriesList(TrinoFileSystem fileSystem) + { + try (Stream<DeltaLakeTransactionLogEntry> jsonStream = getEntries(fileSystem)) { + return jsonStream.collect(toImmutableList()); + } + } + + public MetadataAndProtocolEntries getMetadataAndProtocol(TrinoFileSystem fileSystem) + { + // There can be at most one metadata entry and one protocol entry per transaction log file. + // We use that to stop reading the file once both a metadata and a protocol entry have been found + try (Stream<DeltaLakeTransactionLogEntry> logEntryStream = getEntries(fileSystem)) { + Optional<MetadataEntry> metadataEntry = Optional.empty(); + Optional<ProtocolEntry> protocolEntry = Optional.empty(); + for (Iterator<DeltaLakeTransactionLogEntry> it = logEntryStream.iterator(); it.hasNext(); ) { + DeltaLakeTransactionLogEntry transactionLogEntry = it.next(); + if (transactionLogEntry.getMetaData() != null) { + metadataEntry = Optional.of(transactionLogEntry.getMetaData()); + } + else if (transactionLogEntry.getProtocol() != null) { + protocolEntry = Optional.of(transactionLogEntry.getProtocol()); + } + + if (protocolEntry.isPresent() && metadataEntry.isPresent()) { + break; + } + } + return new MetadataAndProtocolEntries(metadataEntry, protocolEntry); + } + } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + SIZE_OF_LONG + + estimatedSizeOf(transactionLogFilePath.path()) + + sizeOf(cachedEntries, entries -> estimatedSizeOf(entries, DeltaLakeTransactionLogEntry::getRetainedSizeInBytes)); + } + + private static final class TransactionLogEntryIterator + extends AbstractIterator<DeltaLakeTransactionLogEntry> + { + private final long entryNumber; + private final Location location; + private final BufferedReader reader; + + public TransactionLogEntryIterator(long entryNumber, TrinoInputFile inputFile) + { + this.entryNumber = entryNumber; + this.location = inputFile.location(); + TrinoInputStream inputStream; + try { + inputStream = inputFile.newStream(); + } + catch (Exception e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Error while initializing the transaction entry iterator for the file %s".formatted(inputFile.location())); + } + this.reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8), JSON_LOG_ENTRY_READ_BUFFER_SIZE); + } + + @Override + protected DeltaLakeTransactionLogEntry computeNext() + { + String line; + try { + line = reader.readLine(); + } + catch (IOException e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Error while reading from transaction entry iterator for the file %s".formatted(location)); + } + if (line == null) { + close(); + return endOfData(); + } + DeltaLakeTransactionLogEntry deltaLakeTransactionLogEntry = parseJson(line); + if (deltaLakeTransactionLogEntry.getCommitInfo() != null && deltaLakeTransactionLogEntry.getCommitInfo().version() == 0L) { + // In case the commit info version is missing, use the version from the transaction log file name + deltaLakeTransactionLogEntry = deltaLakeTransactionLogEntry.withCommitInfo(deltaLakeTransactionLogEntry.getCommitInfo().withVersion(entryNumber)); + } + return deltaLakeTransactionLogEntry; + } + + public void close() + { + try { + reader.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + } +}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java index 5403ad33628..a1d9050047f 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java @@ -147,7 +147,7 @@ public void writeCheckpoint(ConnectorSession session, TableSnapshot snapshot) } } - snapshot.getJsonTransactionLogEntries() + snapshot.getJsonTransactionLogEntries(fileSystem) .forEach(checkpointBuilder::addLogEntry); Location transactionLogDir = Location.of(getTransactionLogDir(snapshot.getTableLocation())); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/MetadataAndProtocolEntries.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/MetadataAndProtocolEntries.java new file mode
100644 index 00000000000..d4f1fdbd743 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/MetadataAndProtocolEntries.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.transactionlog.checkpoint; + +import io.trino.plugin.deltalake.transactionlog.MetadataEntry; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; + +import java.util.Optional; +import java.util.stream.Stream; + +import static io.airlift.slice.SizeOf.instanceSize; +import static io.airlift.slice.SizeOf.sizeOf; + +public record MetadataAndProtocolEntries(Optional<MetadataEntry> metadata, Optional<ProtocolEntry> protocol) +{ + private static final int INSTANCE_SIZE = instanceSize(MetadataAndProtocolEntries.class); + + public MetadataAndProtocolEntries(MetadataEntry metadata, ProtocolEntry protocol) + { + this(Optional.ofNullable(metadata), Optional.ofNullable(protocol)); + } + + public Stream<Object> stream() + { + if (metadata.isPresent() && protocol.isPresent()) { + return Stream.of(metadata.get(), protocol.get()); + } + if (metadata.isPresent()) { + return Stream.of(metadata.get()); + } + if (protocol.isPresent()) { + return Stream.of(protocol.get()); + } + return Stream.of(); + } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + sizeOf(metadata, MetadataEntry::getRetainedSizeInBytes) + + sizeOf(protocol, ProtocolEntry::getRetainedSizeInBytes); + } +}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java index ec3e8c3b973..6f63a9efa01 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java @@ -14,18 +14,17 @@ package io.trino.plugin.deltalake.transactionlog.checkpoint; import com.google.common.collect.ImmutableList; +import io.airlift.units.DataSize; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInputFile; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MissingTransactionLogException; import io.trino.plugin.deltalake.transactionlog.Transaction; +import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries; -import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Collection; import java.util.List; import java.util.Optional; @@ -34,16 +33,13 @@ import static io.airlift.slice.SizeOf.SIZE_OF_LONG; import static io.airlift.slice.SizeOf.estimatedSizeOf; import static io.airlift.slice.SizeOf.instanceSize; -import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.parseJson; import
static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.requireNonNull;
 
 public class TransactionLogTail
 {
     private static final int INSTANCE_SIZE = instanceSize(TransactionLogTail.class);
-    private static final int JSON_LOG_ENTRY_READ_BUFFER_SIZE = 1024 * 1024;
 
     private final List<Transaction> entries;
     private final long version;
@@ -59,7 +55,8 @@ public static TransactionLogTail loadNewTail(
             TrinoFileSystem fileSystem,
             String tableLocation,
             Optional<Long> startVersion,
-            Optional<Long> endVersion)
+            Optional<Long> endVersion,
+            DataSize transactionLogMaxCachedFileSize)
             throws IOException
     {
         ImmutableList.Builder<Transaction> entriesBuilder = ImmutableList.builder();
@@ -74,11 +71,10 @@ public static TransactionLogTail loadNewTail(
         checkArgument(endVersion.isEmpty() || entryNumber <= endVersion.get(), "Invalid start/end versions: %s, %s", startVersion, endVersion);
 
         String transactionLogDir = getTransactionLogDir(tableLocation);
-        Optional<List<DeltaLakeTransactionLogEntry>> results;
 
         boolean endOfTail = false;
         while (!endOfTail) {
-            results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem);
+            Optional<TransactionLogEntries> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, transactionLogMaxCachedFileSize);
             if (results.isPresent()) {
                 entriesBuilder.add(new Transaction(entryNumber, results.get()));
                 version = entryNumber;
@@ -99,11 +95,11 @@ public static TransactionLogTail loadNewTail(
         return new TransactionLogTail(entriesBuilder.build(), version);
     }
 
-    public Optional<TransactionLogTail> getUpdatedTail(TrinoFileSystem fileSystem, String tableLocation, Optional<Long> endVersion)
+    public Optional<TransactionLogTail> getUpdatedTail(TrinoFileSystem fileSystem, String tableLocation, Optional<Long> endVersion, DataSize transactionLogMaxCachedFileSize)
             throws IOException
     {
         checkArgument(endVersion.isEmpty() || endVersion.get() > version, "Invalid endVersion, expected higher than %s, but got %s", version, endVersion);
-        TransactionLogTail newTail = loadNewTail(fileSystem, tableLocation, Optional.of(version), endVersion);
+        TransactionLogTail newTail = loadNewTail(fileSystem, tableLocation, Optional.of(version), endVersion, transactionLogMaxCachedFileSize);
         if (newTail.version == version) {
             return Optional.empty();
         }
@@ -115,42 +111,32 @@ public Optional getUpdatedTail(TrinoFileSystem fileSystem, S
                 newTail.version));
     }
 
-    public static Optional<List<DeltaLakeTransactionLogEntry>> getEntriesFromJson(long entryNumber, String transactionLogDir, TrinoFileSystem fileSystem)
+    public static Optional<TransactionLogEntries> getEntriesFromJson(long entryNumber, String transactionLogDir, TrinoFileSystem fileSystem, DataSize transactionLogMaxCachedFileSize)
             throws IOException
     {
         Location transactionLogFilePath = getTransactionLogJsonEntryPath(transactionLogDir, entryNumber);
         TrinoInputFile inputFile = fileSystem.newInputFile(transactionLogFilePath);
-        return getEntriesFromJson(entryNumber, inputFile);
+        return getEntriesFromJson(entryNumber, inputFile, transactionLogMaxCachedFileSize);
     }
 
-    public static Optional<List<DeltaLakeTransactionLogEntry>> getEntriesFromJson(long entryNumber, TrinoInputFile inputFile)
+    public static Optional<TransactionLogEntries> getEntriesFromJson(long entryNumber, TrinoInputFile inputFile, DataSize transactionLogMaxCachedFileSize)
             throws IOException
     {
-        try (BufferedReader reader = new BufferedReader(
-                new InputStreamReader(inputFile.newStream(), UTF_8),
-                JSON_LOG_ENTRY_READ_BUFFER_SIZE)) {
-            ImmutableList.Builder<DeltaLakeTransactionLogEntry> resultsBuilder = ImmutableList.builder();
-            String line = reader.readLine();
-            while (line != null) {
-                DeltaLakeTransactionLogEntry deltaLakeTransactionLogEntry = parseJson(line);
-                if (deltaLakeTransactionLogEntry.getCommitInfo() != null && deltaLakeTransactionLogEntry.getCommitInfo().version() == 0L) {
-                    // In case that the commit info version is missing, use the version from the transaction log file name
-                    deltaLakeTransactionLogEntry = deltaLakeTransactionLogEntry.withCommitInfo(deltaLakeTransactionLogEntry.getCommitInfo().withVersion(entryNumber));
-                }
-                resultsBuilder.add(deltaLakeTransactionLogEntry);
-                line = reader.readLine();
-            }
-
-            return Optional.of(resultsBuilder.build());
+        try {
+            inputFile.length(); // File length is cached and used in TransactionLogEntries
         }
         catch (FileNotFoundException e) {
             return Optional.empty(); // end of tail
         }
+        return Optional.of(new TransactionLogEntries(entryNumber, inputFile, transactionLogMaxCachedFileSize));
     }
 
-    public List<DeltaLakeTransactionLogEntry> getFileEntries()
+    public List<DeltaLakeTransactionLogEntry> getFileEntries(TrinoFileSystem fileSystem)
     {
-        return entries.stream().map(Transaction::transactionEntries).flatMap(Collection::stream).collect(toImmutableList());
+        return entries.stream()
+                .map(Transaction::transactionEntries)
+                .flatMap(logEntries -> logEntries.getEntriesList(fileSystem).stream())
+                .collect(toImmutableList());
     }
 
     public List<Transaction> getTransactions()
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java
index 69754390332..910c3657acf 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java
@@ -77,7 +77,7 @@ public void testTableDataCachedWhileTransactionLogNotCached()
                 ImmutableMultiset.builder()
                         .addCopies(new CacheFileSystemTraceUtils.CacheOperation("InputFile.length", "00000000000000000002.checkpoint.parquet"), 2)
                         .addCopies(new CacheFileSystemTraceUtils.CacheOperation("Input.readTail", "00000000000000000002.checkpoint.parquet"), 2)
-                        .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "00000000000000000003.json"))
+                        .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.length", "00000000000000000003.json"))
                         .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "_last_checkpoint"))
                         .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 220))
                         .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 220))
@@ -91,7 +91,7 @@ public void testTableDataCachedWhileTransactionLogNotCached()
                 ImmutableMultiset.builder()
                         .addCopies(new CacheFileSystemTraceUtils.CacheOperation("InputFile.length", "00000000000000000002.checkpoint.parquet"), 2)
                         .addCopies(new CacheFileSystemTraceUtils.CacheOperation("Input.readTail", "00000000000000000002.checkpoint.parquet"), 2)
-                        .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "00000000000000000003.json"))
+                        .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.length", "00000000000000000003.json"))
                         .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "_last_checkpoint"))
                         .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 220))
                         .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 220))
diff --git
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java index bf5ccba08a1..ac3f29b7413 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java @@ -42,6 +42,7 @@ import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.airlift.testing.Closeables.closeAllSuppress; +import static io.trino.plugin.deltalake.DeltaLakeConfig.DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.EXTENDED_STATISTICS_COLLECT_ON_WRITE; import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.copyDirectoryContents; @@ -1009,7 +1010,9 @@ public void testNoColumnStatsMixedCase() """); // Version 3 should be created with recalculated statistics. - List transactionLogAfterUpdate = getEntriesFromJson(3, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow(); + List transactionLogAfterUpdate = getEntriesFromJson(3, tableLocation + "/_delta_log", FILE_SYSTEM, DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE) + .orElseThrow() + .getEntriesList(FILE_SYSTEM); assertThat(transactionLogAfterUpdate).hasSize(2); AddFileEntry updateAddFileEntry = transactionLogAfterUpdate.get(1).getAdd(); DeltaLakeFileStatistics updateStats = updateAddFileEntry.getStats().orElseThrow(); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index 8d1db3984a1..5197075cb65 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -39,6 +39,7 @@ import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager; +import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics; import io.trino.plugin.hive.parquet.TrinoParquetDataSource; import io.trino.spi.Page; @@ -85,11 +86,11 @@ import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.trino.parquet.ParquetTestUtils.createParquetReader; +import static io.trino.plugin.deltalake.DeltaLakeConfig.DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE; import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.copyDirectoryContents; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsMetadata; -import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS; import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone; @@ -157,6 
+158,7 @@ protected QueryRunner createQueryRunner() .addDeltaProperty("hive.metastore.catalog.dir", catalogDir.toUri().toString()) .addDeltaProperty("delta.register-table-procedure.enabled", "true") .addDeltaProperty("delta.enable-non-concurrent-writes", "true") + .addDeltaProperty("delta.transaction-log.max-cached-file-size", "0B") // Tests json log streaming code path .build(); } @@ -431,7 +433,7 @@ private void testOptimizeWithColumnMappingMode(String columnMappingMode) "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); // Verify 'add' entry contains the expected physical name in the stats - List transactionLog = getEntriesFromJson(4, tableLocation.resolve("_delta_log").toString(), FILE_SYSTEM).orElseThrow(); + List transactionLog = getEntriesFromJson(4, tableLocation.resolve("_delta_log").toString()); assertThat(transactionLog).hasSize(5); assertThat(transactionLog.get(0).getCommitInfo()).isNotNull(); assertThat(transactionLog.get(1).getRemove()).isNotNull(); @@ -695,7 +697,7 @@ public void testStatisticsWithColumnCaseSensitivity() assertUpdate("INSERT INTO " + tableName + " VALUES (10, 1), (20, 1), (null, 1)", 3); - List transactionLog = getEntriesFromJson(1, tableLocation.resolve("_delta_log").toString(), FILE_SYSTEM).orElseThrow(); + List transactionLog = getEntriesFromJson(1, tableLocation.resolve("_delta_log").toString()); assertThat(transactionLog).hasSize(2); AddFileEntry addFileEntry = transactionLog.get(1).getAdd(); DeltaLakeFileStatistics stats = addFileEntry.getStats().orElseThrow(); @@ -705,7 +707,7 @@ public void testStatisticsWithColumnCaseSensitivity() assertUpdate("UPDATE " + tableName + " SET upper_case = upper_case + 10", 3); - List transactionLogAfterUpdate = getEntriesFromJson(2, tableLocation.resolve("_delta_log").toString(), FILE_SYSTEM).orElseThrow(); + List transactionLogAfterUpdate = getEntriesFromJson(2, tableLocation.resolve("_delta_log").toString()); assertThat(transactionLogAfterUpdate).hasSize(3); AddFileEntry updateAddFileEntry = transactionLogAfterUpdate.get(2).getAdd(); DeltaLakeFileStatistics updateStats = updateAddFileEntry.getStats().orElseThrow(); @@ -859,7 +861,7 @@ private void testTrinoCreateTableWithTimestampNtz(ZoneId sessionZone, Consumer transactionLogs = getEntriesFromJson(0, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow(); + List transactionLogs = getEntriesFromJson(0, tableLocation + "/_delta_log"); ProtocolEntry protocolEntry = transactionLogs.get(1).getProtocol(); assertThat(protocolEntry).isNotNull(); assertThat(protocolEntry.minReaderVersion()).isEqualTo(3); @@ -1035,7 +1037,7 @@ private void testTimestampNtzPartitioned(ZoneId sessionZone) ('part', null, 8.0, 0.1111111111111111, null, null, null), (null, null, null, null, 9.0, null, null) """); - List transactionLogs = getEntriesFromJson(2, tableLocation.resolve("_delta_log").toString(), FILE_SYSTEM).orElseThrow(); + List transactionLogs = getEntriesFromJson(2, tableLocation.resolve("_delta_log").toString()); assertThat(transactionLogs).hasSize(2); AddFileEntry addFileEntry = transactionLogs.get(1).getAdd(); assertThat(addFileEntry).isNotNull(); @@ -1057,7 +1059,7 @@ public void testAddTimestampNtzColumn() assertQuery("SELECT * FROM " + tableName, "VALUES (1, TIMESTAMP '2023-01-02 03:04:05.123456')"); String tableLocation = getTableLocation(tableName); - List transactionLogsByCreateTable = getEntriesFromJson(0, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow(); + List transactionLogsByCreateTable = getEntriesFromJson(0, tableLocation + "/_delta_log"); ProtocolEntry 
protocolEntryByCreateTable = transactionLogsByCreateTable.get(1).getProtocol(); assertThat(protocolEntryByCreateTable).isNotNull(); assertThat(protocolEntryByCreateTable.minReaderVersion()).isEqualTo(1); @@ -1065,7 +1067,7 @@ public void testAddTimestampNtzColumn() assertThat(protocolEntryByCreateTable.readerFeatures()).isEmpty(); assertThat(protocolEntryByCreateTable.writerFeatures()).isEmpty(); - List transactionLogsByAddColumn = getEntriesFromJson(1, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow(); + List transactionLogsByAddColumn = getEntriesFromJson(1, tableLocation + "/_delta_log"); ProtocolEntry protocolEntryByAddColumn = transactionLogsByAddColumn.get(1).getProtocol(); assertThat(protocolEntryByAddColumn).isNotNull(); assertThat(protocolEntryByAddColumn.minReaderVersion()).isEqualTo(3); @@ -1090,7 +1092,7 @@ public void testIdentityColumns() assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri())); assertQueryReturnsEmptyResult("SELECT * FROM " + tableName); - List transactionLog = getEntriesFromJson(0, tableLocation.resolve("_delta_log").toString(), FILE_SYSTEM).orElseThrow(); + List transactionLog = getEntriesFromJson(0, tableLocation.resolve("_delta_log").toString()); assertThat(transactionLog).hasSize(3); MetadataEntry metadataEntry = transactionLog.get(2).getMetaData(); assertThat(getColumnsMetadata(metadataEntry).get("b")) @@ -1102,7 +1104,7 @@ public void testIdentityColumns() // Verify a column operation preserves delta.identity.* column properties assertUpdate("COMMENT ON COLUMN " + tableName + ".b IS 'test column comment'"); - List transactionLogAfterComment = getEntriesFromJson(1, tableLocation.resolve("_delta_log").toString(), FILE_SYSTEM).orElseThrow(); + List transactionLogAfterComment = getEntriesFromJson(1, tableLocation.resolve("_delta_log").toString()); assertThat(transactionLogAfterComment).hasSize(3); MetadataEntry commentMetadataEntry = transactionLogAfterComment.get(2).getMetaData(); assertThat(getColumnsMetadata(commentMetadataEntry).get("b")) @@ -1193,7 +1195,7 @@ private void testDeletionVectorsEnabledCreateTable(String tableDefinition) .contains("deletion_vectors_enabled = true"); String tableLocation = getTableLocation(table.getName()); - List transactionLogs = getEntriesFromJson(0, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow(); + List transactionLogs = getEntriesFromJson(0, tableLocation + "/_delta_log"); assertThat(transactionLogs.get(1).getProtocol()) .isEqualTo(new ProtocolEntry(3, 7, Optional.of(Set.of("deletionVectors")), Optional.of(Set.of("deletionVectors")))); @@ -1226,7 +1228,7 @@ private void testDeletionVectorsDisabledCreateTable(String tableDefinition) .doesNotContain("deletion_vectors_enabled"); String tableLocation = getTableLocation(table.getName()); - List transactionLogs = getEntriesFromJson(0, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow(); + List transactionLogs = getEntriesFromJson(0, tableLocation + "/_delta_log"); assertThat(transactionLogs.get(1).getProtocol()) .isEqualTo(new ProtocolEntry(1, 2, Optional.empty(), Optional.empty())); @@ -1295,8 +1297,8 @@ public void testDeletionVectorsAllRows() assertUpdate("DELETE FROM " + tableName + " WHERE a != 999", 1); // 'remove' entry should have the same deletion vector as the previous operation when deleting all rows - DeletionVectorEntry deletionVector = getEntriesFromJson(2, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow().get(2).getAdd().getDeletionVector().orElseThrow(); - 
assertThat(getEntriesFromJson(3, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow().get(1).getRemove().deletionVector().orElseThrow()) + DeletionVectorEntry deletionVector = getEntriesFromJson(2, tableLocation + "/_delta_log").get(2).getAdd().getDeletionVector().orElseThrow(); + assertThat(getEntriesFromJson(3, tableLocation + "/_delta_log").get(1).getRemove().deletionVector().orElseThrow()) .isEqualTo(deletionVector); assertUpdate("INSERT INTO " + tableName + " VALUES (3, 31), (3, 32)", 2); @@ -2248,8 +2250,7 @@ public void testUnsupportedWriterVersion() private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocation) throws IOException { - TrinoFileSystem fileSystem = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS).create(SESSION); - DeltaLakeTransactionLogEntry transactionLog = getEntriesFromJson(entryNumber, tableLocation.resolve("_delta_log").toString(), fileSystem).orElseThrow().stream() + DeltaLakeTransactionLogEntry transactionLog = getEntriesFromJson(entryNumber, tableLocation.resolve("_delta_log").toString()).stream() .filter(log -> log.getMetaData() != null) .collect(onlyElement()); return transactionLog.getMetaData(); @@ -2258,8 +2259,7 @@ private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocat private static ProtocolEntry loadProtocolEntry(long entryNumber, Path tableLocation) throws IOException { - TrinoFileSystem fileSystem = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS).create(SESSION); - DeltaLakeTransactionLogEntry transactionLog = getEntriesFromJson(entryNumber, tableLocation.resolve("_delta_log").toString(), fileSystem).orElseThrow().stream() + DeltaLakeTransactionLogEntry transactionLog = getEntriesFromJson(entryNumber, tableLocation.resolve("_delta_log").toString()).stream() .filter(log -> log.getProtocol() != null) .collect(onlyElement()); return transactionLog.getProtocol(); @@ -2276,4 +2276,12 @@ private String getTableLocation(String tableName) } throw new IllegalStateException("Location not found in SHOW CREATE TABLE result"); } + + private static List getEntriesFromJson(long entryNumber, String transactionLogDir) + throws IOException + { + return TransactionLogTail.getEntriesFromJson(entryNumber, transactionLogDir, FILE_SYSTEM, DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE) + .orElseThrow() + .getEntriesList(FILE_SYSTEM); + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeColumnMapping.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeColumnMapping.java index 6746c8f2a52..9d2447df488 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeColumnMapping.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeColumnMapping.java @@ -37,6 +37,7 @@ import static com.google.common.collect.MoreCollectors.onlyElement; import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.deltalake.DeltaLakeConfig.DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE; import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; @@ -144,7 +145,9 @@ private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocat throws IOException { TrinoFileSystem fileSystem = 
new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS).create(SESSION); - DeltaLakeTransactionLogEntry transactionLog = getEntriesFromJson(entryNumber, tableLocation.resolve("_delta_log").toString(), fileSystem).orElseThrow().stream() + DeltaLakeTransactionLogEntry transactionLog = getEntriesFromJson(entryNumber, tableLocation.resolve("_delta_log").toString(), fileSystem, DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE) + .orElseThrow() + .getEntriesList(fileSystem).stream() .filter(log -> log.getMetaData() != null) .collect(onlyElement()); return transactionLog.getMetaData(); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index 3ef6e8c0a50..78750f664d2 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -44,6 +44,7 @@ public void testDefaults() .setDataFileCacheTtl(new Duration(30, MINUTES)) .setMetadataCacheTtl(new Duration(30, TimeUnit.MINUTES)) .setMetadataCacheMaxRetainedSize(DeltaLakeConfig.DEFAULT_METADATA_CACHE_MAX_RETAINED_SIZE) + .setTransactionLogMaxCachedFileSize(DeltaLakeConfig.DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE) .setDomainCompactionThreshold(1000) .setMaxSplitsPerSecond(Integer.MAX_VALUE) .setMaxOutstandingSplits(1_000) @@ -84,6 +85,7 @@ public void testExplicitPropertyMappings() Map properties = ImmutableMap.builder() .put("delta.metadata.cache-ttl", "10m") .put("delta.metadata.cache-max-retained-size", "1GB") + .put("delta.transaction-log.max-cached-file-size", "1MB") .put("delta.metadata.live-files.cache-size", "0 MB") .put("delta.metadata.live-files.cache-ttl", "60m") .put("delta.domain-compaction-threshold", "500") @@ -125,6 +127,7 @@ public void testExplicitPropertyMappings() .setDataFileCacheTtl(new Duration(60, MINUTES)) .setMetadataCacheTtl(new Duration(10, TimeUnit.MINUTES)) .setMetadataCacheMaxRetainedSize(DataSize.of(1, GIGABYTE)) + .setTransactionLogMaxCachedFileSize(DataSize.of(1, MEGABYTE)) .setDomainCompactionThreshold(500) .setMaxOutstandingSplits(200) .setMaxSplitsPerSecond(10) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java index 7460919d0bf..38e348644f4 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java @@ -137,8 +137,9 @@ public void testCreateOrReplaceTable() assertFileSystemAccesses("CREATE OR REPLACE TABLE test_create_or_replace (id VARCHAR, age INT)", ImmutableMultiset.builder() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "OutputFile.createOrOverwrite")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists")) .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", 
"InputFile.exists")) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) @@ -167,8 +168,9 @@ public void testCreateOrReplaceTableAsSelect() .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream")) .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "OutputFile.createOrOverwrite")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "OutputFile.createOrOverwrite")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists")) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .add(new FileOperation(DATA, "no partition", "OutputFile.create")) @@ -196,7 +198,10 @@ public void testReadUnpartitionedTable() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 2) .build()); @@ -209,7 +214,10 @@ public void testReadUnpartitionedTable() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream")) .addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 2) .build()); @@ -233,7 +241,7 @@ public void testReadTableCheckpointInterval() .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", "InputFile.length"), 2) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", "InputFile.newInput"), 2) // TODO 
(https://github.com/trinodb/trino/issues/18916) should be checked once per query - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 2) .build()); @@ -256,7 +264,7 @@ public void testReadPartitionTableWithCheckpointFiltering() .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", "InputFile.length"), 2) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", "InputFile.newInput"), 2) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .addCopies(new FileOperation(DATA, "key=p1/", "InputFile.newInput"), 2) .addCopies(new FileOperation(DATA, "key=p2/", "InputFile.newInput"), 2) .build()); @@ -283,7 +291,10 @@ public void testReadWholePartition() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .addCopies(new FileOperation(DATA, "key=p1/", "InputFile.newInput"), 2) .addCopies(new FileOperation(DATA, "key=p2/", "InputFile.newInput"), 2) .build()); @@ -297,7 +308,10 @@ public void testReadWholePartition() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream")) .addCopies(new FileOperation(DATA, "key=p1/", "InputFile.newInput"), 2) .addCopies(new FileOperation(DATA, "key=p2/", "InputFile.newInput"), 2) @@ -312,7 +326,10 @@ public void testReadWholePartition() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, 
"00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream")) .build()); @@ -325,7 +342,10 @@ public void testReadWholePartition() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .build()); // Read partition and synthetic columns @@ -337,7 +357,10 @@ public void testReadWholePartition() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream")) .build()); @@ -350,7 +373,10 @@ public void testReadWholePartition() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .build()); assertUpdate("DROP TABLE test_read_part_key"); @@ -383,7 +409,9 @@ public void testReadWholePartitionSplittableFile() 
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream")) .build()); @@ -396,7 +424,9 @@ public void testReadWholePartitionSplittableFile() .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) .build()); assertUpdate("DROP TABLE test_read_whole_splittable_file"); @@ -417,7 +447,8 @@ public void testSelfJoin() ImmutableMultiset.builder() .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream")) .addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 2) .build()); @@ -438,6 +469,7 @@ public void testSelectFromVersionedTable() ImmutableMultiset.builder() .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists")) .build()); assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF 1", @@ -445,6 +477,8 @@ public void testSelectFromVersionedTable() .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists")) .add(new FileOperation(DATA, "no 
partition", "InputFile.newInput")) .build()); @@ -454,6 +488,9 @@ public void testSelectFromVersionedTable() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.exists")) .addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 2) .build()); @@ -469,6 +506,15 @@ public void testSelectFromVersionedTable() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000006.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000007.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000008.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000006.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000007.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000008.json", "InputFile.length")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000008.json", "InputFile.exists")) .addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 8) .build()); @@ -480,6 +526,9 @@ public void testSelectFromVersionedTable() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000011.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000012.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000013.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000011.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000012.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000013.json", "InputFile.length")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000013.json", "InputFile.exists")) .addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 13) .build()); @@ -499,6 +548,9 @@ public void testSelectFromVersionedTable() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000021.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000022.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000023.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, 
"00000000000000000021.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000022.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000023.json", "InputFile.length")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000023.json", "InputFile.exists")) .addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 23) .build()); @@ -527,7 +579,10 @@ public void testDeleteWholePartition() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.exists")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "OutputFile.createOrOverwrite")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .build()); assertUpdate("DROP TABLE test_delete_part_key"); @@ -553,7 +608,10 @@ public void testDeleteWholeTable() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.exists")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "OutputFile.createOrOverwrite")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .build()); assertUpdate("DROP TABLE test_delete_whole_table"); @@ -578,7 +636,11 @@ public void testDeleteWithNonPartitionFilter() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.exists")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "OutputFile.createOrOverwrite")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.length")) .addCopies(new FileOperation(DATA, "key=domain1/", "InputFile.newInput"), 2) .add(new FileOperation(DATA, "key=domain1/", "InputFile.length")) .add(new FileOperation(DATA, "key=domain1/", "OutputFile.create")) @@ -603,7 +665,12 @@ public void testHistorySystemTable() .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", 
"InputFile.newStream"), 2) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"), 2) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.newStream"), 2) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.newStream")) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.length"), 2) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.length")) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .build()); @@ -614,7 +681,12 @@ public void testHistorySystemTable() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"), 2) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"), 2) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.length")) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .build()); @@ -625,7 +697,12 @@ public void testHistorySystemTable() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.newStream"), 2) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.length"), 2) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.length")) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .build()); @@ -636,7 +713,12 @@ public void testHistorySystemTable() .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"), 2) .addCopies(new 
FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"), 2) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.newStream"), 2) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.length"), 2) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.length")) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .build()); @@ -647,7 +729,12 @@ public void testHistorySystemTable() .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"), 2) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"), 2) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.length")) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .build()); @@ -658,7 +745,12 @@ public void testHistorySystemTable() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.length")) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .build()); } @@ -688,7 +780,14 @@ public void testTableChangesFileSystemAccess() .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.newStream"), 2) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", 
"InputFile.newStream"), 2) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000006.json", "InputFile.newStream"), 2) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000007.json", "InputFile.newStream")) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.length"), 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000006.json", "InputFile.length"), 2) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000007.json", "InputFile.length")) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .add(new FileOperation(CDF_DATA, "key=domain1/", "InputFile.newInput")) .addCopies(new FileOperation(CDF_DATA, "key=domain2/", "InputFile.newInput"), cdfFilesForDomain2) @@ -738,9 +837,12 @@ private void testInformationSchemaColumns(boolean removeCachedProperties) ImmutableMultiset.builder() .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"), tables * 2) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"), tables * 2) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"), tables * 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"), tables) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"), tables) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"), tables) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length"), tables * 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"), tables * 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"), tables) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"), tables) .build() : ImmutableMultiset.builder() .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables) @@ -774,7 +876,10 @@ private void testInformationSchemaColumns(boolean removeCachedProperties) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, 
"00000000000000000003.json", "InputFile.length")) .build()); // Pointed lookup with LIKE predicate (as if unintentional) @@ -793,7 +898,10 @@ private void testInformationSchemaColumns(boolean removeCachedProperties) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .build()); for (int i = 0; i < tables; i++) { @@ -842,9 +950,12 @@ private void testSystemMetadataTableComments(boolean removeCachedProperties) ImmutableMultiset.builder() .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"), tables * 2) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"), tables * 2) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"), tables * 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"), tables) .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"), tables) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"), tables) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length"), tables * 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"), tables * 2) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"), tables) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"), tables) .build() : ImmutableMultiset.builder() .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"), tables) @@ -887,7 +998,10 @@ private void testSystemMetadataTableComments(boolean removeCachedProperties) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .build()); // Pointed lookup with LIKE predicate (as if unintentional) @@ -929,7 +1043,8 @@ public void testReadMultipartCheckpoint() .addCopies(new FileOperation(CHECKPOINT, 
"00000000000000000006.checkpoint.0000000002.0000000002.parquet", "InputFile.length"), 2) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000002.0000000002.parquet", "InputFile.newInput"), 2) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000007.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000008.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000007.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000008.json", "InputFile.length")) .addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 7) .build()); } @@ -947,7 +1062,7 @@ public void testV2CheckpointJson() .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .add(new FileOperation(CHECKPOINT, "00000000000000000001.checkpoint.73a4ddb8-2bfc-40d8-b09f-1b6a0abdfb04.json", "InputFile.length")) .add(new FileOperation(CHECKPOINT, "00000000000000000001.checkpoint.73a4ddb8-2bfc-40d8-b09f-1b6a0abdfb04.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) // TransactionLogTail.getEntriesFromJson access non-existing file as end of tail + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) // TransactionLogTail.getEntriesFromJson access non-existing file as end of tail .build()); assertFileSystemAccesses("SELECT * FROM " + tableName, @@ -957,7 +1072,7 @@ public void testV2CheckpointJson() .addCopies(new FileOperation(CHECKPOINT, "00000000000000000001.checkpoint.73a4ddb8-2bfc-40d8-b09f-1b6a0abdfb04.json", "InputFile.newStream"), 2) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query .add(new FileOperation(CHECKPOINT, "00000000000000000001.checkpoint.0000000001.0000000001.90cf4e21-dbaa-41d6-8ae5-6709cfbfbfe0.parquet", "InputFile.length")) .add(new FileOperation(CHECKPOINT, "00000000000000000001.checkpoint.0000000001.0000000001.90cf4e21-dbaa-41d6-8ae5-6709cfbfbfe0.parquet", "InputFile.newInput")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) // TransactionLogTail.getEntriesFromJson access non-existing file as end of tail + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) // TransactionLogTail.getEntriesFromJson access non-existing file as end of tail .add(new FileOperation(DATA, "no partition", "InputFile.newInput")) .build()); @@ -977,7 +1092,7 @@ public void testV2CheckpointParquet() .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .addCopies(new FileOperation(CHECKPOINT, "00000000000000000001.checkpoint.156b3304-76b2-49c3-a9a1-626f07df27c9.parquet", "InputFile.length"), 2) .add(new FileOperation(CHECKPOINT, "00000000000000000001.checkpoint.156b3304-76b2-49c3-a9a1-626f07df27c9.parquet", "InputFile.newInput")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) // TransactionLogTail.getEntriesFromJson access non-existing file as end of tail + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) // TransactionLogTail.getEntriesFromJson access non-existing file as end of tail 
.build()); assertFileSystemAccesses("SELECT * FROM " + tableName, @@ -987,7 +1102,7 @@ public void testV2CheckpointParquet() .addCopies(new FileOperation(CHECKPOINT, "00000000000000000001.checkpoint.156b3304-76b2-49c3-a9a1-626f07df27c9.parquet", "InputFile.newInput"), 2) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query .add(new FileOperation(CHECKPOINT, "00000000000000000001.checkpoint.0000000001.0000000001.03288d7e-af16-44ed-829c-196064a71812.parquet", "InputFile.length")) .add(new FileOperation(CHECKPOINT, "00000000000000000001.checkpoint.0000000001.0000000001.03288d7e-af16-44ed-829c-196064a71812.parquet", "InputFile.newInput")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) // TransactionLogTail.getEntriesFromJson access non-existing file as end of tail + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) // TransactionLogTail.getEntriesFromJson access non-existing file as end of tail .add(new FileOperation(DATA, "no partition", "InputFile.newInput")) .build()); @@ -1006,7 +1121,10 @@ public void testDeletionVectors() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .add(new FileOperation(DELETION_VECTOR, "deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin", "InputFile.newInput")) .add(new FileOperation(DATA, "no partition", "InputFile.newInput")) @@ -1017,7 +1135,10 @@ public void testDeletionVectors() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) - .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) .add(new FileOperation(STARBURST_EXTENDED_STATS_JSON, "extendeded_stats.json", "InputFile.newStream")) .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream")) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java 
b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index 3593fc783ef..cba08151ff3 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -27,10 +27,10 @@ import io.trino.plugin.deltalake.transactionlog.AddFileEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; -import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry; import io.trino.plugin.deltalake.transactionlog.TableSnapshot; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager; +import io.trino.plugin.deltalake.transactionlog.checkpoint.MetadataAndProtocolEntries; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics; import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.ParquetWriterConfig; @@ -103,14 +103,6 @@ public class TestTransactionLogAccess "age=28/part-00000-40dd1707-1d42-4328-a59a-21f5c945fe60.c000.snappy.parquet", "age=29/part-00000-3794c463-cb0c-4beb-8d07-7cc1e3b5920f.c000.snappy.parquet"); - private static final Set EXPECTED_REMOVE_ENTRIES = ImmutableSet.of( - new RemoveFileEntry("age=30/part-00000-7e43a3c3-ea26-4ae7-8eac-8f60cbb4df03.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()), - new RemoveFileEntry("age=30/part-00000-72a56c23-01ba-483a-9062-dd0accc86599.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()), - new RemoveFileEntry("age=42/part-00000-951068bd-bcf4-4094-bb94-536f3c41d31f.c000.snappy.parquet", null, 1579190155406L, false, Optional.empty()), - new RemoveFileEntry("age=25/part-00000-609e34b1-5466-4dbc-a780-2708166e7adb.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()), - new RemoveFileEntry("age=42/part-00000-6aed618a-2beb-4edd-8466-653e67a9b380.c000.snappy.parquet", null, 1579190155406L, false, Optional.empty()), - new RemoveFileEntry("age=42/part-00000-b82d8859-84a0-4f05-872c-206b07dd54f0.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty())); - private final TestingTelemetry testingTelemetry = TestingTelemetry.create("transaction-log-access"); private final TracingFileSystemFactory tracingFileSystemFactory = new TracingFileSystemFactory(testingTelemetry.getTracer(), new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS)); @@ -322,18 +314,6 @@ public void testAddRemoveAdd() } } - @Test - public void testGetRemoveEntries() - throws Exception - { - setupTransactionLogAccessFromResources("person", "databricks73/person"); - - try (Stream removeEntries = transactionLogAccess.getRemoveEntries(SESSION, tableSnapshot)) { - Set removedEntries = removeEntries.collect(Collectors.toSet()); - assertThat(removedEntries).isEqualTo(EXPECTED_REMOVE_ENTRIES); - } - } - @Test public void testAllGetMetadataEntry() throws Exception @@ -359,6 +339,35 @@ private void testAllGetMetadataEntry(String tableName, String resourcePath) assertThat(format.provider()).isEqualTo("parquet"); } + @Test + void testGetMetadataAndProtocolEntry() + throws Exception + { + testGetMetadataAndProtocolEntry("person", "databricks73/person"); + testGetMetadataAndProtocolEntry("person_without_last_checkpoint", "databricks73/person_without_last_checkpoint"); + testGetMetadataAndProtocolEntry("person_without_old_jsons", 
"databricks73/person_without_old_jsons"); + testGetMetadataAndProtocolEntry("person_without_checkpoints", "databricks73/person_without_checkpoints"); + } + + private void testGetMetadataAndProtocolEntry(String tableName, String resourcePath) + throws Exception + { + setupTransactionLogAccessFromResources(tableName, resourcePath); + + transactionLogAccess.getMetadataAndProtocolEntry(SESSION, tableSnapshot); + MetadataAndProtocolEntries logEntries = transactionLogAccess.getMetadataAndProtocolEntry(SESSION, tableSnapshot); + + MetadataEntry metadataEntry = logEntries.metadata().orElseThrow(); + assertThat(metadataEntry.getOriginalPartitionColumns()).containsOnly("age"); + MetadataEntry.Format format = metadataEntry.getFormat(); + assertThat(format.options().keySet()).isEmpty(); + assertThat(format.provider()).isEqualTo("parquet"); + + ProtocolEntry protocolEntry = logEntries.protocol().orElseThrow(); + assertThat(protocolEntry.minReaderVersion()).isEqualTo(1); + assertThat(protocolEntry.minWriterVersion()).isEqualTo(2); + } + @Test public void testAllGetActiveAddEntries() throws Exception @@ -384,29 +393,6 @@ private void testAllGetActiveAddEntries(String tableName, String resourcePath) } } - @Test - public void testAllGetRemoveEntries() - throws Exception - { - testAllGetRemoveEntries("person", "databricks73/person"); - testAllGetRemoveEntries("person_without_last_checkpoint", "databricks73/person_without_last_checkpoint"); - testAllGetRemoveEntries("person_without_old_jsons", "databricks73/person_without_old_jsons"); - testAllGetRemoveEntries("person_without_checkpoints", "databricks73/person_without_checkpoints"); - } - - private void testAllGetRemoveEntries(String tableName, String resourcePath) - throws Exception - { - setupTransactionLogAccessFromResources(tableName, resourcePath); - - try (Stream removeEntries = transactionLogAccess.getRemoveEntries(SESSION, tableSnapshot)) { - Set removedPaths = removeEntries.map(RemoveFileEntry::path).collect(Collectors.toSet()); - Set expectedPaths = EXPECTED_REMOVE_ENTRIES.stream().map(RemoveFileEntry::path).collect(Collectors.toSet()); - - assertThat(removedPaths).isEqualTo(expectedPaths); - } - } - @Test public void testAllGetProtocolEntries() throws Exception @@ -597,7 +583,8 @@ public void testIncrementalCacheUpdates() .add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.length")) .add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.newInput")) .add(new FileOperation("00000000000000000011.json", "InputFile.newStream")) - .add(new FileOperation("00000000000000000012.json", "InputFile.newStream")) + .add(new FileOperation("00000000000000000011.json", "InputFile.length")) + .add(new FileOperation("00000000000000000012.json", "InputFile.length")) .build()); copyTransactionLogEntry(12, 14, resourceDir, transactionLogDir); @@ -615,7 +602,9 @@ public void testIncrementalCacheUpdates() .add(new FileOperation("_last_checkpoint", "InputFile.newStream")) .addCopies(new FileOperation("00000000000000000012.json", "InputFile.newStream"), 2) .addCopies(new FileOperation("00000000000000000013.json", "InputFile.newStream"), 2) - .add(new FileOperation("00000000000000000014.json", "InputFile.newStream")) + .addCopies(new FileOperation("00000000000000000012.json", "InputFile.length"), 2) + .addCopies(new FileOperation("00000000000000000013.json", "InputFile.length"), 2) + .add(new FileOperation("00000000000000000014.json", "InputFile.length")) .build()); } @@ -777,7 +766,10 @@ public void 
testTableSnapshotsCacheDisabled() .add(new FileOperation("00000000000000000011.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000012.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000013.json", "InputFile.newStream")) - .add(new FileOperation("00000000000000000014.json", "InputFile.newStream")) + .add(new FileOperation("00000000000000000011.json", "InputFile.length")) + .add(new FileOperation("00000000000000000012.json", "InputFile.length")) + .add(new FileOperation("00000000000000000013.json", "InputFile.length")) + .add(new FileOperation("00000000000000000014.json", "InputFile.length")) .build()); // With the transaction log cache disabled, when loading the snapshot again, all the needed files will be opened again @@ -790,7 +782,10 @@ public void testTableSnapshotsCacheDisabled() .add(new FileOperation("00000000000000000011.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000012.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000013.json", "InputFile.newStream")) - .add(new FileOperation("00000000000000000014.json", "InputFile.newStream")) + .add(new FileOperation("00000000000000000011.json", "InputFile.length")) + .add(new FileOperation("00000000000000000012.json", "InputFile.length")) + .add(new FileOperation("00000000000000000013.json", "InputFile.length")) + .add(new FileOperation("00000000000000000014.json", "InputFile.length")) .build()); } @@ -826,7 +821,10 @@ public void testTableSnapshotsActiveDataFilesCache() .add(new FileOperation("00000000000000000011.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000012.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000013.json", "InputFile.newStream")) - .add(new FileOperation("00000000000000000014.json", "InputFile.newStream")) + .add(new FileOperation("00000000000000000011.json", "InputFile.length")) + .add(new FileOperation("00000000000000000012.json", "InputFile.length")) + .add(new FileOperation("00000000000000000013.json", "InputFile.length")) + .add(new FileOperation("00000000000000000014.json", "InputFile.length")) .add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.length")) .add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.newInput")) .build()); @@ -866,7 +864,10 @@ public void testFlushSnapshotAndActiveFileCache() .add(new FileOperation("00000000000000000011.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000012.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000013.json", "InputFile.newStream")) - .add(new FileOperation("00000000000000000014.json", "InputFile.newStream")) + .add(new FileOperation("00000000000000000011.json", "InputFile.length")) + .add(new FileOperation("00000000000000000012.json", "InputFile.length")) + .add(new FileOperation("00000000000000000013.json", "InputFile.length")) + .add(new FileOperation("00000000000000000014.json", "InputFile.length")) .add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.length")) .add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.newInput")) .build()); @@ -885,7 +886,10 @@ public void testFlushSnapshotAndActiveFileCache() .add(new FileOperation("00000000000000000011.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000012.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000013.json", "InputFile.newStream")) - .add(new FileOperation("00000000000000000014.json", 
"InputFile.newStream")) + .add(new FileOperation("00000000000000000011.json", "InputFile.length")) + .add(new FileOperation("00000000000000000012.json", "InputFile.length")) + .add(new FileOperation("00000000000000000013.json", "InputFile.length")) + .add(new FileOperation("00000000000000000014.json", "InputFile.length")) .add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.length")) .add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.newInput")) .build()); @@ -916,7 +920,10 @@ public void testTableSnapshotsActiveDataFilesCacheDisabled() .add(new FileOperation("00000000000000000011.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000012.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000013.json", "InputFile.newStream")) - .add(new FileOperation("00000000000000000014.json", "InputFile.newStream")) + .add(new FileOperation("00000000000000000011.json", "InputFile.length")) + .add(new FileOperation("00000000000000000012.json", "InputFile.length")) + .add(new FileOperation("00000000000000000013.json", "InputFile.length")) + .add(new FileOperation("00000000000000000014.json", "InputFile.length")) .add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.length")) .add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.newInput")) .build()); @@ -968,6 +975,16 @@ public void testLoadSnapshotWithEndVersion() .add(new FileOperation("00000000000000000007.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000008.json", "InputFile.newStream")) .add(new FileOperation("00000000000000000009.json", "InputFile.newStream")) + .add(new FileOperation("00000000000000000000.json", "InputFile.length")) + .add(new FileOperation("00000000000000000001.json", "InputFile.length")) + .add(new FileOperation("00000000000000000002.json", "InputFile.length")) + .add(new FileOperation("00000000000000000003.json", "InputFile.length")) + .add(new FileOperation("00000000000000000004.json", "InputFile.length")) + .add(new FileOperation("00000000000000000005.json", "InputFile.length")) + .add(new FileOperation("00000000000000000006.json", "InputFile.length")) + .add(new FileOperation("00000000000000000007.json", "InputFile.length")) + .add(new FileOperation("00000000000000000008.json", "InputFile.length")) + .add(new FileOperation("00000000000000000009.json", "InputFile.length")) .build()); setupTransactionLogAccess("person", getClass().getClassLoader().getResource("databricks73/person").toString(), new DeltaLakeConfig(), Optional.of(10L)); @@ -999,6 +1016,7 @@ public void testLoadSnapshotWithEndVersion() .add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.length")) .add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.newInput")) .add(new FileOperation("00000000000000000011.json", "InputFile.newStream")) + .add(new FileOperation("00000000000000000011.json", "InputFile.length")) .build()); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java index 130c788225d..0960f74ffd2 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java @@ -46,6 +46,7 @@ import static com.google.common.base.Predicates.alwaysTrue; 
import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_LOCATION; +import static io.trino.plugin.deltalake.DeltaLakeConfig.DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE; import static io.trino.plugin.deltalake.transactionlog.TableSnapshot.MetadataAndProtocolEntry; import static io.trino.plugin.deltalake.transactionlog.TableSnapshot.load; import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readLastCheckpoint; @@ -98,19 +99,23 @@ public void testOnlyReadsTrailingJsonFiles() parquetReaderOptions, true, domainCompactionThreshold, + DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE, Optional.empty())); }, ImmutableMultiset.builder() - .addCopies(new FileOperation("_last_checkpoint", "InputFile.newStream"), 1) - .addCopies(new FileOperation("00000000000000000011.json", "InputFile.newStream"), 1) - .addCopies(new FileOperation("00000000000000000012.json", "InputFile.newStream"), 1) - .addCopies(new FileOperation("00000000000000000013.json", "InputFile.newStream"), 1) - .addCopies(new FileOperation("00000000000000000014.json", "InputFile.newStream"), 1) + .add(new FileOperation("_last_checkpoint", "InputFile.newStream")) + .add(new FileOperation("00000000000000000011.json", "InputFile.newStream")) + .add(new FileOperation("00000000000000000012.json", "InputFile.newStream")) + .add(new FileOperation("00000000000000000013.json", "InputFile.newStream")) + .add(new FileOperation("00000000000000000011.json", "InputFile.length")) + .add(new FileOperation("00000000000000000012.json", "InputFile.length")) + .add(new FileOperation("00000000000000000013.json", "InputFile.length")) + .add(new FileOperation("00000000000000000014.json", "InputFile.length")) .build()); assertFileSystemAccesses( () -> { - tableSnapshot.get().getJsonTransactionLogEntries().forEach(entry -> {}); + tableSnapshot.get().getJsonTransactionLogEntries(trackingFileSystem).forEach(entry -> {}); }, ImmutableMultiset.of()); } @@ -129,6 +134,7 @@ public void readsCheckpointFile() parquetReaderOptions, true, domainCompactionThreshold, + DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE, Optional.empty()); TestingConnectorContext context = new TestingConnectorContext(); TypeManager typeManager = context.getTypeManager(); @@ -257,6 +263,7 @@ public void testMaxTransactionId() parquetReaderOptions, true, domainCompactionThreshold, + DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE, Optional.empty()); assertThat(tableSnapshot.getVersion()).isEqualTo(13L); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestTransactionLogTail.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestTransactionLogTail.java index b42a58cb04a..d4610bd756f 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestTransactionLogTail.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestTransactionLogTail.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Optional; +import static io.trino.plugin.deltalake.DeltaLakeConfig.DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE; import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS; @@ -49,17 +50,17 @@ private List updateJsonTransactionLogTails(String throws Exception { TrinoFileSystem fileSystem = 
new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS).create(SESSION); - TransactionLogTail transactionLogTail = TransactionLogTail.loadNewTail(fileSystem, tableLocation, Optional.of(10L), Optional.of(12L)); - Optional updatedLogTail = transactionLogTail.getUpdatedTail(fileSystem, tableLocation, Optional.empty()); + TransactionLogTail transactionLogTail = TransactionLogTail.loadNewTail(fileSystem, tableLocation, Optional.of(10L), Optional.of(12L), DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE); + Optional updatedLogTail = transactionLogTail.getUpdatedTail(fileSystem, tableLocation, Optional.empty(), DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE); assertThat(updatedLogTail).isPresent(); - return updatedLogTail.get().getFileEntries(); + return updatedLogTail.get().getFileEntries(fileSystem); } private List readJsonTransactionLogTails(String tableLocation) throws Exception { TrinoFileSystem fileSystem = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS).create(SESSION); - TransactionLogTail transactionLogTail = TransactionLogTail.loadNewTail(fileSystem, tableLocation, Optional.of(10L), Optional.empty()); - return transactionLogTail.getFileEntries(); + TransactionLogTail transactionLogTail = TransactionLogTail.loadNewTail(fileSystem, tableLocation, Optional.of(10L), Optional.empty(), DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE); + return transactionLogTail.getFileEntries(fileSystem); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/BaseTestDeltaLakeMinioReads.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/BaseTestDeltaLakeMinioReads.java index 351ea5a17a1..88df1a82af3 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/BaseTestDeltaLakeMinioReads.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/BaseTestDeltaLakeMinioReads.java @@ -79,7 +79,7 @@ public void testReadRegionTable() format("SELECT count(name) FROM delta.default.\"%s\"", tableName))) .containsOnly(row(5L)); - assertNotificationsCount(NOTIFICATIONS_TABLE, OBJECT_ACCESSED_HEAD, tableName + "/_delta_log/00000000000000000000.json", 0); + assertNotificationsCount(NOTIFICATIONS_TABLE, OBJECT_ACCESSED_HEAD, tableName + "/_delta_log/00000000000000000000.json", 1); assertNotificationsCount(NOTIFICATIONS_TABLE, OBJECT_ACCESSED_GET, tableName + "/_delta_log/00000000000000000000.json", 1); onTrino().executeQuery(format("DROP TABLE delta.default.\"%s\"", tableName)); }
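
Not part of the patch: a minimal usage sketch of the reworked tail-loading API that the TestTransactionLogTail changes above exercise. The five-argument TransactionLogTail.loadNewTail, the DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE constant from DeltaLakeConfig, and getFileEntries(TrinoFileSystem) are taken from the test diff; the import packages, the generic element type of the returned list, and the comment about why the file system is now required are assumptions, not statements about the final implementation.

// Illustrative sketch only; package locations and generics are assumed.
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.TransactionLogTail;

import java.util.List;
import java.util.Optional;

import static io.trino.plugin.deltalake.DeltaLakeConfig.DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE;

class TransactionLogTailUsageSketch
{
    static List<DeltaLakeTransactionLogEntry> readTailAfterVersion10(TrinoFileSystem fileSystem, String tableLocation)
            throws Exception
    {
        // Load the JSON commits after version 10, mirroring readJsonTransactionLogTails in the test.
        // The new DataSize argument bounds how much transaction-log content is kept cached.
        TransactionLogTail tail = TransactionLogTail.loadNewTail(
                fileSystem,
                tableLocation,
                Optional.of(10L),            // start version (exclusive)
                Optional.empty(),            // no end version: read to the current tail
                DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE);

        // getFileEntries now takes the file system, presumably so entries that were not
        // retained under the cached-size limit can be re-read from storage on demand.
        return tail.getFileEntries(fileSystem);
    }
}

Incremental refresh follows the same shape: getUpdatedTail(fileSystem, tableLocation, Optional.empty(), DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE) returns an Optional tail whose entries are again read through getFileEntries(fileSystem), which is consistent with the extra InputFile.length accesses asserted throughout the file-operation tests above.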