Commits (34)
f6585f9  [SPARK-14647][SQL] Group SQLContext/HiveContext state into SharedState (Apr 18, 2016)
fe89b8d  Make HiveSessionState take in SQLContext, not HiveContext (Apr 18, 2016)
54046d6  Move QueryExecution out of HiveContext (Apr 18, 2016)
5fc8177  Merge branch 'master' of github.com:apache/spark into spark-session (Apr 18, 2016)
83b3f70  Minor cleanup (Apr 18, 2016)
b33514c  Implement SparkSession and use it to track state (Apr 18, 2016)
8379143  Merge branch 'master' of github.com:apache/spark into spark-session (Apr 18, 2016)
6b808aa  Clean up some TODO's and bad signatures (Apr 18, 2016)
5198955  Move the bulk of HiveContext into SessionCatalog (Apr 19, 2016)
d58c6af  Remove more things from HiveContext (Apr 19, 2016)
edaebe5  Fix style (Apr 19, 2016)
ce1214d  Merge branch 'master' of github.com:apache/spark into spark-session (Apr 19, 2016)
4f3ade9  Minor fixes (Apr 19, 2016)
6019541  Use in-memory catalog by default in tests (Apr 19, 2016)
75d1115  Fix NPE when initializing HiveSessionState (Apr 19, 2016)
36d6bc8  Fix the conf (Apr 19, 2016)
95ffe86  Merge branch 'master' of github.com:apache/spark into spark-session (Apr 19, 2016)
a1d45e8  Fix REPL in in-memory case (Apr 19, 2016)
0df39fc  Fix tests: set "in-memory" in more places (Apr 19, 2016)
0d7309b  Fix style (Apr 19, 2016)
bc35206  Fix SQLExecutionSuite (Apr 19, 2016)
5fcc249  Fix ParquetHadoopFsRelationSuite (Apr 19, 2016)
d937038  Fix HiveUDFSuite + refactor TestHive (Apr 19, 2016)
303f991  Make diff slightly smaller? (Apr 19, 2016)
d27ec50  Fix test compile (Apr 19, 2016)
b3d23fa  Merge branch 'master' of github.com:apache/spark into spark-session (Apr 19, 2016)
ddc752a  Fix HiveResolutionSuite (Apr 19, 2016)
9b8dc3a  Fix StatisticsSuite (Apr 19, 2016)
e257137  Minor change (Apr 19, 2016)
8bf1236  Fix HiveUDFSuite (and many others) (Apr 20, 2016)
74b105e  Merge branch 'master' of github.com:apache/spark into spark-session (Apr 20, 2016)
32212bb  Fix tests (yhuai, Apr 20, 2016)
9422128  Merge branch 'master' of github.com:apache/spark into spark-session (Apr 20, 2016)
6e3c366  Fix Python tests. (yhuai, Apr 20, 2016)
core/src/main/scala/org/apache/spark/internal/config/package.scala

@@ -89,4 +89,11 @@ package object config {
     .stringConf
     .toSequence
     .createWithDefault(Nil)
+
+  // Note: This is a SQL config but needs to be in core because the REPL depends on it
+  private[spark] val CATALOG_IMPLEMENTATION = ConfigBuilder("spark.sql.catalogImplementation")
+    .internal()
+    .stringConf
+    .checkValues(Set("hive", "in-memory"))
Review comment (Contributor): Should this be a class name?

Reply (author): we use this for both SharedState and SessionState. If we want to use a class name then we need two configs.

.createWithDefault("in-memory")
}
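
How this config gets used: `SparkSession` (introduced below) reads `spark.sql.catalogImplementation` from the `SparkConf` and reflectively instantiates either the Hive or the in-memory `SharedState`/`SessionState` classes. A minimal usage sketch; the master, app name, and chosen value are illustrative, not part of this PR:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Pick the catalog implementation before any SQL state is created.
// "hive" additionally requires the Hive classes on the classpath.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("catalog-demo")
  .set("spark.sql.catalogImplementation", "in-memory")

val sc = new SparkContext(conf)
// The SparkSession behind this SQLContext resolves SharedState/SessionState
// (or their Hive variants) from the conf value via reflection.
val sqlContext = new SQLContext(sc)
```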
52 changes: 27 additions & 25 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -63,14 +63,18 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class SQLContext private[sql](
-    @transient protected[sql] val sharedState: SharedState,
+    @transient private val sparkSession: SparkSession,
     val isRootContext: Boolean)
   extends Logging with Serializable {

   self =>

+  private[sql] def this(sparkSession: SparkSession) = {
+    this(sparkSession, true)
+  }
+
   def this(sc: SparkContext) = {
-    this(new SharedState(sc), true)
+    this(new SparkSession(sc))
   }

   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
@@ -97,27 +101,25 @@ class SQLContext private[sql](
     }
   }

-  def sparkContext: SparkContext = sharedState.sparkContext
-
+  protected[sql] def sessionState: SessionState = sparkSession.sessionState
+  protected[sql] def sharedState: SharedState = sparkSession.sharedState
+  protected[sql] def conf: SQLConf = sessionState.conf
   protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
   protected[sql] def listener: SQLListener = sharedState.listener
   protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
+
+  def sparkContext: SparkContext = sharedState.sparkContext

   /**
    * Returns a [[SQLContext]] as new session, with separated SQL configurations, temporary
    * tables, registered functions, but sharing the same [[SparkContext]], cached data and
    * other things.
    *
    * @since 1.6.0
    */
-  def newSession(): SQLContext = new SQLContext(sharedState, isRootContext = false)
-
-  /**
-   * Per-session state, e.g. configuration, functions, temporary tables etc.
-   */
-  @transient
-  protected[sql] lazy val sessionState: SessionState = new SessionState(self)
-  protected[spark] def conf: SQLConf = sessionState.conf
+  def newSession(): SQLContext = {
+    new SQLContext(sparkSession.newSession(), isRootContext = false)
+  }

   /**
    * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
@@ -132,18 +134,22 @@ class SQLContext private[sql](
    * @group config
    * @since 1.0.0
    */
-  def setConf(props: Properties): Unit = conf.setConf(props)
+  def setConf(props: Properties): Unit = sessionState.setConf(props)

-  /** Set the given Spark SQL configuration property. */
-  private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = conf.setConf(entry, value)
+  /**
+   * Set the given Spark SQL configuration property.
+   */
+  private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
+    sessionState.setConf(entry, value)
+  }

   /**
    * Set the given Spark SQL configuration property.
    *
    * @group config
    * @since 1.0.0
    */
-  def setConf(key: String, value: String): Unit = conf.setConfString(key, value)
+  def setConf(key: String, value: String): Unit = sessionState.setConf(key, value)

   /**
    * Return the value of Spark SQL configuration property for the given key.
@@ -186,23 +192,19 @@
    */
   def getAllConfs: immutable.Map[String, String] = conf.getAllConfs

-  // Extract `spark.sql.*` entries and put it in our SQLConf.
-  // Subclasses may additionally set these entries in other confs.
-  SQLContext.getSQLProperties(sparkContext.getConf).asScala.foreach { case (k, v) =>
-    setConf(k, v)
-  }
-
   protected[sql] def parseSql(sql: String): LogicalPlan = sessionState.sqlParser.parsePlan(sql)

   protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql))

-  protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(this, plan)
+  protected[sql] def executePlan(plan: LogicalPlan): QueryExecution = {
+    sessionState.executePlan(plan)
+  }

   /**
    * Add a jar to SQLContext
    */
   protected[sql] def addJar(path: String): Unit = {
-    sparkContext.addJar(path)
+    sessionState.addJar(path)
   }

   /** A [[FunctionResourceLoader]] that can be used in SessionCatalog. */
@@ -768,7 +770,7 @@
    * as Spark can parse all supported Hive DDLs itself.
    */
   private[sql] def runNativeSql(sqlText: String): Seq[Row] = {
-    throw new UnsupportedOperationException
+    sessionState.runNativeSql(sqlText).map { r => Row(r) }
   }

   /**
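Since `newSession()` now delegates to `SparkSession.newSession()`, the shared/per-session split is easy to see from user code: temp tables, UDFs, and runtime-set SQL configs live in the per-session `SessionState`, while the `SparkContext`, cached data, and external catalog live in the `SharedState`. A small sketch of the intended behavior, given an existing SparkContext `sc` and assuming no relevant entries in the `SparkConf` (variable names are placeholders):

```scala
val rootCtx = new SQLContext(sc)
rootCtx.range(5).registerTempTable("nums")            // temp table in the root session only
rootCtx.setConf("spark.sql.shuffle.partitions", "10") // runtime conf in the root session only

val childCtx = rootCtx.newSession()                   // fresh SessionState, same SharedState
childCtx.getConf("spark.sql.shuffle.partitions")      // "200" (the default), not the parent's "10"
// childCtx.table("nums")                             // would fail: temp tables are session-scoped
childCtx.sparkContext eq rootCtx.sparkContext         // true: the SparkContext is shared
```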
100 changes: 100 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.internal.{SessionState, SharedState}
import org.apache.spark.util.Utils


/**
* The entry point to Spark execution.
*/
class SparkSession private(
sparkContext: SparkContext,
existingSharedState: Option[SharedState]) { self =>

def this(sc: SparkContext) {
this(sc, None)
}

/**
* Start a new session where configurations, temp tables, temp functions etc. are isolated.
*/
def newSession(): SparkSession = {
// Note: materialize the shared state here to ensure the parent and child sessions are
// initialized with the same shared state.
new SparkSession(sparkContext, Some(sharedState))
}

@transient
protected[sql] lazy val sharedState: SharedState = {
existingSharedState.getOrElse(
SparkSession.reflect[SharedState, SparkContext](
SparkSession.sharedStateClassName(sparkContext.conf),
sparkContext))
}

@transient
protected[sql] lazy val sessionState: SessionState = {
SparkSession.reflect[SessionState, SQLContext](
SparkSession.sessionStateClassName(sparkContext.conf),
new SQLContext(self, isRootContext = false))
}

}


private object SparkSession {

private def sharedStateClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => "org.apache.spark.sql.hive.HiveSharedState"
case "in-memory" => classOf[SharedState].getCanonicalName
}
}

private def sessionStateClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => "org.apache.spark.sql.hive.HiveSessionState"
case "in-memory" => classOf[SessionState].getCanonicalName
}
}

/**
* Helper method to create an instance of [[T]] using a single-arg constructor that
* accepts an [[Arg]].
*/
private def reflect[T, Arg <: AnyRef](
className: String,
ctorArg: Arg)(implicit ctorArgTag: ClassTag[Arg]): T = {
Review comment (Contributor): You don't need a class tag. You can call ctorArg.getClass().

Reply (andrewor14, author, Apr 19, 2016): didn't work because there are places where we pass subclasses of Arg in here (see SQLExecutionSuite)

try {
val clazz = Utils.classForName(className)
val ctor = clazz.getDeclaredConstructor(ctorArgTag.runtimeClass)
ctor.newInstance(ctorArg).asInstanceOf[T]
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
}
}

}
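
The review exchange above about `ClassTag` vs `ctorArg.getClass()` is easy to reproduce in isolation: `getDeclaredConstructor` matches parameter types exactly, so the lookup must use the declared type `Arg`, not the runtime class of the instance passed in. A self-contained sketch with made-up class names:

```scala
import scala.reflect.ClassTag

class Base
class Sub extends Base
class Holder(val b: Base) // constructor parameter declared as Base

// Mirrors SparkSession.reflect: the ClassTag pins the lookup to Arg itself.
def reflect[T, Arg <: AnyRef](clazz: Class[_], ctorArg: Arg)(
    implicit ctorArgTag: ClassTag[Arg]): T = {
  val ctor = clazz.getDeclaredConstructor(ctorArgTag.runtimeClass)
  ctor.newInstance(ctorArg).asInstanceOf[T]
}

val arg: Base = new Sub // a subclass instance, as in SQLExecutionSuite
val ok = reflect[Holder, Base](classOf[Holder], arg) // looks up Holder(Base): succeeds
// classOf[Holder].getDeclaredConstructor(arg.getClass) // looks up Holder(Sub): NoSuchMethodException
```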
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -17,15 +17,22 @@

 package org.apache.spark.sql.internal

+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql.{ContinuousQueryManager, ExperimentalMethods, SQLContext, UDFRegistration}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
 import org.apache.spark.sql.util.ExecutionListenerManager

+
 /**
  * A class that holds all session-specific state in a given [[SQLContext]].
  */
@@ -37,7 +44,10 @@ private[sql] class SessionState(ctx: SQLContext) {
   /**
    * SQL-specific key-value configurations.
    */
-  lazy val conf = new SQLConf
+  lazy val conf: SQLConf = new SQLConf
+
+  // Automatically extract `spark.sql.*` entries and put it in our SQLConf
+  setConf(SQLContext.getSQLProperties(ctx.sparkContext.getConf))

   lazy val experimentalMethods = new ExperimentalMethods

@@ -101,5 +111,45 @@
    * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
    */
   lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx)
-}
+
+
+  // ------------------------------------------------------
+  //  Helper methods, partially leftover from pre-2.0 days
+  // ------------------------------------------------------
+
+  def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(ctx, plan)
+
+  def refreshTable(tableName: String): Unit = {
+    catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  def invalidateTable(tableName: String): Unit = {
+    catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  final def setConf(properties: Properties): Unit = {
+    properties.asScala.foreach { case (k, v) => setConf(k, v) }
+  }
+
+  final def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
+    conf.setConf(entry, value)
+    setConf(entry.key, entry.stringConverter(value))
+  }
+
+  def setConf(key: String, value: String): Unit = {
+    conf.setConfString(key, value)
+  }
+
+  def addJar(path: String): Unit = {
+    ctx.sparkContext.addJar(path)
+  }
+
+  def analyze(tableName: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  def runNativeSql(sql: String): Seq[String] = {
+    throw new UnsupportedOperationException
+  }
+
+}
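
One subtlety in the helpers above: the typed `setConf(entry, value)` records the typed value and then re-enters the overridable string overload, so a subclass that overrides only `setConf(key, value)` still observes every update, presumably so a Hive-side `SessionState` can mirror settings into its Hive conf. A hypothetical subclass sketching that hook (not part of this PR; since `SessionState` is `private[sql]`, it would only compile inside Spark's own packages):

```scala
package org.apache.spark.sql.internal

import org.apache.spark.sql.SQLContext

// Illustrative only: observes every SQL conf update, however it was set.
private[sql] class LoggingSessionState(ctx: SQLContext) extends SessionState(ctx) {
  override def setConf(key: String, value: String): Unit = {
    super.setConf(key, value)
    // Reached for plain string settings and for typed ConfigEntry settings,
    // because the typed overload delegates to this method.
    println(s"SQL conf updated: $key = $value")
  }
}
```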
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -195,7 +195,7 @@ private[hive] class SparkExecuteStatementOperation(
     setState(OperationState.RUNNING)
     // Always use the latest class loader provided by executionHive's state.
     val executionHiveClassLoader =
-      hiveContext.executionHive.state.getConf.getClassLoader
+      hiveContext.sessionState.executionHive.state.getConf.getClassLoader
     Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

     HiveThriftServer2.listener.onStatementStart(
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, HiveQueryExecution}

 private[hive] class SparkSQLDriver(
   val context: HiveContext = SparkSQLEnv.hiveContext)
@@ -41,7 +41,7 @@ private[hive] class SparkSQLDriver(
   override def init(): Unit = {
   }

-  private def getResultSetSchema(query: context.QueryExecution): Schema = {
+  private def getResultSetSchema(query: HiveQueryExecution): Schema = {
     val analyzed = query.analyzed
     logDebug(s"Result Schema: ${analyzed.output}")
     if (analyzed.output.isEmpty) {
@@ -59,7 +59,8 @@
     // TODO unify the error code
     try {
       context.sparkContext.setJobDescription(command)
-      val execution = context.executePlan(context.sql(command).logicalPlan)
+      val execution =
+        context.executePlan(context.sql(command).logicalPlan).asInstanceOf[HiveQueryExecution]
       hiveResponse = execution.stringResult()
       tableSchema = getResultSetSchema(execution)
       new CommandProcessorResponse(0)
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -58,9 +58,9 @@ private[hive] object SparkSQLEnv extends Logging {
       sparkContext.addSparkListener(new StatsReportListener())
       hiveContext = new HiveContext(sparkContext)

-      hiveContext.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
-      hiveContext.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
-      hiveContext.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
+      hiveContext.sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
+      hiveContext.sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
+      hiveContext.sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))

       hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -71,7 +71,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
     val session = super.getSession(sessionHandle)
     HiveThriftServer2.listener.onSessionCreated(
       session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
-    val ctx = if (hiveContext.hiveThriftServerSingleSession) {
+    val ctx = if (hiveContext.sessionState.hiveThriftServerSingleSession) {
       hiveContext
     } else {
       hiveContext.newSession()
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -47,7 +47,7 @@ private[thriftserver] class SparkSQLOperationManager()
       confOverlay: JMap[String, String],
       async: Boolean): ExecuteStatementOperation = synchronized {
     val hiveContext = sessionToContexts(parentSession.getSessionHandle)
-    val runInBackground = async && hiveContext.hiveThriftServerAsync
+    val runInBackground = async && hiveContext.sessionState.hiveThriftServerAsync
     val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
       runInBackground)(hiveContext, sessionToActivePool)
     handleToOperation.put(operation.getHandle, operation)