Skip to content

Commit 3df9028

Browse files
committed
Refactor TransactionLogTail#loadNewTail method
Refactor TransactionLogTail#loadNewTail to support loading the transaction log conditionally, based on the `startVersion` and `endVersion` parameters. Previously, the method only supported forward traversal from head to tail, which posed a risk when early transaction logs were missing, possibly due to cleanup. This refactor enables both forward and backward traversal, increasing the robustness and flexibility of the tail-loading logic. Additionally, remove `BaseTransactionsTable#loadNewTailBackward` and use the enhanced loadNewTail method instead.
1 parent 5248e2a commit 3df9028

File tree

2 files changed

+100
-38
lines changed

2 files changed

+100
-38
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/BaseTransactionsTable.java

Lines changed: 4 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,13 @@
1313
*/
1414
package io.trino.plugin.deltalake;
1515

16-
import com.google.common.collect.ImmutableList;
1716
import io.airlift.units.DataSize;
1817
import io.trino.filesystem.TrinoFileSystem;
1918
import io.trino.filesystem.TrinoFileSystemFactory;
2019
import io.trino.plugin.deltalake.metastore.DeltaMetastoreTable;
2120
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
2221
import io.trino.plugin.deltalake.transactionlog.Transaction;
2322
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
24-
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
2523
import io.trino.plugin.deltalake.util.PageListBuilder;
2624
import io.trino.spi.Page;
2725
import io.trino.spi.TrinoException;
@@ -43,10 +41,10 @@
4341
import java.util.Optional;
4442
import java.util.stream.IntStream;
4543

44+
import static com.google.common.base.Preconditions.checkArgument;
4645
import static com.google.common.collect.MoreCollectors.onlyElement;
4746
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
48-
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
49-
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
47+
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.loadNewTail;
5048
import static java.util.Objects.requireNonNull;
5149

5250
public abstract class BaseTransactionsTable
@@ -140,7 +138,8 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
140138
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
141139
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
142140
try {
143-
List<Transaction> transactions = loadNewTailBackward(fileSystem, table.location(), startVersionExclusive, endVersionInclusive.get()).reversed();
141+
checkArgument(endVersionInclusive.isPresent(), "endVersionInclusive must be present");
142+
List<Transaction> transactions = loadNewTail(fileSystem, table.location(), startVersionExclusive, endVersionInclusive, DataSize.ofBytes(0)).getTransactions();
144143
return new FixedPageSource(buildPages(session, pagesBuilder, transactions, fileSystem));
145144
}
146145
catch (TrinoException e) {
@@ -151,39 +150,6 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
151150
}
152151
}
153152

154-
// Load a section of the Transaction Log JSON entries. Optionally from a given end version (inclusive) through a start version (exclusive)
155-
private static List<Transaction> loadNewTailBackward(
156-
TrinoFileSystem fileSystem,
157-
String tableLocation,
158-
Optional<Long> startVersion,
159-
long endVersion)
160-
throws IOException
161-
{
162-
ImmutableList.Builder<Transaction> transactionsBuilder = ImmutableList.builder();
163-
String transactionLogDir = getTransactionLogDir(tableLocation);
164-
165-
long version = endVersion;
166-
long entryNumber = version;
167-
boolean endOfHead = false;
168-
169-
while (!endOfHead) {
170-
Optional<TransactionLogEntries> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DataSize.of(0, DataSize.Unit.BYTE));
171-
if (results.isPresent()) {
172-
transactionsBuilder.add(new Transaction(version, results.get()));
173-
version = entryNumber;
174-
entryNumber--;
175-
}
176-
else {
177-
// When there is a gap in the transaction log version, indicate the end of the current head
178-
endOfHead = true;
179-
}
180-
if ((startVersion.isPresent() && version == startVersion.get() + 1) || entryNumber < 0) {
181-
endOfHead = true;
182-
}
183-
}
184-
return transactionsBuilder.build();
185-
}
186-
187153
protected abstract List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions, TrinoFileSystem fileSystem)
188154
throws IOException;
189155
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.trino.filesystem.TrinoFileSystem;
2020
import io.trino.filesystem.TrinoInputFile;
2121
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
22+
import io.trino.plugin.deltalake.transactionlog.MissingTransactionLogException;
2223
import io.trino.plugin.deltalake.transactionlog.Transaction;
2324
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
2425

@@ -31,6 +32,7 @@
3132
import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
3233
import static io.airlift.slice.SizeOf.estimatedSizeOf;
3334
import static io.airlift.slice.SizeOf.instanceSize;
35+
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
3436
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
3537
import static java.util.Objects.requireNonNull;
3638

@@ -47,6 +49,31 @@ public TransactionLogTail(List<Transaction> entries, long version)
4749
this.version = version;
4850
}
4951

52+
// Load a section of the Transaction Log JSON entries. Optionally from a given start version (exclusive) through an end version (inclusive)
53+
public static TransactionLogTail loadNewTail(
54+
TrinoFileSystem fileSystem,
55+
String tableLocation,
56+
Optional<Long> startVersion,
57+
Optional<Long> endVersion,
58+
DataSize transactionLogMaxCachedFileSize)
59+
throws IOException
60+
{
61+
if (startVersion.isPresent() && endVersion.isPresent() && startVersion.get().equals(endVersion.get())) {
62+
// This is time travel to a specific checkpoint. No need to read transaction log files.
63+
return new TransactionLogTail(ImmutableList.of(), startVersion.get());
64+
}
65+
66+
if (endVersion.isPresent()) {
67+
return loadNewTailBackward(fileSystem, tableLocation, startVersion, endVersion.get(), transactionLogMaxCachedFileSize);
68+
}
69+
70+
if (startVersion.isPresent()) {
71+
return loadNewTail(fileSystem, tableLocation, startVersion.get(), startVersion.get() + 1, transactionLogMaxCachedFileSize);
72+
}
73+
74+
return loadNewTail(fileSystem, tableLocation, 0L, 0L, transactionLogMaxCachedFileSize);
75+
}
76+
5077
/**
5178
* @deprecated use {@link #getEntriesFromJson(long, TrinoInputFile, DataSize)}
5279
*/
@@ -95,4 +122,73 @@ public long getRetainedSizeInBytes()
95122
+ SIZE_OF_LONG
96123
+ estimatedSizeOf(entries, Transaction::getRetainedSizeInBytes);
97124
}
125+
126+
/**
127+
* Loads a section of the Transaction Log JSON entries starting from {@code startVersion} (inclusive) up to the latest version.
128+
*
129+
* the {@code version} is the latest table version, which is the last entry number in the transaction log we already know,
130+
* the {@code starVersion} is the first entry number we want to load, but it is not guaranteed to be the first entry in the transaction log.
131+
*/
132+
private static TransactionLogTail loadNewTail(
133+
TrinoFileSystem fileSystem,
134+
String tableLocation,
135+
long version,
136+
long startVersion,
137+
DataSize transactionLogMaxCachedFileSize)
138+
throws IOException
139+
{
140+
ImmutableList.Builder<Transaction> entriesBuilder = ImmutableList.builder();
141+
String transactionLogDir = getTransactionLogDir(tableLocation);
142+
143+
long entryNumber = startVersion;
144+
while (true) {
145+
Optional<TransactionLogEntries> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, transactionLogMaxCachedFileSize);
146+
if (results.isEmpty()) {
147+
break;
148+
}
149+
150+
entriesBuilder.add(new Transaction(entryNumber, results.get()));
151+
version = entryNumber;
152+
entryNumber++;
153+
}
154+
155+
return new TransactionLogTail(entriesBuilder.build(), version);
156+
}
157+
158+
// Load a section of the Transaction Log JSON entries. Optionally from a given end version (inclusive) through a start version (exclusive)
159+
private static TransactionLogTail loadNewTailBackward(
160+
TrinoFileSystem fileSystem,
161+
String tableLocation,
162+
Optional<Long> startVersion,
163+
long endVersion,
164+
DataSize transactionLogMaxCachedFileSize)
165+
throws IOException
166+
{
167+
ImmutableList.Builder<Transaction> transactionsBuilder = ImmutableList.builder();
168+
String transactionLogDir = getTransactionLogDir(tableLocation);
169+
170+
long version = endVersion;
171+
long entryNumber = version;
172+
boolean endOfHead = false;
173+
174+
while (!endOfHead) {
175+
Optional<TransactionLogEntries> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, transactionLogMaxCachedFileSize);
176+
if (results.isPresent()) {
177+
transactionsBuilder.add(new Transaction(entryNumber, results.get()));
178+
version = entryNumber;
179+
entryNumber--;
180+
}
181+
else {
182+
// When there is a gap in the transaction log version, indicate the end of the current head
183+
endOfHead = true;
184+
if (startVersion.isPresent() && entryNumber > startVersion.get() + 1) {
185+
throw new MissingTransactionLogException(getTransactionLogJsonEntryPath(transactionLogDir, entryNumber).toString());
186+
}
187+
}
188+
if ((startVersion.isPresent() && version == startVersion.get() + 1) || entryNumber < 0) {
189+
endOfHead = true;
190+
}
191+
}
192+
return new TransactionLogTail(transactionsBuilder.build().reversed(), endVersion);
193+
}
98194
}

0 commit comments

Comments
 (0)