diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs
index 839df7bb8823e..3d2b39f3d979d 100644
--- a/datafusion/physical-expr/src/partitioning.rs
+++ b/datafusion/physical-expr/src/partitioning.rs
@@ -171,6 +171,23 @@ impl Partitioning {
                         // If the required exprs do not match, need to leverage the eq_properties provided by the child
                         // and normalize both exprs based on the equivalent groups.
                         if !fast_match {
+                            // Check if partition_exprs is a subset of required_exprs (superset satisfaction)
+                            // DataFusion's Hash partitioning guarantees that all rows with the same value
+                            // combination of partition columns are in the same partition.
+                            if partition_exprs.len() < required_exprs.len() {
+                                let is_subset = partition_exprs.iter().all(|p_expr| {
+                                    required_exprs.iter().any(|r_expr| {
+                                        physical_exprs_equal(
+                                            &[p_expr.clone()],
+                                            &[r_expr.clone()],
+                                        )
+                                    })
+                                });
+                                if is_subset {
+                                    return true;
+                                }
+                            }
+
                             let eq_groups = eq_properties.eq_group();
                             if !eq_groups.is_empty() {
                                 let normalized_required_exprs = required_exprs
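A quick aside on the invariant this new branch relies on: if data is hash partitioned on a subset of the required expressions, any two rows that agree on every required expression necessarily agree on the subset, so they hash to the same partition. A minimal, dependency-free sketch of that property (plain std hashing, not DataFusion's actual hash partitioner; `partition_of` is a hypothetical helper for illustration):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy stand-in for hash partitioning on a single key column.
fn partition_of<K: Hash>(key: &K, partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % partitions
}

fn main() {
    // Requirement: Hash([a, b]). Two rows with the same (a, b) combination.
    let row1 = ("A", 1u32);
    let row2 = ("A", 1u32);

    // Data is physically partitioned on the subset [a] only.
    let (p1, p2) = (partition_of(&row1.0, 4), partition_of(&row2.0, 4));

    // Equal (a, b) implies equal a, which implies the same partition, so a
    // Hash([a]) output can satisfy a Hash([a, b]) requirement.
    assert_eq!(p1, p2);
    println!("both rows land in partition {p1}");
}
```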
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 1f6ea61c099bd..0db229222ea4a 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -911,10 +911,19 @@ fn add_hash_on_top(
         .output_partitioning()
         .satisfy(&dist, input.plan.equivalence_properties());
 
+    // Check if input already has hash partitioning
+    let has_hash_partitioning = matches!(
+        input.plan.output_partitioning(),
+        Partitioning::Hash(_, _)
+    );
+
     // Add hash repartitioning when:
     // - The hash distribution requirement is not satisfied, or
-    // - We can increase parallelism by adding hash partitioning.
-    if !satisfied || n_target > input.plan.output_partitioning().partition_count() {
+    // - We can increase parallelism by adding hash partitioning (but not if already hash partitioned).
+    //
+    // When data is already hash partitioned correctly, don't repartition just to increase
+    // parallelism, as this breaks file-level partitioning and is counterproductive.
+    if !satisfied || (n_target > input.plan.output_partitioning().partition_count() && !has_hash_partitioning) {
         // When there is an existing ordering, we preserve ordering during
         // repartition. This will be rolled back in the future if any of the
         // following conditions is true:
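The reshaped condition is easier to audit in isolation. Here is a dependency-free restatement of the guard as a hypothetical helper (`needs_hash_repartition` is written for this note, not a DataFusion function), covering the three interesting cases:

```rust
/// Restates the guard in `add_hash_on_top` above, for illustration only.
fn needs_hash_repartition(
    satisfied: bool,
    n_target: usize,
    current_partitions: usize,
    already_hash_partitioned: bool,
) -> bool {
    // Repartition when the requirement is unmet, or when parallelism could
    // grow -- unless the input is already hash partitioned, in which case
    // repartitioning would destroy file-level partitioning for no benefit.
    !satisfied || (n_target > current_partitions && !already_hash_partitioned)
}

fn main() {
    // Requirement unmet: always add the RepartitionExec.
    assert!(needs_hash_repartition(false, 4, 3, true));
    // Requirement met and already hash partitioned: previously this still
    // repartitioned to reach n_target; with the patch it is left alone.
    assert!(!needs_hash_repartition(true, 4, 3, true));
    // Requirement met but not hash partitioned: still worth repartitioning
    // to raise parallelism from 3 to 4 partitions.
    assert!(needs_hash_repartition(true, 4, 3, false));
}
```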
diff --git a/datafusion/sqllogictest/test_files/optimal_poc_query.slt b/datafusion/sqllogictest/test_files/optimal_poc_query.slt
new file mode 100644
index 0000000000000..043a192701b4f
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/optimal_poc_query.slt
@@ -0,0 +1,283 @@
+statement ok
+SET datafusion.optimizer.enable_round_robin_repartition = false;
+
+statement ok
+SET datafusion.optimizer.preserve_file_partitions = 1;
+
+statement ok
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+  (TIMESTAMP '2023-01-01T09:00:00', 95.5),
+  (TIMESTAMP '2023-01-01T09:00:10', 102.3),
+  (TIMESTAMP '2023-01-01T09:00:20', 98.7),
+  (TIMESTAMP '2023-01-01T09:12:20', 105.1),
+  (TIMESTAMP '2023-01-01T09:12:30', 100.0),
+  (TIMESTAMP '2023-01-01T09:12:40', 150.0),
+  (TIMESTAMP '2023-01-01T09:12:50', 120.8)
+))
+TO 'test_files/scratch/optimal_poc_query/fact/f_dkey=A/data.parquet'
+STORED AS PARQUET;
+
+statement ok
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+  (TIMESTAMP '2023-01-01T09:00:00', 75.2),
+  (TIMESTAMP '2023-01-01T09:00:10', 82.4),
+  (TIMESTAMP '2023-01-01T09:00:20', 78.9),
+  (TIMESTAMP '2023-01-01T09:00:30', 85.6),
+  (TIMESTAMP '2023-01-01T09:12:30', 80.0),
+  (TIMESTAMP '2023-01-01T09:12:40', 120.0),
+  (TIMESTAMP '2023-01-01T09:12:50', 92.3)
+))
+TO 'test_files/scratch/optimal_poc_query/fact/f_dkey=B/data.parquet'
+STORED AS PARQUET;
+
+statement ok
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+  (TIMESTAMP '2023-01-01T09:00:00', 300.5),
+  (TIMESTAMP '2023-01-01T09:00:10', 285.7),
+  (TIMESTAMP '2023-01-01T09:00:20', 310.2),
+  (TIMESTAMP '2023-01-01T09:00:30', 295.8),
+  (TIMESTAMP '2023-01-01T09:00:40', 300.0),
+  (TIMESTAMP '2023-01-01T09:12:40', 250.0),
+  (TIMESTAMP '2023-01-01T09:12:50', 275.4)
+))
+TO 'test_files/scratch/optimal_poc_query/fact/f_dkey=C/data.parquet'
+STORED AS PARQUET;
+
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+  ('dev', 'log', 'ma')
+))
+TO 'test_files/scratch/optimal_poc_query/dimension/d_dkey=A/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+  ('prod', 'log', 'ma')
+))
+TO 'test_files/scratch/optimal_poc_query/dimension/d_dkey=B/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+  ('prod', 'log', 'vim')
+))
+TO 'test_files/scratch/optimal_poc_query/dimension/d_dkey=C/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+  ('prod', 'trace', 'vim')
+))
+TO 'test_files/scratch/optimal_poc_query/dimension/d_dkey=D/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+statement ok
+CREATE EXTERNAL TABLE fact_table_ordered (timestamp TIMESTAMP, value DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+WITH ORDER (f_dkey ASC, timestamp ASC)
+LOCATION 'test_files/scratch/optimal_poc_query/fact/';
+
+statement ok
+CREATE EXTERNAL TABLE dimension_table (env STRING, service STRING, host STRING)
+STORED AS PARQUET
+PARTITIONED BY (d_dkey STRING)
+LOCATION 'test_files/scratch/optimal_poc_query/dimension/';
+
+query TT
+EXPLAIN SELECT f.f_dkey, MAX(d.env), MAX(d.service), count(*), sum(f.value)
+FROM fact_table_ordered f
+INNER JOIN dimension_table d ON f.f_dkey = d.d_dkey
+WHERE d.service = 'log'
+GROUP BY f.f_dkey
+ORDER BY f.f_dkey;
+----
+logical_plan
+01)Sort: f.f_dkey ASC NULLS LAST
+02)--Projection: f.f_dkey, max(d.env), max(d.service), count(Int64(1)) AS count(*), sum(f.value)
+03)----Aggregate: groupBy=[[f.f_dkey]], aggr=[[max(d.env), max(d.service), count(Int64(1)), sum(f.value)]]
+04)------Projection: f.value, f.f_dkey, d.env, d.service
+05)--------Inner Join: f.f_dkey = d.d_dkey
+06)----------SubqueryAlias: f
+07)------------TableScan: fact_table_ordered projection=[value, f_dkey]
+08)----------SubqueryAlias: d
+09)------------Filter: dimension_table.service = Utf8View("log")
+10)--------------TableScan: dimension_table projection=[env, service, d_dkey], partial_filters=[dimension_table.service = Utf8View("log")]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, max(d.env)@1 as max(d.env), max(d.service)@2 as max(d.service), count(Int64(1))@3 as count(*), sum(f.value)@4 as sum(f.value)]
+03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)], ordering_mode=Sorted
+04)------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey, env@0 as env, service@1 as service]
+05)--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@2, f_dkey@1)], projection=[env@0, service@1, value@3, f_dkey@4]
+06)----------CoalescePartitionsExec
+07)------------FilterExec: service@1 = log
+08)--------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=C/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=D/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
+09)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+query TT
+EXPLAIN SELECT f_dkey,
+    timestamp,
+    MAX(env), -- Add MAX here to avoid putting it in the group by
+    MAX(value)
+FROM
+  (
+    SELECT f.f_dkey, f.timestamp, d.env, f.value
+    FROM
+      (SELECT DISTINCT d_dkey, env
+       FROM dimension_table -- not sorted
+       WHERE service = 'log'
+      ) AS d,
+      fact_table_ordered AS f
+    WHERE d.d_dkey = f.f_dkey
+  ) AS j
+GROUP BY f_dkey, timestamp
+ORDER BY f_dkey, timestamp;
+----
+logical_plan
+01)Sort: j.f_dkey ASC NULLS LAST, j.timestamp ASC NULLS LAST
+02)--Aggregate: groupBy=[[j.f_dkey, j.timestamp]], aggr=[[max(j.env), max(j.value)]]
+03)----SubqueryAlias: j
+04)------Projection: f.f_dkey, f.timestamp, d.env, f.value
+05)--------Inner Join: d.d_dkey = f.f_dkey
+06)----------SubqueryAlias: d
+07)------------Aggregate: groupBy=[[dimension_table.d_dkey, dimension_table.env]], aggr=[[]]
+08)--------------Projection: dimension_table.d_dkey, dimension_table.env
+09)----------------Filter: dimension_table.service = Utf8View("log")
+10)------------------TableScan: dimension_table projection=[env, service, d_dkey], partial_filters=[dimension_table.service = Utf8View("log")]
+11)----------SubqueryAlias: f
+12)------------TableScan: fact_table_ordered projection=[timestamp, value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST]
+02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@0 as f_dkey, timestamp@1 as timestamp], aggr=[max(j.env), max(j.value)], ordering_mode=Sorted
+03)----ProjectionExec: expr=[f_dkey@3 as f_dkey, timestamp@1 as timestamp, env@0 as env, value@2 as value]
+04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0, f_dkey@2)], projection=[env@1, timestamp@2, value@3, f_dkey@4]
+05)--------CoalescePartitionsExec
+06)----------AggregateExec: mode=SinglePartitioned, gby=[d_dkey@0 as d_dkey, env@1 as env], aggr=[]
+07)------------ProjectionExec: expr=[d_dkey@1 as d_dkey, env@0 as env]
+08)--------------FilterExec: service@1 = log, projection=[env@0, d_dkey@2]
+09)----------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=C/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=D/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
+10)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+query TT
+EXPLAIN SELECT env, timestamp, AVG(max_bin_val)
+FROM
+  (
+    SELECT f_dkey,
+        timestamp,
+        MAX(env) AS env, -- TRICK: Add MAX here to avoid putting it in the group by. f_dkey (represents all columns in a fact file) is unique so env is also unique
+        MAX(value) AS max_bin_val
+    FROM
+      (
+        SELECT f.f_dkey, f.timestamp, d.env, f.value
+        FROM
+          (SELECT DISTINCT d_dkey, env
+           FROM dimension_table
+           WHERE service = 'log'
+          ) AS d,
+          fact_table_ordered AS f
+        WHERE d.d_dkey = f.f_dkey
+      ) AS j
+    GROUP BY f_dkey, timestamp
+  ) AS a
+GROUP BY env, timestamp
+ORDER BY env, timestamp;
+----
+logical_plan
+01)Sort: a.env ASC NULLS LAST, a.timestamp ASC NULLS LAST
+02)--Aggregate: groupBy=[[a.env, a.timestamp]], aggr=[[avg(a.max_bin_val)]]
+03)----SubqueryAlias: a
+04)------Projection: j.timestamp, max(j.env) AS env, max(j.value) AS max_bin_val
+05)--------Aggregate: groupBy=[[j.f_dkey, j.timestamp]], aggr=[[max(j.env), max(j.value)]]
+06)----------SubqueryAlias: j
+07)------------Projection: f.f_dkey, f.timestamp, d.env, f.value
+08)--------------Inner Join: d.d_dkey = f.f_dkey
+09)----------------SubqueryAlias: d
+10)------------------Aggregate: groupBy=[[dimension_table.d_dkey, dimension_table.env]], aggr=[[]]
+11)--------------------Projection: dimension_table.d_dkey, dimension_table.env
+12)----------------------Filter: dimension_table.service = Utf8View("log")
+13)------------------------TableScan: dimension_table projection=[env, service, d_dkey], partial_filters=[dimension_table.service = Utf8View("log")]
+14)----------------SubqueryAlias: f
+15)------------------TableScan: fact_table_ordered projection=[timestamp, value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [env@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST]
+02)--SortExec: expr=[env@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST], preserve_partitioning=[true]
+03)----AggregateExec: mode=FinalPartitioned, gby=[env@0 as env, timestamp@1 as timestamp], aggr=[avg(a.max_bin_val)]
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([env@0, timestamp@1], 4), input_partitions=3
+06)----------AggregateExec: mode=Partial, gby=[env@1 as env, timestamp@0 as timestamp], aggr=[avg(a.max_bin_val)]
+07)------------ProjectionExec: expr=[timestamp@1 as timestamp, max(j.env)@2 as env, max(j.value)@3 as max_bin_val]
+08)--------------AggregateExec: mode=SinglePartitioned, gby=[f_dkey@0 as f_dkey, timestamp@1 as timestamp], aggr=[max(j.env), max(j.value)], ordering_mode=Sorted
+09)----------------ProjectionExec: expr=[f_dkey@3 as f_dkey, timestamp@1 as timestamp, env@0 as env, value@2 as value]
+10)------------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0, f_dkey@2)], projection=[env@1, timestamp@2, value@3, f_dkey@4]
+11)--------------------CoalescePartitionsExec
+12)----------------------AggregateExec: mode=SinglePartitioned, gby=[d_dkey@0 as d_dkey, env@1 as env], aggr=[]
+13)------------------------ProjectionExec: expr=[d_dkey@1 as d_dkey, env@0 as env]
+14)--------------------------FilterExec: service@1 = log, projection=[env@0, d_dkey@2]
+15)----------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=C/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=D/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
+16)--------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+query TT
+EXPLAIN SELECT env, time_bin, AVG(max_bin_val)
+FROM
+  (
+    SELECT f_dkey,
+        date_bin(INTERVAL '30 seconds', timestamp) AS time_bin,
+        MAX(env) AS env, -- TRICK: Add MAX here to avoid putting it in the group by. f_dkey (represents all columns in a fact file) is unique so env is also unique
+        MAX(value) AS max_bin_val
+    FROM
+      (
+        SELECT f.f_dkey, f.timestamp, d.env, f.value
+        FROM
+          (SELECT DISTINCT d_dkey, env
+           FROM dimension_table
+           WHERE service = 'log'
+          ) AS d,
+          fact_table_ordered AS f
+        WHERE d.d_dkey = f.f_dkey
+      ) AS j
+    GROUP BY f_dkey, time_bin
+  ) AS a
+GROUP BY env, time_bin
+ORDER BY env, time_bin;
+----
+logical_plan
+01)Sort: a.env ASC NULLS LAST, a.time_bin ASC NULLS LAST
+02)--Aggregate: groupBy=[[a.env, a.time_bin]], aggr=[[avg(a.max_bin_val)]]
+03)----SubqueryAlias: a
+04)------Projection: date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp) AS time_bin, max(j.env) AS env, max(j.value) AS max_bin_val
+05)--------Aggregate: groupBy=[[j.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"), j.timestamp)]], aggr=[[max(j.env), max(j.value)]]
+06)----------SubqueryAlias: j
+07)------------Projection: f.f_dkey, f.timestamp, d.env, f.value
+08)--------------Inner Join: d.d_dkey = f.f_dkey
+09)----------------SubqueryAlias: d
+10)------------------Aggregate: groupBy=[[dimension_table.d_dkey, dimension_table.env]], aggr=[[]]
+11)--------------------Projection: dimension_table.d_dkey, dimension_table.env
+12)----------------------Filter: dimension_table.service = Utf8View("log")
+13)------------------------TableScan: dimension_table projection=[env, service, d_dkey], partial_filters=[dimension_table.service = Utf8View("log")]
+14)----------------SubqueryAlias: f
+15)------------------TableScan: fact_table_ordered projection=[timestamp, value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [env@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST]
+02)--SortExec: expr=[env@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST], preserve_partitioning=[true]
+03)----AggregateExec: mode=FinalPartitioned, gby=[env@0 as env, time_bin@1 as time_bin], aggr=[avg(a.max_bin_val)]
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([env@0, time_bin@1], 4), input_partitions=3
+06)----------AggregateExec: mode=Partial, gby=[env@1 as env, time_bin@0 as time_bin], aggr=[avg(a.max_bin_val)]
+07)------------ProjectionExec: expr=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as time_bin, max(j.env)@2 as env, max(j.value)@3 as max_bin_val]
+08)--------------AggregateExec: mode=SinglePartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@1) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)], aggr=[max(j.env), max(j.value)], ordering_mode=Sorted
+09)----------------ProjectionExec: expr=[f_dkey@3 as f_dkey, timestamp@1 as timestamp, env@0 as env, value@2 as value]
+10)------------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0, f_dkey@2)], projection=[env@1, timestamp@2, value@3, f_dkey@4]
+11)--------------------CoalescePartitionsExec
+12)----------------------AggregateExec: mode=SinglePartitioned, gby=[d_dkey@0 as d_dkey, env@1 as env], aggr=[]
+13)------------------------ProjectionExec: expr=[d_dkey@1 as d_dkey, env@0 as env]
+14)--------------------------FilterExec: service@1 = log, projection=[env@0, d_dkey@2]
+15)----------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=C/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/dimension/d_dkey=D/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
+16)--------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimal_poc_query/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
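Read together, the expected plans show the intended effect of the two changes: each `AggregateExec: mode=SinglePartitioned` grouping on `f_dkey` sits directly above the file-partitioned scan, with no `RepartitionExec` re-hashing the fact side. The sketch below shows how the relaxed `satisfy` check could be exercised on its own; it assumes that `Partitioning`, `Distribution`, and `EquivalenceProperties` are re-exported from the `datafusion_physical_expr` crate this patch touches, so treat the exact import paths as assumptions rather than verified API.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::col;
// Assumed re-export locations; adjust if the crate layout differs.
use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning};

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("f_dkey", DataType::Utf8, false),
        Field::new("timestamp", DataType::Int64, false),
    ]));
    let f_dkey = col("f_dkey", &schema)?;
    let ts = col("timestamp", &schema)?;
    let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

    // The scan's output: hash partitioned on the subset [f_dkey] only.
    let output = Partitioning::Hash(vec![Arc::clone(&f_dkey)], 3);
    // The aggregate's requirement: hash partitioned on [f_dkey, timestamp].
    let required = Distribution::HashPartitioned(vec![f_dkey, ts]);

    // Without the new subset branch this fell through to the
    // equivalence-group logic and failed; with it, the requirement is
    // satisfied and no RepartitionExec is inserted above the scan.
    assert!(output.satisfy(&required, &eq_properties));
    Ok(())
}
```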