From ff12c69900fe70bdfecc6593a526c954884e4056 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Dec 2024 12:18:05 -0700 Subject: [PATCH 01/12] show shuffle wall time --- .../core/src/execution/shuffle/shuffle_writer.rs | 15 ++++++++++----- .../spark/sql/comet/CometCollectLimitExec.scala | 3 ++- .../apache/spark/sql/comet/CometMetricNode.scala | 8 ++++++++ .../comet/CometTakeOrderedAndProjectExec.scala | 3 ++- .../shuffle/CometShuffleExchangeExec.scala | 11 +++++++++-- 5 files changed, 31 insertions(+), 9 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 7587ff06dc..8aa10ae707 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -628,6 +628,9 @@ struct ShuffleRepartitionerMetrics { /// metrics baseline: BaselineMetrics, + /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. + write_time: Time, + /// count of spills during the execution of the operator spill_count: Count, @@ -640,8 +643,10 @@ struct ShuffleRepartitionerMetrics { impl ShuffleRepartitionerMetrics { fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + let write_time = MetricBuilder::new(metrics).subset_time("write_time", partition); Self { baseline: BaselineMetrics::new(metrics, partition), + write_time, spill_count: MetricBuilder::new(metrics).spill_count(partition), spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), data_size: MetricBuilder::new(metrics).counter("data_size", partition), @@ -872,7 +877,7 @@ impl ShuffleRepartitioner { .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; for i in 0..num_output_partitions { - let mut timer = self.metrics.baseline.elapsed_compute().timer(); + let mut timer = self.metrics.write_time.timer(); offsets[i] = output_data.stream_position()?; output_data.write_all(&output_batches[i])?; @@ -885,7 +890,7 @@ impl ShuffleRepartitioner { for spill in &output_spills { let length = spill.offsets[i + 1] - spill.offsets[i]; if length > 0 { - let mut timer = self.metrics.baseline.elapsed_compute().timer(); + let mut timer = self.metrics.write_time.timer(); let mut spill_file = BufReader::new(File::open(spill.file.path()).map_err(|e| { @@ -900,7 +905,7 @@ impl ShuffleRepartitioner { } } } - let mut timer = self.metrics.baseline.elapsed_compute().timer(); + let mut timer = self.metrics.write_time.timer(); output_data.flush()?; timer.stop(); @@ -909,7 +914,7 @@ impl ShuffleRepartitioner { .stream_position() .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; - let mut timer = self.metrics.baseline.elapsed_compute().timer(); + let mut timer = self.metrics.write_time.timer(); let mut output_index = BufWriter::new(File::create(index_file).map_err(|e| { @@ -959,7 +964,7 @@ impl ShuffleRepartitioner { return Ok(0); } - let mut timer = self.metrics.baseline.elapsed_compute().timer(); + let mut timer = self.metrics.write_time.timer(); let spillfile = self .runtime diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala index 8ea0b17654..f75af5076b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala @@ -57,7 +57,8 @@ case class CometCollectLimitExec( "dataSize" -> 
SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, - "number of partitions")) ++ readMetrics ++ writeMetrics + "number of partitions")) ++ readMetrics ++ writeMetrics ++ CometMetricNode.shuffleMetrics( + sparkContext) private lazy val serializer: Serializer = new UnsafeRowSerializer(child.output.size, longMetric("dataSize")) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 47c89d9433..6642cc0c6c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -130,6 +130,14 @@ object CometMetricNode { "spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows")) } + def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = { + Map( + "elapsed_compute" -> SQLMetrics.createNanoTimingMetric( + sc, + "total native shuffle write time"), + "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sc, "shuffle wall time")) + } + /** * Creates a [[CometMetricNode]] from a [[CometPlan]]. */ diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala index 5582f4d687..19586628a7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala @@ -57,7 +57,8 @@ case class CometTakeOrderedAndProjectExec( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, - "number of partitions")) ++ readMetrics ++ writeMetrics + "number of partitions")) ++ readMetrics ++ writeMetrics ++ CometMetricNode.shuffleMetrics( + sparkContext) private lazy val serializer: Serializer = new UnsafeRowSerializer(child.output.size, longMetric("dataSize")) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index b1dd9ac836..a1b8d5504d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -79,7 +79,8 @@ case class CometShuffleExchangeExec( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, - "number of partitions")) ++ readMetrics ++ writeMetrics + "number of partitions")) ++ readMetrics ++ writeMetrics ++ CometMetricNode.shuffleMetrics( + sparkContext) override def nodeName: String = if (shuffleType == CometNativeShuffle) { "CometExchange" @@ -464,6 +465,9 @@ class CometShuffleWriteProcessor( mapId: Long, mapIndex: Int, context: TaskContext): MapStatus = { + + val startTime = System.nanoTime() + val metricsReporter = createMetricsReporter(context) val shuffleBlockResolver = SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver] @@ -481,7 +485,7 @@ class CometShuffleWriteProcessor( val nativeSQLMetrics = Map( "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), "data_size" -> metrics("dataSize"), - "elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) + 
"write_time" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) val nativeMetrics = CometMetricNode(nativeSQLMetrics) // Getting rid of the fake partitionId @@ -524,6 +528,9 @@ class CometShuffleWriteProcessor( partitionLengths, Array.empty, // TODO: add checksums tempDataFilePath.toFile) + + metrics("shuffleWallTime").add(System.nanoTime() - startTime) + MapStatus.apply(SparkEnv.get.blockManager.shuffleServerId, partitionLengths, mapId) } From 0034bc0c70a74bef4fb12f26ee2354c84f228419 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Dec 2024 12:35:37 -0700 Subject: [PATCH 02/12] use write_time consistently --- .../core/src/execution/shuffle/shuffle_writer.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 8aa10ae707..4ec2a4830d 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -271,7 +271,7 @@ impl PartitionBuffer { columns: &[ArrayRef], indices: &[usize], start_index: usize, - time_metric: &Time, + write_time: &Time, ) -> AppendRowStatus { let mut mem_diff = 0; let mut start = start_index; @@ -293,7 +293,7 @@ impl PartitionBuffer { }); self.num_active_rows += end - start; if self.num_active_rows >= self.batch_size { - let mut timer = time_metric.timer(); + let mut timer = write_time.timer(); let flush = self.flush(); if let Err(e) = flush { return AppendRowStatus::MemDiff(Err(e)); @@ -1000,12 +1000,11 @@ impl ShuffleRepartitioner { let output = &mut self.buffered_partitions[partition_id]; - let time_metric = self.metrics.baseline.elapsed_compute(); - // If the range of indices is not big enough, just appending the rows into // active array builders instead of directly adding them as a record batch. let mut start_index: usize = 0; - let mut output_ret = output.append_rows(columns, indices, start_index, time_metric); + let mut output_ret = + output.append_rows(columns, indices, start_index, &self.metrics.write_time); loop { match output_ret { @@ -1022,10 +1021,9 @@ impl ShuffleRepartitioner { let output = &mut self.buffered_partitions[partition_id]; output.reservation.free(); - let time_metric = self.metrics.baseline.elapsed_compute(); - start_index = new_start; - output_ret = output.append_rows(columns, indices, start_index, time_metric); + output_ret = + output.append_rows(columns, indices, start_index, &self.metrics.write_time); if let AppendRowStatus::StartIndex(new_start) = output_ret { if new_start == start_index { From 4ec6ed6f0473699acb93aa7e8676230cab8354bd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Dec 2024 13:18:05 -0700 Subject: [PATCH 03/12] record native shuffle elapsed compute --- .../src/execution/shuffle/shuffle_writer.rs | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 4ec2a4830d..e01e0a09de 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -17,17 +17,6 @@ //! Defines the External shuffle repartition plan. 
-use std::{ - any::Any, - fmt, - fmt::{Debug, Formatter}, - fs::{File, OpenOptions}, - io::{BufReader, BufWriter, Cursor, Read, Seek, SeekFrom, Write}, - path::Path, - sync::Arc, - task::{Context, Poll}, -}; - use arrow::{datatypes::*, ipc::writer::StreamWriter}; use async_trait::async_trait; use bytes::Buf; @@ -59,6 +48,17 @@ use futures::executor::block_on; use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use simd_adler32::Adler32; +use std::time::Instant; +use std::{ + any::Any, + fmt, + fmt::{Debug, Formatter}, + fs::{File, OpenOptions}, + io::{BufReader, BufWriter, Cursor, Read, Seek, SeekFrom, Write}, + path::Path, + sync::Arc, + task::{Context, Poll}, +}; use crate::{ common::bit::ceil, @@ -630,7 +630,7 @@ struct ShuffleRepartitionerMetrics { /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. write_time: Time, - + //other_time: Time, /// count of spills during the execution of the operator spill_count: Count, @@ -706,6 +706,7 @@ impl ShuffleRepartitioner { /// This function will slice input batch according to configured batch size and then /// shuffle rows into corresponding partition buffer. async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { + let start_time = Instant::now(); let mut start = 0; while start < batch.num_rows() { let end = (start + self.batch_size).min(batch.num_rows()); @@ -713,6 +714,10 @@ impl ShuffleRepartitioner { self.partitioning_batch(batch).await?; start = end; } + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); Ok(()) } @@ -853,6 +858,7 @@ impl ShuffleRepartitioner { /// Writes buffered shuffled record batches into Arrow IPC bytes. async fn shuffle_write(&mut self) -> Result { + let mut elapsed_compute = self.metrics.baseline.elapsed_compute().timer(); let num_output_partitions = self.num_output_partitions; let buffered_partitions = &mut self.buffered_partitions; let mut output_batches: Vec> = vec![vec![]; num_output_partitions]; @@ -932,6 +938,8 @@ impl ShuffleRepartitioner { let used = self.reservation.size(); self.reservation.shrink(used); + elapsed_compute.stop(); + // shuffle writer always has empty output Ok(Box::pin(EmptyStream::try_new(Arc::clone(&self.schema))?)) } From 565c27aa5265bb2634c86b1251b4ecbc002ba1f0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Dec 2024 14:16:39 -0700 Subject: [PATCH 04/12] save progress --- docs/source/user-guide/metrics.md | 61 +++++++++++++++++++ docs/source/user-guide/tuning.md | 25 -------- native/core/src/execution/metrics/utils.rs | 1 + .../shuffle/CometShuffleExchangeExec.scala | 2 + 4 files changed, 64 insertions(+), 25 deletions(-) create mode 100644 docs/source/user-guide/metrics.md diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md new file mode 100644 index 0000000000..be883b7a56 --- /dev/null +++ b/docs/source/user-guide/metrics.md @@ -0,0 +1,61 @@ + + +# Comet Metrics + +## Spark SQL Metrics + +Some Comet metrics are not directly comparable to Spark metrics in some cases: + +### CometScanExec + +`CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to +milliseconds _per batch_ which can result in a large loss of precision, making it difficult to compare scan times +between Spark and Comet. 
+ +### Exchange + +Comet adds some additional metrics: + +| Metric | Description | +|-------------------|---------------------------------------------------------------------------------------------| +| `nativeWallTime` | Total time spent in native shuffle writer, exluding the execution time of the input plan | + + +## Native Metrics + +Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are +logged for each native plan (and there is one plan per task, so this is very verbose). + +Here is a guide to some of the native metrics. + +### ScanExec + +| Metric | Description | +| ----------------- | --------------------------------------------------------------------------------------------------- | +| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. | +| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. | +| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. | + +### ShuffleWriterExec + +| Metric | Description | +|-------------------|-------------------------------------------| +| `elapsed_compute` | Total time excluding any child operators. | +| `srite_time` | Time spent writing bytes to disk. | diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index af722494f9..56303909bd 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -102,28 +102,3 @@ native shuffle currently only supports `HashPartitioning` and `SinglePartitionin To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set, then any shuffle operations that cannot be supported in this mode will fall back to Spark. - -## Metrics - -### Spark SQL Metrics - -Some Comet metrics are not directly comparable to Spark metrics in some cases: - -- `CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to - milliseconds _per batch_ which can result in a large loss of precision, making it difficult to compare scan times - between Spark and Comet. - -### Native Metrics - -Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are -logged for each native plan (and there is one plan per task, so this is very verbose). - -Here is a guide to some of the native metrics. - -### ScanExec - -| Metric | Description | -| ----------------- | --------------------------------------------------------------------------------------------------- | -| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. | -| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. | -| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. 
| diff --git a/native/core/src/execution/metrics/utils.rs b/native/core/src/execution/metrics/utils.rs index 0eb4b631dd..5d11ccb6cf 100644 --- a/native/core/src/execution/metrics/utils.rs +++ b/native/core/src/execution/metrics/utils.rs @@ -73,6 +73,7 @@ pub fn update_comet_metric( comet_metric_node(metric_node).get_child_node(i as i32) -> JObject )?; if child_metric_node.is_null() { + println!("Missing JVM metric node for {}", child_plan.plan_id); continue; } update_comet_metric(env, &child_metric_node, child_plan, metrics_jstrings)?; diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index a1b8d5504d..f80c6ea88c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -485,12 +485,14 @@ class CometShuffleWriteProcessor( val nativeSQLMetrics = Map( "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), "data_size" -> metrics("dataSize"), + "elapsed_compute" -> metrics("elapsed_compute"), "write_time" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) val nativeMetrics = CometMetricNode(nativeSQLMetrics) // Getting rid of the fake partitionId val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2) + // create the native plan for the ShuffleWriterExec val cometIter = CometExec.getCometIterator( Seq(newInputs.asInstanceOf[Iterator[ColumnarBatch]]), outputAttributes.length, From d3f00bf1298130809a63dd7897bbc86a530399ea Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Dec 2024 14:29:40 -0700 Subject: [PATCH 05/12] input_time --- docs/source/index.rst | 1 + docs/source/user-guide/metrics.md | 9 +++---- .../src/execution/shuffle/shuffle_writer.rs | 24 +++++++++++++++++-- .../spark/sql/comet/CometMetricNode.scala | 3 ++- .../shuffle/CometShuffleExchangeExec.scala | 1 + 5 files changed, 31 insertions(+), 7 deletions(-) diff --git a/docs/source/index.rst b/docs/source/index.rst index 39ad27a57c..21ec36ca95 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -51,6 +51,7 @@ as a native runtime to achieve improvement in terms of query efficiency and quer Configuration Settings Compatibility Guide Tuning Guide + Metrics Guide .. _toc.contributor-guide-links: .. toctree:: diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index be883b7a56..e8e5fdda9b 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -55,7 +55,8 @@ Here is a guide to some of the native metrics. ### ShuffleWriterExec -| Metric | Description | -|-------------------|-------------------------------------------| -| `elapsed_compute` | Total time excluding any child operators. | -| `srite_time` | Time spent writing bytes to disk. | +| Metric | Description | +|-------------------|-------------------------------------------------------| +| `elapsed_compute` | Total time excluding any child operators. | +| `input_time` | Time spent executing input plan and fetching batches. | +| `write_time` | Time spent writing bytes to disk. 
| diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index e01e0a09de..6fb1396bd6 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -137,9 +137,13 @@ impl ExecutionPlan for ShuffleWriterExec { partition: usize, context: Arc, ) -> Result { - let input = self.input.execute(partition, Arc::clone(&context))?; let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0); + // execute the child plan + let start_time = Instant::now(); + let input = self.input.execute(partition, Arc::clone(&context))?; + metrics.input_time.add_duration(start_time.elapsed()); + Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once( @@ -628,8 +632,12 @@ struct ShuffleRepartitionerMetrics { /// metrics baseline: BaselineMetrics, + /// Time executing child plan and fetching batches + input_time: Time, + /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. write_time: Time, + //other_time: Time, /// count of spills during the execution of the operator spill_count: Count, @@ -643,9 +651,11 @@ struct ShuffleRepartitionerMetrics { impl ShuffleRepartitionerMetrics { fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + let input_time = MetricBuilder::new(metrics).subset_time("input_time", partition); let write_time = MetricBuilder::new(metrics).subset_time("write_time", partition); Self { baseline: BaselineMetrics::new(metrics, partition), + input_time, write_time, spill_count: MetricBuilder::new(metrics).spill_count(partition), spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), @@ -1115,7 +1125,7 @@ async fn external_shuffle( context.session_config().batch_size(), ); - while let Some(batch) = input.next().await { + while let Some(batch) = fetch_next_batch(&mut input, &repartitioner.metrics.input_time).await { // Block on the repartitioner to insert the batch and shuffle the rows // into the corresponding partition buffer. 
// Otherwise, pull the next batch from the input stream might overwrite the @@ -1125,6 +1135,16 @@ async fn external_shuffle( repartitioner.shuffle_write().await } +async fn fetch_next_batch( + input: &mut SendableRecordBatchStream, + input_time: &Time, +) -> Option> { + let mut input_time = input_time.timer(); + let next_batch = input.next().await; + input_time.stop(); + next_batch +} + fn new_array_builders(schema: &SchemaRef, batch_size: usize) -> Vec> { schema .fields() diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 6642cc0c6c..480f466fd8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -134,7 +134,8 @@ object CometMetricNode { Map( "elapsed_compute" -> SQLMetrics.createNanoTimingMetric( sc, - "total native shuffle write time"), + "native shuffle writer wall time"), + "input_time" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle writer input time"), "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sc, "shuffle wall time")) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index f80c6ea88c..3ddcbb193c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -486,6 +486,7 @@ class CometShuffleWriteProcessor( "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), "data_size" -> metrics("dataSize"), "elapsed_compute" -> metrics("elapsed_compute"), + "input_time" -> metrics("input_time"), "write_time" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) val nativeMetrics = CometMetricNode(nativeSQLMetrics) From cfc72f5e4b27d32c2c3ddd5214629e2f8e1cc9e6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Dec 2024 14:34:15 -0700 Subject: [PATCH 06/12] save --- docs/source/user-guide/metrics.md | 11 ++++++----- .../org/apache/spark/sql/comet/CometMetricNode.scala | 6 +++--- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index e8e5fdda9b..691e679cba 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -33,10 +33,11 @@ between Spark and Comet. Comet adds some additional metrics: -| Metric | Description | -|-------------------|---------------------------------------------------------------------------------------------| -| `nativeWallTime` | Total time spent in native shuffle writer, exluding the execution time of the input plan | - +| Metric | Description | +| ------------------------------- | ----------------------------------------------------------------------------------------- | +| `native shuffle time` | Total time spent in native shuffle writer, excluding the execution time of the input plan | +| `native shuffle input time` | Time spend executing the shuffle input plan and fetching batches. | +| `shuffle wall time (inclusive)` | Total time executing the shuffle write, inclusive of executing the input plan. | ## Native Metrics @@ -56,7 +57,7 @@ Here is a guide to some of the native metrics. 
### ShuffleWriterExec | Metric | Description | -|-------------------|-------------------------------------------------------| +| ----------------- | ----------------------------------------------------- | | `elapsed_compute` | Total time excluding any child operators. | | `input_time` | Time spent executing input plan and fetching batches. | | `write_time` | Time spent writing bytes to disk. | diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 480f466fd8..94ca6c1deb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -134,9 +134,9 @@ object CometMetricNode { Map( "elapsed_compute" -> SQLMetrics.createNanoTimingMetric( sc, - "native shuffle writer wall time"), - "input_time" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle writer input time"), - "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sc, "shuffle wall time")) + "native shuffle time"), + "input_time" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle input time"), + "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sc, "shuffle wall time (inclusive)")) } /** From b26322e1d15214a66dc95efc4b4a335a2879bde4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Dec 2024 15:16:37 -0700 Subject: [PATCH 07/12] Ready for review --- .../scala/org/apache/comet/CometConf.scala | 9 +++++++++ docs/source/user-guide/configs.md | 1 + docs/source/user-guide/metrics.md | 2 +- native/core/src/execution/metrics/utils.rs | 1 - native/core/src/execution/operators/scan.rs | 2 +- .../src/execution/shuffle/shuffle_writer.rs | 9 +++++++++ .../spark/sql/comet/CometMetricNode.scala | 18 ++++++++++++------ .../shuffle/CometShuffleExchangeExec.scala | 10 +++++----- 8 files changed, 38 insertions(+), 14 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index b602d7cf1c..76551d05f9 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -45,6 +45,9 @@ import org.apache.comet.shims.ShimCometConf */ object CometConf extends ShimCometConf { + private val METRICS_GUIDE = "For more information, refer to the Comet Metrics " + + "Guide (https://datafusion.apache.org/comet/user-guide/metrics.html)" + private val TUNING_GUIDE = "For more information, refer to the Comet Tuning " + "Guide (https://datafusion.apache.org/comet/user-guide/tuning.html)" @@ -414,6 +417,12 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_ENABLE_DETAILED_METRICS: ConfigEntry[Boolean] = + conf("spark.comet.metrics.detailed") + .doc(s"Enable this option to see additional SQL metrics. $METRICS_GUIDE.") + .booleanConf + .createWithDefault(false) + val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.explainFallback.enabled") .doc( diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 69da792223..790c900195 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -64,6 +64,7 @@ Comet provides the following configuration settings. | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. 
| false | | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. | 0.2 | | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b | +| spark.comet.metrics.detailed | Enable this option to see additional SQL metrics. For more information, refer to the Comet Metrics Guide (https://datafusion.apache.org/comet/user-guide/metrics.html). | false | | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. | false | | spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. | false | diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index 691e679cba..f64eb13b2b 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -21,7 +21,7 @@ under the License. ## Spark SQL Metrics -Some Comet metrics are not directly comparable to Spark metrics in some cases: +Set `spark.comet.metrics.detailed=true` to see all available Comet metrics. ### CometScanExec diff --git a/native/core/src/execution/metrics/utils.rs b/native/core/src/execution/metrics/utils.rs index 5d11ccb6cf..0eb4b631dd 100644 --- a/native/core/src/execution/metrics/utils.rs +++ b/native/core/src/execution/metrics/utils.rs @@ -73,7 +73,6 @@ pub fn update_comet_metric( comet_metric_node(metric_node).get_child_node(i as i32) -> JObject )?; if child_metric_node.is_null() { - println!("Missing JVM metric node for {}", child_plan.plan_id); continue; } update_comet_metric(env, &child_metric_node, child_plan, metrics_jstrings)?; diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index a297f87c1f..007cdc1d26 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -75,7 +75,7 @@ pub struct ScanExec { /// Metrics collector metrics: ExecutionPlanMetricsSet, /// Baseline metrics - baseline_metrics: BaselineMetrics, + pub(crate) baseline_metrics: BaselineMetrics, /// Time waiting for JVM input plan to execute and return batches jvm_fetch_time: Time, /// Time spent in FFI diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 6fb1396bd6..299f740f46 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -60,6 +60,7 @@ use std::{ task::{Context, Poll}, }; +use crate::execution::operators::ScanExec; use crate::{ common::bit::ceil, errors::{CometError, CometResult}, @@ -139,6 +140,14 @@ impl ExecutionPlan for ShuffleWriterExec { ) -> Result { let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0); + if let Some(scan) = self.input.as_any().downcast_ref::() { + // ScanExec starts executing and fetching data during query planning time so we + // need to capture that 
time here to ensure that we have accurate metrics + metrics + .input_time + .add(scan.baseline_metrics.elapsed_compute()); + } + // execute the child plan let start_time = Instant::now(); let input = self.input.execute(partition, Arc::clone(&context))?; diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 94ca6c1deb..e333bce929 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -24,6 +24,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.comet.CometConf + /** * A node carrying SQL metrics from SparkPlan, and metrics of its children. Native code will call * [[getChildNode]] and [[set]] to update the metrics. @@ -131,12 +133,16 @@ object CometMetricNode { } def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = { - Map( - "elapsed_compute" -> SQLMetrics.createNanoTimingMetric( - sc, - "native shuffle time"), - "input_time" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle input time"), - "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sc, "shuffle wall time (inclusive)")) + if (CometConf.COMET_ENABLE_DETAILED_METRICS.get()) { + Map( + "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time"), + "input_time" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle input time"), + "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric( + sc, + "shuffle wall time (inclusive)")) + } else { + Map("elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time")) + } } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 3ddcbb193c..d03f2ef979 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -483,11 +483,10 @@ class CometShuffleWriteProcessor( // Maps native metrics to SQL metrics val nativeSQLMetrics = Map( - "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), "data_size" -> metrics("dataSize"), - "elapsed_compute" -> metrics("elapsed_compute"), - "input_time" -> metrics("input_time"), - "write_time" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) + "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), + "write_time" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) ++ + metrics.filterKeys(Seq("elapsed_compute", "input_time").contains) val nativeMetrics = CometMetricNode(nativeSQLMetrics) // Getting rid of the fake partitionId @@ -532,7 +531,8 @@ class CometShuffleWriteProcessor( Array.empty, // TODO: add checksums tempDataFilePath.toFile) - metrics("shuffleWallTime").add(System.nanoTime() - startTime) + // update wall time metric if available + metrics.get("shuffleWallTime").foreach(_.add(System.nanoTime() - startTime)) MapStatus.apply(SparkEnv.get.blockManager.shuffleServerId, partitionLengths, mapId) } From 3c8f3cfa11254fd0a98ef6042997d901abd5212b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Dec 2024 16:59:14 -0700 Subject: [PATCH 08/12] add IPC time --- 
docs/source/user-guide/metrics.md | 3 +- native/core/src/execution/jni_api.rs | 6 ++- native/core/src/execution/shuffle/row.rs | 4 +- .../src/execution/shuffle/shuffle_writer.rs | 40 ++++++++++++++----- .../spark/sql/comet/CometMetricNode.scala | 1 + .../shuffle/CometShuffleExchangeExec.scala | 5 ++- 6 files changed, 45 insertions(+), 14 deletions(-) diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index f64eb13b2b..b1e3a67442 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -34,7 +34,8 @@ between Spark and Comet. Comet adds some additional metrics: | Metric | Description | -| ------------------------------- | ----------------------------------------------------------------------------------------- | +|---------------------------------|-------------------------------------------------------------------------------------------| +| `ipc time` | Time to encode batches in IPC format | | `native shuffle time` | Total time spent in native shuffle writer, excluding the execution time of the input plan | | `native shuffle input time` | Time spend executing the shuffle input plan and fetching batches. | | `shuffle wall time (inclusive)` | Total time executing the shuffle write, inclusive of executing the input plan. | diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 491b389c99..9f5adbeb94 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -17,8 +17,10 @@ //! Define JNI APIs which can be called from Java/Scala. +use super::{serde, utils::SparkArrowConvert, CometMemoryPool}; use arrow::datatypes::DataType as ArrowDataType; use arrow_array::RecordBatch; +use datafusion::physical_plan::metrics::Time; use datafusion::{ execution::{ disk_manager::DiskManagerConfig, @@ -40,8 +42,6 @@ use jni::{ use std::time::{Duration, Instant}; use std::{collections::HashMap, sync::Arc, task::Poll}; -use super::{serde, utils::SparkArrowConvert, CometMemoryPool}; - use crate::{ errors::{try_unwrap_or_throw, CometError, CometResult}, execution::{ @@ -455,6 +455,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative checksum_enabled: jboolean, checksum_algo: jint, current_checksum: jlong, + ipc_time: &Time, ) -> jlongArray { try_unwrap_or_throw(&e, |mut env| unsafe { let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?; @@ -493,6 +494,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative checksum_enabled, checksum_algo, current_checksum, + ipc_time, )?; let checksum = if let Some(checksum) = checksum { diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index ce752e68af..cfaca8ce4c 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -40,6 +40,7 @@ use arrow_array::{ Array, ArrayRef, RecordBatch, RecordBatchOptions, }; use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit}; +use datafusion::physical_plan::metrics::Time; use jni::sys::{jint, jlong}; use std::{ fs::OpenOptions, @@ -3295,6 +3296,7 @@ pub fn process_sorted_row_partition( // this is the initial checksum for this method, as it also gets updated iteratively // inside the loop within the method across batches. initial_checksum: Option, + ipc_time: &Time, ) -> Result<(i64, Option), CometError> { // TODO: We can tune this parameter automatically based on row size and cache size. 
let row_step = 10; @@ -3354,7 +3356,7 @@ pub fn process_sorted_row_partition( let mut frozen: Vec = vec![]; let mut cursor = Cursor::new(&mut frozen); cursor.seek(SeekFrom::End(0))?; - written += write_ipc_compressed(&batch, &mut cursor)?; + written += write_ipc_compressed(&batch, &mut cursor, ipc_time)?; if let Some(checksum) = &mut current_checksum { checksum.update(&mut cursor)?; diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 299f740f46..ef56dd4735 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -285,6 +285,7 @@ impl PartitionBuffer { indices: &[usize], start_index: usize, write_time: &Time, + ipc_time: &Time, ) -> AppendRowStatus { let mut mem_diff = 0; let mut start = start_index; @@ -307,7 +308,7 @@ impl PartitionBuffer { self.num_active_rows += end - start; if self.num_active_rows >= self.batch_size { let mut timer = write_time.timer(); - let flush = self.flush(); + let flush = self.flush(ipc_time); if let Err(e) = flush { return AppendRowStatus::MemDiff(Err(e)); } @@ -326,7 +327,7 @@ impl PartitionBuffer { } /// flush active data into frozen bytes - fn flush(&mut self) -> Result { + fn flush(&mut self, ipc_time: &Time) -> Result { if self.num_active_rows == 0 { return Ok(0); } @@ -343,7 +344,7 @@ impl PartitionBuffer { let frozen_capacity_old = self.frozen.capacity(); let mut cursor = Cursor::new(&mut self.frozen); cursor.seek(SeekFrom::End(0))?; - write_ipc_compressed(&frozen_batch, &mut cursor)?; + write_ipc_compressed(&frozen_batch, &mut cursor, ipc_time)?; mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize; Ok(mem_diff) @@ -647,6 +648,9 @@ struct ShuffleRepartitionerMetrics { /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. write_time: Time, + /// Time encoding batches to IPC format + ipc_time: Time, + //other_time: Time, /// count of spills during the execution of the operator spill_count: Count, @@ -662,10 +666,12 @@ impl ShuffleRepartitionerMetrics { fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { let input_time = MetricBuilder::new(metrics).subset_time("input_time", partition); let write_time = MetricBuilder::new(metrics).subset_time("write_time", partition); + let ipc_time = MetricBuilder::new(metrics).subset_time("ipc_time", partition); Self { baseline: BaselineMetrics::new(metrics, partition), input_time, write_time, + ipc_time, spill_count: MetricBuilder::new(metrics).spill_count(partition), spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), data_size: MetricBuilder::new(metrics).counter("data_size", partition), @@ -883,7 +889,7 @@ impl ShuffleRepartitioner { let mut output_batches: Vec> = vec![vec![]; num_output_partitions]; for i in 0..num_output_partitions { - buffered_partitions[i].flush()?; + buffered_partitions[i].flush(&self.metrics.ipc_time)?; output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen); } @@ -1001,6 +1007,7 @@ impl ShuffleRepartitioner { &mut self.buffered_partitions, spillfile.path(), self.num_output_partitions, + &self.metrics.ipc_time, )?; timer.stop(); @@ -1030,8 +1037,13 @@ impl ShuffleRepartitioner { // If the range of indices is not big enough, just appending the rows into // active array builders instead of directly adding them as a record batch. 
let mut start_index: usize = 0; - let mut output_ret = - output.append_rows(columns, indices, start_index, &self.metrics.write_time); + let mut output_ret = output.append_rows( + columns, + indices, + start_index, + &self.metrics.write_time, + &self.metrics.ipc_time, + ); loop { match output_ret { @@ -1049,8 +1061,13 @@ impl ShuffleRepartitioner { output.reservation.free(); start_index = new_start; - output_ret = - output.append_rows(columns, indices, start_index, &self.metrics.write_time); + output_ret = output.append_rows( + columns, + indices, + start_index, + &self.metrics.write_time, + &self.metrics.ipc_time, + ); if let AppendRowStatus::StartIndex(new_start) = output_ret { if new_start == start_index { @@ -1075,11 +1092,12 @@ fn spill_into( buffered_partitions: &mut [PartitionBuffer], path: &Path, num_output_partitions: usize, + ipc_time: &Time, ) -> Result> { let mut output_batches: Vec> = vec![vec![]; num_output_partitions]; for i in 0..num_output_partitions { - buffered_partitions[i].flush()?; + buffered_partitions[i].flush(ipc_time)?; output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen); } let path = path.to_owned(); @@ -1525,10 +1543,12 @@ impl Checksum { pub(crate) fn write_ipc_compressed( batch: &RecordBatch, output: &mut W, + time: &Time, ) -> Result { if batch.num_rows() == 0 { return Ok(0); } + let mut timer = time.timer(); let start_pos = output.stream_position()?; // write ipc_length placeholder @@ -1549,6 +1569,8 @@ pub(crate) fn write_ipc_compressed( output.seek(SeekFrom::Start(start_pos))?; output.write_all(&ipc_length.to_le_bytes()[..])?; + timer.stop(); + output.seek(SeekFrom::Start(end_pos))?; Ok((end_pos - start_pos) as usize) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index e333bce929..fcccfa2272 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -136,6 +136,7 @@ object CometMetricNode { if (CometConf.COMET_ENABLE_DETAILED_METRICS.get()) { Map( "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time"), + "ipc_time" -> SQLMetrics.createNanoTimingMetric(sc, "IPC encoding time"), "input_time" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle input time"), "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric( sc, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index d03f2ef979..1aaafd00e4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -481,12 +481,15 @@ class CometShuffleWriteProcessor( // Call native shuffle write val nativePlan = getNativePlan(tempDataFilename, tempIndexFilename) + // these metrics are only reported when detailed metrics are enabled via config + val detailedMetrics = Seq("elapsed_compute", "input_time", "ipc_time") + // Maps native metrics to SQL metrics val nativeSQLMetrics = Map( "data_size" -> metrics("dataSize"), "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), "write_time" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) ++ - metrics.filterKeys(Seq("elapsed_compute", "input_time").contains) + 
metrics.filterKeys(detailedMetrics.contains) val nativeMetrics = CometMetricNode(nativeSQLMetrics) // Getting rid of the fake partitionId From 0c854600d706aa628e9aa8cffb9bdc8b292fb9f1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Dec 2024 17:03:02 -0700 Subject: [PATCH 09/12] address feedback --- docs/source/user-guide/metrics.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index b1e3a67442..d4973c0664 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -25,16 +25,16 @@ Set `spark.comet.metrics.detailed=true` to see all available Comet metrics. ### CometScanExec -`CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to -milliseconds _per batch_ which can result in a large loss of precision, making it difficult to compare scan times -between Spark and Comet. +| Metric | Description | +| ----------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `scan time` | Total time to scan a Parquet file. This is not comparable to the same metric in Spark because Comet's scan metric is more accurate (records time in nanoseconds rather than milliseconds) | ### Exchange Comet adds some additional metrics: | Metric | Description | -|---------------------------------|-------------------------------------------------------------------------------------------| +| ------------------------------- | ----------------------------------------------------------------------------------------- | | `ipc time` | Time to encode batches in IPC format | | `native shuffle time` | Total time spent in native shuffle writer, excluding the execution time of the input plan | | `native shuffle input time` | Time spend executing the shuffle input plan and fetching batches. | From a317fc58509fdf6cd97f3e20c395ff1c07a9f890 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Dec 2024 17:21:56 -0700 Subject: [PATCH 10/12] update benchmark code --- docs/source/user-guide/metrics.md | 4 ++-- native/core/benches/row_columnar.rs | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index d4973c0664..516f1f3eb4 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -34,8 +34,8 @@ Set `spark.comet.metrics.detailed=true` to see all available Comet metrics. Comet adds some additional metrics: | Metric | Description | -| ------------------------------- | ----------------------------------------------------------------------------------------- | -| `ipc time` | Time to encode batches in IPC format | +| ------------------------------- |-------------------------------------------------------------------------------------------| +| `ipc time` | Time to encode batches in IPC format. Includes compression time. | | `native shuffle time` | Total time spent in native shuffle writer, excluding the execution time of the input plan | | `native shuffle input time` | Time spend executing the shuffle input plan and fetching batches. | | `shuffle wall time (inclusive)` | Total time executing the shuffle write, inclusive of executing the input plan. 
| diff --git a/native/core/benches/row_columnar.rs b/native/core/benches/row_columnar.rs index 60b41330e3..28920440c9 100644 --- a/native/core/benches/row_columnar.rs +++ b/native/core/benches/row_columnar.rs @@ -20,6 +20,7 @@ use comet::execution::shuffle::row::{ process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow, }; use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::physical_plan::metrics::Time; use tempfile::Builder; const NUM_ROWS: usize = 10000; @@ -63,6 +64,8 @@ fn benchmark(c: &mut Criterion) { let row_size_ptr = row_sizes.as_mut_ptr(); let schema = vec![ArrowDataType::Int64; NUM_COLS]; + let ipc_time = Time::default(); + b.iter(|| { let tempfile = Builder::new().tempfile().unwrap(); @@ -77,6 +80,7 @@ fn benchmark(c: &mut Criterion) { false, 0, None, + &ipc_time, ) .unwrap(); }); From 73b4f9becad8921a1c65fc35333c308a8f42a49d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Dec 2024 21:21:43 -0700 Subject: [PATCH 11/12] add input_batches metric --- .../core/src/execution/shuffle/shuffle_writer.rs | 15 ++++++++------- .../apache/spark/sql/comet/CometMetricNode.scala | 1 + .../shuffle/CometShuffleExchangeExec.scala | 4 ++-- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index ef56dd4735..152e45a56b 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -651,7 +651,9 @@ struct ShuffleRepartitionerMetrics { /// Time encoding batches to IPC format ipc_time: Time, - //other_time: Time, + /// Number of input batches + input_batches: Count, + /// count of spills during the execution of the operator spill_count: Count, @@ -664,14 +666,12 @@ struct ShuffleRepartitionerMetrics { impl ShuffleRepartitionerMetrics { fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { - let input_time = MetricBuilder::new(metrics).subset_time("input_time", partition); - let write_time = MetricBuilder::new(metrics).subset_time("write_time", partition); - let ipc_time = MetricBuilder::new(metrics).subset_time("ipc_time", partition); Self { baseline: BaselineMetrics::new(metrics, partition), - input_time, - write_time, - ipc_time, + input_time: MetricBuilder::new(metrics).subset_time("input_time", partition), + write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), + ipc_time: MetricBuilder::new(metrics).subset_time("ipc_time", partition), + input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), spill_count: MetricBuilder::new(metrics).spill_count(partition), spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), data_size: MetricBuilder::new(metrics).counter("data_size", partition), @@ -739,6 +739,7 @@ impl ShuffleRepartitioner { self.partitioning_batch(batch).await?; start = end; } + self.metrics.input_batches.add(1); self.metrics .baseline .elapsed_compute() diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index fcccfa2272..4ac04993e8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -138,6 +138,7 @@ object CometMetricNode { "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time"), "ipc_time" -> SQLMetrics.createNanoTimingMetric(sc, "IPC encoding time"), 
"input_time" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle input time"), + "input_batches" -> SQLMetrics.createMetric(sc, "number of input batches"), "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric( sc, "shuffle wall time (inclusive)")) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 1aaafd00e4..1b9dbeaed1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -482,12 +482,12 @@ class CometShuffleWriteProcessor( val nativePlan = getNativePlan(tempDataFilename, tempIndexFilename) // these metrics are only reported when detailed metrics are enabled via config - val detailedMetrics = Seq("elapsed_compute", "input_time", "ipc_time") + val detailedMetrics = Seq("elapsed_compute", "input_time", "ipc_time", "input_batches") // Maps native metrics to SQL metrics val nativeSQLMetrics = Map( - "data_size" -> metrics("dataSize"), "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), + "data_size" -> metrics("dataSize"), "write_time" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) ++ metrics.filterKeys(detailedMetrics.contains) val nativeMetrics = CometMetricNode(nativeSQLMetrics) From bf039f70a30bd256075d67b6d6d577fd7f7ec558 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Dec 2024 07:28:31 -0700 Subject: [PATCH 12/12] address feedback --- docs/source/user-guide/metrics.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index 516f1f3eb4..dbe74c66cf 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -25,16 +25,16 @@ Set `spark.comet.metrics.detailed=true` to see all available Comet metrics. ### CometScanExec -| Metric | Description | -| ----------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `scan time` | Total time to scan a Parquet file. This is not comparable to the same metric in Spark because Comet's scan metric is more accurate (records time in nanoseconds rather than milliseconds) | +| Metric | Description | +| ----------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `scan time` | Total time to scan a Parquet file. This is not comparable to the same metric in Spark because Comet's scan metric is more accurate. Although both Comet and Spark measure the time in nanoseconds, Spark rounds this time to the nearest millisecond per batch and Comet does not. | ### Exchange Comet adds some additional metrics: | Metric | Description | -| ------------------------------- |-------------------------------------------------------------------------------------------| +| ------------------------------- | ----------------------------------------------------------------------------------------- | | `ipc time` | Time to encode batches in IPC format. Includes compression time. 
|
| `native shuffle time` | Total time spent in native shuffle writer, excluding the execution time of the input plan |
| `native shuffle input time` | Time spent executing the shuffle input plan and fetching batches. |
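
The patches above rely on two DataFusion metric patterns: a scoped timer (`Time::timer()` / `stop()`) around one specific operation such as a disk write, and `Time::add_duration()` with an `Instant` for wall-clock spans that include awaiting other work. The sketch below is a minimal, self-contained illustration of those patterns only; `WriterMetrics` and `write_partition` are invented names for illustration and are not part of the Comet changes above.

```rust
use std::io::Write;
use std::time::Instant;

use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};

/// Illustrative metrics holder (hypothetical), registered the same way the
/// patches register `write_time` and `ipc_time` as subset times for a partition.
struct WriterMetrics {
    write_time: Time,
    wall_time: Time,
}

impl WriterMetrics {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
            wall_time: MetricBuilder::new(metrics).subset_time("wall_time", partition),
        }
    }
}

/// Hypothetical write path showing both timing styles used in the patches.
fn write_partition<W: Write>(out: &mut W, bytes: &[u8], m: &WriterMetrics) -> std::io::Result<()> {
    let start = Instant::now();

    // Scoped timer: only the I/O between `timer()` and `stop()` is attributed
    // to `write_time`, mirroring `self.metrics.write_time.timer()` in
    // shuffle_writer.rs.
    let mut timer = m.write_time.timer();
    out.write_all(bytes)?;
    out.flush()?;
    timer.stop();

    // Wall-clock style: everything since `start` is added to `wall_time`,
    // mirroring the `add_duration(start_time.elapsed())` calls in the patches.
    m.wall_time.add_duration(start.elapsed());
    Ok(())
}

fn main() -> std::io::Result<()> {
    let metrics_set = ExecutionPlanMetricsSet::new();
    let m = WriterMetrics::new(&metrics_set, 0);
    let mut sink = Vec::new();
    write_partition(&mut sink, b"example bytes", &m)?;
    // `Time::value()` reports accumulated nanoseconds.
    println!(
        "write_time = {} ns, wall_time = {} ns",
        m.write_time.value(),
        m.wall_time.value()
    );
    Ok(())
}
```

The design difference between the two styles is the one the patch descriptions call out: the scoped timer isolates a single operation, while the `Instant`-based measurement is inclusive of everything the writer awaited, which is why the Spark-side metric is labelled `shuffle wall time (inclusive)`.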