29 commits
6be5046
[SPARK-14525][SQL] Make DataFrameWrite.save work for jdbc
JustinPihony Apr 22, 2016
db639a5
Merge assertion into jdbc
JustinPihony May 19, 2016
69f7c7b
Merge https://github.com/apache/spark into jdbc_reconciliation
JustinPihony Jun 6, 2016
88d181e
[SPARK-14525][SQL] Make jdbc a CreatableRelationProvider for simpler …
JustinPihony Jun 7, 2016
c44271e
[SPARK-14525][SQL] Clean empty space commit
JustinPihony Jun 7, 2016
0a98e45
Merge
JustinPihony Jul 7, 2016
cb9889e
[SPARK-14525][SQL]Address some code reviews
JustinPihony Jul 7, 2016
d18efef
Merge branch 'jdbc_reconciliation' of https://github.com/JustinPihony…
JustinPihony Jul 7, 2016
754b360
Change sys.error to require
JustinPihony Aug 31, 2016
1d0d61c
Remove the last sys.error
JustinPihony Aug 31, 2016
c8e4143
Remove the local import
JustinPihony Sep 1, 2016
f7f2615
Merge
JustinPihony Sep 8, 2016
95431e3
Merge branch 'jdbc_reconciliation' of https://github.com/JustinPihony…
JustinPihony Sep 8, 2016
e8c2d7d
Fix whitespace
JustinPihony Sep 8, 2016
ae6ad8b
Removed SchemaRelationProvider
JustinPihony Sep 8, 2016
c387c17
Add forgotten stashed test
JustinPihony Sep 8, 2016
57ac87e
Address comments
JustinPihony Sep 8, 2016
de53734
Address more comments
JustinPihony Sep 8, 2016
379e00f
More insanity
JustinPihony Sep 8, 2016
ea9d2fe
Analysis Exception
JustinPihony Sep 8, 2016
c686b0e
Simplify require
JustinPihony Sep 8, 2016
7ef7a48
Documentation
JustinPihony Sep 12, 2016
c9dcdc4
Documentation for java and python
JustinPihony Sep 15, 2016
447ab82
Add semicolons to my java
JustinPihony Sep 15, 2016
4a02c82
Add import
JustinPihony Sep 15, 2016
a238156
Fixed up java and tested past it on my end
JustinPihony Sep 15, 2016
06c1cba
R and SQL documentation
JustinPihony Sep 24, 2016
8fb86b4
Move import back
JustinPihony Sep 24, 2016
724bbe2
Address comments
JustinPihony Sep 26, 2016
6 changes: 5 additions & 1 deletion docs/sql-programming-guide.md
@@ -1100,9 +1100,13 @@ CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:postgresql:dbserver",
dbtable "schema.tablename"
dbtable "schema.tablename",
user 'username',
password 'password'
)

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable
{% endhighlight %}

</div>
examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.List;
// $example off:schema_merging$
import java.util.Properties;

HyukjinKwon (Member), Sep 24, 2016:
I think we should put the java.util.List and java.util.Properties imports together without an additional newline. It seems you already know, but just in case: imports.

Contributor (PR author):
Should this really be added to the example, though?

Member:
No reason not to follow the guideline?

HyukjinKwon (Member), Sep 24, 2016:
Oh, maybe, my previous comment was not clear. I meant

import java.util.List;
// $example off:schema_merging$
import java.util.Properties;

I haven't tried to build the doc against the current state of this PR, but I guess we won't need this import for Parquet's schema merging example.

Contributor (PR author):
@HyukjinKwon Yes, that is what I was talking about...just fixed it back

// $example on:basic_parquet_example$
import org.apache.spark.api.java.JavaRDD;
@@ -235,13 +236,33 @@ private static void runJsonDatasetExample(SparkSession spark) {

private static void runJdbcDatasetExample(SparkSession spark) {
// $example on:jdbc_dataset$
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();

jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// $example off:jdbc_dataset$
}
}
19 changes: 19 additions & 0 deletions examples/src/main/python/sql/datasource.py
@@ -143,13 +143,32 @@ def json_dataset_example(spark):

def jdbc_dataset_example(spark):
# $example on:jdbc_dataset$
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.load()

jdbcDF2 = spark.read \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})

# Saving data to a JDBC source
jdbcDF.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()

jdbcDF2.write \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# $example off:jdbc_dataset$


4 changes: 4 additions & 0 deletions examples/src/main/r/RSparkSQLExample.R
@@ -204,7 +204,11 @@ results <- collect(sql("FROM src SELECT key, value"))


# $example on:jdbc_dataset$
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")

# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
# $example off:jdbc_dataset$

# Stop the SparkSession now
examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.examples.sql

import java.util.Properties

import org.apache.spark.sql.SparkSession

object SQLDataSourceExample {
@@ -148,13 +150,33 @@ object SQLDataSourceExample {

private def runJdbcDatasetExample(spark: SparkSession): Unit = {
// $example on:jdbc_dataset$
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()

jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// $example off:jdbc_dataset$
}
}
59 changes: 4 additions & 55 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -420,62 +420,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
assertNotPartitioned("jdbc")
assertNotBucketed("jdbc")

// to add required options like URL and dbtable
val params = extraOptions.toMap ++ Map("url" -> url, "dbtable" -> table)
val jdbcOptions = new JDBCOptions(params)
val jdbcUrl = jdbcOptions.url
val jdbcTable = jdbcOptions.table

val props = new Properties()
extraOptions.foreach { case (key, value) =>
props.put(key, value)
}
// connectionProperties should override settings in extraOptions
props.putAll(connectionProperties)
val conn = JdbcUtils.createConnectionFactory(jdbcUrl, props)()

try {
var tableExists = JdbcUtils.tableExists(conn, jdbcUrl, jdbcTable)

if (mode == SaveMode.Ignore && tableExists) {
return
}

if (mode == SaveMode.ErrorIfExists && tableExists) {
sys.error(s"Table $jdbcTable already exists.")
}

if (mode == SaveMode.Overwrite && tableExists) {
if (jdbcOptions.isTruncate &&
JdbcUtils.isCascadingTruncateTable(jdbcUrl) == Some(false)) {
JdbcUtils.truncateTable(conn, jdbcTable)
} else {
JdbcUtils.dropTable(conn, jdbcTable)
tableExists = false
}
}

// Create the table if the table didn't exist.
if (!tableExists) {
val schema = JdbcUtils.schemaString(df, jdbcUrl)
// To allow certain options to append when create a new table, which can be
// table_options or partition_options.
// E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
val createtblOptions = jdbcOptions.createTableOptions
val sql = s"CREATE TABLE $jdbcTable ($schema) $createtblOptions"
val statement = conn.createStatement
try {
statement.executeUpdate(sql)
} finally {
statement.close()
}
}
} finally {
conn.close()
}

JdbcUtils.saveTable(df, jdbcUrl, jdbcTable, props)
this.extraOptions = this.extraOptions ++ (connectionProperties.asScala)
// explicit url and dbtable should override all
this.extraOptions += ("url" -> url, "dbtable" -> table)
format("jdbc").save()
}

/**
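For illustration, here is a minimal Scala sketch of the user-facing effect of the DataFrameWriter change above. It is not part of the PR's diff; the URL, table name, and credentials are placeholders and assume a reachable database. With JdbcRelationProvider acting as a CreatableRelationProvider, the generic format("jdbc").save() path works, and jdbc() simply folds its arguments into the options map and delegates to it.

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object JdbcSaveSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JdbcSaveSketch").getOrCreate()
    val df = spark.range(10).toDF("id")

    // Generic path: format("jdbc").save() no longer errors out once the provider
    // implements CreatableRelationProvider.
    df.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")   // placeholder URL
      .option("dbtable", "schema.tablename")       // placeholder table
      .option("user", "username")
      .option("password", "password")
      .mode(SaveMode.Append)
      .save()

    // Dedicated path: jdbc() merges connectionProperties into extraOptions, sets
    // "url" and "dbtable", and calls the same save() as above.
    val connectionProperties = new Properties()
    connectionProperties.put("user", "username")
    connectionProperties.put("password", "password")
    df.write
      .mode(SaveMode.Append)
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

    spark.stop()
  }
}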
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -27,10 +27,12 @@ class JDBCOptions(
// ------------------------------------------------------------
// Required parameters
// ------------------------------------------------------------
require(parameters.isDefinedAt("url"), "Option 'url' is required.")
require(parameters.isDefinedAt("dbtable"), "Option 'dbtable' is required.")
// a JDBC URL
val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
val url = parameters("url")
// name of table
val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
val table = parameters("dbtable")

// ------------------------------------------------------------
// Optional parameter list
@@ -44,6 +46,11 @@
// the number of partitions
val numPartitions = parameters.getOrElse("numPartitions", null)

require(partitionColumn == null ||
(lowerBound != null && upperBound != null && numPartitions != null),
"If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," +
" and 'numPartitions' are required.")

// ------------------------------------------------------------
// The options for DataFrameWriter
// ------------------------------------------------------------
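For illustration, a sketch of how the require checks added to JDBCOptions above surface to a user. The exception type (IllegalArgumentException from require, with a "requirement failed:" prefix) is an assumption inferred from the requires shown in the diff, and the URL and table name are placeholders.

import scala.util.Try

import org.apache.spark.sql.SparkSession

object JdbcOptionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JdbcOptionsSketch").getOrCreate()

    // Missing 'url': expected to fail fast with "Option 'url' is required."
    val missingUrl = Try {
      spark.read.format("jdbc").option("dbtable", "schema.tablename").load()
    }
    println(missingUrl) // e.g. Failure(java.lang.IllegalArgumentException: requirement failed: ...)

    // 'partitionColumn' without lowerBound/upperBound/numPartitions: expected to fail
    // with the combined partitioning-options message, before any connection is made.
    val incompletePartitioning = Try {
      spark.read.format("jdbc")
        .option("url", "jdbc:postgresql:dbserver")   // placeholder URL
        .option("dbtable", "schema.tablename")       // placeholder table
        .option("partitionColumn", "id")
        .load()
    }
    println(incompletePartitioning)

    spark.stop()
  }
}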
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -19,37 +19,102 @@ package org.apache.spark.sql.execution.datasources.jdbc

import java.util.Properties

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
import scala.collection.JavaConverters.mapAsJavaMapConverter

class JdbcRelationProvider extends RelationProvider with DataSourceRegister {
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}

class JdbcRelationProvider extends CreatableRelationProvider
with RelationProvider with DataSourceRegister {

override def shortName(): String = "jdbc"

/** Returns a new base relation with the given parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val jdbcOptions = new JDBCOptions(parameters)
if (jdbcOptions.partitionColumn != null
&& (jdbcOptions.lowerBound == null
|| jdbcOptions.upperBound == null
|| jdbcOptions.numPartitions == null)) {
sys.error("Partitioning incompletely specified")
}
val partitionColumn = jdbcOptions.partitionColumn
val lowerBound = jdbcOptions.lowerBound
val upperBound = jdbcOptions.upperBound
Member:
Any reason why this is removed?

Contributor (PR author):
It was moved into the JDBCOptions as had been previously discussed.

Member:
uh, we do not have a test case to cover that. Since you made a change, could you add such a test case? Thanks!

Contributor (PR author):
Added.

val numPartitions = jdbcOptions.numPartitions

val partitionInfo = if (jdbcOptions.partitionColumn == null) {
val partitionInfo = if (partitionColumn == null) {
null
} else {
JDBCPartitioningInfo(
jdbcOptions.partitionColumn,
jdbcOptions.lowerBound.toLong,
jdbcOptions.upperBound.toLong,
jdbcOptions.numPartitions.toInt)
partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt)
}
val parts = JDBCRelation.columnPartition(partitionInfo)
val properties = new Properties() // Additional properties that we will pass to getConnection
parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession)
}

/*
* The following structure applies to this code:
Contributor:
What does this table mean? What are CreateTable, saveTable, and BaseRelation?

Member:
Now at least three reviewers are confused by this bit. Do you mind if I submit a PR to clean up this part?

Member:
If the table does not exist and the mode is OVERWRITE, we create a table, then insert rows into the table, and finally return a BaseRelation.

Contributor:
I also took a look at @gatorsmile's approach; I think it's easier to understand. Why was it rejected? We can also get rid of the return:

if (tableExists) {
  mode match {
    case SaveMode.Ignore =>
    ......
  }
} else {
  ......
}

* | tableExists | !tableExists
*------------------------------------------------------------------------------------
* Ignore | BaseRelation | CreateTable, saveTable, BaseRelation
* ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation
* Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation
* | saveTable, BaseRelation |
* Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation
*
* *Overwrite & tableExists with truncate, will not drop & create, but instead truncate
*/
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
val jdbcOptions = new JDBCOptions(parameters)
val url = jdbcOptions.url
val table = jdbcOptions.table

val props = new Properties()
props.putAll(parameters.asJava)
val conn = JdbcUtils.createConnectionFactory(url, props)()

try {
val tableExists = JdbcUtils.tableExists(conn, url, table)

val (doCreate, doSave) = (mode, tableExists) match {
HyukjinKwon (Member), Sep 8, 2016:
Initially, I meant to correct this as @gatorsmile did here. I am not saying this is wrong or inappropriate, but personally I'd prefer this way.

Member:
I also prefer my way, which looks cleaner and easier to understand.

Contributor (PR author):
Your way results in the need for a return, which can lead to problems and is generally discouraged. In the current implementation you could just have it do nothing and the next if block will be skipped anyway, but that leaves a lot of room for error in further code changes. Whereas this way is very explicit about the rules and what each combination will yield.

Member:
OK, I am fine with it if the others are OK with it. Let me review your version.

Member:
Then would it make sense to add some comments for each case? At a quick look, it is not obvious to me what each case means.

Contributor (PR author):
I did add a comment in the method signature. That and the variable naming conventions should cover this.

case (SaveMode.Ignore, true) => (false, false)
case (SaveMode.ErrorIfExists, true) => throw new AnalysisException(
s"Table or view '$table' already exists, and SaveMode is set to ErrorIfExists.")
case (SaveMode.Overwrite, true) =>
if (jdbcOptions.isTruncate && JdbcUtils.isCascadingTruncateTable(url) == Some(false)) {
JdbcUtils.truncateTable(conn, table)
(false, true)
} else {
JdbcUtils.dropTable(conn, table)
(true, true)
}
case (SaveMode.Append, true) => (false, true)
case (_, true) => throw new IllegalArgumentException(s"Unexpected SaveMode, '$mode'," +
" for handling existing tables.")
case (_, false) => (true, true)
HyukjinKwon (Member), Jun 12, 2016:
Personally, I think this combination of booleans might be a bit confusing. It might be better if they had named variables so that we can understand what each case means.

Contributor (PR author):
I'm not 100% sure I get what you mean, as the booleans map directly to a variable. The only thing I can think of beyond using a var and setting the variables directly (ugly) is to create a JDBCActionDecider to wrap the values and return that, but that ultimately adds another level of indirection that seems unnecessary in this case.
And I do not think that comments should be necessary, but there is also the added benefit of having this broken down in the method comment.

}

if (doCreate) {
val schema = JdbcUtils.schemaString(data, url)
// To allow certain options to append when create a new table, which can be
// table_options or partition_options.
// E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
val createtblOptions = jdbcOptions.createTableOptions
val sql = s"CREATE TABLE $table ($schema) $createtblOptions"
val statement = conn.createStatement
try {
statement.executeUpdate(sql)
} finally {
statement.close()
}
}
if (doSave) JdbcUtils.saveTable(data, url, table, props)
} finally {
conn.close()
}

createRelation(sqlContext, parameters)
}
}
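For illustration, a sketch of how the SaveMode matrix in the comment above plays out from the caller's side. This is an assumption-based example, not part of the diff: the URL, table, and credentials are placeholders, and the truncate path only applies when the dialect reports a non-cascading TRUNCATE.

import org.apache.spark.sql.{SaveMode, SparkSession}

object JdbcSaveModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JdbcSaveModeSketch").getOrCreate()
    val df = spark.range(5).toDF("id")

    // A fresh writer per call; connection details are placeholders.
    def writer = df.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")

    // ErrorIfExists (the default): AnalysisException if the table already exists.
    // writer.mode(SaveMode.ErrorIfExists).save()

    // Ignore: returns without writing when the table exists.
    writer.mode(SaveMode.Ignore).save()

    // Overwrite: drop and recreate, or truncate instead when "truncate" is set and
    // the dialect reports TRUNCATE as non-cascading.
    writer.option("truncate", "true").mode(SaveMode.Overwrite).save()

    // Append: keep the existing table and insert the new rows.
    writer.mode(SaveMode.Append).save()

    spark.stop()
  }
}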