Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.metastore.DeltaMetastoreTable;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.Transaction;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
import io.trino.plugin.deltalake.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
Expand All @@ -43,10 +41,10 @@
import java.util.Optional;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.MoreCollectors.onlyElement;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.loadNewTail;
import static java.util.Objects.requireNonNull;

public abstract class BaseTransactionsTable
Expand Down Expand Up @@ -140,7 +138,8 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
try {
List<Transaction> transactions = loadNewTailBackward(fileSystem, table.location(), startVersionExclusive, endVersionInclusive.get()).reversed();
checkArgument(endVersionInclusive.isPresent(), "endVersionInclusive must be present");
List<Transaction> transactions = loadNewTail(fileSystem, table.location(), startVersionExclusive, endVersionInclusive, DataSize.ofBytes(0)).getTransactions();
return new FixedPageSource(buildPages(session, pagesBuilder, transactions, fileSystem));
}
catch (TrinoException e) {
Expand All @@ -151,39 +150,6 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
}
}

// Load a section of the Transaction Log JSON entries, walking backward from a given end version (inclusive)
// down to an optional start version (exclusive).
//
// Entries are appended in the order they are read, so the returned list is ordered from the newest version
// to the oldest. The walk stops when:
//   - an entry file is missing (a gap in the log ends the current head),
//   - the entry right after the start version (startVersion + 1) has been read, or
//   - version 0 has been read.
private static List<Transaction> loadNewTailBackward(
        TrinoFileSystem fileSystem,
        String tableLocation,
        Optional<Long> startVersion,
        long endVersion)
        throws IOException
{
    ImmutableList.Builder<Transaction> transactionsBuilder = ImmutableList.builder();
    String transactionLogDir = getTransactionLogDir(tableLocation);

    // `version` tracks the last entry that was successfully read; `entryNumber` is the next entry to read
    long version = endVersion;
    long entryNumber = version;
    boolean endOfHead = false;

    while (!endOfHead) {
        Optional<TransactionLogEntries> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DataSize.of(0, DataSize.Unit.BYTE));
        if (results.isPresent()) {
            // Label the transaction with the entry that was actually read. At this point `version` still holds
            // the previously read entry, so using it would tag every transaction after the first one version too high.
            transactionsBuilder.add(new Transaction(entryNumber, results.get()));
            version = entryNumber;
            entryNumber--;
        }
        else {
            // When there is a gap in the transaction log version, indicate the end of the current head
            endOfHead = true;
        }
        if ((startVersion.isPresent() && version == startVersion.get() + 1) || entryNumber < 0) {
            endOfHead = true;
        }
    }
    return transactionsBuilder.build();
}

protected abstract List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions, TrinoFileSystem fileSystem)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MissingTransactionLogException;
import io.trino.plugin.deltalake.transactionlog.Transaction;
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;

Expand All @@ -31,6 +32,7 @@
import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
import static java.util.Objects.requireNonNull;

Expand All @@ -47,6 +49,31 @@ public TransactionLogTail(List<Transaction> entries, long version)
this.version = version;
}

// Load a section of the Transaction Log JSON entries. Optionally from a given start version (exclusive) through an end version (inclusive).
//
// Dispatch:
//   - end version given and equal to the start version: nothing is read, an empty tail at that version is returned
//   - end version given otherwise: entries are read backward from the end version
//   - no end version: entries are read forward, beginning right after the start version, or at 0 when there is none
public static TransactionLogTail loadNewTail(
        TrinoFileSystem fileSystem,
        String tableLocation,
        Optional<Long> startVersion,
        Optional<Long> endVersion,
        DataSize transactionLogMaxCachedFileSize)
        throws IOException
{
    if (endVersion.isPresent()) {
        long lastVersion = endVersion.get();
        if (startVersion.equals(endVersion)) {
            // This is time travel to a specific checkpoint. No need to read transaction log files.
            return new TransactionLogTail(ImmutableList.of(), lastVersion);
        }
        return loadNewTailBackward(fileSystem, tableLocation, startVersion, lastVersion, transactionLogMaxCachedFileSize);
    }

    // Open-ended read: the known version is the start version (or 0), the first entry to load is the one after it (or 0)
    long knownVersion = startVersion.orElse(0L);
    long firstEntryToLoad = startVersion.map(start -> start + 1).orElse(0L);
    return loadNewTail(fileSystem, tableLocation, knownVersion, firstEntryToLoad, transactionLogMaxCachedFileSize);
}

/**
* @deprecated use {@link #getEntriesFromJson(long, TrinoInputFile, DataSize)}
*/
Expand Down Expand Up @@ -95,4 +122,73 @@ public long getRetainedSizeInBytes()
+ SIZE_OF_LONG
+ estimatedSizeOf(entries, Transaction::getRetainedSizeInBytes);
}

/**
 * Loads a section of the Transaction Log JSON entries starting from {@code startVersion} (inclusive)
 * and continuing forward until the first entry that cannot be found.
 *
 * @param version the latest table version already known to the caller; it is reported as the tail version
 * when no entry is found at or after {@code startVersion}
 * @param startVersion the first entry number to load; it is not guaranteed to be the first entry in the transaction log
 * @return the loaded transactions in ascending version order, together with the number of the last entry read
 * (or {@code version} when nothing was read)
 */
private static TransactionLogTail loadNewTail(
        TrinoFileSystem fileSystem,
        String tableLocation,
        long version,
        long startVersion,
        DataSize transactionLogMaxCachedFileSize)
        throws IOException
{
    String transactionLogDir = getTransactionLogDir(tableLocation);
    ImmutableList.Builder<Transaction> loaded = ImmutableList.builder();

    long latestVersion = version;
    for (long entryNumber = startVersion; ; entryNumber++) {
        Optional<TransactionLogEntries> entries = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, transactionLogMaxCachedFileSize);
        if (entries.isEmpty()) {
            // The first missing entry marks the end of the log
            break;
        }
        loaded.add(new Transaction(entryNumber, entries.get()));
        latestVersion = entryNumber;
    }

    return new TransactionLogTail(loaded.build(), latestVersion);
}

// Load a section of the Transaction Log JSON entries, walking backward from a given end version (inclusive)
// down to an optional start version (exclusive).
//
// Entries are read newest-first and reversed before returning, so the resulting tail lists transactions in
// ascending version order; the tail version is always the requested end version.
//
// When a start version is given, a missing entry above it fails with MissingTransactionLogException.
// Without a start version, the first missing entry (or reaching version 0) simply ends the walk.
private static TransactionLogTail loadNewTailBackward(
        TrinoFileSystem fileSystem,
        String tableLocation,
        Optional<Long> startVersion,
        long endVersion,
        DataSize transactionLogMaxCachedFileSize)
        throws IOException
{
    ImmutableList.Builder<Transaction> transactionsBuilder = ImmutableList.builder();
    String transactionLogDir = getTransactionLogDir(tableLocation);

    // `version` tracks the last entry that was successfully read; `entryNumber` is the next entry to read
    long version = endVersion;
    long entryNumber = version;
    boolean endOfHead = false;

    while (!endOfHead) {
        Optional<TransactionLogEntries> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, transactionLogMaxCachedFileSize);
        if (results.isPresent()) {
            transactionsBuilder.add(new Transaction(entryNumber, results.get()));
            version = entryNumber;
            entryNumber--;
        }
        else {
            // When there is a gap in the transaction log version, indicate the end of the current head
            endOfHead = true;
            // The start version is exclusive, so every entry above it is required, including startVersion + 1.
            // Comparing against startVersion + 1 would let a missing first entry go unnoticed.
            if (startVersion.isPresent() && entryNumber > startVersion.get()) {
                throw new MissingTransactionLogException(getTransactionLogJsonEntryPath(transactionLogDir, entryNumber).toString());
            }
        }
        if ((startVersion.isPresent() && version == startVersion.get() + 1) || entryNumber < 0) {
            endOfHead = true;
        }
    }
    return new TransactionLogTail(transactionsBuilder.build().reversed(), endVersion);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,14 @@
*/
package io.trino.plugin.deltalake.transactionlog.reader;

import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.transactionlog.MissingTransactionLogException;
import io.trino.plugin.deltalake.transactionlog.Transaction;
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail;
import io.trino.spi.connector.ConnectorSession;

import java.io.IOException;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
import static java.util.Objects.requireNonNull;

public class FileSystemTransactionLogReader
Expand All @@ -52,41 +43,6 @@ public TransactionLogTail loadNewTail(
DataSize transactionLogMaxCachedFileSize)
throws IOException
{
ImmutableList.Builder<Transaction> entriesBuilder = ImmutableList.builder();

if (startVersion.isPresent() && endVersion.isPresent() && startVersion.get().equals(endVersion.get())) {
// This is time travel to a specific checkpoint. No need to read transaction log files.
return new TransactionLogTail(entriesBuilder.build(), startVersion.get());
}

// TODO: check if we should use startVersion or endVersion, in the case that startVersion is not present this could returns empty entries which is not correct
long version = startVersion.orElse(0L);
long entryNumber = startVersion.map(start -> start + 1).orElse(0L);
checkArgument(endVersion.isEmpty() || entryNumber <= endVersion.get(), "Invalid start/end versions: %s, %s", startVersion, endVersion);

String transactionLogDir = getTransactionLogDir(tableLocation);
TrinoFileSystem fileSystem = fileSystemFactory.create(session);

boolean endOfTail = false;
while (!endOfTail) {
Optional<TransactionLogEntries> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, transactionLogMaxCachedFileSize);
if (results.isPresent()) {
entriesBuilder.add(new Transaction(entryNumber, results.get()));
version = entryNumber;
entryNumber++;
}
else {
if (endVersion.isPresent()) {
throw new MissingTransactionLogException(getTransactionLogJsonEntryPath(transactionLogDir, entryNumber).toString());
}
endOfTail = true;
}

if (endVersion.isPresent() && version == endVersion.get()) {
endOfTail = true;
}
}

return new TransactionLogTail(entriesBuilder.build(), version);
return TransactionLogTail.loadNewTail(fileSystemFactory.create(session), tableLocation, startVersion, endVersion, transactionLogMaxCachedFileSize);
}
}