2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
@@ -32,6 +32,8 @@ license: |

- In Spark 3.1, the Parquet, ORC, Avro and JSON datasources throw the exception `org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema` on read if they detect duplicate names in top-level columns as well as in nested structures. The datasources take the SQL config `spark.sql.caseSensitive` into account while detecting column name duplicates.

+- In Spark 3.1, structs and maps are wrapped in `{}` brackets when they are cast to strings. For instance, the `show()` action and the `CAST` expression use such brackets. In Spark 3.0 and earlier, `[]` brackets were used for the same purpose. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.castComplexTypesToString.enabled` to `true`.

## Upgrading from Spark SQL 3.0 to 3.0.1

- In Spark 3.0, the JSON datasource and the JSON function `schema_of_json` infer TimestampType from string values if they match the pattern defined by the JSON option `timestampFormat`. Since version 3.0.1, the timestamp type inference is disabled by default. Set the JSON option `inferTimestamp` to `true` to enable such type inference.
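The note above is easiest to see in action. Below is a minimal, hedged sketch for a Spark 3.1 `spark-shell` session; the `SparkSession` value `spark` and the column name `m` are assumptions of the example, not part of this patch:

```scala
import org.apache.spark.sql.functions.{array, lit, map_from_arrays}

// One row with a map column: Map(2 -> "a", 5 -> "b").
val df = spark.range(1).select(
  map_from_arrays(array(lit(2), lit(5)), array(lit("a"), lit("b"))).as("m"))

// Spark 3.1 default: curly brackets.
df.selectExpr("CAST(m AS STRING)").show()   // prints {2 -> a, 5 -> b}

// Restore the pre-3.1 rendering.
spark.conf.set("spark.sql.legacy.castComplexTypesToString.enabled", "true")
df.selectExpr("CAST(m AS STRING)").show()   // prints [2 -> a, 5 -> b]
```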
4 changes: 2 additions & 2 deletions python/pyspark/ml/stat.py
@@ -222,14 +222,14 @@ class Summarizer(object):
+-----------------------------------+
|aggregate_metrics(features, weight)|
+-----------------------------------+
-|[[1.0,1.0,1.0], 1] |
+|{[1.0,1.0,1.0], 1} |
+-----------------------------------+
<BLANKLINE>
>>> df.select(summarizer.summary(df.features)).show(truncate=False)
+--------------------------------+
|aggregate_metrics(features, 1.0)|
+--------------------------------+
-|[[1.0,1.5,2.0], 2] |
+|{[1.0,1.5,2.0], 2} |
+--------------------------------+
<BLANKLINE>
>>> df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)
28 changes: 14 additions & 14 deletions python/pyspark/sql/functions.py
@@ -1996,7 +1996,7 @@ def map_from_arrays(col1, col2):
+----------------+
| map|
+----------------+
-|[2 -> a, 5 -> b]|
+|{2 -> a, 5 -> b}|
+----------------+
"""
sc = SparkContext._active_spark_context
@@ -2316,9 +2316,9 @@ def explode_outer(col):
+---+----------+----+
| id| a_map| col|
+---+----------+----+
-| 1|[x -> 1.0]| foo|
-| 1|[x -> 1.0]| bar|
-| 2| []|null|
+| 1|{x -> 1.0}| foo|
+| 1|{x -> 1.0}| bar|
+| 2| {}|null|
| 3| null|null|
+---+----------+----+
"""
@@ -2351,9 +2351,9 @@ def posexplode_outer(col):
+---+----------+----+----+
| id| a_map| pos| col|
+---+----------+----+----+
-| 1|[x -> 1.0]| 0| foo|
-| 1|[x -> 1.0]| 1| bar|
-| 2| []|null|null|
+| 1|{x -> 1.0}| 0| foo|
+| 1|{x -> 1.0}| 1| bar|
+| 2| {}|null|null|
| 3| null|null|null|
+---+----------+----+----+
"""
@@ -2750,7 +2750,7 @@ def map_entries(col):
+----------------+
| entries|
+----------------+
-|[[1, a], [2, b]]|
+|[{1, a}, {2, b}]|
+----------------+
"""
sc = SparkContext._active_spark_context
@@ -2770,7 +2770,7 @@ def map_from_entries(col):
+----------------+
| map|
+----------------+
-|[1 -> a, 2 -> b]|
+|{1 -> a, 2 -> b}|
+----------------+
"""
sc = SparkContext._active_spark_context
@@ -2822,7 +2822,7 @@ def map_concat(*cols):
+------------------------+
|map3 |
+------------------------+
-|[1 -> a, 2 -> b, 3 -> c]|
+|{1 -> a, 2 -> b, 3 -> c}|
+------------------------+
"""
sc = SparkContext._active_spark_context
@@ -3241,7 +3241,7 @@ def transform_keys(col, f):
+-------------------------+
|data_upper |
+-------------------------+
-|[BAR -> 2.0, FOO -> -2.0]|
+|{BAR -> 2.0, FOO -> -2.0}|
+-------------------------+
"""
return _invoke_higher_order_function("TransformKeys", [col], [f])
@@ -3268,7 +3268,7 @@ def transform_values(col, f):
+---------------------------------------+
|new_data |
+---------------------------------------+
-|[OPS -> 34.0, IT -> 20.0, SALES -> 2.0]|
+|{OPS -> 34.0, IT -> 20.0, SALES -> 2.0}|
+---------------------------------------+
"""
return _invoke_higher_order_function("TransformValues", [col], [f])
@@ -3294,7 +3294,7 @@ def map_filter(col, f):
+--------------------------+
|data_filtered |
+--------------------------+
-|[baz -> 32.0, foo -> 42.0]|
+|{baz -> 32.0, foo -> 42.0}|
+--------------------------+
"""
return _invoke_higher_order_function("MapFilter", [col], [f])
@@ -3324,7 +3324,7 @@ def map_zip_with(col1, col2, f):
+---------------------------+
|updated_data |
+---------------------------+
-|[SALES -> 16.8, IT -> 48.0]|
+|{SALES -> 16.8, IT -> 48.0}|
+---------------------------+
"""
return _invoke_higher_order_function("MapZipWith", [col1, col2], [f])
@@ -297,6 +297,10 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
private lazy val dateFormatter = DateFormatter(zoneId)
private lazy val timestampFormatter = TimestampFormatter.getFractionFormatter(zoneId)

+  // The brackets that are used in casting structs and maps to strings
+  private val (leftBracket, rightBracket) =
+    if (SQLConf.get.getConf(SQLConf.LEGACY_COMPLEX_TYPES_TO_STRING)) ("[", "]") else ("{", "}")

// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case CalendarIntervalType =>
@@ -330,7 +334,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
case MapType(kt, vt, _) =>
buildCast[MapData](_, map => {
val builder = new UTF8StringBuilder
builder.append("[")
builder.append(leftBracket)
if (map.numElements > 0) {
val keyArray = map.keyArray()
val valueArray = map.valueArray()
Expand All @@ -355,13 +359,13 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
i += 1
}
}
builder.append("]")
builder.append(rightBracket)
builder.build()
})
case StructType(fields) =>
buildCast[InternalRow](_, row => {
val builder = new UTF8StringBuilder
builder.append("[")
builder.append(leftBracket)
if (row.numFields > 0) {
val st = fields.map(_.dataType)
val toUTF8StringFuncs = st.map(castToString)
Expand All @@ -378,7 +382,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
i += 1
}
}
builder.append("]")
builder.append(rightBracket)
builder.build()
})
case pudt: PythonUserDefinedType => castToString(pudt.sqlType)
@@ -962,7 +966,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
val getMapKeyArray = CodeGenerator.getValue(mapKeyArray, kt, loopIndex)
val getMapValueArray = CodeGenerator.getValue(mapValueArray, vt, loopIndex)
code"""
|$buffer.append("[");
|$buffer.append("$leftBracket");
|if ($map.numElements() > 0) {
| $buffer.append($keyToStringFunc($getMapFirstKey));
| $buffer.append(" ->");
@@ -980,7 +984,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
| }
| }
|}
|$buffer.append("]");
|$buffer.append("$rightBracket");
""".stripMargin
}

@@ -1015,9 +1019,9 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
(classOf[UTF8StringBuilder].getName, buffer.code) :: Nil)

code"""
|$buffer.append("[");
|$buffer.append("$leftBracket");
|$writeStructCode
|$buffer.append("]");
|$buffer.append("$rightBracket");
""".stripMargin
}

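One design point worth calling out: the conf is resolved once, when the expression is constructed, and the same two strings feed both the interpreted path (`builder.append(leftBracket)`) and the generated Java source (`$buffer.append("$leftBracket")`). A self-contained sketch of that pattern, with illustrative names rather than Spark's:

```scala
// Resolve the flag once at construction; reuse the brackets everywhere.
class BracketedRenderer(legacyBrackets: Boolean) {
  private val (leftBracket, rightBracket) =
    if (legacyBrackets) ("[", "]") else ("{", "}")

  // Interpreted path: build the string directly.
  def render(entries: Seq[(String, String)]): String =
    entries.map { case (k, v) => s"$k -> $v" }
      .mkString(leftBracket, ", ", rightBracket)

  // Codegen-style path: splice the brackets into emitted source as literals.
  def genCode(buffer: String): String =
    s"""$buffer.append("$leftBracket");
       |// ... loop appending each entry ...
       |$buffer.append("$rightBracket");""".stripMargin
}

// new BracketedRenderer(false).render(Seq("1" -> "a"))  // "{1 -> a}"
```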
@@ -2690,6 +2690,15 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

+  val LEGACY_COMPLEX_TYPES_TO_STRING =
+    buildConf("spark.sql.legacy.castComplexTypesToString.enabled")
+      .internal()
+      .doc("When true, maps and structs are wrapped in [] when cast to strings. " +
+        "Otherwise (the default), maps and structs are wrapped in {}.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)

/**
* Holds information about keys that have been deprecated.
*
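For reference, a short sketch of how the new entry is read and toggled; the internal read mirrors the `CastBase` change above, while the session-level `set` is an assumed usage, not part of the diff:

```scala
import org.apache.spark.sql.internal.SQLConf

// Internal read, as CastBase does: resolved once per expression instance.
val legacy: Boolean = SQLConf.get.getConf(SQLConf.LEGACY_COMPLEX_TYPES_TO_STRING)

// User-facing toggle on a live session (internal confs are still settable):
// spark.conf.set("spark.sql.legacy.castComplexTypesToString.enabled", "true")
```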
@@ -712,47 +712,59 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
}

test("SPARK-22973 Cast map to string") {
-    val ret1 = cast(Literal.create(Map(1 -> "a", 2 -> "b", 3 -> "c")), StringType)
-    checkEvaluation(ret1, "[1 -> a, 2 -> b, 3 -> c]")
-    val ret2 = cast(
-      Literal.create(Map("1" -> "a".getBytes, "2" -> null, "3" -> "c".getBytes)),
-      StringType)
-    checkEvaluation(ret2, "[1 -> a, 2 ->, 3 -> c]")
-    val ret3 = cast(
-      Literal.create(Map(
-        1 -> Date.valueOf("2014-12-03"),
-        2 -> Date.valueOf("2014-12-04"),
-        3 -> Date.valueOf("2014-12-05"))),
-      StringType)
-    checkEvaluation(ret3, "[1 -> 2014-12-03, 2 -> 2014-12-04, 3 -> 2014-12-05]")
-    val ret4 = cast(
-      Literal.create(Map(
-        1 -> Timestamp.valueOf("2014-12-03 13:01:00"),
-        2 -> Timestamp.valueOf("2014-12-04 15:05:00"))),
-      StringType)
-    checkEvaluation(ret4, "[1 -> 2014-12-03 13:01:00, 2 -> 2014-12-04 15:05:00]")
-    val ret5 = cast(
-      Literal.create(Map(
-        1 -> Array(1, 2, 3),
-        2 -> Array(4, 5, 6))),
-      StringType)
-    checkEvaluation(ret5, "[1 -> [1, 2, 3], 2 -> [4, 5, 6]]")
+    Seq(
+      "false" -> ("{", "}"),
+      "true" -> ("[", "]")).foreach { case (legacyBrackets, (lb, rb)) =>
+      withSQLConf(SQLConf.LEGACY_COMPLEX_TYPES_TO_STRING.key -> legacyBrackets) {
+        val ret1 = cast(Literal.create(Map(1 -> "a", 2 -> "b", 3 -> "c")), StringType)
+        checkEvaluation(ret1, s"${lb}1 -> a, 2 -> b, 3 -> c$rb")
+        val ret2 = cast(
+          Literal.create(Map("1" -> "a".getBytes, "2" -> null, "3" -> "c".getBytes)),
+          StringType)
+        checkEvaluation(ret2, s"${lb}1 -> a, 2 ->, 3 -> c$rb")
+        val ret3 = cast(
+          Literal.create(Map(
+            1 -> Date.valueOf("2014-12-03"),
+            2 -> Date.valueOf("2014-12-04"),
+            3 -> Date.valueOf("2014-12-05"))),
+          StringType)
+        checkEvaluation(ret3, s"${lb}1 -> 2014-12-03, 2 -> 2014-12-04, 3 -> 2014-12-05$rb")
+        val ret4 = cast(
+          Literal.create(Map(
+            1 -> Timestamp.valueOf("2014-12-03 13:01:00"),
+            2 -> Timestamp.valueOf("2014-12-04 15:05:00"))),
+          StringType)
+        checkEvaluation(ret4, s"${lb}1 -> 2014-12-03 13:01:00, 2 -> 2014-12-04 15:05:00$rb")
+        val ret5 = cast(
+          Literal.create(Map(
+            1 -> Array(1, 2, 3),
+            2 -> Array(4, 5, 6))),
+          StringType)
+        checkEvaluation(ret5, s"${lb}1 -> [1, 2, 3], 2 -> [4, 5, 6]$rb")
+      }
+    }
}

test("SPARK-22981 Cast struct to string") {
-    val ret1 = cast(Literal.create((1, "a", 0.1)), StringType)
-    checkEvaluation(ret1, "[1, a, 0.1]")
-    val ret2 = cast(Literal.create(Tuple3[Int, String, String](1, null, "a")), StringType)
-    checkEvaluation(ret2, "[1,, a]")
-    val ret3 = cast(Literal.create(
-      (Date.valueOf("2014-12-03"), Timestamp.valueOf("2014-12-03 15:05:00"))), StringType)
-    checkEvaluation(ret3, "[2014-12-03, 2014-12-03 15:05:00]")
-    val ret4 = cast(Literal.create(((1, "a"), 5, 0.1)), StringType)
-    checkEvaluation(ret4, "[[1, a], 5, 0.1]")
-    val ret5 = cast(Literal.create((Seq(1, 2, 3), "a", 0.1)), StringType)
-    checkEvaluation(ret5, "[[1, 2, 3], a, 0.1]")
-    val ret6 = cast(Literal.create((1, Map(1 -> "a", 2 -> "b", 3 -> "c"))), StringType)
-    checkEvaluation(ret6, "[1, [1 -> a, 2 -> b, 3 -> c]]")
+    Seq(
+      "false" -> ("{", "}"),
+      "true" -> ("[", "]")).foreach { case (legacyBrackets, (lb, rb)) =>
+      withSQLConf(SQLConf.LEGACY_COMPLEX_TYPES_TO_STRING.key -> legacyBrackets) {
+        val ret1 = cast(Literal.create((1, "a", 0.1)), StringType)
+        checkEvaluation(ret1, s"${lb}1, a, 0.1$rb")
+        val ret2 = cast(Literal.create(Tuple3[Int, String, String](1, null, "a")), StringType)
+        checkEvaluation(ret2, s"${lb}1,, a$rb")
+        val ret3 = cast(Literal.create(
+          (Date.valueOf("2014-12-03"), Timestamp.valueOf("2014-12-03 15:05:00"))), StringType)
+        checkEvaluation(ret3, s"${lb}2014-12-03, 2014-12-03 15:05:00$rb")
+        val ret4 = cast(Literal.create(((1, "a"), 5, 0.1)), StringType)
+        checkEvaluation(ret4, s"$lb${lb}1, a$rb, 5, 0.1$rb")
+        val ret5 = cast(Literal.create((Seq(1, 2, 3), "a", 0.1)), StringType)
+        checkEvaluation(ret5, s"$lb[1, 2, 3], a, 0.1$rb")
+        val ret6 = cast(Literal.create((1, Map(1 -> "a", 2 -> "b", 3 -> "c"))), StringType)
+        checkEvaluation(ret6, s"${lb}1, ${lb}1 -> a, 2 -> b, 3 -> c$rb$rb")
+      }
+    }
}

test("up-cast") {
10 changes: 5 additions & 5 deletions sql/core/src/test/resources/sql-tests/results/pivot.sql.out
@@ -276,7 +276,7 @@ PIVOT (
FOR (course, year) IN (('dotNET', 2012), ('Java', 2013))
)
-- !query schema
-struct<s:int,[dotNET, 2012]:bigint,[Java, 2013]:bigint>
+struct<s:int,{dotNET, 2012}:bigint,{Java, 2013}:bigint>
-- !query output
1 15000 NULL
2 NULL 30000
@@ -370,7 +370,7 @@ PIVOT (
FOR (y, course) IN ((2012, 'dotNET'), (2013, 'Java'))
)
-- !query schema
-struct<year:int,[2012, dotNET]:array<int>,[2013, Java]:array<int>>
+struct<year:int,{2012, dotNET}:array<int>,{2013, Java}:array<int>>
-- !query output
2012 [1,1] NULL
2013 NULL [2,2]
@@ -404,7 +404,7 @@ PIVOT (
FOR (course, a) IN (('dotNET', array(1, 1)), ('Java', array(2, 2)))
)
-- !query schema
-struct<year:int,[dotNET, [1, 1]]:bigint,[Java, [2, 2]]:bigint>
+struct<year:int,{dotNET, [1, 1]}:bigint,{Java, [2, 2]}:bigint>
-- !query output
2012 15000 NULL
2013 NULL 30000
@@ -421,7 +421,7 @@ PIVOT (
FOR s IN ((1, 'a'), (2, 'b'))
)
-- !query schema
-struct<year:int,[1, a]:bigint,[2, b]:bigint>
+struct<year:int,{1, a}:bigint,{2, b}:bigint>
-- !query output
2012 35000 NULL
2013 NULL 78000
@@ -438,7 +438,7 @@ PIVOT (
FOR (course, s) IN (('dotNET', (1, 'a')), ('Java', (2, 'b')))
)
-- !query schema
-struct<year:int,[dotNET, [1, a]]:bigint,[Java, [2, b]]:bigint>
+struct<year:int,{dotNET, {1, a}}:bigint,{Java, {2, b}}:bigint>
-- !query output
2012 15000 NULL
2013 NULL 30000
@@ -276,7 +276,7 @@ PIVOT (
FOR (course, year) IN (('dotNET', 2012), ('Java', 2013))
)
-- !query schema
-struct<s:int,[dotNET, 2012]:bigint,[Java, 2013]:bigint>
+struct<s:int,{dotNET, 2012}:bigint,{Java, 2013}:bigint>
-- !query output
1 15000 NULL
2 NULL 30000
@@ -370,7 +370,7 @@ PIVOT (
FOR (course, a) IN (('dotNET', array(1, 1)), ('Java', array(2, 2)))
)
-- !query schema
-struct<year:int,[dotNET, [1, 1]]:bigint,[Java, [2, 2]]:bigint>
+struct<year:int,{dotNET, [1, 1]}:bigint,{Java, [2, 2]}:bigint>
-- !query output
2012 15000 NULL
2013 NULL 30000
@@ -387,7 +387,7 @@ PIVOT (
FOR s IN ((1, 'a'), (2, 'b'))
)
-- !query schema
-struct<year:int,[1, a]:bigint,[2, b]:bigint>
+struct<year:int,{1, a}:bigint,{2, b}:bigint>
-- !query output
2012 35000 NULL
2013 NULL 78000
@@ -404,7 +404,7 @@ PIVOT (
FOR (course, s) IN (('dotNET', (1, 'a')), ('Java', (2, 'b')))
)
-- !query schema
-struct<year:int,[dotNET, [1, a]]:bigint,[Java, [2, b]]:bigint>
+struct<year:int,{dotNET, {1, a}}:bigint,{Java, {2, b}}:bigint>
-- !query output
2012 15000 NULL
2013 NULL 30000