18 changes: 18 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL_arrow.R
@@ -312,4 +312,22 @@ test_that("Arrow optimization - unsupported types", {
   })
 })
 
+test_that("SPARK-32478: gapply() Arrow optimization - error message for schema mismatch", {
+  skip_if_not_installed("arrow")
+  df <- createDataFrame(list(list(a = 1L, b = "a")))
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
+  tryCatch({
+    expect_error(
+      count(gapply(df, "a", function(key, group) { group }, structType("a int, b int"))),
+      "expected IntegerType, IntegerType, got IntegerType, StringType")
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
+  })
+})
+
 sparkR.session.stop()
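
For contrast with the mismatch the test above asserts on, here is a minimal sketch (hypothetical usage, not part of the PR, assuming an active SparkR session with the Arrow option enabled) in which the declared schema matches what the function returns:

    # Same data as in the test: "a" is an integer column, "b" a string column
    df <- createDataFrame(list(list(a = 1L, b = "a")))

    # "a int, b string" matches the returned group, unlike "a int, b int" above,
    # so the count succeeds instead of failing with the "Invalid schema" error
    count(gapply(df, "a", function(key, group) { group }, structType("a int, b string")))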
16 changes: 8 additions & 8 deletions docs/sparkr.md
@@ -681,12 +681,12 @@ The current supported minimum version is 1.0.0; however, this might change betwe
 Arrow optimization is available when converting a Spark DataFrame to an R DataFrame using the call `collect(spark_df)`,
 when creating a Spark DataFrame from an R DataFrame with `createDataFrame(r_df)`, when applying an R native function to each partition
 via `dapply(...)` and when applying an R native function to grouped data via `gapply(...)`.
-To use Arrow when executing these calls, users need to first set the Spark configuration ‘spark.sql.execution.arrow.sparkr.enabled’
-to ‘true’. This is disabled by default.
+To use Arrow when executing these, users first need to set the Spark configuration `spark.sql.execution.arrow.sparkr.enabled`
+to `true`. This is disabled by default.
 
-In addition, optimizations enabled by ‘spark.sql.execution.arrow.sparkr.enabled’ could fallback automatically to non-Arrow optimization
-implementation if an error occurs before the actual computation within Spark during converting a Spark DataFrame to/from an R
-DataFrame.
+Whether the optimization is enabled or not, SparkR produces the same results. In addition, the conversion
+between a Spark DataFrame and an R DataFrame falls back automatically to the non-Arrow implementation
+when the optimization fails for any reason before the actual computation.

 <div data-lang="r" markdown="1">
 {% highlight r %}
@@ -713,9 +713,9 @@ collect(gapply(spark_df,
 {% endhighlight %}
 </div>
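
The collapsed {% highlight r %} block above holds the docs' full example. As a rough sketch of the same flow (illustrative only, not the docs' exact listing; the mtcars data and local master are assumptions), enabling the optimization looks like this:

    # Start a SparkR session with Arrow optimization enabled
    sparkR.session(master = "local[*]",
                   sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

    # These calls can now go through Arrow when the types are supported
    spark_df <- createDataFrame(mtcars)   # R DataFrame -> Spark DataFrame
    r_df <- collect(spark_df)             # Spark DataFrame -> R DataFrame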

-Using the above optimizations with Arrow will produce the same results as when Arrow is not enabled. Note that even with Arrow,
-`collect(spark_df)` results in the collection of all records in the DataFrame to the driver program and should be done on a
-small subset of the data.
+Note that even with Arrow, `collect(spark_df)` results in the collection of all records in the DataFrame to
+the driver program and should be done on a small subset of the data. In addition, the output schema specified
+in `gapply(...)` and `dapply(...)` should match the R DataFrame returned by the given function.
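
To make the schema requirement concrete, here is a minimal dapply() sketch (an illustration under the assumption of an active Arrow-enabled session; the mtcars column is arbitrary) where the declared output schema matches the returned data.frame:

    spark_df <- createDataFrame(mtcars)

    # The function returns one double column, and the declared schema agrees;
    # declaring "gear int" instead would raise a schema-mismatch error under Arrow
    collect(dapply(spark_df,
                   function(rdf) { data.frame(gear = rdf$gear + 1) },
                   structType("gear double")))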

## Supported SQL Types

@@ -567,7 +567,14 @@ case class FlatMapGroupsInRWithArrowExec(
       // binary in a batch due to the limitation of R API. See also ARROW-4512.
       val columnarBatchIter = runner.compute(groupedByRKey, -1)
       val outputProject = UnsafeProjection.create(output, output)
-      columnarBatchIter.flatMap(_.rowIterator().asScala).map(outputProject)
+      val outputTypes = StructType.fromAttributes(output).map(_.dataType)
+
+      columnarBatchIter.flatMap { batch =>
+        val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
+        assert(outputTypes == actualDataTypes, "Invalid schema from gapply(): " +
+          s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
+        batch.rowIterator().asScala
+      }.map(outputProject)
     }
   }
 }

Review comment from the author on the added assert: this is the same as the existing check in dapply():

    columnarBatchIter.flatMap { batch =>
      val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
      assert(outputTypes == actualDataTypes, "Invalid schema from dapply(): " +
        s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
      batch.rowIterator.asScala
    }