From 68a1c7074e959d3bec595747194606a7c478c5d1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 1 May 2015 01:56:47 +0800 Subject: [PATCH 1/3] Show warning message if mapred.reduce.tasks is set to -1. --- .../org/apache/spark/sql/execution/commands.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 99f24910fd61..200397cb5c0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -84,8 +84,15 @@ case class SetCommand( logWarning( s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.") - sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value) - Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value")) + if (value == "-1") { + logWarning( + s"Set this property to -1 for automatically determining the number of reducers " + + s"is not supported, showing current ${SQLConf.SHUFFLE_PARTITIONS} instead.") + Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.conf.numShufflePartitions}")) + } else { + sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value) + Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value")) + } // Configures a single property. case Some((key, Some(value))) => From 4ede705512d0d84682a742e1fb35b79ffc7a972d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 1 May 2015 10:59:06 +0800 Subject: [PATCH 2/3] Throw exception instead of warning message. --- .../scala/org/apache/spark/sql/execution/commands.scala | 7 +++---- .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 7 +++++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 200397cb5c0f..33cca8a5516d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -85,10 +85,9 @@ case class SetCommand( s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.") if (value == "-1") { - logWarning( - s"Set this property to -1 for automatically determining the number of reducers " + - s"is not supported, showing current ${SQLConf.SHUFFLE_PARTITIONS} instead.") - Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.conf.numShufflePartitions}")) + val msg = s"Setting ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} to -1 for automatically " + + "determining the number of reducers is not supported." + throw new IllegalArgumentException(msg) } else { sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value) Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 9e02e69fda3f..e26cf274e682 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -830,6 +830,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { conf.clear() } + test("SET commands with illegal or inappropriate argument") { + conf.clear() + // mapred.reduce.tasks=-1 for automatically determing the number of reducers is not supported + intercept[IllegalArgumentException](sql(s"SET mapred.reduce.tasks=-1")) + conf.clear() + } + test("apply schema") { val schema1 = StructType( StructField("f1", IntegerType, false) :: From e518f965ba3b593114a74cac52fe229252ab3d26 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 4 May 2015 18:02:26 +0800 Subject: [PATCH 3/3] Consider other wrong setting values. --- .../main/scala/org/apache/spark/sql/execution/commands.scala | 4 ++-- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 4a65e7539791..65687db4e623 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -84,8 +84,8 @@ case class SetCommand( logWarning( s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.") - if (value == "-1") { - val msg = s"Setting ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} to -1 for automatically " + + if (value.toInt < 1) { + val msg = s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " + "determining the number of reducers is not supported." throw new IllegalArgumentException(msg) } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index fbc2d4ce093d..e374bd87e97d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -864,8 +864,11 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { test("SET commands with illegal or inappropriate argument") { conf.clear() - // mapred.reduce.tasks=-1 for automatically determing the number of reducers is not supported + // Set negative mapred.reduce.tasks for automatically determing + // the number of reducers is not supported intercept[IllegalArgumentException](sql(s"SET mapred.reduce.tasks=-1")) + intercept[IllegalArgumentException](sql(s"SET mapred.reduce.tasks=-01")) + intercept[IllegalArgumentException](sql(s"SET mapred.reduce.tasks=-2")) conf.clear() }