# Problem

`EXPLAIN ANALYZE` in vanilla DF uses a visitor pattern on the physical plan. You can implement `metrics()` on `ExecutionPlan` nodes and the visitor collects them (see `DisplayableExecutionPlan::with_metrics`). The issue with distributed DF is that we copy the plan when sending it across the wire, so the coordinator sees no metrics once you cross network boundaries.
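
For reference, a rough sketch of how those metrics can be gathered with DF's public `accept` / `ExecutionPlanVisitor` API; this only illustrates the mechanism, the actual code inside `DisplayableExecutionPlan` may differ:
```rust
use datafusion::error::DataFusionError;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::{accept, ExecutionPlan, ExecutionPlanVisitor};

/// Collects each node's MetricsSet in pre-order, i.e. the same data that
/// `DisplayableExecutionPlan::with_metrics` renders.
struct MetricsCollector {
    metrics: Vec<(String, Option<MetricsSet>)>,
}

impl ExecutionPlanVisitor for MetricsCollector {
    type Error = DataFusionError;

    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
        // `metrics()` returns None for a node that was deserialized on the
        // other side of a network boundary, which is exactly the problem.
        self.metrics.push((plan.name().to_string(), plan.metrics()));
        Ok(true) // keep visiting children
    }
}

fn collect_metrics(
    plan: &dyn ExecutionPlan,
) -> Result<Vec<(String, Option<MetricsSet>)>, DataFusionError> {
    let mut collector = MetricsCollector { metrics: vec![] };
    accept(plan, &mut collector)?;
    Ok(collector.metrics)
}
```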

# Solution

A solution that integrates well with vanilla DF is to treat the physical plan as if it were one instance in memory, even while crossing network boundaries. Say the `ArrowFlightEndpoint` sends a copy of the local plan (specifically, the `ExecutionStage`) when a request stream is completed (a request stream happens once per partition per task, I believe). The `ArrowFlightReadExec` can then copy those changes into its local plan copy. Eventually, the coordinator ends up with the entire physical plan including metrics. (I'll admit that there may be too much copying in this solution.)

On the endpoint side, we emit `FlightData`, which doesn’t have to be a `RecordBatch`. When the batches are done, we can send a different message containing the mutated plan before closing the stream. There are a few ways to do this, such as implementing the `Stream` trait on a custom stream, or (I think) using the `.chain()` method on streams to append some trailing data.
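
A minimal sketch of the `.chain()` variant (assuming we stash the protobuf-encoded `ExecutionStage` in the `app_metadata` field of one final `FlightData` message; the encoding itself is left out):
```rust
use arrow_flight::FlightData;
use futures::{stream, Stream, StreamExt};
use tonic::Status;

/// Appends one trailing FlightData message carrying the serialized stage
/// (now holding its metrics) after all record batches have been sent.
fn with_trailing_metrics(
    batches: impl Stream<Item = Result<FlightData, Status>>,
    serialized_stage: Vec<u8>, // hypothetical: protobuf-encoded ExecutionStage
) -> impl Stream<Item = Result<FlightData, Status>> {
    let trailer = FlightData {
        // No IPC payload; the reader recognizes this message purely by its
        // non-empty app_metadata (a convention we would have to define).
        app_metadata: serialized_stage.into(),
        ..Default::default()
    };
    batches.chain(stream::once(async move { Ok(trailer) }))
}
```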

On the `ArrowFlightReadExec` side, we currently use `FlightRecordBatchStream::new_from_flight_data` to read the stream of `FlightData` as a stream of `RecordBatch`. We can implement a different decoder which reads both `RecordBatch` and our custom message.
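
A hedged sketch of what that split could look like; the `Decoded` enum and the app_metadata convention are hypothetical, and the `Data` items would still be fed into the usual `RecordBatch` decoding path:
```rust
use arrow_flight::error::FlightError;
use arrow_flight::FlightData;
use futures::{Stream, StreamExt};

/// Each message in the response is either ordinary flight data (schema,
/// dictionary, or record batch) or our trailing plan message.
enum Decoded {
    Data(FlightData),
    StageWithMetrics(Vec<u8>), // hypothetical: still protobuf-encoded here
}

fn classify(
    response: impl Stream<Item = Result<FlightData, FlightError>>,
) -> impl Stream<Item = Result<Decoded, FlightError>> {
    response.map(|msg| {
        let msg = msg?;
        // Convention from the endpoint sketch above: only the trailing
        // message carries a non-empty app_metadata payload.
        Ok(if msg.app_metadata.is_empty() {
            Decoded::Data(msg)
        } else {
            Decoded::StageWithMetrics(msg.app_metadata.to_vec())
        })
    })
}
```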

The end-to-end flow looks like this:

1. An `ArrowFlightReadExec` calls an `ArrowFlightEndpoint` (any worker node) T * P times (where T is the number of tasks and P is the number of partitions per task). Each individual stream returns a copy of the `ExecutionStage`, except it only has metrics for a single partition of a single task.

2. Upon receiving the plan from a stream, the `ArrowFlightReadExec` can reconcile the incoming `ExecutionStage` with its local copy of the `ExecutionStage`. I need to flesh this part out.

I think we'd have to traverse the `ExecutionStage` plan and somehow store the metrics by task and by node (so we can ultimately display metrics by task for each plan node). I'm not sure how well this plays with the visitor pattern, since regular plan nodes inside an `ExecutionStage` are not aware of tasks. We may have to implement our own visitor or something of the sort; see the sketch after this list.

3. Implement the `metrics()` method on `ExecutionStage` and play around with displaying the output. I need to look into this part more as it depends on step 2 above.
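
One possible shape for that bookkeeping (everything here is hypothetical: the node key, the hand-rolled traversal, and the assumption that the incoming stage's plan has the same shape as the local copy):
```rust
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::ExecutionPlan;

/// (task index, pre-order index of the node inside the stage's plan).
/// Plan nodes don't know which task they ran in, so the task has to be
/// tracked outside the plan itself.
type NodeKey = (usize, usize);

#[derive(Default)]
struct StageMetrics {
    /// One MetricsSet per finished partition stream, per (task, node).
    per_node: HashMap<NodeKey, Vec<MetricsSet>>,
}

impl StageMetrics {
    /// Walks the plan copy that arrived with a finished stream and records
    /// each node's metrics; a hand-rolled traversal rather than
    /// ExecutionPlanVisitor, since the visitor has no notion of tasks.
    fn absorb(&mut self, task: usize, incoming_plan: &Arc<dyn ExecutionPlan>) {
        let mut next_index = 0;
        self.absorb_node(task, incoming_plan, &mut next_index);
    }

    fn absorb_node(
        &mut self,
        task: usize,
        node: &Arc<dyn ExecutionPlan>,
        next_index: &mut usize,
    ) {
        let key = (task, *next_index);
        *next_index += 1;
        if let Some(metrics) = node.metrics() {
            self.per_node.entry(key).or_default().push(metrics);
        }
        for child in node.children() {
            self.absorb_node(task, child, next_index);
        }
    }
}
```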

Also, there are different metrics modes on `DisplayableExecutionPlan` (see `ShowMetrics` below), so we may actually need to store metrics by partition as well as by task:
```rust
enum ShowMetrics {
    /// Do not show any metrics
    None,
    /// Show aggregated metrics across partitions
    Aggregated,
    /// Show full per-partition metrics
    Full,
}
```
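
If we do keep per-partition entries, the aggregated view can be derived from them. A sketch, assuming `MetricsSet::aggregate_by_name` behaves the way the aggregated display mode uses it:
```rust
use datafusion::physical_plan::metrics::MetricsSet;

/// Collapses the per-partition MetricsSets recorded for one (task, node)
/// key into a single set for ShowMetrics::Aggregated; ShowMetrics::Full
/// would return the per-partition sets untouched.
fn aggregated(per_partition: &[MetricsSet]) -> MetricsSet {
    let mut combined = MetricsSet::new();
    for set in per_partition {
        for metric in set.iter() {
            combined.push(metric.clone());
        }
    }
    // Sums metrics that share a name (e.g. output_rows across partitions).
    combined.aggregate_by_name()
}
```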

# Next Steps

At this point, it probably makes sense to try implementing something (starting with the `ExecutionStage` visitor, since that's the least fleshed-out part). Please let me know if you have any feedback!