2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -292,6 +292,7 @@ exportMethods("%<=>%",
"floor",
"format_number",
"format_string",
"from_avro",
"from_csv",
"from_json",
"from_unixtime",
@@ -416,6 +417,7 @@ exportMethods("%<=>%",
"timestamp_seconds",
"toDegrees",
"toRadians",
"to_avro",
"to_csv",
"to_date",
"to_json",
101 changes: 101 additions & 0 deletions R/pkg/R/functions.R
@@ -361,6 +361,50 @@ NULL
#' }
NULL

#' Avro processing functions for Column operations
#'
#' Avro processing functions defined for \code{Column}.
#'
#' @param x Column to compute on.
#' @param jsonFormatSchema character Avro schema in JSON string format.
#' @param ... additional argument(s) passed as parser options.
#' @name column_avro_functions
#' @rdname column_avro_functions
#' @family avro functions
#' @note Avro is built-in, but as an external data source module, since Spark 2.4.
#' Please deploy the application as per
#' \href{https://spark.apache.org/docs/latest/sql-data-sources-avro.html#deploying}{
#' the deployment section
#' } of "Apache Avro Data Source Guide".
#' @examples
#' \dontrun{
#' df <- createDataFrame(iris)
#' schema <- paste(
#' c(
#' '{"type": "record", "namespace": "example.avro", "name": "Iris", "fields": [',
#' '{"type": ["double", "null"], "name": "Sepal_Length"},',
#' '{"type": ["double", "null"], "name": "Sepal_Width"},',
#' '{"type": ["double", "null"], "name": "Petal_Length"},',
#' '{"type": ["double", "null"], "name": "Petal_Width"},',
#' '{"type": ["string", "null"], "name": "Species"}]}'
#' ),
#' collapse="\\n"
#' )
#'
#' df_serialized <- select(
#' df,
#' alias(to_avro(alias(struct(column("*")), "fields")), "payload")
#' )
#'
#' df_deserialized <- select(
#' df_serialized,
#' from_avro(df_serialized$payload, schema)
#' )
#'
#' head(df_deserialized)
#' }
NULL
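
Because spark-avro ships as an external module, calls to these functions fail at runtime unless the module is deployed with the application, as the note above says. A minimal sketch of attaching it when the session starts, assuming a Scala 2.12 build of Spark 3.1.0 (the artifact coordinates are illustrative; match them to your Spark build):

# Pull in the external spark-avro module at session start (coordinates assumed).
sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.12:3.1.0")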

#' @details
#' \code{lit}: A new Column is created to represent the literal value.
#' If the parameter is a Column, it is returned unchanged.
@@ -4547,3 +4591,60 @@ setMethod("vector_to_array",
)
column(jc)
})

#' @details
#' \code{from_avro}: Converts a binary column in Avro format into its corresponding Catalyst value.
#' The specified schema must match the read data, otherwise the behavior is undefined:
#' it may fail or return an arbitrary result.
#' To deserialize the data with a compatible and evolved schema, the expected Avro schema can be
#' set via the option \code{avroSchema}.
#'
#' @rdname column_avro_functions
#' @aliases from_avro from_avro,Column-method
#' @note from_avro since 3.1.0
setMethod("from_avro",
signature(x = "characterOrColumn"),
function(x, jsonFormatSchema, ...) {
x <- if (is.character(x)) {
column(x)
} else {
x
}

options <- varargsToStrEnv(...)
jc <- callJStatic(
"org.apache.spark.sql.avro.functions", "from_avro",
x@jc,
jsonFormatSchema,
options
)
column(jc)
})
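
A note on the schema-evolution path described in the details above: the extra named arguments are converted by varargsToStrEnv and handed to the JVM side as parser options, so the evolved reader schema rides along as the avroSchema option. A hedged sketch, where actualSchema and evolvedSchema are assumed JSON-format Avro schema strings and df$payload is an assumed Avro binary column:

# Deserialize data written with `actualSchema` under a compatible evolved schema.
df2 <- select(df, from_avro(df$payload, actualSchema, avroSchema = evolvedSchema))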

#' @details
#' \code{to_avro}: Converts a column into a binary column in Avro format.
#'
#' @rdname column_avro_functions
#' @aliases to_avro to_avro,Column-method
#' @note to_avro since 3.1.0
setMethod("to_avro",
signature(x = "characterOrColumn"),
function(x, jsonFormatSchema = NULL) {
x <- if (is.character(x)) {
column(x)
} else {
x
}

jc <- if (is.null(jsonFormatSchema)) {
callJStatic("org.apache.spark.sql.avro.functions", "to_avro", x@jc)
} else {
callJStatic(
"org.apache.spark.sql.avro.functions",
"to_avro",
x@jc,
jsonFormatSchema
)
}
column(jc)
})
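
When jsonFormatSchema is supplied, to_avro takes the second branch above and forwards the user-specified output schema to the JVM. A hedged sketch, assuming df has a string column name (the primitive schema literal is illustrative):

# Encode a single column as Avro binary under an explicit primitive schema.
df_bin <- select(df, alias(to_avro(df$name, '"string"'), "name_avro"))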
9 changes: 8 additions & 1 deletion R/pkg/R/generics.R
@@ -950,7 +950,6 @@ setGeneric("current_date", function(x = "missing") { standardGeneric("current_date") })
#' @name NULL
setGeneric("current_timestamp", function(x = "missing") { standardGeneric("current_timestamp") })


#' @rdname column_datetime_diff_functions
#' @name NULL
setGeneric("datediff", function(y, x) { standardGeneric("datediff") })
@@ -1015,6 +1014,10 @@ setGeneric("expr", function(x) { standardGeneric("expr") })
#' @name NULL
setGeneric("flatten", function(x) { standardGeneric("flatten") })

#' @rdname column_avro_functions
#' @name NULL
setGeneric("from_avro", function(x, ...) { standardGeneric("from_avro") })

#' @rdname column_datetime_diff_functions
#' @name NULL
setGeneric("from_utc_timestamp", function(y, x) { standardGeneric("from_utc_timestamp") })
@@ -1388,6 +1391,10 @@ setGeneric("sumDistinct", function(x) { standardGeneric("sumDistinct") })
#' @name timestamp_seconds
setGeneric("timestamp_seconds", function(x) { standardGeneric("timestamp_seconds") })

#' @rdname column_avro_functions
#' @name NULL
setGeneric("to_avro", function(x, ...) { standardGeneric("to_avro") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("transform_keys", function(x, f) { standardGeneric("transform_keys") })
26 changes: 26 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1841,6 +1841,32 @@ test_that("column functions", {
)
})

test_that("avro column functions", {
skip_if_not(
grepl("spark-avro", sparkR.conf("spark.jars", "")),
"spark-avro jar not present"
)

schema <- '{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}'

c0 <- column("foo")
c1 <- from_avro(c0, schema)
expect_s4_class(c1, "Column")
c2 <- from_avro("foo", schema)
expect_s4_class(c2, "Column")
c3 <- to_avro(c1)
expect_s4_class(c3, "Column")
c4 <- to_avro(c1, schema)
expect_s4_class(c4, "Column")
})

test_that("column binary mathfunctions", {
lines <- c("{\"a\":1, \"b\":5}",
"{\"a\":2, \"b\":6}",
13 changes: 12 additions & 1 deletion R/run-tests.sh
@@ -23,7 +23,18 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE

SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
# Locate the external spark-avro jar (excluding test/source/javadoc artifacts)
# so the Avro column-function tests can run.
SPARK_AVRO_JAR_PATH=$(find "$FWDIR"/../external/avro/ -name "spark-avro*jar" -print | egrep -v "tests.jar|test-sources.jar|sources.jar|javadoc.jar")

# Only pass the jar along when exactly one artifact was found; quote the
# variable so multiple matches are not collapsed onto a single line by echo.
if [[ -n "$SPARK_AVRO_JAR_PATH" && $(echo "$SPARK_AVRO_JAR_PATH" | wc -l) -eq 1 ]]; then
  SPARK_JARS=$SPARK_AVRO_JAR_PATH
fi

if [ -z "$SPARK_JARS" ]; then
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
else
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --jars $SPARK_JARS --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
fi

FAILED=$((PIPESTATUS[0]||$FAILED))

NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"
34 changes: 32 additions & 2 deletions docs/sql-data-sources-avro.md
@@ -88,8 +88,6 @@ Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, offset in Kafka, etc.
* If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file.
* `to_avro()` can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.

Both functions are currently only available in Scala, Java, and Python.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
Expand Down Expand Up @@ -183,6 +181,38 @@ query = output\
.option("topic", "topic2")\
.start()

{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight r %}

# `from_avro` requires an Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")

df <- read.stream(
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
subscribe = "topic1"
)

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.

output <- select(
filter(
select(df, alias(from_avro("value", jsonFormatSchema), "user")),
column("user.favorite_color") == "red"
),
alias(to_avro("user.name"), "value")
)

write.stream(
output,
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
topic = "topic2"
)
{% endhighlight %}
</div>
</div>