Skip to content

Commit 2c8241a

Browse files
authored
feat(small): Add BaselineMetrics to generate_series() table function (#16255)
* Add BaselineMetrics to LazyMemoryStream * UT
1 parent 5d3ed9c commit 2c8241a

File tree

2 files changed

+63
-5
lines changed

2 files changed

+63
-5
lines changed

datafusion/physical-plan/src/memory.rs

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use std::sync::Arc;
2323
use std::task::{Context, Poll};
2424

2525
use crate::execution_plan::{Boundedness, EmissionType};
26+
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
2627
use crate::{
2728
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
2829
RecordBatchStream, SendableRecordBatchStream, Statistics,
@@ -146,6 +147,8 @@ pub struct LazyMemoryExec {
146147
batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
147148
/// Plan properties cache storing equivalence properties, partitioning, and execution mode
148149
cache: PlanProperties,
150+
/// Execution metrics
151+
metrics: ExecutionPlanMetricsSet,
149152
}
150153

151154
impl LazyMemoryExec {
@@ -164,6 +167,7 @@ impl LazyMemoryExec {
164167
schema,
165168
batch_generators: generators,
166169
cache,
170+
metrics: ExecutionPlanMetricsSet::new(),
167171
})
168172
}
169173
}
@@ -254,12 +258,18 @@ impl ExecutionPlan for LazyMemoryExec {
254258
);
255259
}
256260

261+
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
257262
Ok(Box::pin(LazyMemoryStream {
258263
schema: Arc::clone(&self.schema),
259264
generator: Arc::clone(&self.batch_generators[partition]),
265+
baseline_metrics,
260266
}))
261267
}
262268

269+
fn metrics(&self) -> Option<MetricsSet> {
270+
Some(self.metrics.clone_inner())
271+
}
272+
263273
fn statistics(&self) -> Result<Statistics> {
264274
Ok(Statistics::new_unknown(&self.schema))
265275
}
@@ -276,6 +286,8 @@ pub struct LazyMemoryStream {
276286
/// parallel execution.
277287
/// Sharing generators between streams should be used with caution.
278288
generator: Arc<RwLock<dyn LazyBatchGenerator>>,
289+
/// Execution metrics
290+
baseline_metrics: BaselineMetrics,
279291
}
280292

281293
impl Stream for LazyMemoryStream {
@@ -285,13 +297,16 @@ impl Stream for LazyMemoryStream {
285297
self: std::pin::Pin<&mut Self>,
286298
_: &mut Context<'_>,
287299
) -> Poll<Option<Self::Item>> {
300+
let _timer_guard = self.baseline_metrics.elapsed_compute().timer();
288301
let batch = self.generator.write().generate_next_batch();
289302

290-
match batch {
303+
let poll = match batch {
291304
Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
292305
Ok(None) => Poll::Ready(None),
293306
Err(e) => Poll::Ready(Some(Err(e))),
294-
}
307+
};
308+
309+
self.baseline_metrics.record_poll(poll)
295310
}
296311
}
297312

@@ -304,6 +319,7 @@ impl RecordBatchStream for LazyMemoryStream {
304319
#[cfg(test)]
305320
mod lazy_memory_tests {
306321
use super::*;
322+
use crate::common::collect;
307323
use arrow::array::Int64Array;
308324
use arrow::datatypes::{DataType, Field, Schema};
309325
use futures::StreamExt;
@@ -419,4 +435,45 @@ mod lazy_memory_tests {
419435

420436
Ok(())
421437
}
438+
439+
#[tokio::test]
440+
async fn test_generate_series_metrics_integration() -> Result<()> {
441+
// Test LazyMemoryExec metrics with different configurations
442+
let test_cases = vec![
443+
(10, 2, 10), // 10 rows, batch size 2, expected 10 rows
444+
(100, 10, 100), // 100 rows, batch size 10, expected 100 rows
445+
(5, 1, 5), // 5 rows, batch size 1, expected 5 rows
446+
];
447+
448+
for (total_rows, batch_size, expected_rows) in test_cases {
449+
let schema =
450+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
451+
let generator = TestGenerator {
452+
counter: 0,
453+
max_batches: (total_rows + batch_size - 1) / batch_size, // ceiling division
454+
batch_size: batch_size as usize,
455+
schema: Arc::clone(&schema),
456+
};
457+
458+
let exec =
459+
LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?;
460+
let task_ctx = Arc::new(TaskContext::default());
461+
462+
let stream = exec.execute(0, task_ctx)?;
463+
let batches = collect(stream).await?;
464+
465+
// Verify metrics exist with actual expected numbers
466+
let metrics = exec.metrics().unwrap();
467+
468+
// Count actual rows returned
469+
let actual_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
470+
assert_eq!(actual_rows, expected_rows);
471+
472+
// Verify metrics match actual output
473+
assert_eq!(metrics.output_rows().unwrap(), expected_rows);
474+
assert!(metrics.elapsed_compute().unwrap() > 0);
475+
}
476+
477+
Ok(())
478+
}
422479
}

datafusion/physical-plan/src/metrics/baseline.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,10 @@ impl BaselineMetrics {
117117
}
118118
}
119119

120-
/// Process a poll result of a stream producing output for an
121-
/// operator, recording the output rows and stream done time and
122-
/// returning the same poll result
120+
/// Process a poll result of a stream producing output for an operator.
121+
///
122+
/// Note: this method only updates `output_rows` and `end_time` metrics.
123+
/// Remember to update `elapsed_compute` and other metrics manually.
123124
pub fn record_poll(
124125
&self,
125126
poll: Poll<Option<Result<RecordBatch>>>,

0 commit comments

Comments
 (0)