
Commit 50b89d0

JustinPihony authored and srowen committed
[SPARK-14525][SQL] Make DataFrameWrite.save work for jdbc
## What changes were proposed in this pull request?

This change modifies the implementation of DataFrameWriter.save such that it works with jdbc, and the call to jdbc merely delegates to save.

## How was this patch tested?

This was tested via unit tests in JDBCWriteSuite, to which I added one new test covering this scenario.

## Additional details

rxin This seems to have been most recently touched by you and was also commented on in the JIRA.

This contribution is my original work and I license the work to the project under the project's open source license.

Author: Justin Pihony <[email protected]>
Author: Justin Pihony <[email protected]>

Closes #12601 from JustinPihony/jdbc_reconciliation.
1 parent ac65139 commit 50b89d0
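Before the file-by-file diff, a brief usage sketch of what this commit enables (a hedged illustration, not part of the patch; it assumes a running SparkSession named spark and a reachable PostgreSQL server). The generic options-based save() path now works for the jdbc source, and the jdbc(...) convenience method produces the same write:

    import org.apache.spark.sql.SaveMode

    val df = spark.range(10).toDF("id")

    // Previously this failed because the jdbc source did not implement
    // CreatableRelationProvider; with this commit the generic path works.
    df.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .mode(SaveMode.Append)
      .save()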

File tree

9 files changed, +246 -73 lines changed


docs/sql-programming-guide.md

Lines changed: 5 additions & 1 deletion
@@ -1100,9 +1100,13 @@ CREATE TEMPORARY VIEW jdbcTable
 USING org.apache.spark.sql.jdbc
 OPTIONS (
   url "jdbc:postgresql:dbserver",
-  dbtable "schema.tablename"
+  dbtable "schema.tablename",
+  user 'username',
+  password 'password'
 )

+INSERT INTO TABLE jdbcTable
+SELECT * FROM resultTable
 {% endhighlight %}

 </div>
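The INSERT added above can be exercised end to end from Scala. A hedged sketch (assuming a SparkSession spark and an already-registered temporary view resultTable whose schema matches the JDBC table):

    spark.sql("""
      CREATE TEMPORARY VIEW jdbcTable
      USING org.apache.spark.sql.jdbc
      OPTIONS (
        url "jdbc:postgresql:dbserver",
        dbtable "schema.tablename",
        user 'username',
        password 'password'
      )""")

    // Writes the rows of resultTable through the JDBC relation.
    spark.sql("INSERT INTO TABLE jdbcTable SELECT * FROM resultTable")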

examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java

Lines changed: 21 additions & 0 deletions
@@ -22,6 +22,7 @@
 import java.util.Arrays;
 import java.util.List;
 // $example off:schema_merging$
+import java.util.Properties;

 // $example on:basic_parquet_example$
 import org.apache.spark.api.java.JavaRDD;
@@ -235,13 +236,33 @@ private static void runJsonDatasetExample(SparkSession spark) {

   private static void runJdbcDatasetExample(SparkSession spark) {
     // $example on:jdbc_dataset$
+    // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
+    // Loading data from a JDBC source
     Dataset<Row> jdbcDF = spark.read()
       .format("jdbc")
       .option("url", "jdbc:postgresql:dbserver")
       .option("dbtable", "schema.tablename")
       .option("user", "username")
       .option("password", "password")
       .load();
+
+    Properties connectionProperties = new Properties();
+    connectionProperties.put("user", "username");
+    connectionProperties.put("password", "password");
+    Dataset<Row> jdbcDF2 = spark.read()
+      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
+
+    // Saving data to a JDBC source
+    jdbcDF.write()
+      .format("jdbc")
+      .option("url", "jdbc:postgresql:dbserver")
+      .option("dbtable", "schema.tablename")
+      .option("user", "username")
+      .option("password", "password")
+      .save();
+
+    jdbcDF2.write()
+      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
     // $example off:jdbc_dataset$
   }
 }

examples/src/main/python/sql/datasource.py

Lines changed: 19 additions & 0 deletions
@@ -143,13 +143,32 @@ def json_dataset_example(spark):

 def jdbc_dataset_example(spark):
     # $example on:jdbc_dataset$
+    # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
+    # Loading data from a JDBC source
     jdbcDF = spark.read \
         .format("jdbc") \
         .option("url", "jdbc:postgresql:dbserver") \
         .option("dbtable", "schema.tablename") \
         .option("user", "username") \
         .option("password", "password") \
         .load()
+
+    jdbcDF2 = spark.read \
+        .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
+              properties={"user": "username", "password": "password"})
+
+    # Saving data to a JDBC source
+    jdbcDF.write \
+        .format("jdbc") \
+        .option("url", "jdbc:postgresql:dbserver") \
+        .option("dbtable", "schema.tablename") \
+        .option("user", "username") \
+        .option("password", "password") \
+        .save()
+
+    jdbcDF2.write \
+        .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
+              properties={"user": "username", "password": "password"})
     # $example off:jdbc_dataset$
examples/src/main/r/RSparkSQLExample.R

Lines changed: 4 additions & 0 deletions
@@ -204,7 +204,11 @@ results <- collect(sql("FROM src SELECT key, value"))


 # $example on:jdbc_dataset$
+# Loading data from a JDBC source
 df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
+
+# Saving data to a JDBC source
+write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
 # $example off:jdbc_dataset$

 # Stop the SparkSession now
examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

Lines changed: 22 additions & 0 deletions
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.examples.sql

+import java.util.Properties
+
 import org.apache.spark.sql.SparkSession

 object SQLDataSourceExample {
@@ -148,13 +150,33 @@ object SQLDataSourceExample {

   private def runJdbcDatasetExample(spark: SparkSession): Unit = {
     // $example on:jdbc_dataset$
+    // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
+    // Loading data from a JDBC source
     val jdbcDF = spark.read
       .format("jdbc")
       .option("url", "jdbc:postgresql:dbserver")
       .option("dbtable", "schema.tablename")
       .option("user", "username")
       .option("password", "password")
       .load()
+
+    val connectionProperties = new Properties()
+    connectionProperties.put("user", "username")
+    connectionProperties.put("password", "password")
+    val jdbcDF2 = spark.read
+      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
+
+    // Saving data to a JDBC source
+    jdbcDF.write
+      .format("jdbc")
+      .option("url", "jdbc:postgresql:dbserver")
+      .option("dbtable", "schema.tablename")
+      .option("user", "username")
+      .option("password", "password")
+      .save()
+
+    jdbcDF2.write
+      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
     // $example off:jdbc_dataset$
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 4 additions & 55 deletions
@@ -425,62 +425,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
     assertNotPartitioned("jdbc")
     assertNotBucketed("jdbc")
-
-    // to add required options like URL and dbtable
-    val params = extraOptions.toMap ++ Map("url" -> url, "dbtable" -> table)
-    val jdbcOptions = new JDBCOptions(params)
-    val jdbcUrl = jdbcOptions.url
-    val jdbcTable = jdbcOptions.table
-
-    val props = new Properties()
-    extraOptions.foreach { case (key, value) =>
-      props.put(key, value)
-    }
     // connectionProperties should override settings in extraOptions
-    props.putAll(connectionProperties)
-    val conn = JdbcUtils.createConnectionFactory(jdbcUrl, props)()
-
-    try {
-      var tableExists = JdbcUtils.tableExists(conn, jdbcUrl, jdbcTable)
-
-      if (mode == SaveMode.Ignore && tableExists) {
-        return
-      }
-
-      if (mode == SaveMode.ErrorIfExists && tableExists) {
-        sys.error(s"Table $jdbcTable already exists.")
-      }
-
-      if (mode == SaveMode.Overwrite && tableExists) {
-        if (jdbcOptions.isTruncate &&
-            JdbcUtils.isCascadingTruncateTable(jdbcUrl) == Some(false)) {
-          JdbcUtils.truncateTable(conn, jdbcTable)
-        } else {
-          JdbcUtils.dropTable(conn, jdbcTable)
-          tableExists = false
-        }
-      }
-
-      // Create the table if the table didn't exist.
-      if (!tableExists) {
-        val schema = JdbcUtils.schemaString(df, jdbcUrl)
-        // To allow certain options to append when create a new table, which can be
-        // table_options or partition_options.
-        // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
-        val createtblOptions = jdbcOptions.createTableOptions
-        val sql = s"CREATE TABLE $jdbcTable ($schema) $createtblOptions"
-        val statement = conn.createStatement
-        try {
-          statement.executeUpdate(sql)
-        } finally {
-          statement.close()
-        }
-      }
-    } finally {
-      conn.close()
-    }
-
-    JdbcUtils.saveTable(df, jdbcUrl, jdbcTable, props)
+    this.extraOptions = this.extraOptions ++ (connectionProperties.asScala)
+    // explicit url and dbtable should override all
+    this.extraOptions += ("url" -> url, "dbtable" -> table)
+    format("jdbc").save()
   }

   /**
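A sketch of the option precedence the rewritten method implies (hypothetical values, not part of the patch): keys from connectionProperties overwrite matching keys set earlier via option(...), and the explicit url and dbtable arguments overwrite everything, per the two comments in the new body:

    import java.util.Properties

    val props = new Properties()
    props.put("user", "real_user") // wins over the .option("user", ...) below

    df.write
      .option("user", "shadowed_user") // overridden by connectionProperties
      .option("url", "jdbc:shadowed")  // overridden by the explicit url argument
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", props)
    // Effective options: url=jdbc:postgresql:dbserver, dbtable=schema.tablename, user=real_user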

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala

Lines changed: 9 additions & 2 deletions
@@ -27,10 +27,12 @@ class JDBCOptions(
   // ------------------------------------------------------------
   // Required parameters
   // ------------------------------------------------------------
+  require(parameters.isDefinedAt("url"), "Option 'url' is required.")
+  require(parameters.isDefinedAt("dbtable"), "Option 'dbtable' is required.")
   // a JDBC URL
-  val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
+  val url = parameters("url")
   // name of table
-  val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
+  val table = parameters("dbtable")

   // ------------------------------------------------------------
   // Optional parameter list
@@ -44,6 +46,11 @@ class JDBCOptions(
   // the number of partitions
   val numPartitions = parameters.getOrElse("numPartitions", null)

+  require(partitionColumn == null ||
+    (lowerBound != null && upperBound != null && numPartitions != null),
+    "If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," +
+      " and 'numPartitions' are required.")
+
   // ------------------------------------------------------------
   // The options for DataFrameWriter
   // ------------------------------------------------------------
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala

Lines changed: 80 additions & 15 deletions
@@ -19,37 +19,102 @@ package org.apache.spark.sql.execution.datasources.jdbc

 import java.util.Properties

-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
+import scala.collection.JavaConverters.mapAsJavaMapConverter

-class JdbcRelationProvider extends RelationProvider with DataSourceRegister {
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
+
+class JdbcRelationProvider extends CreatableRelationProvider
+  with RelationProvider with DataSourceRegister {

   override def shortName(): String = "jdbc"

-  /** Returns a new base relation with the given parameters. */
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
     val jdbcOptions = new JDBCOptions(parameters)
-    if (jdbcOptions.partitionColumn != null
-      && (jdbcOptions.lowerBound == null
-        || jdbcOptions.upperBound == null
-        || jdbcOptions.numPartitions == null)) {
-      sys.error("Partitioning incompletely specified")
-    }
+    val partitionColumn = jdbcOptions.partitionColumn
+    val lowerBound = jdbcOptions.lowerBound
+    val upperBound = jdbcOptions.upperBound
+    val numPartitions = jdbcOptions.numPartitions

-    val partitionInfo = if (jdbcOptions.partitionColumn == null) {
+    val partitionInfo = if (partitionColumn == null) {
       null
     } else {
       JDBCPartitioningInfo(
-        jdbcOptions.partitionColumn,
-        jdbcOptions.lowerBound.toLong,
-        jdbcOptions.upperBound.toLong,
-        jdbcOptions.numPartitions.toInt)
+        partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt)
     }
     val parts = JDBCRelation.columnPartition(partitionInfo)
     val properties = new Properties() // Additional properties that we will pass to getConnection
     parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
     JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession)
   }
+
+  /*
+   * The following structure applies to this code:
+   *               | tableExists               | !tableExists
+   * ------------------------------------------------------------------------------------
+   * Ignore        | BaseRelation              | CreateTable, saveTable, BaseRelation
+   * ErrorIfExists | ERROR                     | CreateTable, saveTable, BaseRelation
+   * Overwrite*    | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation
+   *               | saveTable, BaseRelation   |
+   * Append        | saveTable, BaseRelation   | CreateTable, saveTable, BaseRelation
+   *
+   * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate
+   */
+  override def createRelation(
+      sqlContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    val jdbcOptions = new JDBCOptions(parameters)
+    val url = jdbcOptions.url
+    val table = jdbcOptions.table
+
+    val props = new Properties()
+    props.putAll(parameters.asJava)
+    val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+    try {
+      val tableExists = JdbcUtils.tableExists(conn, url, table)
+
+      val (doCreate, doSave) = (mode, tableExists) match {
+        case (SaveMode.Ignore, true) => (false, false)
+        case (SaveMode.ErrorIfExists, true) => throw new AnalysisException(
+          s"Table or view '$table' already exists, and SaveMode is set to ErrorIfExists.")
+        case (SaveMode.Overwrite, true) =>
+          if (jdbcOptions.isTruncate && JdbcUtils.isCascadingTruncateTable(url) == Some(false)) {
+            JdbcUtils.truncateTable(conn, table)
+            (false, true)
+          } else {
+            JdbcUtils.dropTable(conn, table)
+            (true, true)
+          }
+        case (SaveMode.Append, true) => (false, true)
+        case (_, true) => throw new IllegalArgumentException(s"Unexpected SaveMode, '$mode'," +
+          " for handling existing tables.")
+        case (_, false) => (true, true)
+      }
+
+      if (doCreate) {
+        val schema = JdbcUtils.schemaString(data, url)
+        // To allow certain options to append when create a new table, which can be
+        // table_options or partition_options.
+        // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
+        val createtblOptions = jdbcOptions.createTableOptions
+        val sql = s"CREATE TABLE $table ($schema) $createtblOptions"
+        val statement = conn.createStatement
+        try {
+          statement.executeUpdate(sql)
+        } finally {
+          statement.close()
+        }
+      }
+      if (doSave) JdbcUtils.saveTable(data, url, table, props)
+    } finally {
+      conn.close()
+    }
+
+    createRelation(sqlContext, parameters)
+  }
 }
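A hedged usage sketch of the mode matrix above (not part of the patch; assumes a reachable database). With SaveMode.Overwrite on an existing table, setting the truncate option keeps the table and truncates it instead of dropping and re-creating it, per the starred row, provided the dialect reports isCascadingTruncateTable == Some(false):

    import org.apache.spark.sql.SaveMode

    df.write
      .mode(SaveMode.Overwrite)
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .option("truncate", "true") // truncate instead of drop/create on Overwrite
      .save()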
