Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ values. Typical usage does not require you to configure them.
- Name of the catalog to which ``SELECT`` queries are redirected when a
Hive table is detected.
-
* - ``delta.dynamic-filtering.wait-timeout``
- Duration to wait for completion of dynamic filters during split generation.
-
* - ``delta.table-statistics-enabled``
- Enables :ref:`Table statistics <delta-lake-table-statistics>` for
performance improvements.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.SECONDS;

public class DeltaLakeConfig
{
Expand All @@ -53,6 +54,7 @@ public class DeltaLakeConfig
private boolean ignoreCheckpointWriteFailures;
private Duration vacuumMinRetention = new Duration(7, DAYS);
private Optional<String> hiveCatalogName = Optional.empty();
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
private boolean tableStatisticsEnabled = true;
private boolean extendedStatisticsEnabled = true;
private HiveCompressionCodec compressionCodec = HiveCompressionCodec.SNAPPY;
Expand Down Expand Up @@ -256,6 +258,20 @@ public DeltaLakeConfig setCheckpointRowStatisticsWritingEnabled(boolean checkpoi
return this;
}

// Duration to wait for dynamic filters to complete during split generation.
// Defaults to 0s, i.e. split generation does not wait for dynamic filters.
@NotNull
public Duration getDynamicFilteringWaitTimeout()
{
    return dynamicFilteringWaitTimeout;
}

// Bound to the "delta.dynamic-filtering.wait-timeout" catalog configuration
// property; invoked by the configuration framework during catalog setup.
@Config("delta.dynamic-filtering.wait-timeout")
@ConfigDescription("Duration to wait for completion of dynamic filters during split generation")
public DeltaLakeConfig setDynamicFilteringWaitTimeout(Duration dynamicFilteringWaitTimeout)
{
    // Returns this so config setters can be chained (see TestDeltaLakeConfig)
    this.dynamicFilteringWaitTimeout = dynamicFilteringWaitTimeout;
    return this;
}

public boolean isTableStatisticsEnabled()
{
return tableStatisticsEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public final class DeltaLakeSessionProperties
// This property is not supported by Delta Lake and exists solely for technical reasons.
@Deprecated
private static final String TIMESTAMP_PRECISION = "timestamp_precision";
private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout";
private static final String TABLE_STATISTICS_ENABLED = "statistics_enabled";
private static final String EXTENDED_STATISTICS_ENABLED = "extended_statistics_enabled";

Expand Down Expand Up @@ -118,6 +119,11 @@ public DeltaLakeSessionProperties(
MILLISECONDS,
value -> { throw new IllegalStateException("The property cannot be set"); },
true),
durationProperty(
DYNAMIC_FILTERING_WAIT_TIMEOUT,
"Duration to wait for completion of dynamic filters during split generation",
deltaLakeConfig.getDynamicFilteringWaitTimeout(),
false),
booleanProperty(
TABLE_STATISTICS_ENABLED,
"Expose table statistics",
Expand Down Expand Up @@ -187,6 +193,11 @@ public static DataSize getParquetWriterPageSize(ConnectorSession session)
return session.getProperty(PARQUET_WRITER_PAGE_SIZE, DataSize.class);
}

/**
 * Returns how long split generation should wait for dynamic filters to
 * complete, as configured for the given session via the
 * {@code dynamic_filtering_wait_timeout} session property.
 */
public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session)
{
    Duration waitTimeout = session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class);
    return waitTimeout;
}

public static boolean isTableStatisticsEnabled(ConnectorSession session)
{
return session.getProperty(TABLE_STATISTICS_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxInitialSplitSize;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxSplitSize;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
Expand Down Expand Up @@ -110,6 +111,7 @@ public ConnectorSplitSource getSplits(
maxSplitsPerSecond,
maxOutstandingSplits,
dynamicFilter,
getDynamicFilteringWaitTimeout(session),
deltaLakeTableHandle.isRecordScannedFiles());

return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
*/
package io.trino.plugin.deltalake;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.plugin.hive.util.AsyncQueue;
import io.trino.plugin.hive.util.ThrottledAsyncQueue;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -45,17 +47,21 @@
import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class DeltaLakeSplitSource
implements ConnectorSplitSource
{
private static final ConnectorSplitBatch EMPTY_BATCH = new ConnectorSplitBatch(ImmutableList.of(), false);
private static final Logger LOG = Logger.get(DeltaLakeSplitSource.class);

private final SchemaTableName tableName;
private final AsyncQueue<ConnectorSplit> queue;
private final boolean recordScannedFiles;
private final ImmutableSet.Builder<String> scannedFilePaths = ImmutableSet.builder();
private final DynamicFilter dynamicFilter;
private final long dynamicFilteringWaitTimeoutMillis;
private final Stopwatch dynamicFilterWaitStopwatch;
private volatile TrinoException trinoException;

public DeltaLakeSplitSource(
Expand All @@ -65,12 +71,15 @@ public DeltaLakeSplitSource(
int maxSplitsPerSecond,
int maxOutstandingSplits,
DynamicFilter dynamicFilter,
Duration dynamicFilteringWaitTimeout,
boolean recordScannedFiles)
{
this.tableName = requireNonNull(tableName, "tableName is null");
this.queue = new ThrottledAsyncQueue<>(maxSplitsPerSecond, maxOutstandingSplits, executor);
this.recordScannedFiles = recordScannedFiles;
this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
this.dynamicFilteringWaitTimeoutMillis = requireNonNull(dynamicFilteringWaitTimeout, "dynamicFilteringWaitTimeout is null").toMillis();
this.dynamicFilterWaitStopwatch = Stopwatch.createStarted();
queueSplits(splits, queue, executor)
.exceptionally(throwable -> {
// set trinoException before finishing the queue to ensure failure is observed instead of successful completion
Expand All @@ -91,6 +100,13 @@ public DeltaLakeSplitSource(
@Override
public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
{
long timeLeft = dynamicFilteringWaitTimeoutMillis - dynamicFilterWaitStopwatch.elapsed(MILLISECONDS);
if (dynamicFilter.isAwaitable() && timeLeft > 0) {
return dynamicFilter.isBlocked()
.thenApply(ignored -> EMPTY_BATCH)
.completeOnTimeout(EMPTY_BATCH, timeLeft, MILLISECONDS);
}

boolean noMoreSplits = isFinished();
if (trinoException != null) {
return toCompletableFuture(immediateFailedFuture(trinoException));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

public class TestDeltaLakeConfig
{
Expand All @@ -49,6 +51,7 @@ public void testDefaults()
.setCheckpointRowStatisticsWritingEnabled(true)
.setVacuumMinRetention(new Duration(7, DAYS))
.setHiveCatalogName(null)
.setDynamicFilteringWaitTimeout(new Duration(0, SECONDS))
.setTableStatisticsEnabled(true)
.setExtendedStatisticsEnabled(true)
.setCompressionCodec(HiveCompressionCodec.SNAPPY));
Expand All @@ -73,6 +76,7 @@ public void testExplicitPropertyMappings()
.put("delta.checkpoint-row-statistics-writing.enabled", "false")
.put("delta.vacuum.min-retention", "13h")
.put("delta.hive-catalog-name", "hive")
.put("delta.dynamic-filtering.wait-timeout", "30m")
.put("delta.table-statistics-enabled", "false")
.put("delta.extended-statistics.enabled", "false")
.put("delta.compression-codec", "GZIP")
Expand All @@ -94,6 +98,7 @@ public void testExplicitPropertyMappings()
.setCheckpointRowStatisticsWritingEnabled(false)
.setVacuumMinRetention(new Duration(13, HOURS))
.setHiveCatalogName("hive")
.setDynamicFilteringWaitTimeout(new Duration(30, MINUTES))
.setTableStatisticsEnabled(false)
.setExtendedStatisticsEnabled(false)
.setCompressionCodec(HiveCompressionCodec.GZIP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,56 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.Session;
import io.trino.execution.DynamicFilterConfig;
import io.trino.execution.Lifespan;
import io.trino.execution.QueryStats;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.Split;
import io.trino.metadata.TableHandle;
import io.trino.operator.OperatorStats;
import io.trino.plugin.deltalake.util.DockerizedMinioDataLake;
import io.trino.security.AllowAllAccessControl;
import io.trino.spi.QueryId;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.predicate.TupleDomain;
import io.trino.split.SplitSource;
import io.trino.sql.planner.OptimizerConfig.JoinDistributionType;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.ResultWithQueryId;
import io.trino.transaction.TransactionId;
import io.trino.transaction.TransactionManager;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static com.google.common.base.Verify.verify;
import static io.airlift.concurrent.MoreFutures.unmodifiableFuture;
import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder;
import static io.airlift.testing.Assertions.assertGreaterThan;
import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING;
import static io.trino.plugin.deltalake.DeltaLakeDockerizedMinioDataLake.createDockerizedMinioDataLakeForDeltaLake;
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG;
import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
import static io.trino.spi.connector.Constraint.alwaysTrue;
import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static io.trino.testing.DataProviders.toDataProvider;
import static io.trino.tpch.TpchTable.LINE_ITEM;
import static io.trino.tpch.TpchTable.ORDERS;
import static java.lang.String.format;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class TestDeltaLakeDynamicFiltering
extends AbstractTestQueryFramework
Expand All @@ -56,8 +80,7 @@ protected QueryRunner createQueryRunner()
QueryRunner queryRunner = DeltaLakeQueryRunner.createS3DeltaLakeQueryRunner(
DELTA_CATALOG,
"default",
// Slowing down the query ensures the dynamic filter has enough time to populate.
ImmutableMap.of("delta.max-splits-per-second", "3"),
ImmutableMap.of(),
dockerizedMinioDataLake.getMinioAddress(),
dockerizedMinioDataLake.getTestingHadoop());

Expand Down Expand Up @@ -94,10 +117,35 @@ public void testDynamicFiltering(JoinDistributionType joinDistributionType)
assertGreaterThan(unfilteredStats.inputPositions, filteredStats.inputPositions);
}

// Verifies that split generation completes and produces splits once the
// dynamic-filtering wait timeout elapses, even though the supplied dynamic
// filter never becomes complete.
@Test(timeOut = 30_000)
public void testIncompleteDynamicFilterTimeout()
throws Exception
{
QueryRunner runner = getQueryRunner();
TransactionManager transactionManager = runner.getTransactionManager();
// Metadata and split-manager access below require an active transaction
TransactionId transactionId = transactionManager.beginTransaction(true);
// Short (1s) wait timeout so the test finishes well within the 30s limit
Session session = Session.builder(getSession())
.setCatalogSessionProperty(DELTA_CATALOG, "dynamic_filtering_wait_timeout", "1s")
.build()
.beginTransactionId(transactionId, transactionManager, new AllowAllAccessControl());
QualifiedObjectName tableName = new QualifiedObjectName(DELTA_CATALOG, "default", "orders");
Optional<TableHandle> tableHandle = runner.getMetadata().getTableHandle(session, tableName);
assertTrue(tableHandle.isPresent());
// IncompleteDynamicFilter blocks indefinitely; the split source must stop
// waiting for it after the configured 1s timeout
SplitSource splitSource = runner.getSplitManager()
.getSplits(session, tableHandle.get(), UNGROUPED_SCHEDULING, new IncompleteDynamicFilter(), alwaysTrue());
List<Split> splits = new ArrayList<>();
while (!splitSource.isFinished()) {
splits.addAll(splitSource.getNextBatch(NOT_PARTITIONED, Lifespan.taskWide(), 1000).get().getSplits());
}
splitSource.close();
// Splits were produced despite the dynamic filter never completing
assertFalse(splits.isEmpty());
}

/**
 * Builds a session for the dynamic-filtering test queries: join reordering is
 * disabled for the given distribution type, dynamic filtering is toggled as
 * requested, and a generous one-hour wait timeout ensures collected filters
 * are applied during split generation.
 */
private Session sessionWithDynamicFiltering(boolean enabled, JoinDistributionType joinDistributionType)
{
    Session baseSession = noJoinReordering(joinDistributionType);
    return Session.builder(baseSession)
            .setCatalogSessionProperty(DELTA_CATALOG, "dynamic_filtering_wait_timeout", "1h")
            .setSystemProperty(ENABLE_DYNAMIC_FILTERING, Boolean.toString(enabled))
            .build();
}

Expand All @@ -113,6 +161,47 @@ private QueryInputStats getQueryInputStats(QueryId queryId)
return new QueryInputStats(numberOfSplits, inputPositions);
}

/**
 * A {@link DynamicFilter} stub that never completes: {@link #isBlocked()}
 * stays pending forever and {@link #isComplete()} is always {@code false}.
 * Used to verify that split generation gives up waiting once the configured
 * {@code dynamic_filtering_wait_timeout} expires.
 */
private static class IncompleteDynamicFilter
        implements DynamicFilter
{
    @Override
    public Set<ColumnHandle> getColumnsCovered()
    {
        return ImmutableSet.of();
    }

    @Override
    public CompletableFuture<?> isBlocked()
    {
        // A future that is simply never completed, wrapped so callers cannot
        // complete it themselves. This avoids the alternative of sleeping for
        // an hour inside CompletableFuture.runAsync, which would pin a
        // ForkJoinPool.commonPool() worker thread for the rest of the test JVM.
        return unmodifiableFuture(new CompletableFuture<>());
    }

    @Override
    public boolean isComplete()
    {
        return false;
    }

    @Override
    public boolean isAwaitable()
    {
        return true;
    }

    @Override
    public TupleDomain<ColumnHandle> getCurrentPredicate()
    {
        return TupleDomain.all();
    }
}

private static class QueryInputStats
{
final long numberOfSplits;
Expand Down
Loading