From 6ec509b990c985ad8519aa563cfe08c24e6847ae Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 30 Jun 2017 17:11:01 +0900 Subject: [PATCH 1/2] Call cross join path in PySpark join rather than throwing NPE --- python/pyspark/sql/dataframe.py | 2 ++ python/pyspark/sql/tests.py | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 0649271ed224..27a6dad8917d 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -833,6 +833,8 @@ def join(self, other, on=None, how=None): else: if how is None: how = "inner" + if on is None: + on = self._jseq([]) assert isinstance(how, basestring), "how should be basestring" jdf = self._jdf.join(other._jdf, on, how) return DataFrame(jdf, self.sql_ctx) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 0a1cd6856b8e..ab256a9aa9d2 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2021,6 +2021,11 @@ def test_toDF_with_schema_string(self): self.assertEqual(df.schema.simpleString(), "struct<value:int>") self.assertEqual(df.collect(), [Row(key=i) for i in range(100)]) + def test_join_without_on(self): + self.assertRaises( + AnalysisException, + lambda: self.spark.range(1).join(self.spark.range(1), how="inner").collect()) + # Regression test for invalid join methods when on is None, Spark-14761 def test_invalid_join_method(self): df1 = self.spark.createDataFrame([("Alice", 5), ("Bob", 8)], ["name", "age"]) From 48a92d1166e9ab13ef8926885b12c5e513148115 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 3 Jul 2017 17:59:29 +0900 Subject: [PATCH 2/2] Add both tests before/after "spark.sql.crossJoin.enabled" --- python/pyspark/sql/tests.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index ab256a9aa9d2..c105969b26b9 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2022,9
+2022,20 @@ def test_toDF_with_schema_string(self): self.assertEqual(df.collect(), [Row(key=i) for i in range(100)]) def test_join_without_on(self): - self.assertRaises( - AnalysisException, - lambda: self.spark.range(1).join(self.spark.range(1), how="inner").collect()) + df1 = self.spark.range(1).toDF("a") + df2 = self.spark.range(1).toDF("b") + + try: + self.spark.conf.set("spark.sql.crossJoin.enabled", "false") + self.assertRaises(AnalysisException, lambda: df1.join(df2, how="inner").collect()) + + self.spark.conf.set("spark.sql.crossJoin.enabled", "true") + actual = df1.join(df2, how="inner").collect() + expected = [Row(a=0, b=0)] + self.assertEqual(actual, expected) + finally: + # We should unset this. Otherwise, other tests are affected. + self.spark.conf.unset("spark.sql.crossJoin.enabled") # Regression test for invalid join methods when on is None, Spark-14761 def test_invalid_join_method(self):