
Feature: distributed EXPLAIN ANALYZE #123

@jayshrivastava

Description

Problem

EXPLAIN ANALYZE in vanilla DF uses a visitor pattern on the physical plan: ExecutionPlan nodes implement metrics(), and the visitor collects the results (see DisplayableExecutionPlan::with_metrics). The issue with distributed DF is that we copy the plan when sending it across the wire, so the coordinator sees no metrics for anything on the far side of a network boundary.
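
For reference, a minimal sketch of that single-process flow, assuming a recent DataFusion API (explain_analyze is a hypothetical helper, and indent(verbose: bool) is the newer signature):

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::prelude::SessionContext;

// Execute a plan, then render it with the metrics each node accumulated.
async fn explain_analyze(ctx: &SessionContext, sql: &str) -> Result<String> {
    let plan = ctx.sql(sql).await?.create_physical_plan().await?;
    // Running the plan populates each node's MetricsSet in place.
    let _batches = collect(Arc::clone(&plan), ctx.task_ctx()).await?;
    // The display visitor calls ExecutionPlan::metrics() on every node it visits.
    Ok(DisplayableExecutionPlan::with_metrics(plan.as_ref())
        .indent(true)
        .to_string())
}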

Solution

Execution Stage Metrics

A solution that integrates well with vanilla DF is to treat the physical plan as if it were a single in-memory instance, even though it is split across network boundaries.

To do this, the ArrowFlightEndpoint can send a copy of the local plan's metrics (specifically, those of the ExecutionStage's inner plan) when a request stream completes (a request stream exists for each partition of each task). The ArrowFlightReader can then "enrich" its local ExecutionStage copy by merging these metrics in. We would likely keep one plan copy per task and update its metrics as they arrive from the endpoint. Eventually, the coordinator's ExecutionStage copy holds the entire metrics set across all tasks and partitions.

This pattern should scale recursively, since an ExecutionStage may contain further ExecutionStages; we just need to treat ExecutionStage nodes in a special manner.
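
A minimal sketch of what that per-task bookkeeping could look like on the reader/coordinator side (StageMetrics, the node-index keying, and merge are hypothetical names, not existing APIs):

use std::collections::HashMap;
use datafusion::physical_plan::metrics::MetricsSet;

// Bookkeeping kept next to the coordinator's ExecutionStage copy: one
// MetricsSet per plan node, per task. Nodes are identified by their index in
// a pre-order walk of the stage's inner plan, which both sides share.
#[derive(Default)]
struct StageMetrics {
    per_task: HashMap<usize, Vec<MetricsSet>>, // task id -> metrics per node
}

impl StageMetrics {
    // Called when a finished request stream reports metrics for one
    // (task, partition) pair; later reports for the same task are merged in.
    fn merge(&mut self, task_id: usize, incoming: Vec<MetricsSet>) {
        let per_node = self
            .per_task
            .entry(task_id)
            .or_insert_with(|| vec![MetricsSet::new(); incoming.len()]);
        for (existing, new) in per_node.iter_mut().zip(incoming) {
            for metric in new.iter() {
                // Metrics keep their optional partition id, so per-partition
                // detail survives the merge.
                existing.push(metric.clone());
            }
        }
    }
}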

Caveats

  • Metrics cannot be serialized directly, so we would have to implement our own metrics proto (see the sketch after this list).
  • When a plan is serialized, its metrics are not included. So instead of sending a copy of its local plan with metrics, the ArrowFlightEndpoint would actually send only the metrics.
  • The ArrowFlightReader can merge/store metrics for an ExecutionStage by task. Metrics in DF already store optional partition ids, so merging by task should preserve any per-partition information.
  • Metrics cannot be retroactively attached to an ExecutionPlan node, so the ArrowFlightReader would have to transform the ExecutionStage's inner plan to "wrap" nodes with the received metrics.
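
One possible shape for the payload from the first two caveats; the real implementation would likely be a protobuf message (e.g. via prost) living next to the existing plan protos rather than these illustrative Rust structs:

// Hypothetical wire format for the per-task metrics report.
struct MetricValueProto {
    name: String,             // e.g. "output_rows", "elapsed_compute"
    value: u64,               // counters, and times flattened to nanoseconds
    partition: Option<usize>, // DF metrics already carry an optional partition id
}

struct NodeMetricsProto {
    node_index: usize, // position of the node in a pre-order walk of the inner plan
    metrics: Vec<MetricValueProto>,
}

struct TaskMetricsProto {
    stage_id: usize,
    task_id: usize,
    nodes: Vec<NodeMetricsProto>,
}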

The Wire

On the endpoint side, we emit FlightData, which doesn't have to carry a RecordBatch. When the batches are done, we can send a different message containing the mutated plan (in practice, just its metrics, per the caveats above) before closing the stream. There are a few ways to do this, such as implementing the Stream trait on a custom stream type, or using the .chain() combinator from futures::StreamExt to append trailing data (see the sketch below).
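
A sketch of the .chain() approach, assuming the endpoint's stream item type is Result<FlightData, tonic::Status> and that the encoded metrics ride in app_metadata (both are assumptions, not decisions made in this repo):

use arrow_flight::FlightData;
use futures::{stream, Stream, StreamExt};
use tonic::Status;

// Append one trailing FlightData message carrying the encoded metrics after
// the record-batch stream finishes.
fn with_trailing_metrics(
    batches: impl Stream<Item = Result<FlightData, Status>> + Send + 'static,
    encoded_metrics: Vec<u8>,
) -> impl Stream<Item = Result<FlightData, Status>> + Send + 'static {
    let trailer = FlightData {
        app_metadata: encoded_metrics.into(),
        ..Default::default()
    };
    batches.chain(stream::once(async move { Ok(trailer) }))
}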

On the ArrowFlightReadExec side, we currently use FlightRecordBatchStream::new_from_flight_data to read the stream of FlightData as a stream of RecordBatches. We can implement a different decoder that handles both RecordBatches and our custom message (sketched below).
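
A rough sketch built on arrow-flight's lower-level FlightDataDecoder, assuming the trailing metrics message is distinguished by a non-empty app_metadata field (same assumption as above); read_stage_stream is a hypothetical stand-in for the reader's stream handling:

use arrow_flight::decode::{DecodedPayload, FlightDataDecoder};
use arrow_flight::error::FlightError;
use arrow_flight::FlightData;
use futures::{Stream, StreamExt};

// Record batches are handled as before, while a message with non-empty
// app_metadata is treated as the trailing metrics payload from the endpoint.
async fn read_stage_stream(
    flight_data: impl Stream<Item = Result<FlightData, FlightError>> + Send + 'static,
) -> Result<(), FlightError> {
    let mut decoder = FlightDataDecoder::new(flight_data);
    while let Some(decoded) = decoder.next().await {
        let decoded = decoded?;
        if !decoded.inner.app_metadata.is_empty() {
            // Trailing metrics message: decode it and merge it into the local
            // ExecutionStage copy (see the merge sketch above), then continue.
            continue;
        }
        match decoded.payload {
            DecodedPayload::RecordBatch(batch) => {
                // Forward the batch downstream as ArrowFlightReadExec does today.
                let _ = batch;
            }
            DecodedPayload::Schema(_) | DecodedPayload::None => {}
        }
    }
    Ok(())
}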

Displaying Metrics

We already implement Displayable on ExecutionStage, so we have full control over how we represent metrics and display the plan. By default, we can display metrics by task by rendering the inner plan once per task with the correct metrics set (recall from above that we store metrics for the entire plan by task inside the ExecutionStage).

Note that we need to take special care to handle the three metric display modes below. This should work automatically because metrics in DF already store optional partition ids, but we should test it to make sure.

enum ShowMetrics {
    /// Do not show any metrics
    None,
    /// Show aggregated metrics across all partitions
    Aggregated,
    /// Show full per-partition metrics
    Full,
}
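
A sketch of the default per-task rendering described above (fmt_stage_with_metrics and plans_by_task are hypothetical; the real logic would live in ExecutionStage's display implementation):

use std::fmt;
use std::sync::Arc;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::ExecutionPlan;

// Print the inner plan once per task, each copy already enriched with that
// task's metrics (`plans_by_task` stands in for the per-task plan copies).
fn fmt_stage_with_metrics(
    plans_by_task: &[Arc<dyn ExecutionPlan>],
    f: &mut fmt::Formatter<'_>,
) -> fmt::Result {
    for (task, plan) in plans_by_task.iter().enumerate() {
        writeln!(f, "Task {task}:")?;
        // with_metrics makes the display visitor include each node's MetricsSet.
        writeln!(
            f,
            "{}",
            DisplayableExecutionPlan::with_metrics(plan.as_ref()).indent(false)
        )?;
    }
    Ok(())
}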

Related:
