From 2fab469a13db647a2538e8b4539ae0f463849b13 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 2 May 2016 20:46:00 -0700 Subject: [PATCH 1/6] [SPARK-15084][PYSPARK] Use builder pattern to create SparkSession in PySpark. This is a port of corresponding Scala builder pattern code. `sql.py` is modified as a target example case. --- examples/src/main/python/sql.py | 35 +++++++--------- python/pyspark/sql/session.py | 72 +++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 21 deletions(-) diff --git a/examples/src/main/python/sql.py b/examples/src/main/python/sql.py index 2c188759328f2..28039d01d65dc 100644 --- a/examples/src/main/python/sql.py +++ b/examples/src/main/python/sql.py @@ -20,33 +20,28 @@ import os import sys -from pyspark import SparkContext -from pyspark.sql import SQLContext +from pyspark.sql import SparkSession from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType if __name__ == "__main__": - sc = SparkContext(appName="PythonSQL") - sqlContext = SQLContext(sc) + spark = SparkSession.builder().appName("PythonSQL").getOrCreate() - # RDD is created from a list of rows - some_rdd = sc.parallelize([Row(name="John", age=19), - Row(name="Smith", age=23), - Row(name="Sarah", age=18)]) - # Infer schema from the first row, create a DataFrame and print the schema - some_df = sqlContext.createDataFrame(some_rdd) + # A list of Rows. Infer schema from the first row, create a DataFrame and print the schema + rows = [Row(name="John", age=19), Row(name="Smith", age=23), Row(name="Sarah", age=18)] + some_df = spark.createDataFrame(rows) some_df.printSchema() - # Another RDD is created from a list of tuples - another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)]) + # A list of tuples + tuples = [("John", 19), ("Smith", 23), ("Sarah", 18)] # Schema with two fields - person_name and person_age schema = StructType([StructField("person_name", StringType(), False), StructField("person_age", IntegerType(), False)]) # Create a DataFrame by applying the schema to the RDD and print the schema - another_df = sqlContext.createDataFrame(another_rdd, schema) + another_df = spark.createDataFrame(tuples, schema) another_df.printSchema() # root - # |-- age: integer (nullable = true) + # |-- age: long (nullable = true) # |-- name: string (nullable = true) # A JSON dataset is pointed to by path. @@ -57,7 +52,7 @@ else: path = sys.argv[1] # Create a DataFrame from the file(s) pointed to by path - people = sqlContext.jsonFile(path) + people = spark.read.json(path) # root # |-- person_name: string (nullable = false) # |-- person_age: integer (nullable = false) @@ -65,16 +60,14 @@ # The inferred schema can be visualized using the printSchema() method. people.printSchema() # root - # |-- age: IntegerType - # |-- name: StringType + # |-- age: long (nullable = true) + # |-- name: string (nullable = true) # Register this DataFrame as a table. - people.registerAsTable("people") + people.registerTempTable("people") # SQL statements can be run by using the sql methods provided by sqlContext - teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") + teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") for each in teenagers.collect(): print(each[0]) - - sc.stop() diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 35c36b4935090..da8be0816849e 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -19,6 +19,7 @@ import sys import warnings from functools import reduce +from threading import RLock if sys.version >= '3': basestring = unicode = str @@ -445,6 +446,77 @@ def read(self): """ return DataFrameReader(self._wrapped) + @classmethod + @since(2.0) + def builder(cls): + """Returns a new :class:`SparkSession.Builder` for constructing a :class:`SparkSession`. + """ + return SparkSession.Builder() + + class Builder(object): + """Builder for :class:`SparkSession`. + """ + + _lock = RLock() + _options = {} + + @since(2.0) + def config(self, key=None, value=None, conf=None): + """Sets a config option. Options set using this method are automatically propagated to + both :class:`SparkConf` and :class:`SparkSession`'s own configuration. + + :param key: a key name string for configuration property + :param value: a value for configuration property + :param conf: an instance of :class:`SparkConf` + """ + with SparkSession.Builder._lock: + if conf is None: + self._options[key] = str(value) + else: + for (k, v) in conf.getAll(): + self._options[k] = v + return self + + @since(2.0) + def master(self, master): + """Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" + to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone + cluster. + + :param master: a url for spark master + """ + return self.config("spark.master", master) + + @since(2.0) + def appName(self, name): + """Sets a name for the application, which will be shown in the Spark web UI. + + :param name: an application name + """ + return self.config("spark.app.name", name) + + @since(2.0) + def enableHiveSupport(self): + """Enables Hive support, including connectivity to a persistent Hive metastore, support + for Hive serdes, and Hive user-defined functions. + """ + return self.config("spark.sql.catalogImplementation", "hive") + + @since(2.0) + def getOrCreate(self): + """Gets an existing :class:`SparkSession` or, if there is no existing one, creates a new + one based on the options set in this builder. + """ + with SparkSession.Builder._lock: + from pyspark.conf import SparkConf + from pyspark.context import SparkContext + from pyspark.sql.context import SQLContext + sparkConf = SparkConf() + for key, value in self._options.iteritems(): + sparkConf.set(key, value) + sparkContext = SparkContext.getOrCreate(sparkConf) + return SQLContext.getOrCreate(sparkContext).sparkSession + def _test(): import os From d2bf344376070a108bcf2baa839e290d2f2580ba Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 2 May 2016 21:14:39 -0700 Subject: [PATCH 2/6] Update SparkSession doc. --- python/pyspark/sql/session.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index da8be0816849e..5791d03b2af40 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -59,10 +59,16 @@ def toDF(self, schema=None, sampleRatio=None): class SparkSession(object): - """Main entry point for Spark SQL functionality. + """The entry point to programming Spark with the Dataset and DataFrame API. A SparkSession can be used create :class:`DataFrame`, register :class:`DataFrame` as tables, execute SQL over tables, cache tables, and read parquet files. + To create a SparkSession, use the following builder pattern: + + >>> spark = SparkSession.builder() + ... .master("local") + ... .config("spark.some.config.option", "some-value") + ... .getOrCreate() :param sparkContext: The :class:`SparkContext` backing this SparkSession. :param jsparkSession: An optional JVM Scala SparkSession. If set, we do not instantiate a new From 3e1484e6474a7e73d43a70173deea2a2ca04a08f Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 2 May 2016 21:42:57 -0700 Subject: [PATCH 3/6] Fix test. --- python/pyspark/sql/session.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 5791d03b2af40..f896e40439596 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -59,15 +59,15 @@ def toDF(self, schema=None, sampleRatio=None): class SparkSession(object): - """The entry point to programming Spark with the Dataset and DataFrame API. + r"""The entry point to programming Spark with the Dataset and DataFrame API. A SparkSession can be used create :class:`DataFrame`, register :class:`DataFrame` as tables, execute SQL over tables, cache tables, and read parquet files. To create a SparkSession, use the following builder pattern: - >>> spark = SparkSession.builder() - ... .master("local") - ... .config("spark.some.config.option", "some-value") + >>> spark = SparkSession.builder() \ + ... .master("local") \ + ... .config("spark.some.config.option", "some-value") \ ... .getOrCreate() :param sparkContext: The :class:`SparkContext` backing this SparkSession. @@ -518,7 +518,7 @@ def getOrCreate(self): from pyspark.context import SparkContext from pyspark.sql.context import SQLContext sparkConf = SparkConf() - for key, value in self._options.iteritems(): + for key, value in self._options.items(): sparkConf.set(key, value) sparkContext = SparkContext.getOrCreate(sparkConf) return SQLContext.getOrCreate(sparkContext).sparkSession From 4e7429c506a6915a9183f2ff9de6f75c722418b4 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 3 May 2016 11:02:02 -0700 Subject: [PATCH 4/6] Use self._lock and add examples. --- python/pyspark/sql/session.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index f896e40439596..8f5defc344283 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -471,11 +471,20 @@ def config(self, key=None, value=None, conf=None): """Sets a config option. Options set using this method are automatically propagated to both :class:`SparkConf` and :class:`SparkSession`'s own configuration. + For an existing SparkConf, use `conf` parameter. + >>> from pyspark.conf import SparkConf + >>> SparkSession.builder().config(conf=SparkConf()) + + + For a (key, value) pair, you can omit parameter names. + >>> SparkSession.builder().config("spark.some.config.option", "some-value") + + :param key: a key name string for configuration property :param value: a value for configuration property :param conf: an instance of :class:`SparkConf` """ - with SparkSession.Builder._lock: + with self._lock: if conf is None: self._options[key] = str(value) else: @@ -513,7 +522,7 @@ def getOrCreate(self): """Gets an existing :class:`SparkSession` or, if there is no existing one, creates a new one based on the options set in this builder. """ - with SparkSession.Builder._lock: + with self._lock: from pyspark.conf import SparkConf from pyspark.context import SparkContext from pyspark.sql.context import SQLContext From ac5bc68d22942c369cc3a79e5462b1eba54accd3 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 3 May 2016 15:38:47 -0700 Subject: [PATCH 5/6] Change build pattern and add stop. --- examples/src/main/python/sql.py | 4 +- python/pyspark/sql/session.py | 158 ++++++++++++++++---------------- 2 files changed, 83 insertions(+), 79 deletions(-) diff --git a/examples/src/main/python/sql.py b/examples/src/main/python/sql.py index 28039d01d65dc..ea6a22dbfe824 100644 --- a/examples/src/main/python/sql.py +++ b/examples/src/main/python/sql.py @@ -25,7 +25,7 @@ if __name__ == "__main__": - spark = SparkSession.builder().appName("PythonSQL").getOrCreate() + spark = SparkSession.builder.appName("PythonSQL").getOrCreate() # A list of Rows. Infer schema from the first row, create a DataFrame and print the schema rows = [Row(name="John", age=19), Row(name="Smith", age=23), Row(name="Sarah", age=18)] @@ -71,3 +71,5 @@ for each in teenagers.collect(): print(each[0]) + + spark.stop() diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 8f5defc344283..e9af0d6c11ae4 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -65,8 +65,9 @@ class SparkSession(object): tables, execute SQL over tables, cache tables, and read parquet files. To create a SparkSession, use the following builder pattern: - >>> spark = SparkSession.builder() \ + >>> spark = SparkSession.builder \ ... .master("local") \ + ... .appName("Word Count") \ ... .config("spark.some.config.option", "some-value") \ ... .getOrCreate() @@ -75,6 +76,81 @@ class SparkSession(object): SparkSession in the JVM, instead we make all calls to this object. """ + class Builder(object): + """Builder for :class:`SparkSession`. + """ + + _lock = RLock() + _options = {} + + @since(2.0) + def config(self, key=None, value=None, conf=None): + """Sets a config option. Options set using this method are automatically propagated to + both :class:`SparkConf` and :class:`SparkSession`'s own configuration. + + For an existing SparkConf, use `conf` parameter. + >>> from pyspark.conf import SparkConf + >>> SparkSession.builder.config(conf=SparkConf()) + >> SparkSession.builder.config("spark.some.config.option", "some-value") + >> from pyspark.conf import SparkConf - >>> SparkSession.builder().config(conf=SparkConf()) - - - For a (key, value) pair, you can omit parameter names. - >>> SparkSession.builder().config("spark.some.config.option", "some-value") - - - :param key: a key name string for configuration property - :param value: a value for configuration property - :param conf: an instance of :class:`SparkConf` - """ - with self._lock: - if conf is None: - self._options[key] = str(value) - else: - for (k, v) in conf.getAll(): - self._options[k] = v - return self - - @since(2.0) - def master(self, master): - """Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" - to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone - cluster. - - :param master: a url for spark master - """ - return self.config("spark.master", master) - - @since(2.0) - def appName(self, name): - """Sets a name for the application, which will be shown in the Spark web UI. - - :param name: an application name - """ - return self.config("spark.app.name", name) - - @since(2.0) - def enableHiveSupport(self): - """Enables Hive support, including connectivity to a persistent Hive metastore, support - for Hive serdes, and Hive user-defined functions. - """ - return self.config("spark.sql.catalogImplementation", "hive") - - @since(2.0) - def getOrCreate(self): - """Gets an existing :class:`SparkSession` or, if there is no existing one, creates a new - one based on the options set in this builder. - """ - with self._lock: - from pyspark.conf import SparkConf - from pyspark.context import SparkContext - from pyspark.sql.context import SQLContext - sparkConf = SparkConf() - for key, value in self._options.items(): - sparkConf.set(key, value) - sparkContext = SparkContext.getOrCreate(sparkConf) - return SQLContext.getOrCreate(sparkContext).sparkSession + self._sc.stop() def _test(): From 589cba8e237b034e96c5b23891236ceb998c6f0c Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 3 May 2016 16:01:08 -0700 Subject: [PATCH 6/6] Clean up doctest --- python/pyspark/sql/session.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index e9af0d6c11ae4..fb3e318163e87 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -59,17 +59,17 @@ def toDF(self, schema=None, sampleRatio=None): class SparkSession(object): - r"""The entry point to programming Spark with the Dataset and DataFrame API. + """The entry point to programming Spark with the Dataset and DataFrame API. A SparkSession can be used create :class:`DataFrame`, register :class:`DataFrame` as tables, execute SQL over tables, cache tables, and read parquet files. To create a SparkSession, use the following builder pattern: >>> spark = SparkSession.builder \ - ... .master("local") \ - ... .appName("Word Count") \ - ... .config("spark.some.config.option", "some-value") \ - ... .getOrCreate() + .master("local") \ + .appName("Word Count") \ + .config("spark.some.config.option", "some-value") \ + .getOrCreate() :param sparkContext: The :class:`SparkContext` backing this SparkSession. :param jsparkSession: An optional JVM Scala SparkSession. If set, we do not instantiate a new