From e7dee4f522479a75c587189bf8c37cfcddb184ba Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 21 Apr 2016 17:08:28 -0700
Subject: [PATCH 1/4] Initialize sparkSession in REPL instead of sqlContext

---
 .../org/apache/spark/repl/SparkILoop.scala    | 19 ++-------------
 .../apache/spark/repl/SparkILoopInit.scala    | 12 ++++-----
 .../scala/org/apache/spark/repl/Main.scala    | 26 +++++++++----------
 .../org/apache/spark/repl/SparkILoop.scala    | 12 ++++-----
 .../org/apache/spark/sql/SparkSession.scala   | 10 ++++++-
 5 files changed, 36 insertions(+), 43 deletions(-)

diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index c5dc6ba2219f..91d299917ab3 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -43,7 +43,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.Utils
 
 /** The Scala interactive shell. It provides a read-eval-print loop
@@ -129,7 +129,6 @@ class SparkILoop(
   // NOTE: Must be public for visibility
   @DeveloperApi
   var sparkContext: SparkContext = _
-  var sqlContext: SQLContext = _
 
   override def echoCommandMessage(msg: String) {
     intp.reporter printMessage msg
@@ -1026,21 +1025,7 @@ class SparkILoop(
   }
 
   @DeveloperApi
-  def createSQLContext(): SQLContext = {
-    val name = "org.apache.spark.sql.hive.HiveContext"
-    val loader = Utils.getContextOrSparkClassLoader
-    try {
-      sqlContext = loader.loadClass(name).getConstructor(classOf[SparkContext])
-        .newInstance(sparkContext).asInstanceOf[SQLContext]
-      logInfo("Created sql context (with Hive support)..")
-    }
-    catch {
-      case _: java.lang.ClassNotFoundException | _: java.lang.NoClassDefFoundError =>
-        sqlContext = new SQLContext(sparkContext)
-        logInfo("Created sql context..")
-    }
-    sqlContext
-  }
+  def createSparkSession(): SparkSession = Main.createSparkSession()
 
   private def getMaster(): String = {
     val master = this.master match {
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index 99e1e1df33fd..173e644faf0a 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -131,15 +131,15 @@ private[repl] trait SparkILoopInit {
       }
     """)
     command("""
-      @transient val sqlContext = {
-        val _sqlContext = org.apache.spark.repl.Main.interp.createSQLContext()
-        println("SQL context available as sqlContext.")
-        _sqlContext
+      @transient val sparkSession = {
+        val _sparkSession = org.apache.spark.repl.Main.interp.createSparkSession()
+        println("Spark session available as sparkSession.")
+        _sparkSession
       }
     """)
     command("import org.apache.spark.SparkContext._")
-    command("import sqlContext.implicits._")
-    command("import sqlContext.sql")
+    command("import sparkSession.implicits._")
+    command("import sparkSession.sql")
     command("import org.apache.spark.sql.functions._")
   }
 }
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index b822ff496c11..527f22af27ca 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -23,8 +23,8 @@ import scala.tools.nsc.GenericRunnerSettings
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{CausedBy, Utils}
 
 object Main extends Logging {
 
@@ -35,7 +35,7 @@ object Main extends Logging {
   val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
 
   var sparkContext: SparkContext = _
-  var sqlContext: SQLContext = _
+  var sparkSession: SparkSession = _
   // this is a public var because tests reset it.
   var interp: SparkILoop = _
 
@@ -91,19 +91,19 @@ object Main extends Logging {
     sparkContext
   }
 
-  def createSQLContext(): SQLContext = {
-    val name = "org.apache.spark.sql.hive.HiveContext"
-    val loader = Utils.getContextOrSparkClassLoader
+  def createSparkSession(): SparkSession = {
     try {
-      sqlContext = loader.loadClass(name).getConstructor(classOf[SparkContext])
-        .newInstance(sparkContext).asInstanceOf[SQLContext]
-      logInfo("Created sql context (with Hive support)..")
+      sparkSession = SparkSession.withHiveSupport(sparkContext)
+      logInfo("Created Spark session with Hive support")
     } catch {
-      case _: java.lang.ClassNotFoundException | _: java.lang.NoClassDefFoundError =>
-        sqlContext = new SQLContext(sparkContext)
-        logInfo("Created sql context..")
+      case CausedBy(ex: ClassNotFoundException) =>
+      case CausedBy(err: NoClassDefFoundError) =>
     }
-    sqlContext
+    if (sparkSession == null) {
+      sparkSession = new SparkSession(sparkContext)
+      logInfo("Created Spark session")
+    }
+    sparkSession
   }
 
 }
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index db09d6ace1c6..4832e5dcee25 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -44,15 +44,15 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
       }
     """)
     processLine("""
-      @transient val sqlContext = {
-        val _sqlContext = org.apache.spark.repl.Main.createSQLContext()
-        println("SQL context available as sqlContext.")
-        _sqlContext
+      @transient val sparkSession = {
+        val _sparkSession = org.apache.spark.repl.Main.createSparkSession()
+        println("Spark session available as sparkSession.")
+        _sparkSession
       }
     """)
     processLine("import org.apache.spark.SparkContext._")
-    processLine("import sqlContext.implicits._")
-    processLine("import sqlContext.sql")
+    processLine("import sparkSession.implicits._")
+    processLine("import sparkSession.sql")
     processLine("import org.apache.spark.sql.functions._")
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 70d889b002e4..28aa86abb3d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -904,7 +904,7 @@ class SparkSession private(
 }
 
 
-private object SparkSession {
+private[spark] object SparkSession {
 
   private def sharedStateClassName(conf: SparkConf): String = {
     conf.get(CATALOG_IMPLEMENTATION) match {
@@ -937,4 +937,12 @@ private object SparkSession {
     }
   }
 
+  /**
+   * Create a new [[SparkSession]] with a catalog backed by Hive.
+   */
+  def withHiveSupport(sc: SparkContext): SparkSession = {
+    sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive")
+    new SparkSession(sc)
+  }
+
 }

From 45783de96a500febfc4be2aa9dec5e5ab624ba9f Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 21 Apr 2016 17:15:20 -0700
Subject: [PATCH 2/4] sparkSession -> spark

---
 .../org/apache/spark/repl/SparkILoopInit.scala    | 14 +++++++-------
 .../scala/org/apache/spark/repl/SparkILoop.scala  | 14 +++++++-------
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index 173e644faf0a..4ce776e17d5e 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -125,21 +125,21 @@ private[repl] trait SparkILoopInit {
     command("""
       @transient val sc = {
        val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
-        println("Spark context available as sc " +
+        println("Spark context available as 'sc' " +
          s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
        _sc
      }
     """)
     command("""
-      @transient val sparkSession = {
-        val _sparkSession = org.apache.spark.repl.Main.interp.createSparkSession()
-        println("Spark session available as sparkSession.")
-        _sparkSession
+      @transient val spark = {
+        val _session = org.apache.spark.repl.Main.interp.createSparkSession()
+        println("Spark session available as 'spark'.")
+        _session
      }
     """)
     command("import org.apache.spark.SparkContext._")
-    command("import sparkSession.implicits._")
-    command("import sparkSession.sql")
+    command("import spark.implicits._")
+    command("import spark.sql")
     command("import org.apache.spark.sql.functions._")
   }
 }
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 4832e5dcee25..d029659fed41 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -38,21 +38,21 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
     processLine("""
       @transient val sc = {
        val _sc = org.apache.spark.repl.Main.createSparkContext()
-        println("Spark context available as sc " +
+        println("Spark context available as 'sc' " +
          s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
        _sc
      }
     """)
     processLine("""
-      @transient val sparkSession = {
-        val _sparkSession = org.apache.spark.repl.Main.createSparkSession()
-        println("Spark session available as sparkSession.")
-        _sparkSession
+      @transient val spark = {
+        val _session = org.apache.spark.repl.Main.createSparkSession()
+        println("Spark session available as 'spark'.")
+        _session
      }
     """)
     processLine("import org.apache.spark.SparkContext._")
-    processLine("import sparkSession.implicits._")
-    processLine("import sparkSession.sql")
+    processLine("import spark.implicits._")
+    processLine("import spark.sql")
     processLine("import org.apache.spark.sql.functions._")
   }
 }

From 8642cd7e606999c9d40a1c6936566846858fd9d5 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 22 Apr 2016 11:57:02 -0700
Subject: [PATCH 3/4] Fix in-memory case

---
 .../scala/org/apache/spark/repl/Main.scala      |  8 ++---
 .../org/apache/spark/repl/ReplSuite.scala       |  5 ++--
 .../org/apache/spark/sql/SparkSession.scala     | 30 ++++++++++++++++---
 3 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 527f22af27ca..8339a0c37092 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -92,14 +92,10 @@ object Main extends Logging {
   }
 
   def createSparkSession(): SparkSession = {
-    try {
+    if (SparkSession.hiveClassesArePresent) {
       sparkSession = SparkSession.withHiveSupport(sparkContext)
       logInfo("Created Spark session with Hive support")
-    } catch {
-      case CausedBy(ex: ClassNotFoundException) =>
-      case CausedBy(err: NoClassDefFoundError) =>
-    }
-    if (sparkSession == null) {
+    } else {
       sparkSession = new SparkSession(sparkContext)
       logInfo("Created Spark session")
     }
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index d3dafe9c42ee..af82e7a111fa 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -249,10 +249,11 @@ class ReplSuite extends SparkFunSuite {
     assertDoesNotContain("Exception", output)
   }
 
-  test("SPARK-2576 importing SQLContext.createDataFrame.") {
+  test("SPARK-2576 importing implicits") {
     // We need to use local-cluster to test this case.
     val output = runInterpreter("local-cluster[1,1,1024]",
       """
+        |import spark.implicits._
         |case class TestCaseClass(value: Int)
         |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
         |
@@ -366,7 +367,7 @@ class ReplSuite extends SparkFunSuite {
   test("define case class and create Dataset together with paste mode") {
     val output = runInterpreterInPasteMode("local-cluster[1,1,1024]",
       """
-        |import sqlContext.implicits._
+        |import spark.implicits._
        |case class TestClass(value: Int)
        |Seq(TestClass(1)).toDS()
      """.stripMargin)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index f0738b3af26f..f7ffc5048ccd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -907,16 +907,19 @@ class SparkSession private(
 
 private[spark] object SparkSession {
 
+  private val HIVE_SHARED_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSharedState"
+  private val HIVE_SESSION_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSessionState"
+
   private def sharedStateClassName(conf: SparkConf): String = {
     conf.get(CATALOG_IMPLEMENTATION) match {
-      case "hive" => "org.apache.spark.sql.hive.HiveSharedState"
+      case "hive" => HIVE_SHARED_STATE_CLASS_NAME
       case "in-memory" => classOf[SharedState].getCanonicalName
     }
   }
 
   private def sessionStateClassName(conf: SparkConf): String = {
     conf.get(CATALOG_IMPLEMENTATION) match {
-      case "hive" => "org.apache.spark.sql.hive.HiveSessionState"
+      case "hive" => HIVE_SESSION_STATE_CLASS_NAME
       case "in-memory" => classOf[SessionState].getCanonicalName
     }
   }
@@ -938,12 +941,31 @@ private[spark] object SparkSession {
     }
   }
 
+  /**
+   * Return true if Hive classes can be loaded, otherwise false.
+   */
+  def hiveClassesArePresent: Boolean = {
+    try {
+      Utils.classForName(HIVE_SESSION_STATE_CLASS_NAME)
+      Utils.classForName(HIVE_SHARED_STATE_CLASS_NAME)
+      Utils.classForName("org.apache.hadoop.hive.conf.HiveConf")
+      true
+    } catch {
+      case _: ClassNotFoundException | _: NoClassDefFoundError => false
+    }
+  }
+
   /**
    * Create a new [[SparkSession]] with a catalog backed by Hive.
    */
   def withHiveSupport(sc: SparkContext): SparkSession = {
-    sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive")
-    new SparkSession(sc)
+    if (hiveClassesArePresent) {
+      sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive")
+      new SparkSession(sc)
+    } else {
+      throw new IllegalArgumentException(
+        "Unable to instantiate SparkSession with Hive support because Hive classes are not found.")
+    }
   }
 
 }

From e69d7cfe137a1aff81ff8c187eb6515ece7c303a Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 25 Apr 2016 10:51:42 -0700
Subject: [PATCH 4/4] Just do-plicate it (check)

---
 .../main/scala/org/apache/spark/repl/SparkILoop.scala | 11 ++++++++++-
 .../src/main/scala/org/apache/spark/repl/Main.scala   |  2 +-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 4349c9ee0a90..6a811adcf9b7 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1026,7 +1026,16 @@ class SparkILoop(
   }
 
   @DeveloperApi
-  def createSparkSession(): SparkSession = Main.createSparkSession()
+  // TODO: don't duplicate this code
+  def createSparkSession(): SparkSession = {
+    if (SparkSession.hiveClassesArePresent) {
+      logInfo("Creating Spark session with Hive support")
+      SparkSession.withHiveSupport(sparkContext)
+    } else {
+      logInfo("Creating Spark session")
+      new SparkSession(sparkContext)
+    }
+  }
 
   private def getMaster(): String = {
     val master = this.master match {
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 6584be1cb3bd..8e381ff6ae5a 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -24,7 +24,7 @@ import scala.tools.nsc.GenericRunnerSettings
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.{CausedBy, Utils}
+import org.apache.spark.util.Utils
 
 object Main extends Logging {
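
Reviewer note (not part of the patches above): the fallback added in patch 3 boils down to a classpath probe. The sketch below shows that probe in isolation so it can be run without Spark. The object name HiveProbeSketch, the main driver, and the default class loader argument are illustrative assumptions; the class names are the constants introduced in patch 3, and plain Class.forName stands in for Spark's Utils.classForName.

object HiveProbeSketch {

  // Marker classes checked by patch 3 before enabling the Hive catalog.
  private val requiredClasses = Seq(
    "org.apache.spark.sql.hive.HiveSessionState",
    "org.apache.spark.sql.hive.HiveSharedState",
    "org.apache.hadoop.hive.conf.HiveConf")

  /** True only if every Hive marker class can be loaded from the given loader. */
  def hiveClassesArePresent(
      loader: ClassLoader = Thread.currentThread().getContextClassLoader): Boolean = {
    try {
      // initialize = false keeps the probe side-effect free; we only care
      // whether the classes resolve on the current classpath.
      requiredClasses.foreach(name => Class.forName(name, false, loader))
      true
    } catch {
      case _: ClassNotFoundException | _: NoClassDefFoundError => false
    }
  }

  def main(args: Array[String]): Unit = {
    val catalog = if (hiveClassesArePresent()) "hive" else "in-memory"
    println(s"Would create a SparkSession with catalog implementation: $catalog")
  }
}

Run on a classpath without the Hive module, this takes the in-memory branch, which is the situation patch 3 ("Fix in-memory case") makes createSparkSession handle without the empty CausedBy catch cases from patch 1.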