From a643ef2f60a45c202d71e39d0cdaa32fb1350890 Mon Sep 17 00:00:00 2001 From: Corey Woodfield Date: Wed, 28 Jun 2017 18:11:26 -0600 Subject: [PATCH 1/4] Removed invalid joinTypes from javadoc of Dataset#joinWith --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 7be4aa1ca9562..c183132ec4b3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -990,7 +990,7 @@ class Dataset[T] private[sql]( * @param condition Join expression. * @param joinType Type of join to perform. Default `inner`. Must be one of: * `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`, - * `right`, `right_outer`, `left_semi`, `left_anti`. + * `right`, `right_outer`. * * @group typedrel * @since 1.6.0 From 00af86152349bdd9f812fef2bd19530d70a96a6f Mon Sep 17 00:00:00 2001 From: Corey Woodfield Date: Tue, 4 Jul 2017 10:51:04 -0600 Subject: [PATCH 2/4] Added check for invalid join types in joinWith and tests --- .../main/scala/org/apache/spark/sql/Dataset.scala | 4 ++++ .../scala/org/apache/spark/sql/DatasetSuite.scala | 13 +++++++++++++ 2 files changed, 17 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index c183132ec4b3c..aeef879ca0502 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1007,6 +1007,10 @@ class Dataset[T] private[sql]( JoinType(joinType), Some(condition.expr))).analyzed.asInstanceOf[Join] + if (joined.joinType == LeftSemi || joined.joinType == LeftAnti) { + throw new AnalysisException("Invalid join type in joinWith: " + joined.joinType) + } + // For both join side, combine all outputs into a single column and alias it with "_1" or "_2", // to match the schema for the encoder of the join result. // Note that we do this before joining them, to enable the join operator to return null for one diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 87b7b090de3bf..748b524a2395d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -321,6 +321,19 @@ class DatasetSuite extends QueryTest with SharedSQLContext { ((("b", 2), ("b", 2)), ("b", 2))) } + test("joinWith join types") { + val ds1 = Seq(1, 2, 3).toDS().as("a") + val ds2 = Seq(1, 2).toDS().as("b") + + intercept[AnalysisException] { + ds1.joinWith(ds2, $"a.value" === $"b.value", "left_semi") + } + + intercept[AnalysisException] { + ds1.joinWith(ds2, $"a.value" === $"b.value", "left_anti") + } + } + test("groupBy function, keys") { val ds = Seq(("a", 1), ("b", 1)).toDS() val grouped = ds.groupByKey(v => (1, v._2)) From c00daf8b3f2f9c8f42411138c0dcc56a5e97d5e2 Mon Sep 17 00:00:00 2001 From: Corey Woodfield Date: Thu, 6 Jul 2017 18:07:15 -0600 Subject: [PATCH 3/4] Added check to make sure correct exception was being caught in test --- .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 748b524a2395d..ba03199d660b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -325,13 +325,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext { val ds1 = Seq(1, 2, 3).toDS().as("a") val ds2 = Seq(1, 2).toDS().as("b") - intercept[AnalysisException] { + val e1 = intercept[AnalysisException] { ds1.joinWith(ds2, $"a.value" === $"b.value", "left_semi") - } + }.getMessage + assert(e1.contains("Invalid join type in joinWith: LeftSemi")) - intercept[AnalysisException] { + val e2 = intercept[AnalysisException] { ds1.joinWith(ds2, $"a.value" === $"b.value", "left_anti") - } + }.getMessage + assert(e2.contains("Invalid join type in joinWith: LeftAnti")) } test("groupBy function, keys") { From 9aee54a0750bb3275aa2c48a50c12760cab15281 Mon Sep 17 00:00:00 2001 From: Corey Woodfield Date: Fri, 7 Jul 2017 07:26:25 -0600 Subject: [PATCH 4/4] Analysis exception now contains joinType.sql instead of joinType Also made tests more robust and less likely to break if changes are made to joinTypes --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index aeef879ca0502..06ff601d67a1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1008,7 +1008,7 @@ class Dataset[T] private[sql]( Some(condition.expr))).analyzed.asInstanceOf[Join] if (joined.joinType == LeftSemi || joined.joinType == LeftAnti) { - throw new AnalysisException("Invalid join type in joinWith: " + joined.joinType) + throw new AnalysisException("Invalid join type in joinWith: " + joined.joinType.sql) } // For both join side, combine all outputs into a single column and alias it with "_1" or "_2", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index ba03199d660b8..4a1e3e1599599 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -21,6 +21,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.sql.{Date, Timestamp} import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder} +import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SortExec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange} @@ -328,12 +329,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext { val e1 = intercept[AnalysisException] { ds1.joinWith(ds2, $"a.value" === $"b.value", "left_semi") }.getMessage - assert(e1.contains("Invalid join type in joinWith: LeftSemi")) + assert(e1.contains("Invalid join type in joinWith: " + LeftSemi.sql)) val e2 = intercept[AnalysisException] { ds1.joinWith(ds2, $"a.value" === $"b.value", "left_anti") }.getMessage - assert(e2.contains("Invalid join type in joinWith: LeftAnti")) + assert(e2.contains("Invalid join type in joinWith: " + LeftAnti.sql)) } test("groupBy function, keys") {