From 734dfbfcfea1ed1ab3a5f18f84c412a569dd87e7 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 10 Apr 2017 20:41:08 -0700 Subject: [PATCH 1/9] [SPARK-17564][TESTS] Fix flaky RequestTimeoutIntegrationSuite.furtherRequestsDelay ## What changes were proposed in this pull request? This PR fixes the following failure: ``` sbt.ForkMain$ForkError: java.lang.AssertionError: null at org.junit.Assert.fail(Assert.java:86) at org.junit.Assert.assertTrue(Assert.java:41) at org.junit.Assert.assertTrue(Assert.java:52) at org.apache.spark.network.RequestTimeoutIntegrationSuite.furtherRequestsDelay(RequestTimeoutIntegrationSuite.java:230) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at 
org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runners.Suite.runChild(Suite.java:128) at org.junit.runners.Suite.runChild(Suite.java:27) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at org.junit.runner.JUnitCore.run(JUnitCore.java:115) at com.novocode.junit.JUnitRunner$1.execute(JUnitRunner.java:132) at sbt.ForkMain$Run$2.call(ForkMain.java:296) at sbt.ForkMain$Run$2.call(ForkMain.java:286) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` It happens several times per month on [Jenkins](http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.network.RequestTimeoutIntegrationSuite&test_name=furtherRequestsDelay). The failure is because `callback1` may not be called before `assertTrue(callback1.failure instanceof IOException);`. It's pretty easy to reproduce this error by adding a sleep before this line: https://github.com/apache/spark/blob/379b0b0bbdbba2278ce3bcf471bd75f6ffd9cf0d/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java#L267 The fix is straightforward: just use the latch to wait until `callback1` is called. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #17599 from zsxwing/SPARK-17564. 
--- .../apache/spark/network/RequestTimeoutIntegrationSuite.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java index 9aa17e24b6246..c0724e018263f 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java @@ -225,6 +225,8 @@ public StreamManager getStreamManager() { callback0.latch.await(60, TimeUnit.SECONDS); assertTrue(callback0.failure instanceof IOException); + // make sure callback1 is called. + callback1.latch.await(60, TimeUnit.SECONDS); // failed at same time as previous assertTrue(callback1.failure instanceof IOException); } From 0d2b796427a59d3e9967b62618be301307f29162 Mon Sep 17 00:00:00 2001 From: Benjamin Fradet Date: Tue, 11 Apr 2017 09:12:49 +0200 Subject: [PATCH 2/9] [SPARK-20097][ML] Fix visibility discrepancy with numInstances and degreesOfFreedom in LR and GLR ## What changes were proposed in this pull request? - made `numInstances` public in GLR - made `degreesOfFreedom` public in LR ## How was this patch tested? reran the concerned test suites Author: Benjamin Fradet Closes #17431 from BenFradet/SPARK-20097. 
--- .../spark/ml/regression/GeneralizedLinearRegression.scala | 3 ++- .../org/apache/spark/ml/regression/LinearRegression.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 33137b0c0fdec..d6093a01c671c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -1133,7 +1133,8 @@ class GeneralizedLinearRegressionSummary private[regression] ( private[regression] lazy val link: Link = familyLink.link /** Number of instances in DataFrame predictions. */ - private[regression] lazy val numInstances: Long = predictions.count() + @Since("2.2.0") + lazy val numInstances: Long = predictions.count() /** The numeric rank of the fitted linear model. */ @Since("2.0.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 45df1d9be647d..f7e3c8fa5b6e6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -696,7 +696,8 @@ class LinearRegressionSummary private[regression] ( lazy val numInstances: Long = predictions.count() /** Degrees of freedom */ - private val degreesOfFreedom: Long = if (privateModel.getFitIntercept) { + @Since("2.2.0") + val degreesOfFreedom: Long = if (privateModel.getFitIntercept) { numInstances - privateModel.coefficients.size - 1 } else { numInstances - privateModel.coefficients.size From d11ef3d77ec2136d6b28bd69f5dd2cc0a22e4717 Mon Sep 17 00:00:00 2001 From: MirrorZ Date: Tue, 11 Apr 2017 10:34:39 +0100 Subject: [PATCH 3/9] Document Master URL format in high availability set up ## What 
changes were proposed in this pull request? Add documentation for adding master url in multi host, port format for standalone cluster with high availability with zookeeper. Referring documentation [Standby Masters with ZooKeeper](http://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper) ## How was this patch tested? Documenting the functionality already present. Author: MirrorZ Closes #17584 from MirrorZ/master. --- docs/submitting-applications.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md index d23dbcf10d952..866d6e527549c 100644 --- a/docs/submitting-applications.md +++ b/docs/submitting-applications.md @@ -143,6 +143,9 @@ The master URL passed to Spark can be in one of the following formats: spark://HOST:PORT Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default. + spark://HOST1:PORT1,HOST2:PORT2 Connect to the given Spark standalone + cluster with standby masters with Zookeeper. The list must have all the master hosts in the high availability cluster set up with Zookeeper. The port must be whichever each master is configured to use, which is 7077 by default. + mesos://HOST:PORT Connect to the given Mesos cluster. The port must be whichever one your is configured to use, which is 5050 by default. Or, for a Mesos cluster using ZooKeeper, use mesos://zk://.... From c8706980ae07362ae5963829e9ada5007eada46b Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 11 Apr 2017 20:21:04 +0800 Subject: [PATCH 4/9] [SPARK-20274][SQL] support compatible array element type in encoder ## What changes were proposed in this pull request? This is a regression caused by SPARK-19716. Before SPARK-19716, we will cast an array field to the expected array type. However, after SPARK-19716, the cast is removed, but we forgot to push the cast to the element level. ## How was this patch tested? 
new regression tests Author: Wenchen Fan Closes #17587 from cloud-fan/array. --- .../spark/sql/catalyst/ScalaReflection.scala | 18 +++++++++------ .../sql/catalyst/analysis/Analyzer.scala | 8 +++++-- .../encoders/EncoderResolutionSuite.scala | 23 +++++++++++++++++++ 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 198122759e4ad..0c5a818f54f5c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -132,7 +132,7 @@ object ScalaReflection extends ScalaReflection { def deserializerFor[T : TypeTag]: Expression = { val tpe = localTypeOf[T] val clsName = getClassNameFromType(tpe) - val walkedTypePath = s"""- root class: "${clsName}"""" :: Nil + val walkedTypePath = s"""- root class: "$clsName"""" :: Nil deserializerFor(tpe, None, walkedTypePath) } @@ -270,12 +270,14 @@ object ScalaReflection extends ScalaReflection { case t if t <:< localTypeOf[Array[_]] => val TypeRef(_, _, Seq(elementType)) = t - val Schema(_, elementNullable) = schemaFor(elementType) + val Schema(dataType, elementNullable) = schemaFor(elementType) val className = getClassNameFromType(elementType) val newTypePath = s"""- array element class: "$className"""" +: walkedTypePath - val mapFunction: Expression => Expression = p => { - val converter = deserializerFor(elementType, Some(p), newTypePath) + val mapFunction: Expression => Expression = element => { + // upcast the array element to the data type the encoder expected. 
+ val casted = upCastToExpectedType(element, dataType, newTypePath) + val converter = deserializerFor(elementType, Some(casted), newTypePath) if (elementNullable) { converter } else { @@ -305,12 +307,14 @@ object ScalaReflection extends ScalaReflection { case t if t <:< localTypeOf[Seq[_]] => val TypeRef(_, _, Seq(elementType)) = t - val Schema(_, elementNullable) = schemaFor(elementType) + val Schema(dataType, elementNullable) = schemaFor(elementType) val className = getClassNameFromType(elementType) val newTypePath = s"""- array element class: "$className"""" +: walkedTypePath - val mapFunction: Expression => Expression = p => { - val converter = deserializerFor(elementType, Some(p), newTypePath) + val mapFunction: Expression => Expression = element => { + // upcast the array element to the data type the encoder expected. + val casted = upCastToExpectedType(element, dataType, newTypePath) + val converter = deserializerFor(elementType, Some(casted), newTypePath) if (elementNullable) { converter } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b0cdef70297cf..9816b33ae8dff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.expressions.objects.{MapObjects, NewInstance, UnresolvedMapObjects} +import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable, MapObjects, NewInstance, UnresolvedMapObjects} import org.apache.spark.sql.catalyst.expressions.SubExprUtils._ import 
org.apache.spark.sql.catalyst.optimizer.BooleanSimplification import org.apache.spark.sql.catalyst.plans._ @@ -2321,7 +2321,11 @@ class Analyzer( */ object ResolveUpCast extends Rule[LogicalPlan] { private def fail(from: Expression, to: DataType, walkedTypePath: Seq[String]) = { - throw new AnalysisException(s"Cannot up cast ${from.sql} from " + + val fromStr = from match { + case l: LambdaVariable => "array element" + case e => e.sql + } + throw new AnalysisException(s"Cannot up cast $fromStr from " + s"${from.dataType.simpleString} to ${to.simpleString} as it may truncate\n" + "The type path of the target object is:\n" + walkedTypePath.mkString("", "\n", "\n") + "You can either add an explicit cast to the input data or choose a higher precision " + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala index e5a3e1fd374dc..630e8a7990e7b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala @@ -33,6 +33,8 @@ case class StringIntClass(a: String, b: Int) case class ComplexClass(a: Long, b: StringLongClass) +case class PrimitiveArrayClass(arr: Array[Long]) + case class ArrayClass(arr: Seq[StringIntClass]) case class NestedArrayClass(nestedArr: Array[ArrayClass]) @@ -66,6 +68,27 @@ class EncoderResolutionSuite extends PlanTest { encoder.resolveAndBind(attrs).fromRow(InternalRow(InternalRow(str, 1.toByte), 2)) } + test("real type doesn't match encoder schema but they are compatible: primitive array") { + val encoder = ExpressionEncoder[PrimitiveArrayClass] + val attrs = Seq('arr.array(IntegerType)) + val array = new GenericArrayData(Array(1, 2, 3)) + encoder.resolveAndBind(attrs).fromRow(InternalRow(array)) + } + + test("the real type is not compatible with encoder 
schema: primitive array") { + val encoder = ExpressionEncoder[PrimitiveArrayClass] + val attrs = Seq('arr.array(StringType)) + assert(intercept[AnalysisException](encoder.resolveAndBind(attrs)).message == + s""" + |Cannot up cast array element from string to bigint as it may truncate + |The type path of the target object is: + |- array element class: "scala.Long" + |- field (class: "scala.Array", name: "arr") + |- root class: "org.apache.spark.sql.catalyst.encoders.PrimitiveArrayClass" + |You can either add an explicit cast to the input data or choose a higher precision type + """.stripMargin.trim + " of the field in the target object") + } + test("real type doesn't match encoder schema but they are compatible: array") { val encoder = ExpressionEncoder[ArrayClass] val attrs = Seq('arr.array(new StructType().add("a", "int").add("b", "int").add("c", "int"))) From cd91f967145909852d9af09b10b80f86ed05edb5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 11 Apr 2017 20:33:10 +0800 Subject: [PATCH 5/9] [SPARK-20175][SQL] Exists should not be evaluated in Join operator ## What changes were proposed in this pull request? Similar to `ListQuery`, `Exists` should not be evaluated in `Join` operator too. ## How was this patch tested? Jenkins tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh Closes #17491 from viirya/dont-push-exists-to-join. 
--- .../spark/sql/catalyst/expressions/predicates.scala | 3 ++- .../scala/org/apache/spark/sql/SubquerySuite.scala | 10 ++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 8acb740f8db8c..5034566132f7a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -92,11 +92,12 @@ trait PredicateHelper { protected def canEvaluateWithinJoin(expr: Expression): Boolean = expr match { // Non-deterministic expressions are not allowed as join conditions. case e if !e.deterministic => false - case l: ListQuery => + case _: ListQuery | _: Exists => // A ListQuery defines the query which we want to search in an IN subquery expression. // Currently the only way to evaluate an IN subquery is to convert it to a // LeftSemi/LeftAnti/ExistenceJoin by `RewritePredicateSubquery` rule. // It cannot be evaluated as part of a Join operator. + // An Exists shouldn't be push into a Join operator too. 
false case e: SubqueryExpression => // non-correlated subquery will be replaced as literal diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 5fe6667ceca18..0f0199cbe2777 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -844,4 +844,14 @@ class SubquerySuite extends QueryTest with SharedSQLContext { Row(0) :: Row(1) :: Nil) } } + + test("ListQuery and Exists should work even no correlated references") { + checkAnswer( + sql("select * from l, r where l.a = r.c AND (r.d in (select d from r) OR l.a >= 1)"), + Row(2, 1.0, 2, 3.0) :: Row(2, 1.0, 2, 3.0) :: Row(2, 1.0, 2, 3.0) :: + Row(2, 1.0, 2, 3.0) :: Row(3.0, 3.0, 3, 2.0) :: Row(6, null, 6, null) :: Nil) + checkAnswer( + sql("select * from l, r where l.a = r.c + 1 AND (exists (select * from r) OR l.a = r.c)"), + Row(3, 3.0, 2, 3.0) :: Row(3, 3.0, 2, 3.0) :: Nil) + } } From 123b4fbbc331f116b45f11b9f7ecbe0b0575323d Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 11 Apr 2017 11:12:31 -0700 Subject: [PATCH 6/9] [SPARK-20289][SQL] Use StaticInvoke to box primitive types ## What changes were proposed in this pull request? Dataset typed API currently uses NewInstance to box primitive types (i.e. calling the constructor). Instead, it'd be slightly more idiomatic in Java to use PrimitiveType.valueOf, which can be invoked using StaticInvoke expression. ## How was this patch tested? The change should be covered by existing tests for Dataset encoders. Author: Reynold Xin Closes #17604 from rxin/SPARK-20289. 
--- .../sql/catalyst/JavaTypeInference.scala | 27 +++++++++---------- .../spark/sql/catalyst/ScalaReflection.scala | 14 +++++----- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index 9d4617dda555f..86a73a319ec3f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -204,20 +204,19 @@ object JavaTypeInference { typeToken.getRawType match { case c if !inferExternalType(c).isInstanceOf[ObjectType] => getPath - case c if c == classOf[java.lang.Short] => - NewInstance(c, getPath :: Nil, ObjectType(c)) - case c if c == classOf[java.lang.Integer] => - NewInstance(c, getPath :: Nil, ObjectType(c)) - case c if c == classOf[java.lang.Long] => - NewInstance(c, getPath :: Nil, ObjectType(c)) - case c if c == classOf[java.lang.Double] => - NewInstance(c, getPath :: Nil, ObjectType(c)) - case c if c == classOf[java.lang.Byte] => - NewInstance(c, getPath :: Nil, ObjectType(c)) - case c if c == classOf[java.lang.Float] => - NewInstance(c, getPath :: Nil, ObjectType(c)) - case c if c == classOf[java.lang.Boolean] => - NewInstance(c, getPath :: Nil, ObjectType(c)) + case c if c == classOf[java.lang.Short] || + c == classOf[java.lang.Integer] || + c == classOf[java.lang.Long] || + c == classOf[java.lang.Double] || + c == classOf[java.lang.Float] || + c == classOf[java.lang.Byte] || + c == classOf[java.lang.Boolean] => + StaticInvoke( + c, + ObjectType(c), + "valueOf", + getPath :: Nil, + propagateNull = true) case c if c == classOf[java.sql.Date] => StaticInvoke( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 0c5a818f54f5c..82710a2a183ab 
100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -204,37 +204,37 @@ object ScalaReflection extends ScalaReflection { case t if t <:< localTypeOf[java.lang.Integer] => val boxedType = classOf[java.lang.Integer] val objectType = ObjectType(boxedType) - NewInstance(boxedType, getPath :: Nil, objectType) + StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true) case t if t <:< localTypeOf[java.lang.Long] => val boxedType = classOf[java.lang.Long] val objectType = ObjectType(boxedType) - NewInstance(boxedType, getPath :: Nil, objectType) + StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true) case t if t <:< localTypeOf[java.lang.Double] => val boxedType = classOf[java.lang.Double] val objectType = ObjectType(boxedType) - NewInstance(boxedType, getPath :: Nil, objectType) + StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true) case t if t <:< localTypeOf[java.lang.Float] => val boxedType = classOf[java.lang.Float] val objectType = ObjectType(boxedType) - NewInstance(boxedType, getPath :: Nil, objectType) + StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true) case t if t <:< localTypeOf[java.lang.Short] => val boxedType = classOf[java.lang.Short] val objectType = ObjectType(boxedType) - NewInstance(boxedType, getPath :: Nil, objectType) + StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true) case t if t <:< localTypeOf[java.lang.Byte] => val boxedType = classOf[java.lang.Byte] val objectType = ObjectType(boxedType) - NewInstance(boxedType, getPath :: Nil, objectType) + StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true) case t if t <:< localTypeOf[java.lang.Boolean] => val boxedType = classOf[java.lang.Boolean] val objectType = ObjectType(boxedType) - 
NewInstance(boxedType, getPath :: Nil, objectType) + StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true) case t if t <:< localTypeOf[java.sql.Date] => StaticInvoke( From 6297697f975960a3006c4e58b4964d9ac40eeaf5 Mon Sep 17 00:00:00 2001 From: David Gingrich Date: Tue, 11 Apr 2017 12:18:31 -0700 Subject: [PATCH 7/9] [SPARK-19505][PYTHON] AttributeError on Exception.message in Python3 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What changes were proposed in this pull request? Added `util._exception_message` helper to use `str(e)` when `e.message` is unavailable (Python3). Grepped for all occurrences of `.message` in `pyspark/` and these were the only occurrences. ## How was this patch tested? - Doctests for helper function ## Legal This is my original work and I license the work to the project under the project’s open source license. Author: David Gingrich Closes #16845 from dgingrich/topic-spark-19505-py3-exceptions. 
--- dev/sparktestsupport/modules.py | 1 + python/pyspark/broadcast.py | 4 ++- python/pyspark/cloudpickle.py | 9 ++++--- python/pyspark/util.py | 45 +++++++++++++++++++++++++++++++++ 4 files changed, 54 insertions(+), 5 deletions(-) create mode 100644 python/pyspark/util.py diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 246f5188a518d..78b5b8b0f4b59 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -340,6 +340,7 @@ def __hash__(self): "pyspark.profiler", "pyspark.shuffle", "pyspark.tests", + "pyspark.util", ] ) diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index 74dee1420754a..b1b59f73d6718 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -21,6 +21,7 @@ from tempfile import NamedTemporaryFile from pyspark.cloudpickle import print_exec +from pyspark.util import _exception_message if sys.version < '3': import cPickle as pickle @@ -82,7 +83,8 @@ def dump(self, value, f): except pickle.PickleError: raise except Exception as e: - msg = "Could not serialize broadcast: " + e.__class__.__name__ + ": " + e.message + msg = "Could not serialize broadcast: %s: %s" \ + % (e.__class__.__name__, _exception_message(e)) print_exec(sys.stderr) raise pickle.PicklingError(msg) f.close() diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index 959fb8b357f99..389bee7eee6e9 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -56,6 +56,7 @@ import traceback import weakref +from pyspark.util import _exception_message if sys.version < '3': from pickle import Pickler @@ -152,13 +153,13 @@ def dump(self, obj): except pickle.PickleError: raise except Exception as e: - if "'i' format requires" in e.message: - msg = "Object too large to serialize: " + e.message + emsg = _exception_message(e) + if "'i' format requires" in emsg: + msg = "Object too large to serialize: %s" % emsg else: - msg = "Could not serialize object: 
" + e.__class__.__name__ + ": " + e.message + msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg) print_exec(sys.stderr) raise pickle.PicklingError(msg) - def save_memoryview(self, obj): """Fallback to save_string""" diff --git a/python/pyspark/util.py b/python/pyspark/util.py new file mode 100644 index 0000000000000..e5d332ce54429 --- /dev/null +++ b/python/pyspark/util.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +__all__ = [] + + +def _exception_message(excp): + """Return the message from an exception as either a str or unicode object. Supports both + Python 2 and Python 3. 
+ + >>> msg = "Exception message" + >>> excp = Exception(msg) + >>> msg == _exception_message(excp) + True + + >>> msg = u"unicöde" + >>> excp = Exception(msg) + >>> msg == _exception_message(excp) + True + """ + if hasattr(excp, "message"): + return excp.message + return str(excp) + + +if __name__ == "__main__": + import doctest + (failure_count, test_count) = doctest.testmod() + if failure_count: + exit(-1) From cde9e328484e4007aa6b505312d7cea5461a6eaf Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 11 Apr 2017 19:30:34 -0700 Subject: [PATCH 8/9] [MINOR][DOCS] Update supported versions for Hive Metastore ## What changes were proposed in this pull request? Since SPARK-18112 and SPARK-13446, Apache Spark starts to support reading Hive metastore 2.0 ~ 2.1.1. This updates the docs. ## How was this patch tested? N/A Author: Dongjoon Hyun Closes #17612 from dongjoon-hyun/metastore. --- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 7ae9847983d4d..c425faca4c273 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1700,7 +1700,7 @@ referencing a singleton. Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs. Currently Hive SerDes and UDFs are based on Hive 1.2.1, and Spark SQL can be connected to different versions of Hive Metastore -(from 0.12.0 to 1.2.1. Also see [Interacting with Different Versions of Hive Metastore] (#interacting-with-different-versions-of-hive-metastore)). +(from 0.12.0 to 2.1.1. Also see [Interacting with Different Versions of Hive Metastore] (#interacting-with-different-versions-of-hive-metastore)). 
#### Deploying in Existing Hive Warehouses From 8ad63ee158815de5ffff7bf03cdf25aef312095f Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Wed, 12 Apr 2017 11:19:20 +0800 Subject: [PATCH 9/9] [SPARK-20291][SQL] NaNvl(FloatType, NullType) should not be cast to NaNvl(DoubleType, DoubleType) ## What changes were proposed in this pull request? `NaNvl(float value, null)` will be converted into `NaNvl(float value, Cast(null, DoubleType))` and finally `NaNvl(Cast(float value, DoubleType), Cast(null, DoubleType))`. This will cause mismatching in the output type when the input type is float. Adding an extra rule in TypeCoercion resolves this issue. ## How was this patch tested? unit tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: DB Tsai Closes #17606 from dbtsai/fixNaNvl. --- .../spark/sql/catalyst/analysis/TypeCoercion.scala | 1 + .../sql/catalyst/analysis/TypeCoercionSuite.scala | 14 ++++++++++---- .../apache/spark/sql/DataFrameNaFunctions.scala | 3 +-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 768897dc0713c..e1dd010d37a95 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -571,6 +571,7 @@ object TypeCoercion { NaNvl(l, Cast(r, DoubleType)) case NaNvl(l, r) if l.dataType == FloatType && r.dataType == DoubleType => NaNvl(Cast(l, DoubleType), r) + case NaNvl(l, r) if r.dataType == NullType => NaNvl(l, Cast(r, l.dataType)) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala index 3e0c357b6de42..011d09ff60641 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala @@ -656,14 +656,20 @@ class TypeCoercionSuite extends PlanTest { test("nanvl casts") { ruleTest(TypeCoercion.FunctionArgumentConversion, - NaNvl(Literal.create(1.0, FloatType), Literal.create(1.0, DoubleType)), - NaNvl(Cast(Literal.create(1.0, FloatType), DoubleType), Literal.create(1.0, DoubleType))) + NaNvl(Literal.create(1.0f, FloatType), Literal.create(1.0, DoubleType)), + NaNvl(Cast(Literal.create(1.0f, FloatType), DoubleType), Literal.create(1.0, DoubleType))) ruleTest(TypeCoercion.FunctionArgumentConversion, - NaNvl(Literal.create(1.0, DoubleType), Literal.create(1.0, FloatType)), - NaNvl(Literal.create(1.0, DoubleType), Cast(Literal.create(1.0, FloatType), DoubleType))) + NaNvl(Literal.create(1.0, DoubleType), Literal.create(1.0f, FloatType)), + NaNvl(Literal.create(1.0, DoubleType), Cast(Literal.create(1.0f, FloatType), DoubleType))) ruleTest(TypeCoercion.FunctionArgumentConversion, NaNvl(Literal.create(1.0, DoubleType), Literal.create(1.0, DoubleType)), NaNvl(Literal.create(1.0, DoubleType), Literal.create(1.0, DoubleType))) + ruleTest(TypeCoercion.FunctionArgumentConversion, + NaNvl(Literal.create(1.0f, FloatType), Literal.create(null, NullType)), + NaNvl(Literal.create(1.0f, FloatType), Cast(Literal.create(null, NullType), FloatType))) + ruleTest(TypeCoercion.FunctionArgumentConversion, + NaNvl(Literal.create(1.0, DoubleType), Literal.create(null, NullType)), + NaNvl(Literal.create(1.0, DoubleType), Cast(Literal.create(null, NullType), DoubleType))) } test("type coercion for If") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala index 93d565d9fe904..052d85ad33bd6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala @@ -408,8 +408,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { val quotedColName = "`" + col.name + "`" val colValue = col.dataType match { case DoubleType | FloatType => - // nanvl only supports these types - nanvl(df.col(quotedColName), lit(null).cast(col.dataType)) + nanvl(df.col(quotedColName), lit(null)) // nanvl only supports these types case _ => df.col(quotedColName) } coalesce(colValue, lit(replacement).cast(col.dataType)).as(col.name)