[SPARK-41108][SPARK-41005][CONNECT][FOLLOW-UP] Deduplicate ArrowConverters codes #38618
Conversation
The branch was force-pushed from 22ecb3f to cf2de76.
Merged to master.
```scala
    extends ArrowBatchIterator(
      rowIter, schema, maxRecordsPerBatch, timeZoneId, context) {

  private val arrowSchemaSize = SizeEstimator.estimate(arrowSchema)
```
What is this supposed to achieve? This uses a lot of reflective code to figure out the size of the schema object. How is this related to the size of the batch?
@hvanhovell, this PR is virtually pure refactoring except for the couple of points I mentioned in the PR description. As for the question, this came from #38612: the idea is to estimate the size of the batch before creating an Arrow batch.
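For illustration, here is a minimal sketch (not the PR's actual code) of how a schema-size estimate can feed into batch slicing: the schema overhead is counted once per batch, and rows are appended until an estimated size limit is reached. `SizeEstimator` is Spark's real utility; the `Record` type and the slicing logic are assumptions.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.SizeEstimator

// Hypothetical record type standing in for InternalRow.
case class Record(values: Array[Long])

// Slice rows into batches, closing a batch once its estimated in-memory
// size (schema overhead plus per-row estimates) crosses the limit. Each
// batch always contains at least one row, so a single oversized row still
// makes progress (it may overshoot the limit by that one row).
def sliceBySize(
    rows: Iterator[Record],
    schemaSize: Long, // e.g. SizeEstimator.estimate(arrowSchema)
    maxEstimatedBatchSize: Long): Iterator[Seq[Record]] =
  new Iterator[Seq[Record]] {
    override def hasNext: Boolean = rows.hasNext
    override def next(): Seq[Record] = {
      val batch = ArrayBuffer.empty[Record]
      var estimated = schemaSize // the schema is carried with every batch
      while (rows.hasNext && (batch.isEmpty || estimated < maxEstimatedBatchSize)) {
        val r = rows.next()
        estimated += SizeEstimator.estimate(r)
        batch += r
      }
      batch.toSeq
    }
  }
```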
```scala
      rowIter, schema, maxRecordsPerBatch, timeZoneId, context) {

  private val arrowSchemaSize = SizeEstimator.estimate(arrowSchema)
  var rowCountInLastBatch: Long = 0
```
A couple of questions. Why do we need the row count? It is already encoded in the batch itself. If we do need it, please make the iterator return it from the `next` call instead of relying on a side effect.
This logic is also from #38468; this PR is a follow-up. The return type here is `Array[Byte]`, which is a raw binary record batch, so we cannot get the count from it unless we define another case class to carry the row count. This class is private and only used in this specific case.
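To make the trade-off concrete, a hedged sketch of the two designs under discussion; the names `BatchIteratorWithSideEffect` and `SerializedBatch` are made up for illustration, not Spark's API:

```scala
// Shape used in the PR (simplified): raw bytes out, row count via side effect.
abstract class BatchIteratorWithSideEffect extends Iterator[Array[Byte]] {
  // Updated by next(); callers read it after consuming a batch.
  var rowCountInLastBatch: Long = 0
}

// Reviewer's alternative: return the count alongside the bytes, so no
// mutable state is needed and each element is self-contained.
case class SerializedBatch(bytes: Array[Byte], rowCount: Long)

// A caller of the second design reads the count directly from each element.
def totalRows(batches: Iterator[SerializedBatch]): Long =
  batches.map(_.rowCount).sum
```

The side-effect version avoids introducing a new case class at the cost of requiring callers to read the field immediately after each `next()` call.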
[SPARK-41108][SPARK-41005][CONNECT][FOLLOW-UP] Deduplicate ArrowConverters codes

### What changes were proposed in this pull request?

This PR is a followup of both apache#38468 and apache#38612 that proposes to deduplicate code in `ArrowConverters` by creating two classes, `ArrowBatchIterator` and `ArrowBatchWithSchemaIterator`. In addition, we reuse `ArrowBatchWithSchemaIterator` when creating an empty Arrow batch at `createEmptyArrowBatch`.

While I am here,
- I addressed my own comment at apache#38612 (comment).
- I kept the support of both max rows and size; the max row size check was removed in apache#38612.

### Why are the changes needed?

For better readability and maintenance.

### Does this PR introduce _any_ user-facing change?

No, both changes are internal and not user-facing.

### How was this patch tested?

This is a refactoring, so the existing CI should cover it.

Closes apache#38618 from HyukjinKwon/SPARK-41108-followup-1.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…tead of `ArrowConveters#toBatchIterator`

### _Why are the changes needed?_

To adapt to Spark 3.4: the signature of `ArrowConveters#toBatchIterator` was changed in apache/spark#38618 (since Spark 3.4).

Before Spark 3.4:

```scala
private[sql] def toBatchIterator(
    rowIter: Iterator[InternalRow],
    schema: StructType,
    maxRecordsPerBatch: Int,
    timeZoneId: String,
    context: TaskContext): Iterator[Array[Byte]]
```

Spark 3.4:

```scala
private[sql] def toBatchIterator(
    rowIter: Iterator[InternalRow],
    schema: StructType,
    maxRecordsPerBatch: Long,
    timeZoneId: String,
    context: TaskContext): ArrowBatchIterator
```

The return type changed from `Iterator[Array[Byte]]` to `ArrowBatchIterator`.

### _How was this patch tested?_

- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4754 from cfmcgrady/arrow-spark34.

a3c58d0 [Fu Chen] fix ci
32704c5 [Fu Chen] Revert "fix ci"
e32311a [Fu Chen] fix ci
a76af62 [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
453b6a6 [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
74a9f7a [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
4ce5844 [Fu Chen] adapt Spark 3.4

Lead-authored-by: Fu Chen <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit d0a7ca4)
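For downstream callers hit by this binary-incompatible change, one defensive pattern is a reflective shim that works against either signature from a single build. The sketch below is a hypothetical illustration (not Kyuubi's actual fix), assuming the five-parameter method shown above exists on both sides and that `ArrowBatchIterator` is itself an `Iterator[Array[Byte]]`, as the discussion above suggests:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

// Hypothetical shim: call whichever 5-parameter ArrowConverters.toBatchIterator
// the running Spark provides (Int in 3.3, Long in 3.4), and treat the result
// uniformly as Iterator[Array[Byte]].
object ToBatchIteratorShim {
  private val module: AnyRef = {
    val cls = Class.forName("org.apache.spark.sql.execution.arrow.ArrowConverters$")
    cls.getField("MODULE$").get(null)
  }

  def toBatchIterator(
      rowIter: Iterator[InternalRow],
      schema: StructType,
      maxRecordsPerBatch: Long,
      timeZoneId: String,
      context: TaskContext): Iterator[Array[Byte]] = {
    val method = module.getClass.getMethods
      .filter(_.getName == "toBatchIterator")
      .find(_.getParameterCount == 5)
      .getOrElse(throw new NoSuchMethodException("toBatchIterator"))
    // The third parameter is maxRecordsPerBatch: box it as Long or Int
    // depending on which signature the running Spark version exposes.
    val records: AnyRef =
      if (method.getParameterTypes()(2) == classOf[Long]) {
        java.lang.Long.valueOf(maxRecordsPerBatch)
      } else {
        java.lang.Integer.valueOf(maxRecordsPerBatch.toInt)
      }
    method.invoke(module, rowIter, schema, records, timeZoneId, context)
      .asInstanceOf[Iterator[Array[Byte]]]
  }
}
```

Since `Int` widens to `Long` at the source level, simply recompiling against each Spark version also works; the shim only matters when one binary must serve both.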
### What changes were proposed in this pull request?

This PR is a followup of both #38468 and #38612 that proposes to deduplicate code in `ArrowConverters` by creating two classes, `ArrowBatchIterator` and `ArrowBatchWithSchemaIterator`. In addition, we reuse `ArrowBatchWithSchemaIterator` when creating an empty Arrow batch at `createEmptyArrowBatch`. While I am here, I addressed my own comment at #38612 (comment) and kept the support of both max rows and size (the max row size check was removed in #38612). A simplified sketch of the resulting structure follows.
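Schematically, the deduplication looks like this; the snippet is a hypothetical, simplified analogue (strings instead of Arrow record batches) rather than the real Spark source:

```scala
import scala.collection.mutable.ArrayBuffer

// Base iterator: slices rows into batches of at most maxRecordsPerBatch,
// serializes each batch, and tracks the last batch's row count.
class BatchIterator(rows: Iterator[String], maxRecordsPerBatch: Long)
    extends Iterator[Array[Byte]] {
  var rowCountInLastBatch: Long = 0

  override def hasNext: Boolean = rows.hasNext
  override def next(): Array[Byte] = {
    val batch = ArrayBuffer.empty[String]
    while (rows.hasNext && batch.size < maxRecordsPerBatch) {
      batch += rows.next()
    }
    rowCountInLastBatch = batch.size
    encode(batch.toSeq)
  }

  protected def encode(batch: Seq[String]): Array[Byte] =
    batch.mkString("\n").getBytes("UTF-8")
}

// Subclass: prepends the schema to every payload so each batch is
// self-describing; encoding an empty row sequence yields a schema-only
// payload, mirroring how createEmptyArrowBatch can reuse the same path.
class BatchWithSchemaIterator(
    schema: String, rows: Iterator[String], maxRecordsPerBatch: Long)
    extends BatchIterator(rows, maxRecordsPerBatch) {
  override protected def encode(batch: Seq[String]): Array[Byte] =
    (schema +: batch).mkString("\n").getBytes("UTF-8")
}
```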
### Why are the changes needed?
For better readability and maintenance.
### Does this PR introduce _any_ user-facing change?

No, both changes are internal and not user-facing.
### How was this patch tested?

This is a refactoring, so the existing CI should cover it.