Skip to content

Commit cc12cf6

Browse files
committed
[SPARK-29378][R] Upgrade SparkR to use Arrow 0.15 API
### What changes were proposed in this pull request? [[SPARK-29376] Upgrade Apache Arrow to version 0.15.1](#26133) upgrades to Arrow 0.15 at Scala/Java/Python. This PR aims to upgrade `SparkR` to use Arrow 0.15 API. Currently, it's broken. ### Why are the changes needed? First of all, it turns out that our Jenkins jobs (including PR builder) ignores Arrow test. Arrow 0.15 has a breaking R API changes at [ARROW-5505](https://issues.apache.org/jira/browse/ARROW-5505) and we missed that. AppVeyor was the only one having SparkR Arrow tests but it's broken now. **Jenkins** ``` Skipped ------------------------------------------------------------------------ 1. createDataFrame/collect Arrow optimization (test_sparkSQL_arrow.R#25) - arrow not installed ``` Second, Arrow throws OOM on AppVeyor environment (Windows JDK8) like the following because it still has Arrow 0.14. ``` Warnings ----------------------------------------------------------------------- 1. createDataFrame/collect Arrow optimization (test_sparkSQL_arrow.R#39) - createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.sparkr.enabled' is set to true; however, failed, attempting non-optimization. Reason: Error in handleErrors(returnStatus, conn): java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:669) at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.readNextBatch(ArrowConverters.scala:243) ``` It is due to the version mismatch. ```java int messageLength = MessageSerializer.bytesToInt(buffer.array()); if (messageLength == IPC_CONTINUATION_TOKEN) { buffer.clear(); // ARROW-6313, if the first 4 bytes are continuation message, read the next 4 for the length if (in.readFully(buffer) == 4) { messageLength = MessageSerializer.bytesToInt(buffer.array()); } } // Length of 0 indicates end of stream if (messageLength != 0) { // Read the message into the buffer. ByteBuffer messageBuffer = ByteBuffer.allocate(messageLength); ``` After upgrading this to 0.15, we are hitting ARROW-5505. This PR upgrades Arrow version in AppVeyor and fix the issue. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Pass the AppVeyor. This PR passed here. - https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark/builds/28909044 ``` SparkSQL Arrow optimization: Spark package found in SPARK_HOME: C:\projects\spark\bin\.. ................ ``` Closes #26555 from dongjoon-hyun/SPARK-R-TEST. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent e88267c commit cc12cf6

File tree

3 files changed

+4
-6
lines changed

3 files changed

+4
-6
lines changed

R/pkg/R/SQLContext.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,9 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) {
166166
for (rdf_slice in rdf_slices) {
167167
batch <- arrow::record_batch(rdf_slice)
168168
if (is.null(stream_writer)) {
169-
stream <- arrow::FileOutputStream(fileName)
169+
stream <- arrow::FileOutputStream$create(fileName)
170170
schema <- batch$schema
171-
stream_writer <- arrow::RecordBatchStreamWriter(stream, schema)
171+
stream_writer <- arrow::RecordBatchStreamWriter$create(stream, schema)
172172
}
173173

174174
stream_writer$write_batch(batch)

R/pkg/R/deserialize.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ readDeserializeInArrow <- function(inputCon) {
242242
# for now.
243243
dataLen <- readInt(inputCon)
244244
arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big")
245-
batches <- arrow::RecordBatchStreamReader(arrowData)$batches()
245+
batches <- arrow::RecordBatchStreamReader$create(arrowData)$batches()
246246

247247
if (useAsTibble) {
248248
as_tibble <- get("as_tibble", envir = asNamespace("arrow"))

appveyor.yml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,8 @@ install:
4242
# Install maven and dependencies
4343
- ps: .\dev\appveyor-install-dependencies.ps1
4444
# Required package for R unit tests
45-
- cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'e1071', 'survival'), repos='https://cloud.r-project.org/')"
46-
# Use Arrow R 0.14.1 for now. 0.15.0 seems not working for now. See SPARK-29378.
45+
- cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'e1071', 'survival', 'arrow'), repos='https://cloud.r-project.org/')"
4746
- cmd: R -e "install.packages(c('assertthat', 'bit64', 'fs', 'purrr', 'R6', 'tidyselect'), repos='https://cloud.r-project.org/')"
48-
- cmd: R -e "install.packages('https://cran.r-project.org/src/contrib/Archive/arrow/arrow_0.14.1.tar.gz', repos=NULL, type='source')"
4947
# Here, we use the fixed version of testthat. For more details, please see SPARK-22817.
5048
# As of devtools 2.1.0, it requires testthat higher then 2.1.1 as a dependency. SparkR test requires testthat 1.0.2.
5149
# Therefore, we don't use devtools but installs it directly from the archive including its dependencies.

0 commit comments

Comments
 (0)