diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst index a6c2267f7610..764556211963 100644 --- a/docs/src/main/sphinx/connector/delta-lake.rst +++ b/docs/src/main/sphinx/connector/delta-lake.rst @@ -95,6 +95,10 @@ values. Typical usage does not require you to configure them. to be specified in :ref:`prop-type-data-size` values such as ``64MB``. Default is calculated to 10% of the maximum memory allocated to the JVM. - + * - ``delta.metadata.live-files.cache-ttl`` + - Caching duration for active files which correspond to the Delta Lake + tables. + - ``30m`` * - ``delta.compression-codec`` - The compression codec to be used when writing new data files. Possible values are diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index da61588f7b09..2ff4b3dbc7e2 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -47,6 +47,7 @@ public class DeltaLakeConfig private Duration metadataCacheTtl = new Duration(5, TimeUnit.MINUTES); private DataSize dataFileCacheSize = DEFAULT_DATA_FILE_CACHE_SIZE; + private Duration dataFileCacheTtl = new Duration(30, TimeUnit.MINUTES); private int domainCompactionThreshold = 100; private int maxOutstandingSplits = 1_000; private int maxSplitsPerSecond = Integer.MAX_VALUE; @@ -96,6 +97,20 @@ public DeltaLakeConfig setDataFileCacheSize(DataSize dataFileCacheSize) return this; } + @NotNull + public Duration getDataFileCacheTtl() + { + return dataFileCacheTtl; + } + + @Config("delta.metadata.live-files.cache-ttl") + @ConfigDescription("Caching duration for Delta data file metadata (e.g. 
file path, statistics, partition info)") + public DeltaLakeConfig setDataFileCacheTtl(Duration dataFileCacheTtl) + { + this.dataFileCacheTtl = dataFileCacheTtl; + return this; + } + @Min(1) public int getDomainCompactionThreshold() { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java index be2a668a467e..9710d5bafaac 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java @@ -119,6 +119,8 @@ public TransactionLogAccess( activeDataFileCache = EvictableCacheBuilder.newBuilder() .weigher((Weigher<String, DeltaLakeDataFileCacheEntry>) (key, value) -> Ints.saturatedCast(estimatedSizeOf(key) + value.getRetainedSizeInBytes())) .maximumWeight(deltaLakeConfig.getDataFileCacheSize().toBytes()) + .expireAfterWrite(deltaLakeConfig.getDataFileCacheTtl().toMillis(), TimeUnit.MILLISECONDS) + .shareNothingWhenDisabled() .recordStats() .build(); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingHdfsEnvironment.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingHdfsEnvironment.java new file mode 100644 index 000000000000..e0fbc7b0e7e7 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingHdfsEnvironment.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake; + +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.HdfsConfig; +import io.trino.plugin.hive.HdfsConfiguration; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.authentication.HdfsAuthentication; +import io.trino.spi.security.ConnectorIdentity; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class AccessTrackingHdfsEnvironment + extends HdfsEnvironment +{ + private Map<String, Integer> fileSystemPathNames = new HashMap<>(); + + public AccessTrackingHdfsEnvironment(HdfsConfiguration hdfsConfiguration, HdfsConfig config, HdfsAuthentication hdfsAuthentication) + { + super(hdfsConfiguration, config, hdfsAuthentication); + } + + @Override + public FileSystem getFileSystem(ConnectorIdentity identity, Path path, Configuration configuration) + throws IOException + { + incrementAccessedPathNamesCount(path.getName()); + return super.getFileSystem(identity, path, configuration); + } + + public Map<String, Integer> getAccessedPathNames() + { + return ImmutableMap.copyOf(fileSystemPathNames); + } + + private void incrementAccessedPathNamesCount(String fileName) + { + fileSystemPathNames.put(fileName, fileSystemPathNames.getOrDefault(fileName, 0) + 1); + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java 
b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index f98b218ba982..a51a4efea350 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -126,6 +126,7 @@ protected QueryRunner createQueryRunner() QueryRunner queryRunner = createDeltaLakeQueryRunner( ImmutableMap.<String, String>builder() .put("delta.metadata.cache-ttl", TEST_METADATA_CACHE_TTL_SECONDS + "s") + .put("delta.metadata.live-files.cache-ttl", TEST_METADATA_CACHE_TTL_SECONDS + "s") .put("hive.metastore-cache-ttl", TEST_METADATA_CACHE_TTL_SECONDS + "s") .buildOrThrow()); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index 76caf7dc0a03..257adbcf3874 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -40,6 +40,7 @@ public void testDefaults() { assertRecordedDefaults(recordDefaults(DeltaLakeConfig.class) .setDataFileCacheSize(DeltaLakeConfig.DEFAULT_DATA_FILE_CACHE_SIZE) + .setDataFileCacheTtl(new Duration(30, MINUTES)) .setMetadataCacheTtl(new Duration(5, TimeUnit.MINUTES)) .setDomainCompactionThreshold(100) .setMaxSplitsPerSecond(Integer.MAX_VALUE) @@ -71,6 +72,7 @@ public void testExplicitPropertyMappings() Map<String, String> properties = ImmutableMap.<String, String>builder() .put("delta.metadata.cache-ttl", "10m") .put("delta.metadata.live-files.cache-size", "0 MB") + .put("delta.metadata.live-files.cache-ttl", "60m") .put("delta.domain-compaction-threshold", "500") .put("delta.max-outstanding-splits", "200") .put("delta.max-splits-per-second", "10") @@ -97,6 +99,7 @@ public void testExplicitPropertyMappings() DeltaLakeConfig expected = new 
DeltaLakeConfig() .setDataFileCacheSize(DataSize.succinctBytes(0)) + .setDataFileCacheTtl(new Duration(60, MINUTES)) .setMetadataCacheTtl(new Duration(10, TimeUnit.MINUTES)) .setDomainCompactionThreshold(500) .setMaxOutstandingSplits(200) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePlugin.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePlugin.java index 873736bbac9a..7c3157ada182 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePlugin.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePlugin.java @@ -117,4 +117,16 @@ public void testNoCaching() "delta.metadata.cache-ttl", "0s"), new TestingConnectorContext()); } + + @Test + public void testNoActiveDataFilesCaching() + { + Plugin plugin = new DeltaLakePlugin(); + ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories()); + factory.create("test", + ImmutableMap.of( + "hive.metastore.uri", "thrift://foo:1234", + "delta.metadata.live-files.cache-ttl", "0s"), + new TestingConnectorContext()); + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index 4d44dda6f151..40bb4fb92677 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -30,7 +30,6 @@ import io.trino.plugin.hive.HdfsConfig; import io.trino.plugin.hive.HdfsConfiguration; import io.trino.plugin.hive.HdfsConfigurationInitializer; -import io.trino.plugin.hive.HdfsEnvironment; import io.trino.plugin.hive.HiveHdfsConfiguration; import io.trino.plugin.hive.authentication.NoHdfsAuthentication; import io.trino.plugin.hive.parquet.ParquetReaderConfig; @@ -104,6 +103,8 @@ public class 
TestTransactionLogAccess private TrackingTransactionLogAccess transactionLogAccess; private TableSnapshot tableSnapshot; + private AccessTrackingHdfsEnvironment hdfsEnvironment; + private void setupTransactionLogAccess(String tableName) throws Exception { @@ -124,7 +125,7 @@ private void setupTransactionLogAccess(String tableName, Path tableLocation, Del HdfsConfig hdfsConfig = new HdfsConfig(); HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of()); - HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication()); + hdfsEnvironment = new AccessTrackingHdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication()); FileFormatDataSourceStats fileFormatDataSourceStats = new FileFormatDataSourceStats(); transactionLogAccess = new TrackingTransactionLogAccess( @@ -736,6 +737,63 @@ public void testTableSnapshotsCacheDisabled() "00000000000000000014.json", 2)); } + @Test + public void testTableSnapshotsActiveDataFilesCache() + throws Exception + { + String tableName = "person"; + Path tableDir = new Path(getClass().getClassLoader().getResource("databricks/" + tableName).toURI()); + DeltaLakeConfig shortLivedActiveDataFilesCacheConfig = new DeltaLakeConfig(); + shortLivedActiveDataFilesCacheConfig.setDataFileCacheTtl(new Duration(10, TimeUnit.MINUTES)); + setupTransactionLogAccess(tableName, tableDir, shortLivedActiveDataFilesCacheConfig); + + List<AddFileEntry> addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + assertEquals(addFileEntries.size(), 12); + assertEquals( + hdfsEnvironment.getAccessedPathNames(), + ImmutableMap.of( + "person", 1, + "00000000000000000010.checkpoint.parquet", 2)); + + addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + assertEquals(addFileEntries.size(), 12); + // The internal data cache should still contain the data files for the table + assertEquals( + 
hdfsEnvironment.getAccessedPathNames(), + ImmutableMap.of( + "person", 1, + "00000000000000000010.checkpoint.parquet", 2)); + } + + @Test + public void testTableSnapshotsActiveDataFilesCacheDisabled() + throws Exception + { + String tableName = "person"; + Path tableDir = new Path(getClass().getClassLoader().getResource("databricks/" + tableName).toURI()); + DeltaLakeConfig shortLivedActiveDataFilesCacheConfig = new DeltaLakeConfig(); + shortLivedActiveDataFilesCacheConfig.setDataFileCacheTtl(new Duration(0, TimeUnit.SECONDS)); + setupTransactionLogAccess(tableName, tableDir, shortLivedActiveDataFilesCacheConfig); + + List<AddFileEntry> addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + assertEquals(addFileEntries.size(), 12); + assertEquals( + hdfsEnvironment.getAccessedPathNames(), + ImmutableMap.of( + "person", 1, + "00000000000000000010.checkpoint.parquet", 2)); + + // With no caching for the transaction log entries, when loading the snapshot again, + // the checkpoint file will be read again + addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + assertEquals(addFileEntries.size(), 12); + assertEquals( + hdfsEnvironment.getAccessedPathNames(), + ImmutableMap.of( + "person", 1, + "00000000000000000010.checkpoint.parquet", 4)); + } + private void copyTransactionLogEntry(int startVersion, int endVersion, File sourceDir, File targetDir) throws IOException {