From 3e645a79e21ee712fa8a17b6abcdad4a42ae7535 Mon Sep 17 00:00:00 2001 From: cclauss Date: Fri, 16 Mar 2018 00:08:04 +0100 Subject: [PATCH 1/9] [SPARK-23698] Reduce undefined names in Python 3 --- dev/create-release/releaseutils.py | 8 ++++++-- dev/merge_spark_pr.py | 2 +- python/pyspark/sql/conf.py | 5 ++++- python/pyspark/sql/streaming.py | 5 +---- python/pyspark/streaming/dstream.py | 2 ++ .../src/test/resources/data/scripts/dumpdata_script.py | 3 +++ 6 files changed, 17 insertions(+), 8 deletions(-) diff --git a/dev/create-release/releaseutils.py b/dev/create-release/releaseutils.py index ab812e1bb7c0..8cc990d87184 100755 --- a/dev/create-release/releaseutils.py +++ b/dev/create-release/releaseutils.py @@ -50,7 +50,7 @@ sys.exit(-1) if sys.version < '3': - input = raw_input + input = raw_input # noqa # Contributors list file name contributors_file_name = "contributors.txt" @@ -152,7 +152,11 @@ def get_commits(tag): if not is_valid_author(author): author = github_username # Guard against special characters - author = unidecode.unidecode(unicode(author, "UTF-8")).strip() + try: # Python 2 + author = unicode(author, "UTF-8") + except NameError: # Python 3 + author = str(author) + author = unidecode.unidecode(author).strip() commit = Commit(_hash, author, title, pr_number) commits.append(commit) return commits diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py index fe05282efdd4..28a6714856c1 100755 --- a/dev/merge_spark_pr.py +++ b/dev/merge_spark_pr.py @@ -40,7 +40,7 @@ JIRA_IMPORTED = False if sys.version < '3': - input = raw_input + input = raw_input # noqa # Location of your Spark git development area SPARK_HOME = os.environ.get("SPARK_HOME", os.getcwd()) diff --git a/python/pyspark/sql/conf.py b/python/pyspark/sql/conf.py index f80bf598c221..71ea1631718f 100644 --- a/python/pyspark/sql/conf.py +++ b/python/pyspark/sql/conf.py @@ -20,6 +20,9 @@ from pyspark import since, _NoValue from pyspark.rdd import ignore_unicode_prefix +if sys.version_info[0] >= 3: + basestring = str + class RuntimeConfig(object): """User-facing configuration API, accessible through `SparkSession.conf`. @@ -59,7 +62,7 @@ def unset(self, key): def _checkType(self, obj, identifier): """Assert that an object is of type str.""" - if not isinstance(obj, str) and not isinstance(obj, unicode): + if not isinstance(obj, basestring): raise TypeError("expected %s '%s' to be a string (was '%s')" % (identifier, obj, type(obj).__name__)) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 8c1fd4af674d..ee13778a7dcd 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -19,10 +19,7 @@ import json if sys.version >= '3': - intlike = int - basestring = unicode = str -else: - intlike = (int, long) + basestring = str from py4j.java_gateway import java_import diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 59977dcb435a..ce42a857d0c0 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -23,6 +23,8 @@ if sys.version < "3": from itertools import imap as map, ifilter as filter +else: + long = int from py4j.protocol import Py4JJavaError diff --git a/sql/hive/src/test/resources/data/scripts/dumpdata_script.py b/sql/hive/src/test/resources/data/scripts/dumpdata_script.py index 341a1b40e07a..5b360208d36f 100644 --- a/sql/hive/src/test/resources/data/scripts/dumpdata_script.py +++ b/sql/hive/src/test/resources/data/scripts/dumpdata_script.py @@ -18,6 +18,9 @@ # import sys +if sys.version_info[0] >= 3: + xrange = range + for i in xrange(50): for j in xrange(5): for k in xrange(20022): From a74bb53a26c7d91ad652c40d0c91af32bdf99d2b Mon Sep 17 00:00:00 2001 From: cclauss Date: Fri, 27 Jul 2018 21:43:37 +0200 Subject: [PATCH 2/9] add test_slice() to python/pyspark/streaming/tests.py --- python/pyspark/streaming/tests.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 09af47a597be..aa948e3dac1b 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -15,6 +15,7 @@ # limitations under the License. # +import datetime as dt import glob import os import sys @@ -206,6 +207,22 @@ def func(dstream): expected = [[len(x)] for x in input] self._test_func(input, func, expected) + def test_slice(self): + """Basic operation test for DStream.slice.""" + eol_python2 = dt.datetime(2020, 1, 1) + five_secs = dt.timedelta(seconds=30) + input = [eol_python2 - five_secs, eol_python2] + + def func(dstream): + return dstream.slice() + expected = [dt.datetime(2019, 12, 31, 23, 55), + dt.datetime(2019, 12, 31, 23, 56), + dt.datetime(2019, 12, 31, 23, 57), + dt.datetime(2019, 12, 31, 23, 58), + dt.datetime(2019, 12, 31, 23, 59), + dt.datetime(2020, 1, 1)] # fat lady sings... + self._test_func(input, func, expected) + def test_reduce(self): """Basic operation test for DStream.reduce.""" input = [range(1, 5), range(5, 9), range(9, 13)] From 561ec8aa0f5e6fe4d29a4ed1d69537857b9e6c0d Mon Sep 17 00:00:00 2001 From: cclauss Date: Fri, 27 Jul 2018 21:48:52 +0200 Subject: [PATCH 3/9] five_secs = dt.timedelta(seconds=5) --- python/pyspark/streaming/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index aa948e3dac1b..8503dd7dc084 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -210,7 +210,7 @@ def func(dstream): def test_slice(self): """Basic operation test for DStream.slice.""" eol_python2 = dt.datetime(2020, 1, 1) - five_secs = dt.timedelta(seconds=30) + five_secs = dt.timedelta(seconds=5) input = [eol_python2 - five_secs, eol_python2] def func(dstream): From 54d65631de77c26da84f747c8a804d7b3fb25de0 Mon Sep 17 00:00:00 2001 From: cclauss Date: Sat, 4 Aug 2018 07:26:07 +0200 Subject: [PATCH 4/9] Remove comment --- python/pyspark/streaming/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 8503dd7dc084..549372af9bed 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -220,7 +220,7 @@ def func(dstream): dt.datetime(2019, 12, 31, 23, 57), dt.datetime(2019, 12, 31, 23, 58), dt.datetime(2019, 12, 31, 23, 59), - dt.datetime(2020, 1, 1)] # fat lady sings... + dt.datetime(2020, 1, 1)] self._test_func(input, func, expected) def test_reduce(self): From 4e74e687184785d270cfe45f2fff7f6e7de31fd5 Mon Sep 17 00:00:00 2001 From: cclauss Date: Fri, 10 Aug 2018 23:35:06 +0200 Subject: [PATCH 5/9] dstream.slice() returns a list of lists --- python/pyspark/streaming/tests.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 549372af9bed..56fd2d65205c 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -215,12 +215,12 @@ def test_slice(self): def func(dstream): return dstream.slice() - expected = [dt.datetime(2019, 12, 31, 23, 55), - dt.datetime(2019, 12, 31, 23, 56), - dt.datetime(2019, 12, 31, 23, 57), - dt.datetime(2019, 12, 31, 23, 58), - dt.datetime(2019, 12, 31, 23, 59), - dt.datetime(2020, 1, 1)] + expected = [[dt.datetime(2019, 12, 31, 23, 55)], + [dt.datetime(2019, 12, 31, 23, 56)], + [dt.datetime(2019, 12, 31, 23, 57)], + [dt.datetime(2019, 12, 31, 23, 58)], + [dt.datetime(2019, 12, 31, 23, 59)], + [dt.datetime(2020, 1, 1)]] self._test_func(input, func, expected) def test_reduce(self): From f41bc3172cfa135bf0086c4c004775ccfe5a25fd Mon Sep 17 00:00:00 2001 From: cclauss Date: Sat, 11 Aug 2018 10:39:23 +0200 Subject: [PATCH 6/9] Remove definition of long for Python 3 --- python/pyspark/streaming/dstream.py | 2 -- python/pyspark/streaming/tests.py | 17 ----------------- 2 files changed, 19 deletions(-) diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index ce42a857d0c0..59977dcb435a 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -23,8 +23,6 @@ if sys.version < "3": from itertools import imap as map, ifilter as filter -else: - long = int from py4j.protocol import Py4JJavaError diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 56fd2d65205c..09af47a597be 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -15,7 +15,6 @@ # limitations under the License. # -import datetime as dt import glob import os import sys @@ -207,22 +206,6 @@ def func(dstream): expected = [[len(x)] for x in input] self._test_func(input, func, expected) - def test_slice(self): - """Basic operation test for DStream.slice.""" - eol_python2 = dt.datetime(2020, 1, 1) - five_secs = dt.timedelta(seconds=5) - input = [eol_python2 - five_secs, eol_python2] - - def func(dstream): - return dstream.slice() - expected = [[dt.datetime(2019, 12, 31, 23, 55)], - [dt.datetime(2019, 12, 31, 23, 56)], - [dt.datetime(2019, 12, 31, 23, 57)], - [dt.datetime(2019, 12, 31, 23, 58)], - [dt.datetime(2019, 12, 31, 23, 59)], - [dt.datetime(2020, 1, 1)]] - self._test_func(input, func, expected) - def test_reduce(self): """Basic operation test for DStream.reduce.""" input = [range(1, 5), range(5, 9), range(9, 13)] From 73a4fd26eed19256da80d27b07b2e5f4d85eb9f6 Mon Sep 17 00:00:00 2001 From: cclauss Date: Sat, 18 Aug 2018 18:01:39 +0200 Subject: [PATCH 7/9] restore definition of long and test_slice() --- python/pyspark/streaming/dstream.py | 2 ++ python/pyspark/streaming/tests.py | 17 +++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 59977dcb435a..ce42a857d0c0 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -23,6 +23,8 @@ if sys.version < "3": from itertools import imap as map, ifilter as filter +else: + long = int from py4j.protocol import Py4JJavaError diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 09af47a597be..56fd2d65205c 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -15,6 +15,7 @@ # limitations under the License. # +import datetime as dt import glob import os import sys @@ -206,6 +207,22 @@ def func(dstream): expected = [[len(x)] for x in input] self._test_func(input, func, expected) + def test_slice(self): + """Basic operation test for DStream.slice.""" + eol_python2 = dt.datetime(2020, 1, 1) + five_secs = dt.timedelta(seconds=5) + input = [eol_python2 - five_secs, eol_python2] + + def func(dstream): + return dstream.slice() + expected = [[dt.datetime(2019, 12, 31, 23, 55)], + [dt.datetime(2019, 12, 31, 23, 56)], + [dt.datetime(2019, 12, 31, 23, 57)], + [dt.datetime(2019, 12, 31, 23, 58)], + [dt.datetime(2019, 12, 31, 23, 59)], + [dt.datetime(2020, 1, 1)]] + self._test_func(input, func, expected) + def test_reduce(self): """Basic operation test for DStream.reduce.""" input = [range(1, 5), range(5, 9), range(9, 13)] From afd508e1740d7d77fa2cc64e4650c0b840ba1bb1 Mon Sep 17 00:00:00 2001 From: cclauss Date: Tue, 21 Aug 2018 11:49:43 +0200 Subject: [PATCH 8/9] Replacing test_slice() with new implementation Thanks @BryanCutler --- python/pyspark/streaming/tests.py | 40 +++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 56fd2d65205c..f12feb26a6e6 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -209,19 +209,35 @@ def func(dstream): def test_slice(self): """Basic operation test for DStream.slice.""" - eol_python2 = dt.datetime(2020, 1, 1) - five_secs = dt.timedelta(seconds=5) - input = [eol_python2 - five_secs, eol_python2] + import datetime as dt + self.ssc = StreamingContext(self.sc, 1.0) + self.ssc.remember(4.0) + input = [[1], [2], [3], [4]] + stream = self.ssc.queueStream([self.sc.parallelize(d, 1) for d in input]) - def func(dstream): - return dstream.slice() - expected = [[dt.datetime(2019, 12, 31, 23, 55)], - [dt.datetime(2019, 12, 31, 23, 56)], - [dt.datetime(2019, 12, 31, 23, 57)], - [dt.datetime(2019, 12, 31, 23, 58)], - [dt.datetime(2019, 12, 31, 23, 59)], - [dt.datetime(2020, 1, 1)]] - self._test_func(input, func, expected) + time_vals = [] + + def get_times(t, rdd): + if rdd and len(time_vals) < len(input): + time_vals.append(t) + + stream.foreachRDD(get_times) + + self.ssc.start() + self.wait_for(time_vals, 4) + begin_time = time_vals[0] + + def get_sliced(begin_delta, end_delta): + begin = begin_time + dt.timedelta(seconds=begin_delta) + end = begin_time + dt.timedelta(seconds=end_delta) + rdds = stream.slice(begin, end) + result_list = [rdd.collect() for rdd in rdds] + return [r for result in result_list for r in result] + + self.assertEqual(set([1]), set(get_sliced(0, 0))) + self.assertEqual(set([2, 3]), set(get_sliced(1, 2))) + self.assertEqual(set([2, 3, 4]), set(get_sliced(1, 4))) + self.assertEqual(set([1, 2, 3, 4]), set(get_sliced(0, 4))) def test_reduce(self): """Basic operation test for DStream.reduce.""" From 5b3658c08de88edf73d2798c7dc41b24e6f04f6e Mon Sep 17 00:00:00 2001 From: cclauss Date: Tue, 21 Aug 2018 20:31:20 +0200 Subject: [PATCH 9/9] Fix redundant import and typo in comment --- python/pyspark/streaming/tests.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index f12feb26a6e6..5cef621a28e6 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -15,7 +15,6 @@ # limitations under the License. # -import datetime as dt import glob import os import sys @@ -180,7 +179,7 @@ def func(dstream): self._test_func(input, func, expected) def test_flatMap(self): - """Basic operation test for DStream.faltMap.""" + """Basic operation test for DStream.flatMap.""" input = [range(1, 5), range(5, 9), range(9, 13)] def func(dstream):