Pull request status: Closed

Commits (29)
6be5046  [SPARK-14525][SQL] Make DataFrameWrite.save work for jdbc (JustinPihony, Apr 22, 2016)
db639a5  Merge assertion into jdbc (JustinPihony, May 19, 2016)
69f7c7b  Merge https://github.com/apache/spark into jdbc_reconciliation (JustinPihony, Jun 6, 2016)
88d181e  [SPARK-14525][SQL] Make jdbc a CreatableRelationProvider for simpler … (JustinPihony, Jun 7, 2016)
c44271e  [SPARK-14525][SQL] Clean empty space commit (JustinPihony, Jun 7, 2016)
0a98e45  Merge (JustinPihony, Jul 7, 2016)
cb9889e  [SPARK-14525][SQL]Address some code reviews (JustinPihony, Jul 7, 2016)
d18efef  Merge branch 'jdbc_reconciliation' of https://github.com/JustinPihony… (JustinPihony, Jul 7, 2016)
754b360  Change sys.error to require (JustinPihony, Aug 31, 2016)
1d0d61c  Remove the last sys.error (JustinPihony, Aug 31, 2016)
c8e4143  Remove the local import (JustinPihony, Sep 1, 2016)
f7f2615  Merge (JustinPihony, Sep 8, 2016)
95431e3  Merge branch 'jdbc_reconciliation' of https://github.com/JustinPihony… (JustinPihony, Sep 8, 2016)
e8c2d7d  Fix whitespace (JustinPihony, Sep 8, 2016)
ae6ad8b  Removed SchemaRelationProvider (JustinPihony, Sep 8, 2016)
c387c17  Add forgotten stashed test (JustinPihony, Sep 8, 2016)
57ac87e  Address comments (JustinPihony, Sep 8, 2016)
de53734  Address more comments (JustinPihony, Sep 8, 2016)
379e00f  More insanity (JustinPihony, Sep 8, 2016)
ea9d2fe  Analysis Exception (JustinPihony, Sep 8, 2016)
c686b0e  Simplify require (JustinPihony, Sep 8, 2016)
7ef7a48  Documentation (JustinPihony, Sep 12, 2016)
c9dcdc4  Documentation for java and python (JustinPihony, Sep 15, 2016)
447ab82  Add semicolons to my java (JustinPihony, Sep 15, 2016)
4a02c82  Add import (JustinPihony, Sep 15, 2016)
a238156  Fixed up java and tested past it on my end (JustinPihony, Sep 15, 2016)
06c1cba  R and SQL documentation (JustinPihony, Sep 24, 2016)
8fb86b4  Move import back (JustinPihony, Sep 24, 2016)
724bbe2  Address comments (JustinPihony, Sep 26, 2016)
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (50 changes: 10 additions & 40 deletions)
@@ -244,7 +244,11 @@ final class DataFrameWriter private[sql](df: DataFrame) {
      bucketSpec = getBucketSpec,
      options = extraOptions.toMap)

-    dataSource.write(mode, df)
+    dataSource.providingClass.newInstance() match {
+      case jdbc: execution.datasources.jdbc.DefaultSource =>
[Review comment] HyukjinKwon (Member), Apr 22, 2016:
It looks like a new method is being introduced. I think we don't necessarily have to introduce this new function; we could use the existing interfaces, e.g. CreatableRelationProvider in interfaces.

[Reply] JustinPihony (Contributor, Author):
I agree, and I admit I was being lazy in not trying to figure out how to make the current implementation return a BaseRelation. I'll take another look today at just turning the DefaultSource into a CreatableRelationProvider.

[A sketch of the CreatableRelationProvider approach follows this hunk.]

+        jdbc.write(mode, df, extraOptions.toMap)
+      case _ => dataSource.write(mode, df)
+    }
  }
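For context on the thread above: CreatableRelationProvider is the existing Spark SQL interface the reviewer points to, and commit 88d181e moves the PR in that direction. Below is a minimal, hypothetical sketch of the shape such an implementation takes; the class name and body are illustrative, not the PR's final code.

    import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}

    // Hypothetical sketch: a JDBC source that plugs into the generic save path
    // by implementing CreatableRelationProvider instead of a one-off write method.
    class JdbcSourceSketch extends CreatableRelationProvider {
      override def createRelation(
          sqlContext: SQLContext,
          mode: SaveMode,
          parameters: Map[String, String],
          data: DataFrame): BaseRelation = {
        val url = parameters.getOrElse("url", sys.error("url is required"))
        val table = parameters.getOrElse("dbtable", sys.error("dbtable is required"))
        // Honor `mode` (create/drop the table as needed), write `data` via
        // JdbcUtils, then return a relation that reads the table back.
        ???  // stub; the PR's later commits return the JDBC read relation here
      }
    }

With this in place, DataFrameWriter.save() can dispatch through the generic data source machinery instead of pattern matching on the JDBC DefaultSource, which is what the reviewer is suggesting.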

@@ -489,7 +493,12 @@ final class DataFrameWriter private[sql](df: DataFrame) {
  /**
   * @since 1.4.0
   */
  def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
-    val props = new Properties()
-    extraOptions.foreach { case (key, value) =>
-      props.put(key, value)
-    }
+    import scala.collection.JavaConverters._
[Review comment] Member:
Nit: I'd import these with the other imports.

[Reply] JustinPihony (Contributor, Author):
I opted to only import them here because this is the only place they are required, so there is no need to drag the import into the whole class.

[Review comment] HyukjinKwon (Member), Sep 1, 2016:
(+1 for moving the import up.)
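As background for this thread: scala.collection.JavaConverters provides the asScala decorator used a few lines below to fold a java.util.Properties into extraOptions. A tiny self-contained illustration (the property values are made up):

    import scala.collection.JavaConverters._

    object JavaConvertersDemo extends App {
      val connectionProperties = new java.util.Properties()
      connectionProperties.setProperty("user", "sa")
      connectionProperties.setProperty("password", "")

      // asScala exposes the Properties as a mutable.Map[String, String] view,
      // which can be concatenated onto an immutable options Map.
      val extraOptions = Map("url" -> "jdbc:h2:mem:demo") ++ connectionProperties.asScala
      println(extraOptions)  // Map(url -> ..., user -> sa, password -> )
    }

The import behaves the same whether it sits at the top of the file or inside the one method that needs it, so the thread is purely a style question.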

    // connectionProperties should override settings in extraOptions
-    props.putAll(connectionProperties)
-    val conn = JdbcUtils.createConnectionFactory(url, props)()
-
-    try {
-      var tableExists = JdbcUtils.tableExists(conn, url, table)
-
-      if (mode == SaveMode.Ignore && tableExists) {
-        return
-      }
-
-      if (mode == SaveMode.ErrorIfExists && tableExists) {
-        sys.error(s"Table $table already exists.")
-      }
-
-      if (mode == SaveMode.Overwrite && tableExists) {
-        JdbcUtils.dropTable(conn, table)
-        tableExists = false
-      }
-
-      // Create the table if the table didn't exist.
-      if (!tableExists) {
-        val schema = JdbcUtils.schemaString(df, url)
-        val sql = s"CREATE TABLE $table ($schema)"
-        val statement = conn.createStatement
-        try {
-          statement.executeUpdate(sql)
-        } finally {
-          statement.close()
-        }
-      }
-    } finally {
-      conn.close()
-    }
-
-    JdbcUtils.saveTable(df, url, table, props)
+    this.extraOptions = this.extraOptions ++ (connectionProperties.asScala)
+    // explicit url and dbtable should override all
+    this.extraOptions += ("url" -> url, "dbtable" -> table)
+    format("jdbc").save
[Review comment] Member:
The omission of parentheses on a method should only be used when the method has no side effects. Since save writes data, please change this to save(). (A short illustration of the convention follows this file's diff.)

}

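The parentheses nit above is the standard Scala arity-0 convention. A minimal illustration (the class and method names are invented for this example):

    class Counter {
      private var n = 0
      // Side-effecting: declared and called with parentheses.
      def increment(): Unit = { n += 1 }
      // Pure accessor: declared and called without parentheses.
      def current: Int = n
    }

    object ConventionDemo extends App {
      val c = new Counter
      c.increment()      // parentheses signal a side effect to the reader
      println(c.current)
    }

Since save writes data out, it is side-effecting, hence the request to call it as save().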
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.jdbc

import java.util.Properties

-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

class DefaultSource extends RelationProvider with DataSourceRegister {
@@ -56,4 +56,52 @@ class DefaultSource extends RelationProvider with DataSourceRegister {
    parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
    JDBCRelation(url, table, parts, properties)(sqlContext)
  }

+  def write(mode: SaveMode, data: DataFrame, options: Map[String, String]): Unit = {
+    val url = options.getOrElse("url",
+      sys.error("Saving jdbc source requires url to be set." +
+        " (ie. df.option(\"url\", \"ACTUAL_URL\")"))
+    val table = options.getOrElse("dbtable", options.getOrElse("table",
+      sys.error("Saving jdbc source requires dbtable to be set." +
+        " (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")")))
+
+    import collection.JavaConverters._
+    val props = new Properties()
+    props.putAll(options.asJava)
+
+    val conn = JdbcUtils.createConnectionFactory(url, props)()
+
+    try {
+      var tableExists = JdbcUtils.tableExists(conn, url, table)
+
+      if (mode == SaveMode.Ignore && tableExists) {
+        return
+      }
+
+      if (mode == SaveMode.ErrorIfExists && tableExists) {
+        sys.error(s"Table $table already exists.")
+      }
+
+      if (mode == SaveMode.Overwrite && tableExists) {
+        JdbcUtils.dropTable(conn, table)
+        tableExists = false
+      }
+
+      // Create the table if the table didn't exist.
+      if (!tableExists) {
+        val schema = JdbcUtils.schemaString(data, url)
+        val sql = s"CREATE TABLE $table ($schema)"
+        val statement = conn.createStatement
+        try {
+          statement.executeUpdate(sql)
+        } finally {
+          statement.close()
+        }
+      }
+    } finally {
+      conn.close()
+    }
+
+    JdbcUtils.saveTable(data, url, table, props)
+  }
}
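Taken together, the two changes above enable the generic save path for JDBC. A usage sketch of what the PR makes possible, assuming a DataFrame df is in scope (the H2 URL and table name are placeholders):

    import java.util.Properties
    import org.apache.spark.sql.SaveMode

    // Before this PR, df.write.format("jdbc").save() failed;
    // only the dedicated df.write.jdbc(...) method worked.
    df.write
      .format("jdbc")
      .option("url", "jdbc:h2:mem:testdb")   // placeholder JDBC URL
      .option("dbtable", "TEST.PEOPLE")      // placeholder table name
      .mode(SaveMode.Overwrite)
      .save()

    // The Properties-based API now funnels through the same code path:
    df.write.mode(SaveMode.Overwrite).jdbc("jdbc:h2:mem:testdb", "TEST.PEOPLE", new Properties())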
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -151,4 +151,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
    assert(2 === sqlContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
    assert(2 === sqlContext.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
  }

test("save works for format(\"jdbc\") if url and dbtable are set") {
val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2)

df.write.format("jdbc")
.options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST"))
.save
[Review comment] Member:
Nit: save -> save()

[Reply] JustinPihony (Contributor, Author):
Done.


+    assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count)
+    assert(
+      2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length)
+  }
}
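Given the guards at the top of DefaultSource.write, a companion negative test could look like the sketch below. It is not part of this PR; the test name and expected message fragment are assumptions (and depending on the commit, the failure surfaces as a RuntimeException from sys.error or an IllegalArgumentException from require, both of which intercept[RuntimeException] catches):

    // Hypothetical companion test (not in this PR): a missing "url" option
    // should fail fast in DefaultSource.write rather than at the database.
    test("save fails for format(\"jdbc\") when url is not set") {
      val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2)

      val e = intercept[RuntimeException] {
        df.write.format("jdbc")
          .option("dbtable", "TEST.NOURL")
          .save()
      }
      assert(e.getMessage.contains("requires url"))
    }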