From 417860dd61f2b7380b15dfa59f13c7e4e8f2c6db Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 7 Feb 2020 15:53:12 +0800 Subject: [PATCH 01/13] disable untyped udf --- docs/sql-migration-guide.md | 4 ++- .../org/apache/spark/ml/Transformer.scala | 5 ++-- .../org/apache/spark/ml/feature/LSH.scala | 11 ++++--- .../org/apache/spark/ml/fpm/FPGrowth.scala | 3 +- .../org/apache/spark/ml/feature/LSHTest.scala | 9 +++--- .../apache/spark/sql/internal/SQLConf.scala | 8 +++++ .../org/apache/spark/sql/functions.scala | 10 +++++++ .../scala/org/apache/spark/sql/UDFSuite.scala | 30 +++++++++++-------- 8 files changed, 51 insertions(+), 29 deletions(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index f98fab5b4c56..acb5f6ece709 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -63,8 +63,10 @@ license: | - Since Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Set JSON option `inferTimestamp` to `false` to disable such type inferring. - - In Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(Any, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null. Since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default. + - In Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(AnyRef, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null. Since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default. + - Since Spark 3.0, using `org.apache.spark.sql.functions.udf(AnyRef, DataType)` is not allowed by default. Set `spark.sql.legacy.useUnTypedUdf.enabled` to true to keep use it. + - Since Spark 3.0, Proleptic Gregorian calendar is used in parsing, formatting, and converting dates and timestamps as well as in extracting sub-components like years, days and etc. Spark 3.0 uses Java 8 API classes from the java.time packages that based on ISO chronology (https://docs.oracle.com/javase/8/docs/api/java/time/chrono/IsoChronology.html). In Spark version 2.4 and earlier, those operations are performed by using the hybrid calendar (Julian + Gregorian, see https://docs.oracle.com/javase/7/docs/api/java/util/GregorianCalendar.html). The changes impact on the results for dates before October 15, 1582 (Gregorian) and affect on the following Spark 3.0 API: - Parsing/formatting of timestamp/date strings. This effects on CSV/JSON datasources and on the `unix_timestamp`, `date_format`, `to_unix_timestamp`, `from_unixtime`, `to_date`, `to_timestamp` functions when patterns specified by users is used for parsing and formatting. Since Spark 3.0, the conversions are based on `java.time.format.DateTimeFormatter`, see https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html. 
New implementation performs strict checking of its input. For example, the `2015-07-22 10:00:00` timestamp cannot be parse if pattern is `yyyy-MM-dd` because the parser does not consume whole input. Another example is the `31/01/2015 00:00` input cannot be parsed by the `dd/MM/yyyy hh:mm` pattern because `hh` supposes hours in the range `1-12`. In Spark version 2.4 and earlier, `java.text.SimpleDateFormat` is used for timestamp/date string conversions, and the supported patterns are described in https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html. The old behavior can be restored by setting `spark.sql.legacy.timeParser.enabled` to `true`. diff --git a/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala b/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala index 7874fc29db6c..1652131a9003 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala @@ -18,6 +18,7 @@ package org.apache.spark.ml import scala.annotation.varargs +import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.internal.Logging @@ -79,7 +80,7 @@ abstract class Transformer extends PipelineStage { * result as a new column. */ @DeveloperApi -abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]] +abstract class UnaryTransformer[IN: TypeTag, OUT: TypeTag, T <: UnaryTransformer[IN, OUT, T]] extends Transformer with HasInputCol with HasOutputCol with Logging { /** @group setParam */ @@ -118,7 +119,7 @@ abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]] override def transform(dataset: Dataset[_]): DataFrame = { val outputSchema = transformSchema(dataset.schema, logging = true) - val transformUDF = udf(this.createTransformFunc, outputDataType) + val transformUDF = udf(this.createTransformFunc) dataset.withColumn($(outputCol), transformUDF(dataset($(inputCol))), outputSchema($(outputCol)).metadata) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala index 01741019fb54..6d5c7c50dbac 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala @@ -98,7 +98,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) - val transformUDF = udf(hashFunction(_: Vector), DataTypes.createArrayType(new VectorUDT)) + val transformUDF = udf(hashFunction(_: Vector)) dataset.withColumn($(outputCol), transformUDF(dataset($(inputCol)))) } @@ -128,14 +128,13 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] } // In the origin dataset, find the hash value that hash the same bucket with the key - val sameBucketWithKeyUDF = udf((x: Seq[Vector]) => - sameBucket(x, keyHash), DataTypes.BooleanType) + val sameBucketWithKeyUDF = udf((x: Seq[Vector]) => sameBucket(x, keyHash)) modelDataset.filter(sameBucketWithKeyUDF(col($(outputCol)))) } else { // In the origin dataset, find the hash value that is closest to the key // Limit the use of hashDist since it's controversial - val hashDistUDF = udf((x: Seq[Vector]) => hashDistance(x, keyHash), DataTypes.DoubleType) + val hashDistUDF = udf((x: Seq[Vector]) => hashDistance(x, keyHash)) val hashDistCol = hashDistUDF(col($(outputCol))) val modelDatasetWithDist = modelDataset.withColumn(distCol, hashDistCol) @@ -172,7 +171,7 
@@ private[ml] abstract class LSHModel[T <: LSHModel[T]] } // Get the top k nearest neighbor by their distance to the key - val keyDistUDF = udf((x: Vector) => keyDistance(x, key), DataTypes.DoubleType) + val keyDistUDF = udf((x: Vector) => keyDistance(x, key)) val modelSubsetWithDistCol = modelSubset.withColumn(distCol, keyDistUDF(col($(inputCol)))) modelSubsetWithDistCol.sort(distCol).limit(numNearestNeighbors) } @@ -290,7 +289,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] .drop(explodeCols: _*).distinct() // Add a new column to store the distance of the two rows. - val distUDF = udf((x: Vector, y: Vector) => keyDistance(x, y), DataTypes.DoubleType) + val distUDF = udf((x: Vector, y: Vector) => keyDistance(x, y)) val joinedDatasetWithDist = joinedDataset.select(col("*"), distUDF(col(s"$leftColName.${$(inputCol)}"), col(s"$rightColName.${$(inputCol)}")).as(distCol) ) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index 4d001c159eda..db83ebc2c257 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -284,7 +284,6 @@ class FPGrowthModel private[ml] ( .collect().asInstanceOf[Array[(Seq[Any], Seq[Any])]] val brRules = dataset.sparkSession.sparkContext.broadcast(rules) - val dt = dataset.schema($(itemsCol)).dataType // For each rule, examine the input items and summarize the consequents val predictUDF = udf((items: Seq[Any]) => { if (items != null) { @@ -293,7 +292,7 @@ class FPGrowthModel private[ml] ( .flatMap(_._2.filter(!itemset.contains(_))).distinct } else { Seq.empty - }}, dt) + }}) dataset.withColumn($(predictionCol), predictUDF(col($(itemsCol)))) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala index 76a4acd798e3..1d052fbebd92 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala @@ -76,9 +76,8 @@ private[ml] object LSHTest { // Perform a cross join and label each pair of same_bucket and distance val pairs = transformedData.as("a").crossJoin(transformedData.as("b")) - val distUDF = udf((x: Vector, y: Vector) => model.keyDistance(x, y), DataTypes.DoubleType) - val sameBucket = udf((x: Seq[Vector], y: Seq[Vector]) => model.hashDistance(x, y) == 0.0, - DataTypes.BooleanType) + val distUDF = udf((x: Vector, y: Vector) => model.keyDistance(x, y)) + val sameBucket = udf((x: Seq[Vector], y: Seq[Vector]) => model.hashDistance(x, y) == 0.0) val result = pairs .withColumn("same_bucket", sameBucket(col(s"a.$outputCol"), col(s"b.$outputCol"))) .withColumn("distance", distUDF(col(s"a.$inputCol"), col(s"b.$inputCol"))) @@ -110,7 +109,7 @@ private[ml] object LSHTest { val model = lsh.fit(dataset) // Compute expected - val distUDF = udf((x: Vector) => model.keyDistance(x, key), DataTypes.DoubleType) + val distUDF = udf((x: Vector) => model.keyDistance(x, key)) val expected = dataset.sort(distUDF(col(model.getInputCol))).limit(k) // Compute actual @@ -148,7 +147,7 @@ private[ml] object LSHTest { val inputCol = model.getInputCol // Compute expected - val distUDF = udf((x: Vector, y: Vector) => model.keyDistance(x, y), DataTypes.DoubleType) + val distUDF = udf((x: Vector, y: Vector) => model.keyDistance(x, y)) val expected = datasetA.as("a").crossJoin(datasetB.as("b")) .filter(distUDF(col(s"a.$inputCol"), col(s"b.$inputCol")) < 
threshold) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d86f8693e065..42507115eec7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2016,6 +2016,14 @@ object SQLConf { .booleanConf .createWithDefault(false) + val LEGACY_USE_UNTYPED_UDF = + buildConf("spark.sql.legacy.useUnTypedUdf.enabled") + .internal() + .doc("When set to true, user is allowed to use org.apache.spark.sql.functions." + + "udf(f: AnyRef, dataType: DataType). Otherwise, exception will be throw.") + .booleanConf + .createWithDefault(false) + val TRUNCATE_TABLE_IGNORE_PERMISSION_ACL = buildConf("spark.sql.truncateTable.ignorePermissionAcl.enabled") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index d125581857e0..fcb2eb56be1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -23,6 +23,7 @@ import scala.reflect.runtime.universe.{typeTag, TypeTag} import scala.util.Try import scala.util.control.NonFatal +import org.apache.spark.SparkException import org.apache.spark.annotation.Stable import org.apache.spark.sql.api.java._ import org.apache.spark.sql.catalyst.ScalaReflection @@ -4732,6 +4733,15 @@ object functions { * @since 2.0.0 */ def udf(f: AnyRef, dataType: DataType): UserDefinedFunction = { + if (!SQLConf.get.getConf(SQLConf.LEGACY_USE_UNTYPED_UDF)) { + val errorMsg = "You're using untyped udf, which does not have the input type information. " + + "So, Spark may blindly pass null to the Scala closure with primitive-type argument, " + + "and the closure will see the default value of the Java type for the null argument, " + + "e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. You could use " + + "other typed udf APIs to avoid this problem, or set " + + "spark.sql.legacy.useUnTypedUdf.enabled to true to insistently use this." 
+ throw new SparkException(errorMsg) + } SparkUserDefinedFunction(f, dataType, inputSchemas = Nil) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index cc3995516dcc..5cf7dfb772ca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -134,10 +134,12 @@ class UDFSuite extends QueryTest with SharedSparkSession { assert(df1.logicalPlan.asInstanceOf[Project].projectList.forall(!_.deterministic)) assert(df1.head().getDouble(0) >= 0.0) - val bar = udf(() => Math.random(), DataTypes.DoubleType).asNondeterministic() - val df2 = testData.select(bar()) - assert(df2.logicalPlan.asInstanceOf[Project].projectList.forall(!_.deterministic)) - assert(df2.head().getDouble(0) >= 0.0) + withSQLConf(SQLConf.LEGACY_USE_UNTYPED_UDF.key -> "true") { + val bar = udf(() => Math.random(), DataTypes.DoubleType).asNondeterministic() + val df2 = testData.select(bar()) + assert(df2.logicalPlan.asInstanceOf[Project].projectList.forall(!_.deterministic)) + assert(df2.head().getDouble(0) >= 0.0) + } val javaUdf = udf(new UDF0[Double] { override def call(): Double = Math.random() @@ -441,15 +443,17 @@ class UDFSuite extends QueryTest with SharedSparkSession { } test("SPARK-25044 Verify null input handling for primitive types - with udf(Any, DataType)") { - val f = udf((x: Int) => x, IntegerType) - checkAnswer( - Seq(Integer.valueOf(1), null).toDF("x").select(f($"x")), - Row(1) :: Row(0) :: Nil) - - val f2 = udf((x: Double) => x, DoubleType) - checkAnswer( - Seq(java.lang.Double.valueOf(1.1), null).toDF("x").select(f2($"x")), - Row(1.1) :: Row(0.0) :: Nil) + withSQLConf(SQLConf.LEGACY_USE_UNTYPED_UDF.key -> "true") { + val f = udf((x: Int) => x, IntegerType) + checkAnswer( + Seq(Integer.valueOf(1), null).toDF("x").select(f($"x")), + Row(1) :: Row(0) :: Nil) + + val f2 = udf((x: Double) => x, DoubleType) + checkAnswer( + Seq(java.lang.Double.valueOf(1.1), null).toDF("x").select(f2($"x")), + Row(1.1) :: Row(0.0) :: Nil) + } } From b50677f2a5d179d272158a55825ecc4b17d87c7d Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 7 Feb 2020 16:02:29 +0800 Subject: [PATCH 02/13] revert minor change --- docs/sql-migration-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index acb5f6ece709..27f19d69e8df 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -63,7 +63,7 @@ license: | - Since Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Set JSON option `inferTimestamp` to `false` to disable such type inferring. - - In Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(AnyRef, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null. Since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default. 
+ - In Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(Any, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null. Since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default. - Since Spark 3.0, using `org.apache.spark.sql.functions.udf(AnyRef, DataType)` is not allowed by default. Set `spark.sql.legacy.useUnTypedUdf.enabled` to true to keep use it. From 9b00afcad26830696ebdd0a99a5a62ffd279e618 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 7 Feb 2020 16:23:47 +0800 Subject: [PATCH 03/13] add test --- sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index 5cf7dfb772ca..a47bd4bf3814 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import java.math.BigDecimal +import org.apache.spark.SparkException import org.apache.spark.sql.api.java._ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.plans.logical.Project @@ -457,6 +458,11 @@ class UDFSuite extends QueryTest with SharedSparkSession { } + test("use untyped UDF should fail by default") { + val e = intercept[SparkException](udf((x: Int) => x, IntegerType)) + assert(e.getMessage.contains("You're using untyped udf")) + } + test("SPARK-26308: udf with decimal") { val df1 = spark.createDataFrame( sparkContext.parallelize(Seq(Row(new BigDecimal("2011000000000002456556")))), From 1a74b4b1e398f7acb4b16fa5917311abc4c01317 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 7 Feb 2020 17:15:33 +0800 Subject: [PATCH 04/13] fix mima --- project/MimaExcludes.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 65ffa228edde..2a72284c011f 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -74,6 +74,9 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.KMeans.getRuns"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.KMeans.setRuns"), + // [SPARK-26580][SQL][ML][FOLLOW-UP] Throw exception when use untyped UDF by default + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.UnaryTransformer.this"), + // [SPARK-27090][CORE] Removing old LEGACY_DRIVER_IDENTIFIER ("") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.LEGACY_DRIVER_IDENTIFIER"), // [SPARK-25838] Remove formatVersion from Saveable From 6eb3c4bc6c07d7b64599043077f9cfe06c45a8b1 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 10 Feb 2020 15:31:41 +0800 Subject: [PATCH 05/13] fix FPGrowthSuite --- .../scala/org/apache/spark/ml/fpm/FPGrowth.scala | 12 ++++++++---- .../spark/sql/expressions/UserDefinedFunction.scala | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index 
db83ebc2c257..94eb1c784766 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -29,10 +29,10 @@ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.HasPredictionCol import org.apache.spark.ml.util._ import org.apache.spark.ml.util.Instrumentation.instrumented -import org.apache.spark.mllib.fpm.{AssociationRules => MLlibAssociationRules, - FPGrowth => MLlibFPGrowth} +import org.apache.spark.mllib.fpm.{AssociationRules => MLlibAssociationRules, FPGrowth => MLlibFPGrowth} import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset import org.apache.spark.sql._ +import org.apache.spark.sql.expressions.SparkUserDefinedFunction import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel @@ -283,16 +283,20 @@ class FPGrowthModel private[ml] ( .rdd.map(r => (r.getSeq(0), r.getSeq(1))) .collect().asInstanceOf[Array[(Seq[Any], Seq[Any])]] val brRules = dataset.sparkSession.sparkContext.broadcast(rules) + val dt = associationRules.schema("antecedent").dataType // For each rule, examine the input items and summarize the consequents - val predictUDF = udf((items: Seq[Any]) => { + val predictUDF = SparkUserDefinedFunction((items: Seq[Any]) => { if (items != null) { val itemset = items.toSet brRules.value.filter(_._1.forall(itemset.contains)) .flatMap(_._2.filter(!itemset.contains(_))).distinct } else { Seq.empty - }}) + }}, + dt, + Nil + ) dataset.withColumn($(predictionCol), predictUDF(col($(itemsCol)))) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala index 85b2cd379ba2..c50168cf7ac1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala @@ -90,7 +90,7 @@ sealed abstract class UserDefinedFunction { def asNondeterministic(): UserDefinedFunction } -private[sql] case class SparkUserDefinedFunction( +private[spark] case class SparkUserDefinedFunction( f: AnyRef, dataType: DataType, inputSchemas: Seq[Option[ScalaReflection.Schema]], From 5b957155138838e8042087a886dbdb00aec8bf7f Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 10 Feb 2020 15:38:40 +0800 Subject: [PATCH 06/13] rename config name --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 2 +- sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 42507115eec7..93352abd9d95 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2016,8 +2016,8 @@ object SQLConf { .booleanConf .createWithDefault(false) - val LEGACY_USE_UNTYPED_UDF = - buildConf("spark.sql.legacy.useUnTypedUdf.enabled") + val LEGACY_ALLOW_UNTYPED_SCALA_UDF = + buildConf("spark.sql.legacy.allowUntypedScalaUDF") .internal() .doc("When set to true, user is allowed to use org.apache.spark.sql.functions." + "udf(f: AnyRef, dataType: DataType). 
Otherwise, exception will be throw.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index fcb2eb56be1c..94399acdc990 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -4733,7 +4733,7 @@ object functions { * @since 2.0.0 */ def udf(f: AnyRef, dataType: DataType): UserDefinedFunction = { - if (!SQLConf.get.getConf(SQLConf.LEGACY_USE_UNTYPED_UDF)) { + if (!SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF)) { val errorMsg = "You're using untyped udf, which does not have the input type information. " + "So, Spark may blindly pass null to the Scala closure with primitive-type argument, " + "and the closure will see the default value of the Java type for the null argument, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index a47bd4bf3814..8b8c9f464eef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -135,7 +135,7 @@ class UDFSuite extends QueryTest with SharedSparkSession { assert(df1.logicalPlan.asInstanceOf[Project].projectList.forall(!_.deterministic)) assert(df1.head().getDouble(0) >= 0.0) - withSQLConf(SQLConf.LEGACY_USE_UNTYPED_UDF.key -> "true") { + withSQLConf(SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF.key -> "true") { val bar = udf(() => Math.random(), DataTypes.DoubleType).asNondeterministic() val df2 = testData.select(bar()) assert(df2.logicalPlan.asInstanceOf[Project].projectList.forall(!_.deterministic)) @@ -444,7 +444,7 @@ class UDFSuite extends QueryTest with SharedSparkSession { } test("SPARK-25044 Verify null input handling for primitive types - with udf(Any, DataType)") { - withSQLConf(SQLConf.LEGACY_USE_UNTYPED_UDF.key -> "true") { + withSQLConf(SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF.key -> "true") { val f = udf((x: Int) => x, IntegerType) checkAnswer( Seq(Integer.valueOf(1), null).toDF("x").select(f($"x")), From b329bb02640ceecebd6cab061e4314a32e798c42 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 10 Feb 2020 15:39:55 +0800 Subject: [PATCH 07/13] avoid hardcode --- sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 94399acdc990..797a15e3d461 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -4739,7 +4739,7 @@ object functions { "and the closure will see the default value of the Java type for the null argument, " + "e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. You could use " + "other typed udf APIs to avoid this problem, or set " + - "spark.sql.legacy.useUnTypedUdf.enabled to true to insistently use this." + s"${SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF.key} to true to insistently use this." 
throw new SparkException(errorMsg) } SparkUserDefinedFunction(f, dataType, inputSchemas = Nil) From 7076293dc17a13614df94a2f4eb5b3b8187ade6f Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 10 Feb 2020 15:48:16 +0800 Subject: [PATCH 08/13] update migration guide --- docs/sql-migration-guide.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 27f19d69e8df..b18d6a7e5dc9 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -63,9 +63,7 @@ license: | - Since Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Set JSON option `inferTimestamp` to `false` to disable such type inferring. - - In Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(Any, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null. Since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default. - - - Since Spark 3.0, using `org.apache.spark.sql.functions.udf(AnyRef, DataType)` is not allowed by default. Set `spark.sql.legacy.useUnTypedUdf.enabled` to true to keep use it. + - Since Spark 3.0, using `org.apache.spark.sql.functions.udf(AnyRef, DataType)` is not allowed by default. Set `spark.sql.legacy.allowUntypedScalaUDF` to true to keep use it. But please note that, in Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(AnyRef, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null. However, since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default. - Since Spark 3.0, Proleptic Gregorian calendar is used in parsing, formatting, and converting dates and timestamps as well as in extracting sub-components like years, days and etc. Spark 3.0 uses Java 8 API classes from the java.time packages that based on ISO chronology (https://docs.oracle.com/javase/8/docs/api/java/time/chrono/IsoChronology.html). In Spark version 2.4 and earlier, those operations are performed by using the hybrid calendar (Julian + Gregorian, see https://docs.oracle.com/javase/7/docs/api/java/util/GregorianCalendar.html). 
The changes impact on the results for dates before October 15, 1582 (Gregorian) and affect on the following Spark 3.0 API: From 00ed9c3e9e1d07b96e846373bebec95daa4b844e Mon Sep 17 00:00:00 2001 From: wuyi Date: Tue, 11 Feb 2020 10:45:05 +0800 Subject: [PATCH 09/13] use -> using --- docs/sql-migration-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index b18d6a7e5dc9..3946d6c56dfc 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -63,7 +63,7 @@ license: | - Since Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Set JSON option `inferTimestamp` to `false` to disable such type inferring. - - Since Spark 3.0, using `org.apache.spark.sql.functions.udf(AnyRef, DataType)` is not allowed by default. Set `spark.sql.legacy.allowUntypedScalaUDF` to true to keep use it. But please note that, in Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(AnyRef, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null. However, since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default. + - Since Spark 3.0, using `org.apache.spark.sql.functions.udf(AnyRef, DataType)` is not allowed by default. Set `spark.sql.legacy.allowUntypedScalaUDF` to true to keep using it. But please note that, in Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(AnyRef, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null. However, since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default. - Since Spark 3.0, Proleptic Gregorian calendar is used in parsing, formatting, and converting dates and timestamps as well as in extracting sub-components like years, days and etc. Spark 3.0 uses Java 8 API classes from the java.time packages that based on ISO chronology (https://docs.oracle.com/javase/8/docs/api/java/time/chrono/IsoChronology.html). In Spark version 2.4 and earlier, those operations are performed by using the hybrid calendar (Julian + Gregorian, see https://docs.oracle.com/javase/7/docs/api/java/util/GregorianCalendar.html). 
The changes impact on the results for dates before October 15, 1582 (Gregorian) and affect on the following Spark 3.0 API: From a928232e81477dc7a6c37e29c2d92c2149678ac9 Mon Sep 17 00:00:00 2001 From: wuyi Date: Tue, 11 Feb 2020 10:48:33 +0800 Subject: [PATCH 10/13] update error message --- .../scala/org/apache/spark/sql/functions.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 797a15e3d461..79f026fd88f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -23,7 +23,6 @@ import scala.reflect.runtime.universe.{typeTag, TypeTag} import scala.util.Try import scala.util.control.NonFatal -import org.apache.spark.SparkException import org.apache.spark.annotation.Stable import org.apache.spark.sql.api.java._ import org.apache.spark.sql.catalyst.ScalaReflection @@ -4734,13 +4733,13 @@ object functions { */ def udf(f: AnyRef, dataType: DataType): UserDefinedFunction = { if (!SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF)) { - val errorMsg = "You're using untyped udf, which does not have the input type information. " + - "So, Spark may blindly pass null to the Scala closure with primitive-type argument, " + - "and the closure will see the default value of the Java type for the null argument, " + - "e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. You could use " + - "other typed udf APIs to avoid this problem, or set " + - s"${SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF.key} to true to insistently use this." - throw new SparkException(errorMsg) + val errorMsg = "You're using untyped Scala UDF, which does not have the input type " + + "information. Spark may blindly pass null to the Scala closure with primitive-type " + + "argument, and the closure will see the default value of the Java type for the null " + + "argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. " + + "You could use other typed Scala UDF APIs to avoid this problem, or set " + + s"${SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF.key} to true and use this API with caution." 
+ throw new AnalysisException(errorMsg) } SparkUserDefinedFunction(f, dataType, inputSchemas = Nil) } From 329797634c872247e9ae2e61d2acf6b3465ba9e7 Mon Sep 17 00:00:00 2001 From: wuyi Date: Tue, 11 Feb 2020 10:50:28 +0800 Subject: [PATCH 11/13] reuse itemCols' datatype --- mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index 94eb1c784766..e50d4255b1f3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -283,8 +283,8 @@ class FPGrowthModel private[ml] ( .rdd.map(r => (r.getSeq(0), r.getSeq(1))) .collect().asInstanceOf[Array[(Seq[Any], Seq[Any])]] val brRules = dataset.sparkSession.sparkContext.broadcast(rules) - val dt = associationRules.schema("antecedent").dataType + val dt = dataset.schema($(itemsCol)).dataType // For each rule, examine the input items and summarize the consequents val predictUDF = SparkUserDefinedFunction((items: Seq[Any]) => { if (items != null) { From 6969596ce7d1ce1b2962079629a1701345f349ec Mon Sep 17 00:00:00 2001 From: wuyi Date: Tue, 11 Feb 2020 21:57:28 +0800 Subject: [PATCH 12/13] update test --- .../src/test/scala/org/apache/spark/sql/UDFSuite.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index 8b8c9f464eef..cbe2e91a20d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql import java.math.BigDecimal -import org.apache.spark.SparkException import org.apache.spark.sql.api.java._ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.plans.logical.Project @@ -458,9 +457,9 @@ class UDFSuite extends QueryTest with SharedSparkSession { } - test("use untyped UDF should fail by default") { - val e = intercept[SparkException](udf((x: Int) => x, IntegerType)) - assert(e.getMessage.contains("You're using untyped udf")) + test("use untyped Scala UDF should fail by default") { + val e = intercept[AnalysisException](udf((x: Int) => x, IntegerType)) + assert(e.getMessage.contains("You're using untyped Scala UDF")) } test("SPARK-26308: udf with decimal") { From 0d1601bde2fc4ab39ed6b3c82e486e9c1174b915 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 20 Feb 2020 16:23:58 +0800 Subject: [PATCH 13/13] nit --- docs/sql-migration-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 3946d6c56dfc..0bcccc13aa57 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -64,7 +64,7 @@ license: | - Since Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Set JSON option `inferTimestamp` to `false` to disable such type inferring. - Since Spark 3.0, using `org.apache.spark.sql.functions.udf(AnyRef, DataType)` is not allowed by default. Set `spark.sql.legacy.allowUntypedScalaUDF` to true to keep using it. 
But please note that, in Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(AnyRef, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null. However, since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default. - + - Since Spark 3.0, Proleptic Gregorian calendar is used in parsing, formatting, and converting dates and timestamps as well as in extracting sub-components like years, days and etc. Spark 3.0 uses Java 8 API classes from the java.time packages that based on ISO chronology (https://docs.oracle.com/javase/8/docs/api/java/time/chrono/IsoChronology.html). In Spark version 2.4 and earlier, those operations are performed by using the hybrid calendar (Julian + Gregorian, see https://docs.oracle.com/javase/7/docs/api/java/util/GregorianCalendar.html). The changes impact on the results for dates before October 15, 1582 (Gregorian) and affect on the following Spark 3.0 API: - Parsing/formatting of timestamp/date strings. This effects on CSV/JSON datasources and on the `unix_timestamp`, `date_format`, `to_unix_timestamp`, `from_unixtime`, `to_date`, `to_timestamp` functions when patterns specified by users is used for parsing and formatting. Since Spark 3.0, the conversions are based on `java.time.format.DateTimeFormatter`, see https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html. New implementation performs strict checking of its input. For example, the `2015-07-22 10:00:00` timestamp cannot be parse if pattern is `yyyy-MM-dd` because the parser does not consume whole input. Another example is the `31/01/2015 00:00` input cannot be parsed by the `dd/MM/yyyy hh:mm` pattern because `hh` supposes hours in the range `1-12`. In Spark version 2.4 and earlier, `java.text.SimpleDateFormat` is used for timestamp/date string conversions, and the supported patterns are described in https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html. The old behavior can be restored by setting `spark.sql.legacy.timeParser.enabled` to `true`.
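
The behavior this series converges on can be exercised with the minimal, self-contained Scala sketch below. It is illustrative only and not part of any patch above: the `local[*]` session, value names, and sample data are assumptions; the config key `spark.sql.legacy.allowUntypedScalaUDF` and the `AnalysisException` message come from PATCH 06 and PATCH 10.

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.IntegerType

object UntypedUdfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("untyped-udf-sketch").getOrCreate()
    import spark.implicits._

    // With the default setting, declaring an untyped Scala UDF now fails fast.
    try {
      udf((x: Int) => x, IntegerType)
    } catch {
      case e: AnalysisException =>
        // "You're using untyped Scala UDF, which does not have the input type information. ..."
        println(e.getMessage)
    }

    // Opting back in via the legacy flag keeps the Spark 3.0 semantics described in the
    // migration guide: a null primitive argument is seen as the Java default value, so 0 here.
    spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")
    val untyped = udf((x: Int) => x, IntegerType)
    Seq(Integer.valueOf(1), null).toDF("x").select(untyped($"x")).show() // expected rows: 1 and 0

    // The typed API carries the input type, so the null row is handled by Spark
    // instead of being blindly passed to the closure.
    val typed = udf((x: Int) => x)
    Seq(Integer.valueOf(1), null).toDF("x").select(typed($"x")).show() // expected rows: 1 and null

    spark.stop()
  }
}

Failing at declaration time, with the error message pointing at the legacy flag, is the intended trade-off of the change: a null that would be silently converted to a primitive default surfaces immediately instead of appearing later as a wrong value in query results.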