[AutoSparkUT] Fix CollectLimitExec GPU replacement for Command results (SPARK-19650)#14398
Conversation
Skip GPU replacement of CollectLimitExec when the child plan already provides pre-computed results (CommandResultExec, ExecutedCommandExec, LocalTableScanExec). These plans override executeCollect/executeTake to return rows without triggering a Spark job. Replacing their parent CollectLimitExec with the GPU equivalent forces a job submission through GpuBringBackToHost's default executeCollect path.

This preserves GPU-CPU parity for SPARK-19650: the original Spark test now passes directly on GPU without any test adaptation.

Issue: NVIDIA#14110

Maven validation:
mvn package -pl tests -am -Dbuildver=330 \
  -Dmaven.repo.local=./.mvn-repo \
  -DwildcardSuites=...RapidsSQLQuerySuite
Tests: succeeded 215, failed 0, ignored 18

Signed-off-by: Allen Xu <allxu@nvidia.com>
Made-with: Cursor
Greptile Summary: This PR fixes a correctness issue where the GPU plugin was replacing CollectLimitExec over pre-computed command results with a GPU plan that triggers an unnecessary Spark job. Key changes:
Confidence Score: 5/5
Sequence Diagram:

```mermaid
sequenceDiagram
    participant User
    participant CollectLimitExec
    participant GpuCollectLimitMeta
    participant tagPlanForGpu
    User->>CollectLimitExec: sql("show databases").head()
    Note over GpuCollectLimitMeta: Plan tagging phase
    GpuCollectLimitMeta->>tagPlanForGpu: tagPlanForGpu()
    tagPlanForGpu->>tagPlanForGpu: check child class name
    alt child is CommandResultExec or ExecutedCommandExec
        tagPlanForGpu->>GpuCollectLimitMeta: willNotWorkOnGpu(...)
        Note over CollectLimitExec: Stays on CPU
        CollectLimitExec->>CollectLimitExec: executeCollect() → child.executeTake(limit)
        Note over CollectLimitExec: Returns pre-computed rows (no Spark job)
    else normal child (e.g. data query)
        tagPlanForGpu->>GpuCollectLimitMeta: (no-op, GPU replacement proceeds)
        Note over GpuCollectLimitMeta: convertToGpu()
        GpuCollectLimitMeta-->>User: GpuGlobalLimitExec → GpuShuffleExchangeExec → GpuLocalLimitExec
    end
```
Reviews (3): Last reviewed commit: "Use full class name for command exec typ..."
Pull request overview
Fixes a Spark RAPIDS GPU plugin behavior regression where CollectLimitExec wrapping command-style results (e.g., sql("show databases").head()) would be replaced with a GPU plan that triggers a Spark job, instead of preserving Spark’s job-free executeCollect/executeTake behavior for pre-computed command results.
Changes:
- Add a `GpuCollectLimitMeta.tagPlanForGpu()` guard to keep `CollectLimitExec` on CPU when its child is a pre-computed result exec (command/local table scan), avoiding an unnecessary Spark job.
- Remove the SPARK-19650 exclusion from the Spark 3.3.0 RAPIDS test settings since the original Spark test now passes.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala` | Prevents GPU replacement of `CollectLimitExec` for command/local pre-computed children to preserve job-free collection semantics. |
| `tests/src/test/spark330/scala/org/apache/spark/sql/rapids/utils/RapidsTestSettings.scala` | Re-enables the SPARK-19650 test by removing the known-issue exclusion. |
```scala
override def tagPlanForGpu(): Unit = {
  val childName = collectLimit.child.getClass.getSimpleName
  if (childName == "CommandResultExec" ||
      childName == "ExecutedCommandExec" ||
      childName == "LocalTableScanExec") {
    willNotWorkOnGpu(
      s"child $childName already provides pre-computed " +
        "results; replacing CollectLimit would trigger " +
        "an unnecessary Spark job")
  }
}
```
Using getClass.getSimpleName string comparisons to detect child plan types is brittle (renames/relocation across Spark versions, subclasses/proxies) and makes the logic harder to maintain. Prefer type-based matching (e.g., collectLimit.child match { case _: CommandResultExec | _: ExecutedCommandExec | _: LocalTableScanExec => ... }) or a shim/helper that checks classes directly, and import the relevant Spark exec classes instead of comparing names as strings.
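The suggested type-based match can be sketched with self-contained stand-in classes (the exec classes below are hypothetical stubs, not Spark's; in the real plugin the match would be over Spark's physical plan nodes, which is exactly what the cross-version constraint discussed next rules out):

```scala
// Sketch comparing the reviewer's type-based match with the PR's
// name-based check, using local stub classes instead of Spark's.
object MatchDemo {
  class SparkPlan
  class CommandResultExec extends SparkPlan   // stand-in for Spark 3.2+ node
  class ExecutedCommandExec extends SparkPlan // stand-in for the older node
  class ProjectExec extends SparkPlan         // stand-in for a normal data exec

  // Type-based matching: compile-time checked, but requires the classes
  // to exist on every supported Spark version.
  def isPrecomputedByType(plan: SparkPlan): Boolean = plan match {
    case _: CommandResultExec | _: ExecutedCommandExec => true
    case _ => false
  }

  // Name-based matching: no compile-time dependency on the classes,
  // which is why the PR uses it in cross-version common code.
  def isPrecomputedByName(plan: SparkPlan): Boolean = {
    val n = plan.getClass.getSimpleName
    n == "CommandResultExec" || n == "ExecutedCommandExec"
  }

  def main(args: Array[String]): Unit = {
    assert(isPrecomputedByType(new CommandResultExec))
    assert(isPrecomputedByName(new CommandResultExec))
    assert(!isPrecomputedByType(new ProjectExec))
    assert(!isPrecomputedByName(new ProjectExec))
    println("ok")
  }
}
```

Both checks agree on these stubs; the trade-off is compile-time safety versus version portability, which the following replies discuss.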
This is the same concern addressed in 590dd86. String matching is intentional: CommandResultExec was introduced in Spark 3.2, and limit.scala is cross-version common code — importing it directly would cause compile failures on older Spark shims. The comment added in 590dd86 explains this design choice.
Maybe adding a list of supported commands in the shim layer would be better practice. I'm fine with the current code but would like to see other reviewers' opinions.
And the line-length limit issue seems to still be there, as in other recent PRs. (It's a nit for this PR, too.)
Agree that a shim-layer approach would be cleaner if the list grows. Right now it's only 2 types and both are stable Spark internals (CommandResultExec since 3.2, ExecutedCommandExec since early Spark), so the maintenance cost of string matching is low. Happy to refactor into a shim helper if other reviewers also prefer that — would make sense to do it once we need to add more types.
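The shim-layer alternative floated above could look something like this minimal sketch (all names here are illustrative, not the PR's code: each per-version shim would declare which exec classes return pre-computed results, and the common code would only consult that set):

```scala
// Hypothetical shim-layer design: version-specific knowledge lives in the
// shim, common code stays free of version-gated imports.
object ShimDemo {
  trait SparkShim {
    // Fully-qualified names of exec nodes whose results are pre-computed.
    def precomputedResultExecNames: Set[String]
  }

  // What a Spark 3.3.0 shim might provide; a pre-3.2 shim would simply
  // omit CommandResultExec, since that class does not exist there.
  object Spark330Shim extends SparkShim {
    val precomputedResultExecNames: Set[String] = Set(
      "org.apache.spark.sql.execution.CommandResultExec",
      "org.apache.spark.sql.execution.command.ExecutedCommandExec")
  }

  // Version-agnostic check, usable from common code such as limit.scala.
  def isPrecomputedResult(shim: SparkShim, className: String): Boolean =
    shim.precomputedResultExecNames.contains(className)

  def main(args: Array[String]): Unit = {
    assert(isPrecomputedResult(Spark330Shim,
      "org.apache.spark.sql.execution.CommandResultExec"))
    assert(!isPrecomputedResult(Spark330Shim,
      "org.apache.spark.sql.execution.ProjectExec"))
    println("ok")
  }
}
```

The design keeps the per-version lists in one place per shim, so adding a new pre-computed exec type on a future Spark version would not touch the common code path.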
No problem, but this has already passed IT and the lines are not too many. If new review comments come in, I'll fix the line length along with them.
- Remove LocalTableScanExec from the guard: it is used for general small datasets where GPU acceleration is still beneficial.
- Add a comment explaining why string matching is used instead of isInstanceOf (cross-version compatibility).

Signed-off-by: Allen Xu <allxu@nvidia.com>
Made-with: Cursor
build
Performance Analysis: This PR modifies `sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala`.

Affected code path: cold path (query planning only). The new `tagPlanForGpu()` check runs once per `CollectLimitExec` node during plan tagging, not during execution.

No hot-path changes: `convertToGpu()` and the per-batch execution code are unchanged.

Trigger condition is narrow: the guard only fires when `CollectLimitExec`'s child is a pre-computed command result exec; all other plans take the existing GPU replacement path.

Net effect for the triggered case is a performance improvement: before this fix, a `CollectLimitExec` over command results forced a Spark job through `GpuBringBackToHost.executeCollect()`; after it, the node stays on CPU and returns pre-computed rows without a job.

Conclusion: No benchmark needed. The change is on a cold path (planning-time only), does not touch the hot execution path, and the behavioral change for the narrow trigger case is strictly a performance improvement (fewer Spark jobs).
@sperlingxx @firestarman can you help take a look at this PR?
```scala
// String matching avoids compile errors on Spark versions
// where CommandResultExec (added in 3.2) does not exist.
val childName = collectLimit.child.getClass.getSimpleName
if (childName == "CommandResultExec" ||
```
NIT: Can we use the full package name instead of the shorter one?
Fixed in 2fc8235. Switched from getClass.getSimpleName to getClass.getName with endsWith for full package name matching (.execution.CommandResultExec and .execution.command.ExecutedCommandExec). This is also more robust since getSimpleName can throw InternalError for some inner/anonymous classes.
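The `getName`-with-`endsWith` idea can be illustrated with a small self-contained sketch (the nested class below is a hypothetical stand-in; the real suffixes matched in the fix are the package-qualified `.execution.CommandResultExec` and `.execution.command.ExecutedCommandExec`):

```scala
// Sketch: full-name matching via getClass.getName + endsWith, the approach
// adopted in 2fc8235. The class here is a local stand-in, not Spark's.
object FullNameDemo {
  class CommandResultExec // hypothetical stub

  def main(args: Array[String]): Unit = {
    val cls = (new CommandResultExec).getClass
    // getName includes the enclosing path (e.g. "FullNameDemo$CommandResultExec"),
    // so an endsWith check on a qualified suffix stays unambiguous between
    // similarly named classes in different packages. getName is also always
    // well-defined, while getSimpleName has inner/anonymous-class corner cases.
    assert(cls.getName.endsWith("CommandResultExec"))
    assert(!cls.getName.endsWith("ExecutedCommandExec"))
    println("ok")
  }
}
```

With the real Spark classes, matching on a suffix that includes the package segment (rather than the bare class name) is what makes the check both portable across shims and resistant to name collisions.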
Switch from getSimpleName to getName with endsWith for CommandResultExec/ExecutedCommandExec detection in tagPlanForGpu(). Full package name matching is more robust than simple name — getSimpleName can throw InternalError for some inner/anonymous classes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: Allen Xu <allxu@nvidia.com>
build
The stash pop three-way merge re-introduced exclusions for NVIDIA#14098, NVIDIA#14110, and NVIDIA#14116 that were already removed by merged PRs NVIDIA#14446, NVIDIA#14398, and NVIDIA#14400. Remove them to match origin/main.

Signed-off-by: Allen Xu <allxu@nvidia.com>
Made-with: Cursor
Summary
Closes #14110.
Fixes GPU/CPU parity of `executeCollect` behavior for Command results (issue: [AutoSparkUT] "SPARK-19650: An action on a Command should not trigger a Spark job" in SQLQuerySuite failed #14110). When `CollectLimitExec` wraps a `CommandResultExec` (e.g., `sql("show databases").head()`), the GPU plugin was replacing it with `GpuGlobalLimitExec → GpuShuffleExchangeExec → GpuLocalLimitExec`, forcing result materialization through `GpuBringBackToHost`'s default `executeCollect()`, which triggers a Spark job. On CPU, `CollectLimitExec.executeCollect()` delegates to `CommandResultExec.executeTake()`, which returns pre-computed rows without a job. The fix adds a `tagPlanForGpu()` check in `GpuCollectLimitMeta`: when the child is `CommandResultExec` or `ExecutedCommandExec`, `CollectLimitExec` stays on CPU to preserve the job-free execution path.

Traceability
- Test suite: `RapidsSQLQuerySuite` (inherits the original test, no `testRapids` override)
- Original test: `test("SPARK-19650: An action on a Command should not trigger a Spark job")` in `SQLQuerySuite` (`sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala`)
- Fix: `GpuCollectLimitMeta.tagPlanForGpu()` in `limit.scala`

Test plan
- `mvn package -pl tests -am -Dbuildver=330 -Dmaven.repo.local=./.mvn-repo -DwildcardSuites=...RapidsSQLQuerySuite` (Tests: succeeded 215, failed 0, ignored 18; was 19 ignored before)
- The test runs as inherited from Spark (no `testRapids` override)

Performance
This PR modifies `sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala` (GPU execution path). Impact analysis:

Affected code path: cold path (query planning only). The new `tagPlanForGpu()` override in `GpuCollectLimitMeta` runs once per query plan during the plan-tagging phase, not per-row, not per-batch, not per-partition. The cost is one `getClass.getSimpleName` call plus two string comparisons per `CollectLimitExec` node encountered during planning, which is negligible relative to any query's total planning time.

No hot-path changes: `convertToGpu()` is unchanged; the GPU replacement logic for normal `CollectLimitExec` (e.g., `SELECT * FROM table LIMIT N`) is identical before and after this PR.

Trigger condition is narrow: the guard only fires when `CollectLimitExec.child` is `CommandResultExec` or `ExecutedCommandExec`, i.e. DDL/administrative command results (e.g., `SHOW DATABASES`, `DESCRIBE TABLE`), not data-intensive queries. For all other child types, the existing GPU replacement path is taken with zero behavioral difference.

Net effect for the triggered case is a performance improvement: before this fix, `CollectLimitExec(CommandResultExec)` was replaced with GPU operators that forced a Spark job submission through `GpuBringBackToHost.executeCollect()`. After this fix, `CollectLimitExec` stays on CPU and delegates to `CommandResultExec.executeTake()`, returning pre-computed rows without any Spark job and eliminating unnecessary job scheduling overhead.

Checklists
(The existing Spark test `SPARK-19650` in `RapidsSQLQuerySuite` now covers the fixed code path; the KNOWN_ISSUE exclusion was removed so the test runs directly.)

Made with Cursor