Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,4 @@ hyper-util = "0.1.16"
pretty_assertions = "1.4"
reqwest = "0.12"
zip = "6.0"
test-case = "3.3.1"
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub use flight_service::{
DefaultSessionBuilder, MappedWorkerSessionBuilder, MappedWorkerSessionBuilderExt, Worker,
WorkerQueryContext, WorkerSessionBuilder,
};
pub use metrics::rewrite_distributed_plan_with_metrics;
pub use metrics::{DistributedMetricsFormat, rewrite_distributed_plan_with_metrics};
pub use networking::{
BoxCloneSyncChannel, ChannelResolver, DefaultChannelResolver, WorkerResolver,
create_flight_client,
Expand Down
2 changes: 1 addition & 1 deletion src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ pub(crate) mod proto;
mod task_metrics_collector;
mod task_metrics_rewriter;
pub(crate) use task_metrics_collector::{MetricsCollectorResult, TaskMetricsCollector};
pub use task_metrics_rewriter::rewrite_distributed_plan_with_metrics;
pub use task_metrics_rewriter::{DistributedMetricsFormat, rewrite_distributed_plan_with_metrics};
206 changes: 206 additions & 0 deletions src/metrics/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,174 @@ pub fn df_metrics_set_to_proto(
Ok(MetricsSetProto { metrics })
}

/// Converts a [MetricsSet] into a new [MetricsSet] in which every metric is renamed to carry a "_{task_id}" suffix.
/// ***Custom metrics are not supported - they are skipped and will NOT appear in the returned set***
///
/// Specific metrics like [OutputRows] will be transformed to general metrics like [Count].
///
/// We do it this way because, by default, [DisplayableExecutionPlan::with_metrics] aggregates
/// metrics by name (see https://github.com/apache/datafusion/blob/f0e38df39e13921ae19e79d26760c6554466955c/datafusion/physical-plan/src/display.rs#L425).
/// In some cases, we want to show metrics for each task separately, so we rename metrics to have
/// the task id in them so they do not get aggregated together.
///
/// Notably, [DisplayableExecutionPlan::with_full_metrics] exists, but this is too verbose, as it
/// will show metrics for each partition and include arbitrary labels. Renaming allows us to
/// achieve a medium level of verbosity.
pub fn annotate_metrics_set_with_task_id(metrics_set: &MetricsSet, task_id: u64) -> MetricsSet {
let mut result = MetricsSet::new();

for metric in metrics_set.iter() {
// Partition and labels are copied over as-is; only the metric value is rebuilt.
let partition = metric.partition();
let labels = metric.labels().to_vec();

// The task id is appended to the original name. Every arm below uses this name except
// PruningMetrics, which builds its own.
let base_name = metric.value().name();
let new_name = Cow::Owned(format!("{base_name}_{task_id}"));

// Each arm copies the current value into a freshly created Count/Time/Gauge under the new name.
let new_metric_value = match metric.value() {
MetricValue::OutputRows(count) => {
Copy link
Collaborator

@gabotechs gabotechs Jan 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit annoying that we need to do this manual clone of the metric values here. Another drawback is that we do not need atomic variable allocation and modifications, which means that we are paying an unnecessary price for thread safety when no operation will happen across threads.

I gave it a try, and I don't think we can avoid this with the current API. IMO it would be nice to make a contribution upstream doing whatever is necessary so that at some point in the future we can replace this with something like:

metric.value().clone().with_name("my new name")

Or something similar.

Also MetricValue, Label and pretty much everything that composes a Metric implements Clone, but for some reason it does not look like Metric implements clone. That's weird.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing we can do in this PR is probably open a contribution upstream and link the contributed PR here. If you are up for it that would be amazing, otherwise we can just put a comment here stating what would need to change in upstream's API so that we can do this more ergonomically.

let new_count = Count::new();
new_count.add(count.value());
MetricValue::Count {
name: new_name,
count: new_count,
}
}
MetricValue::ElapsedCompute(time) => {
let new_time = Time::new();
// time.value() is fed through Duration::from_nanos, i.e. it is treated as nanoseconds.
new_time.add_duration(std::time::Duration::from_nanos(time.value() as u64));
MetricValue::Time {
name: new_name,
time: new_time,
}
}
MetricValue::SpillCount(count) => {
let new_count = Count::new();
new_count.add(count.value());
MetricValue::Count {
name: new_name,
count: new_count,
}
}
MetricValue::SpilledBytes(count) => {
let new_count = Count::new();
new_count.add(count.value());
MetricValue::Count {
name: new_name,
count: new_count,
}
}
MetricValue::SpilledRows(count) => {
let new_count = Count::new();
new_count.add(count.value());
MetricValue::Count {
name: new_name,
count: new_count,
}
}
MetricValue::CurrentMemoryUsage(gauge) => {
let new_gauge = Gauge::new();
new_gauge.set(gauge.value());
MetricValue::Gauge {
name: new_name,
gauge: new_gauge,
}
}
MetricValue::Count { count, .. } => {
let new_count = Count::new();
new_count.add(count.value());
MetricValue::Count {
name: new_name,
count: new_count,
}
}
MetricValue::Gauge { gauge, .. } => {
let new_gauge = Gauge::new();
new_gauge.set(gauge.value());
MetricValue::Gauge {
name: new_name,
gauge: new_gauge,
}
}
MetricValue::Time { time, .. } => {
let new_time = Time::new();
// Same nanosecond round-trip as the ElapsedCompute arm.
new_time.add_duration(std::time::Duration::from_nanos(time.value() as u64));
MetricValue::Time {
name: new_name,
time: new_time,
}
}
MetricValue::StartTimestamp(timestamp) => {
let new_gauge = Gauge::new();
// With no timestamp recorded the gauge is never set; when timestamp_nanos_opt()
// yields None, 0 is stored instead.
if let Some(dt) = timestamp.value() {
new_gauge.set(dt.timestamp_nanos_opt().unwrap_or(0) as usize);
}
MetricValue::Gauge {
name: new_name,
gauge: new_gauge,
}
}
MetricValue::EndTimestamp(timestamp) => {
let new_gauge = Gauge::new();
// Same conversion as the StartTimestamp arm.
if let Some(dt) = timestamp.value() {
new_gauge.set(dt.timestamp_nanos_opt().unwrap_or(0) as usize);
}
MetricValue::Gauge {
name: new_name,
gauge: new_gauge,
}
}
MetricValue::OutputBytes(count) => {
let new_count = Count::new();
new_count.add(count.value());
MetricValue::Count {
name: new_name,
count: new_count,
}
}
MetricValue::OutputBatches(count) => {
let new_count = Count::new();
new_count.add(count.value());
MetricValue::Count {
name: new_name,
count: new_count,
}
}
MetricValue::PruningMetrics {
pruning_metrics, ..
} => {
// Convert to a count representing the matched value
// Only matched() is carried over, and the name gets an extra "_matched" segment
// before the task id instead of using `new_name`.
let new_count = Count::new();
new_count.add(pruning_metrics.matched());
MetricValue::Count {
name: Cow::Owned(format!("{base_name}_matched_{task_id}")),
count: new_count,
}
}
MetricValue::Ratio { ratio_metrics, .. } => {
// Convert ratio to a gauge representing the percentage
// Integer arithmetic: the percentage is truncated, and a zero total leaves the
// gauge unset.
let new_gauge = Gauge::new();
if ratio_metrics.total() > 0 {
new_gauge.set((ratio_metrics.part() * 100) / ratio_metrics.total());
}
MetricValue::Gauge {
name: new_name,
gauge: new_gauge,
}
}
// Skip custom metrics as they cannot be generically converted
// (`continue` means they are absent from the returned set, not merely left unrenamed).
MetricValue::Custom { .. } => continue,
};

// Re-wrap the rebuilt value with the original partition and labels.
result.push(Arc::new(Metric::new_with_labels(
new_metric_value,
partition,
labels,
)));
}

result
}

/// metrics_set_proto_to_df converts a [MetricsSetProto] to a [datafusion::physical_plan::metrics::MetricsSet].
pub fn metrics_set_proto_to_df(
metrics_set_proto: &MetricsSetProto,
Expand Down Expand Up @@ -1090,4 +1258,42 @@ mod tests {
)));
test_roundtrip_helper(metrics_set, "ratio_metrics");
}

/// Checks that an `OutputRows` metric comes back as a generic `Count` whose name carries the
/// task id suffix, and that its partition and labels are carried over untouched.
#[test]
fn test_annotate_metrics_set_with_task_id_output_rows() {
    // Input: a single OutputRows metric (value 1234) on partition 0 with one label.
    let rows = Count::new();
    rows.add(1234);
    let source_metric = Metric::new_with_labels(
        MetricValue::OutputRows(rows),
        Some(0),
        vec![Label::new("operator", "scan")],
    );
    let mut input = MetricsSet::new();
    input.push(Arc::new(source_metric));

    let output = annotate_metrics_set_with_task_id(&input, 42);

    // Exactly one metric must come out.
    assert_eq!(output.iter().count(), 1);
    let annotated_metric = output.iter().next().unwrap();

    // The value must now be a Count named "output_rows_42" holding the same number.
    let MetricValue::Count { name, count } = annotated_metric.value() else {
        panic!("Expected Count, got {:?}", annotated_metric.value().name());
    };
    assert_eq!(name.as_ref(), "output_rows_42");
    assert_eq!(count.value(), 1234);

    // Partition and labels are preserved.
    assert_eq!(annotated_metric.partition(), Some(0));
    let kept_labels = annotated_metric.labels();
    assert_eq!(kept_labels.len(), 1);
    assert_eq!(kept_labels[0].name(), "operator");
    assert_eq!(kept_labels[0].value(), "scan");
}
}
Loading