From 92d082a67910bd7e5f3aeefc204f97a60f144e08 Mon Sep 17 00:00:00 2001
From: LucaCanali
Date: Mon, 24 Jul 2017 10:46:17 +0200
Subject: [PATCH 1/4] Add an option to the JDBC data source to initialize the environment of the remote database session

---
 docs/sql-programming-guide.md                         |  7 +++++++
 .../sql/execution/datasources/jdbc/JDBCOptions.scala  |  3 +++
 .../sql/execution/datasources/jdbc/JDBCRDD.scala      | 12 ++++++++++++
 3 files changed, 22 insertions(+)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index b5eca76480eb8..72f6970e1800a 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1308,6 +1308,13 @@ the following case-insensitive options:
     </td>
   </tr>
 
+  <tr>
+    <td><code>sessionInitStatement</code></td>
+    <td>
+     After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block). Use this to implement session initialization code. Example: <code>option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")</code>
+    </td>
+  </tr>
+
   <tr>
     <td><code>truncate</code></td>
     <td>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 591096d5efd22..dbc64c40d6ad3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -135,6 +135,8 @@ class JDBCOptions(
     case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
     case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
   }
+  // An option to execute custom SQL before fetching data from the remote DB
+  val sessionInitStatement = parameters.getOrElse(JDBC_SESSION_INIT_STATEMENT, "")
 }
 
 object JDBCOptions {
@@ -158,4 +160,5 @@ object JDBCOptions {
   val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
   val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
+  val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 24e13697c0c9f..71e19ec38bfc6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -273,6 +273,18 @@ private[jdbc] class JDBCRDD(
     import scala.collection.JavaConverters._
     dialect.beforeFetch(conn, options.asProperties.asScala.toMap)
 
+    // This executes a generic SQL statement (or PL/SQL) before reading
+    // the table/query via JDBC. Use this feature to initialize the database
+    // session environment or for diagnostics.
+    if (options.sessionInitStatement.size > 0) {
+      val statement = conn.prepareStatement(options.sessionInitStatement)
+      try {
+        statement.executeQuery()
+      } finally {
+        statement.close()
+      }
+    }
+
     // H2's JDBC driver does not support the setSchema() method. We pass a
     // fully-qualified table name in the SELECT statement. I don't know how to
     // talk about a table in a completely portable way.
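Usage sketch (not part of the patches): the documentation entry added in PATCH 1/4 shows the intended call pattern. A minimal Scala example of wiring the new option into a JDBC read follows; the Oracle URL, table name, and credentials are placeholders, the snippet assumes the Oracle JDBC driver is on the classpath, and only the option names and the PL/SQL block are taken from the patch.

    // Placeholder connection details; only "sessionInitStatement" and the
    // PL/SQL block come from the documentation added in PATCH 1/4.
    val oracleDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/service_name")  // placeholder URL
      .option("dbtable", "MYSCHEMA.MYTABLE")                          // placeholder table
      .option("user", "myuser")                                       // placeholder credentials
      .option("password", "mypassword")
      .option("sessionInitStatement",
        """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
      .load()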
From 0a0ff0a3f6ae600547815c7ad2013f129d89182a Mon Sep 17 00:00:00 2001
From: LucaCanali
Date: Thu, 10 Aug 2017 22:45:43 +0200
Subject: [PATCH 2/4] Added test in JDBCSuite and changes as per review comments.

---
 .../datasources/jdbc/JDBCOptions.scala        |  2 +-
 .../execution/datasources/jdbc/JDBCRDD.scala  | 21 +++++++++++--------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 10 +++++++++
 3 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index dbc64c40d6ad3..ff804da2a79cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -136,7 +136,7 @@ class JDBCOptions(
     case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
   }
   // An option to execute custom SQL before fetching data from the remote DB
-  val sessionInitStatement = parameters.getOrElse(JDBC_SESSION_INIT_STATEMENT, "")
+  val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
 }
 
 object JDBCOptions {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 71e19ec38bfc6..3274be91d4817 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -273,16 +273,19 @@ private[jdbc] class JDBCRDD(
     import scala.collection.JavaConverters._
     dialect.beforeFetch(conn, options.asProperties.asScala.toMap)
 
-    // This executes a generic SQL statement (or PL/SQL) before reading
+    // This executes a generic SQL statement (or PL/SQL block) before reading
     // the table/query via JDBC. Use this feature to initialize the database
-    // session environment or for diagnostics.
-    if (options.sessionInitStatement.size > 0) {
-      val statement = conn.prepareStatement(options.sessionInitStatement)
-      try {
-        statement.executeQuery()
-      } finally {
-        statement.close()
-      }
+    // session environment, e.g. for optimizations and/or troubleshooting.
+    options.sessionInitStatement match {
+      case Some(sql) =>
+        val statement = conn.prepareStatement(sql)
+        logInfo(s"Executing sessionInitStatement: $sql")
+        try {
+          statement.execute()
+        } finally {
+          statement.close()
+        }
+      case None =>
     }
 
     // H2's JDBC driver does not support the setSchema() method. We pass a
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index d1daf860fdfff..a2e0ad338639d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1007,4 +1007,14 @@ class JDBCSuite extends SparkFunSuite
       assert(sql("select * from people_view").count() == 3)
     }
   }
+
+  test("SPARK-21519: option sessionInitStatement, run SQL to initialize the database session.") {
+    val initSQL = "SET @MYTESTVAR 21519"
+    val df = spark.read.format("jdbc")
+      .option("url", urlWithUserAndPass)
+      .option("dbtable", "(SELECT NVL(@MYTESTVAR, -1))")
+      .option("sessionInitStatement", initSQL)
+      .load()
+    assert(df.collect() === Array(Row(21519)))
+  }
 }
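Reference sketch (not part of the patches): the pattern PATCH 2/4 adopts, an Option[String] checked with a match and run via prepareStatement()/execute() in try/finally, can be exercised directly against an in-memory H2 database outside Spark. The URL below is illustrative, the SET/NVL statements mirror the JDBCSuite test, and the snippet assumes the H2 driver is on the classpath.

    import java.sql.DriverManager

    // Open a connection, optionally run a session-init statement, then query
    // within the same session.
    val conn = DriverManager.getConnection(
      "jdbc:h2:mem:sketchdb;user=testUser;password=testPass")  // illustrative URL
    try {
      val sessionInitStatement: Option[String] = Some("SET @MYTESTVAR 21519")
      sessionInitStatement match {
        case Some(sql) =>
          val statement = conn.prepareStatement(sql)
          // execute() rather than executeQuery(): the init statement may return no result set
          try statement.execute() finally statement.close()
        case None =>
      }
      val rs = conn.createStatement().executeQuery("SELECT NVL(@MYTESTVAR, -1)")
      while (rs.next()) println(rs.getInt(1))  // prints 21519 if the init statement ran
    } finally {
      conn.close()
    }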
From 55e63a34cd4fb2c52ca3f1ae0faaa2cb0c51a5c4 Mon Sep 17 00:00:00 2001
From: LucaCanali
Date: Fri, 11 Aug 2017 00:20:32 +0200
Subject: [PATCH 3/4] Added second SQL statement in JDBCSuite test.

---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index a2e0ad338639d..1efafe313f6a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1009,12 +1009,21 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("SPARK-21519: option sessionInitStatement, run SQL to initialize the database session.") {
-    val initSQL = "SET @MYTESTVAR 21519"
-    val df = spark.read.format("jdbc")
+    val initSQL1 = "SET @MYTESTVAR 21519"
+    val df1 = spark.read.format("jdbc")
       .option("url", urlWithUserAndPass)
       .option("dbtable", "(SELECT NVL(@MYTESTVAR, -1))")
-      .option("sessionInitStatement", initSQL)
+      .option("sessionInitStatement", initSQL1)
+      .load()
+    assert(df1.collect() === Array(Row(21519)))
+
+    val initSQL2 = "SET SCHEMA DUMMY"
+    val df2 = spark.read.format("jdbc")
+      .option("url", urlWithUserAndPass)
+      .option("dbtable", "TEST.PEOPLE")
+      .option("sessionInitStatement", initSQL2)
       .load()
-    assert(df.collect() === Array(Row(21519)))
+    val e = intercept[SparkException] {df2.collect()}.getMessage
+    assert(e.contains("""Schema "DUMMY" not found"""))
   }
 }

From 5792fd61e2521b5b391fd59186aa828745aed0ab Mon Sep 17 00:00:00 2001
From: LucaCanali
Date: Fri, 11 Aug 2017 12:05:41 +0200
Subject: [PATCH 4/4] Added a test with 2 statements executed by option sessionInitStatement.

---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 1efafe313f6a3..b21adbdbf1362 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1025,5 +1025,17 @@ class JDBCSuite extends SparkFunSuite
       .load()
     val e = intercept[SparkException] {df2.collect()}.getMessage
     assert(e.contains("""Schema "DUMMY" not found"""))
-  }
+
+    sql(
+      s"""
+        |CREATE OR REPLACE TEMPORARY VIEW test_sessionInitStatement
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$urlWithUserAndPass',
+        |dbtable '(SELECT NVL(@MYTESTVAR1, -1), NVL(@MYTESTVAR2, -1))',
+        |sessionInitStatement 'SET @MYTESTVAR1 21519; SET @MYTESTVAR2 1234')
+      """.stripMargin)
+
+    val df3 = sql("SELECT * FROM test_sessionInitStatement")
+    assert(df3.collect() === Array(Row(21519, 1234)))
+  }
 }
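Usage note (not part of the patches): the PATCH 4/4 test chains two statements with ';' in a single sessionInitStatement, which the H2 driver used by JDBCSuite accepts; whether other databases or drivers accept multi-statement strings is driver-dependent. A hedged DataFrame-API equivalent of that temporary-view test follows, with a placeholder URL in the same style as the suite's urlWithUserAndPass.

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:h2:mem:testdb0;user=testUser;password=testPass")  // placeholder URL
      .option("dbtable", "(SELECT NVL(@MYTESTVAR1, -1), NVL(@MYTESTVAR2, -1))")
      .option("sessionInitStatement", "SET @MYTESTVAR1 21519; SET @MYTESTVAR2 1234")
      .load()
    df.show()  // expected: one row containing 21519 and 1234, mirroring the test above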