diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst index a3c3894f6f83..808839d8f3a6 100644 --- a/docs/src/main/sphinx/connector/delta-lake.rst +++ b/docs/src/main/sphinx/connector/delta-lake.rst @@ -108,6 +108,9 @@ values. Typical usage does not require you to configure them. - Name of the catalog to which ``SELECT`` queries are redirected when a Hive table is detected. - + * - ``delta.dynamic-filtering.wait-timeout`` + - Duration to wait for completion of dynamic filters during split generation + - * - ``delta.table-statistics-enabled`` - Enables :ref:`Table statistics ` for performance improvements. diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index af9419dd60c4..f51856f559dc 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -28,6 +28,7 @@ import static io.airlift.units.DataSize.Unit.MEGABYTE; import static java.util.concurrent.TimeUnit.DAYS; +import static java.util.concurrent.TimeUnit.SECONDS; public class DeltaLakeConfig { @@ -53,6 +54,7 @@ public class DeltaLakeConfig private boolean ignoreCheckpointWriteFailures; private Duration vacuumMinRetention = new Duration(7, DAYS); private Optional hiveCatalogName = Optional.empty(); + private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS); private boolean tableStatisticsEnabled = true; private boolean extendedStatisticsEnabled = true; private HiveCompressionCodec compressionCodec = HiveCompressionCodec.SNAPPY; @@ -256,6 +258,20 @@ public DeltaLakeConfig setCheckpointRowStatisticsWritingEnabled(boolean checkpoi return this; } + @NotNull + public Duration getDynamicFilteringWaitTimeout() + { + return dynamicFilteringWaitTimeout; + } + + @Config("delta.dynamic-filtering.wait-timeout") + @ConfigDescription("Duration to wait for completion of dynamic filters during split generation") + public DeltaLakeConfig setDynamicFilteringWaitTimeout(Duration dynamicFilteringWaitTimeout) + { + this.dynamicFilteringWaitTimeout = dynamicFilteringWaitTimeout; + return this; + } + public boolean isTableStatisticsEnabled() { return tableStatisticsEnabled; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java index 801c83f21d0d..700804d89c9b 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java @@ -52,6 +52,7 @@ public final class DeltaLakeSessionProperties // This property is not supported by Delta Lake and exists solely for technical reasons. @Deprecated private static final String TIMESTAMP_PRECISION = "timestamp_precision"; + private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout"; private static final String TABLE_STATISTICS_ENABLED = "statistics_enabled"; private static final String EXTENDED_STATISTICS_ENABLED = "extended_statistics_enabled"; @@ -118,6 +119,11 @@ public DeltaLakeSessionProperties( MILLISECONDS, value -> { throw new IllegalStateException("The property cannot be set"); }, true), + durationProperty( + DYNAMIC_FILTERING_WAIT_TIMEOUT, + "Duration to wait for completion of dynamic filters during split generation", + deltaLakeConfig.getDynamicFilteringWaitTimeout(), + false), booleanProperty( TABLE_STATISTICS_ENABLED, "Expose table statistics", @@ -187,6 +193,11 @@ public static DataSize getParquetWriterPageSize(ConnectorSession session) return session.getProperty(PARQUET_WRITER_PAGE_SIZE, DataSize.class); } + public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session) + { + return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class); + } + public static boolean isTableStatisticsEnabled(ConnectorSession session) { return session.getProperty(TABLE_STATISTICS_ENABLED, Boolean.class); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index aaecede3a89e..499336108a66 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -51,6 +51,7 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate; +import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxInitialSplitSize; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxSplitSize; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; @@ -110,6 +111,7 @@ public ConnectorSplitSource getSplits( maxSplitsPerSecond, maxOutstandingSplits, dynamicFilter, + getDynamicFilteringWaitTimeout(session), deltaLakeTableHandle.isRecordScannedFiles()); return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader()); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java index b573286edf92..ea9ab667a3c7 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java @@ -13,11 +13,13 @@ */ package io.trino.plugin.deltalake; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Futures; import io.airlift.concurrent.MoreFutures; import io.airlift.log.Logger; +import io.airlift.units.Duration; import io.trino.plugin.hive.util.AsyncQueue; import io.trino.plugin.hive.util.ThrottledAsyncQueue; import io.trino.spi.TrinoException; @@ -45,10 +47,12 @@ import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; public class DeltaLakeSplitSource implements ConnectorSplitSource { + private static final ConnectorSplitBatch EMPTY_BATCH = new ConnectorSplitBatch(ImmutableList.of(), false); private static final Logger LOG = Logger.get(DeltaLakeSplitSource.class); private final SchemaTableName tableName; @@ -56,6 +60,8 @@ public class DeltaLakeSplitSource private final boolean recordScannedFiles; private final ImmutableSet.Builder scannedFilePaths = ImmutableSet.builder(); private final DynamicFilter dynamicFilter; + private final long dynamicFilteringWaitTimeoutMillis; + private final Stopwatch dynamicFilterWaitStopwatch; private volatile TrinoException trinoException; public DeltaLakeSplitSource( @@ -65,12 +71,15 @@ public DeltaLakeSplitSource( int maxSplitsPerSecond, int maxOutstandingSplits, DynamicFilter dynamicFilter, + Duration dynamicFilteringWaitTimeout, boolean recordScannedFiles) { this.tableName = requireNonNull(tableName, "tableName is null"); this.queue = new ThrottledAsyncQueue<>(maxSplitsPerSecond, maxOutstandingSplits, executor); this.recordScannedFiles = recordScannedFiles; this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null"); + this.dynamicFilteringWaitTimeoutMillis = requireNonNull(dynamicFilteringWaitTimeout, "dynamicFilteringWaitTimeout is null").toMillis(); + this.dynamicFilterWaitStopwatch = Stopwatch.createStarted(); queueSplits(splits, queue, executor) .exceptionally(throwable -> { // set trinoException before finishing the queue to ensure failure is observed instead of successful completion @@ -91,6 +100,13 @@ public DeltaLakeSplitSource( @Override public CompletableFuture getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize) { + long timeLeft = dynamicFilteringWaitTimeoutMillis - dynamicFilterWaitStopwatch.elapsed(MILLISECONDS); + if (dynamicFilter.isAwaitable() && timeLeft > 0) { + return dynamicFilter.isBlocked() + .thenApply(ignored -> EMPTY_BATCH) + .completeOnTimeout(EMPTY_BATCH, timeLeft, MILLISECONDS); + } + boolean noMoreSplits = isFinished(); if (trinoException != null) { return toCompletableFuture(immediateFailedFuture(trinoException)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index db1454ec8f24..7680b4a3e903 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -27,6 +27,8 @@ import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.HOURS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; public class TestDeltaLakeConfig { @@ -49,6 +51,7 @@ public void testDefaults() .setCheckpointRowStatisticsWritingEnabled(true) .setVacuumMinRetention(new Duration(7, DAYS)) .setHiveCatalogName(null) + .setDynamicFilteringWaitTimeout(new Duration(0, SECONDS)) .setTableStatisticsEnabled(true) .setExtendedStatisticsEnabled(true) .setCompressionCodec(HiveCompressionCodec.SNAPPY)); @@ -73,6 +76,7 @@ public void testExplicitPropertyMappings() .put("delta.checkpoint-row-statistics-writing.enabled", "false") .put("delta.vacuum.min-retention", "13h") .put("delta.hive-catalog-name", "hive") + .put("delta.dynamic-filtering.wait-timeout", "30m") .put("delta.table-statistics-enabled", "false") .put("delta.extended-statistics.enabled", "false") .put("delta.compression-codec", "GZIP") @@ -94,6 +98,7 @@ public void testExplicitPropertyMappings() .setCheckpointRowStatisticsWritingEnabled(false) .setVacuumMinRetention(new Duration(13, HOURS)) .setHiveCatalogName("hive") + .setDynamicFilteringWaitTimeout(new Duration(30, MINUTES)) .setTableStatisticsEnabled(false) .setExtendedStatisticsEnabled(false) .setCompressionCodec(HiveCompressionCodec.GZIP); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering.java index 06593dade927..c6a4fa58d5cf 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering.java @@ -15,32 +15,56 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import io.trino.Session; import io.trino.execution.DynamicFilterConfig; +import io.trino.execution.Lifespan; import io.trino.execution.QueryStats; +import io.trino.metadata.QualifiedObjectName; +import io.trino.metadata.Split; +import io.trino.metadata.TableHandle; import io.trino.operator.OperatorStats; import io.trino.plugin.deltalake.util.DockerizedMinioDataLake; +import io.trino.security.AllowAllAccessControl; import io.trino.spi.QueryId; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.DynamicFilter; +import io.trino.spi.predicate.TupleDomain; +import io.trino.split.SplitSource; import io.trino.sql.planner.OptimizerConfig.JoinDistributionType; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; import io.trino.testing.ResultWithQueryId; +import io.trino.transaction.TransactionId; +import io.trino.transaction.TransactionManager; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static com.google.common.base.Verify.verify; +import static io.airlift.concurrent.MoreFutures.unmodifiableFuture; import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder; import static io.airlift.testing.Assertions.assertGreaterThan; import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING; import static io.trino.plugin.deltalake.DeltaLakeDockerizedMinioDataLake.createDockerizedMinioDataLakeForDeltaLake; import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; +import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING; +import static io.trino.spi.connector.Constraint.alwaysTrue; +import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED; import static io.trino.testing.DataProviders.toDataProvider; import static io.trino.tpch.TpchTable.LINE_ITEM; import static io.trino.tpch.TpchTable.ORDERS; import static java.lang.String.format; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; public class TestDeltaLakeDynamicFiltering extends AbstractTestQueryFramework @@ -56,8 +80,7 @@ protected QueryRunner createQueryRunner() QueryRunner queryRunner = DeltaLakeQueryRunner.createS3DeltaLakeQueryRunner( DELTA_CATALOG, "default", - // Slowing down the query ensures the dynamic filter has enough time to populate. - ImmutableMap.of("delta.max-splits-per-second", "3"), + ImmutableMap.of(), dockerizedMinioDataLake.getMinioAddress(), dockerizedMinioDataLake.getTestingHadoop()); @@ -94,10 +117,35 @@ public void testDynamicFiltering(JoinDistributionType joinDistributionType) assertGreaterThan(unfilteredStats.inputPositions, filteredStats.inputPositions); } + @Test(timeOut = 30_000) + public void testIncompleteDynamicFilterTimeout() + throws Exception + { + QueryRunner runner = getQueryRunner(); + TransactionManager transactionManager = runner.getTransactionManager(); + TransactionId transactionId = transactionManager.beginTransaction(true); + Session session = Session.builder(getSession()) + .setCatalogSessionProperty(DELTA_CATALOG, "dynamic_filtering_wait_timeout", "1s") + .build() + .beginTransactionId(transactionId, transactionManager, new AllowAllAccessControl()); + QualifiedObjectName tableName = new QualifiedObjectName(DELTA_CATALOG, "default", "orders"); + Optional tableHandle = runner.getMetadata().getTableHandle(session, tableName); + assertTrue(tableHandle.isPresent()); + SplitSource splitSource = runner.getSplitManager() + .getSplits(session, tableHandle.get(), UNGROUPED_SCHEDULING, new IncompleteDynamicFilter(), alwaysTrue()); + List splits = new ArrayList<>(); + while (!splitSource.isFinished()) { + splits.addAll(splitSource.getNextBatch(NOT_PARTITIONED, Lifespan.taskWide(), 1000).get().getSplits()); + } + splitSource.close(); + assertFalse(splits.isEmpty()); + } + private Session sessionWithDynamicFiltering(boolean enabled, JoinDistributionType joinDistributionType) { return Session.builder(noJoinReordering(joinDistributionType)) .setSystemProperty(ENABLE_DYNAMIC_FILTERING, String.valueOf(enabled)) + .setCatalogSessionProperty(DELTA_CATALOG, "dynamic_filtering_wait_timeout", "1h") .build(); } @@ -113,6 +161,47 @@ private QueryInputStats getQueryInputStats(QueryId queryId) return new QueryInputStats(numberOfSplits, inputPositions); } + private static class IncompleteDynamicFilter + implements DynamicFilter + { + @Override + public Set getColumnsCovered() + { + return ImmutableSet.of(); + } + + @Override + public CompletableFuture isBlocked() + { + return unmodifiableFuture(CompletableFuture.runAsync(() -> { + try { + TimeUnit.HOURS.sleep(1); + } + catch (InterruptedException e) { + throw new IllegalStateException(e); + } + })); + } + + @Override + public boolean isComplete() + { + return false; + } + + @Override + public boolean isAwaitable() + { + return true; + } + + @Override + public TupleDomain getCurrentPredicate() + { + return TupleDomain.all(); + } + } + private static class QueryInputStats { final long numberOfSplits; diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicPartitionPruningTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicPartitionPruningTest.java new file mode 100644 index 000000000000..b80aa13462b9 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicPartitionPruningTest.java @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake; + +import com.google.common.collect.ImmutableMap; +import io.trino.sql.planner.OptimizerConfig.JoinDistributionType; +import io.trino.testing.BaseDynamicPartitionPruningTest; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import io.trino.tpch.TpchTable; +import org.testng.SkipException; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.file.Files; +import java.util.List; + +import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; +import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.createDeltaLakeQueryRunner; +import static java.lang.String.format; +import static java.util.stream.Collectors.joining; + +public class TestDeltaLakeDynamicPartitionPruningTest + extends BaseDynamicPartitionPruningTest +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + DistributedQueryRunner queryRunner = createDeltaLakeQueryRunner(EXTRA_PROPERTIES, ImmutableMap.of( + "delta.dynamic-filtering.wait-timeout", "1h", + "delta.enable-non-concurrent-writes", "true")); + for (TpchTable table : REQUIRED_TABLES) { + queryRunner.execute(format("CREATE TABLE %1$s.tpch.%2$s AS SELECT * FROM tpch.tiny.%2$s", DELTA_CATALOG, table.getTableName())); + } + return queryRunner; + } + + @Override + public void testJoinDynamicFilteringMultiJoinOnBucketedTables(JoinDistributionType joinDistributionType) + { + throw new SkipException("Delta Lake does not support bucketing"); + } + + @Override + protected void createLineitemTable(String tableName, List columns, List partitionColumns) + { + String sql = format( + "CREATE TABLE %s WITH (partitioned_by=ARRAY[%s]) AS SELECT %s FROM tpch.tiny.lineitem", + tableName, + partitionColumns.stream().map(column -> "'" + column + "'").collect(joining(",")), + String.join(",", columns)); + getQueryRunner().execute(sql); + } + + @Override + protected void createPartitionedTable(String tableName, List columns, List partitionColumns) + { + String sql = format( + "CREATE TABLE %s (%s) WITH (location='%s', partitioned_by=ARRAY[%s])", + tableName, + String.join(",", columns), + createTableLocation(tableName), + partitionColumns.stream().map(column -> "'" + column + "'").collect(joining(","))); + getQueryRunner().execute(sql); + } + + @Override + protected void createPartitionedAndBucketedTable(String tableName, List columns, List partitionColumns, List bucketColumns) + { + throw new UnsupportedOperationException(); + } + + private static URI createTableLocation(String tableName) + { + try { + return Files.createTempDirectory(tableName).toFile().toURI(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } +}