Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
13 changes: 13 additions & 0 deletions testing/trino-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,19 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parser</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parser</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-plugin-toolkit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import io.trino.metadata.TableHandle;
import io.trino.metadata.TableMetadata;
import io.trino.spi.connector.ConnectorFactory;
import io.trino.sql.DynamicFilters;
import io.trino.sql.planner.OptimizerConfig.JoinDistributionType;
import io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy;
import io.trino.sql.planner.assertions.BasePlanTest;
import io.trino.sql.planner.plan.AggregationNode;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.FilterNode;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.SemiJoinNode;
import io.trino.sql.planner.plan.TableScanNode;
Expand Down Expand Up @@ -56,6 +58,7 @@
import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static io.trino.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
import static io.trino.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector;
import static io.trino.sql.DynamicFilters.extractDynamicFilters;
import static io.trino.sql.planner.LogicalPlanner.Stage.OPTIMIZED_AND_VALIDATED;
import static io.trino.sql.planner.plan.JoinNode.DistributionType.REPLICATED;
import static io.trino.sql.planner.plan.JoinNode.Type.INNER;
Expand Down Expand Up @@ -314,6 +317,22 @@ public Void visitAggregation(AggregationNode node, Integer indent)
return visitPlan(node, indent + 1);
}

@Override
public Void visitFilter(FilterNode node, Integer indent)
{
DynamicFilters.ExtractResult filters = extractDynamicFilters(node.getPredicate());
String inputs = filters.getDynamicConjuncts().stream()
.map(descriptor -> descriptor.getInput().toString())
.sorted()
.collect(joining(", "));

if (!inputs.isEmpty()) {
output(indent, "dynamic filter ([%s])", inputs);
indent = indent + 1;
}
return visitPlan(node, indent);
}

@Override
public Void visitTableScan(TableScanNode node, Integer indent)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ local exchange (GATHER, SINGLE, [])
cross join:
join (LEFT, REPLICATED):
join (INNER, REPLICATED):
scan customer
dynamic filter (["c_customer_sk"])
scan customer
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
join (INNER, REPLICATED):
Expand All @@ -12,7 +13,8 @@ local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["sr_customer_sk", "sr_store_sk"])
partial aggregation over (sr_customer_sk, sr_store_sk)
join (INNER, REPLICATED):
scan store_returns
dynamic filter (["sr_returned_date_sk", "sr_store_sk"])
scan store_returns
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
Expand All @@ -30,7 +32,8 @@ local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["sr_customer_sk_11", "sr_store_sk_15"])
partial aggregation over (sr_customer_sk_11, sr_store_sk_15)
join (INNER, REPLICATED):
scan store_returns
dynamic filter (["sr_returned_date_sk_28"])
scan store_returns
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@ remote exchange (GATHER, SINGLE, [])
partial aggregation over (d_day_name, d_week_seq)
join (INNER, REPLICATED):
remote exchange (REPARTITION, ROUND_ROBIN, [])
scan web_sales
scan catalog_sales
dynamic filter (["ws_sold_date_sk"])
scan web_sales
dynamic filter (["cs_sold_date_sk"])
scan catalog_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
dynamic filter (["d_week_seq", "d_week_seq"])
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["d_week_seq_23"])
scan date_dim
dynamic filter (["d_week_seq_23"])
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["expr_229"])
join (INNER, PARTITIONED):
Expand All @@ -28,11 +32,14 @@ remote exchange (GATHER, SINGLE, [])
partial aggregation over (d_day_name_142, d_week_seq_132)
join (INNER, REPLICATED):
remote exchange (REPARTITION, ROUND_ROBIN, [])
scan web_sales
scan catalog_sales
dynamic filter (["ws_sold_date_sk_85"])
scan web_sales
dynamic filter (["cs_sold_date_sk_123"])
scan catalog_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
dynamic filter (["d_week_seq_132"])
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["d_week_seq_178"])
scan date_dim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ local exchange (GATHER, SINGLE, [])
partial aggregation over (d_year, i_brand, i_brand_id)
join (INNER, REPLICATED):
join (INNER, REPLICATED):
scan store_sales
dynamic filter (["ss_item_sk", "ss_sold_date_sk"])
scan store_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan item
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ local exchange (GATHER, SINGLE, [])
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, ["ss_customer_sk"])
join (INNER, REPLICATED):
scan store_sales
dynamic filter (["ss_customer_sk", "ss_sold_date_sk"])
scan store_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["c_customer_sk"])
scan customer
dynamic filter (["c_customer_id", "c_customer_id"])
scan customer
join (INNER, PARTITIONED):
join (INNER, PARTITIONED):
final aggregation over (c_birth_country_587, c_customer_id_574, c_email_address_589, c_first_name_581, c_last_name_582, c_login_588, c_preferred_cust_flag_583, d_year_638)
Expand All @@ -24,11 +26,13 @@ local exchange (GATHER, SINGLE, [])
partial aggregation over (c_birth_country_587, c_customer_id_574, c_email_address_589, c_first_name_581, c_last_name_582, c_login_588, c_preferred_cust_flag_583, d_year_638)
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, ["c_customer_sk_573"])
scan customer
dynamic filter (["c_customer_id_574", "c_customer_id_574", "c_customer_id_574", "c_customer_sk_573"])
scan customer
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["cs_bill_customer_sk_596"])
join (INNER, REPLICATED):
scan catalog_sales
dynamic filter (["cs_sold_date_sk_627"])
scan catalog_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
Expand All @@ -39,11 +43,13 @@ local exchange (GATHER, SINGLE, [])
partial aggregation over (c_birth_country_1624, c_customer_id_1611, c_email_address_1626, c_first_name_1618, c_last_name_1619, c_login_1625, c_preferred_cust_flag_1620, d_year_1675)
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, ["c_customer_sk_1610"])
scan customer
dynamic filter (["c_customer_id_1611", "c_customer_id_1611", "c_customer_id_1611", "c_customer_sk_1610"])
scan customer
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["ws_bill_customer_sk_1634"])
join (INNER, REPLICATED):
scan web_sales
dynamic filter (["ws_sold_date_sk_1664"])
scan web_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
Expand All @@ -53,11 +59,13 @@ local exchange (GATHER, SINGLE, [])
partial aggregation over (c_birth_country_1312, c_customer_id_1299, c_email_address_1314, c_first_name_1306, c_last_name_1307, c_login_1313, c_preferred_cust_flag_1308, d_year_1363)
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, ["c_customer_sk_1298"])
scan customer
dynamic filter (["c_customer_id_1299", "c_customer_id_1299", "c_customer_sk_1298"])
scan customer
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["ws_bill_customer_sk_1322"])
join (INNER, REPLICATED):
scan web_sales
dynamic filter (["ws_sold_date_sk_1352"])
scan web_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
Expand All @@ -67,11 +75,13 @@ local exchange (GATHER, SINGLE, [])
partial aggregation over (c_birth_country_899, c_customer_id_886, c_email_address_901, c_first_name_893, c_last_name_894, c_login_900, c_preferred_cust_flag_895, d_year_950)
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, ["c_customer_sk_885"])
scan customer
dynamic filter (["c_customer_id_886", "c_customer_sk_885"])
scan customer
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["cs_bill_customer_sk_908"])
join (INNER, REPLICATED):
scan catalog_sales
dynamic filter (["cs_sold_date_sk_939"])
scan catalog_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
Expand All @@ -82,7 +92,8 @@ local exchange (GATHER, SINGLE, [])
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, ["ss_customer_sk_194"])
join (INNER, REPLICATED):
scan store_sales
dynamic filter (["ss_customer_sk_194", "ss_sold_date_sk_214"])
scan store_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
Expand Down
Loading