Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -2051,6 +2051,20 @@ options.

# Migration Guide

## Upgrading From Spark SQL 1.5 to 1.6

- From Spark 1.6, by default the Thrift server runs in multi-session mode. Which means each JDBC/ODBC
connection owns a copy of their own SQL configuration and temporary function registry. Cached
tables are still shared though. If you prefer to run the Thrift server in the old single-session
mode, please set option `spark.sql.hive.thriftServer.singleSession` to `true`. You may either add
this option to `spark-defaults.conf`, or pass it to `start-thriftserver.sh` via `--conf`:

{% highlight bash %}
./sbin/start-thriftserver.sh \
--conf spark.sql.hive.thriftServer.singleSession=true \
...
{% endhighlight %}

## Upgrading From Spark SQL 1.4 to 1.5

- Optimized execution using manually managed memory (Tungsten) is now enabled by default, along with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
val session = super.getSession(sessionHandle)
HiveThriftServer2.listener.onSessionCreated(
session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
val ctx = hiveContext.newSession()
val ctx = if (hiveContext.hiveThriftServerSingleSession) {
hiveContext
} else {
hiveContext.newSession()
}
ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
sessionHandle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import org.apache.thrift.transport.TSocket
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkFunSuite}
Expand Down Expand Up @@ -510,6 +509,53 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
}

class SingleSessionSuite extends HiveThriftJdbcTest {
override def mode: ServerMode.Value = ServerMode.binary

override protected def extraConf: Seq[String] =
"--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil

test("test single session") {
withMultipleConnectionJdbcStatement(
{ statement =>
val jarPath = "../hive/src/test/resources/TestUDTF.jar"
val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"

// Configurations and temporary functions added in this session should be visible to all
// the other sessions.
Seq(
"SET foo=bar",
s"ADD JAR $jarURL",
s"""CREATE TEMPORARY FUNCTION udtf_count2
|AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
""".stripMargin
).foreach(statement.execute)
},

{ statement =>
val rs1 = statement.executeQuery("SET foo")

assert(rs1.next())
assert(rs1.getString(1) === "foo")
assert(rs1.getString(2) === "bar")

val rs2 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")

assert(rs2.next())
assert(rs2.getString(1) === "Function: udtf_count2")

assert(rs2.next())
assertResult("Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
rs2.getString(1)
}

assert(rs2.next())
assert(rs2.getString(1) === "Usage: To be added.")
}
)
}
}

class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
override def mode: ServerMode.Value = ServerMode.http

Expand Down Expand Up @@ -600,6 +646,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
private var logTailingProcess: Process = _
private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]

protected def extraConf: Seq[String] = Nil

protected def serverStartCommand(port: Int) = {
val portConf = if (mode == ServerMode.binary) {
ConfVars.HIVE_SERVER2_THRIFT_PORT
Expand Down Expand Up @@ -635,6 +683,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
| --driver-class-path $driverClassPath
| --driver-java-options -Dlog4j.debug
| --conf spark.ui.enabled=false
| ${extraConf.mkString("\n")}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that we only introduce a command line option here? (Not sure how the JDBC server is used).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean something like --single-session? The benefit of SparkConf option is that users can add it in spark-defaults.conf.

Command line options of start-thriftserver.sh are parsed by both SparkSubmit and HiveServer2.ServerOptionsProcessor. The latter isn't extensible. One option is to add a --single-session option in SparkSubmit and make a synonymous of --conf spark.sql.hive.thriftServer.singleSession=true. However, this option doesn't seem to be general enough to be added in SparkSubmit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks!

""".stripMargin.split("\\s+").toSeq
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ class HiveContext private[hive](
*/
protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)

protected[hive] def hiveThriftServerSingleSession: Boolean =
sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean

@transient
protected[sql] lazy val substitutor = new VariableSubstitution()

Expand Down