Merged
@@ -466,19 +466,25 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer {
      cacheAttributes: Seq[Attribute],
      selectedAttributes: Seq[Attribute],
      conf: SQLConf): RDD[ColumnarBatch] = {
-    // optimize
-    val newSelectedAttributes = if (selectedAttributes.isEmpty) {
-      cacheAttributes
-    } else {
-      selectedAttributes
+    // When no columns are selected (e.g., count-only scan or
+    // cross-join side that needs only row count), return
+    // row-only batches without decoding parquet data.
+    if (selectedAttributes.isEmpty) {
+      return input.map {
+        case parquetCB: ParquetCachedBatch =>
+          new ColumnarBatch(Array.empty, parquetCB.numRows)
+        case other =>
+          throw new IllegalStateException(
+            s"Expected ParquetCachedBatch but got ${other.getClass}")
+      }
Comment on lines +469 to +479

Copilot AI Mar 20, 2026

The zero-column fast-path mapping (selectedAttributes.isEmpty -> map ParquetCachedBatch to new ColumnarBatch(Array.empty, numRows)) is duplicated here and again in convertCachedBatchToColumnarBatch. Consider extracting a small private helper to keep behavior/exception text consistent and reduce the chance of one path diverging in future edits.
Collaborator Author
The two call sites differ in return type semantics (gpuConvertCachedBatchToColumnarBatch returns GPU-resident batches, convertCachedBatchToColumnarBatch returns host batches) so a shared helper would need to paper over that distinction. Given the logic is just new ColumnarBatch(Array.empty, numRows), the duplication is minimal and a helper would add more abstraction than it saves.
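Purely for illustration, the shared helper Copilot proposed might look like the stand-alone sketch below. `ParquetCachedBatch` and `ColumnarBatch` here are simplified stand-ins for the real Spark/RAPIDS types, and `rowOnlyBatch` is a hypothetical name; this is a sketch of the idea the author declined, not code from the PR.

```scala
// Simplified stand-ins for the Spark/RAPIDS types; not the real API.
case class ParquetCachedBatch(numRows: Int)
class ColumnarBatch(val columns: Array[AnyRef], val numRows: Int)

object ZeroColumnFastPath {
  // The hypothetical shared helper: map any cached batch to a
  // row-count-only batch, keeping one copy of the exception text.
  def rowOnlyBatch(batch: Any): ColumnarBatch = batch match {
    case p: ParquetCachedBatch =>
      new ColumnarBatch(Array.empty, p.numRows)
    case other =>
      throw new IllegalStateException(
        s"Expected ParquetCachedBatch but got ${other.getClass}")
  }
}
```

Each call site would then be `input.map(rowOnlyBatch)`, at the cost of the extra indirection the author argues against.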

     }
     val (cachedSchemaWithNames, selectedSchemaWithNames) =
-      getSupportedSchemaFromUnsupported(cacheAttributes, newSelectedAttributes)
+      getSupportedSchemaFromUnsupported(cacheAttributes, selectedAttributes)
     convertCachedBatchToColumnarInternal(
       input,
       cachedSchemaWithNames,
       selectedSchemaWithNames,
-      newSelectedAttributes)
+      selectedAttributes)
   }

   private def convertCachedBatchToColumnarInternal(
@@ -563,19 +569,23 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer {
      cacheAttributes: Seq[Attribute],
      selectedAttributes: Seq[Attribute],
      conf: SQLConf): RDD[ColumnarBatch] = {
-    // optimize
-    val newSelectedAttributes = if (selectedAttributes.isEmpty) {
-      cacheAttributes
-    } else {
-      selectedAttributes
+    // When no columns are selected, return row-only batches
+    if (selectedAttributes.isEmpty) {
+      return input.map {
+        case parquetCB: ParquetCachedBatch =>
+          new ColumnarBatch(Array.empty, parquetCB.numRows)
+        case other =>
+          throw new IllegalStateException(
+            s"Expected ParquetCachedBatch but got ${other.getClass}")
+      }
     }
Comment on lines +573 to 581

Contributor

P2 Inconsistency with CloseableColumnBatchIterator wrapping

The non-empty paths in convertCachedBatchToColumnarBatch both wrap their results in CloseableColumnBatchIterator (GPU path at line 592, CPU path via CachedBatchIteratorConsumer), but the new zero-column early-return does not. While this is functionally safe — ColumnarBatch(Array.empty, n) holds no closeable column vector resources — it is a structural inconsistency. Consider wrapping for uniformity:

Suggested change
-    if (selectedAttributes.isEmpty) {
-      return input.map {
-        case parquetCB: ParquetCachedBatch =>
-          new ColumnarBatch(Array.empty, parquetCB.numRows)
-        case other =>
-          throw new IllegalStateException(
-            s"Expected ParquetCachedBatch but got ${other.getClass}")
-      }
-    }
+    // When no columns are selected, return row-only batches
+    if (selectedAttributes.isEmpty) {
+      return input.mapPartitions { cbIter =>
+        CloseableColumnBatchIterator(cbIter.map {
+          case parquetCB: ParquetCachedBatch =>
+            new ColumnarBatch(Array.empty, parquetCB.numRows)
+          case other =>
+            throw new IllegalStateException(
+              s"Expected ParquetCachedBatch but got ${other.getClass}")
+        })
+      }
+    }

The same note applies to the analogous block in gpuConvertCachedBatchToColumnarBatch (lines 472–479).

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Collaborator Author
As you noted, this is functionally safe — the empty ColumnarBatch holds no closeable resources, so wrapping it in CloseableColumnBatchIterator would be a no-op. Keeping the early return simple makes the intent clearer: no columns → no decoding, just row count.
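As a stand-alone illustration of that point (stub types only; `FakeVector`, `FakeBatch`, and `countRows` are invented here and are not the real `CloseableColumnBatchIterator` machinery), closing a zero-column batch releases nothing, so a closing wrapper would be a pure no-op:

```scala
// Stub types modeling the discussion; not the Spark/RAPIDS API.
class FakeVector { var closed = false; def close(): Unit = closed = true }
class FakeBatch(val columns: Array[FakeVector], val numRows: Int) {
  // close() releases each column vector; with zero columns it does nothing
  def close(): Unit = columns.foreach(_.close())
}

object RowOnlyDemo {
  // Drain batches the way a closing iterator would: close each batch
  // after use, summing row counts (a count-only scan).
  def countRows(batches: Iterator[FakeBatch]): Int =
    batches.map { b => try b.numRows finally b.close() }.sum
}
```

With zero-column batches, `close()` iterates an empty array, which matches the decision to keep the early return unwrapped.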

     val rapidsConf = new RapidsConf(conf)
     val (cachedSchemaWithNames, selectedSchemaWithNames) =
-      getSupportedSchemaFromUnsupported(cacheAttributes, newSelectedAttributes)
+      getSupportedSchemaFromUnsupported(cacheAttributes, selectedAttributes)
     if (rapidsConf.isSqlEnabled && rapidsConf.isSqlExecuteOnGPU &&
         isSchemaSupportedByCudf(cachedSchemaWithNames)) {
       val batches = convertCachedBatchToColumnarInternal(input, cachedSchemaWithNames,
-        selectedSchemaWithNames, newSelectedAttributes)
+        selectedSchemaWithNames, selectedAttributes)
       val cbRdd = batches.map(batch => {
         withResource(batch) { gpuBatch =>
           val cols = GpuColumnVector.extractColumns(gpuBatch)
@@ -585,7 +595,7 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer {
       cbRdd.mapPartitions(iter => CloseableColumnBatchIterator(iter))
     } else {
       val origSelectedAttributesWithUnambiguousNames =
-        sanitizeColumnNames(newSelectedAttributes, selectedSchemaWithNames)
+        sanitizeColumnNames(selectedAttributes, selectedSchemaWithNames)
       val broadcastedConf = SparkSession.active.sparkContext.broadcast(conf.getAllConfs)
       input.mapPartitions {
         cbIter => {
@@ -216,7 +216,6 @@ class RapidsTestSettings extends BackendTestSettings {
   enableSuite[RapidsMergedParquetReadSchemaSuite]
   enableSuite[RapidsGeneratorFunctionSuite]
   enableSuite[RapidsSQLQuerySuite]
-    .exclude("SPARK-6743: no columns from cache", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/14098"))
     .exclude("aggregation with codegen updates peak execution memory", WONT_FIX_ISSUE("Codegen and memory metrics not applicable for GPU"))
     .exclude("external sorting updates peak execution memory", WONT_FIX_ISSUE("Memory metrics implementation differs on GPU"))
     .exclude("run sql directly on files", ADJUST_UT("Replaced by testRapids version that expects \"Path does not exist\" instead of \"Hive built-in ORC data source must be used with Hive support\" because there's a spark-hive jar in the CLASSPATH in our UT running"))