diff --git a/docs/source/contributor-guide/plugin_overview.md b/docs/source/contributor-guide/plugin_overview.md
index 3e7c24f55a..add4172d5c 100644
--- a/docs/source/contributor-guide/plugin_overview.md
+++ b/docs/source/contributor-guide/plugin_overview.md
@@ -19,17 +19,26 @@ under the License.

 # Comet Plugin Architecture

+## Overview
+
+The Comet plugin enhances Spark SQL by introducing optimized query execution and shuffle mechanisms that leverage
+native code. It integrates with Spark's plugin framework and extension API to replace or extend Spark's
+default behavior.
+
+---
+
+# Plugin Components
+
 ## Comet SQL Plugin

-The entry point to Comet is the `org.apache.spark.CometPlugin` class, which can be registered with Spark by adding the
-following setting to the Spark configuration when launching `spark-shell` or `spark-submit`:
+The entry point to Comet is the `org.apache.spark.CometPlugin` class, which is registered with Spark using the
+following configuration:

 ```
 --conf spark.plugins=org.apache.spark.CometPlugin
 ```

-This class is loaded by Spark's plugin framework. It will be instantiated in the Spark driver only. Comet does not
-provide any executor plugins.
+The plugin is loaded on the Spark driver and does not provide executor-side plugins.

 The plugin will update the current `SparkConf` with the extra configuration provided by Comet, such as executor
 memory configuration.
@@ -87,7 +96,7 @@ In the native code there is a `PhysicalPlanner` struct (in `planner.rs`) which c
 Apache DataFusion `ExecutionPlan`. In some cases, Comet provides specialized physical operators and expressions
 to override the DataFusion versions to ensure compatibility with Apache Spark.

-The leaf nodes in the physical plan are always `ScanExec` and each of these operators will make a JNI call to
+The leaf nodes in the physical plan are always `ScanExec` and each of these operators will make a JNI call to
 `CometBatchIterator.next()` to fetch the next input batch. The input could be a Comet native Parquet scan,
 a Spark exchange, or another native plan.

@@ -95,15 +104,38 @@ a Spark exchange, or another native plan.
 partitions. Each call to `CometExecIterator.next()` will invoke `Native.executePlan`. Once the plan finishes
 executing, the resulting Arrow batches are imported into the JVM using Arrow FFI.

-## Arrow
+## Shuffle

-Due to the hybrid execution model, it is necessary to pass batches of data between the JVM and native code.
+Comet integrates with Spark's shuffle mechanism, optimizing both shuffle writes and reads. Comet's shuffle manager
+must be registered with Spark using the following configuration:
+
+```
+--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
+```
+
+### Shuffle Writes

-Comet uses a combination of Arrow FFI and Arrow IPC to achieve this.
+For shuffle writes, a `ShuffleMapTask` runs in the executors. This task contains a `ShuffleDependency` that is
+broadcast to all of the executors. It then passes the input RDD to `ShuffleWriteProcessor.write()`, which
+requests a `ShuffleWriter` from the shuffle manager, and this is where it gets a Comet shuffle writer.

-### Arrow FFI
+`ShuffleWriteProcessor` then invokes the dependency RDD, fetches rows/batches, and passes them to Comet's
+shuffle writer, which writes batches to disk in Arrow IPC format.

-The foundation for Arrow FFI is the [Arrow C Data Interface], which provides a stable ABI-compatible interface for
+As a result, we cannot avoid having one native plan to produce the shuffle input and another native plan for
+writing the batches to the shuffle file.
+
+### Shuffle Reads
+
+For shuffle reads, a `ShuffledRDD` requests a `ShuffleReader` from the shuffle manager. Comet provides a
+`CometBlockStoreShuffleReader`, implemented in the JVM, which fetches blocks from Spark and creates an
+`ArrowReaderIterator` that decodes the IPC batches using Arrow's `StreamReader`.
+
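+The following self-contained Scala sketch is illustrative only and is not Comet code: the object and value names are
+hypothetical, and it assumes the Arrow Java `arrow-vector` and `arrow-memory-netty` artifacts are on the classpath.
+It writes one record batch in the Arrow IPC streaming format and reads it back with a stream reader, illustrating the
+kind of IPC-encoded batches that the shuffle write and read paths described above deal with:
+
+```scala
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.util.Collections
+
+import org.apache.arrow.memory.RootAllocator
+import org.apache.arrow.vector.{IntVector, VectorSchemaRoot}
+import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+object ArrowIpcRoundTrip {
+  def main(args: Array[String]): Unit = {
+    val allocator = new RootAllocator()
+    val schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))))
+    val root = VectorSchemaRoot.create(schema, allocator)
+
+    // Populate one small batch with three Int32 values.
+    val vector = root.getVector("a").asInstanceOf[IntVector]
+    vector.allocateNew(3)
+    (0 until 3).foreach(i => vector.setSafe(i, i * 10))
+    root.setRowCount(3)
+
+    // Write the batch in the Arrow IPC streaming format.
+    val out = new ByteArrayOutputStream()
+    val writer = new ArrowStreamWriter(root, null, out)
+    writer.start()
+    writer.writeBatch()
+    writer.end()
+    writer.close()
+
+    // Read the stream back, batch by batch, using Arrow's stream reader.
+    val reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray), allocator)
+    while (reader.loadNextBatch()) {
+      println(s"read batch with ${reader.getVectorSchemaRoot.getRowCount} rows")
+    }
+    reader.close()
+    root.close()
+    allocator.close()
+  }
+}
+```
+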
+## Arrow FFI
+
+Due to the hybrid execution model, it is necessary to pass batches of data between the JVM and native code.
+
+The foundation for Arrow FFI is the [Arrow C Data Interface], which provides a stable ABI-compatible interface for
 accessing Arrow data structures from multiple languages.

 [Arrow C Data Interface]: https://arrow.apache.org/docs/format/CDataInterface.html
@@ -111,13 +143,6 @@ accessing Arrow data structures from multiple languages.
 - `CometExecIterator` invokes native plans and uses Arrow FFI to read the output batches
 - Native `ScanExec` operators call `CometBatchIterator` via JNI to fetch input batches from the JVM

-### Arrow IPC
-
-Comet native shuffle uses Arrow IPC to write batches to the shuffle files.
-
-- `CometShuffleWriteProcessor` invokes a native plan to fetch batches and then passes them to native `ShuffleWriterExec`
-- `CometBlockStoreShuffleReader` reads batches from shuffle files
-
 ## End to End Flow

 The following diagram shows an example of the end-to-end flow for a query stage.