Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.execution.DynamicFilterConfig;
import io.trino.execution.QueryStats;
import io.trino.operator.OperatorStats;
import io.trino.plugin.deltalake.util.DockerizedMinioDataLake;
import io.trino.spi.QueryId;
import io.trino.sql.planner.OptimizerConfig.JoinDistributionType;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.ResultWithQueryId;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.stream.Stream;

import static com.google.common.base.Verify.verify;
import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder;
import static io.airlift.testing.Assertions.assertGreaterThan;
import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING;
import static io.trino.plugin.deltalake.DeltaLakeDockerizedMinioDataLake.createDockerizedMinioDataLakeForDeltaLake;
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG;
import static io.trino.testing.DataProviders.toDataProvider;
import static io.trino.tpch.TpchTable.LINE_ITEM;
import static io.trino.tpch.TpchTable.ORDERS;
import static java.lang.String.format;

public class TestDeltaLakeDynamicFiltering
extends AbstractTestQueryFramework
{
private static final String BUCKET_NAME = "delta-lake-test-dynamic-filtering";

/**
 * Builds the query runner used by every test in this class.
 *
 * <p>Steps, in order: verify that {@link DynamicFilterConfig} enables dynamic filtering by default,
 * create a dockerized MinIO data lake for {@code BUCKET_NAME}, create an S3-backed Delta Lake query
 * runner on top of it, then copy the {@code lineitem} and {@code orders} resources into the bucket
 * and register each one as a table in the {@code default} schema of {@code DELTA_CATALOG}.
 *
 * @return the configured query runner
 * @throws Exception declared for failures raised while creating the data lake or the query runner
 */
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
// Fail fast: the tests below compare "default" behaviour against an explicitly disabled run.
verify(new DynamicFilterConfig().isEnableDynamicFiltering(), "this class assumes dynamic filtering is enabled by default");
// NOTE(review): closeAfterClass presumably registers the data lake for cleanup once the class finishes - confirm in AbstractTestQueryFramework.
DockerizedMinioDataLake dockerizedMinioDataLake = closeAfterClass(createDockerizedMinioDataLakeForDeltaLake(BUCKET_NAME));
QueryRunner queryRunner = DeltaLakeQueryRunner.createS3DeltaLakeQueryRunner(
DELTA_CATALOG,
"default",
// Slowing down the query ensures the dynamic filter has enough time to populate.
ImmutableMap.of("delta.max-splits-per-second", "3"),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We end up needing this and the large table on build side as we don't have a way to make delta lake split manager wait on DF (similar to dynamic-filtering.wait-timeout in hive and iceberg). It would be great to have that as it would simplify the testing and make it less prone to being flaky.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compared to Iceberg, DeltaLakeSplitSource produces the splits asynchronously. Does the dynamicFilteringWaitTimeoutMillis concept still apply in this case?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, HiveSplitSource produces splits asynchronously as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #11600 follow up issue

dockerizedMinioDataLake.getMinioAddress(),
dockerizedMinioDataLake.getTestingHadoop());

// For each TPCH table: copy its resources into the data lake under the table name,
// then register a table pointing at that location.
ImmutableList.of(LINE_ITEM, ORDERS).forEach(table -> {
String tableName = table.getTableName();
dockerizedMinioDataLake.copyResources("io/trino/plugin/deltalake/testing/resources/databricks/" + tableName, tableName);
// %3$s reuses the third format argument (tableName) as the last path segment of the S3 location.
// NOTE(review): the "dummy int" column looks like a placeholder, with the real schema presumably coming from the copied table data - confirm.
queryRunner.execute(format("CREATE TABLE %s.%s.%s (dummy int) WITH (location = 's3://%s/%3$s')",
DELTA_CATALOG,
"default",
tableName,
BUCKET_NAME));
});
return queryRunner;
}

/**
 * Supplies every {@link JoinDistributionType} constant as a separate test invocation.
 *
 * @return the data-provider rows built from the enum values by {@code toDataProvider()}
 */
@DataProvider
public Object[][] joinDistributionTypes()
{
    JoinDistributionType[] allTypes = JoinDistributionType.values();
    return Stream.of(allTypes).collect(toDataProvider());
}

/**
 * Runs the same lineitem/orders join twice, once with dynamic filtering enabled and once with it
 * disabled, and checks that both runs return the same rows while the run without dynamic
 * filtering reports strictly more splits and strictly more physical input positions.
 *
 * @param joinDistributionType join distribution type applied to both sessions
 */
@Test(timeOut = 60_000, dataProvider = "joinDistributionTypes")
public void testDynamicFiltering(JoinDistributionType joinDistributionType)
{
    String query = "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice > 59995 AND orders.totalprice < 60000";

    // Run with dynamic filtering first, then without, using otherwise identical sessions.
    Session dynamicFilteringOn = sessionWithDynamicFiltering(true, joinDistributionType);
    ResultWithQueryId<MaterializedResult> withFilter = getDistributedQueryRunner().executeWithQueryId(dynamicFilteringOn, query);

    Session dynamicFilteringOff = sessionWithDynamicFiltering(false, joinDistributionType);
    ResultWithQueryId<MaterializedResult> withoutFilter = getDistributedQueryRunner().executeWithQueryId(dynamicFilteringOff, query);

    // Dynamic filtering must not change the result set.
    assertEqualsIgnoreOrder(
            withFilter.getResult().getMaterializedRows(),
            withoutFilter.getResult().getMaterializedRows());

    QueryInputStats statsWithFilter = getQueryInputStats(withFilter.getQueryId());
    QueryInputStats statsWithoutFilter = getQueryInputStats(withoutFilter.getQueryId());

    // assertGreaterThan(actual, expected): the unfiltered run must do strictly more work.
    assertGreaterThan(statsWithoutFilter.numberOfSplits, statsWithFilter.numberOfSplits);
    assertGreaterThan(statsWithoutFilter.inputPositions, statsWithFilter.inputPositions);
}

/**
 * Creates a session based on {@code noJoinReordering(joinDistributionType)} with the
 * {@code ENABLE_DYNAMIC_FILTERING} system property set explicitly.
 *
 * @param enabled value written to the dynamic filtering system property
 * @param joinDistributionType join distribution type passed to {@code noJoinReordering}
 * @return the resulting session
 */
private Session sessionWithDynamicFiltering(boolean enabled, JoinDistributionType joinDistributionType)
{
    Session baseSession = noJoinReordering(joinDistributionType);
    String dynamicFilteringFlag = Boolean.toString(enabled);
    return Session.builder(baseSession)
            .setSystemProperty(ENABLE_DYNAMIC_FILTERING, dynamicFilteringFlag)
            .build();
}

/**
 * Collects input statistics for a finished query from the coordinator's query manager.
 *
 * <p>The split count is the sum of total drivers over all operator summaries whose operator type
 * is {@code ScanFilterAndProjectOperator}; the input positions are the query's physical input
 * positions.
 *
 * @param queryId id of the query whose statistics are read
 * @return the split count and input positions for that query
 */
private QueryInputStats getQueryInputStats(QueryId queryId)
{
    QueryStats queryStats = getDistributedQueryRunner()
            .getCoordinator()
            .getQueryManager()
            .getFullQueryInfo(queryId)
            .getQueryStats();

    // Only table-scan operators are counted; their driver total is used as the number of splits.
    long scanDrivers = 0;
    for (OperatorStats operator : queryStats.getOperatorSummaries()) {
        if (operator.getOperatorType().equals("ScanFilterAndProjectOperator")) {
            scanDrivers += operator.getTotalDrivers();
        }
    }

    long physicalInputPositions = queryStats.getPhysicalInputPositions();
    return new QueryInputStats(scanDrivers, physicalInputPositions);
}

/**
 * Immutable pair of input statistics for one query: the number of splits and the number of
 * input positions. Fields are read directly by the enclosing test class.
 */
private static class QueryInputStats
{
    /** Number of splits recorded for the query. */
    final long numberOfSplits;

    /** Number of input positions recorded for the query. */
    final long inputPositions;

    /**
     * @param splits value stored in {@link #numberOfSplits}
     * @param positions value stored in {@link #inputPositions}
     */
    QueryInputStats(long splits, long positions)
    {
        numberOfSplits = splits;
        inputPositions = positions;
    }
}
}