diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml index 0d8b033f3498..ce11d1c5e3a1 100644 --- a/plugin/trino-delta-lake/pom.xml +++ b/plugin/trino-delta-lake/pom.xml @@ -159,11 +159,6 @@ failsafe - - org.gaul - modernizer-maven-annotations - - org.weakref jmxutils diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java index 383640fc491a..40bc76818073 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java @@ -150,7 +150,7 @@ public void createTable(ConnectorSession session, Table table, PrincipalPrivileg throw new TrinoException(DELTA_LAKE_INVALID_TABLE, "Provided location did not contain a valid Delta Lake table: " + tableLocation); } } - catch (IOException e) { + catch (IOException | RuntimeException e) { throw new TrinoException(DELTA_LAKE_INVALID_TABLE, "Failed to access table location: " + tableLocation, e); } delegate.createTable(table, principalPrivileges); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java index 9904ee3740bd..16e11422179c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java @@ -14,13 +14,14 @@ package io.trino.plugin.deltalake.transactionlog; import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import com.google.common.cache.Weigher; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.jmx.CacheStatsMBean; import io.airlift.log.Logger; +import io.trino.collect.cache.EvictableCacheBuilder; import io.trino.parquet.ParquetReaderOptions; import io.trino.plugin.deltalake.DeltaLakeConfig; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator; @@ -42,7 +43,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.gaul.modernizer_maven_annotations.SuppressModernizer; import org.weakref.jmx.Managed; import org.weakref.jmx.Nested; @@ -59,11 +59,13 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Stream; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.airlift.slice.SizeOf.estimatedSizeOf; @@ -110,19 +112,15 @@ public TransactionLogAccess( requireNonNull(deltalakeConfig, "deltalakeConfig is null"); this.checkpointRowStatisticsWritingEnabled = deltalakeConfig.isCheckpointRowStatisticsWritingEnabled(); - tableSnapshots = buildUnsafeCache(CacheBuilder.newBuilder() + tableSnapshots = EvictableCacheBuilder.newBuilder() .expireAfterWrite(config.getMetadataCacheTtl().toMillis(), TimeUnit.MILLISECONDS) - .recordStats()); - activeDataFileCache = buildUnsafeCache(CacheBuilder.newBuilder() + .recordStats() + .build(); + activeDataFileCache = EvictableCacheBuilder.newBuilder() .weigher((Weigher) (key, value) -> Ints.saturatedCast(estimatedSizeOf(key) + value.getRetainedSizeInBytes())) .maximumWeight(config.getDataFileCacheSize().toBytes()) - .recordStats()); - } - - @SuppressModernizer // TODO the caches here are indeed unsafe and need to be fixed - private static Cache buildUnsafeCache(CacheBuilder cacheBuilder) - { - return cacheBuilder.build(); + .recordStats() + .build(); } @Managed @@ -147,19 +145,25 @@ public TableSnapshot loadSnapshot(SchemaTableName table, Path tableLocation, Con TableSnapshot snapshot; FileSystem fileSystem = getFileSystem(tableLocation, table, session); if (cachedSnapshot == null) { - snapshot = TableSnapshot.load( - table, - fileSystem, - tableLocation, - parquetReaderOptions, - checkpointRowStatisticsWritingEnabled); - tableSnapshots.put(location, snapshot); + try { + snapshot = tableSnapshots.get(location, () -> + TableSnapshot.load( + table, + fileSystem, + tableLocation, + parquetReaderOptions, + checkpointRowStatisticsWritingEnabled)); + } + catch (UncheckedExecutionException | ExecutionException e) { + throwIfUnchecked(e.getCause()); + throw new RuntimeException(e); + } } else { Optional updatedSnapshot = cachedSnapshot.getUpdatedSnapshot(fileSystem); if (updatedSnapshot.isPresent()) { snapshot = updatedSnapshot.get(); - tableSnapshots.put(location, snapshot); + tableSnapshots.asMap().replace(location, cachedSnapshot, snapshot); } else { snapshot = cachedSnapshot; @@ -196,50 +200,48 @@ public Optional getMetadataEntry(TableSnapshot tableSnapshot, Con public List getActiveFiles(TableSnapshot tableSnapshot, ConnectorSession session) { - String tableLocation = tableSnapshot.getTableLocation().toString(); - DeltaLakeDataFileCacheEntry cachedTable = activeDataFileCache.getIfPresent(tableLocation); - - if (cachedTable == null || cachedTable.getVersion() > tableSnapshot.getVersion()) { + try { + String tableLocation = tableSnapshot.getTableLocation().toString(); FileSystem fileSystem = getFileSystem(tableSnapshot, session); - try (Stream entries = getEntries( - tableSnapshot, - ImmutableSet.of(ADD), - this::activeAddEntries, - session, - fileSystem, - hdfsEnvironment, - fileFormatDataSourceStats)) { - List activeFiles = entries.collect(toImmutableList()); - if (cachedTable == null) { - DeltaLakeDataFileCacheEntry cache = new DeltaLakeDataFileCacheEntry(tableSnapshot.getVersion(), activeFiles); - activeDataFileCache.put(tableLocation, cache); - } - else { - log.warn("Query run with outdated Transaction Log Snapshot, retrieved stale table entries for table: %s and query %s", - tableSnapshot.getTable(), session.getQueryId()); - } - return activeFiles; + DeltaLakeDataFileCacheEntry cachedTable = activeDataFileCache.get(tableLocation, () -> { + List activeFiles = loadActiveFiles(tableSnapshot, session, fileSystem); + return new DeltaLakeDataFileCacheEntry(tableSnapshot.getVersion(), activeFiles); + }); + if (cachedTable.getVersion() > tableSnapshot.getVersion()) { + log.warn("Query run with outdated Transaction Log Snapshot, retrieved stale table entries for table: %s and query %s", tableSnapshot.getTable(), session.getQueryId()); + return loadActiveFiles(tableSnapshot, session, fileSystem); } - } - else if (cachedTable.getVersion() < tableSnapshot.getVersion()) { - FileSystem fileSystem = getFileSystem(tableSnapshot, session); - try { + else if (cachedTable.getVersion() < tableSnapshot.getVersion()) { List newEntries = getJsonEntries( cachedTable.getVersion(), tableSnapshot.getVersion(), tableSnapshot, fileSystem); - DeltaLakeDataFileCacheEntry updatedCacheEntry = cachedTable.withUpdatesApplied(newEntries, tableSnapshot.getVersion()); - activeDataFileCache.put(tableLocation, updatedCacheEntry); + activeDataFileCache.asMap().replace(tableLocation, cachedTable, updatedCacheEntry); cachedTable = updatedCacheEntry; } - catch (IOException e) { - throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Failed accessing transaction log for table: " + tableSnapshot.getTable(), e); - } + return cachedTable.getActiveFiles(); + } + catch (IOException | ExecutionException | UncheckedExecutionException e) { + throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Failed accessing transaction log for table: " + tableSnapshot.getTable(), e); + } + } + + private List loadActiveFiles(TableSnapshot tableSnapshot, ConnectorSession session, FileSystem fileSystem) + { + try (Stream entries = getEntries( + tableSnapshot, + ImmutableSet.of(ADD), + this::activeAddEntries, + session, + fileSystem, + hdfsEnvironment, + fileFormatDataSourceStats)) { + List activeFiles = entries.collect(toImmutableList()); + return activeFiles; } - return cachedTable.getActiveFiles(); } public static List columnsWithStats(MetadataEntry metadataEntry, TypeManager typeManager)