diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java index 68ac1b7d5be6..5087770756c5 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java @@ -113,6 +113,7 @@ public TransactionLogAccess( tableSnapshots = EvictableCacheBuilder.newBuilder() .expireAfterWrite(deltaLakeConfig.getMetadataCacheTtl().toMillis(), TimeUnit.MILLISECONDS) + .shareNothingWhenDisabled() .recordStats() .build(); activeDataFileCache = EvictableCacheBuilder.newBuilder() diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index ecb3fc0b915d..4d44dda6f151 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.io.Files; +import io.airlift.units.Duration; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; @@ -54,6 +55,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -110,6 +112,12 @@ private void setupTransactionLogAccess(String tableName) private void setupTransactionLogAccess(String tableName, Path tableLocation) throws IOException + { + setupTransactionLogAccess(tableName, tableLocation, new DeltaLakeConfig()); + } + + private void setupTransactionLogAccess(String tableName, Path tableLocation, DeltaLakeConfig deltaLakeConfig) + throws IOException { TestingConnectorContext context = new TestingConnectorContext(); TypeManager typeManager = context.getTypeManager(); @@ -125,7 +133,7 @@ private void setupTransactionLogAccess(String tableName, Path tableLocation) SESSION, typeManager, new CheckpointSchemaManager(typeManager), - new DeltaLakeConfig(), + deltaLakeConfig, fileFormatDataSourceStats, hdfsEnvironment, new ParquetReaderConfig()); @@ -697,6 +705,37 @@ public void testParquetStructStatistics() } } + @Test + public void testTableSnapshotsCacheDisabled() + throws Exception + { + String tableName = "person"; + Path tableDir = new Path(getClass().getClassLoader().getResource("databricks/" + tableName).toURI()); + DeltaLakeConfig cacheDisabledConfig = new DeltaLakeConfig(); + cacheDisabledConfig.setMetadataCacheTtl(new Duration(0, TimeUnit.SECONDS)); + setupTransactionLogAccess(tableName, tableDir, cacheDisabledConfig); + + assertEquals( + transactionLogAccess.getAccessTrackingFileSystem().getOpenCount(), + ImmutableMap.of( + "_last_checkpoint", 1, + "00000000000000000011.json", 1, + "00000000000000000012.json", 1, + "00000000000000000013.json", 1, + "00000000000000000014.json", 1)); + + // With the transaction log cache disabled, when loading the snapshot again, all the needed files will be opened again + transactionLogAccess.loadSnapshot(new SchemaTableName("schema", tableName), tableDir, SESSION); + assertEquals( + transactionLogAccess.getAccessTrackingFileSystem().getOpenCount(), + ImmutableMap.of( + "_last_checkpoint", 2, + "00000000000000000011.json", 2, + "00000000000000000012.json", 2, + "00000000000000000013.json", 2, + "00000000000000000014.json", 2)); + } + private void copyTransactionLogEntry(int startVersion, int endVersion, File sourceDir, File targetDir) throws IOException {