Skip to content

Commit 0301c34

Browse files
committed
Stream large transaction log jsons instead of storing in-memory
Operations fetching metadata and protocol entries can skip reading the rest of the json file after those entries are found
1 parent 4f48087 commit 0301c34

25 files changed

+751
-322
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/BaseTransactionsTable.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,13 @@
1414
package io.trino.plugin.deltalake;
1515

1616
import com.google.common.collect.ImmutableList;
17+
import io.airlift.units.DataSize;
1718
import io.trino.filesystem.TrinoFileSystem;
1819
import io.trino.filesystem.TrinoFileSystemFactory;
19-
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
2020
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
2121
import io.trino.plugin.deltalake.transactionlog.Transaction;
2222
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
23+
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
2324
import io.trino.plugin.deltalake.util.PageListBuilder;
2425
import io.trino.spi.Page;
2526
import io.trino.spi.TrinoException;
@@ -144,7 +145,7 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
144145
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
145146
try {
146147
List<Transaction> transactions = loadNewTailBackward(fileSystem, tableLocation, startVersionExclusive, endVersionInclusive.get()).reversed();
147-
return new FixedPageSource(buildPages(session, pagesBuilder, transactions));
148+
return new FixedPageSource(buildPages(session, pagesBuilder, transactions, fileSystem));
148149
}
149150
catch (TrinoException e) {
150151
throw e;
@@ -170,7 +171,7 @@ private static List<Transaction> loadNewTailBackward(
170171
boolean endOfHead = false;
171172

172173
while (!endOfHead) {
173-
Optional<List<DeltaLakeTransactionLogEntry>> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem);
174+
Optional<TransactionLogEntries> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DataSize.of(0, DataSize.Unit.BYTE));
174175
if (results.isPresent()) {
175176
transactionsBuilder.add(new Transaction(version, results.get()));
176177
version = entryNumber;
@@ -187,5 +188,6 @@ private static List<Transaction> loadNewTailBackward(
187188
return transactionsBuilder.build();
188189
}
189190

190-
protected abstract List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions);
191+
protected abstract List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions, TrinoFileSystem fileSystem)
192+
throws IOException;
191193
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeCommitSummary.java

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,19 @@
1515

1616
import com.google.common.collect.ImmutableList;
1717
import com.google.common.collect.ImmutableSet;
18+
import io.trino.filesystem.TrinoFileSystem;
1819
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
1920
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
2021
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
2122
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
23+
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
2224

25+
import java.util.Iterator;
2326
import java.util.List;
2427
import java.util.Map;
2528
import java.util.Optional;
2629
import java.util.Set;
30+
import java.util.stream.Stream;
2731

2832
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.canonicalizePartitionValues;
2933
import static java.util.Objects.requireNonNull;
@@ -38,7 +42,7 @@ public class DeltaLakeCommitSummary
3842
private final Set<Map<String, Optional<String>>> addedFilesCanonicalPartitionValues;
3943
private final Optional<Boolean> isBlindAppend;
4044

41-
public DeltaLakeCommitSummary(long version, List<DeltaLakeTransactionLogEntry> transactionLogEntries)
45+
public DeltaLakeCommitSummary(long version, TransactionLogEntries transactionLogEntries, TrinoFileSystem fileSystem)
4246
{
4347
requireNonNull(transactionLogEntries, "transactionLogEntries is null");
4448
ImmutableList.Builder<MetadataEntry> metadataUpdatesBuilder = ImmutableList.builder();
@@ -48,26 +52,29 @@ public DeltaLakeCommitSummary(long version, List<DeltaLakeTransactionLogEntry> t
4852
ImmutableSet.Builder<Map<String, Optional<String>>> removedFilesCanonicalPartitionValuesBuilder = ImmutableSet.builder();
4953
boolean containsRemoveFileWithoutPartitionValues = false;
5054

51-
for (DeltaLakeTransactionLogEntry transactionLogEntry : transactionLogEntries) {
52-
if (transactionLogEntry.getMetaData() != null) {
53-
metadataUpdatesBuilder.add(transactionLogEntry.getMetaData());
54-
}
55-
else if (transactionLogEntry.getProtocol() != null) {
56-
optionalProtocol = Optional.of(transactionLogEntry.getProtocol());
57-
}
58-
else if (transactionLogEntry.getCommitInfo() != null) {
59-
optionalCommitInfo = Optional.of(transactionLogEntry.getCommitInfo());
60-
}
61-
else if (transactionLogEntry.getAdd() != null) {
62-
addedFilesCanonicalPartitionValuesBuilder.add(transactionLogEntry.getAdd().getCanonicalPartitionValues());
63-
}
64-
else if (transactionLogEntry.getRemove() != null) {
65-
Map<String, String> partitionValues = transactionLogEntry.getRemove().partitionValues();
66-
if (partitionValues == null) {
67-
containsRemoveFileWithoutPartitionValues = true;
55+
try (Stream<DeltaLakeTransactionLogEntry> logEntryStream = transactionLogEntries.getEntries(fileSystem)) {
56+
for (Iterator<DeltaLakeTransactionLogEntry> it = logEntryStream.iterator(); it.hasNext(); ) {
57+
DeltaLakeTransactionLogEntry transactionLogEntry = it.next();
58+
if (transactionLogEntry.getMetaData() != null) {
59+
metadataUpdatesBuilder.add(transactionLogEntry.getMetaData());
60+
}
61+
else if (transactionLogEntry.getProtocol() != null) {
62+
optionalProtocol = Optional.of(transactionLogEntry.getProtocol());
63+
}
64+
else if (transactionLogEntry.getCommitInfo() != null) {
65+
optionalCommitInfo = Optional.of(transactionLogEntry.getCommitInfo());
66+
}
67+
else if (transactionLogEntry.getAdd() != null) {
68+
addedFilesCanonicalPartitionValuesBuilder.add(transactionLogEntry.getAdd().getCanonicalPartitionValues());
6869
}
69-
else {
70-
removedFilesCanonicalPartitionValuesBuilder.add(canonicalizePartitionValues(partitionValues));
70+
else if (transactionLogEntry.getRemove() != null) {
71+
Map<String, String> partitionValues = transactionLogEntry.getRemove().partitionValues();
72+
if (partitionValues == null) {
73+
containsRemoveFileWithoutPartitionValues = true;
74+
}
75+
else {
76+
removedFilesCanonicalPartitionValuesBuilder.add(canonicalizePartitionValues(partitionValues));
77+
}
7178
}
7279
}
7380
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class DeltaLakeConfig
5050
{
5151
public static final String EXTENDED_STATISTICS_ENABLED = "delta.extended-statistics.enabled";
5252
public static final String VACUUM_MIN_RETENTION = "delta.vacuum.min-retention";
53+
public static final DataSize DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE = DataSize.of(16, MEGABYTE);
5354

5455
// Runtime.getRuntime().maxMemory() is not 100% stable and may return slightly different value over JVM lifetime. We use
5556
// constant so default configuration for cache size is stable.
@@ -60,6 +61,7 @@ public class DeltaLakeConfig
6061

6162
private Duration metadataCacheTtl = new Duration(30, TimeUnit.MINUTES);
6263
private DataSize metadataCacheMaxRetainedSize = DEFAULT_METADATA_CACHE_MAX_RETAINED_SIZE;
64+
private DataSize transactionLogMaxCachedFileSize = DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE;
6365
private DataSize dataFileCacheSize = DEFAULT_DATA_FILE_CACHE_SIZE;
6466
private Duration dataFileCacheTtl = new Duration(30, TimeUnit.MINUTES);
6567
private int domainCompactionThreshold = 1000;
@@ -121,6 +123,19 @@ public DeltaLakeConfig setMetadataCacheMaxRetainedSize(DataSize metadataCacheMax
121123
return this;
122124
}
123125

126+
public DataSize getTransactionLogMaxCachedFileSize()
127+
{
128+
return transactionLogMaxCachedFileSize;
129+
}
130+
131+
@Config("delta.transaction-log.max-cached-file-size")
132+
@ConfigDescription("Maximum size of delta transaction log file that will be cached in memory")
133+
public DeltaLakeConfig setTransactionLogMaxCachedFileSize(DataSize transactionLogMaxCachedFileSize)
134+
{
135+
this.transactionLogMaxCachedFileSize = transactionLogMaxCachedFileSize;
136+
return this;
137+
}
138+
124139
public DataSize getDataFileCacheSize()
125140
{
126141
return dataFileCacheSize;

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.plugin.deltalake;
1515

1616
import com.google.common.collect.ImmutableList;
17+
import io.trino.filesystem.TrinoFileSystem;
1718
import io.trino.filesystem.TrinoFileSystemFactory;
1819
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
1920
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
@@ -30,6 +31,7 @@
3031

3132
import java.util.List;
3233
import java.util.Objects;
34+
import java.util.stream.Stream;
3335

3436
import static com.google.common.collect.ImmutableList.toImmutableList;
3537
import static io.trino.spi.type.BigintType.BIGINT;
@@ -74,13 +76,15 @@ public DeltaLakeHistoryTable(
7476
}
7577

7678
@Override
77-
protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions)
79+
protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions, TrinoFileSystem fileSystem)
7880
{
79-
List<CommitInfoEntry> commitInfoEntries = transactions.stream()
80-
.flatMap(transaction -> transaction.transactionEntries().stream())
81+
List<CommitInfoEntry> commitInfoEntries;
82+
try (Stream<CommitInfoEntry> commitStream = transactions.stream()
83+
.flatMap(transaction -> transaction.transactionEntries().getEntries(fileSystem))
8184
.map(DeltaLakeTransactionLogEntry::getCommitInfo)
82-
.filter(Objects::nonNull)
83-
.collect(toImmutableList());
85+
.filter(Objects::nonNull)) {
86+
commitInfoEntries = commitStream.collect(toImmutableList());
87+
}
8488

8589
TimeZoneKey timeZoneKey = session.getTimeZoneKey();
8690

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,14 @@
6969
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
7070
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
7171
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.UnsupportedTypeException;
72-
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
7372
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
7473
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
7574
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
7675
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
7776
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
77+
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
7878
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
79+
import io.trino.plugin.deltalake.transactionlog.checkpoint.MetadataAndProtocolEntries;
7980
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
8081
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics;
8182
import io.trino.plugin.deltalake.transactionlog.writer.TransactionConflictException;
@@ -197,6 +198,7 @@
197198
import static com.google.common.collect.Iterables.getOnlyElement;
198199
import static com.google.common.collect.Sets.difference;
199200
import static com.google.common.primitives.Ints.max;
201+
import static io.airlift.units.DataSize.Unit.BYTE;
200202
import static io.trino.filesystem.Locations.appendPath;
201203
import static io.trino.filesystem.Locations.areDirectoryLocationsEquivalent;
202204
import static io.trino.hive.formats.HiveClassNames.HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS;
@@ -290,8 +292,6 @@
290292
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion;
291293
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
292294
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
293-
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA;
294-
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
295295
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
296296
import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
297297
import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
@@ -630,27 +630,21 @@ public LocatedTableHandle getTableHandle(
630630
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
631631
TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, endVersion.map(version -> getVersion(fileSystem, tableLocation, version)));
632632

633-
Map<Class<?>, Object> logEntries;
633+
MetadataAndProtocolEntries logEntries;
634634
try {
635-
logEntries = transactionLogAccess.getTransactionLogEntries(
636-
session,
637-
tableSnapshot,
638-
ImmutableSet.of(METADATA, PROTOCOL),
639-
entryStream -> entryStream
640-
.filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null)
641-
.map(entry -> firstNonNull(entry.getMetaData(), entry.getProtocol())));
635+
logEntries = transactionLogAccess.getMetadataAndProtocolEntry(session, tableSnapshot);
642636
}
643637
catch (TrinoException e) {
644638
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
645639
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, e);
646640
}
647641
throw e;
648642
}
649-
MetadataEntry metadataEntry = (MetadataEntry) logEntries.get(MetadataEntry.class);
643+
MetadataEntry metadataEntry = logEntries.metadata().orElse(null);
650644
if (metadataEntry == null) {
651645
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable()));
652646
}
653-
ProtocolEntry protocolEntry = (ProtocolEntry) logEntries.get(ProtocolEntry.class);
647+
ProtocolEntry protocolEntry = logEntries.protocol().orElse(null);
654648
if (protocolEntry == null) {
655649
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable()));
656650
}
@@ -2267,16 +2261,16 @@ private void checkForConcurrentTransactionConflicts(
22672261
if (currentVersion > readVersionValue) {
22682262
String transactionLogDirectory = getTransactionLogDir(tableLocation);
22692263
for (long version = readVersionValue + 1; version <= currentVersion; version++) {
2270-
List<DeltaLakeTransactionLogEntry> transactionLogEntries;
2264+
TransactionLogEntries transactionLogEntries;
22712265
try {
22722266
long finalVersion = version;
2273-
transactionLogEntries = getEntriesFromJson(version, transactionLogDirectory, fileSystem)
2267+
transactionLogEntries = getEntriesFromJson(version, transactionLogDirectory, fileSystem, DataSize.of(0, BYTE))
22742268
.orElseThrow(() -> new TrinoException(DELTA_LAKE_BAD_DATA, "Delta Lake log entries are missing for version " + finalVersion));
22752269
}
22762270
catch (IOException e) {
22772271
throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Failed to access table metadata", e);
22782272
}
2279-
DeltaLakeCommitSummary commitSummary = new DeltaLakeCommitSummary(version, transactionLogEntries);
2273+
DeltaLakeCommitSummary commitSummary = new DeltaLakeCommitSummary(version, transactionLogEntries, fileSystem);
22802274
checkNoMetadataUpdates(commitSummary);
22812275
checkNoProtocolUpdates(commitSummary);
22822276

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTransactionsTable.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.google.common.collect.ImmutableList;
1717
import io.airlift.json.JsonCodec;
18+
import io.trino.filesystem.TrinoFileSystem;
1819
import io.trino.filesystem.TrinoFileSystemFactory;
1920
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
2021
import io.trino.plugin.deltalake.transactionlog.Transaction;
@@ -62,12 +63,13 @@ public DeltaLakeTransactionsTable(
6263
}
6364

6465
@Override
65-
protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions)
66+
protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions, TrinoFileSystem fileSystem)
6667
{
6768
for (Transaction transaction : transactions) {
6869
pagesBuilder.beginRow();
6970
pagesBuilder.appendBigint(transaction.transactionId());
70-
pagesBuilder.appendVarchar(TRANSACTION_LOG_ENTRIES_CODEC.toJson(transaction.transactionEntries()));
71+
pagesBuilder.appendVarchar(TRANSACTION_LOG_ENTRIES_CODEC.toJson(
72+
transaction.transactionEntries().getEntriesList(fileSystem)));
7173
pagesBuilder.endRow();
7274
}
7375
return pagesBuilder.build();

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import static com.google.common.collect.ImmutableList.toImmutableList;
4141
import static com.google.common.collect.Iterables.getOnlyElement;
42+
import static io.trino.plugin.deltalake.DeltaLakeConfig.DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE;
4243
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA;
4344
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR;
4445
import static io.trino.plugin.deltalake.functions.tablechanges.TableChangesFileType.CDF_FILE;
@@ -74,11 +75,9 @@ private Stream<ConnectorSplit> prepareSplits(long currentVersion, long tableRead
7475
.boxed()
7576
.flatMap(version -> {
7677
try {
77-
List<DeltaLakeTransactionLogEntry> entries = getEntriesFromJson(version, transactionLogDir, fileSystem)
78-
.orElseThrow(() -> new TrinoException(DELTA_LAKE_BAD_DATA, "Delta Lake log entries are missing for version " + version));
79-
if (entries.isEmpty()) {
80-
return ImmutableList.<ConnectorSplit>of().stream();
81-
}
78+
List<DeltaLakeTransactionLogEntry> entries = getEntriesFromJson(version, transactionLogDir, fileSystem, DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE)
79+
.orElseThrow(() -> new TrinoException(DELTA_LAKE_BAD_DATA, "Delta Lake log entries are missing for version " + version))
80+
.getEntriesList(fileSystem);
8281
List<CommitInfoEntry> commitInfoEntries = entries.stream()
8382
.map(DeltaLakeTransactionLogEntry::getCommitInfo)
8483
.filter(Objects::nonNull)

0 commit comments

Comments
 (0)