Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions plugin/trino-delta-lake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,6 @@
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>org.gaul</groupId>
<artifactId>modernizer-maven-annotations</artifactId>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void createTable(ConnectorSession session, Table table, PrincipalPrivileg
throw new TrinoException(DELTA_LAKE_INVALID_TABLE, "Provided location did not contain a valid Delta Lake table: " + tableLocation);
}
}
catch (IOException e) {
catch (IOException | RuntimeException e) {
throw new TrinoException(DELTA_LAKE_INVALID_TABLE, "Failed to access table location: " + tableLocation, e);
}
delegate.createTable(table, principalPrivileges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
package io.trino.plugin.deltalake.transactionlog;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.jmx.CacheStatsMBean;
import io.airlift.log.Logger;
import io.trino.collect.cache.EvictableCacheBuilder;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator;
Expand All @@ -42,7 +43,6 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

Expand All @@ -59,11 +59,13 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
Expand Down Expand Up @@ -110,19 +112,15 @@ public TransactionLogAccess(
requireNonNull(deltalakeConfig, "deltalakeConfig is null");
this.checkpointRowStatisticsWritingEnabled = deltalakeConfig.isCheckpointRowStatisticsWritingEnabled();

tableSnapshots = buildUnsafeCache(CacheBuilder.newBuilder()
tableSnapshots = EvictableCacheBuilder.newBuilder()
.expireAfterWrite(config.getMetadataCacheTtl().toMillis(), TimeUnit.MILLISECONDS)
.recordStats());
activeDataFileCache = buildUnsafeCache(CacheBuilder.newBuilder()
.recordStats()
.build();
activeDataFileCache = EvictableCacheBuilder.newBuilder()
.weigher((Weigher<String, DeltaLakeDataFileCacheEntry>) (key, value) -> Ints.saturatedCast(estimatedSizeOf(key) + value.getRetainedSizeInBytes()))
.maximumWeight(config.getDataFileCacheSize().toBytes())
.recordStats());
}

@SuppressModernizer // TODO the caches here are indeed unsafe and need to be fixed
private static <K, V> Cache<K, V> buildUnsafeCache(CacheBuilder<? super K, ? super V> cacheBuilder)
{
return cacheBuilder.build();
.recordStats()
.build();
}

@Managed
Expand All @@ -147,19 +145,25 @@ public TableSnapshot loadSnapshot(SchemaTableName table, Path tableLocation, Con
TableSnapshot snapshot;
FileSystem fileSystem = getFileSystem(tableLocation, table, session);
if (cachedSnapshot == null) {
snapshot = TableSnapshot.load(
table,
fileSystem,
tableLocation,
parquetReaderOptions,
checkpointRowStatisticsWritingEnabled);
tableSnapshots.put(location, snapshot);
try {
snapshot = tableSnapshots.get(location, () ->
TableSnapshot.load(
table,
fileSystem,
tableLocation,
parquetReaderOptions,
checkpointRowStatisticsWritingEnabled));
}
catch (UncheckedExecutionException | ExecutionException e) {
throwIfUnchecked(e.getCause());
throw new RuntimeException(e);
}
}
else {
Optional<TableSnapshot> updatedSnapshot = cachedSnapshot.getUpdatedSnapshot(fileSystem);
if (updatedSnapshot.isPresent()) {
snapshot = updatedSnapshot.get();
tableSnapshots.put(location, snapshot);
tableSnapshots.asMap().replace(location, cachedSnapshot, snapshot);
}
else {
snapshot = cachedSnapshot;
Expand Down Expand Up @@ -196,50 +200,48 @@ public Optional<MetadataEntry> getMetadataEntry(TableSnapshot tableSnapshot, Con

public List<AddFileEntry> getActiveFiles(TableSnapshot tableSnapshot, ConnectorSession session)
{
String tableLocation = tableSnapshot.getTableLocation().toString();
DeltaLakeDataFileCacheEntry cachedTable = activeDataFileCache.getIfPresent(tableLocation);

if (cachedTable == null || cachedTable.getVersion() > tableSnapshot.getVersion()) {
try {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Off-topic: I was trying to understand where the method getActiveFiles is being used and noticed that the tableSnapshot parameter is always produced by a previous call to TransactionLogAccess as well.

transactionLogAccess.getMetadataEntry likewise depends on a tableSnapshot previously obtained from TransactionLogAccess.

It would be worth considering exposing a method which retrieves both the active metadata & files:

   // pseudocode
   MetadataAndAddFileEntryList getActiveMetadataAndAddFileEntryList(ConnectorSession session)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense, but maybe not in scope of this PR. Please create a separate ticket for that.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String tableLocation = tableSnapshot.getTableLocation().toString();
FileSystem fileSystem = getFileSystem(tableSnapshot, session);
try (Stream<AddFileEntry> entries = getEntries(
tableSnapshot,
ImmutableSet.of(ADD),
this::activeAddEntries,
session,
fileSystem,
hdfsEnvironment,
fileFormatDataSourceStats)) {
List<AddFileEntry> activeFiles = entries.collect(toImmutableList());
if (cachedTable == null) {
DeltaLakeDataFileCacheEntry cache = new DeltaLakeDataFileCacheEntry(tableSnapshot.getVersion(), activeFiles);
activeDataFileCache.put(tableLocation, cache);
}
else {
log.warn("Query run with outdated Transaction Log Snapshot, retrieved stale table entries for table: %s and query %s",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restore

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Restoring this is not that simple. I would need to check whether the data was loaded from the cache or from the file system, which would require adding an atomic boolean etc. (in similar fashion to what Marius suggested). Do you think it makes sense to add all this just so a simple warning can be logged?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it belongs to

if (cachedTable.getVersion() > tableSnapshot.getVersion()) {
                return loadActiveFiles(tableSnapshot, session, fileSystem)

block? Please double-check me on this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right, thanks

tableSnapshot.getTable(), session.getQueryId());
}
return activeFiles;
DeltaLakeDataFileCacheEntry cachedTable = activeDataFileCache.get(tableLocation, () -> {
List<AddFileEntry> activeFiles = loadActiveFiles(tableSnapshot, session, fileSystem);
return new DeltaLakeDataFileCacheEntry(tableSnapshot.getVersion(), activeFiles);
});
if (cachedTable.getVersion() > tableSnapshot.getVersion()) {
log.warn("Query run with outdated Transaction Log Snapshot, retrieved stale table entries for table: %s and query %s", tableSnapshot.getTable(), session.getQueryId());
return loadActiveFiles(tableSnapshot, session, fileSystem);
}
}
else if (cachedTable.getVersion() < tableSnapshot.getVersion()) {
FileSystem fileSystem = getFileSystem(tableSnapshot, session);
try {
else if (cachedTable.getVersion() < tableSnapshot.getVersion()) {
List<DeltaLakeTransactionLogEntry> newEntries = getJsonEntries(
cachedTable.getVersion(),
tableSnapshot.getVersion(),
tableSnapshot,
fileSystem);

DeltaLakeDataFileCacheEntry updatedCacheEntry = cachedTable.withUpdatesApplied(newEntries, tableSnapshot.getVersion());

activeDataFileCache.put(tableLocation, updatedCacheEntry);
activeDataFileCache.asMap().replace(tableLocation, cachedTable, updatedCacheEntry);
cachedTable = updatedCacheEntry;
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Failed accessing transaction log for table: " + tableSnapshot.getTable(), e);
}
return cachedTable.getActiveFiles();
}
catch (IOException | ExecutionException | UncheckedExecutionException e) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Failed accessing transaction log for table: " + tableSnapshot.getTable(), e);
}
}

private List<AddFileEntry> loadActiveFiles(TableSnapshot tableSnapshot, ConnectorSession session, FileSystem fileSystem)
{
try (Stream<AddFileEntry> entries = getEntries(
tableSnapshot,
ImmutableSet.of(ADD),
this::activeAddEntries,
session,
fileSystem,
hdfsEnvironment,
fileFormatDataSourceStats)) {
List<AddFileEntry> activeFiles = entries.collect(toImmutableList());
return activeFiles;
}
return cachedTable.getActiveFiles();
}

public static List<ColumnMetadata> columnsWithStats(MetadataEntry metadataEntry, TypeManager typeManager)
Expand Down