Commit 415730e

dongjoon-hyun authored and cloud-fan committed
[SPARK-18419][SQL] JDBCRelation.insert should not remove Spark options
## What changes were proposed in this pull request?

Currently, `JDBCRelation.insert` removes Spark options too early by mistakenly using `asConnectionProperties`. Spark options like `numPartitions` should be passed into `DataFrameWriter.jdbc` correctly. This bug has been **hidden** because `JDBCOptions.asConnectionProperties` fails to filter out mixed-case options. This PR aims to fix both.

**JDBCRelation.insert**

```scala
  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    val url = jdbcOptions.url
    val table = jdbcOptions.table
-   val properties = jdbcOptions.asConnectionProperties
+   val properties = jdbcOptions.asProperties
    data.write
      .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
      .jdbc(url, table, properties)
```

**JDBCOptions.asConnectionProperties**

```scala
scala> import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
scala> import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

scala> new JDBCOptions(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10")).asConnectionProperties
res0: java.util.Properties = {numpartitions=10}

scala> new JDBCOptions(new CaseInsensitiveMap(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10"))).asConnectionProperties
res1: java.util.Properties = {numpartitions=10}
```

## How was this patch tested?

Pass the Jenkins tests with a new test case.

Author: Dongjoon Hyun <[email protected]>

Closes #15863 from dongjoon-hyun/SPARK-18419.

(cherry picked from commit 55d528f)
Signed-off-by: Wenchen Fan <[email protected]>
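After this patch, the same spark-shell calls should yield empty connection properties. The snippet below is a sketch of the expected output, inferred from the `isEmpty` assertions in the new test rather than captured from a live session:

```scala
scala> import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
scala> import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

scala> new JDBCOptions(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10")).asConnectionProperties
res0: java.util.Properties = {}

scala> new JDBCOptions(new CaseInsensitiveMap(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10"))).asConnectionProperties
res1: java.util.Properties = {}
```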
1 parent 65e896a commit 415730e

4 files changed: +28 -8 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala

Lines changed: 17 additions & 6 deletions
```diff
@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.jdbc
 import java.sql.{Connection, DriverManager}
 import java.util.Properties
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 
 /**
@@ -41,10 +39,23 @@ class JDBCOptions(
       JDBCOptions.JDBC_TABLE_NAME -> table)))
   }
 
+  /**
+   * Returns a property with all options.
+   */
+  val asProperties: Properties = {
+    val properties = new Properties()
+    parameters.foreach { case (k, v) => properties.setProperty(k, v) }
+    properties
+  }
+
+  /**
+   * Returns a property with all options except Spark internal data source options like `url`,
+   * `dbtable`, and `numPartition`. This should be used when invoking JDBC API like `Driver.connect`
+   * because each DBMS vendor has its own property list for JDBC driver. See SPARK-17776.
+   */
   val asConnectionProperties: Properties = {
     val properties = new Properties()
-    // We should avoid to pass the options into properties. See SPARK-17776.
-    parameters.filterKeys(!jdbcOptionNames.contains(_))
+    parameters.filterKeys(key => !jdbcOptionNames(key.toLowerCase))
       .foreach { case (k, v) => properties.setProperty(k, v) }
     properties
   }
@@ -125,10 +136,10 @@ class JDBCOptions(
 }
 
 object JDBCOptions {
-  private val jdbcOptionNames = ArrayBuffer.empty[String]
+  private val jdbcOptionNames = collection.mutable.Set[String]()
 
   private def newOption(name: String): String = {
-    jdbcOptionNames += name
+    jdbcOptionNames += name.toLowerCase
     name
   }
```
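For readers skimming the diff, here is a self-contained sketch of the distinction the patched class draws between the two property views. `MiniJdbcOptions` is a hypothetical simplification for illustration, not the real `JDBCOptions`:

```scala
import java.util.Properties

// Minimal sketch of the filtering rule introduced above.
// `MiniJdbcOptions` is a hypothetical stand-in, not the real JDBCOptions class.
class MiniJdbcOptions(parameters: Map[String, String]) {
  // Spark-internal option names, stored lower-cased as in the patched code.
  private val sparkOptionNames = Set("url", "dbtable", "numpartitions")

  // Every option, Spark-internal ones included: what the write path needs.
  val asProperties: Properties = {
    val props = new Properties()
    parameters.foreach { case (k, v) => props.setProperty(k, v) }
    props
  }

  // Only vendor-specific options: filter case-insensitively so that
  // "numPartitions" is dropped just like "numpartitions".
  val asConnectionProperties: Properties = {
    val props = new Properties()
    parameters
      .filter { case (k, _) => !sparkOptionNames(k.toLowerCase) }
      .foreach { case (k, v) => props.setProperty(k, v) }
    props
  }
}

object MiniJdbcOptionsDemo extends App {
  val opts = new MiniJdbcOptions(Map(
    "url" -> "jdbc:mysql://localhost:3306/temp",
    "dbtable" -> "t1",
    "numPartitions" -> "10",
    "fetchsize" -> "100"))

  println(opts.asProperties)            // all four options
  println(opts.asConnectionProperties)  // only {fetchsize=100}
}
```

The point mirrored from the patch is that Spark option names are registered lower-cased and compared against lower-cased keys, so `numPartitions` and `numpartitions` are treated the same.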

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 0 additions & 1 deletion
```diff
@@ -54,7 +54,6 @@ object JDBCRDD extends Logging {
   def resolveTable(options: JDBCOptions): StructType = {
     val url = options.url
     val table = options.table
-    val properties = options.asConnectionProperties
     val dialect = JdbcDialects.get(url)
     val conn: Connection = JdbcUtils.createConnectionFactory(options)()
     try {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -131,7 +131,7 @@ private[sql] case class JDBCRelation(
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
     val url = jdbcOptions.url
     val table = jdbcOptions.table
-    val properties = jdbcOptions.asConnectionProperties
+    val properties = jdbcOptions.asProperties
     data.write
       .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
       .jdbc(url, table, properties)
```
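With `asProperties`, Spark options such as `numPartitions` now survive all the way into `DataFrameWriter.jdbc` instead of being stripped before the write. A rough usage sketch of that write path follows; the URL, table name, and credentials are placeholders, and the exact effect of `numPartitions` on the write side depends on the Spark version:

```scala
import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object JdbcWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("jdbc-write-sketch").getOrCreate()
    val df = spark.range(0, 1000).toDF("id")

    val props = new Properties()
    props.setProperty("user", "test")          // vendor option, forwarded to the JDBC driver
    props.setProperty("numPartitions", "10")   // Spark option, meant for the JDBC source, not the driver

    // Placeholder URL and table; requires a reachable database to actually run.
    df.write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/temp", "t1", props)

    spark.stop()
  }
}
```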

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -26,6 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -890,4 +891,13 @@ class JDBCSuite extends SparkFunSuite
     assert(sql("SELECT * FROM mixedCaseCols WHERE Id = 1 OR Name = 'mary'").collect().size == 2)
     assert(sql("SELECT * FROM mixedCaseCols WHERE Name = 'mary' AND Id = 2").collect().size == 1)
   }
+
+  test("SPARK-18419: Fix `asConnectionProperties` to filter case-insensitively") {
+    val parameters = Map(
+      "url" -> "jdbc:mysql://localhost:3306/temp",
+      "dbtable" -> "t1",
+      "numPartitions" -> "10")
+    assert(new JDBCOptions(parameters).asConnectionProperties.isEmpty)
+    assert(new JDBCOptions(new CaseInsensitiveMap(parameters)).asConnectionProperties.isEmpty)
+  }
 }
```
