72 changes: 70 additions & 2 deletions src/execution_plans/partition_isolator.rs
@@ -1,10 +1,15 @@
use crate::common::require_one_child;
use crate::DistributedTaskContext;
use datafusion::common::config::ConfigOptions;
use datafusion::execution::TaskContext;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_expr_common::metrics::MetricsSet;
use datafusion::physical_plan::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation,
};
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::ExecutionPlanProperties;
use datafusion::{
error::Result,
execution::SendableRecordBatchStream,
@@ -192,11 +197,74 @@ impl ExecutionPlan for PartitionIsolatorExec
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

/// PartitionIsolatorExec is a transparent passthrough node that only remaps
/// partition indices — it does not filter, project, or transform data.
/// Therefore it forwards all parent filters to its child unchanged,
/// matching the behavior of other transparent nodes like CoalesceBatchesExec
/// and RepartitionExec.
fn gather_filters_for_pushdown(
&self,
_phase: FilterPushdownPhase,
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterDescription> {
FilterDescription::from_children(parent_filters, &self.children())
}

fn handle_child_pushdown_result(
&self,
_phase: FilterPushdownPhase,
child_pushdown_result: ChildPushdownResult,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
}
}
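
// How the two hooks above fit together (an editor's sketch against the
// DataFusion filter-pushdown API as used in this file; the variable names
// are illustrative, not code from this PR): the optimizer first asks the
// node to route parent filters toward its children, then asks it to fold
// the children's answers into its own. `if_all` reports a filter as
// supported only when every child accepted it, which for a single-child
// passthrough node means "whatever my child said":
//
//     let description = isolator.gather_filters_for_pushdown(
//         FilterPushdownPhase::Post,
//         parent_filters.clone(),
//         &config,
//     )?;
//     // ... the optimizer recurses into the child with those filters ...
//     let propagation = isolator.handle_child_pushdown_result(
//         FilterPushdownPhase::Post,
//         child_pushdown_result,
//         &config,
//     )?;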

#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::physical_plan::empty::EmptyExec;

fn test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
}

/// Verifies that PartitionIsolatorExec forwards parent filters to its
/// child unchanged. This is critical for dynamic filter pushdown (e.g.
/// TopK or hash join filters) to reach the DataSourceExec through
/// PartitionIsolatorExec.
#[test]
fn test_forwards_filters_to_child() {
use datafusion::common::ScalarValue;
use datafusion::physical_expr::expressions::lit;

let schema = test_schema();
let input = Arc::new(EmptyExec::new(schema)) as Arc<dyn ExecutionPlan>;
let isolator = PartitionIsolatorExec::new(input, 2);

// Create a filter expression to push down.
let filter: Arc<dyn PhysicalExpr> = lit(ScalarValue::Boolean(Some(true)));
let parent_filters = vec![filter];

let config = ConfigOptions::default();
let result = isolator
.gather_filters_for_pushdown(FilterPushdownPhase::Post, parent_filters.clone(), &config)
.expect("gather_filters_for_pushdown should succeed");

// The filter description should forward the parent filter to the
// single child. parent_filters() returns a Vec<Vec<PushedDownPredicate>>
// with one entry per child.
let child_filters = result.parent_filters();
assert_eq!(child_filters.len(), 1, "should have exactly one child");
assert_eq!(
child_filters[0].len(),
1,
"child should receive exactly one filter"
);
}
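
// The literal `true` above is the simplest possible filter; forwarding works
// the same for non-trivial predicates. A hypothetical variant (an editor's
// sketch, not part of this diff) using a column comparison:
//
//     use datafusion::logical_expr::Operator;
//     use datafusion::physical_expr::expressions::{binary, col};
//
//     let schema = test_schema();
//     let filter: Arc<dyn PhysicalExpr> = binary(
//         col("a", &schema).unwrap(),
//         Operator::Gt,
//         lit(ScalarValue::Int64(Some(0))),
//         &schema,
//     )
//     .unwrap();
//     // ...then push `vec![filter]` through gather_filters_for_pushdown
//     // exactly as in the test above.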

#[test]
fn test_partition_groups() {
107 changes: 107 additions & 0 deletions src/flight_service/do_get.rs
@@ -23,6 +23,8 @@ use datafusion::arrow::ipc::writer::IpcWriteOptions;
use datafusion::common::exec_datafusion_err;
use datafusion::error::DataFusionError;
use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
use datafusion::physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf::PhysicalPlanNode;
@@ -111,6 +113,19 @@ impl Worker {
plan = hook(plan)
}

// Re-run the post-optimization filter pushdown pass to re-establish
// dynamic filter connections (e.g. TopK, HashJoin) that were broken
// during plan serialization. When the coordinator serializes the plan,
// DynamicFilterPhysicalExpr instances are snapshotted into static
// expressions. After deserialization, operators like SortExec(TopK)
// create fresh DynamicFilterPhysicalExpr instances with new Arcs, but
// the downstream DataSourceExec still holds the static snapshot. This
// pass pushes the fresh dynamic filters down to DataSourceExec, restoring
// the shared Arc that enables runtime file pruning.
let config = task_ctx.session_config().options();
let optimizer = FilterPushdown::new_post_optimization();
plan = optimizer.optimize(plan, config)?;
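
// Concretely (an editor's illustration of the Arc wiring described above,
// not code from this diff):
//
//     before serde:  SortExec(TopK) ---+
//                                      +--> one shared Arc<DynamicFilterPhysicalExpr>,
//                    DataSourceExec ---+    updated by TopK as rows stream through
//
//     after serde:   SortExec(TopK)  --> fresh DynamicFilterPhysicalExpr
//                    DataSourceExec --> static snapshot, never updated again
//
// The optimize() call above re-links the two sides by pushing the fresh
// dynamic filter back into DataSourceExec's predicate.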

// Initialize partition count to the number of partitions in the stage
let total_partitions = plan.properties().partitioning.partition_count();
Ok::<_, DataFusionError>(TaskData {
@@ -479,4 +494,96 @@ mod tests {
let node = Arc::new(EmptyExec::new(SchemaRef::new(Schema::empty())));
Arc::new(RepartitionExec::try_new(node, Partitioning::RoundRobinBatch(partitions)).unwrap())
}

/// Verifies that re-running FilterPushdown after deserialization restores
/// the TopK dynamic filter connections that are broken during plan
/// serialization. This is the scenario addressed by the
/// `FilterPushdown::new_post_optimization()` call in the `do_get` handler.
///
/// The lifecycle:
/// 1. Build a plan with SortExec(TopK) on a parquet DataSourceExec.
/// 2. Run FilterPushdown so the TopK dynamic filter is pushed into
/// DataSourceExec's predicate.
/// 3. Serialize the plan (simulating coordinator -> worker transfer).
/// 4. Deserialize it (the dynamic filter Arc sharing is now broken).
/// 5. Re-run FilterPushdown (what `do_get` does) and assert the
/// DynamicFilter predicate is restored on DataSourceExec.
#[tokio::test]
async fn test_topk_dynamic_filter_survives_serde_roundtrip() {
use crate::test_utils::session_context::register_temp_parquet_table;
use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field};
use datafusion::common::config::ConfigOptions;
use datafusion::physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::displayable;
use datafusion::prelude::SessionContext;
use datafusion_proto::physical_plan::AsExecutionPlan;

// 1. Create a session and register a parquet table so the plan gets a
// real DataSourceExec (MemorySourceConfig rejects filter pushdown).
let ctx = SessionContext::new();
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
let batch = arrow::record_batch::RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int64Array::from(vec![3, 1, 4, 1, 5, 9, 2, 6]))],
)
.unwrap();
let _path = register_temp_parquet_table("topk_test", schema, vec![batch], &ctx)
.await
.unwrap();

// 2. Build a physical plan with TopK via ORDER BY ... LIMIT.
let df = ctx
.sql("SELECT a FROM topk_test ORDER BY a LIMIT 3")
.await
.unwrap();
let plan = df.create_physical_plan().await.unwrap();

// 3. Run the post-optimization FilterPushdown pass so the TopK dynamic
// filter is pushed into DataSourceExec.
let config = ConfigOptions::default();
let optimizer = FilterPushdown::new_post_optimization();
let plan = optimizer.optimize(plan, &config).unwrap();

let display_before = displayable(plan.as_ref()).indent(true).to_string();
assert!(
display_before.contains("DynamicFilter"),
"Expected DynamicFilter in plan before serialization:\n{display_before}"
);

// 4. Serialize then deserialize (simulates coordinator -> worker).
let codec = DefaultPhysicalExtensionCodec {};
let proto = PhysicalPlanNode::try_from_physical_plan(plan, &codec).unwrap();
let bytes = proto.encode_to_vec();

let proto_node = PhysicalPlanNode::try_decode(bytes.as_ref()).unwrap();
let task_ctx = ctx.task_ctx();
let deserialized = proto_node
.try_into_physical_plan(&task_ctx, &codec)
.unwrap();

// After deserialization, the DynamicFilter Arc sharing is broken. The
// SortExec(TopK) creates a fresh DynamicFilterPhysicalExpr on
// deserialization but the DataSourceExec still holds a stale snapshot.
// Verify the stale state: no DynamicFilter in DataSourceExec's predicate.
let display_stale = displayable(deserialized.as_ref()).indent(true).to_string();
assert!(
!display_stale.contains("DynamicFilter"),
"Expected no DynamicFilter before re-running FilterPushdown \
(the dynamic filter should have been snapshotted to a static \
expression during serialization):\n{display_stale}"
);

// 5. Re-run FilterPushdown (this is what the do_get handler does).
let restored = optimizer.optimize(deserialized, &config).unwrap();

let display_after = displayable(restored.as_ref()).indent(true).to_string();
assert!(
display_after.contains("DynamicFilter"),
"Expected DynamicFilter to be restored after re-running \
FilterPushdown on the deserialized plan:\n{display_after}"
);
}
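
// A natural follow-up assertion (left here as a hedged sketch; the
// execution details are the editor's assumptions, not part of this diff)
// would run the restored plan and check that the scan's DynamicFilter
// survives once the TopK heap starts tightening it:
//
//     use futures::StreamExt;
//     let mut stream = restored.execute(0, ctx.task_ctx()).unwrap();
//     while let Some(batch) = stream.next().await {
//         batch.unwrap(); // drain; TopK updates its dynamic filter as it goes
//     }
//     let display_final = displayable(restored.as_ref()).indent(true).to_string();
//     assert!(display_final.contains("DynamicFilter"));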
}