1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -274,6 +274,7 @@ exportMethods("%<=>%",
"floor",
"format_number",
"format_string",
"from_csv",
"from_json",
"from_unixtime",
"from_utc_timestamp",
35 changes: 29 additions & 6 deletions R/pkg/R/functions.R
@@ -188,6 +188,7 @@ NULL
#' \item \code{to_json}: it is the column containing the struct, array of the structs,
#' the map or array of maps.
#' \item \code{from_json}: it is the column containing the JSON string.
#' \item \code{from_csv}: it is the column containing the CSV string.
#' }
#' @param y Column to compute on.
#' @param value A value to compute on.
@@ -196,10 +197,17 @@ NULL
#' \item \code{array_position}: a value to locate in the given array.
#' \item \code{array_remove}: a value to remove in the given array.
#' }
#' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains
#' additional named properties to control how it is converted, accepts the same
#' options as the JSON data source. In \code{arrays_zip}, this contains additional
#' Columns of arrays to be merged.
#' @param schema
#' \itemize{
#' \item \code{from_json}: a structType object to use as the schema when parsing the
#' JSON string. Since Spark 2.3, a DDL-formatted string is also supported for
#' the schema.
#' \item \code{from_csv}: a DDL-formatted string to use as the schema.
#' }
#' @param ... additional argument(s). In \code{to_json}, \code{from_json} and \code{from_csv},
#' this contains additional named properties to control how it is converted; it
#' accepts the same options as the JSON and CSV data sources. In \code{arrays_zip},
#' this contains additional Columns of arrays to be merged.
#' @name column_collection_functions
#' @rdname column_collection_functions
#' @family collection functions
@@ -2164,8 +2172,6 @@ setMethod("date_format", signature(y = "Column", x = "character"),
#' to \code{TRUE}. If the string is unparseable, the Column will contain the value NA.
#'
#' @rdname column_collection_functions
#' @param schema a structType object to use as the schema to use when parsing the JSON string.
#' Since Spark 2.3, the DDL-formatted string is also supported for the schema.
#' @param as.json.array indicating if input string is JSON array of objects or a single object.
#' @aliases from_json from_json,Column,characterOrstructType-method
#' @examples
@@ -2202,6 +2208,23 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
column(jc)
})

#' @details
#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType}
#' with the specified \code{schema}.
#' If the string is unparseable, the Column will contain the value NA.
#'
#' @rdname column_collection_functions
#' @aliases from_csv from_csv,Column,character-method
#' @note from_csv since 3.0.0
setMethod("from_csv", signature(x = "Column", schema = "character"),
function(x, schema, ...) {
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
"from_csv",
x@jc, schema, options)
column(jc)
})

#' @details
#' \code{from_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a
#' time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1'
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -984,6 +984,10 @@ setGeneric("format_string", function(format, x, ...) { standardGeneric("format_s
#' @name NULL
setGeneric("from_json", function(x, schema, ...) { standardGeneric("from_json") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("from_csv", function(x, schema, ...) { standardGeneric("from_csv") })

#' @rdname column_datetime_functions
#' @name NULL
setGeneric("from_unixtime", function(x, ...) { standardGeneric("from_unixtime") })
5 changes: 5 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1659,6 +1659,11 @@ test_that("column functions", {
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)

# Test from_csv()
df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, alias(from_csv(df$col, "a INT"), "csv")))
expect_equal(c[[1]][[1]]$a, 1)

# Test to_json(), from_json()
df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
j <- collect(select(df, alias(to_json(df$people), "json")))
44 changes: 44 additions & 0 deletions python/pyspark/sql/functions.py
@@ -2348,6 +2348,27 @@ def schema_of_json(col):
return Column(jc)


@ignore_unicode_prefix
@since(3.0)
def schema_of_csv(col, options={}):
"""
Parses a column containing a CSV string and infers its schema in DDL format.

:param col: string column in CSV format
:param options: options to control parsing. Accepts the same options as the CSV datasource.

>>> from pyspark.sql.types import *
>>> data = [(1, '1|a')]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(schema_of_csv(df.value, {'sep':'|'}).alias("csv")).collect()
[Row(csv=u'struct<_c0:int,_c1:string>')]
"""

sc = SparkContext._active_spark_context
jc = sc._jvm.functions.schema_of_csv(_to_java_column(col), options)
return Column(jc)


@since(1.5)
def size(col):
"""
@@ -2637,6 +2658,29 @@ def sequence(start, stop, step=None):
_to_java_column(start), _to_java_column(stop), _to_java_column(step)))


@ignore_unicode_prefix
@since(3.0)
def from_csv(col, schema, options={}):
"""
Parses a column containing a CSV string into a :class:`StructType`
with the specified schema. Returns `null` in the case of an unparseable string.

:param col: string column in CSV format
:param schema: a string with schema in DDL format to use when parsing the CSV column.
:param options: options to control parsing. Accepts the same options as the CSV datasource.

>>> from pyspark.sql.types import *
>>> data = [(1, '1')]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(from_csv(df.value, "a INT").alias("csv")).collect()
[Row(csv=Row(a=1))]
"""

sc = SparkContext._active_spark_context
jc = sc._jvm.functions.from_csv(_to_java_column(col), schema, options)
return Column(jc)


# ---------------------------- User Defined Function ----------------------------------

class PandasUDFType(object):
6 changes: 6 additions & 0 deletions sql/catalyst/pom.xml
@@ -103,6 +103,12 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>2.7.3</version>
<type>jar</type>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -529,7 +529,11 @@ object FunctionRegistry {
castAlias("date", DateType),
castAlias("timestamp", TimestampType),
castAlias("binary", BinaryType),
castAlias("string", StringType)
castAlias("string", StringType),

// csv
expression[CsvToStructs]("from_csv"),
expression[SchemaOfCsv]("schema_of_csv")
)

val builtin: SimpleFunctionRegistry = {
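Registering `CsvToStructs` and `SchemaOfCsv` under the names `from_csv` and `schema_of_csv` is what makes the new functions callable from SQL. A minimal sketch of how they could be exercised from a `spark-shell` session built on this branch (the `spark` session value and the sample inputs are assumptions for illustration, not part of the diff):

// The schema must be a string literal in DDL format, and parser options are
// passed as map() entries with string keys and values (see ExprUtils below).
spark.sql("SELECT from_csv('1|abc', 'a INT, b STRING', map('sep', '|'))").collect()
// => Array(Row(Row(1, "abc")))
spark.sql("SELECT schema_of_csv('1,abc')").collect()
// => Array(Row("struct<_c0:int,_c1:string>"))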
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -15,18 +15,18 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.csv
package org.apache.spark.sql.catalyst.csv

import java.math.BigDecimal

import scala.util.control.Exception._
import scala.util.control.Exception.allCatch

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

private[csv] object CSVInferSchema {
object CSVInferSchema {

/**
* Similar to the JSON schema inference
@@ -43,13 +43,7 @@ private[csv] object CSVInferSchema {
val rootTypes: Array[DataType] =
tokenRDD.aggregate(startType)(inferRowType(options), mergeRowTypes)

header.zip(rootTypes).map { case (thisHeader, rootType) =>
val dType = rootType match {
case _: NullType => StringType
case other => other
}
StructField(thisHeader, dType, nullable = true)
}
toStructFields(rootTypes, header, options)
} else {
// By default fields are assumed to be StringType
header.map(fieldName => StructField(fieldName, StringType, nullable = true))
@@ -58,7 +52,20 @@
StructType(fields)
}

private def inferRowType(options: CSVOptions)
def toStructFields(
fieldTypes: Array[DataType],
header: Array[String],
options: CSVOptions): Array[StructField] = {
header.zip(fieldTypes).map { case (thisHeader, rootType) =>
val dType = rootType match {
case _: NullType => StringType
case other => other
}
StructField(thisHeader, dType, nullable = true)
}
}

def inferRowType(options: CSVOptions)
(rowSoFar: Array[DataType], next: Array[String]): Array[DataType] = {
var i = 0
while (i < math.min(rowSoFar.length, next.length)) { // May have columns on right missing.
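The extracted `toStructFields` helper keeps the fallback rule that was previously inlined in `infer`: a column whose inferred type is still `NullType` (every sampled value was empty) is reported as `StringType`. A standalone sketch of that rule follows; the field names are illustrative, and note that the method body shown above does not consult `options`:

import org.apache.spark.sql.types._

val header = Array("price", "comment")
val inferred: Array[DataType] = Array(DoubleType, NullType)
val fields = header.zip(inferred).map { case (name, rootType) =>
  val dType = rootType match {
    case _: NullType => StringType // all-empty columns degrade to string
    case other => other
  }
  StructField(name, dType, nullable = true)
}
// fields(1) == StructField("comment", StringType, nullable = true)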
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.csv
package org.apache.spark.sql.catalyst.csv

import java.nio.charset.StandardCharsets
import java.util.{Locale, TimeZone}
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.csv

object CSVUtils {
/**
* Filters ignorable rows for the CSV iterator (lines that are empty or start with `comment`).
* This is currently used in the CSV reading path and in CSV schema inference.
*/
def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
iter.filter { line =>
line.trim.nonEmpty && !line.startsWith(options.comment.toString)
}
}

/**
* Helper method that converts the string representation of a character to the actual character.
* It handles some Java escape sequences and throws an exception if the given string is longer
* than one character.
*/
@throws[IllegalArgumentException]
def toChar(str: String): Char = {
if (str.charAt(0) == '\\') {
str.charAt(1) match {
case 't' => '\t'
case 'r' => '\r'
case 'b' => '\b'
case 'f' => '\f'
case '\"' => '\"' // In case user changes quote char and uses \" as delimiter in options
case '\'' => '\''
case 'u' if str == """\u0000""" => '\u0000'
case _ =>
throw new IllegalArgumentException(s"Unsupported special character for delimiter: $str")
}
} else if (str.length == 1) {
str.charAt(0)
} else {
throw new IllegalArgumentException(s"Delimiter cannot be more than one character: $str")
}
}
}
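A few illustrative `toChar` calls, assuming a REPL with this object on the classpath. The inputs are Scala source strings, so "\\t" denotes the two characters backslash and t:

import org.apache.spark.sql.catalyst.csv.CSVUtils

CSVUtils.toChar("|")     // '|'  -- a single character passes through unchanged
CSVUtils.toChar("\\t")   // '\t' -- the two-character escape "\t" is unwrapped
// CSVUtils.toChar("ab")  -- throws IllegalArgumentException (more than one character)
// CSVUtils.toChar("\\x") -- throws IllegalArgumentException (unsupported special character)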
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.csv
package org.apache.spark.sql.catalyst.csv

import java.io.InputStream
import java.math.BigDecimal
@@ -28,8 +28,7 @@ import com.univocity.parsers.csv.CsvParser
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.FailureSafeParser
import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -264,7 +263,7 @@ class UnivocityParser(
}
}

private[csv] object UnivocityParser {
private[sql] object UnivocityParser {

/**
* Parses a stream that contains CSV strings and turns it into an iterator of tokens.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
import org.apache.spark.sql.types.{MapType, StringType, StructType}

object ExprUtils {

def evalSchemaExpr(exp: Expression): StructType = exp match {
case Literal(s, StringType) => StructType.fromDDL(s.toString)
case e => throw new AnalysisException(
s"Schema should be specified in DDL format as a string literal instead of ${e.sql}")
}

def convertToMapData(exp: Expression): Map[String, String] = exp match {
case m: CreateMap
if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) =>
val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData]
ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) =>
key.toString -> value.toString
}
case m: CreateMap =>
throw new AnalysisException(
s"A type of keys and values in map() must be string, but got ${m.dataType.catalogString}")
case _ =>
throw new AnalysisException("Must use a map() function for options")
}
}
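A minimal sketch of the contracts these helpers enforce, with expressions built by hand; a real caller receives them from the SQL parser, and the literals below are illustrative:

import org.apache.spark.sql.catalyst.expressions.{CreateMap, ExprUtils, Literal}

// The schema must arrive as a foldable string literal in DDL format.
val schema = ExprUtils.evalSchemaExpr(Literal("a INT, b STRING"))
// => StructType(StructField(a,IntegerType,true), StructField(b,StringType,true))

// Options must arrive as a map() of string keys to string values.
val opts = ExprUtils.convertToMapData(CreateMap(Seq(Literal("sep"), Literal("|"))))
// => Map("sep" -> "|")

// Anything else raises AnalysisException:
// ExprUtils.evalSchemaExpr(Literal(1))        -- schema is not a string literal
// ExprUtils.convertToMapData(Literal("sep"))  -- options are not a map()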