Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ public StreamManager getStreamManager() {
callback0.latch.await(60, TimeUnit.SECONDS);
assertTrue(callback0.failure instanceof IOException);

// make sure callback1 is called.
callback1.latch.await(60, TimeUnit.SECONDS);
// failed at same time as previous
assertTrue(callback1.failure instanceof IOException);
}
Expand Down
1 change: 1 addition & 0 deletions dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ def __hash__(self):
"pyspark.profiler",
"pyspark.shuffle",
"pyspark.tests",
"pyspark.util",
]
)

Expand Down
2 changes: 1 addition & 1 deletion docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,7 @@ referencing a singleton.
Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs.
Currently Hive SerDes and UDFs are based on Hive 1.2.1,
and Spark SQL can be connected to different versions of Hive Metastore
(from 0.12.0 to 1.2.1. Also see [Interacting with Different Versions of Hive Metastore] (#interacting-with-different-versions-of-hive-metastore)).
(from 0.12.0 to 2.1.1. Also see [Interacting with Different Versions of Hive Metastore](#interacting-with-different-versions-of-hive-metastore)).

#### Deploying in Existing Hive Warehouses

Expand Down
3 changes: 3 additions & 0 deletions docs/submitting-applications.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ The master URL passed to Spark can be in one of the following formats:
<tr><td> <code>spark://HOST:PORT</code> </td><td> Connect to the given <a href="spark-standalone.html">Spark standalone
cluster</a> master. The port must be whichever one your master is configured to use, which is 7077 by default.
</td></tr>
<tr><td> <code>spark://HOST1:PORT1,HOST2:PORT2</code> </td><td> Connect to the given <a href="spark-standalone.html#standby-masters-with-zookeeper">Spark standalone
cluster with standby masters with ZooKeeper</a>. The list must include all the master hosts in the high availability cluster set up with ZooKeeper. Each port must be whichever one that master is configured to use, which is 7077 by default.
</td></tr>
<tr><td> <code>mesos://HOST:PORT</code> </td><td> Connect to the given <a href="running-on-mesos.html">Mesos</a> cluster.
The port must be whichever one your cluster is configured to use, which is 5050 by default.
Or, for a Mesos cluster using ZooKeeper, use <code>mesos://zk://...</code>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,8 @@ class GeneralizedLinearRegressionSummary private[regression] (
private[regression] lazy val link: Link = familyLink.link

/** Number of instances in DataFrame predictions. */
private[regression] lazy val numInstances: Long = predictions.count()
@Since("2.2.0")
lazy val numInstances: Long = predictions.count()

/** The numeric rank of the fitted linear model. */
@Since("2.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,8 @@ class LinearRegressionSummary private[regression] (
lazy val numInstances: Long = predictions.count()

/** Degrees of freedom */
private val degreesOfFreedom: Long = if (privateModel.getFitIntercept) {
@Since("2.2.0")
val degreesOfFreedom: Long = if (privateModel.getFitIntercept) {
numInstances - privateModel.coefficients.size - 1
} else {
numInstances - privateModel.coefficients.size
Expand Down
4 changes: 3 additions & 1 deletion python/pyspark/broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from tempfile import NamedTemporaryFile

from pyspark.cloudpickle import print_exec
from pyspark.util import _exception_message

if sys.version < '3':
import cPickle as pickle
Expand Down Expand Up @@ -82,7 +83,8 @@ def dump(self, value, f):
except pickle.PickleError:
raise
except Exception as e:
msg = "Could not serialize broadcast: " + e.__class__.__name__ + ": " + e.message
msg = "Could not serialize broadcast: %s: %s" \
% (e.__class__.__name__, _exception_message(e))
print_exec(sys.stderr)
raise pickle.PicklingError(msg)
f.close()
Expand Down
9 changes: 5 additions & 4 deletions python/pyspark/cloudpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import traceback
import weakref

from pyspark.util import _exception_message

if sys.version < '3':
from pickle import Pickler
Expand Down Expand Up @@ -152,13 +153,13 @@ def dump(self, obj):
except pickle.PickleError:
raise
except Exception as e:
if "'i' format requires" in e.message:
msg = "Object too large to serialize: " + e.message
emsg = _exception_message(e)
if "'i' format requires" in emsg:
msg = "Object too large to serialize: %s" % emsg
else:
msg = "Could not serialize object: " + e.__class__.__name__ + ": " + e.message
msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
print_exec(sys.stderr)
raise pickle.PicklingError(msg)


def save_memoryview(self, obj):
"""Fallback to save_string"""
Expand Down
45 changes: 45 additions & 0 deletions python/pyspark/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

__all__ = []


def _exception_message(excp):
"""Return the message from an exception as either a str or unicode object. Supports both
Python 2 and Python 3.

>>> msg = "Exception message"
>>> excp = Exception(msg)
>>> msg == _exception_message(excp)
True

>>> msg = u"unicöde"
>>> excp = Exception(msg)
>>> msg == _exception_message(excp)
True
"""
if hasattr(excp, "message"):
return excp.message
return str(excp)


if __name__ == "__main__":
    import doctest
    import sys

    # Run the doctests embedded in this module's docstrings.
    (failure_count, test_count) = doctest.testmod()
    if failure_count:
        # Use ``sys.exit`` rather than the ``exit`` helper: the latter is injected by the
        # ``site`` module for interactive use and is absent when ``site`` is not loaded.
        sys.exit(-1)
Original file line number Diff line number Diff line change
Expand Up @@ -204,20 +204,19 @@ object JavaTypeInference {
typeToken.getRawType match {
case c if !inferExternalType(c).isInstanceOf[ObjectType] => getPath

case c if c == classOf[java.lang.Short] =>
NewInstance(c, getPath :: Nil, ObjectType(c))
case c if c == classOf[java.lang.Integer] =>
NewInstance(c, getPath :: Nil, ObjectType(c))
case c if c == classOf[java.lang.Long] =>
NewInstance(c, getPath :: Nil, ObjectType(c))
case c if c == classOf[java.lang.Double] =>
NewInstance(c, getPath :: Nil, ObjectType(c))
case c if c == classOf[java.lang.Byte] =>
NewInstance(c, getPath :: Nil, ObjectType(c))
case c if c == classOf[java.lang.Float] =>
NewInstance(c, getPath :: Nil, ObjectType(c))
case c if c == classOf[java.lang.Boolean] =>
NewInstance(c, getPath :: Nil, ObjectType(c))
case c if c == classOf[java.lang.Short] ||
c == classOf[java.lang.Integer] ||
c == classOf[java.lang.Long] ||
c == classOf[java.lang.Double] ||
c == classOf[java.lang.Float] ||
c == classOf[java.lang.Byte] ||
c == classOf[java.lang.Boolean] =>
StaticInvoke(
c,
ObjectType(c),
"valueOf",
getPath :: Nil,
propagateNull = true)

case c if c == classOf[java.sql.Date] =>
StaticInvoke(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ object ScalaReflection extends ScalaReflection {
def deserializerFor[T : TypeTag]: Expression = {
val tpe = localTypeOf[T]
val clsName = getClassNameFromType(tpe)
val walkedTypePath = s"""- root class: "${clsName}"""" :: Nil
val walkedTypePath = s"""- root class: "$clsName"""" :: Nil
deserializerFor(tpe, None, walkedTypePath)
}

Expand Down Expand Up @@ -204,37 +204,37 @@ object ScalaReflection extends ScalaReflection {
case t if t <:< localTypeOf[java.lang.Integer] =>
val boxedType = classOf[java.lang.Integer]
val objectType = ObjectType(boxedType)
NewInstance(boxedType, getPath :: Nil, objectType)
StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true)

case t if t <:< localTypeOf[java.lang.Long] =>
val boxedType = classOf[java.lang.Long]
val objectType = ObjectType(boxedType)
NewInstance(boxedType, getPath :: Nil, objectType)
StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true)

case t if t <:< localTypeOf[java.lang.Double] =>
val boxedType = classOf[java.lang.Double]
val objectType = ObjectType(boxedType)
NewInstance(boxedType, getPath :: Nil, objectType)
StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true)

case t if t <:< localTypeOf[java.lang.Float] =>
val boxedType = classOf[java.lang.Float]
val objectType = ObjectType(boxedType)
NewInstance(boxedType, getPath :: Nil, objectType)
StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true)

case t if t <:< localTypeOf[java.lang.Short] =>
val boxedType = classOf[java.lang.Short]
val objectType = ObjectType(boxedType)
NewInstance(boxedType, getPath :: Nil, objectType)
StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true)

case t if t <:< localTypeOf[java.lang.Byte] =>
val boxedType = classOf[java.lang.Byte]
val objectType = ObjectType(boxedType)
NewInstance(boxedType, getPath :: Nil, objectType)
StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true)

case t if t <:< localTypeOf[java.lang.Boolean] =>
val boxedType = classOf[java.lang.Boolean]
val objectType = ObjectType(boxedType)
NewInstance(boxedType, getPath :: Nil, objectType)
StaticInvoke(boxedType, objectType, "valueOf", getPath :: Nil, propagateNull = true)

case t if t <:< localTypeOf[java.sql.Date] =>
StaticInvoke(
Expand Down Expand Up @@ -270,12 +270,14 @@ object ScalaReflection extends ScalaReflection {

case t if t <:< localTypeOf[Array[_]] =>
val TypeRef(_, _, Seq(elementType)) = t
val Schema(_, elementNullable) = schemaFor(elementType)
val Schema(dataType, elementNullable) = schemaFor(elementType)
val className = getClassNameFromType(elementType)
val newTypePath = s"""- array element class: "$className"""" +: walkedTypePath

val mapFunction: Expression => Expression = p => {
val converter = deserializerFor(elementType, Some(p), newTypePath)
val mapFunction: Expression => Expression = element => {
// upcast the array element to the data type the encoder expected.
val casted = upCastToExpectedType(element, dataType, newTypePath)
val converter = deserializerFor(elementType, Some(casted), newTypePath)
if (elementNullable) {
converter
} else {
Expand Down Expand Up @@ -305,12 +307,14 @@ object ScalaReflection extends ScalaReflection {

case t if t <:< localTypeOf[Seq[_]] =>
val TypeRef(_, _, Seq(elementType)) = t
val Schema(_, elementNullable) = schemaFor(elementType)
val Schema(dataType, elementNullable) = schemaFor(elementType)
val className = getClassNameFromType(elementType)
val newTypePath = s"""- array element class: "$className"""" +: walkedTypePath

val mapFunction: Expression => Expression = p => {
val converter = deserializerFor(elementType, Some(p), newTypePath)
val mapFunction: Expression => Expression = element => {
// upcast the array element to the data type the encoder expected.
val casted = upCastToExpectedType(element, dataType, newTypePath)
val converter = deserializerFor(elementType, Some(casted), newTypePath)
if (elementNullable) {
converter
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.objects.{MapObjects, NewInstance, UnresolvedMapObjects}
import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable, MapObjects, NewInstance, UnresolvedMapObjects}
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans._
Expand Down Expand Up @@ -2321,7 +2321,11 @@ class Analyzer(
*/
object ResolveUpCast extends Rule[LogicalPlan] {
private def fail(from: Expression, to: DataType, walkedTypePath: Seq[String]) = {
throw new AnalysisException(s"Cannot up cast ${from.sql} from " +
val fromStr = from match {
case l: LambdaVariable => "array element"
case e => e.sql
}
throw new AnalysisException(s"Cannot up cast $fromStr from " +
s"${from.dataType.simpleString} to ${to.simpleString} as it may truncate\n" +
"The type path of the target object is:\n" + walkedTypePath.mkString("", "\n", "\n") +
"You can either add an explicit cast to the input data or choose a higher precision " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ object TypeCoercion {
NaNvl(l, Cast(r, DoubleType))
case NaNvl(l, r) if l.dataType == FloatType && r.dataType == DoubleType =>
NaNvl(Cast(l, DoubleType), r)
case NaNvl(l, r) if r.dataType == NullType => NaNvl(l, Cast(r, l.dataType))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,12 @@ trait PredicateHelper {
protected def canEvaluateWithinJoin(expr: Expression): Boolean = expr match {
// Non-deterministic expressions are not allowed as join conditions.
case e if !e.deterministic => false
case l: ListQuery =>
case _: ListQuery | _: Exists =>
// A ListQuery defines the query which we want to search in an IN subquery expression.
// Currently the only way to evaluate an IN subquery is to convert it to a
// LeftSemi/LeftAnti/ExistenceJoin by `RewritePredicateSubquery` rule.
// It cannot be evaluated as part of a Join operator.
// An Exists shouldn't be pushed into a Join operator either.
false
case e: SubqueryExpression =>
// non-correlated subquery will be replaced as literal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,14 +656,20 @@ class TypeCoercionSuite extends PlanTest {

test("nanvl casts") {
ruleTest(TypeCoercion.FunctionArgumentConversion,
NaNvl(Literal.create(1.0, FloatType), Literal.create(1.0, DoubleType)),
NaNvl(Cast(Literal.create(1.0, FloatType), DoubleType), Literal.create(1.0, DoubleType)))
NaNvl(Literal.create(1.0f, FloatType), Literal.create(1.0, DoubleType)),
NaNvl(Cast(Literal.create(1.0f, FloatType), DoubleType), Literal.create(1.0, DoubleType)))
ruleTest(TypeCoercion.FunctionArgumentConversion,
NaNvl(Literal.create(1.0, DoubleType), Literal.create(1.0, FloatType)),
NaNvl(Literal.create(1.0, DoubleType), Cast(Literal.create(1.0, FloatType), DoubleType)))
NaNvl(Literal.create(1.0, DoubleType), Literal.create(1.0f, FloatType)),
NaNvl(Literal.create(1.0, DoubleType), Cast(Literal.create(1.0f, FloatType), DoubleType)))
ruleTest(TypeCoercion.FunctionArgumentConversion,
NaNvl(Literal.create(1.0, DoubleType), Literal.create(1.0, DoubleType)),
NaNvl(Literal.create(1.0, DoubleType), Literal.create(1.0, DoubleType)))
ruleTest(TypeCoercion.FunctionArgumentConversion,
NaNvl(Literal.create(1.0f, FloatType), Literal.create(null, NullType)),
NaNvl(Literal.create(1.0f, FloatType), Cast(Literal.create(null, NullType), FloatType)))
ruleTest(TypeCoercion.FunctionArgumentConversion,
NaNvl(Literal.create(1.0, DoubleType), Literal.create(null, NullType)),
NaNvl(Literal.create(1.0, DoubleType), Cast(Literal.create(null, NullType), DoubleType)))
}

test("type coercion for If") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ case class StringIntClass(a: String, b: Int)

/** Encoder test fixture: a `Long` field `a` plus a nested `StringLongClass` field `b`. */
case class ComplexClass(a: Long, b: StringLongClass)

/** Encoder test fixture: a single field `arr` holding an array of primitive `Long` values. */
case class PrimitiveArrayClass(arr: Array[Long])

/** Encoder test fixture: a single field `arr` holding a sequence of `StringIntClass` values. */
case class ArrayClass(arr: Seq[StringIntClass])

/** Encoder test fixture: a single field `nestedArr` holding an array of `ArrayClass` values. */
case class NestedArrayClass(nestedArr: Array[ArrayClass])
Expand Down Expand Up @@ -66,6 +68,27 @@ class EncoderResolutionSuite extends PlanTest {
encoder.resolveAndBind(attrs).fromRow(InternalRow(InternalRow(str, 1.toByte), 2))
}

test("real type doesn't match encoder schema but they are compatible: primitive array") {
  // The encoder is built for Array[Long], while the input attribute is an array of integers.
  val primitiveArrayEncoder = ExpressionEncoder[PrimitiveArrayClass]
  val intArrayAttrs = Seq('arr.array(IntegerType))
  val intArrayData = new GenericArrayData(Array(1, 2, 3))
  // Resolving and binding against the integer schema, then reading a row, must not throw.
  val boundEncoder = primitiveArrayEncoder.resolveAndBind(intArrayAttrs)
  boundEncoder.fromRow(InternalRow(intArrayData))
}

test("the real type is not compatible with encoder schema: primitive array") {
  // The encoder is built for Array[Long], while the input attribute is an array of strings.
  val primitiveArrayEncoder = ExpressionEncoder[PrimitiveArrayClass]
  val stringArrayAttrs = Seq('arr.array(StringType))
  // Resolution is expected to be rejected during analysis.
  val analysisError = intercept[AnalysisException] {
    primitiveArrayEncoder.resolveAndBind(stringArrayAttrs)
  }
  // The message should name the array element and list the walked type path.
  val expectedMessage =
    s"""
       |Cannot up cast array element from string to bigint as it may truncate
       |The type path of the target object is:
       |- array element class: "scala.Long"
       |- field (class: "scala.Array", name: "arr")
       |- root class: "org.apache.spark.sql.catalyst.encoders.PrimitiveArrayClass"
       |You can either add an explicit cast to the input data or choose a higher precision type
     """.stripMargin.trim + " of the field in the target object"
  assert(analysisError.message == expectedMessage)
}

test("real type doesn't match encoder schema but they are compatible: array") {
val encoder = ExpressionEncoder[ArrayClass]
val attrs = Seq('arr.array(new StructType().add("a", "int").add("b", "int").add("c", "int")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
val quotedColName = "`" + col.name + "`"
val colValue = col.dataType match {
case DoubleType | FloatType =>
// nanvl only supports these types
nanvl(df.col(quotedColName), lit(null).cast(col.dataType))
nanvl(df.col(quotedColName), lit(null)) // nanvl only supports these types
case _ => df.col(quotedColName)
}
coalesce(colValue, lit(replacement).cast(col.dataType)).as(col.name)
Expand Down
Loading