From 8ef6b219ba876da151013a3db1d18a73e3d30d17 Mon Sep 17 00:00:00 2001
From: Nicholas Chammas
Date: Tue, 10 May 2016 17:41:43 -0400
Subject: [PATCH 1/5] clarify jdbc() docstring

---
 python/pyspark/sql/readwriter.py | 39 +++++++++++++++++---------------
 1 file changed, 21 insertions(+), 18 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bd728c97c82a..c5ff36a26215 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -402,7 +402,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
     def orc(self, path):
         """Loads an ORC file, returning the result as a :class:`DataFrame`.

-        ::Note: Currently ORC support is only available together with
+        .. note:: Currently ORC support is only available together with
             :class:`HiveContext`.

         >>> df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
@@ -415,28 +415,31 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
              predicates=None, properties=None):
         """
-        Construct a :class:`DataFrame` representing the database table accessible
-        via JDBC URL `url` named `table` and connection `properties`.
+        Construct a :class:`DataFrame` representing the database table named ``table``
+        accessible via JDBC URL ``url`` and connection ``properties``.

-        The `column` parameter could be used to partition the table, then it will
-        be retrieved in parallel based on the parameters passed to this function.
+        Partitions of the table will be retrieved in parallel if either ``column`` or
+        ``predicates`` is specified.

-        The `predicates` parameter gives a list expressions suitable for inclusion
-        in WHERE clauses; each one defines one partition of the :class:`DataFrame`.
+        If both ``column`` and ``predicates`` are specified, ``column`` will be used.

-        ::Note: Don't create too many partitions in parallel on a large cluster;
+        .. note:: Don't create too many partitions in parallel on a large cluster;
             otherwise Spark might crash your external database systems.

-        :param url: a JDBC URL
-        :param table: name of table
-        :param column: the column used to partition
-        :param lowerBound: the lower bound of partition column
-        :param upperBound: the upper bound of the partition column
+        :param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
+        :param table: the name of the table
+        :param column: the name of an integer column that will be used for partitioning;
+            if this parameter is specified, then ``numPartitions``, ``lowerBound``
+            (inclusive), and ``upperBound`` (exclusive) will form partition strides
+            for generated WHERE clause expressions used to split the column
+            ``column`` evenly
+        :param lowerBound: the minimum value of ``column`` used to decide partition stride
+        :param upperBound: the maximum value of ``column`` used to decide partition stride
         :param numPartitions: the number of partitions
-        :param predicates: a list of expressions
-        :param properties: JDBC database connection arguments, a list of arbitrary string
-                           tag/value. Normally at least a "user" and "password" property
-                           should be included.
+        :param predicates: a list of expressions suitable for inclusion in WHERE clauses;
+            each one defines one partition of the :class:`DataFrame`
+        :param properties: a dictionary of JDBC database connection arguments; normally,
+            at least a "user" and "password" property should be included
         :return: a DataFrame
         """
         if properties is None:
@@ -808,7 +811,7 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
     def orc(self, path, mode=None, partitionBy=None, compression=None):
         """Saves the content of the :class:`DataFrame` in ORC format at the specified path.

-        ::Note: Currently ORC support is only available together with
+        .. note:: Currently ORC support is only available together with
             :class:`HiveContext`.

         :param path: the path in any Hadoop supported file system
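The reworded docstring in patch 1 describes two mutually exclusive ways to parallelize a JDBC read: an integer ``column`` with ``lowerBound``/``upperBound``/``numPartitions``, or an explicit list of ``predicates``. The sketch below is illustrative only and is not part of the patch; the JDBC URL, table, column names, and credentials are made-up placeholders.

```python
# Illustrative usage of the two partitioning modes described in patch 1.
# Not part of the patch: the URL, table, columns, and credentials are fake.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

url = "jdbc:postgresql://db.example.com:5432/sales"   # placeholder URL
props = {"user": "reader", "password": "secret"}       # placeholder credentials

# Mode 1: partition on an integer column. Spark splits [lowerBound, upperBound)
# into numPartitions strides and generates one WHERE clause per stride, so each
# partition reads its own slice of the table.
by_column = spark.read.jdbc(
    url, "orders",
    column="order_id", lowerBound=1, upperBound=1000000, numPartitions=8,
    properties=props)

# Mode 2: pass explicit predicates; each expression becomes its own partition.
by_predicates = spark.read.jdbc(
    url, "orders",
    predicates=["region = 'EU'", "region = 'US'", "region = 'APAC'"],
    properties=props)
```

Per the clarified docstring, if both ``column`` and ``predicates`` were passed, ``column`` would win.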
From 237ebe3c02a2ec4c8863b980ccd1a017fe33679a Mon Sep 17 00:00:00 2001
From: Nicholas Chammas
Date: Tue, 10 May 2016 17:53:30 -0400
Subject: [PATCH 2/5] fix Sphinx notes

---
 python/pyspark/sql/readwriter.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index c5ff36a26215..a122cded21bf 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -402,8 +402,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
     def orc(self, path):
         """Loads an ORC file, returning the result as a :class:`DataFrame`.

-        .. note:: Currently ORC support is only available together with
-            :class:`HiveContext`.
+        .. note:: Currently ORC support is only available together with :class:`HiveContext`.

         >>> df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
         >>> df.dtypes
@@ -423,7 +422,7 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
         If both ``column`` and ``predicates`` are specified, ``column`` will be used.

-        .. note:: Don't create too many partitions in parallel on a large cluster;
+        .. note:: Don't create too many partitions in parallel on a large cluster; \
            otherwise Spark might crash your external database systems.

         :param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
@@ -811,8 +810,7 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
     def orc(self, path, mode=None, partitionBy=None, compression=None):
         """Saves the content of the :class:`DataFrame` in ORC format at the specified path.

-        .. note:: Currently ORC support is only available together with
-            :class:`HiveContext`.
+        .. note:: Currently ORC support is only available together with :class:`HiveContext`.

         :param path: the path in any Hadoop supported file system
         :param mode: specifies the behavior of the save operation when data already exists.
@@ -840,7 +838,7 @@ def orc(self, path, mode=None, partitionBy=None, compression=None):
     def jdbc(self, url, table, mode=None, properties=None):
         """Saves the content of the :class:`DataFrame` to a external database table via JDBC.

-        .. note:: Don't create too many partitions in parallel on a large cluster;\
+        .. note:: Don't create too many partitions in parallel on a large cluster; \
            otherwise Spark might crash your external database systems.

         :param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
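A side note on the trailing backslash that patch 2 adds (this illustration is mine, not part of the patch): inside a Python string literal, a backslash immediately followed by a newline is a line-continuation escape, so the two physical source lines reach Sphinx as a single logical line of note text.

```python
# Minimal illustration of the backslash-newline escape inside a docstring-style
# string; the strings are simplified stand-ins, not copied from readwriter.py.
without_backslash = """.. note:: Don't create too many partitions in parallel;
    otherwise Spark might crash your external database systems."""

with_backslash = """.. note:: Don't create too many partitions in parallel; \
    otherwise Spark might crash your external database systems."""

print(len(without_backslash.splitlines()))  # 2 -- the newline survives
print(len(with_backslash.splitlines()))     # 1 -- the newline was escaped away
```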
From 183b6e7685b6b30005aac1f995b38e3e74ac02d9 Mon Sep 17 00:00:00 2001
From: Nicholas Chammas
Date: Tue, 10 May 2016 18:36:13 -0400
Subject: [PATCH 3/5] update references to HiveContext, since that's being deprecated

---
 python/pyspark/sql/readwriter.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index a122cded21bf..6158e1783d3e 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -402,7 +402,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
     def orc(self, path):
         """Loads an ORC file, returning the result as a :class:`DataFrame`.

-        .. note:: Currently ORC support is only available together with :class:`HiveContext`.
+        .. note:: Currently ORC support is only available together with Hive support.

         >>> df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
         >>> df.dtypes
@@ -810,7 +810,7 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
     def orc(self, path, mode=None, partitionBy=None, compression=None):
         """Saves the content of the :class:`DataFrame` in ORC format at the specified path.

-        .. note:: Currently ORC support is only available together with :class:`HiveContext`.
+        .. note:: Currently ORC support is only available together with Hive support.

         :param path: the path in any Hadoop supported file system
         :param mode: specifies the behavior of the save operation when data already exists.
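Patch 3 rewords the ORC notes to say "Hive support" because :class:`HiveContext` is being deprecated. For context, a sketch (not from the patch; the app name and output path are placeholders) of obtaining a Hive-enabled session through the SparkSession builder instead:

```python
# Sketch: a Hive-enabled session via the SparkSession builder rather than the
# deprecated HiveContext. The app name and output path are placeholders; the
# input path is the test fixture used by the doctests in this patch series.
import os
import tempfile

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("orc-docs-example")
         .enableHiveSupport()
         .getOrCreate())

df = spark.read.orc('python/test_support/sql/orc_partitioned')
df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
```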
From 6a73fe9930bf0041c8ad38a130be6ec86d11a173 Mon Sep 17 00:00:00 2001
From: Nicholas Chammas
Date: Wed, 11 May 2016 17:05:01 -0400
Subject: [PATCH 4/5] remove remaining mentions of HiveContext

---
 python/pyspark/sql/readwriter.py | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 6158e1783d3e..1dc9069d3893 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -404,7 +404,7 @@ def orc(self, path):

         .. note:: Currently ORC support is only available together with Hive support.

-        >>> df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
+        >>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
         >>> df.dtypes
         [('a', 'bigint'), ('b', 'int'), ('c', 'int')]
         """
@@ -540,7 +540,7 @@ def partitionBy(self, *cols):
     def queryName(self, queryName):
         """Specifies the name of the :class:`ContinuousQuery` that can be started with
         :func:`startStream`. This name must be unique among all the currently active queries
-        in the associated spark
+        in the associated SparkSession.

         .. note:: Experimental.
@@ -824,7 +824,7 @@ def orc(self, path, mode=None, partitionBy=None, compression=None):
                             known case-insensitive shorten names (none, snappy, zlib, and lzo).
                             This will overwrite ``orc.compress``.

-        >>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
+        >>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned')
        >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
@@ -866,29 +866,27 @@ def _test():
     import os
     import tempfile
     from pyspark.context import SparkContext
-    from pyspark.sql import SparkSession, Row, HiveContext
+    from pyspark.sql import SparkSession, Row
     import pyspark.sql.readwriter

     os.chdir(os.environ["SPARK_HOME"])

     globs = pyspark.sql.readwriter.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
+    spark = SparkSession.withHiveSupport(sc)

     globs['tempfile'] = tempfile
     globs['os'] = os
     globs['sc'] = sc
-    globs['spark'] = SparkSession.builder\
-        .enableHiveSupport()\
-        .getOrCreate()
-    globs['hiveContext'] = HiveContext._createForTesting(sc)
-    globs['df'] = globs['spark'].read.parquet('python/test_support/sql/parquet_partitioned')
+    globs['spark'] = spark
+    globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned')
     globs['sdf'] = \
-        globs['spark'].read.format('text').stream('python/test_support/sql/streaming')
+        spark.read.format('text').stream('python/test_support/sql/streaming')
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.readwriter, globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
-    globs['sc'].stop()
+    sc.stop()
     if failure_count:
         exit(-1)

From e7d97e494332c335a4f3593658cc71f0fe4b2eff Mon Sep 17 00:00:00 2001
From: Nicholas Chammas
Date: Wed, 11 May 2016 18:03:56 -0400
Subject: [PATCH 5/5] don't use Hive support if it's not available

---
 python/pyspark/sql/readwriter.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 1dc9069d3893..7fd7583972c7 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -865,6 +865,7 @@ def _test():
     import doctest
     import os
     import tempfile
+    import py4j
     from pyspark.context import SparkContext
     from pyspark.sql import SparkSession, Row
     import pyspark.sql.readwriter

     os.chdir(os.environ["SPARK_HOME"])

     globs = pyspark.sql.readwriter.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
-    spark = SparkSession.withHiveSupport(sc)
+    try:
+        spark = SparkSession.withHiveSupport(sc)
+    except py4j.protocol.Py4JError:
+        spark = SparkSession(sc)

     globs['tempfile'] = tempfile
     globs['os'] = os
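Pulled out of the diff for readability, the guard that patch 5 adds to ``_test()``: build a Hive-enabled session when the Hive classes are available, otherwise fall back to a plain :class:`SparkSession`. ``SparkSession.withHiveSupport`` is the API the patch itself uses; the surrounding lines are assembled from the context shown above, not added by me.

```python
# Standalone restatement of the fallback added in patch 5: a missing Hive
# classpath surfaces as a Py4J error, in which case a plain SparkSession is used.
import py4j

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext('local[4]', 'PythonTest')
try:
    spark = SparkSession.withHiveSupport(sc)
except py4j.protocol.Py4JError:
    spark = SparkSession(sc)
```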