diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index a9a39ceca8777..71dc7f1da5f52 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -49,7 +49,7 @@ license: | - In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but -0.0 and 0.0 are considered as different values when used in aggregate grouping keys, window partition keys and join keys. Since Spark 3.0, this bug is fixed. For example, `Seq(-0.0, 0.0).toDF("d").groupBy("d").count()` returns `[(0.0, 2)]` in Spark 3.0, and `[(0.0, 1), (-0.0, 1)]` in Spark 2.4 and earlier. - - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, new config `spark.sql.legacy.allowDuplicatedMapKeys` was added, with the default value `false`, Spark will throw RuntimeException while duplicated keys are found. If set to `true`, these built-in functions will remove duplicated map keys with last wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet), the behavior will be undefined. + - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, Spark will throw RuntimeException while duplicated keys are found. Users can set `spark.sql.mapKeyDedupPolicy` to LAST_WIN to deduplicate map keys with last wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet), the behavior will be undefined. - In Spark version 2.4 and earlier, partition column value is converted as null if it can't be casted to corresponding user provided schema. Since 3.0, partition column value is validated with user provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.sources.validatePartitionColumns` to `false`. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala index 40e75b5b6cd25..0185b5743e446 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala @@ -21,7 +21,6 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY import org.apache.spark.sql.types._ import org.apache.spark.unsafe.array.ByteArrayMethods @@ -49,8 +48,7 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria private lazy val keyGetter = InternalRow.getAccessor(keyType) private lazy val valueGetter = InternalRow.getAccessor(valueType) - private val allowDuplicatedMapKey = - SQLConf.get.getConf(LEGACY_ALLOW_DUPLICATED_MAP_KEY) + private val mapKeyDedupPolicy = SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY) def put(key: Any, value: Any): Unit = { if (key == null) { @@ -67,13 +65,17 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria keys.append(key) values.append(value) } else { - if (!allowDuplicatedMapKey) { - throw new RuntimeException(s"Duplicate map key $key was founded, please check the input " + - "data. If you want to remove the duplicated keys with last-win policy, you can set " + - s"${LEGACY_ALLOW_DUPLICATED_MAP_KEY.key} to true.") + if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.EXCEPTION.toString) { + throw new RuntimeException(s"Duplicate map key $key was found, please check the input " + + "data. If you want to remove the duplicated keys, you can set " + + s"${SQLConf.MAP_KEY_DEDUP_POLICY.key} to ${SQLConf.MapKeyDedupPolicy.LAST_WIN} so that " + + "the key inserted at last takes precedence.") + } else if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { + // Overwrite the previous value, as the policy is last wins. + values(index) = value + } else { + throw new IllegalStateException("Unknown map key dedup policy: " + mapKeyDedupPolicy) } - // Overwrite the previous value, as the policy is last wins. - values(index) = value } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7f55f2272bfbf..c9bb8338aead7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2329,6 +2329,21 @@ object SQLConf { .stringConf .createOptional + object MapKeyDedupPolicy extends Enumeration { + val EXCEPTION, LAST_WIN = Value + } + + val MAP_KEY_DEDUP_POLICY = buildConf("spark.sql.mapKeyDedupPolicy") + .doc("The policy to deduplicate map keys in builtin function: CreateMap, MapFromArrays, " + + "MapFromEntries, StringToMap, MapConcat and TransformKeys. When EXCEPTION, the query " + + "fails if duplicated map keys are detected. When LAST_WIN, the map key that is inserted " + + "at last takes precedence.") + .version("3.0.0") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(MapKeyDedupPolicy.values.map(_.toString)) + .createWithDefault(MapKeyDedupPolicy.EXCEPTION.toString) + val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.doLooseUpcast") .internal() .doc("When true, the upcast will be loose and allows string to atomic types.") @@ -2435,17 +2450,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val LEGACY_ALLOW_DUPLICATED_MAP_KEY = - buildConf("spark.sql.legacy.allowDuplicatedMapKeys") - .doc("When true, use last wins policy to remove duplicated map keys in built-in functions, " + - "this config takes effect in below build-in functions: CreateMap, MapFromArrays, " + - "MapFromEntries, StringToMap, MapConcat and TransformKeys. Otherwise, if this is false, " + - "which is the default, Spark will throw an exception when duplicated map keys are " + - "detected.") - .version("") - .booleanConf - .createWithDefault(false) - val LEGACY_ALLOW_HASH_ON_MAPTYPE = buildConf("spark.sql.legacy.allowHashOnMapType") .doc("When set to true, hash expressions can be applied on elements of MapType. Otherwise, " + "an analysis exception will be thrown.") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala index 01df6675016d2..3cfc66f5cdb03 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala @@ -139,7 +139,9 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper MapType(IntegerType, IntegerType, valueContainsNull = true)) val mNull = Literal.create(null, MapType(StringType, StringType)) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + checkExceptionInExpression[RuntimeException]( + MapConcat(Seq(m0, m1)), "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // overlapping maps should remove duplicated map keys w.r.t. last win policy. checkEvaluation(MapConcat(Seq(m0, m1)), create_map("a" -> "4", "b" -> "2", "c" -> "3")) } @@ -274,7 +276,10 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null)) checkEvaluation(MapFromEntries(ai2), Map.empty) checkEvaluation(MapFromEntries(ai3), null) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + + checkExceptionInExpression[RuntimeException]( + MapFromEntries(ai4), "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // Duplicated map keys will be removed w.r.t. the last wins policy. checkEvaluation(MapFromEntries(ai4), create_map(1 -> 20)) } @@ -298,7 +303,10 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null)) checkEvaluation(MapFromEntries(as2), Map.empty) checkEvaluation(MapFromEntries(as3), null) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + + checkExceptionInExpression[RuntimeException]( + MapFromEntries(as4), "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // Duplicated map keys will be removed w.r.t. the last wins policy. checkEvaluation(MapFromEntries(as4), create_map("a" -> "bb")) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala index 2c1e0c8460468..3df7d02fb6604 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala @@ -217,7 +217,9 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper { CreateMap(interlace(strWithNull, intSeq.map(Literal(_)))), "Cannot use null as map key") - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + checkExceptionInExpression[RuntimeException]( + CreateMap(Seq(Literal(1), Literal(2), Literal(1), Literal(3))), "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // Duplicated map keys will be removed w.r.t. the last wins policy. checkEvaluation( CreateMap(Seq(Literal(1), Literal(2), Literal(1), Literal(3))), @@ -284,7 +286,12 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper { MapFromArrays(intWithNullArray, strArray), "Cannot use null as map key") - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + checkExceptionInExpression[RuntimeException]( + MapFromArrays( + Literal.create(Seq(1, 1), ArrayType(IntegerType)), + Literal.create(Seq(2, 3), ArrayType(IntegerType))), + "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // Duplicated map keys will be removed w.r.t. the last wins policy. checkEvaluation( MapFromArrays( @@ -404,7 +411,9 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper { val m5 = Map("a" -> null) checkEvaluation(new StringToMap(s5), m5) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + checkExceptionInExpression[RuntimeException]( + new StringToMap(Literal("a:1,b:2,a:3")), "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // Duplicated map keys will be removed w.r.t. the last wins policy. checkEvaluation( new StringToMap(Literal("a:1,b:2,a:3")), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala index b3438538a3367..c07f06eac08ea 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala @@ -465,7 +465,10 @@ class HigherOrderFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation( transformKeys(transformKeys(ai0, plusOne), plusValue), create_map(3 -> 1, 5 -> 2, 7 -> 3, 9 -> 4)) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + + checkExceptionInExpression[RuntimeException]( + transformKeys(ai0, modKey), "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // Duplicated map keys will be removed w.r.t. the last wins policy. checkEvaluation(transformKeys(ai0, modKey), create_map(1 -> 4, 2 -> 2, 0 -> 3)) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala index 87bbdb7300e4c..6e07cd5d6415d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala @@ -48,11 +48,11 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper { val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType) builder.put(1, 1) val e = intercept[RuntimeException](builder.put(1, 2)) - assert(e.getMessage.contains("Duplicate map key 1 was founded")) + assert(e.getMessage.contains("Duplicate map key 1 was found")) } test("remove duplicated keys with last wins policy") { - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType) builder.put(1, 1) builder.put(2, 2) @@ -63,8 +63,15 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper { } } - test("binary type key") { - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + test("binary type key with duplication") { + val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType) + builder.put(Array(1.toByte), 1) + builder.put(Array(2.toByte), 2) + val e = intercept[RuntimeException](builder.put(Array(1.toByte), 3)) + // By default duplicated map key fails the query. + assert(e.getMessage.contains("Duplicate map key")) + + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType) builder.put(Array(1.toByte), 1) builder.put(Array(2.toByte), 2) @@ -79,18 +86,26 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper { } } - test("struct type key") { - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + test("struct type key with duplication") { + val unsafeRow = { + val row = new UnsafeRow(1) + val bytes = new Array[Byte](16) + row.pointTo(bytes, 16) + row.setInt(0, 1) + row + } + + val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType) + builder.put(InternalRow(1), 1) + builder.put(InternalRow(2), 2) + val e = intercept[RuntimeException](builder.put(unsafeRow, 3)) + // By default duplicated map key fails the query. + assert(e.getMessage.contains("Duplicate map key")) + + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType) builder.put(InternalRow(1), 1) builder.put(InternalRow(2), 2) - val unsafeRow = { - val row = new UnsafeRow(1) - val bytes = new Array[Byte](16) - row.pointTo(bytes, 16) - row.setInt(0, 1) - row - } builder.put(unsafeRow, 3) val map = builder.build() assert(map.numElements() == 2) @@ -98,20 +113,28 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper { } } - test("array type key") { - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + test("array type key with duplication") { + val unsafeArray = { + val array = new UnsafeArrayData() + val bytes = new Array[Byte](24) + Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2) + array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24) + array.setInt(0, 1) + array.setInt(1, 1) + array + } + + val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType) + builder.put(new GenericArrayData(Seq(1, 1)), 1) + builder.put(new GenericArrayData(Seq(2, 2)), 2) + val e = intercept[RuntimeException](builder.put(unsafeArray, 3)) + // By default duplicated map key fails the query. + assert(e.getMessage.contains("Duplicate map key")) + + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType) builder.put(new GenericArrayData(Seq(1, 1)), 1) builder.put(new GenericArrayData(Seq(2, 2)), 2) - val unsafeArray = { - val array = new UnsafeArrayData() - val bytes = new Array[Byte](24) - Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2) - array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24) - array.setInt(0, 1) - array.setInt(1, 1) - array - } builder.put(unsafeArray, 3) val map = builder.build() assert(map.numElements() == 2) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index b4b9a488b11c4..a613c33b6c876 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -23,6 +23,7 @@ import java.util.TimeZone import scala.util.Random +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback @@ -651,7 +652,9 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { Row(null) ) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + intercept[SparkException](df1.selectExpr("map_concat(map1, map2)").collect()) + intercept[SparkException](df1.select(map_concat($"map1", $"map2")).collect()) + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { checkAnswer(df1.selectExpr("map_concat(map1, map2)"), expected1a) checkAnswer(df1.select(map_concat($"map1", $"map2")), expected1a) } @@ -3070,7 +3073,13 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { checkAnswer(dfExample2.select(transform_keys(col("j"), (k, v) => k + v)), Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7)))) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + intercept[SparkException] { + dfExample3.selectExpr("transform_keys(x, (k, v) -> k % 2 = 0 OR v)").collect() + } + intercept[SparkException] { + dfExample3.select(transform_keys(col("x"), (k, v) => k % 2 === 0 || v)).collect() + } + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> k % 2 = 0 OR v)"), Seq(Row(Map(true -> true, true -> false))))