diff --git a/src/execution_plans/partition_isolator.rs b/src/execution_plans/partition_isolator.rs
index e543c6b9..ef3c43f8 100644
--- a/src/execution_plans/partition_isolator.rs
+++ b/src/execution_plans/partition_isolator.rs
@@ -1,10 +1,15 @@
-use crate::DistributedTaskContext;
 use crate::common::require_one_child;
+use crate::DistributedTaskContext;
+use datafusion::common::config::ConfigOptions;
 use datafusion::execution::TaskContext;
+use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_expr_common::metrics::MetricsSet;
-use datafusion::physical_plan::ExecutionPlanProperties;
+use datafusion::physical_plan::filter_pushdown::{
+    ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation,
+};
 use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::ExecutionPlanProperties;
 use datafusion::{
     error::Result,
     execution::SendableRecordBatchStream,
@@ -192,11 +197,74 @@ impl ExecutionPlan for PartitionIsolatorExec {
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
+
+    /// PartitionIsolatorExec is a transparent passthrough node that only
+    /// remaps partition indices; it does not filter, project, or transform
+    /// data. It therefore forwards all parent filters to its child unchanged,
+    /// matching the behavior of other transparent nodes like
+    /// CoalesceBatchesExec and RepartitionExec.
+    fn gather_filters_for_pushdown(
+        &self,
+        _phase: FilterPushdownPhase,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+        _config: &ConfigOptions,
+    ) -> Result<FilterDescription> {
+        FilterDescription::from_children(parent_filters, &self.children())
+    }
+
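+    /// `if_all` marks each pushed-down filter as supported only if every
+    /// child reported it as supported; since PartitionIsolatorExec has a
+    /// single child, this simply relays the child's result upward.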
+    fn handle_child_pushdown_result(
+        &self,
+        _phase: FilterPushdownPhase,
+        child_pushdown_result: ChildPushdownResult,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+    use datafusion::physical_plan::empty::EmptyExec;
+
+    fn test_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
+    }
+
+    /// Verifies that PartitionIsolatorExec forwards parent filters to its
+    /// child unchanged. This is critical for dynamic filter pushdown (e.g.
+    /// TopK or hash join filters) to reach the DataSourceExec through
+    /// PartitionIsolatorExec.
+    #[test]
+    fn test_forwards_filters_to_child() {
+        use datafusion::common::ScalarValue;
+        use datafusion::physical_expr::expressions::lit;
+
+        let schema = test_schema();
+        let input = Arc::new(EmptyExec::new(schema)) as Arc<dyn ExecutionPlan>;
+        let isolator = PartitionIsolatorExec::new(input, 2);
+
+        // Create a filter expression to push down.
+        let filter: Arc<dyn PhysicalExpr> = lit(ScalarValue::Boolean(Some(true)));
+        let parent_filters = vec![filter];
+
+        let config = ConfigOptions::default();
+        let result = isolator
+            .gather_filters_for_pushdown(FilterPushdownPhase::Post, parent_filters.clone(), &config)
+            .expect("gather_filters_for_pushdown should succeed");
+
+        // The filter description should forward the parent filter to the
+        // single child. parent_filters() returns a nested Vec with one
+        // entry per child.
+        let child_filters = result.parent_filters();
+        assert_eq!(child_filters.len(), 1, "should have exactly one child");
+        assert_eq!(
+            child_filters[0].len(),
+            1,
+            "child should receive exactly one filter"
+        );
+    }
 
     #[test]
     fn test_partition_groups() {
diff --git a/src/flight_service/do_get.rs b/src/flight_service/do_get.rs
index 55e83120..3091b32e 100644
--- a/src/flight_service/do_get.rs
+++ b/src/flight_service/do_get.rs
@@ -23,6 +23,8 @@ use datafusion::arrow::ipc::writer::IpcWriteOptions;
 use datafusion::common::exec_datafusion_err;
 use datafusion::error::DataFusionError;
 use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
+use datafusion::physical_optimizer::filter_pushdown::FilterPushdown;
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
 use datafusion_proto::protobuf::PhysicalPlanNode;
@@ -111,6 +113,19 @@ impl Worker {
             plan = hook(plan)
         }
 
+        // Re-run the post-optimization filter pushdown pass to re-establish
+        // dynamic filter connections (e.g. TopK, HashJoin) that were broken
+        // during plan serialization. When the coordinator serializes the plan,
+        // DynamicFilterPhysicalExpr instances are snapshotted into static
+        // expressions. After deserialization, operators like SortExec(TopK)
+        // create fresh DynamicFilterPhysicalExpr instances with new Arcs, but
+        // the downstream DataSourceExec still holds the static snapshot. This
+        // pass pushes the fresh dynamic filters down to DataSourceExec,
+        // restoring the shared Arc that enables runtime file pruning.
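+        // Note: `new_post_optimization` limits the rule to the Post phase,
+        // which is the phase that targets dynamic filters; the regular
+        // pre-optimization FilterPushdown pass is assumed to have already run
+        // on the coordinator before the plan was serialized.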
+        let config = task_ctx.session_config().options();
+        let optimizer = FilterPushdown::new_post_optimization();
+        plan = optimizer.optimize(plan, config)?;
+
         // Initialize partition count to the number of partitions in the stage
         let total_partitions = plan.properties().partitioning.partition_count();
         Ok::<_, DataFusionError>(TaskData {
@@ -479,4 +494,96 @@ mod tests {
         let node = Arc::new(EmptyExec::new(SchemaRef::new(Schema::empty())));
         Arc::new(RepartitionExec::try_new(node, Partitioning::RoundRobinBatch(partitions)).unwrap())
     }
+
+    /// Verifies that re-running FilterPushdown after deserialization restores
+    /// the TopK dynamic filter connections that are broken during plan
+    /// serialization. This is the scenario addressed by the
+    /// `FilterPushdown::new_post_optimization()` call in the `do_get` handler.
+    ///
+    /// The lifecycle:
+    /// 1. Build a plan with SortExec(TopK) on a parquet DataSourceExec.
+    /// 2. Run FilterPushdown so the TopK dynamic filter is pushed into
+    ///    DataSourceExec's predicate.
+    /// 3. Serialize the plan (simulating coordinator -> worker transfer).
+    /// 4. Deserialize it (the dynamic filter Arc sharing is now broken).
+    /// 5. Re-run FilterPushdown (what `do_get` does) and assert the
+    ///    DynamicFilter predicate is restored on DataSourceExec.
+    #[tokio::test]
+    async fn test_topk_dynamic_filter_survives_serde_roundtrip() {
+        use crate::test_utils::session_context::register_temp_parquet_table;
+        use arrow::array::Int64Array;
+        use arrow::datatypes::{DataType, Field};
+        use datafusion::common::config::ConfigOptions;
+        use datafusion::physical_optimizer::filter_pushdown::FilterPushdown;
+        use datafusion::physical_optimizer::PhysicalOptimizerRule;
+        use datafusion::physical_plan::displayable;
+        use datafusion::prelude::SessionContext;
+        use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
+        use prost::Message;
+
+        // 1. Create a session and register a parquet table so the plan gets a
+        //    real DataSourceExec (MemorySourceConfig rejects filter pushdown).
+        let ctx = SessionContext::new();
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
+        let batch = arrow::record_batch::RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int64Array::from(vec![3, 1, 4, 1, 5, 9, 2, 6]))],
+        )
+        .unwrap();
+        let _path = register_temp_parquet_table("topk_test", schema, vec![batch], &ctx)
+            .await
+            .unwrap();
+
+        // 2. Build a physical plan with TopK via ORDER BY ... LIMIT.
+        let df = ctx
+            .sql("SELECT a FROM topk_test ORDER BY a LIMIT 3")
+            .await
+            .unwrap();
+        let plan = df.create_physical_plan().await.unwrap();
+
+        // 3. Run the post-optimization FilterPushdown pass so the TopK dynamic
+        //    filter is pushed into DataSourceExec.
+        let config = ConfigOptions::default();
+        let optimizer = FilterPushdown::new_post_optimization();
+        let plan = optimizer.optimize(plan, &config).unwrap();
+
+        let display_before = displayable(plan.as_ref()).indent(true).to_string();
+        assert!(
+            display_before.contains("DynamicFilter"),
+            "Expected DynamicFilter in plan before serialization:\n{display_before}"
+        );
+
+        // 4. Serialize then deserialize (simulates coordinator -> worker).
+        let codec = DefaultPhysicalExtensionCodec {};
+        let proto =
+            PhysicalPlanNode::try_from_physical_plan(plan, &codec).unwrap();
+        let bytes = proto.encode_to_vec();
+
+        let proto_node = PhysicalPlanNode::try_decode(bytes.as_ref()).unwrap();
+        let task_ctx = ctx.task_ctx();
+        let deserialized = proto_node
+            .try_into_physical_plan(&task_ctx, &codec)
+            .unwrap();
+
+        // After deserialization, the DynamicFilter Arc sharing is broken. The
+        // SortExec(TopK) creates a fresh DynamicFilterPhysicalExpr on
+        // deserialization but the DataSourceExec still holds a stale snapshot.
+        // Verify the stale state: no DynamicFilter in DataSourceExec's predicate.
+        let display_stale = displayable(deserialized.as_ref()).indent(true).to_string();
+        assert!(
+            !display_stale.contains("DynamicFilter"),
+            "Expected no DynamicFilter before re-running FilterPushdown \
+             (the dynamic filter should have been snapshotted to a static \
+             expression during serialization):\n{display_stale}"
+        );
+
+        // 5. Re-run FilterPushdown (this is what the do_get handler does).
+        let restored = optimizer.optimize(deserialized, &config).unwrap();
+
+        let display_after = displayable(restored.as_ref()).indent(true).to_string();
+        assert!(
+            display_after.contains("DynamicFilter"),
+            "Expected DynamicFilter to be restored after re-running \
+             FilterPushdown on the deserialized plan:\n{display_after}"
+        );
+    }
 }