docs/sql-migration-guide.md (2 changes: 1 addition & 1 deletion)
@@ -49,7 +49,7 @@ license: |

- In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but -0.0 and 0.0 are considered different values when used in aggregate grouping keys, window partition keys and join keys. Since Spark 3.0, this bug is fixed. For example, `Seq(-0.0, 0.0).toDF("d").groupBy("d").count()` returns `[(0.0, 2)]` in Spark 3.0, and `[(0.0, 1), (-0.0, 1)]` in Spark 2.4 and earlier.

- In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of a map with duplicated keys is undefined, e.g. map lookup respects the duplicated key that appears first, `Dataset.collect` only keeps the duplicated key that appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, a new config `spark.sql.legacy.allowDuplicatedMapKeys` was added with the default value `false`, and Spark throws a RuntimeException when duplicated keys are found. If it is set to `true`, these built-in functions remove duplicated map keys with a last-wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet); in that case the behavior is undefined.
- In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of a map with duplicated keys is undefined, e.g. map lookup respects the duplicated key that appears first, `Dataset.collect` only keeps the duplicated key that appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, Spark throws a RuntimeException when duplicated keys are found. Users can set `spark.sql.mapKeyDedupPolicy` to `LAST_WIN` to deduplicate map keys with a last-wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet); in that case the behavior is undefined.
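To make the change above concrete, here is a minimal Scala sketch (not part of the diff). It assumes a Spark 3.0 `SparkSession` named `spark`; the exception text in the comments is approximate.

```scala
// Default policy (EXCEPTION): a duplicated map key now fails the query instead of
// being silently collapsed as in Spark 2.4 and earlier.
// spark.sql("SELECT map(1, 2, 1, 3)").collect()  // RuntimeException: Duplicate map key 1 ...

// Opt in to last-wins deduplication instead:
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
val row = spark.sql("SELECT map(1, 2, 1, 3) AS m").head()
// The value inserted last wins, so the resulting map is Map(1 -> 3).
assert(row.getMap[Int, Int](0) == Map(1 -> 3))
```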

- In Spark version 2.4 and earlier, a partition column value is converted to null if it can't be cast to the corresponding user-provided schema. Since Spark 3.0, partition column values are validated against the user-provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.sources.validatePartitionColumns` to `false`.
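A small sketch of the partition-validation change (not part of the diff), again assuming a session named `spark`. The path `/tmp/events` and its `part=abc` directory are purely illustrative.

```scala
// Hypothetical layout on disk (not created by this snippet):
//   /tmp/events/part=abc/data.parquet
// The user-provided schema declares the partition column as INT.
val df = spark.read.schema("id INT, part INT").parquet("/tmp/events")
// Spark 3.0: reading fails because "abc" cannot be cast to INT.
// Spark 2.4 and earlier: `part` is silently read as null.

// To restore the old lenient behavior:
spark.conf.set("spark.sql.sources.validatePartitionColumns", "false")
```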

ArrayBasedMapBuilder.scala
@@ -21,7 +21,6 @@ import scala.collection.mutable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.array.ByteArrayMethods

@@ -49,8 +48,7 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
private lazy val keyGetter = InternalRow.getAccessor(keyType)
private lazy val valueGetter = InternalRow.getAccessor(valueType)

private val allowDuplicatedMapKey =
SQLConf.get.getConf(LEGACY_ALLOW_DUPLICATED_MAP_KEY)
private val mapKeyDedupPolicy = SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY)

def put(key: Any, value: Any): Unit = {
if (key == null) {
@@ -67,13 +65,17 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
keys.append(key)
values.append(value)
} else {
if (!allowDuplicatedMapKey) {
throw new RuntimeException(s"Duplicate map key $key was founded, please check the input " +
"data. If you want to remove the duplicated keys with last-win policy, you can set " +
s"${LEGACY_ALLOW_DUPLICATED_MAP_KEY.key} to true.")
if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.EXCEPTION.toString) {
throw new RuntimeException(s"Duplicate map key $key was found, please check the input " +
"data. If you want to remove the duplicated keys, you can set " +
s"${SQLConf.MAP_KEY_DEDUP_POLICY.key} to ${SQLConf.MapKeyDedupPolicy.LAST_WIN} so that " +
"the key inserted at last takes precedence.")
} else if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
// Overwrite the previous value, as the policy is last wins.
values(index) = value
} else {
throw new IllegalStateException("Unknown map key dedup policy: " + mapKeyDedupPolicy)
}
// Overwrite the previous value, as the policy is last wins.
values(index) = value
}
}
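For reference, a sketch (not part of the diff) of how the rewritten branch behaves when the builder is driven directly. `ArrayBasedMapBuilder` is an internal Catalyst utility, so this mirrors the updated test suite further down rather than a public API; note the config must be set before the builder is constructed because the policy is captured in a `val`.

```scala
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapBuilder, ArrayBasedMapData}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType

// Default policy is EXCEPTION: the second put of key 1 throws a RuntimeException.
val failing = new ArrayBasedMapBuilder(IntegerType, IntegerType)
failing.put(1, 1)
// failing.put(1, 2)  // RuntimeException: Duplicate map key 1 was found ...

// With LAST_WIN, the later value overwrites the earlier one for the same key.
SQLConf.get.setConfString(SQLConf.MAP_KEY_DEDUP_POLICY.key, "LAST_WIN")
val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
builder.put(1, 1)
builder.put(1, 2)
assert(ArrayBasedMapData.toScalaMap(builder.build()) == Map(1 -> 2))
```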

SQLConf.scala
@@ -2329,6 +2329,21 @@ object SQLConf {
.stringConf
.createOptional

object MapKeyDedupPolicy extends Enumeration {
val EXCEPTION, LAST_WIN = Value
}

val MAP_KEY_DEDUP_POLICY = buildConf("spark.sql.mapKeyDedupPolicy")
.doc("The policy to deduplicate map keys in builtin function: CreateMap, MapFromArrays, " +
"MapFromEntries, StringToMap, MapConcat and TransformKeys. When EXCEPTION, the query " +
"fails if duplicated map keys are detected. When LAST_WIN, the map key that is inserted " +
"at last takes precedence.")
.version("3.0.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(MapKeyDedupPolicy.values.map(_.toString))
.createWithDefault(MapKeyDedupPolicy.EXCEPTION.toString)
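
A brief sketch (not part of the diff) of how this entry behaves from user code, assuming a `SparkSession` named `spark`. Because of the `transform(_.toUpperCase(Locale.ROOT))` and `checkValues` calls, the value is case-insensitive and anything outside the two enum names should be rejected.

```scala
// Either spelling is accepted; the value is upper-cased before it is checked.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "last_win")

// A value outside {EXCEPTION, LAST_WIN} should fail validation when it is set.
// spark.conf.set("spark.sql.mapKeyDedupPolicy", "FIRST_WIN")  // IllegalArgumentException

// Restore the default, which makes queries fail on duplicated map keys.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "EXCEPTION")
```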

val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.doLooseUpcast")
.internal()
.doc("When true, the upcast will be loose and allows string to atomic types.")
@@ -2435,17 +2450,6 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val LEGACY_ALLOW_DUPLICATED_MAP_KEY =
[Review comment] @viirya (Member), Mar 3, 2020:
We need to update the sql-migration-guide doc too. We already documented spark.sql.legacy.allowDuplicatedMapKeys there.

[Review comment] (Member):
+1. Also found another mistake I made in the migration guide, fixed in #27782, please take a look.

buildConf("spark.sql.legacy.allowDuplicatedMapKeys")
.doc("When true, use last wins policy to remove duplicated map keys in built-in functions, " +
"this config takes effect in below build-in functions: CreateMap, MapFromArrays, " +
"MapFromEntries, StringToMap, MapConcat and TransformKeys. Otherwise, if this is false, " +
"which is the default, Spark will throw an exception when duplicated map keys are " +
"detected.")
.version("")
.booleanConf
.createWithDefault(false)

val LEGACY_ALLOW_HASH_ON_MAPTYPE = buildConf("spark.sql.legacy.allowHashOnMapType")
.doc("When set to true, hash expressions can be applied on elements of MapType. Otherwise, " +
"an analysis exception will be thrown.")
CollectionExpressionsSuite.scala
@@ -139,7 +139,9 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
MapType(IntegerType, IntegerType, valueContainsNull = true))
val mNull = Literal.create(null, MapType(StringType, StringType))

withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
checkExceptionInExpression[RuntimeException](
MapConcat(Seq(m0, m1)), "Duplicate map key")
withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
// overlapping maps should remove duplicated map keys w.r.t. last win policy.
checkEvaluation(MapConcat(Seq(m0, m1)), create_map("a" -> "4", "b" -> "2", "c" -> "3"))
}
@@ -274,7 +276,10 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null))
checkEvaluation(MapFromEntries(ai2), Map.empty)
checkEvaluation(MapFromEntries(ai3), null)
withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {

checkExceptionInExpression[RuntimeException](
MapFromEntries(ai4), "Duplicate map key")
withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
// Duplicated map keys will be removed w.r.t. the last wins policy.
checkEvaluation(MapFromEntries(ai4), create_map(1 -> 20))
}
@@ -298,7 +303,10 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null))
checkEvaluation(MapFromEntries(as2), Map.empty)
checkEvaluation(MapFromEntries(as3), null)
withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {

checkExceptionInExpression[RuntimeException](
MapFromEntries(as4), "Duplicate map key")
withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
// Duplicated map keys will be removed w.r.t. the last wins policy.
checkEvaluation(MapFromEntries(as4), create_map("a" -> "bb"))
}
ComplexTypeSuite.scala
@@ -217,7 +217,9 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
CreateMap(interlace(strWithNull, intSeq.map(Literal(_)))),
"Cannot use null as map key")

withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
checkExceptionInExpression[RuntimeException](
CreateMap(Seq(Literal(1), Literal(2), Literal(1), Literal(3))), "Duplicate map key")
withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
// Duplicated map keys will be removed w.r.t. the last wins policy.
checkEvaluation(
CreateMap(Seq(Literal(1), Literal(2), Literal(1), Literal(3))),
@@ -284,7 +286,12 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
MapFromArrays(intWithNullArray, strArray),
"Cannot use null as map key")

withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
checkExceptionInExpression[RuntimeException](
MapFromArrays(
Literal.create(Seq(1, 1), ArrayType(IntegerType)),
Literal.create(Seq(2, 3), ArrayType(IntegerType))),
"Duplicate map key")
withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
// Duplicated map keys will be removed w.r.t. the last wins policy.
checkEvaluation(
MapFromArrays(
@@ -404,7 +411,9 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
val m5 = Map("a" -> null)
checkEvaluation(new StringToMap(s5), m5)

withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
checkExceptionInExpression[RuntimeException](
new StringToMap(Literal("a:1,b:2,a:3")), "Duplicate map key")
withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
// Duplicated map keys will be removed w.r.t. the last wins policy.
checkEvaluation(
new StringToMap(Literal("a:1,b:2,a:3")),
HigherOrderFunctionsSuite.scala
@@ -465,7 +465,10 @@ class HigherOrderFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper
checkEvaluation(
transformKeys(transformKeys(ai0, plusOne), plusValue),
create_map(3 -> 1, 5 -> 2, 7 -> 3, 9 -> 4))
withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {

checkExceptionInExpression[RuntimeException](
transformKeys(ai0, modKey), "Duplicate map key")
withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
// Duplicated map keys will be removed w.r.t. the last wins policy.
checkEvaluation(transformKeys(ai0, modKey), create_map(1 -> 4, 2 -> 2, 0 -> 3))
}
ArrayBasedMapBuilderSuite.scala
@@ -48,11 +48,11 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper {
val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
builder.put(1, 1)
val e = intercept[RuntimeException](builder.put(1, 2))
assert(e.getMessage.contains("Duplicate map key 1 was founded"))
assert(e.getMessage.contains("Duplicate map key 1 was found"))
}

test("remove duplicated keys with last wins policy") {
withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
builder.put(1, 1)
builder.put(2, 2)
@@ -63,8 +63,15 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper {
}
}

test("binary type key") {
withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
test("binary type key with duplication") {
val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType)
builder.put(Array(1.toByte), 1)
builder.put(Array(2.toByte), 2)
val e = intercept[RuntimeException](builder.put(Array(1.toByte), 3))
// By default duplicated map key fails the query.
assert(e.getMessage.contains("Duplicate map key"))

withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType)
builder.put(Array(1.toByte), 1)
builder.put(Array(2.toByte), 2)
@@ -79,39 +86,55 @@
}
}

test("struct type key") {
withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
test("struct type key with duplication") {
val unsafeRow = {
val row = new UnsafeRow(1)
val bytes = new Array[Byte](16)
row.pointTo(bytes, 16)
row.setInt(0, 1)
row
}

val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType)
builder.put(InternalRow(1), 1)
builder.put(InternalRow(2), 2)
val e = intercept[RuntimeException](builder.put(unsafeRow, 3))
// By default duplicated map key fails the query.
assert(e.getMessage.contains("Duplicate map key"))

withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType)
builder.put(InternalRow(1), 1)
builder.put(InternalRow(2), 2)
val unsafeRow = {
val row = new UnsafeRow(1)
val bytes = new Array[Byte](16)
row.pointTo(bytes, 16)
row.setInt(0, 1)
row
}
builder.put(unsafeRow, 3)
val map = builder.build()
assert(map.numElements() == 2)
assert(ArrayBasedMapData.toScalaMap(map) == Map(InternalRow(1) -> 3, InternalRow(2) -> 2))
}
}

test("array type key") {
withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
test("array type key with duplication") {
val unsafeArray = {
val array = new UnsafeArrayData()
val bytes = new Array[Byte](24)
Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2)
array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24)
array.setInt(0, 1)
array.setInt(1, 1)
array
}

val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType)
builder.put(new GenericArrayData(Seq(1, 1)), 1)
builder.put(new GenericArrayData(Seq(2, 2)), 2)
val e = intercept[RuntimeException](builder.put(unsafeArray, 3))
// By default duplicated map key fails the query.
assert(e.getMessage.contains("Duplicate map key"))

withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType)
builder.put(new GenericArrayData(Seq(1, 1)), 1)
builder.put(new GenericArrayData(Seq(2, 2)), 2)
val unsafeArray = {
val array = new UnsafeArrayData()
val bytes = new Array[Byte](24)
Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2)
array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24)
array.setInt(0, 1)
array.setInt(1, 1)
array
}
builder.put(unsafeArray, 3)
val map = builder.build()
assert(map.numElements() == 2)
DataFrameFunctionsSuite.scala
@@ -23,6 +23,7 @@ import java.util.TimeZone

import scala.util.Random

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
@@ -651,7 +652,9 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
Row(null)
)

withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
intercept[SparkException](df1.selectExpr("map_concat(map1, map2)").collect())
intercept[SparkException](df1.select(map_concat($"map1", $"map2")).collect())
withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
checkAnswer(df1.selectExpr("map_concat(map1, map2)"), expected1a)
checkAnswer(df1.select(map_concat($"map1", $"map2")), expected1a)
}
@@ -3070,7 +3073,13 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
checkAnswer(dfExample2.select(transform_keys(col("j"), (k, v) => k + v)),
Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7))))

withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
intercept[SparkException] {
dfExample3.selectExpr("transform_keys(x, (k, v) -> k % 2 = 0 OR v)").collect()
}
intercept[SparkException] {
dfExample3.select(transform_keys(col("x"), (k, v) => k % 2 === 0 || v)).collect()
}
withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> k % 2 = 0 OR v)"),
Seq(Row(Map(true -> true, true -> false))))
