From 0e969ec3fc88a7a4d84cc1e975364ec89cfb788f Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Sun, 1 May 2016 20:44:31 -0700
Subject: [PATCH 1/3] [SPARK-15052][SQL] Use builder pattern to create SparkSession

---
 .../org/apache/spark/sql/SparkSession.scala   | 130 +++++++++++++++++-
 1 file changed, 126 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 4c2a7b8ae9060..8ec908ae618b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -18,9 +18,7 @@ package org.apache.spark.sql
 
 import java.beans.Introspector
-import java.util.Properties
 
-import scala.collection.immutable
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
@@ -30,7 +28,7 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{CATALOG_IMPLEMENTATION, ConfigEntry}
+import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalog.Catalog
 import org.apache.spark.sql.catalyst._
@@ -49,7 +47,16 @@ import org.apache.spark.util.Utils
 
 
 /**
- * The entry point to Spark execution.
+ * The entry point to programming Spark with the Dataset and DataFrame API.
+ *
+ * To create a SparkSession, use the following builder pattern:
+ *
+ * {{{
+ *   SparkSession.builder()
+ *     .master("local")
+ *     .config("spark.some.config.option", "some-value")
+ *     .getOrCreate()
+ * }}}
  */
 class SparkSession private(
     @transient val sparkContext: SparkContext,
@@ -635,6 +642,121 @@ class SparkSession private(
 
 object SparkSession {
 
+  /**
+   * Builder for [[SparkSession]].
+   */
+  class Builder {
+
+    private[this] val options = new scala.collection.mutable.HashMap[String, String]
+
+    /**
+     * Sets a name for the application, which will be shown in the Spark web UI.
+     *
+     * @since 2.0.0
+     */
+    def appName(name: String): Builder = config("spark.app.name", name)
+
+    /**
+     * Sets a config option. Options set using this method are automatically propagated to
+     * both [[SparkConf]] and SparkSession's own configuration.
+     *
+     * @since 2.0.0
+     */
+    def config(key: String, value: String): Builder = synchronized {
+      options += key -> value
+      this
+    }
+
+    /**
+     * Sets a config option. Options set using this method are automatically propagated to
+     * both [[SparkConf]] and SparkSession's own configuration.
+     *
+     * @since 2.0.0
+     */
+    def config(key: String, value: Long): Builder = synchronized {
+      options += key -> value.toString
+      this
+    }
+
+    /**
+     * Sets a config option. Options set using this method are automatically propagated to
+     * both [[SparkConf]] and SparkSession's own configuration.
+     *
+     * @since 2.0.0
+     */
+    def config(key: String, value: Double): Builder = synchronized {
+      options += key -> value.toString
+      this
+    }
+
+    /**
+     * Sets a config option. Options set using this method are automatically propagated to
+     * both [[SparkConf]] and SparkSession's own configuration.
+     *
+     * @since 2.0.0
+     */
+    def config(key: String, value: Boolean): Builder = synchronized {
+      options += key -> value.toString
+      this
+    }
+
+    /**
+     * Sets a list of config options based on the given [[SparkConf]].
+     *
+     * @since 2.0.0
+     */
+    def config(conf: SparkConf): Builder = synchronized {
+      conf.getAll.foreach { case (k, v) => options += k -> v }
+      this
+    }
+
+    /**
+     * Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to
+     * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
+     *
+     * @since 2.0.0
+     */
+    def master(master: String): Builder = config("spark.master", master)
+
+    /**
+     * Enables Hive support, including connectivity to a persistent Hive metastore, support for
+     * Hive serdes, and Hive user-defined functions.
+     *
+     * @return 2.0.0
+     */
+    def enableHiveSupport(): Builder = synchronized {
+      if (hiveClassesArePresent) {
+        config(CATALOG_IMPLEMENTATION.key, "hive")
+      } else {
+        throw new IllegalArgumentException(
+          "Unable to instantiate SparkSession with Hive support because " +
+            "Hive classes are not found.")
+      }
+    }
+
+    /**
+     * Create a
+     *
+     * @since 2.0.0
+     */
+    def getOrCreate(): SparkSession = synchronized {
+      // Step 1. Create a SparkConf
+      // Step 2. Get a SparkContext
+      // Step 3. Get a SparkSession
+      val sparkConf = new SparkConf()
+      options.foreach { case (k, v) => sparkConf.set(k, v) }
+      val sparkContext = SparkContext.getOrCreate(sparkConf)
+
+      SQLContext.getOrCreate(sparkContext).sparkSession
+    }
+  }
+
+  /**
+   * Creates a [[SparkSession.Builder]] for constructing a [[SparkSession]].
+   * @since 2.0.0
+   */
+  def builder: Builder = new Builder
+
   private val HIVE_SHARED_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSharedState"
 
   private val HIVE_SESSION_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSessionState"

From 8172d91febbae526378c16eb6c95e5df839e3e73 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Sun, 1 May 2016 20:50:11 -0700
Subject: [PATCH 2/3] Finish doc

---
 .../src/main/scala/org/apache/spark/sql/SparkSession.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 8ec908ae618b8..5955cd71ec67d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -735,7 +735,8 @@ object SparkSession {
     }
 
     /**
-     * Create a
+     * Gets an existing [[SparkSession]] or, if there is no existing one, creates a new one
+     * based on the options set in this builder.
      *
      * @since 2.0.0
      */

From 0005a3deb8292c93b4e699abd922f12ad2606c45 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Mon, 2 May 2016 15:25:51 -0700
Subject: [PATCH 3/3] Fix bug

---
 sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 5955cd71ec67d..3836ce2daa56e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -722,7 +722,7 @@ object SparkSession {
      * Enables Hive support, including connectivity to a persistent Hive metastore, support for
      * Hive serdes, and Hive user-defined functions.
      *
-     * @return 2.0.0
+     * @since 2.0.0
      */
     def enableHiveSupport(): Builder = synchronized {
       if (hiveClassesArePresent) {
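
For reference, a minimal usage sketch of the builder API as it stands after [PATCH 3/3]. This is illustrative only: the object name BuilderExample and the option values are placeholders, and "spark.some.config.option" is the dummy key from the patch's own Scaladoc example, not a real Spark setting. Note that `builder` is referenced without parentheses here, matching its declaration as `def builder: Builder`.

import org.apache.spark.sql.SparkSession

object BuilderExample {
  def main(args: Array[String]): Unit = {
    // Each setter returns the Builder, so calls chain. Options are staged in
    // the Builder's private HashMap and applied only when getOrCreate()
    // builds a SparkConf, fetches or creates the SparkContext, and returns
    // the SQLContext's SparkSession.
    val spark = SparkSession.builder
      .master("local[4]")                                // staged as "spark.master"
      .appName("BuilderExample")                         // staged as "spark.app.name"
      .config("spark.some.config.option", "some-value")  // dummy key from the Scaladoc
      .getOrCreate()

    // Because getOrCreate() delegates to SparkContext.getOrCreate, building
    // a second session in the same JVM reuses the same underlying context.
    val again = SparkSession.builder.getOrCreate()
    assert(spark.sparkContext eq again.sparkContext)

    spark.sparkContext.stop()
  }
}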