diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index b1838ca801..e0d7c1bc93 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -23,7 +23,7 @@ Comet provides some tuning options to help you get the best performance from you ## Memory Tuning -Comet shares an off-heap memory pool between Spark and Comet. This requires setting `spark.memory.offHeap.enabled=true`. +Comet shares an off-heap memory pool between Spark and Comet. This requires setting `spark.memory.offHeap.enabled=true`. If this setting is not enabled, Comet will not accelerate queries and will fall back to Spark. Each executor will have a single memory pool which will be shared by all native plans being executed within that @@ -105,8 +105,16 @@ then any shuffle operations that cannot be supported in this mode will fall back ## Metrics -Comet metrics are not directly comparable to Spark metrics in some cases. +Some Comet metrics are not directly comparable to Spark metrics: -`CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to -milliseconds _per batch_ which can result in a large loss of precision. In one case we saw total scan time -of 41 seconds reported as 23 seconds for example. +- `CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to + milliseconds _per batch_ which can result in a large loss of precision, making it difficult to compare scan times + between Spark and Comet. + +Comet also adds some custom metrics: + +### ShuffleWriterExec + +| Metric | Description | +| ---------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `jvm_fetch_time` | Measure the time it takes for `ShuffleWriterExec` to fetch batches from the JVM. 
Note that this does not include the execution time of the query that produced the input batches. | diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs index 7587ff06dc..c79eeeb4a0 100644 --- a/native/core/src/execution/datafusion/shuffle_writer.rs +++ b/native/core/src/execution/datafusion/shuffle_writer.rs @@ -139,6 +139,7 @@ impl ExecutionPlan for ShuffleWriterExec { ) -> Result { let input = self.input.execute(partition, Arc::clone(&context))?; let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0); + let jvm_fetch_time = MetricBuilder::new(&self.metrics).subset_time("jvm_fetch_time", 0); Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), @@ -151,6 +152,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.partitioning.clone(), metrics, context, + jvm_fetch_time, ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) @@ -1083,6 +1085,7 @@ impl Debug for ShuffleRepartitioner { } } +#[allow(clippy::too_many_arguments)] async fn external_shuffle( mut input: SendableRecordBatchStream, partition_id: usize, @@ -1091,6 +1094,7 @@ async fn external_shuffle( partitioning: Partitioning, metrics: ShuffleRepartitionerMetrics, context: Arc, + jvm_fetch_time: Time, ) -> Result { let schema = input.schema(); let mut repartitioner = ShuffleRepartitioner::new( @@ -1104,13 +1108,23 @@ async fn external_shuffle( context.session_config().batch_size(), ); - while let Some(batch) = input.next().await { - // Block on the repartitioner to insert the batch and shuffle the rows - // into the corresponding partition buffer. - // Otherwise, pull the next batch from the input stream might overwrite the - // current batch in the repartitioner. 
- block_on(repartitioner.insert_batch(batch?))?; + loop { + let mut timer = jvm_fetch_time.timer(); + let b = input.next().await; + timer.stop(); + + match b { + Some(batch_result) => { + // Block on the repartitioner to insert the batch and shuffle the rows + // into the corresponding partition buffer. + // Otherwise, pulling the next batch from the input stream might overwrite the + // current batch in the repartitioner. + block_on(repartitioner.insert_batch(batch_result?))?; + } + _ => break, + } } + repartitioner.shuffle_write().await } diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index a9bd954e99..2cb8a84d94 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -52,7 +52,9 @@ use std::{ }; /// ScanExec reads batches of data from Spark via JNI. The source of the scan could be a file -/// scan or the result of reading a broadcast or shuffle exchange. +/// scan or the result of reading a broadcast or shuffle exchange. ScanExec isn't invoked +/// until the data is already available in the JVM. When CometExecIterator invokes +/// Native.executePlan, it passes in the memory addresses of the input batches. #[derive(Debug, Clone)] pub struct ScanExec { /// The ID of the execution context that owns this subquery. 
We use this ID to retrieve the JVM @@ -73,6 +75,7 @@ pub struct ScanExec { cache: PlanProperties, /// Metrics collector metrics: ExecutionPlanMetricsSet, + /// Baseline metrics baseline_metrics: BaselineMetrics, } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 6430a7899f..4c3f994f9e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -77,6 +77,9 @@ case class CometShuffleExchangeExec( SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics: Map[String, SQLMetric] = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), + "jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time fetching batches from JVM"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, "number of partitions")) ++ readMetrics ++ writeMetrics @@ -480,7 +483,14 @@ class CometShuffleWriteProcessor( "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), "data_size" -> metrics("dataSize"), "elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) - val nativeMetrics = CometMetricNode(nativeSQLMetrics) + + val nativeMetrics = if (metrics.contains("jvm_fetch_time")) { + CometMetricNode( + nativeSQLMetrics ++ Map("jvm_fetch_time" -> + metrics("jvm_fetch_time"))) + } else { + CometMetricNode(nativeSQLMetrics) + } // Getting rid of the fake partitionId val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2)