Merged
Changes from 2 commits
3 changes: 2 additions & 1 deletion docs/source/contributor-guide/benchmarking.md
@@ -57,13 +57,14 @@ $SPARK_HOME/bin/spark-submit \
--conf spark.executor.memory=32G \
--conf spark.executor.cores=8 \
--conf spark.cores.max=8 \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=10g \
Comment on lines +60 to +61
Member Author

enable unified memory management

--jars $COMET_JAR \
--conf spark.driver.extraClassPath=$COMET_JAR \
--conf spark.executor.extraClassPath=$COMET_JAR \
--conf spark.plugins=org.apache.spark.CometPlugin \
--conf spark.comet.cast.allowIncompatible=true \
--conf spark.comet.exec.shuffle.enabled=true \
--conf spark.comet.exec.shuffle.mode=auto \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
tpcbench.py \
--benchmark tpch \
85 changes: 57 additions & 28 deletions docs/source/user-guide/tuning.md
@@ -21,41 +21,64 @@ under the License.

Comet provides some tuning options to help you get the best performance from your queries.

## Metrics
## Memory Tuning

Comet metrics are not directly comparable to Spark metrics in some cases.
Comet provides two options for memory management:

`CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
milliseconds *per batch* which can result in a large loss of precision. In one case we saw total scan time
of 41 seconds reported as 23 seconds for example.
- **Unified Memory Management** shares an off-heap memory pool between Spark and Comet. This is the recommended option.
- **Native Memory Management** leverages DataFusion's memory management for the native plans and allocates memory independently of Spark.

## Memory Tuning
### Unified Memory Management

Comet currently doesn't share the memory allocation from Spark but owns its own memory allocation.
That's said, Comet requires additional memory to be allocated. Comet provides some memory related configurations to help you tune the memory usage.
This option is automatically enabled when `spark.memory.offHeap.enabled=true`.

By default, the amount of memory is `spark.comet.memory.overhead.factor` * `spark.executor.memory`.
The default value for `spark.comet.memory.overhead.factor` is 0.2. You can increase the factor to require more
memory for Comet to use, if you see OOM error. Generally, increasing memory overhead will improve the performance of your queries.
For example, some operators like `SortMergeJoin` and `HashAggregate` may require more memory to run.
Once the memory is not enough, the operator will spill to disk, which will slow down the query.
Each executor will have a single memory pool which will be shared by all native plans being executed within that
process, and by Spark itself. The size of the pool is specified by `spark.memory.offHeap.size`.
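
As a rough illustration of the selection rule described in this guide, the sketch below reports which memory management option a given set of Spark settings would select. The function name `comet_memory_mode` and the plain-dict representation of the configuration are assumptions made for this example; they are not part of Comet or Spark.

```python
def comet_memory_mode(spark_conf):
    """Return "unified" or "native" for a dict of Spark settings.

    Hypothetical helper: it only mirrors the rule stated in this guide,
    keyed on spark.memory.offHeap.enabled (which Spark leaves off by default).
    """
    enabled = spark_conf.get("spark.memory.offHeap.enabled", "false")
    return "unified" if str(enabled).strip().lower() == "true" else "native"


# Settings taken from the benchmarking example in this change.
benchmark_conf = {
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "10g",
}

print(comet_memory_mode(benchmark_conf))  # unified
print(comet_memory_mode({}))              # native
```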

Besides, you can also set the memory explicitly by setting `spark.comet.memoryOverhead` to the desired value.
Comet will allocate at least `spark.comet.memory.overhead.min` memory.
### Native Memory Management

This option is automatically enabled when `spark.memory.offHeap.enabled=false`.

Each native plan has a dedicated memory pool.

By default, the size of each pool is `spark.comet.memory.overhead.factor` \* `spark.executor.memory`. The default value
for `spark.comet.memory.overhead.factor` is 0.2.

It is important to take executor concurrency into account. The maximum number of concurrent plans in an executor can
be calculated with `spark.executor.cores` / `spark.task.cpus`.

For example, if the executor can execute 4 plans concurrently, then the total amount of memory allocated will be
4 \* `spark.comet.memory.overhead.factor` \* `spark.executor.memory`.

It is also possible to set `spark.comet.memoryOverhead` to the desired size for each pool, rather than calculating
it based on `spark.comet.memory.overhead.factor`.

If both `spark.comet.memoryOverhead` and `spark.comet.memory.overhead.factor` are set, the former will be used.

Comet will allocate at least `spark.comet.memory.overhead.min` memory per pool.
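
The arithmetic above can be sketched as a small calculator. This is an illustration only: the function names are hypothetical, sizes are expressed in MiB for simplicity, and applying `spark.comet.memory.overhead.min` as a floor on the final pool size is an assumption about how the settings combine. The minimum is passed in explicitly because its default value is not stated here.

```python
def max_concurrent_plans(executor_cores, task_cpus=1):
    # spark.executor.cores / spark.task.cpus
    return executor_cores // task_cpus


def pool_size_mib(executor_memory_mib, overhead_factor=0.2,
                  memory_overhead_mib=None, overhead_min_mib=0):
    # An explicit spark.comet.memoryOverhead takes precedence over the factor.
    if memory_overhead_mib is not None:
        size = memory_overhead_mib
    else:
        size = int(overhead_factor * executor_memory_mib)
    # Assumption: the minimum is applied as a floor on the resulting size.
    return max(size, overhead_min_mib)


def total_native_memory_mib(executor_cores, task_cpus, executor_memory_mib, **pool_kwargs):
    # One dedicated pool per concurrently executing plan.
    plans = max_concurrent_plans(executor_cores, task_cpus)
    return plans * pool_size_mib(executor_memory_mib, **pool_kwargs)


# Example: 8 cores, 2 CPUs per task, 32 GiB executor memory, default factor 0.2.
print(max_concurrent_plans(8, 2))               # 4
print(pool_size_mib(32 * 1024))                 # 6553
print(total_native_memory_mib(8, 2, 32 * 1024)) # 26212
```

In this example the four concurrent plans together could claim roughly 25.6 GiB of native memory on top of the 32 GiB executor heap, which is why concurrency matters when sizing the pools.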

### Determining How Much Memory to Allocate

Generally, increasing memory overhead will improve the performance of your queries.
For example, some operators like `SortMergeJoin` and `HashAggregate` may require more memory to run.
When an operator does not have enough memory, it spills to disk, which slows down the query.

If you see out-of-memory (OOM) errors, increase the amount of memory available to Comet.

## Memory Tuning using CometPlugin
Configuring memory for Spark and Comet might be a tedious task as it requires to tune Spark executor overhead memory and Comet memory overhead configs. Comet provides a Spark plugin `CometPlugin` which can be set up to your Spark application to help memory settings.

For users running the Comet in clusters like Kubernetes or YARN, `CometPlugin` can also make the resource manager respect correctly Comet memory parameters `spark.comet.memory*`.
it is needed to pass to the starting command line additional Spark configuration parameter `--conf spark.plugins=org.apache.spark.CometPlugin`
Configuring memory for Spark and Comet can be tedious because it requires tuning both the Spark executor memory overhead
and the Comet memory overhead configs. Comet provides a Spark plugin, `CometPlugin`, which can be added to your Spark
application to help with memory settings.

For users running Comet on clusters managed by Kubernetes or YARN, `CometPlugin` also makes the resource manager
correctly respect the Comet memory parameters `spark.comet.memory*`.
To enable it, pass the additional Spark configuration parameter `--conf spark.plugins=org.apache.spark.CometPlugin` on the launch command line.

Resource managers read the Apache Spark memory configuration before starting the containers.

The `CometPlugin` plugin overrides `spark.executor.memoryOverhead`, adding the Comet memory configuration to it.


## Shuffle

Comet provides its own shuffle implementations that can be used to improve the performance of your queries.
@@ -71,19 +94,21 @@ spark.comet.exec.shuffle.enabled=true
`spark.shuffle.manager` is a Spark static configuration which cannot be changed at runtime.
It must be set before the Spark context is created. You can enable or disable Comet shuffle
at runtime by setting `spark.comet.exec.shuffle.enabled` to `true` or `false`.
Once it is disabled, Comet will fallback to the default Spark shuffle manager.

> **_NOTE:_** At the moment Comet Shuffle is not compatible with Spark AQE partition coalesce. To disable set `spark.sql.adaptive.coalescePartitions.enabled` to `false`.
Once it is disabled, Comet will fall back to the default Spark shuffle manager.

### Shuffle Mode

Comet provides three shuffle modes: Columnar Shuffle, Native Shuffle and Auto Mode.

#### Auto Mode

Setting `spark.comet.exec.shuffle.mode` to `auto` lets Comet choose the best shuffle mode based on the query plan. This
is the default.

#### Columnar Shuffle

By default, once `spark.comet.exec.shuffle.enabled` is enabled, Comet uses JVM-based columnar shuffle
to improve the performance of shuffle operations. Columnar shuffle supports HashPartitioning,
RoundRobinPartitioning, RangePartitioning and SinglePartitioning. This mode has the highest
Comet Columnar shuffle is JVM-based and supports `HashPartitioning`,
`RoundRobinPartitioning`, `RangePartitioning` and `SinglePartitioning`. This mode has the highest
query coverage.

Columnar shuffle can be enabled by setting `spark.comet.exec.shuffle.mode` to `jvm`.
@@ -93,8 +118,12 @@ Columnar shuffle can be enabled by setting `spark.comet.exec.shuffle.mode` to `j
Comet also provides a fully native shuffle implementation that can be used to improve performance.
To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`.

Native shuffle only supports HashPartitioning and SinglePartitioning.
Native shuffle only supports `HashPartitioning` and `SinglePartitioning`.
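
The partitioning support listed for the two explicit modes can be summarized as a lookup table. The sketch below is only a restatement of the lists in this section; the names `SUPPORTED_PARTITIONING` and `modes_supporting` are hypothetical, and the code does not describe how `auto` mode actually makes its choice.

```python
# Partitioning schemes per spark.comet.exec.shuffle.mode value, as listed in this guide.
SUPPORTED_PARTITIONING = {
    "jvm": {
        "HashPartitioning",
        "RoundRobinPartitioning",
        "RangePartitioning",
        "SinglePartitioning",
    },
    "native": {"HashPartitioning", "SinglePartitioning"},
}


def modes_supporting(partitioning):
    """Return the explicit shuffle modes that support a partitioning scheme."""
    return sorted(
        mode
        for mode, supported in SUPPORTED_PARTITIONING.items()
        if partitioning in supported
    )


print(modes_supporting("HashPartitioning"))   # ['jvm', 'native']
print(modes_supporting("RangePartitioning"))  # ['jvm']
```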

### Auto Mode
## Metrics

`spark.comet.exec.shuffle.mode` to `auto` will let Comet choose the best shuffle mode based on the query plan.
Comet metrics are not directly comparable to Spark metrics in some cases.

`CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
milliseconds _per batch_ which can result in a large loss of precision. In one case we saw total scan time
Contributor

We can probably get away from exact numbers, just highlight the loss of precision can be twice?

Member Author

I made this change but looks like I failed to push this before merging the PR. I will address in my next PR.

of 41 seconds reported as 23 seconds for example.
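
To see how a per-batch conversion can lose this much precision, consider the following sketch. The batch durations are invented for illustration, and the use of integer truncation when converting nanoseconds to milliseconds is an assumption about the mechanism, not a description of Spark's actual code.

```python
NANOS_PER_MILLI = 1_000_000


def total_ms_from_nanos(batch_nanos):
    # Accumulate in nanoseconds and convert once at the end.
    return sum(batch_nanos) // NANOS_PER_MILLI


def total_ms_per_batch(batch_nanos):
    # Convert each batch to whole milliseconds first, then accumulate.
    # The fractional millisecond of every batch is discarded.
    return sum(ns // NANOS_PER_MILLI for ns in batch_nanos)


# Hypothetical workload: 10,000 batches that each take 1.8 ms to scan.
batches = [1_800_000] * 10_000

print(total_ms_from_nanos(batches))  # 18000
print(total_ms_per_batch(batches))   # 10000
```

With these made-up numbers, an 18 second scan is reported as 10 seconds, because each batch silently drops 0.8 ms. The shorter the batches, the larger the relative loss.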