diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 2deb8fde2da6f..aa69b0de697f5 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -44,11 +44,13 @@ use datafusion_common::{assert_contains, Result}; use datafusion_execution::memory_pool::{ FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, }; -use datafusion_execution::TaskContext; +use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_execution::{DiskManager, TaskContext}; use datafusion_expr::{Expr, TableType}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_optimizer::join_selection::JoinSelection; use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::collect; use datafusion_physical_plan::spill::get_record_batch_memory_size; use rand::Rng; use test_utils::AccessLogGenerator; @@ -468,6 +470,83 @@ async fn test_stringview_external_sort() { let _ = df.collect().await.expect("Query execution failed"); } +// Tests for disk limit (`max_temp_directory_size` in `DiskManager`) +// ------------------------------------------------------------------ + +// Create a new `SessionContext` with the specified disk limit and memory pool limit +async fn setup_context( + disk_limit: u64, + memory_pool_limit: usize, +) -> Result<SessionContext> { + let disk_manager = DiskManager::try_new_without_arc(DiskManagerConfig::NewOs)? + .with_max_temp_directory_size(disk_limit)?; + + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit))) + .build_arc() + .unwrap(); + + let runtime = Arc::new(RuntimeEnv { + memory_pool: runtime.memory_pool.clone(), + disk_manager: Arc::new(disk_manager), + cache_manager: runtime.cache_manager.clone(), + object_store_registry: runtime.object_store_registry.clone(), + }); + + let config = SessionConfig::new() + .with_sort_spill_reservation_bytes(10 * 1024 * 1024) // 10MB + .with_target_partitions(1); + + Ok(SessionContext::new_with_config_rt(config, runtime)) +} + +/// If the spilled bytes exceed the disk limit (specified by +/// `max_temp_directory_size` in `DiskManager`), the query should fail +#[tokio::test] +async fn test_disk_spill_limit_reached() -> Result<()> { + let ctx = setup_context(100 * 1024 * 1024, 60 * 1024 * 1024).await?; + + let df = ctx + .sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1") + .await + .unwrap(); + + let err = df.collect().await.unwrap_err(); + assert_contains!( + err.to_string(), + "The used disk space during the spilling process has exceeded the allowable limit" + ); + + Ok(()) +} + +/// An external query should succeed if the spilled bytes are less than the disk limit +#[tokio::test] +async fn test_disk_spill_limit_not_reached() -> Result<()> { + let disk_spill_limit = 100 * 1024 * 1024; // 100MB + let ctx = setup_context(disk_spill_limit, 60 * 1024 * 1024).await?; + + let df = ctx + .sql("select * from generate_series(1, 10000000) as t1(v1) order by v1") + .await + .unwrap(); + let plan = df.create_physical_plan().await.unwrap(); + + let task_ctx = ctx.task_ctx(); + let _ = collect(Arc::clone(&plan), task_ctx) + .await + .expect("Query execution failed"); + + let spill_count = plan.metrics().unwrap().spill_count().unwrap(); + let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap(); + + println!("spill count {}, spill bytes {}", spill_count, spilled_bytes); + assert!(spill_count > 0); + assert!((spilled_bytes as u64) <
disk_spill_limit); + + Ok(()) +} + /// Run the query with the specified memory limit, /// and verifies the expected errors are returned #[derive(Clone, Debug)] diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 8f642f3384d2e..95df6fc7a7c8e 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -39,6 +39,7 @@ name = "datafusion_execution" [dependencies] arrow = { workspace = true } +chrono = { workspace = true } dashmap = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } @@ -49,6 +50,3 @@ parking_lot = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } url = { workspace = true } - -[dev-dependencies] -chrono = { workspace = true } diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index caa62eefe14c7..590a254a37513 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -17,14 +17,26 @@ //! [`DiskManager`]: Manages files generated during query execution -use datafusion_common::{resources_datafusion_err, DataFusionError, Result}; +use arrow::array::RecordBatch; +use arrow::datatypes::Schema; +use arrow::ipc::writer::StreamWriter; +use datafusion_common::{ + config_err, exec_datafusion_err, internal_err, resources_datafusion_err, + resources_err, DataFusionError, Result, +}; use log::debug; use parking_lot::Mutex; use rand::{thread_rng, Rng}; +use std::fs::File; use std::path::{Path, PathBuf}; use std::sync::Arc; use tempfile::{Builder, NamedTempFile, TempDir}; +use crate::memory_pool::human_readable_size; +use crate::metrics::{Count, SpillMetrics}; + +const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB + /// Configuration for temporary disk access #[derive(Debug, Clone)] pub enum DiskManagerConfig { @@ -75,6 +87,14 @@ pub struct DiskManager { /// If `Some(vec![])` a new OS specified temporary directory will be created /// If `None` an error will be returned (configured not to spill) local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>, + + /// The maximum amount of data (in bytes) stored inside the temporary directories. + /// Defaults to 100GB + max_temp_directory_size: u64, + + /// Used disk space in the temporary directories. Currently, only data spilled by + /// external executors is counted.
+ used_disk_space: Count, } impl DiskManager { @@ -84,6 +104,8 @@ impl DiskManager { DiskManagerConfig::Existing(manager) => Ok(manager), DiskManagerConfig::NewOs => Ok(Arc::new(Self { local_dirs: Mutex::new(Some(vec![])), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: Count::default(), })), DiskManagerConfig::NewSpecified(conf_dirs) => { let local_dirs = create_local_dirs(conf_dirs)?; debug!( "Created local dirs {:?} as DataFusion working directory", local_dirs ); Ok(Arc::new(Self { local_dirs: Mutex::new(Some(local_dirs)), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: Count::default(), })) } DiskManagerConfig::Disabled => Ok(Arc::new(Self { local_dirs: Mutex::new(None), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: Count::default(), })), } } + /// Like [`Self::try_new`], but returns the [`DiskManager`] directly instead of wrapped in an [`Arc`] + pub fn try_new_without_arc(config: DiskManagerConfig) -> Result<Self> { + match config { + DiskManagerConfig::Existing(manager) => { + Arc::try_unwrap(manager).map_err(|_| { + DataFusionError::Internal("Failed to unwrap Arc".to_string()) + }) + } + DiskManagerConfig::NewOs => Ok(Self { + local_dirs: Mutex::new(Some(vec![])), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: Count::default(), + }), + DiskManagerConfig::NewSpecified(conf_dirs) => { + let local_dirs = create_local_dirs(conf_dirs)?; + debug!( + "Created local dirs {:?} as DataFusion working directory", + local_dirs + ); + Ok(Self { + local_dirs: Mutex::new(Some(local_dirs)), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: Count::default(), + }) + } + DiskManagerConfig::Disabled => Ok(Self { + local_dirs: Mutex::new(None), + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + used_disk_space: Count::default(), + }), + } + } + + /// Set the maximum amount of data (in bytes) stored inside the temporary directories. + pub fn with_max_temp_directory_size( + mut self, + max_temp_directory_size: u64, + ) -> Result<Self> { + // If the disk manager is disabled and `max_temp_directory_size` is not 0, + // this operation is not meaningful, so fail early. + if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 { + return config_err!( + "Cannot set max temp directory size for disabled disk manager" + ); + } + + self.max_temp_directory_size = max_temp_directory_size; + Ok(self) + } + /// Return true if this disk manager supports creating temporary /// files. If this returns false, any call to `create_tmp_file` /// will error. @@ -144,6 +219,94 @@ impl DiskManager { .map_err(DataFusionError::IoError)?, }) } + + /// Write record batches to a temporary file, and return the spill file handle. + /// + /// This method is used within executors with spilling capabilities to write + /// temporary `RecordBatch`es to disk. Resource errors are returned if the written + /// file size exceeds the disk limit specified in `max_temp_directory_size` from + /// `DiskManager`. + /// + /// # Arguments + /// + /// * `batches` - A slice of `RecordBatch` to be written to disk. Note that this + /// slice can't be empty. + /// * `request_description` - A description of the request for logging and error messages. + /// * `caller_spill_metrics` - Metrics to be updated with the spill operation details, from the calling executor.
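+    ///
+    /// # Example (sketch)
+    ///
+    /// A minimal, hypothetical usage from a spilling operator; the `disk_manager`
+    /// and `batches` bindings, and the `"MyOperatorSpill"` description, are
+    /// assumed to exist in the caller:
+    ///
+    /// ```ignore
+    /// let mut spill_metrics = SpillMetrics::default();
+    /// let spill_file = disk_manager.try_spill_record_batches(
+    ///     &batches,
+    ///     "MyOperatorSpill", // hypothetical request description
+    ///     &mut spill_metrics,
+    /// )?;
+    /// // `spill_file` is a `RefCountedTempFile`; the underlying temporary file
+    /// // is removed once all references to it are dropped.
+    /// ```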
+ pub fn try_spill_record_batches( + &self, + batches: &[RecordBatch], + request_description: &str, + caller_spill_metrics: &mut SpillMetrics, + ) -> Result<RefCountedTempFile> { + if batches.is_empty() { + return internal_err!( + "`try_spill_record_batches` requires at least one batch" + ); + } + + let spill_file = self.create_tmp_file(request_description)?; + let schema = batches[0].schema(); + + let mut stream_writer = IPCStreamWriter::new(spill_file.path(), schema.as_ref())?; + + for batch in batches { + // The IPC Stream writer does not have a mechanism to avoid writing duplicate + // `Buffer`s repeatedly, so we estimate the written size with + // `get_array_memory_size()`, which counts duplicated `Buffer`s, rather than + // `get_record_batch_memory_size()`, which deduplicates them. + let estimate_extra_size = batch.get_array_memory_size(); + + if (self.used_disk_space.value() + estimate_extra_size) as u64 + > self.max_temp_directory_size + { + return resources_err!( + "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.", + human_readable_size(self.max_temp_directory_size as usize) + ); + } + + self.used_disk_space.add(estimate_extra_size); + stream_writer.write(batch)?; + } + + stream_writer.finish()?; + + // Update calling executor's spill metrics + caller_spill_metrics + .spilled_bytes + .add(stream_writer.num_bytes); + caller_spill_metrics + .spilled_rows + .add(stream_writer.num_rows); + caller_spill_metrics.spill_file_count.add(1); + + Ok(spill_file) + } + + /// Refer to the documentation for [`Self::try_spill_record_batches`]. This method + /// additionally slices the `RecordBatch` into smaller batches of at most `row_limit` + /// rows before spilling. + pub fn try_spill_record_batch_by_size( + &self, + batch: &RecordBatch, + request_description: &str, + spill_metrics: &mut SpillMetrics, + row_limit: usize, + ) -> Result<RefCountedTempFile> { + let total_rows = batch.num_rows(); + let mut batches = Vec::new(); + let mut offset = 0; + + // It's ok to calculate all slices first, because slicing is zero-copy. + while offset < total_rows { + let length = std::cmp::min(total_rows - offset, row_limit); + let sliced_batch = batch.slice(offset, length); + batches.push(sliced_batch); + offset += length; + } + + // Spill the sliced batches to disk + self.try_spill_record_batches(&batches, request_description, spill_metrics) + } } /// A wrapper around a [`NamedTempFile`] that also contains @@ -183,6 +346,51 @@ fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> { .collect() } +/// Write in Arrow IPC Stream format to a file. +/// +/// Stream format is used for spill because it supports dictionary replacement, and the random +/// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement).
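+///
+/// # Example (sketch)
+///
+/// A minimal, hypothetical round-trip (`path: &Path` and `batch: RecordBatch` are
+/// assumed to exist in the caller); batches written here can be read back with
+/// `arrow::ipc::reader::StreamReader`:
+///
+/// ```ignore
+/// let mut writer = IPCStreamWriter::new(path, batch.schema().as_ref())?;
+/// writer.write(&batch)?;
+/// writer.write(&batch)?;
+/// writer.finish()?;
+/// assert_eq!(writer.num_batches, 2);
+/// ```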
+pub struct IPCStreamWriter { + /// Inner writer + pub writer: StreamWriter<File>, + /// Batches written + pub num_batches: usize, + /// Rows written + pub num_rows: usize, + /// Bytes written + pub num_bytes: usize, +} + +impl IPCStreamWriter { + /// Create new writer + pub fn new(path: &Path, schema: &Schema) -> Result<Self> { + let file = File::create(path).map_err(|e| { + exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}") + })?; + Ok(Self { + num_batches: 0, + num_rows: 0, + num_bytes: 0, + writer: StreamWriter::try_new(file, schema)?, + }) + } + + /// Write one single batch + pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + self.writer.write(batch)?; + self.num_batches += 1; + self.num_rows += batch.num_rows(); + let num_bytes: usize = batch.get_array_memory_size(); + self.num_bytes += num_bytes; + Ok(()) + } + + /// Finish the writer + pub fn finish(&mut self) -> Result<()> { + self.writer.finish().map_err(Into::into) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index a9e3a27f80356..9e8952ae4ca86 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -40,7 +40,8 @@ pub mod registry { }; } -pub use disk_manager::DiskManager; +pub use disk_manager::{DiskManager, IPCStreamWriter}; +pub mod metrics; pub use registry::FunctionRegistry; pub use stream::{RecordBatchStream, SendableRecordBatchStream}; pub use task::TaskContext; diff --git a/datafusion/physical-plan/src/metrics/builder.rs b/datafusion/execution/src/metrics/builder.rs similarity index 99% rename from datafusion/physical-plan/src/metrics/builder.rs rename to datafusion/execution/src/metrics/builder.rs index dbda0a310ce52..994b56d4ae8da 100644 --- a/datafusion/physical-plan/src/metrics/builder.rs +++ b/datafusion/execution/src/metrics/builder.rs @@ -29,7 +29,7 @@ use super::{ /// case of constant strings /// /// ```rust -/// use datafusion_physical_plan::metrics::*; +/// use datafusion_execution::metrics::*; /// /// let metrics = ExecutionPlanMetricsSet::new(); /// let partition = 1; diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/execution/src/metrics/grouped_metrics.rs similarity index 88% rename from datafusion/physical-plan/src/metrics/baseline.rs rename to datafusion/execution/src/metrics/grouped_metrics.rs index b26a08dd0fada..a59cc36f1ab96 100644 --- a/datafusion/physical-plan/src/metrics/baseline.rs +++ b/datafusion/execution/src/metrics/grouped_metrics.rs @@ -29,7 +29,7 @@ use datafusion_common::Result; /// /// Example: /// ``` -/// use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; +/// use datafusion_execution::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; /// let metrics = ExecutionPlanMetricsSet::new(); /// /// let partition = 2; @@ -143,6 +143,32 @@ impl Drop for BaselineMetrics { } } +/// A set of metrics related to spilling during the execution of an operator +#[derive(Debug, Clone, Default)] +pub struct SpillMetrics { + /// count of spill files during the execution of the operator + pub spill_file_count: Count, + /// total spilled bytes during the execution of the operator + pub spilled_bytes: Count, + /// total spilled rows during the execution of the operator + pub spilled_rows: Count, +} + +impl SpillMetrics { + /// Create a new set of spill metrics + pub fn new( + spill_file_count: Count, + spilled_bytes: Count, + spilled_rows: Count, + ) -> Self { + Self { + spill_file_count, + spilled_bytes,
spilled_rows, + } + } +} + /// Trait for things that produce output rows as a result of execution. pub trait RecordOutput { /// Record that some number of output rows have been produced diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/execution/src/metrics/mod.rs similarity index 98% rename from datafusion/physical-plan/src/metrics/mod.rs rename to datafusion/execution/src/metrics/mod.rs index 50252e8d973ac..a2ad8a04d0e7e 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/execution/src/metrics/mod.rs @@ -17,8 +17,8 @@ //! Metrics for recording information about execution -mod baseline; mod builder; +mod grouped_metrics; mod value; use parking_lot::Mutex; @@ -31,19 +31,19 @@ use std::{ use datafusion_common::HashMap; // public exports -pub use baseline::{BaselineMetrics, RecordOutput}; pub use builder::MetricBuilder; +pub use grouped_metrics::{BaselineMetrics, RecordOutput, SpillMetrics}; pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp}; /// Something that tracks a value of interest (metric) of a DataFusion -/// [`ExecutionPlan`] execution. +/// ExecutionPlan execution. /// /// Typically [`Metric`]s are not created directly, but instead /// are created using [`MetricBuilder`] or methods on /// [`ExecutionPlanMetricsSet`]. /// /// ``` -/// use datafusion_physical_plan::metrics::*; +/// use datafusion_execution::metrics::*; /// /// let metrics = ExecutionPlanMetricsSet::new(); /// assert!(metrics.clone_inner().output_rows().is_none()); @@ -62,8 +62,6 @@ pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp}; /// // As well as from the metrics set /// assert_eq!(metrics.clone_inner().output_rows(), Some(13)); /// ``` -/// -/// [`ExecutionPlan`]: super::ExecutionPlan #[derive(Debug)] pub struct Metric { @@ -164,9 +162,7 @@ impl Metric { } } -/// A snapshot of the metrics for a particular ([`ExecutionPlan`]). -/// -/// [`ExecutionPlan`]: super::ExecutionPlan +/// A snapshot of the metrics for a particular `ExecutionPlan`. #[derive(Default, Debug, Clone)] pub struct MetricsSet { metrics: Vec<Arc<Metric>>, } @@ -343,14 +339,12 @@ impl Display for MetricsSet { /// A set of [`Metric`]s for an individual "operator" (e.g. `&dyn /// ExecutionPlan`). /// -/// This structure is intended as a convenience for [`ExecutionPlan`] +/// This structure is intended as a convenience for `ExecutionPlan` /// implementations so they can generate different streams for multiple /// partitions but easily report them together.
/// /// Each `clone()` of this structure will add metrics to the same /// underlying metrics set -/// -/// [`ExecutionPlan`]: super::ExecutionPlan #[derive(Default, Debug, Clone)] pub struct ExecutionPlanMetricsSet { inner: Arc<Mutex<MetricsSet>>, } diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/execution/src/metrics/value.rs similarity index 100% rename from datafusion/physical-plan/src/metrics/value.rs rename to datafusion/execution/src/metrics/value.rs diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 0947a2ff55391..097d1414585bd 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -26,7 +26,6 @@ use crate::aggregates::{ topk_stream::GroupedTopKAggregateStream, }; use crate::execution_plan::{CardinalityEffect, EmissionType}; -use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::projection::get_field_metadata; use crate::windows::get_ordered_partition_by_indices; use crate::{ @@ -39,6 +38,7 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::stats::Precision; use datafusion_common::{internal_err, not_impl_err, Constraint, Constraints, Result}; +use datafusion_execution::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_execution::TaskContext; use datafusion_expr::{Accumulator, Aggregate}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; @@ -1347,7 +1347,6 @@ mod tests { use crate::common::collect; use crate::execution_plan::Boundedness; use crate::expressions::col; - use crate::metrics::MetricValue; use crate::test::assert_is_pending; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; use crate::test::TestMemoryExec; @@ -1365,6 +1364,7 @@ mod tests { }; use datafusion_execution::config::SessionConfig; use datafusion_execution::memory_pool::FairSpillPool; + use datafusion_execution::metrics::MetricValue; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_functions_aggregate::array_agg::array_agg_udaf; use datafusion_functions_aggregate::average::avg_udaf; diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index 9474a5f88c92a..c25b738972a59 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -21,11 +21,11 @@ use crate::aggregates::{ aggregate_expressions, create_accumulators, finalize_aggregation, AccumulatorItem, AggregateMode, }; -use crate::metrics::{BaselineMetrics, RecordOutput}; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; +use datafusion_execution::metrics::{BaselineMetrics, RecordOutput}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; use futures::stream::BoxStream; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 05122d5a5403d..57e5644ed2302 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -27,12 +27,11 @@ use crate::aggregates::{ create_schema, evaluate_group_by, evaluate_many, evaluate_optional, AggregateMode, PhysicalGroupBy, }; -use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput}; use crate::sorts::sort::sort_batch; use
crate::sorts::streaming_merge::StreamingMergeBuilder; -use crate::spill::{read_spill_as_stream, spill_record_batch_by_size}; +use crate::spill::read_spill_as_stream; use crate::stream::RecordBatchStreamAdapter; -use crate::{aggregates, metrics, ExecutionPlan, PhysicalExpr}; +use crate::{aggregates, ExecutionPlan, PhysicalExpr}; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::*; @@ -42,6 +41,8 @@ use datafusion_common::{internal_err, DataFusionError, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion_execution::metrics::{self, SpillMetrics}; +use datafusion_execution::metrics::{BaselineMetrics, MetricBuilder, RecordOutput}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_expr::{EmitTo, GroupsAccumulator}; @@ -109,12 +110,8 @@ struct SpillState { /// Peak memory used for buffered data. /// Calculated as sum of peak memory values across partitions peak_mem_used: metrics::Gauge, - /// count of spill files during the execution of the operator - spill_count: metrics::Count, - /// total spilled bytes during the execution of the operator - spilled_bytes: metrics::Count, - /// total spilled rows during the execution of the operator - spilled_rows: metrics::Count, + /// Spilling-related metrics + spill_metrics: SpillMetrics, } /// Tracks if the aggregate should skip partial aggregations @@ -553,9 +550,11 @@ impl GroupedHashAggregateStream { merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()), peak_mem_used: MetricBuilder::new(&agg.metrics) .gauge("peak_mem_used", partition), - spill_count: MetricBuilder::new(&agg.metrics).spill_count(partition), - spilled_bytes: MetricBuilder::new(&agg.metrics).spilled_bytes(partition), - spilled_rows: MetricBuilder::new(&agg.metrics).spilled_rows(partition), + spill_metrics: SpillMetrics::new( + MetricBuilder::new(&agg.metrics).spill_count(partition), + MetricBuilder::new(&agg.metrics).spilled_bytes(partition), + MetricBuilder::new(&agg.metrics).spilled_rows(partition), + ), }; // Skip aggregation is supported if: @@ -987,23 +986,15 @@ impl GroupedHashAggregateStream { return Ok(()); }; let sorted = sort_batch(&emit, self.spill_state.spill_expr.as_ref(), None)?; - let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?; // TODO: slice large `sorted` and write to multiple files in parallel - spill_record_batch_by_size( + let spillfile = self.runtime.disk_manager.try_spill_record_batch_by_size( &sorted, - spillfile.path().into(), - sorted.schema(), + "HashAggSpill", + &mut self.spill_state.spill_metrics, self.batch_size, )?; self.spill_state.spills.push(spillfile); - // Update metrics - self.spill_state.spill_count.add(1); - self.spill_state - .spilled_bytes - .add(sorted.get_array_memory_size()); - self.spill_state.spilled_rows.add(sorted.num_rows()); - Ok(()) } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index fa8d125d62d1f..bc0764a50d7c8 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -22,7 +22,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics}; use crate::{ 
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, @@ -31,6 +30,9 @@ use crate::{ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; +use datafusion_execution::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, +}; use datafusion_execution::TaskContext; use crate::coalesce::{BatchCoalescer, CoalescerState}; diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 9a955155c01e8..4fb2a083a749b 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -21,7 +21,6 @@ use std::any::Any; use std::sync::Arc; -use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::stream::{ObservedStream, RecordBatchReceiverStream}; use super::{ DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream, @@ -32,6 +31,9 @@ use crate::projection::{make_with_child, ProjectionExec}; use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; use datafusion_common::{internal_err, Result}; +use datafusion_execution::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, +}; use datafusion_execution::TaskContext; /// Merge execution plan executes partitions in parallel and combines them into a single diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 851e504b69afc..27c3930adad3e 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -16,13 +16,13 @@ // under the License. pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; -pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; pub use crate::stream::EmptyRecordBatchStream; pub use datafusion_common::hash_utils; pub use datafusion_common::utils::project_schema; pub use datafusion_common::{internal_err, ColumnStatistics, Statistics}; +pub use datafusion_execution::metrics::{Metric, MetricsSet}; pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; pub use datafusion_expr::{Accumulator, ColumnarValue}; pub use datafusion_physical_expr::window::WindowExpr; @@ -36,7 +36,6 @@ use std::sync::Arc; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::display::DisplayableExecutionPlan; -use crate::metrics::MetricsSet; use crate::projection::ProjectionExec; use crate::repartition::RepartitionExec; use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index a66873bc6576e..2dcd13a0fcfc7 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -30,10 +30,7 @@ use crate::projection::{ make_with_child, try_embed_projection, update_expr, EmbeddedProjection, ProjectionExec, }; -use crate::{ - metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, - DisplayFormatType, ExecutionPlan, -}; +use crate::{DisplayFormatType, ExecutionPlan}; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, SchemaRef}; @@ -43,6 +40,9 @@ use datafusion_common::stats::Precision; use datafusion_common::{ internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue, }; +use datafusion_execution::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, +}; use datafusion_execution::TaskContext; use 
datafusion_expr::Operator; use datafusion_physical_expr::equivalence::ProjectionMapping; diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 63c9c99212483..b8735aa9e6daa 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -26,13 +26,13 @@ use super::{ execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, SendableRecordBatchStream, }; -use crate::metrics::MetricsSet; use crate::stream::RecordBatchStreamAdapter; use crate::ExecutionPlanProperties; use arrow::array::{ArrayRef, RecordBatch, UInt64Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::{internal_err, Result}; +use datafusion_execution::metrics::MetricsSet; use datafusion_execution::TaskContext; use datafusion_physical_expr::{Distribution, EquivalenceProperties}; diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index ca4c26251de02..343428005da9e 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -27,7 +27,6 @@ use super::utils::{ }; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::execution_plan::{boundedness_from_children, EmissionType}; -use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::projection::{ join_allows_pushdown, join_table_borders, new_join_children, physical_to_column_exprs, ProjectionExec, @@ -44,6 +43,7 @@ use arrow::datatypes::{Fields, Schema, SchemaRef}; use datafusion_common::stats::Precision; use datafusion_common::{internal_err, JoinType, Result, ScalarValue}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion_execution::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::join_equivalence_properties; diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index b2e9b37655f16..135e8911f730b 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -52,7 +52,6 @@ use crate::{ JoinFilter, JoinHashMap, JoinHashMapOffset, JoinHashMapType, JoinOn, JoinOnRef, StatefulStreamResult, }, - metrics::{ExecutionPlanMetricsSet, MetricsSet}, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; @@ -73,6 +72,7 @@ use datafusion_common::{ JoinSide, JoinType, Result, }; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion_execution::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_execution::TaskContext; use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::{ diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 64dfc8219b644..e010708e3fa53 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -37,7 +37,6 @@ use crate::joins::utils::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, OnceAsync, OnceFut, }; use crate::joins::SharedBitmapBuilder; -use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::projection::{ try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData, ProjectionExec, @@ -56,6 +55,7 @@ use 
datafusion_common::{ exec_datafusion_err, internal_err, project_schema, JoinSide, Result, Statistics, }; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion_execution::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_execution::TaskContext; use datafusion_expr::JoinType; use datafusion_physical_expr::equivalence::{ diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 9b008f5242c47..ec75eefa7c13a 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -41,16 +41,14 @@ use crate::joins::utils::{ reorder_output_after_swap, symmetric_join_output_partitioning, JoinFilter, JoinOn, JoinOnRef, }; -use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use crate::projection::{ join_allows_pushdown, join_table_borders, new_join_children, physical_to_column_exprs, update_join_on, ProjectionExec, }; -use crate::spill::spill_record_batches; use crate::{ - metrics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, - ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, + PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream, + Statistics, }; use arrow::array::{types::UInt64Type, *}; @@ -66,6 +64,10 @@ use datafusion_common::{ }; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion_execution::metrics::{self, SpillMetrics}; +use datafusion_execution::metrics::{ + Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, +}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::join_equivalence_properties; @@ -580,12 +582,8 @@ struct SortMergeJoinMetrics { /// Peak memory used for buffered data. 
/// Calculated as sum of peak memory values across partitions peak_mem_used: metrics::Gauge, - /// count of spills during the execution of the operator - spill_count: Count, - /// total spilled bytes during the execution of the operator - spilled_bytes: Count, - /// total spilled rows during the execution of the operator - spilled_rows: Count, + /// Spilling-related metrics + spill_metrics: SpillMetrics, } impl SortMergeJoinMetrics { @@ -599,9 +597,11 @@ impl SortMergeJoinMetrics { MetricBuilder::new(metrics).counter("output_batches", partition); let output_rows = MetricBuilder::new(metrics).output_rows(partition); let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", partition); - let spill_count = MetricBuilder::new(metrics).spill_count(partition); - let spilled_bytes = MetricBuilder::new(metrics).spilled_bytes(partition); - let spilled_rows = MetricBuilder::new(metrics).spilled_rows(partition); + let spill_metrics = SpillMetrics::new( + MetricBuilder::new(metrics).spill_count(partition), + MetricBuilder::new(metrics).spilled_bytes(partition), + MetricBuilder::new(metrics).spilled_rows(partition), + ); Self { join_time, @@ -610,9 +610,7 @@ impl SortMergeJoinMetrics { output_batches, output_rows, peak_mem_used, - spill_count, - spilled_bytes, - spilled_rows, + spill_metrics, } } } @@ -1387,26 +1385,17 @@ impl SortMergeJoinStream { } Err(_) if self.runtime_env.disk_manager.tmp_files_enabled() => { // spill buffered batch to disk - let spill_file = self - .runtime_env - .disk_manager - .create_tmp_file("sort_merge_join_buffered_spill")?; - if let Some(batch) = buffered_batch.batch { - spill_record_batches( - &[batch], - spill_file.path().into(), - Arc::clone(&self.buffered_schema), - )?; + let spill_file = + self.runtime_env.disk_manager.try_spill_record_batches( + &[batch], + "sort_merge_join_buffered_spill", + &mut self.join_metrics.spill_metrics, + )?; + buffered_batch.spill_file = Some(spill_file); buffered_batch.batch = None; - // update metrics to register spill - self.join_metrics.spill_count.add(1); - self.join_metrics - .spilled_bytes - .add(buffered_batch.size_estimation); - self.join_metrics.spilled_rows.add(buffered_batch.num_rows); Ok(()) } else { internal_err!("Buffered batch has empty body") diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index 677601a12845f..65f05b69b2838 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -23,8 +23,7 @@ use std::mem::size_of; use std::sync::Arc; use crate::joins::utils::{JoinFilter, JoinHashMapType}; -use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; -use crate::{metrics, ExecutionPlan}; +use crate::ExecutionPlan; use arrow::array::{ ArrowPrimitiveType, BooleanBufferBuilder, NativeAdapter, PrimitiveArray, RecordBatch, @@ -36,6 +35,8 @@ use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{ arrow_datafusion_err, DataFusionError, HashSet, JoinSide, Result, ScalarValue, }; +use datafusion_execution::metrics; +use datafusion_execution::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph; diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 47af4ab9a7652..062adbfcd3083 100644 
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -52,10 +52,9 @@ use crate::projection::{ physical_to_column_exprs, update_join_filter, update_join_on, ProjectionExec, }; use crate::{ - joins::StreamJoinPartitionMode, - metrics::{ExecutionPlanMetricsSet, MetricsSet}, - DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, - PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, + joins::StreamJoinPartitionMode, DisplayAs, DisplayFormatType, Distribution, + ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, + SendableRecordBatchStream, Statistics, }; use arrow::array::{ @@ -69,6 +68,7 @@ use datafusion_common::hash_utils::create_hashes; use datafusion_common::utils::bisect; use datafusion_common::{internal_err, plan_err, HashSet, JoinSide, JoinType, Result}; use datafusion_execution::memory_pool::MemoryConsumer; +use datafusion_execution::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_execution::TaskContext; use datafusion_expr::interval_arithmetic::Interval; use datafusion_physical_expr::equivalence::join_equivalence_properties; diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index cffc4b4bff8ee..e185b01cacff7 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -25,7 +25,6 @@ use std::ops::{IndexMut, Range}; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder}; use crate::{ ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics, }; @@ -47,6 +46,7 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ plan_err, DataFusionError, JoinSide, JoinType, Result, SharedResult, }; +use datafusion_execution::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_physical_expr::equivalence::add_offset_to_expr; use datafusion_physical_expr::expressions::Column; diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 6ddaef1a2d280..fe288bf3319ca 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -44,7 +44,6 @@ pub use crate::execution_plan::{ execute_stream_partitioned, get_plan_string, with_new_children_if_necessary, ExecutionPlan, ExecutionPlanProperties, PlanProperties, }; -pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; pub use crate::stream::EmptyRecordBatchStream; pub use crate::topk::TopK; @@ -68,7 +67,6 @@ pub mod insert; pub mod joins; pub mod limit; pub mod memory; -pub mod metrics; pub mod placeholder_row; pub mod projection; pub mod recursive_query; @@ -87,6 +85,9 @@ pub mod udaf { pub use datafusion_expr::StatisticsArgs; pub use datafusion_physical_expr::aggregate::AggregateFunctionExpr; } +pub mod metrics { + pub use datafusion_execution::metrics::*; +} pub mod coalesce; #[cfg(test)] diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index f720294c7ad90..994dc91015166 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -22,9 +22,9 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use super::DisplayAs; use 
super::{ - DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, + ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use crate::execution_plan::{Boundedness, CardinalityEffect}; @@ -33,6 +33,9 @@ use crate::{DisplayFormatType, Distribution, ExecutionPlan, Partitioning}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::{internal_err, Result}; +use datafusion_execution::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, +}; use datafusion_execution::TaskContext; use futures::stream::{Stream, StreamExt}; diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 08c4d24f4c7fe..8274f590852aa 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -27,7 +27,6 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::expressions::{CastExpr, Column, Literal}; -use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{ DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -35,6 +34,9 @@ use super::{ use crate::execution_plan::CardinalityEffect; use crate::joins::utils::{ColumnIndex, JoinFilter}; use crate::{ColumnStatistics, DisplayFormatType, ExecutionPlan, PhysicalExpr}; +use datafusion_execution::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, +}; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 05b78e4e1da42..d0088a99451cc 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -24,10 +24,12 @@ use std::task::{Context, Poll}; use super::work_table::{ReservedBatches, WorkTable, WorkTableExec}; use crate::execution_plan::{Boundedness, EmissionType}; use crate::{ - metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, - PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, + SendableRecordBatchStream, Statistics, +}; +use datafusion_execution::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, }; -use crate::{DisplayAs, DisplayFormatType, ExecutionPlan}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 40e68cfcae837..e372c23e7b191 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -25,13 +25,11 @@ use std::task::{Context, Poll}; use std::{any::Any, vec}; use super::common::SharedMemoryReservation; -use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::{ DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream, }; use crate::execution_plan::CardinalityEffect; use crate::hash_utils::create_hashes; -use crate::metrics::BaselineMetrics; use crate::projection::{all_columns, make_with_child, update_expr, ProjectionExec}; use crate::repartition::distributor_channels::{ channels, partition_aware_channels, DistributionReceiver, DistributionSender, @@ -39,6 +37,9 @@ use crate::repartition::distributor_channels::{ use 
crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; +use datafusion_execution::metrics::{ + self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, +}; use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; use arrow::compute::take_arrays; diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 1c2b8cd0c91b7..20c19a74d50a4 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -23,11 +23,11 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; -use crate::metrics::BaselineMetrics; use crate::sorts::builder::BatchBuilder; use crate::sorts::cursor::{Cursor, CursorValues}; use crate::sorts::stream::PartitionedStream; use crate::RecordBatchStream; +use datafusion_execution::metrics::BaselineMetrics; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -451,9 +451,9 @@ impl SortPreservingMergeStream { self.update_winner(cmp_node, winner, challenger); } } else if challenger < *winner { - // If the winner doesn’t survive in the final match, it indicates that the original winner + // If the winner doesn't survive in the final match, it indicates that the original winner // has moved up in value, so the challenger now becomes the new winner. - // This also means that we’re in a new round of the tie breaker, + // This also means that we're in a new round of the tie breaker, // and the polls count is outdated (though not yet cleaned up). // // By the time we reach this code, both the new winner and the current challenger diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index dc03c012d9be4..8482dc359b1c7 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -57,12 +57,14 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::sorts::sort::sort_batch; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, }; +use datafusion_execution::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, +}; use arrow::compute::concat_batches; use arrow::datatypes::SchemaRef; diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 751496c70808e..bcb4a8cba1ec2 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -28,14 +28,9 @@ use crate::common::spawn_buffered; use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; use crate::expressions::PhysicalSortExpr; use crate::limit::LimitStream; -use crate::metrics::{ - BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, -}; use crate::projection::{make_with_child, update_expr, ProjectionExec}; use crate::sorts::streaming_merge::StreamingMergeBuilder; -use crate::spill::{ - get_record_batch_memory_size, read_spill_as_stream, spill_record_batches, -}; +use crate::spill::{get_record_batch_memory_size, read_spill_as_stream}; use crate::stream::RecordBatchStreamAdapter; use crate::topk::TopK; use crate::{ @@ -43,6 +38,9 @@ use crate::{ 
ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, }; +use datafusion_execution::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, SpillMetrics, +}; use arrow::array::{ Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array, @@ -65,23 +63,19 @@ struct ExternalSorterMetrics { /// metrics baseline: BaselineMetrics, - /// count of spills during the execution of the operator - spill_count: Count, - - /// total spilled bytes during the execution of the operator - spilled_bytes: Count, - - /// total spilled rows during the execution of the operator - spilled_rows: Count, + /// Spilling-related metrics + spill_metrics: SpillMetrics, } impl ExternalSorterMetrics { fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { Self { baseline: BaselineMetrics::new(metrics, partition), - spill_count: MetricBuilder::new(metrics).spill_count(partition), - spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), - spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition), + spill_metrics: SpillMetrics::new( + MetricBuilder::new(metrics).spill_count(partition), + MetricBuilder::new(metrics).spilled_bytes(partition), + MetricBuilder::new(metrics).spilled_rows(partition), + ), } } } @@ -377,17 +371,17 @@ impl ExternalSorter { /// How many bytes have been spilled to disk? fn spilled_bytes(&self) -> usize { - self.metrics.spilled_bytes.value() + self.metrics.spill_metrics.spilled_bytes.value() } /// How many rows have been spilled to disk? fn spilled_rows(&self) -> usize { - self.metrics.spilled_rows.value() + self.metrics.spill_metrics.spilled_rows.value() } /// How many spill files have been created? fn spill_count(&self) -> usize { - self.metrics.spill_count.value() + self.metrics.spill_metrics.spill_file_count.value() } /// Writes any `in_memory_batches` to a spill file and clears @@ -404,17 +398,13 @@ impl ExternalSorter { debug!("Spilling sort data of ExternalSorter to disk whilst inserting"); - let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?; let batches = std::mem::take(&mut self.in_mem_batches); - let (spilled_rows, spilled_bytes) = spill_record_batches( + let spill_file = self.runtime.disk_manager.try_spill_record_batches( &batches, - spill_file.path().into(), - Arc::clone(&self.schema), + "Sorting", + &mut self.metrics.spill_metrics, )?; let used = self.reservation.free(); - self.metrics.spill_count.add(1); - self.metrics.spilled_bytes.add(spilled_bytes); - self.metrics.spilled_rows.add(spilled_rows); self.spills.push(spill_file); Ok(used) } diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 454a068551754..912e7ef98c704 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -22,13 +22,15 @@ use std::sync::Arc; use crate::common::spawn_buffered; use crate::limit::LimitStream; -use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::projection::{make_with_child, update_expr, ProjectionExec}; use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, }; +use datafusion_execution::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, +}; use datafusion_common::{internal_err, Result}; 
use datafusion_execution::memory_pool::MemoryConsumer; @@ -384,7 +386,6 @@ mod tests { use crate::coalesce_partitions::CoalescePartitionsExec; use crate::execution_plan::{Boundedness, EmissionType}; use crate::expressions::col; - use crate::metrics::{MetricValue, Timestamp}; use crate::repartition::RepartitionExec; use crate::sorts::sort::SortExec; use crate::stream::RecordBatchReceiverStream; @@ -402,6 +403,7 @@ mod tests { use datafusion_common::{assert_batches_eq, assert_contains, DataFusionError}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::config::SessionConfig; + use datafusion_execution::metrics::{MetricValue, Timestamp}; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_execution::RecordBatchStream; use datafusion_physical_expr::expressions::Column; diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index a541f79dc7174..b691e6cbb7338 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -18,7 +18,6 @@ //! Merge that deals with an arbitrary size of streaming inputs. //! This is an order-preserving merge. -use crate::metrics::BaselineMetrics; use crate::sorts::{ merge::SortPreservingMergeStream, stream::{FieldCursorStream, RowCursorStream}, @@ -28,6 +27,7 @@ use arrow::array::*; use arrow::datatypes::{DataType, SchemaRef}; use datafusion_common::{internal_err, Result}; use datafusion_execution::memory_pool::MemoryReservation; +use datafusion_execution::metrics::BaselineMetrics; use datafusion_physical_expr_common::sort_expr::LexOrdering; macro_rules! primitive_merge_helper { diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs index fa1b8a91cec7c..ace850f940bfe 100644 --- a/datafusion/physical-plan/src/spill.rs +++ b/datafusion/physical-plan/src/spill.rs @@ -23,16 +23,14 @@ use std::path::{Path, PathBuf}; use std::ptr::NonNull; use arrow::array::ArrayData; -use arrow::datatypes::{Schema, SchemaRef}; -use arrow::ipc::{reader::StreamReader, writer::StreamWriter}; +use arrow::datatypes::SchemaRef; +use arrow::ipc::reader::StreamReader; use arrow::record_batch::RecordBatch; -use log::debug; use tokio::sync::mpsc::Sender; use datafusion_common::{exec_datafusion_err, HashSet, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; -use datafusion_execution::memory_pool::human_readable_size; -use datafusion_execution::SendableRecordBatchStream; +use datafusion_execution::{IPCStreamWriter, SendableRecordBatchStream}; use crate::stream::RecordBatchReceiverStream; @@ -54,41 +52,13 @@ pub(crate) fn read_spill_as_stream( Ok(builder.build()) } -/// Spills in-memory `batches` to disk. -/// -/// Returns total number of the rows spilled to disk. 
-pub(crate) fn spill_record_batches( - batches: &[RecordBatch], - path: PathBuf, - schema: SchemaRef, -) -> Result<(usize, usize)> { - let mut writer = IPCStreamWriter::new(path.as_ref(), schema.as_ref())?; - for batch in batches { - writer.write(batch)?; - } - writer.finish()?; - debug!( - "Spilled {} batches of total {} rows to disk, memory released {}", - writer.num_batches, - writer.num_rows, - human_readable_size(writer.num_bytes), - ); - Ok((writer.num_rows, writer.num_bytes)) -} - -fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> { - let file = BufReader::new(File::open(path)?); - let reader = StreamReader::try_new(file, None)?; - for batch in reader { - sender - .blocking_send(batch.map_err(Into::into)) - .map_err(|e| exec_datafusion_err!("{e}"))?; - } - Ok(()) -} - /// Spill the `RecordBatch` to disk as smaller batches -/// split by `batch_size_rows` +/// split by `batch_size_rows`. +#[deprecated( + since = "46.0.0", + note = "This function is deprecated. Use `datafusion_execution::DiskManager::try_spill_record_batch_by_size` instead. Note this method is mainly used within DataFusion for spilling operators. If you only + want the functionality of writing `RecordBatch`es to disk, consider using `arrow::ipc::writer::StreamWriter` instead." +)] pub fn spill_record_batch_by_size( batch: &RecordBatch, path: PathBuf, @@ -110,6 +80,17 @@ pub fn spill_record_batch_by_size( Ok(()) } +fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> { + let file = BufReader::new(File::open(path)?); + let reader = StreamReader::try_new(file, None)?; + for batch in reader { + sender + .blocking_send(batch.map_err(Into::into)) + .map_err(|e| exec_datafusion_err!("{e}"))?; + } + Ok(()) +} + /// Calculate total used memory of this batch. /// /// This function is used to estimate the physical memory usage of the `RecordBatch`. @@ -178,55 +159,10 @@ fn count_array_data_memory_size( } } -/// Write in Arrow IPC Stream format to a file. -/// -/// Stream format is used for spill because it supports dictionary replacement, and the random -/// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement).
-struct IPCStreamWriter {
-    /// Inner writer
-    pub writer: StreamWriter<File>,
-    /// Batches written
-    pub num_batches: usize,
-    /// Rows written
-    pub num_rows: usize,
-    /// Bytes written
-    pub num_bytes: usize,
-}
-
-impl IPCStreamWriter {
-    /// Create new writer
-    pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
-        let file = File::create(path).map_err(|e| {
-            exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}")
-        })?;
-        Ok(Self {
-            num_batches: 0,
-            num_rows: 0,
-            num_bytes: 0,
-            writer: StreamWriter::try_new(file, schema)?,
-        })
-    }
-
-    /// Write one single batch
-    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
-        self.writer.write(batch)?;
-        self.num_batches += 1;
-        self.num_rows += batch.num_rows();
-        let num_bytes: usize = batch.get_array_memory_size();
-        self.num_bytes += num_bytes;
-        Ok(())
-    }
-
-    /// Finish the writer
-    pub fn finish(&mut self) -> Result<()> {
-        self.writer.finish().map_err(Into::into)
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::spill::{spill_record_batch_by_size, spill_record_batches};
+
     use crate::test::build_table_i32;
     use arrow::array::{Float64Array, Int32Array, ListArray};
     use arrow::compute::cast;
@@ -234,6 +170,7 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use datafusion_common::Result;
     use datafusion_execution::disk_manager::DiskManagerConfig;
+    use datafusion_execution::metrics::SpillMetrics;
     use datafusion_execution::DiskManager;
     use itertools::Itertools;
     use std::fs::File;
@@ -256,15 +193,16 @@
 
         let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
 
-        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
         let schema = batch1.schema();
         let num_rows = batch1.num_rows() + batch2.num_rows();
-        let (spilled_rows, _) = spill_record_batches(
+
+        let mut tmp_metrics = SpillMetrics::default();
+        let spill_file = disk_manager.try_spill_record_batches(
             &[batch1, batch2],
-            spill_file.path().into(),
-            Arc::clone(&schema),
+            "Test Spill",
+            &mut tmp_metrics,
         )?;
-        assert_eq!(spilled_rows, num_rows);
+        assert_eq!(tmp_metrics.spilled_rows.value(), num_rows);
 
         let file = BufReader::new(File::open(spill_file.path())?);
         let reader = StreamReader::try_new(file, None)?;
@@ -322,14 +260,14 @@
 
         let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
 
-        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
-        let num_rows = batch1.num_rows() + batch2.num_rows();
-        let (spilled_rows, _) = spill_record_batches(
-            &[batch1, batch2],
-            spill_file.path().into(),
-            Arc::clone(&dict_schema),
+        let mut tmp_metrics = SpillMetrics::default();
+        let spill_file = disk_manager.try_spill_record_batches(
+            &[batch1.clone(), batch2.clone()],
+            "Test Spill",
+            &mut tmp_metrics,
         )?;
-        assert_eq!(spilled_rows, num_rows);
+        let num_rows = batch1.num_rows() + batch2.num_rows();
+        assert_eq!(tmp_metrics.spilled_rows.value(), num_rows);
 
         let file = BufReader::new(File::open(spill_file.path())?);
         let reader = StreamReader::try_new(file, None)?;
@@ -352,20 +290,17 @@
 
         let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
 
-        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
-        let schema = batch1.schema();
-        spill_record_batch_by_size(
+        let mut tmp_metrics = SpillMetrics::default();
+        let spill_file = disk_manager.try_spill_record_batch_by_size(
             &batch1,
-            spill_file.path().into(),
-            Arc::clone(&schema),
+            "Test Spill",
+            &mut tmp_metrics,
             1,
         )?;
 
         let file = BufReader::new(File::open(spill_file.path())?);
         let reader = StreamReader::try_new(file, 
None)?; - assert_eq!(reader.schema(), schema); - let batches = reader.collect_vec(); assert!(batches.len() == 4); diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index 23cbb1ce49c18..fc522c02557fe 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -22,12 +22,12 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -use super::metrics::BaselineMetrics; use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream}; use crate::displayable; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::{internal_err, Result}; +use datafusion_execution::metrics::BaselineMetrics; use datafusion_execution::TaskContext; use futures::stream::BoxStream; diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 8bdfca2a89076..2e5d0d17e14d5 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -25,12 +25,14 @@ use super::{DisplayAs, DisplayFormatType, PlanProperties}; use crate::display::{display_orderings, ProjectSchemaDisplay}; use crate::execution_plan::{Boundedness, EmissionType}; use crate::limit::LimitStream; -use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::projection::{ all_alias_free_columns, new_projections_for_columns, update_expr, ProjectionExec, }; use crate::stream::RecordBatchStreamAdapter; use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; +use datafusion_execution::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, +}; use arrow::datatypes::{Schema, SchemaRef}; use datafusion_common::{internal_err, plan_err, Result}; diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 85de1eefce2e4..b209aa8ffed49 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -24,13 +24,15 @@ use arrow::{ use std::mem::size_of; use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc}; -use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder}; use crate::spill::get_record_batch_memory_size; use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; use arrow::array::{Array, ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; use datafusion_common::HashMap; use datafusion_common::Result; +use datafusion_execution::metrics::{ + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, +}; use datafusion_execution::{ memory_pool::{MemoryConsumer, MemoryReservation}, runtime_env::RuntimeEnv, diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 68d1803b7133e..b5fae137b7461 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -27,7 +27,6 @@ use std::task::{Context, Poll}; use std::{any::Any, sync::Arc}; use super::{ - metrics::{ExecutionPlanMetricsSet, MetricsSet}, ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -35,9 +34,11 @@ use super::{ use crate::execution_plan::{ boundedness_from_children, emission_type_from_children, InvariantLevel, }; -use crate::metrics::BaselineMetrics; use crate::projection::{make_with_child, ProjectionExec}; use crate::stream::ObservedStream; +use datafusion_execution::metrics::{ + BaselineMetrics, 
ExecutionPlanMetricsSet, MetricsSet, +}; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 430391de5922d..5aaeb0cdc0d5d 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -21,11 +21,10 @@ use std::cmp::{self, Ordering}; use std::task::{ready, Poll}; use std::{any::Any, sync::Arc}; -use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; -use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; +use super::DisplayAs; use crate::{ - DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream, - SendableRecordBatchStream, + DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, + PlanProperties, RecordBatchStream, SendableRecordBatchStream, }; use arrow::array::{ @@ -41,6 +40,9 @@ use arrow_ord::cmp::lt; use datafusion_common::{ exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions, }; +use datafusion_execution::metrics::{ + self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, +}; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 0d9c58b3bf496..1b4498798219b 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -28,7 +28,6 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::utils::create_schema; -use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, window_equivalence_properties, @@ -54,6 +53,9 @@ use datafusion_common::utils::{ use datafusion_common::{ arrow_datafusion_err, exec_err, DataFusionError, HashMap, Result, }; +use datafusion_execution::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, +}; use datafusion_execution::TaskContext; use datafusion_expr::window_state::{PartitionBatchState, WindowAggState}; use datafusion_expr::ColumnarValue; diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index d31fd66ca1f14..abd96087bb158 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -24,7 +24,6 @@ use std::task::{Context, Poll}; use super::utils::create_schema; use crate::execution_plan::EmissionType; -use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, window_equivalence_properties, @@ -34,6 +33,9 @@ use crate::{ ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr, }; +use datafusion_execution::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, +}; use arrow::array::ArrayRef; use arrow::compute::{concat, concat_batches}; diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index d3d29bfad7cec..832b136a0aead 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -23,10 +23,10 @@ use std::sync::{Arc, Mutex}; use 
crate::execution_plan::{Boundedness, EmissionType}; use crate::memory::MemoryStream; use crate::{ - metrics::{ExecutionPlanMetricsSet, MetricsSet}, + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, Statistics, }; -use crate::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; +use datafusion_execution::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch;
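
Taken together, the `spill.rs` changes move file creation and bookkeeping into `DiskManager`: the removed free function `spill_record_batches` (which took a pre-created path and returned a `(rows, bytes)` tuple) becomes `DiskManager::try_spill_record_batches`, which creates the temp file itself and reports counts through `SpillMetrics`. A minimal migration sketch, assuming only the API surface exercised by the updated tests above (`try_spill_record_batches`, `SpillMetrics::default`, and the public `spilled_rows` counter); the function name `spill_example`, the schema, and the values are illustrative:

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array, RecordBatch};
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_execution::disk_manager::DiskManagerConfig;
    use datafusion_execution::metrics::SpillMetrics;
    use datafusion_execution::DiskManager;

    fn spill_example() -> Result<()> {
        // Build a small batch to spill (illustrative data).
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;

        let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

        // Before: disk_manager.create_tmp_file(..) + spill_record_batches(..) -> (rows, bytes).
        // After: the DiskManager creates the temp file, and the row/byte counts
        // land in SpillMetrics (call shape as shown in the updated tests above).
        let mut metrics = SpillMetrics::default();
        let spill_file =
            disk_manager.try_spill_record_batches(&[batch], "example spill", &mut metrics)?;

        assert_eq!(metrics.spilled_rows.value(), 3);
        println!("spilled to {:?}", spill_file.path());
        Ok(())
    }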
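The remaining hunks are mechanical: code inside `datafusion-physical-plan` that previously imported metrics types via `crate::metrics` (i.e. `datafusion_physical_plan::metrics`) now pulls the same types from their new home in `datafusion-execution`. For downstream code the pattern is a one-line import swap, sketched here under that assumption:

    // Before the move:
    // use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};

    // After the move, as applied throughout this diff:
    use datafusion_execution::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};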