From b298f76e5911cc033cf4fa3be5073d92660fbed4 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 6 Dec 2024 09:20:12 -0700
Subject: [PATCH 1/3] add some notes on shuffle

---
 .../contributor-guide/plugin_overview.md | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/docs/source/contributor-guide/plugin_overview.md b/docs/source/contributor-guide/plugin_overview.md
index 3e7c24f55a..7c40de42f6 100644
--- a/docs/source/contributor-guide/plugin_overview.md
+++ b/docs/source/contributor-guide/plugin_overview.md
@@ -87,7 +87,7 @@ In the native code there is a `PhysicalPlanner` struct (in `planner.rs`) which c
 Apache DataFusion `ExecutionPlan`. In some cases, Comet provides specialized physical operators and expressions
 to override the DataFusion versions to ensure compatibility with Apache Spark.
 
-The leaf nodes in the physical plan are always `ScanExec` and each of these operators will make a JNI call to 
+The leaf nodes in the physical plan are always `ScanExec` and each of these operators will make a JNI call to
 `CometBatchIterator.next()` to fetch the next input batch. The input could be a Comet native Parquet scan,
 a Spark exchange, or another native plan.
 
@@ -95,6 +95,20 @@ a Spark exchange, or another native plan.
 partitions. Each call to `CometExecIterator.next()` will invoke `Native.executePlan`. Once the plan finishes
 executing, the resulting Arrow batches are imported into the JVM using Arrow FFI.
 
+## Shuffle
+
+Spark uses a shuffle mechanism to transfer data between query stages. The shuffle mechanism is part of Spark Core
+rather than Spark SQL and operates on RDDs.
+
+For shuffle writes, a `ShuffleMapTask` runs in the executors. This task contains a `ShuffleDependency` that is
+broadcast to all of the executors. It then passes the input RDD to `ShuffleWriteProcessor.write()` which
+requests a `ShuffleWriter` from the shuffle manager, and this is where it gets a Comet shuffle writer.
+
+`ShuffleWriteProcessor` then invokes the dependency RDD and fetches rows/batches and passes them to the shuffle writer.
+
+As a result, we cannot avoid having one native plan to produce the shuffle input and another native plan for
+writing the batches to the shuffle file.
+
 ## Arrow
 
 Due to the hybrid execution model, it is necessary to pass batches of data between the JVM and native code.
@@ -103,7 +117,7 @@ Comet uses a combination of Arrow FFI and Arrow IPC to achieve this.
 
 ### Arrow FFI
 
-The foundation for Arrow FFI is the [Arrow C Data Interface], which provides a stable ABI-compatible interface for 
+The foundation for Arrow FFI is the [Arrow C Data Interface], which provides a stable ABI-compatible interface for
 accessing Arrow data structures from multiple languages.
 
 [Arrow C Data Interface]: https://arrow.apache.org/docs/format/CDataInterface.html

From e39b3b7be4ea480ac90042241375bebbec0f5ec7 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 6 Dec 2024 09:58:00 -0700
Subject: [PATCH 2/3] reads

---
 docs/source/contributor-guide/plugin_overview.md | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/docs/source/contributor-guide/plugin_overview.md b/docs/source/contributor-guide/plugin_overview.md
index 7c40de42f6..0444968648 100644
--- a/docs/source/contributor-guide/plugin_overview.md
+++ b/docs/source/contributor-guide/plugin_overview.md
@@ -100,6 +100,8 @@ executing, the resulting Arrow batches are imported into the JVM using Arrow FFI
 Spark uses a shuffle mechanism to transfer data between query stages. The shuffle mechanism is part of Spark Core
 rather than Spark SQL and operates on RDDs.
 
+### Shuffle Writes
+
 For shuffle writes, a `ShuffleMapTask` runs in the executors. This task contains a `ShuffleDependency` that is
 broadcast to all of the executors. It then passes the input RDD to `ShuffleWriteProcessor.write()` which
 requests a `ShuffleWriter` from the shuffle manager, and this is where it gets a Comet shuffle writer.
@@ -109,6 +111,12 @@ requests a `ShuffleWriter` from the shuffle manager, and this is where it gets a
 As a result, we cannot avoid having one native plan to produce the shuffle input and another native plan for
 writing the batches to the shuffle file.
 
+### Shuffle Reads
+
+For shuffle reads, a `ShuffledRDD` requests a `ShuffleReader` from the shuffle manager. Comet provides a
+`CometBlockStoreShuffleReader`, which is implemented in the JVM and fetches blocks from Spark and then creates an
+`ArrowReaderIterator` to process the blocks using Arrow's `StreamReader` for decoding IPC batches.
+
 ## Arrow
 
 Due to the hybrid execution model, it is necessary to pass batches of data between the JVM and native code.

From 27d55d4d5640e668c07fe80d81656f139a969178 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 6 Dec 2024 13:01:37 -0700
Subject: [PATCH 3/3] improve docs

---
 .../contributor-guide/plugin_overview.md | 41 ++++++++++---------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/docs/source/contributor-guide/plugin_overview.md b/docs/source/contributor-guide/plugin_overview.md
index 0444968648..add4172d5c 100644
--- a/docs/source/contributor-guide/plugin_overview.md
+++ b/docs/source/contributor-guide/plugin_overview.md
@@ -19,17 +19,26 @@ under the License.
 
 # Comet Plugin Architecture
 
+## Overview
+
+The Comet plugin enhances Spark SQL by introducing optimized query execution and shuffle mechanisms leveraging
+native code. It integrates with Spark's plugin framework and extension API to replace or extend Spark's
+default behavior.
+
+---
+
+# Plugin Components
+
 ## Comet SQL Plugin
 
-The entry point to Comet is the `org.apache.spark.CometPlugin` class, which can be registered with Spark by adding the
-following setting to the Spark configuration when launching `spark-shell` or `spark-submit`:
+The entry point to Comet is the `org.apache.spark.CometPlugin` class, which is registered in Spark using the following
+configuration:
 
 ```
 --conf spark.plugins=org.apache.spark.CometPlugin
 ```
 
-This class is loaded by Spark's plugin framework. It will be instantiated in the Spark driver only. Comet does not
-provide any executor plugins.
+The plugin is loaded on the Spark driver and does not provide executor-side plugins.
 
 The plugin will update the current `SparkConf` with the extra configuration provided by Comet, such as executor
 memory configuration.
@@ -97,8 +106,12 @@ executing, the resulting Arrow batches are imported into the JVM using Arrow FFI
 
 ## Shuffle
 
-Spark uses a shuffle mechanism to transfer data between query stages. The shuffle mechanism is part of Spark Core
-rather than Spark SQL and operates on RDDs.
+Comet integrates with Spark's shuffle mechanism, optimizing both shuffle writes and reads. Comet's shuffle manager
+must be registered with Spark using the following configuration:
+
+```
+--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
+```
 
 ### Shuffle Writes
 
@@ -106,7 +119,8 @@ For shuffle writes, a `ShuffleMapTask` runs in the executors. This task contains
 broadcast to all of the executors. It then passes the input RDD to `ShuffleWriteProcessor.write()` which
 requests a `ShuffleWriter` from the shuffle manager, and this is where it gets a Comet shuffle writer.
 
-`ShuffleWriteProcessor` then invokes the dependency RDD and fetches rows/batches and passes them to the shuffle writer.
+`ShuffleWriteProcessor` then invokes the dependency RDD and fetches rows/batches and passes them to Comet's
+shuffle writer, which writes batches to disk in Arrow IPC format.
 
 As a result, we cannot avoid having one native plan to produce the shuffle input and another native plan for
 writing the batches to the shuffle file.
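+
+To make the write path concrete, the following is a condensed, illustrative sketch of what Spark's
+`ShuffleWriteProcessor.write()` does (simplified from Spark internals; parameter lists vary between Spark
+versions). The only Comet-specific part is that the configured shuffle manager is `CometShuffleManager`, so the
+`getWriter` call below is where a Comet shuffle writer is handed back:
+
+```scala
+// Condensed sketch of Spark's shuffle write path, not Comet code.
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+
+def write(
+    rdd: RDD[_],
+    dep: ShuffleDependency[_, _, _],
+    mapId: Long,
+    context: TaskContext,
+    partition: Partition): Unit = {
+  // The shuffle manager decides which ShuffleWriter implementation is used.
+  // With Comet's shuffle manager configured, this returns a Comet writer.
+  val writer = SparkEnv.get.shuffleManager.getWriter[Any, Any](
+    dep.shuffleHandle, mapId, context, context.taskMetrics().shuffleWriteMetrics)
+  // Pull rows/batches from the dependency RDD and hand them to the writer.
+  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
+  writer.stop(success = true)
+}
+```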
@@ -117,14 +131,10 @@ For shuffle reads, a `ShuffledRDD` requests a `ShuffleReader` from the shuffle ma
 `CometBlockStoreShuffleReader`, which is implemented in the JVM and fetches blocks from Spark and then creates an
 `ArrowReaderIterator` to process the blocks using Arrow's `StreamReader` for decoding IPC batches.
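+
+A minimal illustrative sketch of that decoding step, using Arrow Java's `ArrowStreamReader`, is shown below
+(the surrounding method is hypothetical; Comet's actual `ArrowReaderIterator` wraps equivalent logic in an
+iterator and differs in detail):
+
+```scala
+// Illustrative only: decode Arrow IPC batches from a fetched shuffle block stream.
+import java.io.InputStream
+import org.apache.arrow.memory.RootAllocator
+import org.apache.arrow.vector.ipc.ArrowStreamReader
+
+def decodeBlock(blockStream: InputStream): Unit = {
+  val allocator = new RootAllocator(Long.MaxValue)
+  val reader = new ArrowStreamReader(blockStream, allocator)
+  try {
+    val root = reader.getVectorSchemaRoot
+    // loadNextBatch() returns false once the IPC stream is exhausted
+    while (reader.loadNextBatch()) {
+      println(s"decoded batch with ${root.getRowCount} rows")
+    }
+  } finally {
+    reader.close()
+    allocator.close()
+  }
+}
+```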
 
-## Arrow
+## Arrow FFI
 
 Due to the hybrid execution model, it is necessary to pass batches of data between the JVM and native code.
-Comet uses a combination of Arrow FFI and Arrow IPC to achieve this.
-
-### Arrow FFI
-
 The foundation for Arrow FFI is the [Arrow C Data Interface], which provides a stable ABI-compatible interface for
 accessing Arrow data structures from multiple languages.
 
 [Arrow C Data Interface]: https://arrow.apache.org/docs/format/CDataInterface.html
@@ -132,13 +142,6 @@ accessing Arrow data structures from multiple languages.
 - `CometExecIterator` invokes native plans and uses Arrow FFI to read the output batches
 - Native `ScanExec` operators call `CometBatchIterator` via JNI to fetch input batches from the JVM
 
-### Arrow IPC
-
-Comet native shuffle uses Arrow IPC to write batches to the shuffle files.
-
-- `CometShuffleWriteProcessor` invokes a native plan to fetch batches and then passes them to native `ShuffleWriterExec`
-- `CometBlockStoreShuffleReader` reads batches from shuffle files
-
 ## End to End Flow
 
 The following diagram shows an example of the end-to-end flow for a query stage.