@@ -110,15 +110,15 @@ private[kafka010] abstract class KafkaRowWriter(
       case t =>
         throw new IllegalStateException(s"${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
           s"attribute unsupported type $t. ${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
-          "must be a StringType")
+          s"must be a ${StringType.simpleString}")
     }
     val keyExpression = inputSchema.find(_.name == KafkaWriter.KEY_ATTRIBUTE_NAME)
       .getOrElse(Literal(null, BinaryType))
     keyExpression.dataType match {
       case StringType | BinaryType => // good
       case t =>
         throw new IllegalStateException(s"${KafkaWriter.KEY_ATTRIBUTE_NAME} " +
-          s"attribute unsupported type $t")
+          s"attribute unsupported type ${t.simpleString}")
     }
     val valueExpression = inputSchema
       .find(_.name == KafkaWriter.VALUE_ATTRIBUTE_NAME).getOrElse(
@@ -129,7 +129,7 @@ private[kafka010] abstract class KafkaRowWriter(
       case StringType | BinaryType => // good
       case t =>
         throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
-          s"attribute unsupported type $t")
+          s"attribute unsupported type ${t.simpleString}")
     }
     UnsafeProjection.create(
       Seq(topicExpression, Cast(keyExpression, BinaryType),
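Note: `simpleString` is the lower-case, SQL-facing name of a type, so these messages now read "must be a string" rather than "must be a StringType". A minimal standalone sketch of what it renders (not part of this diff; `SimpleStringDemo` is a made-up name):

import org.apache.spark.sql.types._

object SimpleStringDemo extends App {
  // DataType.simpleString gives the SQL-facing name of a concrete type.
  println(StringType.simpleString)             // string
  println(BinaryType.simpleString)             // binary
  println(IntegerType.simpleString)            // int
  println(ArrayType(StringType).simpleString)  // array<string>
  // Interpolating the type directly, as the old messages did, prints the
  // Scala object name instead:
  println(s"$StringType")                      // StringType
}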
@@ -57,23 +57,23 @@ private[kafka010] object KafkaWriter extends Logging {
     ).dataType match {
       case StringType => // good
       case _ =>
-        throw new AnalysisException(s"Topic type must be a String")
+        throw new AnalysisException(s"Topic type must be a ${StringType.simpleString}")
     }
     schema.find(_.name == KEY_ATTRIBUTE_NAME).getOrElse(
       Literal(null, StringType)
     ).dataType match {
       case StringType | BinaryType => // good
       case _ =>
         throw new AnalysisException(s"$KEY_ATTRIBUTE_NAME attribute type " +
-          s"must be a String or BinaryType")
+          s"must be a ${StringType.simpleString} or ${BinaryType.simpleString}")
     }
     schema.find(_.name == VALUE_ATTRIBUTE_NAME).getOrElse(
       throw new AnalysisException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found")
     ).dataType match {
       case StringType | BinaryType => // good
       case _ =>
         throw new AnalysisException(s"$VALUE_ATTRIBUTE_NAME attribute type " +
-          s"must be a String or BinaryType")
+          s"must be a ${StringType.simpleString} or ${BinaryType.simpleString}")
     }
   }
@@ -314,7 +314,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
       writer.stop()
     }
     assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-      "value attribute type must be a string or binarytype"))
+      "value attribute type must be a string or binary"))

     try {
       /* key field wrong type */
@@ -330,7 +330,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
       writer.stop()
     }
     assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-      "key attribute type must be a string or binarytype"))
+      "key attribute type must be a string or binary"))
   }

   test("streaming - write to non-existing topic") {
@@ -303,7 +303,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
       writer.stop()
     }
     assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-      "value attribute type must be a string or binarytype"))
+      "value attribute type must be a string or binary"))

     try {
       ex = intercept[StreamingQueryException] {
@@ -318,7 +318,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
       writer.stop()
     }
     assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-      "key attribute type must be a string or binarytype"))
+      "key attribute type must be a string or binary"))
   }

   test("streaming - write to non-existing topic") {
@@ -208,7 +208,8 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transformer
     require(dataType.isInstanceOf[NumericType] ||
       dataType.isInstanceOf[StringType] ||
       dataType.isInstanceOf[BooleanType],
-      s"FeatureHasher requires columns to be of NumericType, BooleanType or StringType. " +
+      s"FeatureHasher requires columns to be of ${NumericType.simpleString}, " +
+      s"${BooleanType.simpleString} or ${StringType.simpleString}. " +
       s"Column $fieldName was $dataType")
   }
   val attrGroup = new AttributeGroup($(outputCol), $(numFeatures))

Contributor: Another option would be to use SchemaUtils checkColumnTypes here -- what do you think?

Contributor (Author): I couldn't do that, because NumericType is an AbstractDataType and not a DataType, so I couldn't use that method.

Contributor: In the original PR it doesn't seem like we rewrote the constant types, only the dynamic ones (and this PR also doesn't seem to rewrite the referenced constant types consistently). What's the reason, and how do you decide which ones to rewrite to reference simpleString?

Contributor (Author): I think this PR always rewrites constant type references. I am not sure why you are saying it is not; if I missed some places, it is just because I haven't seen them.

Contributor: I think that's my bad. Looking through it I saw some raw type names, but those are all in the test suites, which makes sense.
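To illustrate the author's point about `checkColumnTypes`: that helper takes a `Seq[DataType]`, while the `NumericType` companion object is only an `AbstractDataType`. A standalone sketch with a simplified mock of the hierarchy (names mirror Spark's, but none of this is Spark code):

// Simplified mock of the type hierarchy in org.apache.spark.sql.types.
abstract class AbstractDataType
abstract class DataType extends AbstractDataType
case object StringType extends DataType
case object BooleanType extends DataType
abstract class NumericType extends DataType
object NumericType extends AbstractDataType  // the companion is NOT a DataType

// Mirrors the shape of SchemaUtils.checkColumnTypes, which accepts
// only concrete DataType instances.
def checkColumnTypes(dataTypes: Seq[DataType]): Unit =
  println(dataTypes.mkString(", "))

checkColumnTypes(Seq(StringType, BooleanType))    // compiles
// checkColumnTypes(Seq(NumericType, StringType)) // does not compile:
// the NumericType object is an AbstractDataType, not a DataType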
@@ -104,7 +104,7 @@ class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   override def transformSchema(schema: StructType): StructType = {
     val inputType = schema($(inputCol)).dataType
     require(inputType.isInstanceOf[ArrayType],
-      s"The input column must be ArrayType, but got $inputType.")
+      s"The input column must be ${ArrayType.simpleString}, but got ${inputType.simpleString}.")
     val attrGroup = new AttributeGroup($(outputCol), $(numFeatures))
     SchemaUtils.appendColumn(schema, attrGroup.toStructField())
   }
@@ -261,7 +261,8 @@ private[ml] class FeatureEncoder(numFeatures: Array[Int]) extends Serializable {
    */
   def foreachNonzeroOutput(value: Any, f: (Int, Double) => Unit): Unit = value match {
     case d: Double =>
-      assert(numFeatures.length == 1, "DoubleType columns should only contain one feature.")
+      assert(numFeatures.length == 1,
+        s"${DoubleType.simpleString} columns should only contain one feature.")
       val numOutputCols = numFeatures.head
       if (numOutputCols > 1) {
         assert(
@@ -65,7 +65,7 @@ class NGram @Since("1.5.0") (@Since("1.5.0") override val uid: String)

   override protected def validateInputType(inputType: DataType): Unit = {
     require(inputType.sameType(ArrayType(StringType)),
-      s"Input type must be ArrayType(StringType) but got $inputType.")
+      s"Input type must be ${ArrayType(StringType).simpleString} but got $inputType.")
   }

   override protected def outputDataType: DataType = new ArrayType(StringType, false)
@@ -85,7 +85,8 @@ class OneHotEncoder @Since("1.4.0") (@Since("1.4.0") override val uid: String) e
     val inputFields = schema.fields

     require(schema(inputColName).dataType.isInstanceOf[NumericType],
-      s"Input column must be of type NumericType but got ${schema(inputColName).dataType}")
+      s"Input column must be of type ${NumericType.simpleString} but got " +
+      schema(inputColName).dataType.simpleString)
     require(!inputFields.exists(_.name == outputColName),
       s"Output column $outputColName already exists.")
@@ -394,7 +394,7 @@ class RFormulaModel private[feature](
     require(!columnNames.contains($(featuresCol)), "Features column already exists.")
     require(
       !columnNames.contains($(labelCol)) || schema($(labelCol)).dataType.isInstanceOf[NumericType],
-      "Label column already exists and is not of type NumericType.")
+      s"Label column already exists and is not of type ${NumericType.simpleString}.")
   }

   @Since("2.0.0")
@@ -109,8 +109,8 @@ class StopWordsRemover @Since("1.5.0") (@Since("1.5.0") override val uid: String
   @Since("1.5.0")
   override def transformSchema(schema: StructType): StructType = {
     val inputType = schema($(inputCol)).dataType
-    require(inputType.sameType(ArrayType(StringType)),
-      s"Input type must be ArrayType(StringType) but got $inputType.")
+    require(inputType.sameType(ArrayType(StringType)), "Input type must be " +
+      s"${ArrayType(StringType).simpleString} but got ${inputType.simpleString}.")
     SchemaUtils.appendColumn(schema, $(outputCol), inputType, schema($(inputCol)).nullable)
   }
@@ -162,7 +162,7 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
       schema(name).dataType match {
         case _: NumericType | BooleanType => None
         case t if t.isInstanceOf[VectorUDT] => None
-        case other => Some(s"Data type $other of column $name is not supported.")
+        case other => Some(s"Data type ${other.simpleString} of column $name is not supported.")
       }
     }
     if (incorrectColumns.nonEmpty) {
@@ -106,7 +106,7 @@ private[fpm] trait FPGrowthParams extends Params with HasPredictionCol {
   protected def validateAndTransformSchema(schema: StructType): StructType = {
     val inputType = schema($(itemsCol)).dataType
     require(inputType.isInstanceOf[ArrayType],
-      s"The input column must be ArrayType, but got $inputType.")
+      s"The input column must be ${ArrayType.simpleString}, but got ${inputType.simpleString}.")
     SchemaUtils.appendColumn(schema, $(predictionCol), schema($(itemsCol)).dataType)
   }
 }
mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala (11 changes: 7 additions & 4 deletions)
@@ -40,7 +40,8 @@ private[spark] object SchemaUtils {
     val actualDataType = schema(colName).dataType
     val message = if (msg != null && msg.trim.length > 0) " " + msg else ""
     require(actualDataType.equals(dataType),
-      s"Column $colName must be of type $dataType but was actually $actualDataType.$message")
+      s"Column $colName must be of type ${dataType.simpleString} but was actually " +
+      s"${actualDataType.simpleString}.$message")
   }

   /**
@@ -57,7 +58,8 @@
     val message = if (msg != null && msg.trim.length > 0) " " + msg else ""
     require(dataTypes.exists(actualDataType.equals),
       s"Column $colName must be of type equal to one of the following types: " +
-      s"${dataTypes.mkString("[", ", ", "]")} but was actually of type $actualDataType.$message")
+      s"${dataTypes.map(_.simpleString).mkString("[", ", ", "]")} but was actually of type " +
+      s"${actualDataType.simpleString}.$message")
   }

   /**
@@ -70,8 +72,9 @@
       msg: String = ""): Unit = {
     val actualDataType = schema(colName).dataType
     val message = if (msg != null && msg.trim.length > 0) " " + msg else ""
-    require(actualDataType.isInstanceOf[NumericType], s"Column $colName must be of type " +
-      s"NumericType but was actually of type $actualDataType.$message")
+    require(actualDataType.isInstanceOf[NumericType],
+      s"Column $colName must be of type ${NumericType.simpleString} but was actually of type " +
+      s"${actualDataType.simpleString}.$message")
   }

   /**
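For illustration, the message these helpers now produce for a string-typed label column (a sketch; "numeric" is hard-coded below because `NumericType.simpleString` is `private[spark]` and not callable from user code):

import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("label", StringType)))
val actualDataType = schema("label").dataType

// Before: Column label must be of type NumericType but was actually of type StringType.
// After:  Column label must be of type numeric but was actually of type string.
println(s"Column label must be of type numeric " +
  s"but was actually of type ${actualDataType.simpleString}.")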
@@ -67,8 +67,8 @@ class BinaryClassificationEvaluatorSuite
       evaluator.evaluate(stringDF)
     }
     assert(thrown.getMessage.replace("\n", "") contains "Column rawPrediction must be of type " +
-      "equal to one of the following types: [DoubleType, ")
-    assert(thrown.getMessage.replace("\n", "") contains "but was actually of type StringType.")
+      "equal to one of the following types: [double, ")
+    assert(thrown.getMessage.replace("\n", "") contains "but was actually of type string.")
   }

   test("should support all NumericType labels and not support other types") {
@@ -105,7 +105,7 @@ class RFormulaSuite extends MLTest with DefaultReadWriteTest {
     testTransformerByInterceptingException[(Int, Boolean)](
       original,
       model,
-      "Label column already exists and is not of type NumericType.",
+      "Label column already exists and is not of type numeric.",
       "x")
   }
@@ -99,9 +99,9 @@ class VectorAssemblerSuite
       assembler.transform(df)
     }
     assert(thrown.getMessage contains
-      "Data type StringType of column a is not supported.\n" +
-      "Data type StringType of column b is not supported.\n" +
-      "Data type StringType of column c is not supported.")
+      "Data type string of column a is not supported.\n" +
+      "Data type string of column b is not supported.\n" +
+      "Data type string of column c is not supported.")
   }

   test("ML attributes") {
@@ -612,7 +612,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging {
       estimator.fit(strDF)
     }
     assert(thrown.getMessage.contains(
-      s"$column must be of type NumericType but was actually of type StringType"))
+      s"$column must be of type numeric but was actually of type string"))
   }

   private class NumericTypeWithEncoder[A](val numericType: NumericType)
@@ -385,7 +385,7 @@ class AFTSurvivalRegressionSuite extends MLTest with DefaultReadWriteTest {
       aft.fit(dfWithStringCensors)
     }
     assert(thrown.getMessage.contains(
-      "Column censor must be of type NumericType but was actually of type StringType"))
+      "Column censor must be of type numeric but was actually of type string"))
   }

   test("numerical stability of standardization") {
@@ -74,7 +74,7 @@ object MLTestingUtils extends SparkFunSuite {
       estimator.fit(dfWithStringLabels)
     }
     assert(thrown.getMessage.contains(
-      "Column label must be of type NumericType but was actually of type StringType"))
+      "Column label must be of type numeric but was actually of type string"))

     estimator match {
       case weighted: Estimator[M] with HasWeightCol =>
@@ -86,7 +86,7 @@ object MLTestingUtils extends SparkFunSuite {
           weighted.fit(dfWithStringWeights)
         }
         assert(thrown.getMessage.contains(
-          "Column weight must be of type NumericType but was actually of type StringType"))
+          "Column weight must be of type numeric but was actually of type string"))
       case _ =>
     }
   }
@@ -104,7 +104,7 @@ object MLTestingUtils extends SparkFunSuite {
       evaluator.evaluate(dfWithStringLabels)
     }
     assert(thrown.getMessage.contains(
-      "Column label must be of type NumericType but was actually of type StringType"))
+      "Column label must be of type numeric but was actually of type string"))
   }

   def genClassifDFWithNumericLabelCol(
@@ -314,8 +314,8 @@ trait CreateNamedStructLike extends Expression {
     val invalidNames = nameExprs.filterNot(e => e.foldable && e.dataType == StringType)
     if (invalidNames.nonEmpty) {
       TypeCheckResult.TypeCheckFailure(
-        "Only foldable StringType expressions are allowed to appear at odd position, got:" +
-        s" ${invalidNames.mkString(",")}")
+        s"Only foldable ${StringType.simpleString} expressions are allowed to appear at odd" +
+        s" position, got: ${invalidNames.mkString(",")}")
     } else if (!names.contains(null)) {
       TypeCheckResult.TypeCheckSuccess
     } else {
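For context, this check guards `named_struct`, whose odd-position arguments are field names. A usage sketch, assuming a running `SparkSession` named `spark` (the failure message is quoted from the hunk above):

// Odd positions must be foldable string expressions (the field names):
spark.sql("SELECT named_struct('a', 1, 'b', 'x')").show()
// A non-foldable name now fails with, e.g.:
//   Only foldable string expressions are allowed to appear at odd position, got: id
// spark.sql("SELECT named_struct(id, 1) FROM range(1)").show()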
@@ -753,7 +753,7 @@ object JsonExprUtils {
       }
     case m: CreateMap =>
       throw new AnalysisException(
-        s"A type of keys and values in map() must be string, but got ${m.dataType}")
+        s"A type of keys and values in map() must be string, but got ${m.dataType.simpleString}")
     case _ =>
       throw new AnalysisException("Must use a map() function for options")
   }
@@ -221,11 +221,12 @@ case class Elt(children: Seq[Expression]) extends Expression {
     val (indexType, inputTypes) = (indexExpr.dataType, inputExprs.map(_.dataType))
     if (indexType != IntegerType) {
       return TypeCheckResult.TypeCheckFailure(s"first input to function $prettyName should " +
-        s"have IntegerType, but it's $indexType")
+        s"have ${IntegerType.simpleString}, but it's ${indexType.simpleString}")
     }
     if (inputTypes.exists(tpe => !Seq(StringType, BinaryType).contains(tpe))) {
       return TypeCheckResult.TypeCheckFailure(
-        s"input to function $prettyName should have StringType or BinaryType, but it's " +
+        s"input to function $prettyName should have ${StringType.simpleString} or " +
+        s"${BinaryType.simpleString}, but it's " +
         inputTypes.map(_.simpleString).mkString("[", ", ", "]"))
     }
     TypeUtils.checkForSameTypeInputExpr(inputTypes, s"function $prettyName")
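Similarly for `elt`, which returns the n-th input (1-based). An illustrative sketch, again assuming a `SparkSession` named `spark`; the exact failure text may also depend on type-coercion rules, so treat the commented message as an approximation:

spark.sql("SELECT elt(2, 'a', 'b', 'c')").show()  // returns 'b'
// A non-integer index is now reported with simpleString names, e.g.:
//   first input to function elt should have int, but it's double
// spark.sql("SELECT elt(CAST(1.5 AS DOUBLE), 'a', 'b')").show()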
@@ -45,8 +45,8 @@ private[sql] class JacksonGenerator(

   // `JacksonGenerator` can only be initialized with a `StructType` or a `MapType`.
   require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType],
-    "JacksonGenerator only supports to be initialized with a StructType " +
-    s"or MapType but got ${dataType.simpleString}")
+    s"JacksonGenerator only supports to be initialized with a ${StructType.simpleString} " +
+    s"or ${MapType.simpleString} but got ${dataType.simpleString}")

   // `ValueWriter`s for all fields of the schema
   private lazy val rootFieldWriters: Array[ValueWriter] = dataType match {
@@ -129,7 +129,8 @@ class JacksonParser(
       case "NaN" => Float.NaN
       case "Infinity" => Float.PositiveInfinity
       case "-Infinity" => Float.NegativeInfinity
-      case other => throw new RuntimeException(s"Cannot parse $other as FloatType.")
+      case other => throw new RuntimeException(
+        s"Cannot parse $other as ${FloatType.simpleString}.")
     }
   }

@@ -144,7 +145,8 @@ class JacksonParser(
       case "NaN" => Double.NaN
       case "Infinity" => Double.PositiveInfinity
       case "-Infinity" => Double.NegativeInfinity
-      case other => throw new RuntimeException(s"Cannot parse $other as DoubleType.")
+      case other =>
+        throw new RuntimeException(s"Cannot parse $other as ${DoubleType.simpleString}.")
     }
   }
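The surrounding match arms handle the three IEEE-754 spellings that JSON cannot represent as plain numbers. A minimal standalone sketch of the same pattern (`parseDoubleToken` is a made-up helper, not the parser's API):

def parseDoubleToken(text: String): Double = text match {
  case "NaN" => Double.NaN
  case "Infinity" => Double.PositiveInfinity
  case "-Infinity" => Double.NegativeInfinity
  // With this change the error names the SQL type ("double"), not DoubleType.
  case other => throw new RuntimeException(s"Cannot parse $other as double.")
}

assert(parseDoubleToken("NaN").isNaN)
assert(parseDoubleToken("Infinity").isPosInfinity)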
@@ -29,15 +29,16 @@ object TypeUtils {
     if (dt.isInstanceOf[NumericType] || dt == NullType) {
       TypeCheckResult.TypeCheckSuccess
     } else {
-      TypeCheckResult.TypeCheckFailure(s"$caller requires numeric types, not $dt")
+      TypeCheckResult.TypeCheckFailure(s"$caller requires numeric types, not ${dt.simpleString}")
     }
   }

   def checkForOrderingExpr(dt: DataType, caller: String): TypeCheckResult = {
     if (RowOrdering.isOrderable(dt)) {
       TypeCheckResult.TypeCheckSuccess
     } else {
-      TypeCheckResult.TypeCheckFailure(s"$caller does not support ordering on type $dt")
+      TypeCheckResult.TypeCheckFailure(
+        s"$caller does not support ordering on type ${dt.simpleString}")
     }
   }
@@ -145,7 +145,7 @@ abstract class NumericType extends AtomicType {
 }


-private[sql] object NumericType extends AbstractDataType {
+private[spark] object NumericType extends AbstractDataType {
   /**
    * Enables matching against NumericType for expressions:
    * {{{
@@ -155,11 +155,12 @@ private[sql] object NumericType extends AbstractDataType {
    */
   def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[NumericType]

-  override private[sql] def defaultConcreteType: DataType = DoubleType
+  override private[spark] def defaultConcreteType: DataType = DoubleType

-  override private[sql] def simpleString: String = "numeric"
+  override private[spark] def simpleString: String = "numeric"

-  override private[sql] def acceptsType(other: DataType): Boolean = other.isInstanceOf[NumericType]
+  override private[spark] def acceptsType(other: DataType): Boolean =
+    other.isInstanceOf[NumericType]
 }
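The widening from `private[sql]` to `private[spark]` is what lets callers under `org.apache.spark.ml` (such as `SchemaUtils` above) reference `NumericType.simpleString`. A standalone sketch of the qualifier semantics (`Hidden` and `Caller` are made-up names):

package org.apache.spark {
  package sql {
    object Hidden {
      private[sql] def f(): Int = 1    // visible only under org.apache.spark.sql
      private[spark] def g(): Int = 2  // visible anywhere under org.apache.spark
    }
  }
  package ml {
    object Caller {
      // def bad: Int = sql.Hidden.f()  // does not compile: private[sql]
      def ok: Int = sql.Hidden.g()      // fine: private[spark]
    }
  }
}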