9 changes: 9 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -45,6 +45,9 @@ import org.apache.comet.shims.ShimCometConf
*/
object CometConf extends ShimCometConf {

private val METRICS_GUIDE = "For more information, refer to the Comet Metrics " +
"Guide (https://datafusion.apache.org/comet/user-guide/metrics.html)"

private val TUNING_GUIDE = "For more information, refer to the Comet Tuning " +
"Guide (https://datafusion.apache.org/comet/user-guide/tuning.html)"

@@ -414,6 +417,12 @@ object CometConf extends ShimCometConf
.booleanConf
.createWithDefault(false)

val COMET_ENABLE_DETAILED_METRICS: ConfigEntry[Boolean] =
conf("spark.comet.metrics.detailed")
.doc(s"Enable this option to see additional SQL metrics. $METRICS_GUIDE.")
.booleanConf
.createWithDefault(false)

val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explainFallback.enabled")
.doc(
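As an aside on how the new entry is typically consumed, here is a minimal sketch (the helper is hypothetical and not part of this diff; it assumes `ConfigEntry.get()` reads the active SQL configuration, as the existing `CometConf` entries do):

```scala
import org.apache.comet.CometConf

// Hypothetical helper (not part of this change): gate the extra SQL
// metrics on the new flag, read from the active SQL configuration.
def detailedMetricsEnabled(): Boolean =
  CometConf.COMET_ENABLE_DETAILED_METRICS.get()
```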
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -51,6 +51,7 @@ as a native runtime to achieve improvement in terms of query efficiency and quer
Configuration Settings <user-guide/configs>
Compatibility Guide <user-guide/compatibility>
Tuning Guide <user-guide/tuning>
Metrics Guide <user-guide/metrics>

.. _toc.contributor-guide-links:
.. toctree::
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -64,6 +64,7 @@ Comet provides the following configuration settings.
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.metrics.detailed | Enable this option to see additional SQL metrics. For more information, refer to the Comet Metrics Guide (https://datafusion.apache.org/comet/user-guide/metrics.html). | false |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. | false |
| spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. | false |
64 changes: 64 additions & 0 deletions docs/source/user-guide/metrics.md
@@ -0,0 +1,64 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Comet Metrics

## Spark SQL Metrics

Set `spark.comet.metrics.detailed=true` to see all available Comet metrics.
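For example (a sketch only; the plugin and Comet-enable settings below follow the existing installation guide and are not part of this change), the flag can be set when the session is created, or passed with `--conf` to `spark-submit`:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: enable Comet plus its detailed SQL metrics for a session.
// Adjust the plugin/enable settings to match your deployment.
val spark = SparkSession
  .builder()
  .appName("comet-detailed-metrics")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.enabled", "true")
  .config("spark.comet.metrics.detailed", "true")
  .getOrCreate()
```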

### CometScanExec

| Metric | Description |
| ----------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `scan time` | Total time to scan a Parquet file. This is not comparable to the same metric in Spark because Comet's scan metric is more accurate. Although both Comet and Spark measure the time in nanoseconds, Spark rounds this time to the nearest millisecond per batch and Comet does not. |

### Exchange

Comet adds some additional metrics:

| Metric | Description |
| ------------------------------- | ----------------------------------------------------------------------------------------- |
| `ipc time` | Time to encode batches in IPC format. Includes compression time. |
| `native shuffle time`           | Total time spent in the native shuffle writer, excluding the execution time of the input plan. |
| `native shuffle input time`     | Time spent executing the shuffle input plan and fetching batches.                          |
| `shuffle wall time (inclusive)` | Total time executing the shuffle write, inclusive of executing the input plan. |
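
These metrics appear in the SQL tab of the Spark UI alongside the built-in Spark metrics. To inspect them programmatically, one rough approach (a sketch using standard Spark APIs, not something introduced by this change) is to walk the executed plan and print each operator's `metrics` map after the query has run:

```scala
import org.apache.spark.sql.DataFrame

// Sketch: print the SQL metrics of every operator in an executed plan.
// Run an action (e.g. collect()) first so the metric values are populated.
def dumpSqlMetrics(df: DataFrame): Unit = {
  df.queryExecution.executedPlan.foreach { plan =>
    plan.metrics.foreach { case (name, metric) =>
      println(s"${plan.nodeName}: $name = ${metric.value}")
    }
  }
}
```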

## Native Metrics

Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are
logged for each native plan (and there is one plan per task, so this is very verbose).
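
To toggle this from an interactive session instead of at submit time, something like the following should work (assuming the setting is honored for plans compiled after it is set):

```scala
// Sketch: enable native plan logging for subsequently planned queries.
spark.conf.set("spark.comet.explain.native.enabled", "true")
```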

Here is a guide to some of the native metrics.

### ScanExec

| Metric | Description |
| ----------------- | --------------------------------------------------------------------------------------------------- |
| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. |
| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. |
| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. |

### ShuffleWriterExec

| Metric | Description |
| ----------------- | ----------------------------------------------------- |
| `elapsed_compute` | Total time excluding any child operators. |
| `input_time` | Time spent executing input plan and fetching batches. |
| `write_time` | Time spent writing bytes to disk. |
25 changes: 0 additions & 25 deletions docs/source/user-guide/tuning.md
@@ -102,28 +102,3 @@ native shuffle currently only supports `HashPartitioning` and `SinglePartitionin

To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set,
then any shuffle operations that cannot be supported in this mode will fall back to Spark.

## Metrics

### Spark SQL Metrics

Some Comet metrics are not directly comparable to Spark metrics in some cases:

- `CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
milliseconds _per batch_ which can result in a large loss of precision, making it difficult to compare scan times
between Spark and Comet.

### Native Metrics

Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are
logged for each native plan (and there is one plan per task, so this is very verbose).

Here is a guide to some of the native metrics.

### ScanExec

| Metric | Description |
| ----------------- | --------------------------------------------------------------------------------------------------- |
| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. |
| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. |
| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. |
4 changes: 4 additions & 0 deletions native/core/benches/row_columnar.rs
@@ -20,6 +20,7 @@ use comet::execution::shuffle::row::{
process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow,
};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::physical_plan::metrics::Time;
use tempfile::Builder;

const NUM_ROWS: usize = 10000;
@@ -63,6 +64,8 @@ fn benchmark(c: &mut Criterion) {
let row_size_ptr = row_sizes.as_mut_ptr();
let schema = vec![ArrowDataType::Int64; NUM_COLS];

let ipc_time = Time::default();

b.iter(|| {
let tempfile = Builder::new().tempfile().unwrap();

@@ -77,6 +80,7 @@
false,
0,
None,
&ipc_time,
)
.unwrap();
});
6 changes: 4 additions & 2 deletions native/core/src/execution/jni_api.rs
@@ -17,8 +17,10 @@

//! Define JNI APIs which can be called from Java/Scala.

use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
use arrow::datatypes::DataType as ArrowDataType;
use arrow_array::RecordBatch;
use datafusion::physical_plan::metrics::Time;
use datafusion::{
execution::{
disk_manager::DiskManagerConfig,
@@ -40,8 +42,6 @@ use jni::{
use std::time::{Duration, Instant};
use std::{collections::HashMap, sync::Arc, task::Poll};

use super::{serde, utils::SparkArrowConvert, CometMemoryPool};

use crate::{
errors::{try_unwrap_or_throw, CometError, CometResult},
execution::{
@@ -455,6 +455,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
checksum_enabled: jboolean,
checksum_algo: jint,
current_checksum: jlong,
ipc_time: &Time,
) -> jlongArray {
try_unwrap_or_throw(&e, |mut env| unsafe {
let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?;
@@ -493,6 +494,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
checksum_enabled,
checksum_algo,
current_checksum,
ipc_time,
)?;

let checksum = if let Some(checksum) = checksum {
2 changes: 1 addition & 1 deletion native/core/src/execution/operators/scan.rs
@@ -75,7 +75,7 @@ pub struct ScanExec {
/// Metrics collector
metrics: ExecutionPlanMetricsSet,
/// Baseline metrics
baseline_metrics: BaselineMetrics,
pub(crate) baseline_metrics: BaselineMetrics,
/// Time waiting for JVM input plan to execute and return batches
jvm_fetch_time: Time,
/// Time spent in FFI
4 changes: 3 additions & 1 deletion native/core/src/execution/shuffle/row.rs
@@ -40,6 +40,7 @@ use arrow_array::{
Array, ArrayRef, RecordBatch, RecordBatchOptions,
};
use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
use datafusion::physical_plan::metrics::Time;
use jni::sys::{jint, jlong};
use std::{
fs::OpenOptions,
@@ -3295,6 +3296,7 @@ pub fn process_sorted_row_partition(
// this is the initial checksum for this method, as it also gets updated iteratively
// inside the loop within the method across batches.
initial_checksum: Option<u32>,
ipc_time: &Time,
) -> Result<(i64, Option<u32>), CometError> {
// TODO: We can tune this parameter automatically based on row size and cache size.
let row_step = 10;
@@ -3354,7 +3356,7 @@
let mut frozen: Vec<u8> = vec![];
let mut cursor = Cursor::new(&mut frozen);
cursor.seek(SeekFrom::End(0))?;
written += write_ipc_compressed(&batch, &mut cursor)?;
written += write_ipc_compressed(&batch, &mut cursor, ipc_time)?;

if let Some(checksum) = &mut current_checksum {
checksum.update(&mut cursor)?;