From e9220b4e68178a6cba4e02bf34f0623a80dbc31f Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 9 Aug 2018 00:59:25 +0900 Subject: [PATCH 1/2] Add default case to match. --- .../scala/org/apache/spark/sql/DataFrameNaFunctions.scala | 2 ++ .../org/apache/spark/sql/execution/aggregate/AggUtils.scala | 4 ++++ .../scala/org/apache/spark/sql/execution/command/tables.scala | 3 +++ .../spark/sql/execution/streaming/MicroBatchExecution.scala | 3 +++ .../streaming/state/SymmetricHashJoinStateManager.scala | 3 +++ .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 3 +++ 6 files changed, 18 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala index f3a2b70657c48..5288907b7d7ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala @@ -494,6 +494,8 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { case (NumericType, dt) => dt.isInstanceOf[NumericType] case (StringType, dt) => dt == StringType case (BooleanType, dt) => dt == BooleanType + case _ => + throw new IllegalArgumentException(s"$targetType is not matched at fillValue") } // Only fill if the column is part of the cols list. if (typeMatches && cols.exists(col => columnEquals(f.name, col))) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala index ebbdf1aaa024d..c8ef2b3f6998d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala @@ -177,6 +177,10 @@ object AggUtils { case agg @ AggregateExpression(aggregateFunction, mode, true, _) => aggregateFunction.transformDown(distinctColumnAttributeLookup) .asInstanceOf[AggregateFunction] + case agg => + throw new IllegalArgumentException( + "Non-distinct aggregate is found in functionsWithDistinct " + + s"at planAggregateWithOneDistinct: $agg") } val partialDistinctAggregate: SparkPlan = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 56f48b7dc00ee..f4dede9fcc899 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -960,6 +960,9 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman case EXTERNAL => " EXTERNAL TABLE" case VIEW => " VIEW" case MANAGED => " TABLE" + case t => + throw new IllegalArgumentException( + s"Unknown table type is found at showCreateHiveTable: $t") } builder ++= s"CREATE$tableTypeString ${table.quotedString}" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index c759f5be8ba35..d6cdc6d5443f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -394,6 +394,9 @@ class MicroBatchExecution( case (src: Source, off) => src.commit(off) case (reader: MicroBatchReader, off) => reader.commit(reader.deserializeOffset(off.json)) + case (src, _) => + throw new IllegalArgumentException( + s"Unknows source is found at constructNextBatch: $src") } } else { throw new IllegalStateException(s"batch ${currentBatchId - 1} doesn't exist") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala index 55d783e023246..4676db29b20b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala @@ -273,6 +273,9 @@ class SymmetricHashJoinStateManager( s.copy(desc = newDesc(desc)) -> value case (s @ StateStoreCustomTimingMetric(_, desc), value) => s.copy(desc = newDesc(desc)) -> value + case (s, _) => + throw new IllegalArgumentException( + s"Unknown state store custom metric is found at metrics: $s") } ) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index db8fd5a43d842..02c1ed93eb2f8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -927,6 +927,9 @@ private[hive] object HiveClientImpl { case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW + case t => + throw new IllegalArgumentException( + s"Unknown table type is found at toHiveTable: $t") }) // Note: In Hive the schema and partition columns must be disjoint sets val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => From fc5d7a1c59f8769dffac145a7c7e8dc533a7fc65 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 9 Aug 2018 02:27:23 +0900 Subject: [PATCH 2/2] Fix. --- .../spark/sql/execution/streaming/MicroBatchExecution.scala | 2 +- .../streaming/state/SymmetricHashJoinStateManager.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index d6cdc6d5443f7..b1c91ac94b268 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -396,7 +396,7 @@ class MicroBatchExecution( reader.commit(reader.deserializeOffset(off.json)) case (src, _) => throw new IllegalArgumentException( - s"Unknows source is found at constructNextBatch: $src") + s"Unknown source is found at constructNextBatch: $src") } } else { throw new IllegalStateException(s"batch ${currentBatchId - 1} doesn't exist") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala index 4676db29b20b3..6e7cd2db213d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala @@ -275,7 +275,7 @@ class SymmetricHashJoinStateManager( s.copy(desc = newDesc(desc)) -> value case (s, _) => throw new IllegalArgumentException( - s"Unknown state store custom metric is found at metrics: $s") + s"Unknown state store custom metric is found at metrics: $s") } ) }