Closed
7 changes: 7 additions & 0 deletions docs/sql-programming-guide.md
@@ -1308,6 +1308,13 @@ the following case-insensitive options:
</td>
</tr>

<tr>
<td><code>sessionInitStatement</code></td>
<td>
After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block). Use this to implement session initialization code. Example: <code>option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")</code>
</td>
</tr>

<tr>
<td><code>truncate</code></td>
<td>
@@ -135,6 +135,8 @@ class JDBCOptions(
      case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
      case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
    }
  // An option to execute custom SQL before fetching data from the remote DB
  val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
}

object JDBCOptions {
@@ -158,4 +160,5 @@ object JDBCOptions {
  val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
  val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
  val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
  val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
}
@@ -273,6 +273,21 @@ private[jdbc] class JDBCRDD(
    import scala.collection.JavaConverters._
    dialect.beforeFetch(conn, options.asProperties.asScala.toMap)

    // This executes a generic SQL statement (or PL/SQL block) before reading
    // the table/query via JDBC. Use this feature to initialize the database
    // session environment, e.g. for optimizations and/or troubleshooting.
    options.sessionInitStatement match {
      case Some(sql) =>
        val statement = conn.prepareStatement(sql)
        logInfo(s"Executing sessionInitStatement: $sql")
        try {
          statement.execute()
        } finally {
          statement.close()
        }
      case None =>
    }

    // H2's JDBC driver does not support the setSchema() method. We pass a
    // fully-qualified table name in the SELECT statement. I don't know how to
    // talk about a table in a completely portable way.
31 changes: 31 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1007,4 +1007,35 @@ class JDBCSuite extends SparkFunSuite
      assert(sql("select * from people_view").count() == 3)
    }
  }

  test("SPARK-21519: option sessionInitStatement, run SQL to initialize the database session.") {
    val initSQL1 = "SET @MYTESTVAR 21519"
    val df1 = spark.read.format("jdbc")
      .option("url", urlWithUserAndPass)
      .option("dbtable", "(SELECT NVL(@MYTESTVAR, -1))")
      .option("sessionInitStatement", initSQL1)
      .load()
    assert(df1.collect() === Array(Row(21519)))

    val initSQL2 = "SET SCHEMA DUMMY"
Member

Sorry, I may not have explained it clearly. Is it possible to have a test case that sends more than one statement in a single session initialization? Right now these two examples each have only one statement.

Contributor Author

Thanks for the clarification. I have now added a test that runs 2 SQL statements.
For future reference, I'd like to stress that the code executed by the "sessionInitStatement" option is just the user-provided string, prepared on the JDBC connection and run through the statement's execute method, so it can use the features of the target database's language and syntax. In the test I wrote for the H2 database, I simply put together two commands separated by ";". When using sessionInitStatement to query Oracle, for example, the user-provided command can be a SQL statement or a PL/SQL block grouping multiple commands and logic.
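
To make the point above concrete, here is a minimal sketch (not the PR's code) of the same pattern: the option value is handed to the driver as one opaque string, so whether `;` separates several commands is decided by the target database, not by Spark. The names `SessionInitSketch`, `runSessionInit`, `stub`, and `recordingConnection` are hypothetical, and the recording connection is a stand-in built with `java.lang.reflect.Proxy` so the snippet runs without a database.

```scala
import java.lang.reflect.{InvocationHandler, Method, Proxy}
import java.sql.{Connection, PreparedStatement}
import scala.collection.mutable.ArrayBuffer

object SessionInitSketch {

  // Same shape as the logic in the diff: if the option is set, hand the raw
  // string to the driver once and always close the statement afterwards.
  def runSessionInit(conn: Connection, sessionInitStatement: Option[String]): Unit =
    sessionInitStatement.foreach { sql =>
      val statement = conn.prepareStatement(sql)
      try statement.execute()
      finally statement.close()
    }

  // Hypothetical test double (not part of Spark or the PR): builds a proxy for
  // a JDBC interface that appends "<method name> <first argument>" to `log`.
  private def stub[T](iface: Class[T], log: ArrayBuffer[String])(
      result: Method => AnyRef): T = {
    val handler = new InvocationHandler {
      override def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = {
        // args is null for methods without parameters, hence the Option wrapper.
        val firstArg = Option(args).flatMap(_.headOption).map(_.toString).getOrElse("")
        log += s"${method.getName} $firstArg".trim
        result(method)
      }
    }
    Proxy.newProxyInstance(getClass.getClassLoader, Array[Class[_]](iface), handler)
      .asInstanceOf[T]
  }

  // A Connection whose prepareStatement returns a recording PreparedStatement.
  // Every other call returns null, which is enough for this sketch only.
  def recordingConnection(log: ArrayBuffer[String]): Connection =
    stub(classOf[Connection], log) { method =>
      if (method.getName == "prepareStatement")
        stub(classOf[PreparedStatement], log) { m =>
          if (m.getName == "execute") java.lang.Boolean.FALSE else null
        }
      else null
    }
}
```

Calling `runSessionInit(conn, Some("SET @MYTESTVAR1 21519; SET @MYTESTVAR2 1234"))` against the recording connection logs a single `prepareStatement` call carrying the whole string, followed by `execute` and `close`; with `None` nothing is sent. Against a real driver, the same string would be parsed by the database itself, which is why the H2 test can use `;` and an Oracle user would wrap multiple commands in a PL/SQL block instead.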

    val df2 = spark.read.format("jdbc")
      .option("url", urlWithUserAndPass)
      .option("dbtable", "TEST.PEOPLE")
      .option("sessionInitStatement", initSQL2)
      .load()
    val e = intercept[SparkException] {df2.collect()}.getMessage
    assert(e.contains("""Schema "DUMMY" not found"""))

    sql(
      s"""
         |CREATE OR REPLACE TEMPORARY VIEW test_sessionInitStatement
         |USING org.apache.spark.sql.jdbc
         |OPTIONS (url '$urlWithUserAndPass',
         |dbtable '(SELECT NVL(@MYTESTVAR1, -1), NVL(@MYTESTVAR2, -1))',
         |sessionInitStatement 'SET @MYTESTVAR1 21519; SET @MYTESTVAR2 1234')
       """.stripMargin)

    val df3 = sql("SELECT * FROM test_sessionInitStatement")
    assert(df3.collect() === Array(Row(21519, 1234)))
  }
}