Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.Transaction;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
import io.trino.plugin.deltalake.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -144,7 +145,7 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
try {
List<Transaction> transactions = loadNewTailBackward(fileSystem, tableLocation, startVersionExclusive, endVersionInclusive.get()).reversed();
return new FixedPageSource(buildPages(session, pagesBuilder, transactions));
return new FixedPageSource(buildPages(session, pagesBuilder, transactions, fileSystem));
}
catch (TrinoException e) {
throw e;
Expand All @@ -170,7 +171,7 @@ private static List<Transaction> loadNewTailBackward(
boolean endOfHead = false;

while (!endOfHead) {
Optional<List<DeltaLakeTransactionLogEntry>> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem);
Optional<TransactionLogEntries> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DataSize.of(0, DataSize.Unit.BYTE));
if (results.isPresent()) {
transactionsBuilder.add(new Transaction(version, results.get()));
version = entryNumber;
Expand All @@ -187,5 +188,6 @@ private static List<Transaction> loadNewTailBackward(
return transactionsBuilder.build();
}

protected abstract List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions);
protected abstract List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions, TrinoFileSystem fileSystem)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.canonicalizePartitionValues;
import static java.util.Objects.requireNonNull;
Expand All @@ -38,7 +42,7 @@ public class DeltaLakeCommitSummary
private final Set<Map<String, Optional<String>>> addedFilesCanonicalPartitionValues;
private final Optional<Boolean> isBlindAppend;

public DeltaLakeCommitSummary(long version, List<DeltaLakeTransactionLogEntry> transactionLogEntries)
public DeltaLakeCommitSummary(long version, TransactionLogEntries transactionLogEntries, TrinoFileSystem fileSystem)
{
requireNonNull(transactionLogEntries, "transactionLogEntries is null");
ImmutableList.Builder<MetadataEntry> metadataUpdatesBuilder = ImmutableList.builder();
Expand All @@ -48,26 +52,29 @@ public DeltaLakeCommitSummary(long version, List<DeltaLakeTransactionLogEntry> t
ImmutableSet.Builder<Map<String, Optional<String>>> removedFilesCanonicalPartitionValuesBuilder = ImmutableSet.builder();
boolean containsRemoveFileWithoutPartitionValues = false;

for (DeltaLakeTransactionLogEntry transactionLogEntry : transactionLogEntries) {
if (transactionLogEntry.getMetaData() != null) {
metadataUpdatesBuilder.add(transactionLogEntry.getMetaData());
}
else if (transactionLogEntry.getProtocol() != null) {
optionalProtocol = Optional.of(transactionLogEntry.getProtocol());
}
else if (transactionLogEntry.getCommitInfo() != null) {
optionalCommitInfo = Optional.of(transactionLogEntry.getCommitInfo());
}
else if (transactionLogEntry.getAdd() != null) {
addedFilesCanonicalPartitionValuesBuilder.add(transactionLogEntry.getAdd().getCanonicalPartitionValues());
}
else if (transactionLogEntry.getRemove() != null) {
Map<String, String> partitionValues = transactionLogEntry.getRemove().partitionValues();
if (partitionValues == null) {
containsRemoveFileWithoutPartitionValues = true;
try (Stream<DeltaLakeTransactionLogEntry> logEntryStream = transactionLogEntries.getEntries(fileSystem)) {
for (Iterator<DeltaLakeTransactionLogEntry> it = logEntryStream.iterator(); it.hasNext(); ) {
DeltaLakeTransactionLogEntry transactionLogEntry = it.next();
if (transactionLogEntry.getMetaData() != null) {
metadataUpdatesBuilder.add(transactionLogEntry.getMetaData());
}
else if (transactionLogEntry.getProtocol() != null) {
optionalProtocol = Optional.of(transactionLogEntry.getProtocol());
}
else if (transactionLogEntry.getCommitInfo() != null) {
optionalCommitInfo = Optional.of(transactionLogEntry.getCommitInfo());
}
else if (transactionLogEntry.getAdd() != null) {
addedFilesCanonicalPartitionValuesBuilder.add(transactionLogEntry.getAdd().getCanonicalPartitionValues());
}
else {
removedFilesCanonicalPartitionValuesBuilder.add(canonicalizePartitionValues(partitionValues));
else if (transactionLogEntry.getRemove() != null) {
Map<String, String> partitionValues = transactionLogEntry.getRemove().partitionValues();
if (partitionValues == null) {
containsRemoveFileWithoutPartitionValues = true;
}
else {
removedFilesCanonicalPartitionValuesBuilder.add(canonicalizePartitionValues(partitionValues));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class DeltaLakeConfig
{
public static final String EXTENDED_STATISTICS_ENABLED = "delta.extended-statistics.enabled";
public static final String VACUUM_MIN_RETENTION = "delta.vacuum.min-retention";
public static final DataSize DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE = DataSize.of(16, MEGABYTE);

// Runtime.getRuntime().maxMemory() is not 100% stable and may return slightly different value over JVM lifetime. We use
// constant so default configuration for cache size is stable.
Expand All @@ -60,6 +61,7 @@ public class DeltaLakeConfig

private Duration metadataCacheTtl = new Duration(30, TimeUnit.MINUTES);
private DataSize metadataCacheMaxRetainedSize = DEFAULT_METADATA_CACHE_MAX_RETAINED_SIZE;
private DataSize transactionLogMaxCachedFileSize = DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE;
private DataSize dataFileCacheSize = DEFAULT_DATA_FILE_CACHE_SIZE;
private Duration dataFileCacheTtl = new Duration(30, TimeUnit.MINUTES);
private int domainCompactionThreshold = 1000;
Expand Down Expand Up @@ -121,6 +123,19 @@ public DeltaLakeConfig setMetadataCacheMaxRetainedSize(DataSize metadataCacheMax
return this;
}

public DataSize getTransactionLogMaxCachedFileSize()
{
return transactionLogMaxCachedFileSize;
}

@Config("delta.transaction-log.max-cached-file-size")
@ConfigDescription("Maximum size of delta transaction log file that will be cached in memory")
public DeltaLakeConfig setTransactionLogMaxCachedFileSize(DataSize transactionLogMaxCachedFileSize)
{
this.transactionLogMaxCachedFileSize = transactionLogMaxCachedFileSize;
return this;
}

public DataSize getDataFileCacheSize()
{
return dataFileCacheSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
Expand All @@ -30,6 +31,7 @@

import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.type.BigintType.BIGINT;
Expand Down Expand Up @@ -74,13 +76,15 @@ public DeltaLakeHistoryTable(
}

@Override
protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions)
protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions, TrinoFileSystem fileSystem)
{
List<CommitInfoEntry> commitInfoEntries = transactions.stream()
.flatMap(transaction -> transaction.transactionEntries().stream())
List<CommitInfoEntry> commitInfoEntries;
try (Stream<CommitInfoEntry> commitStream = transactions.stream()
.flatMap(transaction -> transaction.transactionEntries().getEntries(fileSystem))
.map(DeltaLakeTransactionLogEntry::getCommitInfo)
.filter(Objects::nonNull)
.collect(toImmutableList());
.filter(Objects::nonNull)) {
commitInfoEntries = commitStream.collect(toImmutableList());
}

TimeZoneKey timeZoneKey = session.getTimeZoneKey();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.UnsupportedTypeException;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
import io.trino.plugin.deltalake.transactionlog.checkpoint.MetadataAndProtocolEntries;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionConflictException;
Expand Down Expand Up @@ -197,6 +198,7 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Sets.difference;
import static com.google.common.primitives.Ints.max;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.trino.filesystem.Locations.appendPath;
import static io.trino.filesystem.Locations.areDirectoryLocationsEquivalent;
import static io.trino.hive.formats.HiveClassNames.HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS;
Expand Down Expand Up @@ -290,8 +292,6 @@
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
Expand Down Expand Up @@ -630,27 +630,21 @@ public LocatedTableHandle getTableHandle(
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, endVersion.map(version -> getVersion(fileSystem, tableLocation, version)));

Map<Class<?>, Object> logEntries;
MetadataAndProtocolEntries logEntries;
try {
logEntries = transactionLogAccess.getTransactionLogEntries(
session,
tableSnapshot,
ImmutableSet.of(METADATA, PROTOCOL),
entryStream -> entryStream
.filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null)
.map(entry -> firstNonNull(entry.getMetaData(), entry.getProtocol())));
logEntries = transactionLogAccess.getMetadataAndProtocolEntry(session, tableSnapshot);
}
catch (TrinoException e) {
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, e);
}
throw e;
}
MetadataEntry metadataEntry = (MetadataEntry) logEntries.get(MetadataEntry.class);
MetadataEntry metadataEntry = logEntries.metadata().orElse(null);
if (metadataEntry == null) {
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable()));
}
ProtocolEntry protocolEntry = (ProtocolEntry) logEntries.get(ProtocolEntry.class);
ProtocolEntry protocolEntry = logEntries.protocol().orElse(null);
if (protocolEntry == null) {
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable()));
}
Expand Down Expand Up @@ -2267,16 +2261,16 @@ private void checkForConcurrentTransactionConflicts(
if (currentVersion > readVersionValue) {
String transactionLogDirectory = getTransactionLogDir(tableLocation);
for (long version = readVersionValue + 1; version <= currentVersion; version++) {
List<DeltaLakeTransactionLogEntry> transactionLogEntries;
TransactionLogEntries transactionLogEntries;
try {
long finalVersion = version;
transactionLogEntries = getEntriesFromJson(version, transactionLogDirectory, fileSystem)
transactionLogEntries = getEntriesFromJson(version, transactionLogDirectory, fileSystem, DataSize.of(0, BYTE))
.orElseThrow(() -> new TrinoException(DELTA_LAKE_BAD_DATA, "Delta Lake log entries are missing for version " + finalVersion));
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Failed to access table metadata", e);
}
DeltaLakeCommitSummary commitSummary = new DeltaLakeCommitSummary(version, transactionLogEntries);
DeltaLakeCommitSummary commitSummary = new DeltaLakeCommitSummary(version, transactionLogEntries, fileSystem);
checkNoMetadataUpdates(commitSummary);
checkNoProtocolUpdates(commitSummary);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.Transaction;
Expand Down Expand Up @@ -62,12 +63,13 @@ public DeltaLakeTransactionsTable(
}

@Override
protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions)
protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions, TrinoFileSystem fileSystem)
{
for (Transaction transaction : transactions) {
pagesBuilder.beginRow();
pagesBuilder.appendBigint(transaction.transactionId());
pagesBuilder.appendVarchar(TRANSACTION_LOG_ENTRIES_CODEC.toJson(transaction.transactionEntries()));
pagesBuilder.appendVarchar(TRANSACTION_LOG_ENTRIES_CODEC.toJson(
transaction.transactionEntries().getEntriesList(fileSystem)));
pagesBuilder.endRow();
}
return pagesBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.deltalake.DeltaLakeConfig.DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR;
import static io.trino.plugin.deltalake.functions.tablechanges.TableChangesFileType.CDF_FILE;
Expand Down Expand Up @@ -74,11 +75,9 @@ private Stream<ConnectorSplit> prepareSplits(long currentVersion, long tableRead
.boxed()
.flatMap(version -> {
try {
List<DeltaLakeTransactionLogEntry> entries = getEntriesFromJson(version, transactionLogDir, fileSystem)
.orElseThrow(() -> new TrinoException(DELTA_LAKE_BAD_DATA, "Delta Lake log entries are missing for version " + version));
if (entries.isEmpty()) {
return ImmutableList.<ConnectorSplit>of().stream();
}
List<DeltaLakeTransactionLogEntry> entries = getEntriesFromJson(version, transactionLogDir, fileSystem, DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE)
.orElseThrow(() -> new TrinoException(DELTA_LAKE_BAD_DATA, "Delta Lake log entries are missing for version " + version))
.getEntriesList(fileSystem);
List<CommitInfoEntry> commitInfoEntries = entries.stream()
.map(DeltaLakeTransactionLogEntry::getCommitInfo)
.filter(Objects::nonNull)
Expand Down
Loading
Loading