From ffa4ed7ccc4ecf591c1fc03bfd3dd6f32d5268b0 Mon Sep 17 00:00:00 2001 From: zero323 Date: Fri, 30 Oct 2020 14:07:39 +0100 Subject: [PATCH 1/7] Add from_avro and to_avro to R API --- R/pkg/DESCRIPTION | 1 + R/pkg/NAMESPACE | 2 + R/pkg/R/functions_avro.R | 117 +++++++++++++++++++++ R/pkg/R/generics.R | 9 +- R/pkg/tests/fulltests/test_sparkSQL_avro.R | 50 +++++++++ R/run-tests.sh | 13 ++- 6 files changed, 190 insertions(+), 2 deletions(-) create mode 100644 R/pkg/R/functions_avro.R create mode 100644 R/pkg/tests/fulltests/test_sparkSQL_avro.R diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 2047f0d75ca18..d6846044dafce 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -42,6 +42,7 @@ Collate: 'context.R' 'deserialize.R' 'functions.R' + 'functions_avro.R' 'install.R' 'jvm.R' 'mllib_classification.R' diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 404a6968ea429..b927a6b96b810 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -292,6 +292,7 @@ exportMethods("%<=>%", "floor", "format_number", "format_string", + "from_avro", "from_csv", "from_json", "from_unixtime", @@ -416,6 +417,7 @@ exportMethods("%<=>%", "timestamp_seconds", "toDegrees", "toRadians", + "to_avro", "to_csv", "to_date", "to_json", diff --git a/R/pkg/R/functions_avro.R b/R/pkg/R/functions_avro.R new file mode 100644 index 0000000000000..cf542cd070ad3 --- /dev/null +++ b/R/pkg/R/functions_avro.R @@ -0,0 +1,117 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +#' Avro processing functions for Column operations +#' +#' Avro processing functions defined for \code{Column}. +#' +#' @param x Column to compute on. +#' @param jsonFormatSchema character Avro schema in JSON string format +#' @param ... additional argument(s) passed as parser options. +#' @name column_avro_functions +#' @rdname column_avro_functions +#' @family avro functions +#' @note Avro is built-in but external data source module since Spark 2.4. +#' Please deploy the application as per the deployment section of "Apache Avro Data Source Guide". +#' @examples +#' \dontrun{ +#' df <- createDataFrame(iris) +#' schema <- paste( +#' c( +#' '{"type": "record", "namespace": "example.avro", "name": "Iris", "fields": [', +#' '{"type": ["double", "null"], "name": "Sepal_Length"},', +#' '{"type": ["double", "null"], "name": "Sepal_Width"},', +#' '{"type": ["double", "null"], "name": "Petal_Length"},', +#' '{"type": ["double", "null"], "name": "Petal_Width"},', +#' '{"type": ["string", "null"], "name": "Species"}]}' +#' ), +#' collapse="\\n" +#' ) +#' +#' df_serialized <- select( +#' df, +#' alias(to_avro(alias(struct(column("*")), "fields")), "payload") +#' ) +#' +#' df_deserialized <- select( +#' df_serialized, +#' from_avro(df_serialized$payload, schema) +#' ) +#' +#' head(df_deserialized) +#' } +NULL + +#' @include generics.R column.R +NULL + +#' @details +#' \code{from_avro} Converts a binary column of Avro format into its corresponding catalyst value. 
+#' The specified schema must match the read data, otherwise the behavior is undefined: +#' it may fail or return arbitrary result. +#' To deserialize the data with a compatible and evolved schema, the expected Avro schema can be +#' set via the option avroSchema. +#' +#' @rdname column_avro_functions +#' @aliases from_avro from_avro,Column-method +#' @note from_avro since 3.1.0 +setMethod("from_avro", + signature(x = "characterOrColumn"), + function(x, jsonFormatSchema, ...) { + x <- if (is.character(x)) { + column(x) + } else { + x + } + + options <- varargsToStrEnv(...) + jc <- callJStatic( + "org.apache.spark.sql.avro.functions", "from_avro", + x@jc, + jsonFormatSchema, + options + ) + column(jc) + }) + +#' @details +#' \code{to_avro} Converts a column into binary of Avro format. +#' +#' @rdname column_avro_functions +#' @aliases to_avro to_avro,Column-method +#' @note to_avro since 3.1.0 +setMethod("to_avro", + signature(x = "characterOrColumn"), + function(x, jsonFormatSchema = NULL) { + x <- if (is.character(x)) { + column(x) + } else { + x + } + + jc <- if (is.null(jsonFormatSchema)) { + callJStatic("org.apache.spark.sql.avro.functions", "to_avro", x@jc) + } else { + callJStatic( + "org.apache.spark.sql.avro.functions", + "to_avro", + x@jc, + jsonFormatSchema + ) + } + column(jc) + }) diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index e372ae27e315a..1fe6599bf1b97 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -950,7 +950,6 @@ setGeneric("current_date", function(x = "missing") { standardGeneric("current_da #' @name NULL setGeneric("current_timestamp", function(x = "missing") { standardGeneric("current_timestamp") }) - #' @rdname column_datetime_diff_functions #' @name NULL setGeneric("datediff", function(y, x) { standardGeneric("datediff") }) @@ -1015,6 +1014,10 @@ setGeneric("expr", function(x) { standardGeneric("expr") }) #' @name NULL setGeneric("flatten", function(x) { standardGeneric("flatten") }) +#' @rdname column_avro_functions 
+#' @name NULL +setGeneric("from_avro", function(x, ...) { standardGeneric("from_avro") }) + #' @rdname column_datetime_diff_functions #' @name NULL setGeneric("from_utc_timestamp", function(y, x) { standardGeneric("from_utc_timestamp") }) @@ -1388,6 +1391,10 @@ setGeneric("sumDistinct", function(x) { standardGeneric("sumDistinct") }) #' @name timestamp_seconds setGeneric("timestamp_seconds", function(x) { standardGeneric("timestamp_seconds") }) +#' @rdname column_avro_functions +#' @name NULL +setGeneric("to_avro", function(x, ...) { standardGeneric("to_avro") }) + #' @rdname column_collection_functions #' @name NULL setGeneric("transform_keys", function(x, f) { standardGeneric("transform_keys") }) diff --git a/R/pkg/tests/fulltests/test_sparkSQL_avro.R b/R/pkg/tests/fulltests/test_sparkSQL_avro.R new file mode 100644 index 0000000000000..dfc1053de1422 --- /dev/null +++ b/R/pkg/tests/fulltests/test_sparkSQL_avro.R @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +library(testthat) + +context("SparkSQL Avro functions") + +sparkR.session( + master = sparkRTestMaster +) + +test_that("avro column functions", { + skip_if_not( + grepl("spark-avro", sparkR.conf("spark.jars", "")), + "spark-avro jar not present" + ) + + schema <- '{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_color", "type": ["string", "null"]} + ] + }' + + c0 <- column("foo") + c1 <- from_avro(c0, schema) + expect_s4_class(c1, "Column") + c2 <- from_avro("foo", schema) + expect_s4_class(c2, "Column") + c3 <- to_avro(c1) + expect_s4_class(c3, "Column") + c4 <- to_avro(c1, schema) + expect_s4_class(c4, "Column") +}) diff --git a/R/run-tests.sh b/R/run-tests.sh index 51ca7d600caf0..2405ba0e4c948 100755 --- a/R/run-tests.sh +++ b/R/run-tests.sh @@ -23,7 +23,18 @@ FAILED=0 LOGFILE=$FWDIR/unit-tests.out rm -f $LOGFILE -SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE +SPARK_AVRO_JAR_PATH=$(find $FWDIR/../external/avro/ -name "spark-avro*jar" -print | egrep -v "tests.jar|test-sources.jar|sources.jar|javadoc.jar") + +if [[ $(echo $SPARK_AVRO_JAR_PATH | wc -l) -eq 1 ]]; then + SPARK_JARS=$SPARK_AVRO_JAR_PATH +fi + +if [ -z "$SPARK_JARS" ]; then + SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --jars $SPARK_JARS --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee 
-a $LOGFILE +else + SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE +fi + FAILED=$((PIPESTATUS[0]||$FAILED)) NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)" From 55c3780ab9bf46d78aa05a2c03a9fbf6282ac566 Mon Sep 17 00:00:00 2001 From: zero323 Date: Mon, 2 Nov 2020 09:10:16 +0100 Subject: [PATCH 2/7] Add link to docs --- R/pkg/R/functions_avro.R | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/R/pkg/R/functions_avro.R b/R/pkg/R/functions_avro.R index cf542cd070ad3..5896ece791357 100644 --- a/R/pkg/R/functions_avro.R +++ b/R/pkg/R/functions_avro.R @@ -26,7 +26,10 @@ #' @rdname column_avro_functions #' @family avro functions #' @note Avro is built-in but external data source module since Spark 2.4. -#' Please deploy the application as per the deployment section of "Apache Avro Data Source Guide". +#' Please deploy the application as per +#' \href{https://spark.apache.org/docs/latest/sql-data-sources-avro.html#deploying}{ +#' the deployment section +#' } of "Apache Avro Data Source Guide". 
#' @examples #' \dontrun{ #' df <- createDataFrame(iris) From 8bdf665981495be02fdd7a6a3d1cef4f8d432bd9 Mon Sep 17 00:00:00 2001 From: zero323 Date: Mon, 2 Nov 2020 15:27:06 +0100 Subject: [PATCH 3/7] Add R example to avro docs --- docs/sql-data-sources-avro.md | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md index 69b165ed28bae..ac4710aab843d 100644 --- a/docs/sql-data-sources-avro.md +++ b/docs/sql-data-sources-avro.md @@ -183,6 +183,38 @@ query = output\ .option("topic", "topic2")\ .start() +{% endhighlight %} + +
+{% highlight r %} + +# `from_avro` requires Avro schema in JSON string format. +jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ") + +df <- read.stream( + "kafka", + kafka.bootstrap.servers = "host1:port1,host2:port2", + subscribe = "topic1" +) + +# 1. Decode the Avro data into a struct; +# 2. Filter by column `favorite_color`; +# 3. Encode the column `name` in Avro format. + +output <- select( + filter( + select(df, alias(from_avro("value", jsonFormatSchema), "user")), + column("user.favorite_color") == "red" + ), + alias(to_avro("user.name"), "value") +) + +write.stream( + output, + "kafka", + kafka.bootstrap.servers = "host1:port1,host2:port2", + topic = "topic2" +) {% endhighlight %}
From 27e6236bfda84ec7c4641f28ca93616a2ad88a2c Mon Sep 17 00:00:00 2001 From: zero323 Date: Mon, 2 Nov 2020 16:14:04 +0100 Subject: [PATCH 4/7] Fix condition --- R/run-tests.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/R/run-tests.sh b/R/run-tests.sh index 2405ba0e4c948..edc2b2b60b60e 100755 --- a/R/run-tests.sh +++ b/R/run-tests.sh @@ -30,9 +30,9 @@ if [[ $(echo $SPARK_AVRO_JAR_PATH | wc -l) -eq 1 ]]; then fi if [ -z "$SPARK_JARS" ]; then - SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --jars $SPARK_JARS --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE -else SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE +else + SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --jars $SPARK_JARS --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE fi FAILED=$((PIPESTATUS[0]||$FAILED)) From a45b1b09d409b8140fc313157f4a60628267a42f Mon Sep 17 00:00:00 2001 From: zero323 Date: Mon, 2 Nov 2020 15:30:09 +0100 Subject: [PATCH 5/7] Move functions to functions.R --- R/pkg/DESCRIPTION | 1 - R/pkg/R/functions.R | 101 ++++++++++++++++++++++++++++++++ 
R/pkg/R/functions_avro.R | 120 --------------------------------------- 3 files changed, 101 insertions(+), 121 deletions(-) delete mode 100644 R/pkg/R/functions_avro.R diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index d6846044dafce..2047f0d75ca18 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -42,7 +42,6 @@ Collate: 'context.R' 'deserialize.R' 'functions.R' - 'functions_avro.R' 'install.R' 'jvm.R' 'mllib_classification.R' diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index bcd798a8c31e2..039d28a3a37b6 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -361,6 +361,50 @@ NULL #' } NULL +#' Avro processing functions for Column operations +#' +#' Avro processing functions defined for \code{Column}. +#' +#' @param x Column (or character column name) to compute on. +#' @param jsonFormatSchema character Avro schema in JSON string format. +#' @param ... additional argument(s) passed as parser options. +#' @name column_avro_functions +#' @rdname column_avro_functions +#' @family avro functions +#' @note Avro has been a built-in but external data source module since Spark 2.4. +#' Please deploy the application as per +#' \href{https://spark.apache.org/docs/latest/sql-data-sources-avro.html#deploying}{ +#' the deployment section +#' } of "Apache Avro Data Source Guide".
+#' @examples +#' \dontrun{ +#' df <- createDataFrame(iris) +#' schema <- paste( +#' c( +#' '{"type": "record", "namespace": "example.avro", "name": "Iris", "fields": [', +#' '{"type": ["double", "null"], "name": "Sepal_Length"},', +#' '{"type": ["double", "null"], "name": "Sepal_Width"},', +#' '{"type": ["double", "null"], "name": "Petal_Length"},', +#' '{"type": ["double", "null"], "name": "Petal_Width"},', +#' '{"type": ["string", "null"], "name": "Species"}]}' +#' ), +#' collapse="\\n" +#' ) +#' +#' df_serialized <- select( +#' df, +#' alias(to_avro(alias(struct(column("*")), "fields")), "payload") +#' ) +#' +#' df_deserialized <- select( +#' df_serialized, +#' from_avro(df_serialized$payload, schema) +#' ) +#' +#' head(df_deserialized) +#' } +NULL + #' @details #' \code{lit}: A new Column is created to represent the literal value. #' If the parameter is a Column, it is returned unchanged. @@ -4547,3 +4591,60 @@ setMethod("vector_to_array", ) column(jc) }) + +#' @details +#' \code{from_avro}: Converts a binary column of Avro format into its corresponding catalyst value. +#' The specified schema must match the read data, otherwise the behavior is undefined: +#' it may fail or return an arbitrary result. +#' To deserialize the data with a compatible and evolved schema, the expected Avro schema can be +#' set via the option \code{avroSchema}. +#' +#' @rdname column_avro_functions +#' @aliases from_avro from_avro,characterOrColumn-method +#' @note from_avro since 3.1.0 +setMethod("from_avro", + signature(x = "characterOrColumn"), + function(x, jsonFormatSchema, ...) { + x <- if (is.character(x)) { + column(x) + } else { + x + } + + options <- varargsToStrEnv(...) + jc <- callJStatic( + "org.apache.spark.sql.avro.functions", "from_avro", + x@jc, + jsonFormatSchema, + options + ) + column(jc) + }) + +#' @details +#' \code{to_avro}: Converts a column into binary of Avro format.
+#' +#' @rdname column_avro_functions +#' @aliases to_avro to_avro,characterOrColumn-method +#' @note to_avro since 3.1.0 +setMethod("to_avro", + signature(x = "characterOrColumn"), + function(x, jsonFormatSchema = NULL) { + x <- if (is.character(x)) { + column(x) + } else { + x + } + + jc <- if (is.null(jsonFormatSchema)) { + callJStatic("org.apache.spark.sql.avro.functions", "to_avro", x@jc) + } else { + callJStatic( + "org.apache.spark.sql.avro.functions", + "to_avro", + x@jc, + jsonFormatSchema + ) + } + column(jc) + }) diff --git a/R/pkg/R/functions_avro.R b/R/pkg/R/functions_avro.R deleted file mode 100644 index 5896ece791357..0000000000000 --- a/R/pkg/R/functions_avro.R +++ /dev/null @@ -1,120 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -#' Avro processing functions for Column operations -#' -#' Avro processing functions defined for \code{Column}. -#' -#' @param x Column to compute on. -#' @param jsonFormatSchema character Avro schema in JSON string format -#' @param ... additional argument(s) passed as parser options. -#' @name column_avro_functions -#' @rdname column_avro_functions -#' @family avro functions -#' @note Avro is built-in but external data source module since Spark 2.4.
-#' Please deploy the application as per -#' \href{https://spark.apache.org/docs/latest/sql-data-sources-avro.html#deploying}{ -#' the deployment section -#' } of "Apache Avro Data Source Guide". -#' @examples -#' \dontrun{ -#' df <- createDataFrame(iris) -#' schema <- paste( -#' c( -#' '{"type": "record", "namespace": "example.avro", "name": "Iris", "fields": [', -#' '{"type": ["double", "null"], "name": "Sepal_Length"},', -#' '{"type": ["double", "null"], "name": "Sepal_Width"},', -#' '{"type": ["double", "null"], "name": "Petal_Length"},', -#' '{"type": ["double", "null"], "name": "Petal_Width"},', -#' '{"type": ["string", "null"], "name": "Species"}]}' -#' ), -#' collapse="\\n" -#' ) -#' -#' df_serialized <- select( -#' df, -#' alias(to_avro(alias(struct(column("*")), "fields")), "payload") -#' ) -#' -#' df_deserialized <- select( -#' df_serialized, -#' from_avro(df_serialized$payload, schema) -#' ) -#' -#' head(df_deserialized) -#' } -NULL - -#' @include generics.R column.R -NULL - -#' @details -#' \code{from_avro} Converts a binary column of Avro format into its corresponding catalyst value. -#' The specified schema must match the read data, otherwise the behavior is undefined: -#' it may fail or return arbitrary result. -#' To deserialize the data with a compatible and evolved schema, the expected Avro schema can be -#' set via the option avroSchema. -#' -#' @rdname column_avro_functions -#' @aliases from_avro from_avro,Column-method -#' @note from_avro since 3.1.0 -setMethod("from_avro", - signature(x = "characterOrColumn"), - function(x, jsonFormatSchema, ...) { - x <- if (is.character(x)) { - column(x) - } else { - x - } - - options <- varargsToStrEnv(...) - jc <- callJStatic( - "org.apache.spark.sql.avro.functions", "from_avro", - x@jc, - jsonFormatSchema, - options - ) - column(jc) - }) - -#' @details -#' \code{to_avro} Converts a column into binary of Avro format. 
-#' -#' @rdname column_avro_functions -#' @aliases to_avro to_avro,Column-method -#' @note to_avro since 3.1.0 -setMethod("to_avro", - signature(x = "characterOrColumn"), - function(x, jsonFormatSchema = NULL) { - x <- if (is.character(x)) { - column(x) - } else { - x - } - - jc <- if (is.null(jsonFormatSchema)) { - callJStatic("org.apache.spark.sql.avro.functions", "to_avro", x@jc) - } else { - callJStatic( - "org.apache.spark.sql.avro.functions", - "to_avro", - x@jc, - jsonFormatSchema - ) - } - column(jc) - }) From 1457133e1a216ead0c49934876a09f95b978d799 Mon Sep 17 00:00:00 2001 From: zero323 Date: Mon, 2 Nov 2020 16:34:14 +0100 Subject: [PATCH 6/7] Move tests to test_sparkSQL.R --- R/pkg/tests/fulltests/test_sparkSQL.R | 26 +++++++++++ R/pkg/tests/fulltests/test_sparkSQL_avro.R | 50 ---------------------- 2 files changed, 26 insertions(+), 50 deletions(-) delete mode 100644 R/pkg/tests/fulltests/test_sparkSQL_avro.R diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 3a0d359e2ae79..45de1ef1bd3d1 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1841,6 +1841,32 @@ test_that("column functions", { ) }) +test_that("avro column functions", { + skip_if_not( + grepl("spark-avro", sparkR.conf("spark.jars", "")), + "spark-avro jar not present" + ) + + schema <- '{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_color", "type": ["string", "null"]} + ] + }' + + c0 <- column("foo") + c1 <- from_avro(c0, schema) + expect_s4_class(c1, "Column") + c2 <- from_avro("foo", schema) + expect_s4_class(c2, "Column") + c3 <- to_avro(c1) + expect_s4_class(c3, "Column") + c4 <- to_avro(c1, schema) + expect_s4_class(c4, "Column") +}) + test_that("column binary mathfunctions", { lines <- c("{\"a\":1, \"b\":5}", "{\"a\":2, \"b\":6}", diff --git a/R/pkg/tests/fulltests/test_sparkSQL_avro.R 
b/R/pkg/tests/fulltests/test_sparkSQL_avro.R deleted file mode 100644 index dfc1053de1422..0000000000000 --- a/R/pkg/tests/fulltests/test_sparkSQL_avro.R +++ /dev/null @@ -1,50 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(testthat) - -context("SparkSQL Avro functions") - -sparkR.session( - master = sparkRTestMaster -) - -test_that("avro column functions", { - skip_if_not( - grepl("spark-avro", sparkR.conf("spark.jars", "")), - "spark-avro jar not present" - ) - - schema <- '{"namespace": "example.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_color", "type": ["string", "null"]} - ] - }' - - c0 <- column("foo") - c1 <- from_avro(c0, schema) - expect_s4_class(c1, "Column") - c2 <- from_avro("foo", schema) - expect_s4_class(c2, "Column") - c3 <- to_avro(c1) - expect_s4_class(c3, "Column") - c4 <- to_avro(c1, schema) - expect_s4_class(c4, "Column") -}) From 63f77929119031c8385d0b5fc256f9be87210bfc Mon Sep 17 00:00:00 2001 From: zero323 Date: Mon, 2 Nov 2020 23:32:46 +0100 Subject: [PATCH 7/7] Remove availability note --- docs/sql-data-sources-avro.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/docs/sql-data-sources-avro.md 
b/docs/sql-data-sources-avro.md index ac4710aab843d..9ecc6eb91da5a 100644 --- a/docs/sql-data-sources-avro.md +++ b/docs/sql-data-sources-avro.md @@ -88,8 +88,6 @@ Kafka key-value record will be augmented with some metadata, such as the ingesti * If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file. * `to_avro()` can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka. -Both functions are currently only available in Scala, Java, and Python. -
{% highlight scala %}