diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q14_1.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q14_1.plan.txt index 9d5966e6652b2..28e7f4363d123 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q14_1.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q14_1.plan.txt @@ -24,10 +24,8 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, [i_item_sk_1]) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [i_brand_id_8, i_category_id_12, i_class_id_10]) - scan item final aggregation over (expr_216, expr_217, expr_218) - local exchange (GATHER, SINGLE, []) + local exchange (REPARTITION, HASH, [expr_216, expr_217, expr_218]) remote exchange (REPARTITION, HASH, [i_brand_id_53, i_category_id_57, i_class_id_55]) partial aggregation over (i_brand_id_53, i_category_id_57, i_class_id_55) join (INNER, REPLICATED): @@ -61,6 +59,9 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [i_brand_id_8, i_category_id_12, i_class_id_10]) + scan item local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) final aggregation over () @@ -103,10 +104,8 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, [i_item_sk_552]) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [i_brand_id_559, i_category_id_563, i_class_id_561]) - scan item final aggregation over (expr_836, expr_837, expr_838) - local exchange (GATHER, SINGLE, []) + local exchange (REPARTITION, HASH, [expr_836, expr_837, expr_838]) remote exchange (REPARTITION, HASH, [i_brand_id_604, i_category_id_608, i_class_id_606]) partial aggregation over (i_brand_id_604, i_category_id_608, i_class_id_606) join (INNER, REPLICATED): @@ -140,6 +139,9 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [i_brand_id_559, i_category_id_563, i_class_id_561]) + scan item local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) final aggregation over () @@ -182,10 +184,8 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, [i_item_sk_1179]) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [i_brand_id_1186, i_category_id_1190, i_class_id_1188]) - scan item final aggregation over (expr_1463, expr_1464, expr_1465) - local exchange (GATHER, SINGLE, []) + local exchange (REPARTITION, HASH, [expr_1463, expr_1464, expr_1465]) remote exchange (REPARTITION, HASH, [i_brand_id_1231, i_category_id_1235, i_class_id_1233]) partial aggregation over (i_brand_id_1231, i_category_id_1235, i_class_id_1233) join (INNER, REPLICATED): @@ -219,6 +219,9 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [i_brand_id_1186, i_category_id_1190, i_class_id_1188]) + scan item local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) final aggregation over () diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q14_2.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q14_2.plan.txt index 8dcae8f2d4aaa..06caddf83f707 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q14_2.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q14_2.plan.txt @@ -21,10 +21,8 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, [i_item_sk_1]) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [i_brand_id_8, i_category_id_12, i_class_id_10]) - scan item final aggregation over (expr_216, expr_217, expr_218) - local exchange (GATHER, SINGLE, []) + local exchange (REPARTITION, HASH, [expr_216, expr_217, expr_218]) remote exchange (REPARTITION, HASH, [i_brand_id_53, i_category_id_57, i_class_id_55]) partial aggregation over (i_brand_id_53, i_category_id_57, i_class_id_55) join (INNER, REPLICATED): @@ -58,6 +56,9 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [i_brand_id_8, i_category_id_12, i_class_id_10]) + scan item local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) local exchange (GATHER, SINGLE, []) @@ -106,10 +107,8 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, [i_item_sk_578]) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [i_brand_id_585, i_category_id_589, i_class_id_587]) - scan item final aggregation over (expr_862, expr_863, expr_864) - local exchange (GATHER, SINGLE, []) + local exchange (REPARTITION, HASH, [expr_862, expr_863, expr_864]) remote exchange (REPARTITION, HASH, [i_brand_id_630, i_category_id_634, i_class_id_632]) partial aggregation over (i_brand_id_630, i_category_id_634, i_class_id_632) join (INNER, REPLICATED): @@ -143,6 +142,9 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [i_brand_id_585, i_category_id_589, i_class_id_587]) + scan item local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) local exchange (GATHER, SINGLE, []) diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q16.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q16.plan.txt index f0df98483720b..394d3251383a1 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q16.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q16.plan.txt @@ -2,30 +2,31 @@ final aggregation over () local exchange (GATHER, SINGLE, []) remote exchange (GATHER, SINGLE, []) partial aggregation over () - join (LEFT, PARTITIONED): - final aggregation over (ca_state, cc_county, cs_call_center_sk, cs_ext_ship_cost, cs_net_profit, cs_order_number, cs_ship_addr_sk, cs_ship_date_sk, cs_warehouse_sk, d_date, unique) - local exchange (GATHER, SINGLE, []) - partial aggregation over (ca_state, cc_county, cs_call_center_sk, cs_ext_ship_cost, cs_net_profit, cs_order_number, cs_ship_addr_sk, cs_ship_date_sk, cs_warehouse_sk, d_date, unique) - join (RIGHT, PARTITIONED): - remote exchange (REPARTITION, HASH, [cs_order_number_17]) - scan catalog_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [cs_order_number]) - join (INNER, REPLICATED): + local exchange (GATHER, SINGLE, []) + join (RIGHT, PARTITIONED): + final aggregation over (cr_order_number) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [cr_order_number]) + partial aggregation over (cr_order_number) + scan catalog_returns + final aggregation over (ca_state, cc_county, cs_call_center_sk, cs_ext_ship_cost, cs_net_profit, cs_order_number, cs_ship_addr_sk, cs_ship_date_sk, cs_warehouse_sk, d_date, unique) + local exchange (GATHER, SINGLE, []) + partial aggregation over (ca_state, cc_county, cs_call_center_sk, cs_ext_ship_cost, cs_net_profit, cs_order_number, cs_ship_addr_sk, cs_ship_date_sk, cs_warehouse_sk, d_date, unique) + join (RIGHT, PARTITIONED): + remote exchange (REPARTITION, HASH, [cs_order_number_17]) + scan catalog_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [cs_order_number]) join (INNER, REPLICATED): join (INNER, REPLICATED): - scan catalog_sales + join (INNER, REPLICATED): + scan catalog_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan customer_address local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - scan customer_address + scan date_dim local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan call_center - final aggregation over (cr_order_number) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [cr_order_number]) - partial aggregation over (cr_order_number) - scan catalog_returns + scan call_center diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q44.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q44.plan.txt index f12b13c5d2d70..54e9801300c81 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q44.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q44.plan.txt @@ -1,50 +1,48 @@ local exchange (GATHER, SINGLE, []) remote exchange (GATHER, SINGLE, []) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_item_sk_61]) + join (INNER, REPLICATED): + join (INNER, REPLICATED): join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_item_sk]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [rank]) - local exchange (GATHER, SINGLE, []) - remote exchange (GATHER, SINGLE, []) - cross join: - final aggregation over (ss_item_sk) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [ss_item_sk]) - partial aggregation over (ss_item_sk) - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - local exchange (GATHER, SINGLE, []) - remote exchange (GATHER, SINGLE, []) - final aggregation over (ss_store_sk_13) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [ss_store_sk_13]) - partial aggregation over (ss_store_sk_13) - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [rank_131]) + remote exchange (REPARTITION, HASH, [rank]) + local exchange (GATHER, SINGLE, []) + remote exchange (GATHER, SINGLE, []) + cross join: + final aggregation over (ss_item_sk) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [ss_item_sk]) + partial aggregation over (ss_item_sk) + scan store_sales local exchange (GATHER, SINGLE, []) - remote exchange (GATHER, SINGLE, []) - cross join: - final aggregation over (ss_item_sk_61) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [ss_item_sk_61]) - partial aggregation over (ss_item_sk_61) - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) + remote exchange (REPLICATE, BROADCAST, []) + local exchange (GATHER, SINGLE, []) + remote exchange (GATHER, SINGLE, []) + final aggregation over (ss_store_sk_13) local exchange (GATHER, SINGLE, []) - remote exchange (GATHER, SINGLE, []) - final aggregation over (ss_store_sk_98) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [ss_store_sk_98]) - partial aggregation over (ss_store_sk_98) - scan store_sales + remote exchange (REPARTITION, HASH, [ss_store_sk_13]) + partial aggregation over (ss_store_sk_13) + scan store_sales local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [i_item_sk]) - scan item + remote exchange (REPARTITION, HASH, [rank_131]) + local exchange (GATHER, SINGLE, []) + remote exchange (GATHER, SINGLE, []) + cross join: + final aggregation over (ss_item_sk_61) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [ss_item_sk_61]) + partial aggregation over (ss_item_sk_61) + scan store_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + local exchange (GATHER, SINGLE, []) + remote exchange (GATHER, SINGLE, []) + final aggregation over (ss_store_sk_98) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [ss_store_sk_98]) + partial aggregation over (ss_store_sk_98) + scan store_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan item local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [i_item_sk_148]) + remote exchange (REPLICATE, BROADCAST, []) scan item diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q64.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q64.plan.txt index ca8de6cd6d5ce..5c03039efd82b 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q64.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q64.plan.txt @@ -2,182 +2,163 @@ remote exchange (GATHER, SINGLE, []) local exchange (GATHER, UNKNOWN, []) remote exchange (REPARTITION, ROUND_ROBIN, []) join (INNER, PARTITIONED): - final aggregation over (ca_city, ca_city_98, ca_street_name, ca_street_name_95, ca_street_number, ca_street_number_94, ca_zip, ca_zip_101, d_year, d_year_28, d_year_56, i_item_sk, i_product_name, s_store_name, s_zip) - local exchange (GATHER, SINGLE, []) - partial aggregation over (ca_city, ca_city_98, ca_street_name, ca_street_name_95, ca_street_number, ca_street_number_94, ca_zip, ca_zip_101, d_year, d_year_28, d_year_56, i_item_sk, i_product_name, s_store_name, s_zip) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [cs_item_sk]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [hd_income_band_sk_88]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [hd_income_band_sk]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [c_current_addr_sk]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_addr_sk]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [c_current_hdemo_sk]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_hdemo_sk]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_promo_sk]) + remote exchange (REPARTITION, HASH, [i_item_sk, s_store_name, s_zip]) + final aggregation over (ca_city, ca_city_98, ca_street_name, ca_street_name_95, ca_street_number, ca_street_number_94, ca_zip, ca_zip_101, d_year, d_year_28, d_year_56, i_item_sk, i_product_name, s_store_name, s_zip) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [ca_city, ca_city_98, ca_street_name, ca_street_name_95, ca_street_number, ca_street_number_94, ca_zip, ca_zip_101, d_year, d_year_28, d_year_56, i_item_sk, i_product_name, s_store_name, s_zip]) + partial aggregation over (ca_city, ca_city_98, ca_street_name, ca_street_name_95, ca_street_number, ca_street_number_94, ca_zip, ca_zip_101, d_year, d_year_28, d_year_56, i_item_sk, i_product_name, s_store_name, s_zip) + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, [c_current_addr_sk]) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, [ss_addr_sk]) + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [c_current_cdemo_sk]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_cdemo_sk]) + remote exchange (REPARTITION, HASH, [ss_customer_sk]) + join (INNER, REPLICATED): + join (INNER, REPLICATED): join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [c_first_shipto_date_sk]) + remote exchange (REPARTITION, HASH, [sr_item_sk]) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [c_first_sales_date_sk]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_customer_sk]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_store_sk]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_sold_date_sk]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [sr_item_sk]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_item_sk, ss_ticket_number]) - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [sr_item_sk, sr_ticket_number]) - scan store_returns - final aggregation over (cs_item_sk) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [cs_item_sk]) - partial aggregation over (cs_item_sk) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [cs_item_sk, cs_order_number]) - scan catalog_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [cr_item_sk, cr_order_number]) - scan catalog_returns - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [d_date_sk]) - scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [s_store_sk]) - scan store - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [c_customer_sk]) - scan customer + remote exchange (REPARTITION, HASH, [ss_item_sk, ss_ticket_number]) + scan store_sales local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [d_date_sk_22]) - scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [d_date_sk_50]) - scan date_dim + remote exchange (REPARTITION, HASH, [sr_item_sk, sr_ticket_number]) + scan store_returns + final aggregation over (cs_item_sk) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [cs_item_sk]) + partial aggregation over (cs_item_sk) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, [cs_item_sk, cs_order_number]) + scan catalog_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [cr_item_sk, cr_order_number]) + scan catalog_returns + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [cd_demo_sk]) - scan customer_demographics + remote exchange (REPLICATE, BROADCAST, []) + scan store local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [cd_demo_sk_78]) - scan customer_demographics + remote exchange (REPARTITION, HASH, [c_customer_sk]) + scan customer + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [p_promo_sk]) - scan promotion + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan customer_demographics local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [hd_demo_sk]) - scan household_demographics + remote exchange (REPLICATE, BROADCAST, []) + scan customer_demographics + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan promotion local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [hd_demo_sk_87]) + remote exchange (REPLICATE, BROADCAST, []) scan household_demographics - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [ca_address_sk]) - scan customer_address - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [ca_address_sk_92]) - scan customer_address + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan household_demographics + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [ca_address_sk]) + scan customer_address + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [ca_address_sk_92]) + scan customer_address local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [ib_income_band_sk]) + remote exchange (REPLICATE, BROADCAST, []) scan income_band + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan income_band local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [ib_income_band_sk_105]) - scan income_band - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [i_item_sk]) - scan item - final aggregation over (ca_city_547, ca_city_560, ca_street_name_544, ca_street_name_557, ca_street_number_543, ca_street_number_556, ca_zip_550, ca_zip_563, d_year_369, d_year_397, d_year_425, i_item_sk_573, i_product_name_594, s_store_name_452, s_zip_472) - local exchange (GATHER, SINGLE, []) - partial aggregation over (ca_city_547, ca_city_560, ca_street_name_544, ca_street_name_557, ca_street_number_543, ca_street_number_556, ca_zip_550, ca_zip_563, d_year_369, d_year_397, d_year_425, i_item_sk_573, i_product_name_594, s_store_name_452, s_zip_472) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [cs_item_sk_292]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [hd_income_band_sk_537]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [hd_income_band_sk_532]) + remote exchange (REPLICATE, BROADCAST, []) + scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [i_item_sk_573, s_store_name_452, s_zip_472]) + final aggregation over (ca_city_547, ca_city_560, ca_street_name_544, ca_street_name_557, ca_street_number_543, ca_street_number_556, ca_zip_550, ca_zip_563, d_year_369, d_year_397, d_year_425, i_item_sk_573, i_product_name_594, s_store_name_452, s_zip_472) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [ca_city_547, ca_city_560, ca_street_name_544, ca_street_name_557, ca_street_number_543, ca_street_number_556, ca_zip_550, ca_zip_563, d_year_369, d_year_397, d_year_425, i_item_sk_573, i_product_name_594, s_store_name_452, s_zip_472]) + partial aggregation over (ca_city_547, ca_city_560, ca_street_name_544, ca_street_name_557, ca_street_number_543, ca_street_number_556, ca_zip_550, ca_zip_563, d_year_369, d_year_397, d_year_425, i_item_sk_573, i_product_name_594, s_store_name_452, s_zip_472) + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): join (INNER, PARTITIONED): remote exchange (REPARTITION, HASH, [c_current_addr_sk_480]) join (INNER, PARTITIONED): remote exchange (REPARTITION, HASH, [ss_addr_sk_240]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [c_current_hdemo_sk_479]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_hdemo_sk_239]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_promo_sk_242]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [c_current_cdemo_sk_478]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_cdemo_sk_238]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [c_first_shipto_date_sk_481]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [c_first_sales_date_sk_482]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_customer_sk_237]) + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, [ss_customer_sk_237]) + join (INNER, REPLICATED): + join (INNER, REPLICATED): + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, [sr_item_sk_259]) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, [ss_item_sk_236, ss_ticket_number_243]) + scan store_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [sr_item_sk_259, sr_ticket_number_266]) + scan store_returns + final aggregation over (cs_item_sk_292) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [cs_item_sk_292]) + partial aggregation over (cs_item_sk_292) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_store_sk_241]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_sold_date_sk_234]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [sr_item_sk_259]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [ss_item_sk_236, ss_ticket_number_243]) - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [sr_item_sk_259, sr_ticket_number_266]) - scan store_returns - final aggregation over (cs_item_sk_292) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [cs_item_sk_292]) - partial aggregation over (cs_item_sk_292) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [cs_item_sk_292, cs_order_number_294]) - scan catalog_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [cr_item_sk_313, cr_order_number_327]) - scan catalog_returns - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [d_date_sk_363]) - scan date_dim + remote exchange (REPARTITION, HASH, [cs_item_sk_292, cs_order_number_294]) + scan catalog_sales local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [s_store_sk_447]) - scan store - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [c_customer_sk_476]) - scan customer - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [d_date_sk_391]) - scan date_dim + remote exchange (REPARTITION, HASH, [cr_item_sk_313, cr_order_number_327]) + scan catalog_returns local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [d_date_sk_419]) + remote exchange (REPLICATE, BROADCAST, []) scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [cd_demo_sk_494]) - scan customer_demographics + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan store + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [c_customer_sk_476]) + scan customer local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [cd_demo_sk_503]) - scan customer_demographics + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [p_promo_sk_512]) - scan promotion + remote exchange (REPLICATE, BROADCAST, []) + scan customer_demographics + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan customer_demographics local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [hd_demo_sk_531]) - scan household_demographics + remote exchange (REPLICATE, BROADCAST, []) + scan promotion + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan household_demographics local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [hd_demo_sk_536]) + remote exchange (REPLICATE, BROADCAST, []) scan household_demographics local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, [ca_address_sk_541]) @@ -185,12 +166,12 @@ remote exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, [ca_address_sk_554]) scan customer_address + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan income_band local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [ib_income_band_sk_567]) + remote exchange (REPLICATE, BROADCAST, []) scan income_band - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [ib_income_band_sk_570]) - scan income_band - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [i_item_sk_573]) - scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan item diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q04.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q04.plan.txt index f387a01c3d8a6..7d4d4bfff5054 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q04.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q04.plan.txt @@ -6,10 +6,11 @@ remote exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, [orderpriority]) partial aggregation over (orderpriority) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [orderkey]) - scan orders final aggregation over (orderkey_0) local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, [orderkey_0]) partial aggregation over (orderkey_0) scan lineitem + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [orderkey]) + scan orders diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q09.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q09.plan.txt index 9a36538c96eb6..d289b536de3a5 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q09.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q09.plan.txt @@ -5,28 +5,27 @@ remote exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, [expr_18, name_15]) partial aggregation over (expr_18, name_15) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [nationkey]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [orderkey]) + join (INNER, REPLICATED): + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, [orderkey]) + join (INNER, PARTITIONED): join (INNER, PARTITIONED): - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [suppkey_4]) - join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, [suppkey_4]) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, [partkey_3]) + scan lineitem + local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, [partkey]) scan part - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [partkey_3]) - scan lineitem - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [suppkey]) - scan supplier local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [suppkey_8]) - scan partsupp - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [orderkey_11]) - scan orders + remote exchange (REPARTITION, HASH, [suppkey]) + scan supplier + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [suppkey_8]) + scan partsupp + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, [orderkey_11]) + scan orders local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [nationkey_14]) + remote exchange (REPLICATE, BROADCAST, []) scan nation diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q13.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q13.plan.txt index 99812af1a6922..3918dc7fd2ef4 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q13.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q13.plan.txt @@ -8,9 +8,9 @@ remote exchange (GATHER, SINGLE, []) final aggregation over (custkey) local exchange (GATHER, SINGLE, []) partial aggregation over (custkey) - join (LEFT, PARTITIONED): - remote exchange (REPARTITION, HASH, [custkey]) - scan customer + join (RIGHT, PARTITIONED): + remote exchange (REPARTITION, HASH, [custkey_0]) + scan orders local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [custkey_0]) - scan orders + remote exchange (REPARTITION, HASH, [custkey]) + scan customer diff --git a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q21.plan.txt b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q21.plan.txt index 15945f860d3b6..27fba0c272d93 100644 --- a/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q21.plan.txt +++ b/presto-benchto-benchmarks/src/test/resources/sql/presto/tpch/q21.plan.txt @@ -10,23 +10,21 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) partial aggregation over (commitdate, name, name_7, nationkey, orderkey, orderstatus, receiptdate, suppkey, unique_189) join (LEFT, PARTITIONED): - remote exchange (REPARTITION, HASH, [orderkey]) + join (INNER, REPLICATED): join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [nationkey]) + remote exchange (REPARTITION, HASH, [orderkey]) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [orderkey]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, [suppkey]) - scan supplier - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [suppkey_0]) - scan lineitem + remote exchange (REPARTITION, HASH, [suppkey_0]) + scan lineitem local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [orderkey_3]) - scan orders + remote exchange (REPARTITION, HASH, [suppkey]) + scan supplier local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, [nationkey_6]) - scan nation + remote exchange (REPARTITION, HASH, [orderkey_3]) + scan orders + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan nation local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, [orderkey_10]) scan lineitem diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 72812c27111c6..3d1c0009535fb 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -73,6 +73,7 @@ public final class SystemSessionProperties public static final String OPTIMIZE_HASH_GENERATION = "optimize_hash_generation"; public static final String JOIN_DISTRIBUTION_TYPE = "join_distribution_type"; public static final String JOIN_MAX_BROADCAST_TABLE_SIZE = "join_max_broadcast_table_size"; + public static final String SIZE_BASED_JOIN_DISTRIBUTION_TYPE = "size_based_join_distribution_type"; public static final String DISTRIBUTED_JOIN = "distributed_join"; public static final String DISTRIBUTED_INDEX_JOIN = "distributed_index_join"; public static final String HASH_PARTITION_COUNT = "hash_partition_count"; @@ -303,6 +304,11 @@ public SystemSessionProperties( true, value -> DataSize.valueOf((String) value), DataSize::toString), + booleanProperty( + SIZE_BASED_JOIN_DISTRIBUTION_TYPE, + "Consider source table size when determining join distribution type when CBO fails", + featuresConfig.isSizeBasedJoinDistributionTypeEnabled(), + false), booleanProperty( DISTRIBUTED_INDEX_JOIN, "Distribute index joins on join keys instead of executing inline", @@ -1382,6 +1388,11 @@ public static DataSize getJoinMaxBroadcastTableSize(Session session) return session.getSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, DataSize.class); } + public static boolean isSizeBasedJoinDistributionTypeEnabled(Session session) + { + return session.getSystemProperty(SIZE_BASED_JOIN_DISTRIBUTION_TYPE, Boolean.class); + } + public static boolean isDistributedIndexJoinEnabled(Session session) { return session.getSystemProperty(DISTRIBUTED_INDEX_JOIN, Boolean.class); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index f50a7cd51946b..a80000d3e5a4a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -70,6 +70,7 @@ public class FeaturesConfig private boolean distributedIndexJoinsEnabled; private JoinDistributionType joinDistributionType = JoinDistributionType.AUTOMATIC; private DataSize joinMaxBroadcastTableSize = new DataSize(100, MEGABYTE); + private boolean sizeBasedJoinDistributionTypeEnabled = true; private boolean colocatedJoinsEnabled = true; private boolean groupedExecutionEnabled = true; private boolean recoverableGroupedExecutionEnabled; @@ -507,6 +508,18 @@ public FeaturesConfig setJoinMaxBroadcastTableSize(DataSize joinMaxBroadcastTabl return this; } + @Config("optimizer.size-based-join-distribution-type-enabled") + public FeaturesConfig setSizeBasedJoinDistributionTypeEnabled(boolean considerTableSize) + { + this.sizeBasedJoinDistributionTypeEnabled = considerTableSize; + return this; + } + + public boolean isSizeBasedJoinDistributionTypeEnabled() + { + return sizeBasedJoinDistributionTypeEnabled; + } + public boolean isGroupedExecutionEnabled() { return groupedExecutionEnabled; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineJoinDistributionType.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineJoinDistributionType.java index 8fa9ae8d8d8c0..68755ec550889 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineJoinDistributionType.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineJoinDistributionType.java @@ -22,29 +22,43 @@ import com.facebook.presto.matching.Captures; import com.facebook.presto.matching.Pattern; import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.plan.ValuesNode; import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType; +import com.facebook.presto.sql.planner.iterative.GroupReference; +import com.facebook.presto.sql.planner.iterative.Lookup; import com.facebook.presto.sql.planner.iterative.Rule; +import com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher; import com.facebook.presto.sql.planner.plan.JoinNode; +import com.facebook.presto.sql.planner.plan.UnnestNode; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; import io.airlift.units.DataSize; import java.util.ArrayList; import java.util.List; +import java.util.stream.Stream; import static com.facebook.presto.SystemSessionProperties.getJoinDistributionType; import static com.facebook.presto.SystemSessionProperties.getJoinMaxBroadcastTableSize; +import static com.facebook.presto.SystemSessionProperties.isSizeBasedJoinDistributionTypeEnabled; import static com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges.calculateJoinCostWithoutOutput; import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.AUTOMATIC; import static com.facebook.presto.sql.planner.optimizations.QueryCardinalityUtil.isAtMostScalar; import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.PARTITIONED; import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.REPLICATED; import static com.facebook.presto.sql.planner.plan.Patterns.join; +import static java.lang.Double.NaN; +import static java.lang.Double.isNaN; import static java.util.Objects.requireNonNull; public class DetermineJoinDistributionType implements Rule { private static final Pattern PATTERN = join().matching(joinNode -> !joinNode.getDistributionType().isPresent()); + private static final List> EXPANDING_NODE_CLASSES = ImmutableList.of(JoinNode.class, UnnestNode.class); + private static final double SIZE_DIFFERENCE_THRESHOLD = 8; private final CostComparator costComparator; private final TaskCountEstimator taskCountEstimator; @@ -78,7 +92,9 @@ public static boolean isBelowMaxBroadcastSize(JoinNode joinNode, Context context PlanNode buildSide = joinNode.getRight(); PlanNodeStatsEstimate buildSideStatsEstimate = context.getStatsProvider().getStats(buildSide); double buildSideSizeInBytes = buildSideStatsEstimate.getOutputSizeInBytes(buildSide.getOutputVariables()); - return buildSideSizeInBytes <= joinMaxBroadcastTableSize.toBytes(); + return buildSideSizeInBytes <= joinMaxBroadcastTableSize.toBytes() + || (isSizeBasedJoinDistributionTypeEnabled(context.getSession()) + && getSourceTablesSizeInBytes(buildSide, context) <= joinMaxBroadcastTableSize.toBytes()); } private PlanNode getCostBasedJoin(JoinNode joinNode, Context context) @@ -89,6 +105,9 @@ private PlanNode getCostBasedJoin(JoinNode joinNode, Context context) addJoinsWithDifferentDistributions(joinNode.flipChildren(), possibleJoinNodes, context); if (possibleJoinNodes.stream().anyMatch(result -> result.getCost().hasUnknownComponents()) || possibleJoinNodes.isEmpty()) { + if (isSizeBasedJoinDistributionTypeEnabled(context.getSession())) { + return getSizeBasedJoin(joinNode, context); + } return getSyntacticOrderJoin(joinNode, context, AUTOMATIC); } @@ -97,6 +116,128 @@ private PlanNode getCostBasedJoin(JoinNode joinNode, Context context) return planNodeOrderings.min(possibleJoinNodes).getPlanNode(); } + private JoinNode getSizeBasedJoin(JoinNode joinNode, Context context) + { + DataSize joinMaxBroadcastTableSize = getJoinMaxBroadcastTableSize(context.getSession()); + + boolean isRightSideSmall = getSourceTablesSizeInBytes(joinNode.getRight(), context) <= joinMaxBroadcastTableSize.toBytes(); + if (isRightSideSmall && !mustPartition(joinNode)) { + // choose right join side with small source tables as replicated build side + return joinNode.withDistributionType(REPLICATED); + } + + boolean isLeftSideSmall = getSourceTablesSizeInBytes(joinNode.getLeft(), context) <= joinMaxBroadcastTableSize.toBytes(); + JoinNode flippedJoin = joinNode.flipChildren(); + if (isLeftSideSmall && !mustPartition(flippedJoin)) { + // choose join left side with small source tables as replicated build side + return flippedJoin.withDistributionType(REPLICATED); + } + + if (isRightSideSmall) { + // right side is small enough, but must be partitioned + return joinNode.withDistributionType(PARTITIONED); + } + + if (isLeftSideSmall) { + // left side is small enough, but must be partitioned + return flippedJoin.withDistributionType(PARTITIONED); + } + + // Flip join sides if one side is smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times. + // We use 8x factor because getFirstKnownOutputSizeInBytes may not have accounted for the reduction in the size of + // the output from a filter or aggregation due to lack of estimates. + // We use getFirstKnownOutputSizeInBytes instead of getSourceTablesSizeInBytes to account for the reduction in + // output size from the operators between the join and the table scan as much as possible when comparing the sizes of the join sides. + double leftOutputSizeInBytes = getFirstKnownOutputSizeInBytes(joinNode.getLeft(), context); + double rightOutputSizeInBytes = getFirstKnownOutputSizeInBytes(joinNode.getRight(), context); + // All the REPLICATED cases were handled in the code above, so now we only consider PARTITIONED cases here + if (rightOutputSizeInBytes * SIZE_DIFFERENCE_THRESHOLD < leftOutputSizeInBytes && !mustReplicate(joinNode, context)) { + return joinNode.withDistributionType(PARTITIONED); + } + + if (leftOutputSizeInBytes * SIZE_DIFFERENCE_THRESHOLD < rightOutputSizeInBytes && !mustReplicate(flippedJoin, context)) { + return flippedJoin.withDistributionType(PARTITIONED); + } + + // neither side is small enough, choose syntactic join order + return getSyntacticOrderJoin(joinNode, context, AUTOMATIC); + } + + public static double getSourceTablesSizeInBytes(PlanNode node, Context context) + { + return getSourceTablesSizeInBytes(node, context.getLookup(), context.getStatsProvider()); + } + + @VisibleForTesting + static double getSourceTablesSizeInBytes(PlanNode node, Lookup lookup, StatsProvider statsProvider) + { + boolean hasExpandingNodes = PlanNodeSearcher.searchFrom(node, lookup) + .whereIsInstanceOfAny(EXPANDING_NODE_CLASSES) + .matches(); + if (hasExpandingNodes) { + return NaN; + } + + List sourceNodes = PlanNodeSearcher.searchFrom(node, lookup) + .whereIsInstanceOfAny(ImmutableList.of(TableScanNode.class, ValuesNode.class)) + .findAll(); + + return sourceNodes.stream() + .mapToDouble(sourceNode -> statsProvider.getStats(sourceNode).getOutputSizeInBytes(sourceNode.getOutputVariables())) + .sum(); + } + + private static double getFirstKnownOutputSizeInBytes(PlanNode node, Context context) + { + return getFirstKnownOutputSizeInBytes(node, context.getLookup(), context.getStatsProvider()); + } + + /** + * Recursively looks for the first source node with a known estimate and uses that to return an approximate output size. + * Returns NaN if an un-estimated expanding node (Join or Unnest) is encountered. + * The amount of reduction in size from un-estimated non-expanding nodes (e.g. an un-estimated filter or aggregation) + * is not accounted here. We make use of the first available estimate and make decision about flipping join sides only if + * we find a large difference in output size of both sides. + */ + @VisibleForTesting + public static double getFirstKnownOutputSizeInBytes(PlanNode node, Lookup lookup, StatsProvider statsProvider) + { + return Stream.of(node) + .flatMap(planNode -> { + if (planNode instanceof GroupReference) { + return lookup.resolveGroup(node); + } + return Stream.of(planNode); + }) + .mapToDouble(resolvedNode -> { + double outputSizeInBytes = statsProvider.getStats(resolvedNode).getOutputSizeInBytes( + resolvedNode.getOutputVariables()); + if (!isNaN(outputSizeInBytes)) { + return outputSizeInBytes; + } + + if (EXPANDING_NODE_CLASSES.stream().anyMatch(clazz -> clazz.isInstance(resolvedNode))) { + return NaN; + } + + List sourceNodes = resolvedNode.getSources(); + if (sourceNodes.isEmpty()) { + return NaN; + } + + double sourcesOutputSizeInBytes = 0; + for (PlanNode sourceNode : sourceNodes) { + double firstKnownOutputSizeInBytes = getFirstKnownOutputSizeInBytes(sourceNode, lookup, statsProvider); + if (isNaN(firstKnownOutputSizeInBytes)) { + return NaN; + } + sourcesOutputSizeInBytes += firstKnownOutputSizeInBytes; + } + return sourcesOutputSizeInBytes; + }) + .sum(); + } + private void addJoinsWithDifferentDistributions(JoinNode joinNode, List possibleJoinNodes, Context context) { if (!mustPartition(joinNode) && isBelowMaxBroadcastSize(joinNode, context)) { @@ -108,7 +249,7 @@ private void addJoinsWithDifferentDistributions(JoinNode joinNode, List result.getCost().hasUnknownComponents())) { + if (isSizeBasedJoinDistributionTypeEnabled(context.getSession())) { + return getSizeBaseDistributionType(node, context); + } return node.withDistributionType(PARTITIONED); } @@ -112,6 +118,18 @@ private PlanNode getCostBasedDistributionType(SemiJoinNode node, Context context return planNodeOrderings.min(possibleJoinNodes).getPlanNode(); } + private PlanNode getSizeBaseDistributionType(SemiJoinNode node, Context context) + { + DataSize joinMaxBroadcastTableSize = getJoinMaxBroadcastTableSize(context.getSession()); + + if (getSourceTablesSizeInBytes(node.getFilteringSource(), context) <= joinMaxBroadcastTableSize.toBytes()) { + // choose replicated distribution type as filtering source contains small source tables only + return node.withDistributionType(REPLICATED); + } + + return node.withDistributionType(PARTITIONED); + } + private boolean canReplicate(SemiJoinNode node, Context context) { DataSize joinMaxBroadcastTableSize = getJoinMaxBroadcastTableSize(context.getSession()); @@ -119,7 +137,9 @@ private boolean canReplicate(SemiJoinNode node, Context context) PlanNode buildSide = node.getFilteringSource(); PlanNodeStatsEstimate buildSideStatsEstimate = context.getStatsProvider().getStats(buildSide); double buildSideSizeInBytes = buildSideStatsEstimate.getOutputSizeInBytes(buildSide.getOutputVariables()); - return buildSideSizeInBytes <= joinMaxBroadcastTableSize.toBytes(); + return buildSideSizeInBytes <= joinMaxBroadcastTableSize.toBytes() + || (isSizeBasedJoinDistributionTypeEnabled(context.getSession()) + && getSourceTablesSizeInBytes(buildSide, context) <= joinMaxBroadcastTableSize.toBytes()); } private PlanNodeWithCost getSemiJoinNodeWithCost(SemiJoinNode possibleJoinNode, Context context) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PlanNodeSearcher.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PlanNodeSearcher.java index fbf40011c0cd0..e33a577fa4a26 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PlanNodeSearcher.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PlanNodeSearcher.java @@ -24,6 +24,7 @@ import static com.facebook.presto.sql.planner.iterative.Lookup.noLookup; import static com.facebook.presto.sql.planner.plan.ChildReplacer.replaceChildren; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Predicates.alwaysFalse; import static com.google.common.base.Predicates.alwaysTrue; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; @@ -57,6 +58,15 @@ private PlanNodeSearcher(PlanNode node, Lookup lookup) this.lookup = requireNonNull(lookup, "lookup is null"); } + public final PlanNodeSearcher whereIsInstanceOfAny(List> classes) + { + Predicate predicate = alwaysFalse(); + for (Class clazz : classes) { + predicate = predicate.or(clazz::isInstance); + } + return where(predicate); + } + public PlanNodeSearcher where(Predicate where) { this.where = requireNonNull(where, "where is null"); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index cb539276887aa..38b348c603b72 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -62,6 +62,7 @@ public void testDefaults() .setDistributedIndexJoinsEnabled(false) .setJoinDistributionType(JoinDistributionType.AUTOMATIC) .setJoinMaxBroadcastTableSize(new DataSize(100, MEGABYTE)) + .setSizeBasedJoinDistributionTypeEnabled(true) .setGroupedExecutionEnabled(true) .setRecoverableGroupedExecutionEnabled(false) .setMaxFailedTaskPercentage(0.3) @@ -235,6 +236,7 @@ public void testExplicitPropertyMappings() .put("distributed-index-joins-enabled", "true") .put("join-distribution-type", "BROADCAST") .put("join-max-broadcast-table-size", "42GB") + .put("optimizer.size-based-join-distribution-type-enabled", "false") .put("grouped-execution-enabled", "false") .put("recoverable-grouped-execution-enabled", "true") .put("max-failed-task-percentage", "0.8") @@ -373,6 +375,7 @@ public void testExplicitPropertyMappings() .setDistributedIndexJoinsEnabled(true) .setJoinDistributionType(BROADCAST) .setJoinMaxBroadcastTableSize(new DataSize(42, GIGABYTE)) + .setSizeBasedJoinDistributionTypeEnabled(false) .setGroupedExecutionEnabled(false) .setRecoverableGroupedExecutionEnabled(true) .setMaxFailedTaskPercentage(0.8) diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java index 857dac83fb37e..02e86ad21bf77 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java @@ -1246,6 +1246,46 @@ public void testComplexOrderBy() project(values("col1"))))))))))); } + @Test + public void testSizeBasedJoin() + { + // both local.sf100000.nation and local.sf100000.orders don't provide stats, therefore no reordering happens + assertDistributedPlan("SELECT custkey FROM local.\"sf42.5\".nation, local.\"sf42.5\".orders WHERE nation.nationkey = orders.custkey", + output( + anyTree( + join(INNER, ImmutableList.of(equiJoinClause("NATIONKEY", "CUSTKEY")), + anyTree(tableScan("nation", ImmutableMap.of("NATIONKEY", "nationkey"))), + anyTree(tableScan("orders", ImmutableMap.of("CUSTKEY", "custkey"))))))); + + // values node provides stats + assertDistributedPlan("SELECT custkey FROM (VALUES CAST(1 AS BIGINT), CAST(2 AS BIGINT)) t(a), local.\"sf42.5\".orders WHERE t.a = orders.custkey", + output( + anyTree( + join(INNER, ImmutableList.of(equiJoinClause("CUSTKEY", "T_A")), Optional.empty(), Optional.of(REPLICATED), + anyTree(tableScan("orders", ImmutableMap.of("CUSTKEY", "custkey"))), + anyTree(values("T_A")))))); + } + + @Test + public void testSizeBasedSemiJoin() + { + // both local.sf100000.nation and local.sf100000.orders don't provide stats, therefore no reordering happens + assertDistributedPlan("SELECT custkey FROM local.\"sf42.5\".orders WHERE orders.custkey NOT IN (SELECT nationkey FROM local.\"sf42.5\".nation)", + output( + anyTree( + semiJoin("CUSTKEY", "NATIONKEY", "OUT", Optional.of(SemiJoinNode.DistributionType.PARTITIONED), + anyTree(tableScan("orders", ImmutableMap.of("CUSTKEY", "custkey"))), + anyTree(tableScan("nation", ImmutableMap.of("NATIONKEY", "nationkey"))))))); + + // values node provides stats + assertDistributedPlan("SELECT custkey FROM local.\"sf42.5\".orders WHERE orders.custkey NOT IN (SELECT t.a FROM (VALUES CAST(1 AS BIGINT), CAST(2 AS BIGINT)) t(a))", + output( + anyTree( + semiJoin("CUSTKEY", "T_A", "OUT", Optional.of(SemiJoinNode.DistributionType.REPLICATED), + anyTree(tableScan("orders", ImmutableMap.of("CUSTKEY", "custkey"))), + anyTree(values("T_A")))))); + } + @Test public void testJoinNullFilters() { diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java index 262f44629c28a..744d887a2e197 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java @@ -13,19 +13,29 @@ */ package com.facebook.presto.sql.planner.iterative.rule; +import com.facebook.presto.common.type.VarcharType; import com.facebook.presto.cost.CostComparator; import com.facebook.presto.cost.PlanNodeStatsEstimate; import com.facebook.presto.cost.TaskCountEstimator; import com.facebook.presto.cost.VariableStatsEstimate; +import com.facebook.presto.spi.TestingColumnHandle; +import com.facebook.presto.spi.plan.FilterNode; +import com.facebook.presto.spi.plan.LimitNode; import com.facebook.presto.spi.plan.PlanNodeId; +import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.plan.ValuesNode; import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType; +import com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder; import com.facebook.presto.sql.planner.iterative.rule.test.RuleAssert; import com.facebook.presto.sql.planner.iterative.rule.test.RuleTester; import com.facebook.presto.sql.planner.plan.JoinNode; import com.facebook.presto.sql.planner.plan.JoinNode.DistributionType; import com.facebook.presto.sql.planner.plan.JoinNode.Type; +import com.facebook.presto.sql.planner.plan.UnnestNode; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableMap; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -36,10 +46,16 @@ import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static com.facebook.presto.SystemSessionProperties.JOIN_MAX_BROADCAST_TABLE_SIZE; import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType; +import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.enforceSingleRow; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.equiJoinClause; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.join; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values; +import static com.facebook.presto.sql.planner.iterative.Lookup.noLookup; +import static com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType.getFirstKnownOutputSizeInBytes; +import static com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType.getSourceTablesSizeInBytes; import static com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder.castToRowExpression; import static com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder.constantExpressions; import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.PARTITIONED; @@ -48,6 +64,8 @@ import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER; import static com.facebook.presto.sql.planner.plan.JoinNode.Type.LEFT; import static com.facebook.presto.sql.planner.plan.JoinNode.Type.RIGHT; +import static java.lang.Double.NaN; +import static org.testng.Assert.assertEquals; @Test(singleThreaded = true) public class TestDetermineJoinDistributionType @@ -690,6 +708,486 @@ public void testChoosesRightWhenFallsBackToSyntactic() values(ImmutableMap.of("A1", 0)))); } + @Test + public void testReplicatesWhenSourceIsSmall() + { + VarcharType variableType = createUnboundedVarcharType(); // variable width so that average row size is respected + int aRows = 10_000; + int bRows = 10; + + // output size exceeds JOIN_MAX_BROADCAST_TABLE_SIZE limit + PlanNodeStatsEstimate aStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(aRows) + .addVariableStatistics(ImmutableMap.of( + new VariableReferenceExpression(Optional.empty(), "A1", variableType), + new VariableStatsEstimate(0, 100, 0, 640000d * 10000, 10))) + .build(); + // output size exceeds JOIN_MAX_BROADCAST_TABLE_SIZE limit + PlanNodeStatsEstimate bStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(bRows) + .addVariableStatistics(ImmutableMap.of( + new VariableReferenceExpression(Optional.empty(), "B1", variableType), + new VariableStatsEstimate(0, 100, 0, 640000d * 10000, 10))) + .build(); + // output size does not exceed JOIN_MAX_BROADCAST_TABLE_SIZE limit + PlanNodeStatsEstimate bSourceStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(bRows) + .addVariableStatistics(ImmutableMap.of( + new VariableReferenceExpression(Optional.empty(), "B1", variableType), + new VariableStatsEstimate(0, 100, 0, 64, 10))) + .build(); + + // immediate join sources exceeds JOIN_MAX_BROADCAST_TABLE_SIZE limit but build tables are small + // therefore replicated distribution type is chosen + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .overrideStats("valuesA", aStatsEstimate) + .overrideStats("filterB", bStatsEstimate) + .overrideStats("valuesB", bSourceStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + INNER, + p.values(new PlanNodeId("valuesA"), aRows, a1), + p.filter(new PlanNodeId("filterB"), TRUE_CONSTANT, p.values(new PlanNodeId("valuesB"), bRows, b1)), + ImmutableList.of(new JoinNode.EquiJoinClause(a1, b1)), + ImmutableList.of(a1, b1), + Optional.empty()); + }) + .matches(join( + INNER, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(REPLICATED), + values(ImmutableMap.of("A1", 0)), + filter("true", values(ImmutableMap.of("B1", 0))))); + + // same but with join sides reversed + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .overrideStats("valuesA", aStatsEstimate) + .overrideStats("filterB", bStatsEstimate) + .overrideStats("valuesB", bSourceStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + INNER, + p.filter(new PlanNodeId("filterB"), TRUE_CONSTANT, p.values(new PlanNodeId("valuesB"), bRows, b1)), + p.values(new PlanNodeId("valuesA"), aRows, a1), + ImmutableList.of(new JoinNode.EquiJoinClause(b1, a1)), + ImmutableList.of(b1, a1), + Optional.empty()); + }) + .matches(join( + INNER, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(REPLICATED), + values(ImmutableMap.of("A1", 0)), + filter("true", values(ImmutableMap.of("B1", 0))))); + + // only probe side (with small tables) source stats are available, join sides should be flipped + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("filterB", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bSourceStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + LEFT, + p.filter(new PlanNodeId("filterB"), TRUE_CONSTANT, p.values(new PlanNodeId("valuesB"), bRows, b1)), + p.values(new PlanNodeId("valuesA"), aRows, a1), + ImmutableList.of(new JoinNode.EquiJoinClause(b1, a1)), + ImmutableList.of(b1, a1), + Optional.empty()); + }) + .matches(join( + RIGHT, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(PARTITIONED), + values(ImmutableMap.of("A1", 0)), + filter("true", values(ImmutableMap.of("B1", 0))))); + } + + @Test + public void testFlipWhenSizeDifferenceLarge() + { + VarcharType variableType = createUnboundedVarcharType(); // variable width so that average row size is respected + int aRows = 10_000; + int bRows = 1_000; + + // output size exceeds JOIN_MAX_BROADCAST_TABLE_SIZE limit + PlanNodeStatsEstimate aStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(aRows) + .addVariableStatistics(ImmutableMap.of( + new VariableReferenceExpression(Optional.empty(), "A1", variableType), + new VariableStatsEstimate(0, 100, 0, 640000d * 10000, 10))) + .build(); + // output size exceeds JOIN_MAX_BROADCAST_TABLE_SIZE limit + PlanNodeStatsEstimate bStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(bRows) + .addVariableStatistics(ImmutableMap.of( + new VariableReferenceExpression(Optional.empty(), "B1", variableType), + new VariableStatsEstimate(0, 100, 0, 640000d * 10000, 10))) + .build(); + + // source tables size exceeds JOIN_MAX_BROADCAST_TABLE_SIZE limit but one side is significantly bigger than the other + // therefore repartitioned distribution type is chosen with the smaller side on build + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .overrideStats("valuesA", aStatsEstimate) + .overrideStats("valuesB", bStatsEstimate) + .overrideStats("filterB", PlanNodeStatsEstimate.unknown()) // unestimated term to trigger size based join ordering + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + INNER, + p.values(new PlanNodeId("valuesA"), aRows, a1), + p.filter( + new PlanNodeId("filterB"), + TRUE_CONSTANT, + p.values(new PlanNodeId("valuesB"), bRows, b1)), + ImmutableList.of(new JoinNode.EquiJoinClause(a1, b1)), + ImmutableList.of(a1, b1), + Optional.empty()); + }) + .matches(join( + INNER, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(PARTITIONED), + values(ImmutableMap.of("A1", 0)), + filter("true", values(ImmutableMap.of("B1", 0))))); + + // same but with join sides reversed + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .overrideStats("valuesA", aStatsEstimate) + .overrideStats("valuesB", bStatsEstimate) + .overrideStats("filterB", PlanNodeStatsEstimate.unknown()) // unestimated term to trigger size based join ordering + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + INNER, + p.filter( + new PlanNodeId("filterB"), + TRUE_CONSTANT, + p.values(new PlanNodeId("valuesB"), bRows, b1)), + p.values(new PlanNodeId("valuesA"), aRows, a1), + ImmutableList.of(new JoinNode.EquiJoinClause(b1, a1)), + ImmutableList.of(b1, a1), + Optional.empty()); + }) + .matches(join( + INNER, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(PARTITIONED), + values(ImmutableMap.of("A1", 0)), + filter("true", values(ImmutableMap.of("B1", 0))))); + + // Use REPLICATED join type for cross join + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .overrideStats("valuesA", aStatsEstimate) + .overrideStats("valuesB", bStatsEstimate) + .overrideStats("filterB", PlanNodeStatsEstimate.unknown()) // unestimated term to trigger size based join ordering + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + INNER, + p.filter( + new PlanNodeId("filterB"), + TRUE_CONSTANT, + p.values(new PlanNodeId("valuesB"), bRows, b1)), + p.values(new PlanNodeId("valuesA"), aRows, a1), + ImmutableList.of(), + ImmutableList.of(b1, a1), + Optional.empty()); + }) + .matches(join( + INNER, + ImmutableList.of(), + Optional.empty(), + Optional.of(REPLICATED), + filter("true", values(ImmutableMap.of("B1", 0))), + values(ImmutableMap.of("A1", 0)))); + + // Don't flip sides when both are similar in size + bStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(aRows) + .addVariableStatistics(ImmutableMap.of( + new VariableReferenceExpression(Optional.empty(), "B1", variableType), + new VariableStatsEstimate(0, 100, 0, 640000d * 10000, 10))) + .build(); + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .overrideStats("valuesA", aStatsEstimate) + .overrideStats("valuesB", bStatsEstimate) + .overrideStats("filterB", PlanNodeStatsEstimate.unknown()) // unestimated term to trigger size based join ordering + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + INNER, + p.filter( + new PlanNodeId("filterB"), + TRUE_CONSTANT, + p.values(new PlanNodeId("valuesB"), aRows, b1)), + p.values(new PlanNodeId("valuesA"), aRows, a1), + ImmutableList.of(new JoinNode.EquiJoinClause(b1, a1)), + ImmutableList.of(b1, a1), + Optional.empty()); + }) + .matches(join( + INNER, + ImmutableList.of(equiJoinClause("B1", "A1")), + Optional.empty(), + Optional.of(PARTITIONED), + filter("true", values(ImmutableMap.of("B1", 0))), + values(ImmutableMap.of("A1", 0)))); + } + + @Test + public void testGetSourceTablesSizeInBytes() + { + PlanBuilder planBuilder = new PlanBuilder(tester.getSession(), new PlanNodeIdAllocator(), tester.getMetadata()); + VariableReferenceExpression variable = planBuilder.variable("col"); + VariableReferenceExpression sourceVariable1 = planBuilder.variable("source1"); + VariableReferenceExpression sourceVariable2 = planBuilder.variable("soruce2"); + + // missing source stats + assertEquals( + getSourceTablesSizeInBytes( + planBuilder.values(variable), + noLookup(), + node -> PlanNodeStatsEstimate.unknown()), + NaN); + + // two source plan nodes + PlanNodeStatsEstimate sourceStatsEstimate1 = PlanNodeStatsEstimate.builder() + .setOutputRowCount(10) + .build(); + PlanNodeStatsEstimate sourceStatsEstimate2 = PlanNodeStatsEstimate.builder() + .setOutputRowCount(20) + .build(); + assertEquals( + getSourceTablesSizeInBytes( + planBuilder.union( + ImmutableListMultimap.builder() + .put(variable, sourceVariable1) + .put(variable, sourceVariable2) + .build(), + ImmutableList.of(planBuilder.tableScan( + ImmutableList.of(sourceVariable1), + ImmutableMap.of(sourceVariable1, new TestingColumnHandle("col"))), + planBuilder.values(new PlanNodeId("valuesNode"), sourceVariable2))), + noLookup(), + node -> { + if (node instanceof TableScanNode) { + return sourceStatsEstimate1; + } + + if (node instanceof ValuesNode) { + return sourceStatsEstimate2; + } + + return PlanNodeStatsEstimate.unknown(); + }), + 270.0); + + // join node + assertEquals( + getSourceTablesSizeInBytes( + planBuilder.join( + INNER, + planBuilder.values(sourceVariable1), + planBuilder.values(sourceVariable2)), + noLookup(), + node -> sourceStatsEstimate1), + NaN); + + // unnest node + assertEquals( + getSourceTablesSizeInBytes( + planBuilder.unnest( + planBuilder.values(sourceVariable1), + ImmutableList.of(), + ImmutableMap.of(sourceVariable1, ImmutableList.of(sourceVariable1)), + Optional.empty()), + noLookup(), + node -> sourceStatsEstimate1), + NaN); + } + + @Test + public void testGetApproximateSourceSizeInBytes() + { + PlanBuilder planBuilder = new PlanBuilder(tester.getSession(), new PlanNodeIdAllocator(), tester.getMetadata()); + VariableReferenceExpression variable = planBuilder.variable("col"); + VariableReferenceExpression sourceVariable1 = planBuilder.variable("source1"); + VariableReferenceExpression sourceVariable2 = planBuilder.variable("source2"); + + // missing source stats + assertEquals( + getFirstKnownOutputSizeInBytes( + planBuilder.values(variable), + noLookup(), + node -> PlanNodeStatsEstimate.unknown()), + NaN); + + // two source plan nodes + PlanNodeStatsEstimate sourceStatsEstimate1 = PlanNodeStatsEstimate.builder() + .setOutputRowCount(1000) + .build(); + PlanNodeStatsEstimate sourceStatsEstimate2 = PlanNodeStatsEstimate.builder() + .setOutputRowCount(2000) + .build(); + PlanNodeStatsEstimate filterStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(250) + .build(); + PlanNodeStatsEstimate limitStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(20) + .build(); + double sourceRowCount = sourceStatsEstimate1.getOutputRowCount() + sourceStatsEstimate2.getOutputRowCount(); + double unionInputRowCount = filterStatsEstimate.getOutputRowCount() + limitStatsEstimate.getOutputRowCount(); + double sourceSizeInBytes = sourceRowCount + sourceRowCount * BIGINT.getFixedSize(); + // un-estimated union with non-expanding source + assertEquals( + getFirstKnownOutputSizeInBytes( + planBuilder.union( + ImmutableListMultimap.builder() + .put(variable, sourceVariable1) + .put(variable, sourceVariable2) + .build(), + ImmutableList.of( + planBuilder.filter( + TRUE_CONSTANT, + planBuilder.tableScan( + ImmutableList.of(sourceVariable1), + ImmutableMap.of(sourceVariable1, new TestingColumnHandle("col")))), + planBuilder.limit(20, planBuilder.values(sourceVariable2)))), + noLookup(), + node -> { + if (node instanceof TableScanNode) { + return sourceStatsEstimate1; + } + if (node instanceof FilterNode) { + return filterStatsEstimate; + } + if (node instanceof ValuesNode) { + return sourceStatsEstimate2; + } + if (node instanceof LimitNode) { + return limitStatsEstimate; + } + + return PlanNodeStatsEstimate.unknown(); + }), + (unionInputRowCount / sourceRowCount) * sourceSizeInBytes); + + // join node with known estimate + assertEquals( + getFirstKnownOutputSizeInBytes( + planBuilder.join( + INNER, + planBuilder.values(sourceVariable1), + planBuilder.values(sourceVariable2)), + noLookup(), + node -> sourceStatsEstimate1), + sourceStatsEstimate1.getOutputRowCount() * 2 * (BIGINT.getFixedSize() + 1)); + + // un-estimated join with non-expanding source + assertEquals( + getFirstKnownOutputSizeInBytes( + planBuilder.join( + INNER, + planBuilder.tableScan( + ImmutableList.of(sourceVariable1), + ImmutableMap.of(sourceVariable1, new TestingColumnHandle("col"))), + planBuilder.values(sourceVariable2)), + noLookup(), + node -> { + if (node instanceof TableScanNode) { + return sourceStatsEstimate1; + } + if (node instanceof ValuesNode) { + return sourceStatsEstimate2; + } + + return PlanNodeStatsEstimate.unknown(); + }), + NaN); + + // un-estimated union with estimated expanding source + assertEquals( + getFirstKnownOutputSizeInBytes( + planBuilder.union( + ImmutableListMultimap.builder() + .put(variable, sourceVariable1) + .put(variable, sourceVariable2) + .build(), + ImmutableList.of( + planBuilder.unnest( + planBuilder.values(sourceVariable1), + ImmutableList.of(), + ImmutableMap.of(sourceVariable1, ImmutableList.of(sourceVariable1)), + Optional.empty()), + planBuilder.values(sourceVariable2))), + noLookup(), + node -> { + if (node instanceof UnnestNode) { + return sourceStatsEstimate1; + } + if (node instanceof ValuesNode) { + return sourceStatsEstimate2; + } + + return PlanNodeStatsEstimate.unknown(); + }), + sourceSizeInBytes); + + // un-estimated union with un-estimated expanding source + assertEquals( + getFirstKnownOutputSizeInBytes( + planBuilder.union( + ImmutableListMultimap.builder() + .put(variable, sourceVariable1) + .put(variable, sourceVariable2) + .build(), + ImmutableList.of( + planBuilder.unnest( + planBuilder.values(sourceVariable1), + ImmutableList.of(), + ImmutableMap.of(sourceVariable1, ImmutableList.of(sourceVariable1)), + Optional.empty()), + planBuilder.values(sourceVariable2))), + noLookup(), + node -> { + if (node instanceof ValuesNode) { + return sourceStatsEstimate2; + } + + return PlanNodeStatsEstimate.unknown(); + }), + NaN); + } + private RuleAssert assertDetermineJoinDistributionType() { return assertDetermineJoinDistributionType(COST_COMPARATOR); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineSemiJoinDistributionType.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineSemiJoinDistributionType.java index 67d2207562712..33a910abae616 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineSemiJoinDistributionType.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineSemiJoinDistributionType.java @@ -27,6 +27,7 @@ */ package com.facebook.presto.sql.planner.iterative.rule; +import com.facebook.presto.common.type.Type; import com.facebook.presto.cost.CostComparator; import com.facebook.presto.cost.PlanNodeStatsEstimate; import com.facebook.presto.cost.TaskCountEstimator; @@ -47,6 +48,9 @@ import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static com.facebook.presto.SystemSessionProperties.JOIN_MAX_BROADCAST_TABLE_SIZE; import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType; +import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.semiJoin; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values; import static com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder.constantExpressions; @@ -305,6 +309,65 @@ public void testReplicatesWhenNotRestricted() values(ImmutableMap.of("B1", 0)))); } + @Test + public void testReplicatesWhenSourceIsSmall() + { + Type variableType = createUnboundedVarcharType(); // variable width so that average row size is respected + int aRows = 10_000; + int bRows = 10; + + PlanNodeStatsEstimate probeSideStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(aRows) + .addVariableStatistics(ImmutableMap.of( + new VariableReferenceExpression(Optional.empty(), "A1", variableType), + new VariableStatsEstimate(0, 100, 0, 640000d * 10000, 10))) + .build(); + PlanNodeStatsEstimate buildSideStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(bRows) + .addVariableStatistics(ImmutableMap.of( + new VariableReferenceExpression(Optional.empty(), "B1", variableType), + new VariableStatsEstimate(0, 100, 0, 640000d * 10000, 10))) + .build(); + PlanNodeStatsEstimate buildSideSourceStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(bRows) + .addVariableStatistics(ImmutableMap.of( + new VariableReferenceExpression(Optional.empty(), "B1", variableType), + new VariableStatsEstimate(0, 100, 0, 64, 10))) + .build(); + + // build side exceeds JOIN_MAX_BROADCAST_TABLE_SIZE limit but source plan nodes are small + // therefore replicated distribution type is chosen + assertDetermineSemiJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .overrideStats("valuesA", probeSideStatsEstimate) + .overrideStats("filterB", buildSideStatsEstimate) + .overrideStats("valuesB", buildSideSourceStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.semiJoin( + p.values(new PlanNodeId("valuesA"), aRows, a1), + p.filter( + new PlanNodeId("filterB"), + TRUE_CONSTANT, + p.values(new PlanNodeId("valuesB"), bRows, b1)), + a1, + b1, + p.variable("output"), + Optional.empty(), + Optional.empty(), + Optional.empty()); + }) + .matches(semiJoin( + "A1", + "B1", + "output", + Optional.of(REPLICATED), + values(ImmutableMap.of("A1", 0)), + filter("true", values(ImmutableMap.of("B1", 0))))); + } + private RuleAssert assertDetermineSemiJoinDistributionType() { return assertDetermineSemiJoinDistributionType(COST_COMPARATOR); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java index 04e6470f37e3f..78abc8df575b2 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java @@ -315,7 +315,12 @@ public FilterNode filter(Expression predicate, PlanNode source) public FilterNode filter(RowExpression predicate, PlanNode source) { - return new FilterNode(source.getSourceLocation(), idAllocator.getNextId(), source, predicate); + return filter(idAllocator.getNextId(), predicate, source); + } + + public FilterNode filter(PlanNodeId planNodeId, RowExpression predicate, PlanNode source) + { + return new FilterNode(source.getSourceLocation(), planNodeId, source, predicate); } public AggregationNode aggregation(Consumer aggregationBuilderConsumer) diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/RuleTester.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/RuleTester.java index 0252e559b942e..112b37355b0d3 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/RuleTester.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/RuleTester.java @@ -156,6 +156,11 @@ public Metadata getMetadata() return metadata; } + public Session getSession() + { + return session; + } + public SplitManager getSplitManager() { return splitManager;