Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
725ccb4
Refactor native planner to build tree of SparkPlan that maps to origi…
andygrove Nov 22, 2024
581984e
add SparkPlan children
andygrove Nov 22, 2024
d25f2d3
clippy
andygrove Nov 22, 2024
2027dc9
add some more documentation
andygrove Nov 22, 2024
8c8c9a5
aggregate metrics
andygrove Nov 22, 2024
baae1ba
simplify approach
andygrove Nov 22, 2024
68d0fdf
add a unit test
andygrove Nov 22, 2024
2347203
save progress
andygrove Nov 22, 2024
d0aeda1
remove debug, add specific jvm timer
andygrove Nov 22, 2024
3359738
fix
andygrove Nov 22, 2024
0a8a06b
clippy
andygrove Nov 22, 2024
40e90d5
clippy
andygrove Nov 22, 2024
5bfe334
format
andygrove Nov 22, 2024
0a3044e
Revert some changes, update documentation
andygrove Nov 24, 2024
d430175
fix typo
andygrove Nov 24, 2024
ff5076d
fix typo
andygrove Nov 24, 2024
ca66095
measure more FFI time
andygrove Nov 24, 2024
6fd5723
record FFI time for CollectLimitExec and TakeOrderedAndProject
andygrove Nov 26, 2024
c512187
upmerge:
andygrove Nov 26, 2024
56e3ead
save
andygrove Nov 26, 2024
36e6233
Revert "save"
andygrove Nov 26, 2024
1715b5f
Record Arrow FFI metrics
andygrove Nov 29, 2024
45a617b
Revert custom metric in shuffle writer
andygrove Nov 30, 2024
ff26736
revert FFI timing changes, which are now in a separate PR
andygrove Nov 30, 2024
82c9c7c
remove println
andygrove Dec 1, 2024
c77b144
address feedback
andygrove Dec 1, 2024
e4aeb68
Merge branch 'arrow-ffi-metric' into latest
andygrove Dec 1, 2024
db29f02
finer-grained FFI metrics
andygrove Dec 2, 2024
1b13aec
refactor
andygrove Dec 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.spark.SparkException
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometArrowAllocator
Expand Down Expand Up @@ -148,20 +149,28 @@ class NativeUtil {
*/
def getNextBatch(
numOutputCols: Int,
arrowFfiMetric: Option[SQLMetric],
func: (Array[Long], Array[Long]) => Long): Option[ColumnarBatch] = {

val start = System.nanoTime()
val (arrays, schemas) = allocateArrowStructs(numOutputCols)

val arrayAddrs = arrays.map(_.memoryAddress())
val schemaAddrs = schemas.map(_.memoryAddress())

var arrowFfiTime = System.nanoTime() - start

val result = func(arrayAddrs, schemaAddrs)

result match {
case -1 =>
// EOF
None
case numRows =>
val start = System.nanoTime()
val cometVectors = importVector(arrays, schemas)
arrowFfiTime += System.nanoTime() - start
arrowFfiMetric.foreach(_.add(arrowFfiTime))
Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt))
case flag =>
throw new IllegalStateException(s"Invalid native flag: $flag")
Expand Down
14 changes: 10 additions & 4 deletions docs/source/user-guide/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,14 @@ Some Comet metrics are not directly comparable to Spark metrics in some cases:

Comet also adds some custom metrics:

### ShuffleWriterExec
### CometScanExec

| Metric | Description |
| ---------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `jvm_fetch_time` | Measure the time it takes for `ShuffleWriterExec` to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. |
| Metric | Description |
| ---------- | ----------------------------------------------------------------------------------------------------------------------------------------------- |
| `scanTime` | Total time to read Parquet batches into JVM. This does not include the Arrow FFI cost of exporting these batches to native code for processing. |

### Common to all Comet Operators

| Metric | Description |
| ---------------- | ------------------------------------------------------------------------------------------------ |
| `arrow_ffi_time` | Measure the time it takes to transfer Arrow batches between JVM and native code using Arrow FFI. |
1 change: 1 addition & 0 deletions native/core/src/execution/datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ pub mod expressions;
mod operators;
pub mod planner;
pub mod shuffle_writer;
pub(crate) mod spark_plan;
mod util;
Loading
Loading