@@ -110,15 +110,15 @@ private[kafka010] abstract class KafkaRowWriter(
case t =>
throw new IllegalStateException(s"${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
s"attribute unsupported type $t. ${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
"must be a StringType")
s"must be a ${StringType.catalogString}")
}
val keyExpression = inputSchema.find(_.name == KafkaWriter.KEY_ATTRIBUTE_NAME)
.getOrElse(Literal(null, BinaryType))
keyExpression.dataType match {
case StringType | BinaryType => // good
case t =>
throw new IllegalStateException(s"${KafkaWriter.KEY_ATTRIBUTE_NAME} " +
s"attribute unsupported type $t")
s"attribute unsupported type ${t.catalogString}")
}
val valueExpression = inputSchema
.find(_.name == KafkaWriter.VALUE_ATTRIBUTE_NAME).getOrElse(
Expand All @@ -129,7 +129,7 @@ private[kafka010] abstract class KafkaRowWriter(
case StringType | BinaryType => // good
case t =>
throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
s"attribute unsupported type $t")
s"attribute unsupported type ${t.catalogString}")
}
UnsafeProjection.create(
Seq(topicExpression, Cast(keyExpression, BinaryType),
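
Aside: the KafkaRowWriter validation above follows a single pattern: look an attribute up by name, fall back to a null literal, then match on its data type. A condensed, hypothetical sketch of that shape (the helper name and simplified signature are mine, not Spark's):

    import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}
    import org.apache.spark.sql.types._

    // Hypothetical helper mirroring the lookup-then-validate shape used for
    // the key and value attributes; the real code casts to binary afterwards.
    def resolveBinaryField(inputSchema: Seq[Attribute], name: String): Expression = {
      val expr = inputSchema.find(_.name == name).getOrElse(Literal(null, BinaryType))
      expr.dataType match {
        case StringType | BinaryType => expr // accepted
        case t => throw new IllegalStateException(
          s"$name attribute unsupported type ${t.catalogString}")
      }
    }
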
@@ -57,23 +57,23 @@ private[kafka010] object KafkaWriter extends Logging {
).dataType match {
case StringType => // good
case _ =>
throw new AnalysisException(s"Topic type must be a String")
throw new AnalysisException(s"Topic type must be a ${StringType.catalogString}")
}
schema.find(_.name == KEY_ATTRIBUTE_NAME).getOrElse(
Literal(null, StringType)
).dataType match {
case StringType | BinaryType => // good
case _ =>
throw new AnalysisException(s"$KEY_ATTRIBUTE_NAME attribute type " +
s"must be a String or BinaryType")
s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}")
}
schema.find(_.name == VALUE_ATTRIBUTE_NAME).getOrElse(
throw new AnalysisException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found")
).dataType match {
case StringType | BinaryType => // good
case _ =>
throw new AnalysisException(s"$VALUE_ATTRIBUTE_NAME attribute type " +
s"must be a String or BinaryType")
s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}")
}
}

@@ -314,7 +314,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"value attribute type must be a string or binarytype"))
"value attribute type must be a string or binary"))

try {
/* key field wrong type */
@@ -330,7 +330,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binarytype"))
"key attribute type must be a string or binary"))
}

test("streaming - write to non-existing topic") {
@@ -303,7 +303,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"value attribute type must be a string or binarytype"))
"value attribute type must be a string or binary"))

try {
ex = intercept[StreamingQueryException] {
@@ -318,7 +318,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binarytype"))
"key attribute type must be a string or binary"))
}

test("streaming - write to non-existing topic") {
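
The expected substrings in these sink suites change from "binarytype" to "binary" because the messages now interpolate catalogString, the SQL-facing type name, instead of the Scala object name that a bare $t (via toString) produces. A minimal illustration; the printed values reflect Spark 2.x behavior as I understand it:

    import org.apache.spark.sql.types._

    // toString gives the Scala object name; catalogString gives the SQL name.
    println(BinaryType.toString)                 // BinaryType
    println(BinaryType.catalogString)            // binary
    println(StringType.catalogString)            // string
    // Complex types compose the catalog names of their element types.
    println(ArrayType(StringType).catalogString) // array<string>
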
mllib/src/main/scala/org/apache/spark/ml/feature/DCT.scala (2 additions & 1 deletion)
@@ -69,7 +69,8 @@ class DCT @Since("1.5.0") (@Since("1.5.0") override val uid: String)
}

override protected def validateInputType(inputType: DataType): Unit = {
require(inputType.isInstanceOf[VectorUDT], s"Input type must be VectorUDT but got $inputType.")
require(inputType.isInstanceOf[VectorUDT],
s"Input type must be ${(new VectorUDT).catalogString} but got ${inputType.catalogString}.")
}

override protected def outputDataType: DataType = new VectorUDT
@@ -208,8 +208,9 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme
require(dataType.isInstanceOf[NumericType] ||
dataType.isInstanceOf[StringType] ||
dataType.isInstanceOf[BooleanType],
s"FeatureHasher requires columns to be of NumericType, BooleanType or StringType. " +
s"Column $fieldName was $dataType")
s"FeatureHasher requires columns to be of ${NumericType.simpleString}, " +
s"${BooleanType.catalogString} or ${StringType.catalogString}. " +
s"Column $fieldName was ${dataType.catalogString}")
}
val attrGroup = new AttributeGroup($(outputCol), $(numFeatures))
SchemaUtils.appendColumn(schema, attrGroup.toStructField())
@@ -104,7 +104,7 @@ class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String)
override def transformSchema(schema: StructType): StructType = {
val inputType = schema($(inputCol)).dataType
require(inputType.isInstanceOf[ArrayType],
s"The input column must be ArrayType, but got $inputType.")
s"The input column must be ${ArrayType.simpleString}, but got ${inputType.catalogString}.")
val attrGroup = new AttributeGroup($(outputCol), $(numFeatures))
SchemaUtils.appendColumn(schema, attrGroup.toStructField())
}
@@ -261,7 +261,8 @@ private[ml] class FeatureEncoder(numFeatures: Array[Int]) extends Serializable {
*/
def foreachNonzeroOutput(value: Any, f: (Int, Double) => Unit): Unit = value match {
case d: Double =>
assert(numFeatures.length == 1, "DoubleType columns should only contain one feature.")
assert(numFeatures.length == 1,
s"${DoubleType.catalogString} columns should only contain one feature.")
val numOutputCols = numFeatures.head
if (numOutputCols > 1) {
assert(
mllib/src/main/scala/org/apache/spark/ml/feature/NGram.scala (2 additions & 1 deletion)
@@ -65,7 +65,8 @@ class NGram @Since("1.5.0") (@Since("1.5.0") override val uid: String)

override protected def validateInputType(inputType: DataType): Unit = {
require(inputType.sameType(ArrayType(StringType)),
s"Input type must be ArrayType(StringType) but got $inputType.")
s"Input type must be ${ArrayType(StringType).catalogString} but got " +
inputType.catalogString)
}

override protected def outputDataType: DataType = new ArrayType(StringType, false)
@@ -85,7 +85,8 @@ class OneHotEncoder @Since("1.4.0") (@Since("1.4.0") override val uid: String) e
val inputFields = schema.fields

require(schema(inputColName).dataType.isInstanceOf[NumericType],
s"Input column must be of type NumericType but got ${schema(inputColName).dataType}")
s"Input column must be of type ${NumericType.simpleString} but got " +
schema(inputColName).dataType.catalogString)
require(!inputFields.exists(_.name == outputColName),
s"Output column $outputColName already exists.")

@@ -394,7 +394,7 @@ class RFormulaModel private[feature](
require(!columnNames.contains($(featuresCol)), "Features column already exists.")
require(
!columnNames.contains($(labelCol)) || schema($(labelCol)).dataType.isInstanceOf[NumericType],
"Label column already exists and is not of type NumericType.")
s"Label column already exists and is not of type ${NumericType.simpleString}.")
}

@Since("2.0.0")
@@ -131,8 +131,8 @@ class StopWordsRemover @Since("1.5.0") (@Since("1.5.0") override val uid: String
@Since("1.5.0")
override def transformSchema(schema: StructType): StructType = {
val inputType = schema($(inputCol)).dataType
-    require(inputType.sameType(ArrayType(StringType)),
-      s"Input type must be ArrayType(StringType) but got $inputType.")
+    require(inputType.sameType(ArrayType(StringType)), "Input type must be " +
+      s"${ArrayType(StringType).catalogString} but got ${inputType.catalogString}.")
SchemaUtils.appendColumn(schema, $(outputCol), inputType, schema($(inputCol)).nullable)
}

@@ -40,7 +40,8 @@ class Tokenizer @Since("1.4.0") (@Since("1.4.0") override val uid: String)
}

override protected def validateInputType(inputType: DataType): Unit = {
require(inputType == StringType, s"Input type must be string type but got $inputType.")
require(inputType == StringType,
s"Input type must be ${StringType.catalogString} type but got ${inputType.catalogString}.")
}

override protected def outputDataType: DataType = new ArrayType(StringType, true)
@@ -162,7 +162,7 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
schema(name).dataType match {
case _: NumericType | BooleanType => None
case t if t.isInstanceOf[VectorUDT] => None
case other => Some(s"Data type $other of column $name is not supported.")
case other => Some(s"Data type ${other.catalogString} of column $name is not supported.")
}
}
if (incorrectColumns.nonEmpty) {
@@ -106,7 +106,7 @@ private[fpm] trait FPGrowthParams extends Params with HasPredictionCol {
protected def validateAndTransformSchema(schema: StructType): StructType = {
val inputType = schema($(itemsCol)).dataType
require(inputType.isInstanceOf[ArrayType],
s"The input column must be ArrayType, but got $inputType.")
s"The input column must be ${ArrayType.simpleString}, but got ${inputType.catalogString}.")
SchemaUtils.appendColumn(schema, $(predictionCol), schema($(itemsCol)).dataType)
}
}
mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala (7 additions & 4 deletions)
@@ -41,7 +41,8 @@ private[spark] object SchemaUtils {
val actualDataType = schema(colName).dataType
val message = if (msg != null && msg.trim.length > 0) " " + msg else ""
require(actualDataType.equals(dataType),
s"Column $colName must be of type $dataType but was actually $actualDataType.$message")
s"Column $colName must be of type ${dataType.catalogString} but was actually " +
s"${actualDataType.catalogString}.$message")
}

/**
@@ -58,7 +59,8 @@ private[spark] object SchemaUtils {
val message = if (msg != null && msg.trim.length > 0) " " + msg else ""
require(dataTypes.exists(actualDataType.equals),
s"Column $colName must be of type equal to one of the following types: " +
s"${dataTypes.mkString("[", ", ", "]")} but was actually of type $actualDataType.$message")
s"${dataTypes.map(_.catalogString).mkString("[", ", ", "]")} but was actually of type " +
s"${actualDataType.catalogString}.$message")
}

/**
@@ -71,8 +73,9 @@ private[spark] object SchemaUtils {
msg: String = ""): Unit = {
val actualDataType = schema(colName).dataType
val message = if (msg != null && msg.trim.length > 0) " " + msg else ""
-    require(actualDataType.isInstanceOf[NumericType], s"Column $colName must be of type " +
-      s"NumericType but was actually of type $actualDataType.$message")
+    require(actualDataType.isInstanceOf[NumericType],
+      s"Column $colName must be of type ${NumericType.simpleString} but was actually of type " +
+        s"${actualDataType.catalogString}.$message")
}

/**
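
SchemaUtils now renders the abstract numeric family as NumericType.simpleString ("numeric") and concrete types via catalogString. A short sketch of the concrete names (the NumericType companion object is not part of the public API, so ordinary user code only ever sees the concrete types):

    import org.apache.spark.sql.types._

    // Concrete numeric types render their lowercase SQL names.
    println(IntegerType.catalogString)        // int
    println(DoubleType.catalogString)         // double
    println(DecimalType(10, 2).catalogString) // decimal(10,2)
    // Hence the reworded checks read "must be of type numeric but was
    // actually of type string" rather than "NumericType ... StringType".
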
@@ -67,8 +67,8 @@ class BinaryClassificationEvaluatorSuite
evaluator.evaluate(stringDF)
}
assert(thrown.getMessage.replace("\n", "") contains "Column rawPrediction must be of type " +
"equal to one of the following types: [DoubleType, ")
assert(thrown.getMessage.replace("\n", "") contains "but was actually of type StringType.")
"equal to one of the following types: [double, ")
assert(thrown.getMessage.replace("\n", "") contains "but was actually of type string.")
}

test("should support all NumericType labels and not support other types") {
@@ -105,7 +105,7 @@ class RFormulaSuite extends MLTest with DefaultReadWriteTest {
testTransformerByInterceptingException[(Int, Boolean)](
original,
model,
"Label column already exists and is not of type NumericType.",
"Label column already exists and is not of type numeric.",
"x")
}

@@ -99,9 +99,9 @@ class VectorAssemblerSuite
assembler.transform(df)
}
assert(thrown.getMessage contains
"Data type StringType of column a is not supported.\n" +
"Data type StringType of column b is not supported.\n" +
"Data type StringType of column c is not supported.")
"Data type string of column a is not supported.\n" +
"Data type string of column b is not supported.\n" +
"Data type string of column c is not supported.")
}

test("ML attributes") {
@@ -612,7 +612,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging {
estimator.fit(strDF)
}
assert(thrown.getMessage.contains(
s"$column must be of type NumericType but was actually of type StringType"))
s"$column must be of type numeric but was actually of type string"))
}

private class NumericTypeWithEncoder[A](val numericType: NumericType)
@@ -385,7 +385,7 @@ class AFTSurvivalRegressionSuite extends MLTest with DefaultReadWriteTest {
aft.fit(dfWithStringCensors)
}
assert(thrown.getMessage.contains(
"Column censor must be of type NumericType but was actually of type StringType"))
"Column censor must be of type numeric but was actually of type string"))
}

test("numerical stability of standardization") {
@@ -74,7 +74,7 @@ object MLTestingUtils extends SparkFunSuite {
estimator.fit(dfWithStringLabels)
}
assert(thrown.getMessage.contains(
"Column label must be of type NumericType but was actually of type StringType"))
"Column label must be of type numeric but was actually of type string"))

estimator match {
case weighted: Estimator[M] with HasWeightCol =>
@@ -86,7 +86,7 @@ object MLTestingUtils extends SparkFunSuite {
weighted.fit(dfWithStringWeights)
}
assert(thrown.getMessage.contains(
"Column weight must be of type NumericType but was actually of type StringType"))
"Column weight must be of type numeric but was actually of type string"))
case _ =>
}
}
@@ -104,7 +104,7 @@ object MLTestingUtils extends SparkFunSuite {
evaluator.evaluate(dfWithStringLabels)
}
assert(thrown.getMessage.contains(
"Column label must be of type NumericType but was actually of type StringType"))
"Column label must be of type numeric but was actually of type string"))
}

def genClassifDFWithNumericLabelCol(
@@ -2252,7 +2252,7 @@ class Analyzer(
}
expr
case other =>
throw new AnalysisException("need an array field but got " + other.simpleString)
throw new AnalysisException("need an array field but got " + other.catalogString)
}
}
validateNestedTupleFields(result)
@@ -2261,8 +2261,8 @@ class Analyzer(
}

private def fail(schema: StructType, maxOrdinal: Int): Unit = {
throw new AnalysisException(s"Try to map ${schema.simpleString} to Tuple${maxOrdinal + 1}, " +
"but failed as the number of fields does not line up.")
throw new AnalysisException(s"Try to map ${schema.catalogString} to Tuple${maxOrdinal + 1}" +
", but failed as the number of fields does not line up.")
}

/**
@@ -2341,7 +2341,7 @@ class Analyzer(
case e => e.sql
}
throw new AnalysisException(s"Cannot up cast $fromStr from " +
s"${from.dataType.simpleString} to ${to.simpleString} as it may truncate\n" +
s"${from.dataType.catalogString} to ${to.catalogString} as it may truncate\n" +
"The type path of the target object is:\n" + walkedTypePath.mkString("", "\n", "\n") +
"You can either add an explicit cast to the input data or choose a higher precision " +
"type of the field in the target object")
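
For the Analyzer, the switch from simpleString to catalogString matters beyond spelling: as I read the Spark 2.x sources, StructType.simpleString truncates long field lists (bounded by spark.debug.maxToStringFields), while catalogString always renders the complete schema, so the exception text keeps every field. A sketch under that assumption:

    import org.apache.spark.sql.types._

    // A struct wider than the default truncation threshold of 25 fields.
    val wide = StructType((1 to 30).map(i => StructField(s"c$i", IntegerType)))

    println(wide.simpleString)  // may elide trailing fields ("... more fields")
    println(wide.catalogString) // full struct<c1:int,c2:int,...,c30:int>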