Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,13 @@ the following case-insensitive options:
</td>
</tr>

<tr>
<td><code>sessionInitStatement</code></td>
<td>
After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block). Use this to implement session initialization code. Example: <code>option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")</code>
</td>
</tr>

<tr>
<td><code>truncate</code></td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class JDBCOptions(
case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
}
// An option to execute custom SQL before fetching data from the remote DB
val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
}

object JDBCOptions {
Expand All @@ -158,4 +160,5 @@ object JDBCOptions {
val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,21 @@ private[jdbc] class JDBCRDD(
import scala.collection.JavaConverters._
dialect.beforeFetch(conn, options.asProperties.asScala.toMap)

// This executes a generic SQL statement (or PL/SQL block) before reading
// the table/query via JDBC. Use this feature to initialize the database
// session environment, e.g. for optimizations and/or troubleshooting.
options.sessionInitStatement match {
case Some(sql) =>
val statement = conn.prepareStatement(sql)
logInfo(s"Executing sessionInitStatement: $sql")
try {
statement.execute()
} finally {
statement.close()
}
case None =>
}

// H2's JDBC driver does not support the setSchema() method. We pass a
// fully-qualified table name in the SELECT statement. I don't know how to
// talk about a table in a completely portable way.
Expand Down
19 changes: 19 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1007,4 +1007,23 @@ class JDBCSuite extends SparkFunSuite
assert(sql("select * from people_view").count() == 3)
}
}

test("SPARK-21519: option sessionInitStatement, run SQL to initialize the database session.") {
val initSQL1 = "SET @MYTESTVAR 21519"
val df1 = spark.read.format("jdbc")
.option("url", urlWithUserAndPass)
.option("dbtable", "(SELECT NVL(@MYTESTVAR, -1))")
.option("sessionInitStatement", initSQL1)
.load()
assert(df1.collect() === Array(Row(21519)))

val initSQL2 = "SET SCHEMA DUMMY"

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I might not explain it clearly. Is that possible we can have a test case to send more than one statements in a single session initialization? Now these two examples have only one statement.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification. I have now added a test that runs 2 SQL statements.
For future reference I'd like to stress the fact that the code executed by the option "sessionInitStatement" is just the user-provided string fed through the execute method of the JDBC connection, so it can use the features of the target database language/syntax. In the case of the test I wrote for the H2 database I have just put together two commands separated by ";". When using sessionInitStatement for querying Oracle, for example, the user-provided command can be a SQL statemnet or a PL/SQL block grouping multiple commands and logic.

val df2 = spark.read.format("jdbc")
.option("url", urlWithUserAndPass)
.option("dbtable", "TEST.PEOPLE")
.option("sessionInitStatement", initSQL2)
.load()
val e = intercept[SparkException] {df2.collect()}.getMessage
assert(e.contains("""Schema "DUMMY" not found"""))
}
}