From 6be5046119f1f9cffb78a08a9dbe614ed4f9d460 Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Fri, 22 Apr 2016 01:55:34 -0400 Subject: [PATCH 01/23] [SPARK-14525][SQL] Make DataFrameWrite.save work for jdbc --- .../apache/spark/sql/DataFrameWriter.scala | 50 ++++--------------- .../datasources/jdbc/DefaultSource.scala | 50 ++++++++++++++++++- .../spark/sql/jdbc/JDBCWriteSuite.scala | 12 +++++ 3 files changed, 71 insertions(+), 41 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 54d250867fbb..534b3398169b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -244,7 +244,11 @@ final class DataFrameWriter private[sql](df: DataFrame) { bucketSpec = getBucketSpec, options = extraOptions.toMap) - dataSource.write(mode, df) + dataSource.providingClass.newInstance() match { + case jdbc: execution.datasources.jdbc.DefaultSource => + jdbc.write(mode, df, extraOptions.toMap) + case _ => dataSource.write(mode, df) + } } /** @@ -489,46 +493,12 @@ final class DataFrameWriter private[sql](df: DataFrame) { * @since 1.4.0 */ def jdbc(url: String, table: String, connectionProperties: Properties): Unit = { - val props = new Properties() - extraOptions.foreach { case (key, value) => - props.put(key, value) - } + import scala.collection.JavaConverters._ // connectionProperties should override settings in extraOptions - props.putAll(connectionProperties) - val conn = JdbcUtils.createConnectionFactory(url, props)() - - try { - var tableExists = JdbcUtils.tableExists(conn, url, table) - - if (mode == SaveMode.Ignore && tableExists) { - return - } - - if (mode == SaveMode.ErrorIfExists && tableExists) { - sys.error(s"Table $table already exists.") - } - - if (mode == SaveMode.Overwrite && tableExists) { - JdbcUtils.dropTable(conn, table) - tableExists = false - } - - // Create the table if the table didn't exist. 
- if (!tableExists) { - val schema = JdbcUtils.schemaString(df, url) - val sql = s"CREATE TABLE $table ($schema)" - val statement = conn.createStatement - try { - statement.executeUpdate(sql) - } finally { - statement.close() - } - } - } finally { - conn.close() - } - - JdbcUtils.saveTable(df, url, table, props) + this.extraOptions = this.extraOptions ++ (connectionProperties.asScala) + // explicit url and dbtable should override all + this.extraOptions += ("url" -> url, "dbtable" -> table) + format("jdbc").save } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala index 4dcd261f5cbe..73002cb91d90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} class DefaultSource extends RelationProvider with DataSourceRegister { @@ -56,4 +56,52 @@ class DefaultSource extends RelationProvider with DataSourceRegister { parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(url, table, parts, properties)(sqlContext) } + + def write(mode: SaveMode, data: DataFrame, options: Map[String, String]): Unit = { + val url = options.getOrElse("url", + sys.error("Saving jdbc source requires url to be set." + + " (ie. df.option(\"url\", \"ACTUAL_URL\")")) + val table = options.getOrElse("dbtable", options.getOrElse("table", + sys.error("Saving jdbc source requires dbtable to be set." + + " (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")"))) + + import collection.JavaConverters._ + val props = new Properties() + props.putAll(options.asJava) + + val conn = JdbcUtils.createConnectionFactory(url, props)() + + try { + var tableExists = JdbcUtils.tableExists(conn, url, table) + + if (mode == SaveMode.Ignore && tableExists) { + return + } + + if (mode == SaveMode.ErrorIfExists && tableExists) { + sys.error(s"Table $table already exists.") + } + + if (mode == SaveMode.Overwrite && tableExists) { + JdbcUtils.dropTable(conn, table) + tableExists = false + } + + // Create the table if the table didn't exist. 
+ if (!tableExists) { + val schema = JdbcUtils.schemaString(data, url) + val sql = s"CREATE TABLE $table ($schema)" + val statement = conn.createStatement + try { + statement.executeUpdate(sql) + } finally { + statement.close() + } + } + } finally { + conn.close() + } + + JdbcUtils.saveTable(data, url, table, props) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index e23ee6693133..7c2e178de73f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -151,4 +151,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(2 === sqlContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count) assert(2 === sqlContext.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } + + test("save works for format(\"jdbc\") if url and dbtable are set") { + val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + + df.write.format("jdbc") + .options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST")) + .save + + assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count) + assert( + 2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length) + } } From 88d181e1b83c423bec90c01b205c8864489e2c1a Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Tue, 7 Jun 2016 00:47:09 -0400 Subject: [PATCH 02/23] [SPARK-14525][SQL] Make jdbc a CreatableRelationProvider for simpler saving --- .../apache/spark/sql/DataFrameWriter.scala | 6 +- .../execution/datasources/jdbc/JDBCRDD.scala | 1 + .../datasources/jdbc/JDBCRelation.scala | 14 ++- .../jdbc/JdbcRelationProvider.scala | 94 +++++++++++-------- 4 files changed, 71 insertions(+), 44 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index db688e0de8c0..34cffa380f57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -290,11 +290,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { bucketSpec = getBucketSpec, options = extraOptions.toMap) - dataSource.providingClass.newInstance() match { - case jdbc: execution.datasources.jdbc.DefaultSource => - jdbc.write(mode, df, extraOptions.toMap) - case _ => dataSource.write(mode, df) - } + dataSource.write(mode, df) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 6a5564addf48..201c81fe2b00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -144,6 +144,7 @@ private[sql] object JDBCRDD extends Logging { val columnType = dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse( getCatalystType(dataType, fieldSize, fieldScale, isSigned)) + fields(i) = StructField(columnName, columnType, nullable, metadata.build()) i = i + 1 } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index 233b7891d664..35aa4136083f 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -87,7 +87,8 @@ private[sql] case class JDBCRelation( url: String, table: String, parts: Array[Partition], - properties: Properties = new Properties())(@transient val sparkSession: SparkSession) + properties: Properties = new Properties(), + providedSchemaOption: Option[StructType] = None)(@transient val sparkSession: SparkSession) extends BaseRelation with PrunedFilteredScan with InsertableRelation { @@ -96,7 +97,16 @@ private[sql] case class JDBCRelation( override val needConversion: Boolean = false - override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) + override val schema: StructType = { + val resolvedSchema = JDBCRDD.resolveTable(url, table, properties) + providedSchemaOption match { + case Some(providedSchema) => + if (providedSchema.sql.toLowerCase == resolvedSchema.sql.toLowerCase) resolvedSchema + else sys.error(s"User specified schema, $providedSchema, " + + s"does not match the actual schema, $resolvedSchema.") + case None => resolvedSchema + } + } // Check if JDBCRDD.compileFilter can accept input filters override def unhandledFilters(filters: Array[Filter]): Array[Filter] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index fa6b19745510..e54e8afe9984 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -20,71 +20,90 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { - val jdbcOptions = new JDBCOptions(parameters) - if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null - || jdbcOptions.upperBound == null - || jdbcOptions.numPartitions == null)) { + createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. 
*/ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { + val url = parameters.getOrElse("url", sys.error("Option 'url' not specified")) + val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified")) + val partitionColumn = parameters.getOrElse("partitionColumn", null) + val lowerBound = parameters.getOrElse("lowerBound", null) + val upperBound = parameters.getOrElse("upperBound", null) + val numPartitions = parameters.getOrElse("numPartitions", null) + + if (partitionColumn != null + && (lowerBound == null || upperBound == null || numPartitions == null)) { sys.error("Partitioning incompletely specified") } - val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null - } else { + val partitionInfo = if (partitionColumn == null) null + else { JDBCPartitioningInfo( - jdbcOptions.partitionColumn, - jdbcOptions.lowerBound.toLong, - jdbcOptions.upperBound.toLong, - jdbcOptions.numPartitions.toInt) + partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) - JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) + JDBCRelation(url, table, parts, properties, Option(schema))(sqlContext.sparkSession) } - def write(mode: SaveMode, data: DataFrame, options: Map[String, String]): Unit = { - val url = options.getOrElse("url", + /* + * The following structure applies to this code: + * | tableExists | !tableExists + *------------------------------------------------------------------------------------ + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite | DropTable, CreateTable, | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { + val url = parameters.getOrElse("url", sys.error("Saving jdbc source requires url to be set." + " (ie. df.option(\"url\", \"ACTUAL_URL\")")) - val table = options.getOrElse("dbtable", options.getOrElse("table", + val table = parameters.getOrElse("dbtable", parameters.getOrElse("table", sys.error("Saving jdbc source requires dbtable to be set." + " (ie. 
df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")"))) import collection.JavaConverters._ val props = new Properties() - props.putAll(options.asJava) - + props.putAll(parameters.asJava) val conn = JdbcUtils.createConnectionFactory(url, props)() try { - var tableExists = JdbcUtils.tableExists(conn, url, table) - - if (mode == SaveMode.Ignore && tableExists) { - return - } - - if (mode == SaveMode.ErrorIfExists && tableExists) { - sys.error(s"Table $table already exists.") - } - - if (mode == SaveMode.Overwrite && tableExists) { - JdbcUtils.dropTable(conn, table) - tableExists = false + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { + case (SaveMode.Ignore, true) => (false, false) + case (SaveMode.ErrorIfExists, true) => sys.error(s"Table $table already exists.") + case (SaveMode.Overwrite, true) => + JdbcUtils.dropTable(conn, table) + (true, true) + case (SaveMode.Append, true) => (false, true) + case (_, true) => sys.error(s"Unexpected SaveMode, '$mode', for handling existing tables.") + case (_, false) => (true, true) } - // Create the table if the table didn't exist. - if (!tableExists) { + if(doCreate) { val schema = JdbcUtils.schemaString(data, url) val sql = s"CREATE TABLE $table ($schema)" val statement = conn.createStatement @@ -94,10 +113,11 @@ class JdbcRelationProvider extends RelationProvider with DataSourceRegister { statement.close() } } + if(doSave) JdbcUtils.saveTable(data, url, table, props) } finally { conn.close() } - JdbcUtils.saveTable(data, url, table, props) + createRelation(sqlContext, parameters, data.schema) } } From c44271e8575722764d5a525548e941fa2ea50088 Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Tue, 7 Jun 2016 00:48:59 -0400 Subject: [PATCH 03/23] [SPARK-14525][SQL] Clean empty space commit --- .../apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 201c81fe2b00..6a5564addf48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -144,7 +144,6 @@ private[sql] object JDBCRDD extends Logging { val columnType = dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse( getCatalystType(dataType, fieldSize, fieldScale, isSigned)) - fields(i) = StructField(columnName, columnType, nullable, metadata.build()) i = i + 1 } From cb9889e0692994b4d67909ade36cfd6e92d1b784 Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Wed, 6 Jul 2016 21:12:38 -0400 Subject: [PATCH 04/23] [SPARK-14525][SQL]Address some code reviews --- .../datasources/jdbc/JDBCOptions.scala | 5 +++++ .../datasources/jdbc/JDBCRelation.scala | 9 +-------- .../jdbc/JdbcRelationProvider.scala | 19 +++++++------------ 3 files changed, 13 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 6c6ec89746ee..aadd5b363dfa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -36,4 +36,9 @@ private[jdbc] class JDBCOptions( val upperBound = 
parameters.getOrElse("upperBound", null) // the number of partitions val numPartitions = parameters.getOrElse("numPartitions", null) + + if (partitionColumn != null + && (lowerBound == null || upperBound == null || numPartitions == null)) { + sys.error("Partitioning incompletely specified") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index a1bbaa394d40..e37bc7653365 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -116,14 +116,7 @@ private[sql] case class JDBCRelation( override val needConversion: Boolean = false override val schema: StructType = { - val resolvedSchema = JDBCRDD.resolveTable(url, table, properties) - providedSchemaOption match { - case Some(providedSchema) => - if (providedSchema.sql.toLowerCase == resolvedSchema.sql.toLowerCase) resolvedSchema - else sys.error(s"User specified schema, $providedSchema, " + - s"does not match the actual schema, $resolvedSchema.") - case None => resolvedSchema - } + providedSchemaOption.getOrElse(JDBCRDD.resolveTable(url, table, properties)) } // Check if JDBCRDD.compileFilter can accept input filters diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index e54e8afe9984..bd98b81ae161 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -39,17 +39,11 @@ class JdbcRelationProvider extends CreatableRelationProvider sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = { - val url = parameters.getOrElse("url", sys.error("Option 'url' not specified")) - val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified")) - val partitionColumn = parameters.getOrElse("partitionColumn", null) - val lowerBound = parameters.getOrElse("lowerBound", null) - val upperBound = parameters.getOrElse("upperBound", null) - val numPartitions = parameters.getOrElse("numPartitions", null) - - if (partitionColumn != null - && (lowerBound == null || upperBound == null || numPartitions == null)) { - sys.error("Partitioning incompletely specified") - } + val jdbcOptions = new JDBCOptions(parameters) + val partitionColumn = jdbcOptions.partitionColumn + val lowerBound = jdbcOptions.lowerBound + val upperBound = jdbcOptions.upperBound + val numPartitions = jdbcOptions.numPartitions val partitionInfo = if (partitionColumn == null) null else { @@ -59,7 +53,8 @@ class JdbcRelationProvider extends CreatableRelationProvider val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) - JDBCRelation(url, table, parts, properties, Option(schema))(sqlContext.sparkSession) + JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties, + Option(schema))(sqlContext.sparkSession) } /* From 754b360b08b07d406bfe801be33fdad1161bf37a Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Wed, 31 Aug 2016 16:29:32 -0400 Subject: [PATCH 05/23] 
Change sys.error to require --- .../execution/datasources/jdbc/JDBCOptions.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index aadd5b363dfa..74efd2cc5f71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -24,10 +24,12 @@ private[jdbc] class JDBCOptions( @transient private val parameters: Map[String, String]) extends Serializable { + require(parameters.isDefinedAt("url"), "Option 'url' is required.") + require(parameters.isDefinedAt("dbtable"), "Option 'dbtable' is required.") // a JDBC URL - val url = parameters.getOrElse("url", sys.error("Option 'url' not specified")) + val url = parameters("url") // name of table - val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified")) + val table = parameters("dbtable") // the column used to partition val partitionColumn = parameters.getOrElse("partitionColumn", null) // the lower bound of partition column @@ -37,8 +39,8 @@ private[jdbc] class JDBCOptions( // the number of partitions val numPartitions = parameters.getOrElse("numPartitions", null) - if (partitionColumn != null - && (lowerBound == null || upperBound == null || numPartitions == null)) { - sys.error("Partitioning incompletely specified") - } + require(partitionColumn == null || + (partitionColumn != null && lowerBound != null && upperBound != null && numPartitions != null), + "If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," + + " and 'numPartitions' are required.") } From 1d0d61c55ce1faf290f1eb7677efac33317a66da Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Wed, 31 Aug 2016 16:54:08 -0400 Subject: [PATCH 06/23] Remove the last sys.error --- .../jdbc/JdbcRelationProvider.scala | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index bd98b81ae161..ac3512d6abd0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} @@ -72,12 +73,12 @@ class JdbcRelationProvider extends CreatableRelationProvider mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = { - val url = parameters.getOrElse("url", - sys.error("Saving jdbc source requires url to be set." + - " (ie. df.option(\"url\", \"ACTUAL_URL\")")) - val table = parameters.getOrElse("dbtable", parameters.getOrElse("table", - sys.error("Saving jdbc source requires dbtable to be set." + - " (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")"))) + require(parameters.isDefinedAt("url"), "Saving jdbc source requires 'url' to be set." + + " (ie. df.option(\"url\", \"ACTUAL_URL\")") + require(parameters.isDefinedAt("dbtable"), "Saving jdbc source requires 'dbtable' to be set." + + " (ie. 
df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")") + val url = parameters("url") + val table = parameters("dbtable") import collection.JavaConverters._ val props = new Properties() @@ -89,12 +90,14 @@ class JdbcRelationProvider extends CreatableRelationProvider val (doCreate, doSave) = (mode, tableExists) match { case (SaveMode.Ignore, true) => (false, false) - case (SaveMode.ErrorIfExists, true) => sys.error(s"Table $table already exists.") + case (SaveMode.ErrorIfExists, true) => throw new SQLException( + s"Table $table already exists, and SaveMode is set to ErrorIfExists.") case (SaveMode.Overwrite, true) => JdbcUtils.dropTable(conn, table) (true, true) case (SaveMode.Append, true) => (false, true) - case (_, true) => sys.error(s"Unexpected SaveMode, '$mode', for handling existing tables.") + case (_, true) => throw new IllegalArgumentException(s"Unexpected SaveMode, '$mode'," + + " for handling existing tables.") case (_, false) => (true, true) } From c8e414388254b7a29b906b68017d169ec61c581a Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 1 Sep 2016 15:31:34 -0400 Subject: [PATCH 07/23] Remove the local import It seems that since my original commit there is other usage at the top anyway --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index a2e01ffaccdf..b7843b06899a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -396,7 +396,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 1.4.0 */ def jdbc(url: String, table: String, connectionProperties: Properties): Unit = { - import scala.collection.JavaConverters._ assertNotPartitioned("jdbc") assertNotBucketed("jdbc") // connectionProperties should override settings in extraOptions From e8c2d7d3e65e3f00fca130396a519518c626028d Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Wed, 7 Sep 2016 22:16:49 -0400 Subject: [PATCH 08/23] Fix whitespace --- .../spark/sql/execution/datasources/jdbc/JDBCOptions.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 07ce4bc750c5..4cef50f9f55c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -46,9 +46,9 @@ class JDBCOptions( // the number of partitions val numPartitions = parameters.getOrElse("numPartitions", null) - require(partitionColumn == null || + require(partitionColumn == null || (partitionColumn != null && lowerBound != null && upperBound != null && numPartitions != null), - "If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," + + "If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," + " and 'numPartitions' are required.") // ------------------------------------------------------------ From ae6ad8b729828f0f0e355083a7640cb8661f442b Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 8 Sep 2016 00:41:06 -0400 Subject: [PATCH 09/23] Removed SchemaRelationProvider --- .../datasources/jdbc/JDBCRelation.scala | 7 ++----- .../datasources/jdbc/JdbcRelationProvider.scala | 17 
++++------------- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index e37bc7653365..11613dd912ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -105,8 +105,7 @@ private[sql] case class JDBCRelation( url: String, table: String, parts: Array[Partition], - properties: Properties = new Properties(), - providedSchemaOption: Option[StructType] = None)(@transient val sparkSession: SparkSession) + properties: Properties = new Properties())(@transient val sparkSession: SparkSession) extends BaseRelation with PrunedFilteredScan with InsertableRelation { @@ -115,9 +114,7 @@ private[sql] case class JDBCRelation( override val needConversion: Boolean = false - override val schema: StructType = { - providedSchemaOption.getOrElse(JDBCRDD.resolveTable(url, table, properties)) - } + override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) // Check if JDBCRDD.compileFilter can accept input filters override def unhandledFilters(filters: Array[Filter]): Array[Filter] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 8e62c4784e7d..304e8c3e6d92 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -21,25 +21,17 @@ import java.sql.SQLException import java.util.Properties import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} -import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} import org.apache.spark.sql.types.StructType class JdbcRelationProvider extends CreatableRelationProvider - with SchemaRelationProvider with RelationProvider with DataSourceRegister { + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { - createRelation(sqlContext, parameters, null) - } - - /** Returns a new base relation with the given parameters. 
*/ - override def createRelation( - sqlContext: SQLContext, - parameters: Map[String, String], - schema: StructType): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) val partitionColumn = jdbcOptions.partitionColumn val lowerBound = jdbcOptions.lowerBound @@ -54,8 +46,7 @@ class JdbcRelationProvider extends CreatableRelationProvider val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) - JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties, - Option(schema))(sqlContext.sparkSession) + JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } /* @@ -124,6 +115,6 @@ class JdbcRelationProvider extends CreatableRelationProvider conn.close() } - createRelation(sqlContext, parameters, data.schema) + createRelation(sqlContext, parameters) } } From c387c17b55541758b5a2e7bcde4ec5a7d8b9d82d Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 8 Sep 2016 00:52:00 -0400 Subject: [PATCH 10/23] Add forgotten stashed test --- .../spark/sql/jdbc/JDBCWriteSuite.scala | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 444bfb23499c..cfbb270157b9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -220,4 +220,63 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert( 2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length) } + + test("save API with SaveMode.Overwrite") { + import scala.collection.JavaConverters._ + + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2) + + df.write.format("jdbc") + .option("url", url1) + .option("dbtable", "TEST.TRUNCATETEST") + .options(properties.asScala) + .save() + df2.write.mode(SaveMode.Overwrite).format("jdbc") + .option("url", url1) + .option("dbtable", "TEST.TRUNCATETEST") + .options(properties.asScala) + .save() + assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count()) + assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length) + } + + test("save errors if url is not specified") { + import scala.collection.JavaConverters._ + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + + var e = intercept[RuntimeException] { + df.write.format("jdbc") + .option("dbtable", "TEST.TRUNCATETEST") + .options(properties.asScala) + .save() + }.getMessage + assert(e.contains("Option 'url' is required")) + } + + test("save errors if dbtable is not specified") { + import scala.collection.JavaConverters._ + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + + var e = intercept[RuntimeException] { + df.write.format("jdbc") + .option("url", url1) + .options(properties.asScala) + .save() + }.getMessage + assert(e.contains("Option 'dbtable' is required")) + } + + test("save errors if wrong user/password combination") { + import scala.collection.JavaConverters._ + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + + var e = intercept[org.h2.jdbc.JdbcSQLException] { + df.write.format("jdbc") + .option("dbtable", 
"TEST.TRUNCATETEST") + .option("url", url1) + .save() + }.getMessage + assert(e.contains("Wrong user name or password")) + } } From 57ac87eff052650fb290ad6b05d5cc5f9093fad4 Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 8 Sep 2016 00:57:05 -0400 Subject: [PATCH 11/23] Address comments --- .../execution/datasources/jdbc/JdbcRelationProvider.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 304e8c3e6d92..598b36e3e08c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -38,7 +38,7 @@ class JdbcRelationProvider extends CreatableRelationProvider val upperBound = jdbcOptions.upperBound val numPartitions = jdbcOptions.numPartitions - val partitionInfo = if (partitionColumn == null) null + val partitionInfo = if (partitionColumn == null) { null } else { JDBCPartitioningInfo( partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) @@ -66,11 +66,12 @@ class JdbcRelationProvider extends CreatableRelationProvider mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = { + import collection.JavaConverters._ + val jdbcOptions = new JDBCOptions(parameters) val url = jdbcOptions.url val table = jdbcOptions.table - import collection.JavaConverters._ val props = new Properties() props.putAll(parameters.asJava) val conn = JdbcUtils.createConnectionFactory(url, props)() From de537340538c7e18257a158be986de02d384b1e3 Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 8 Sep 2016 01:35:44 -0400 Subject: [PATCH 12/23] Address more comments --- .../datasources/jdbc/JdbcRelationProvider.scala | 15 ++++++++------- .../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 13 +++++-------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 598b36e3e08c..9df7ed325583 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.sql.SQLException import java.util.Properties +import scala.collection.JavaConverters.mapAsJavaMapConverter + import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} import org.apache.spark.sql.types.StructType @@ -38,8 +40,9 @@ class JdbcRelationProvider extends CreatableRelationProvider val upperBound = jdbcOptions.upperBound val numPartitions = jdbcOptions.numPartitions - val partitionInfo = if (partitionColumn == null) { null } - else { + val partitionInfo = if (partitionColumn == null) { + null + } else { JDBCPartitioningInfo( partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } @@ -66,8 +69,6 @@ class JdbcRelationProvider extends CreatableRelationProvider mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = { - import 
collection.JavaConverters._ - val jdbcOptions = new JDBCOptions(parameters) val url = jdbcOptions.url val table = jdbcOptions.table @@ -81,7 +82,7 @@ class JdbcRelationProvider extends CreatableRelationProvider val (doCreate, doSave) = (mode, tableExists) match { case (SaveMode.Ignore, true) => (false, false) - case (SaveMode.ErrorIfExists, true) => throw new SQLException( + case (SaveMode.ErrorIfExists, true) => throw new TableAlreadyExistsException( s"Table $table already exists, and SaveMode is set to ErrorIfExists.") case (SaveMode.Overwrite, true) => if (jdbcOptions.isTruncate && JdbcUtils.isCascadingTruncateTable(url) == Some(false)) { @@ -97,7 +98,7 @@ class JdbcRelationProvider extends CreatableRelationProvider case (_, false) => (true, true) } - if(doCreate) { + if (doCreate) { val schema = JdbcUtils.schemaString(data, url) // To allow certain options to append when create a new table, which can be // table_options or partition_options. @@ -111,7 +112,7 @@ class JdbcRelationProvider extends CreatableRelationProvider statement.close() } } - if(doSave) JdbcUtils.saveTable(data, url, table, props) + if (doSave) JdbcUtils.saveTable(data, url, table, props) } finally { conn.close() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index cfbb270157b9..799682aed3b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.jdbc import java.sql.DriverManager import java.util.Properties +import scala.collection.JavaConverters.propertiesAsScalaMapConverter + import org.scalatest.BeforeAndAfter import org.apache.spark.SparkException @@ -222,8 +224,6 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { } test("save API with SaveMode.Overwrite") { - import scala.collection.JavaConverters._ - val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2) @@ -242,10 +242,9 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { } test("save errors if url is not specified") { - import scala.collection.JavaConverters._ val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) - var e = intercept[RuntimeException] { + val e = intercept[RuntimeException] { df.write.format("jdbc") .option("dbtable", "TEST.TRUNCATETEST") .options(properties.asScala) @@ -255,10 +254,9 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { } test("save errors if dbtable is not specified") { - import scala.collection.JavaConverters._ val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) - var e = intercept[RuntimeException] { + val e = intercept[RuntimeException] { df.write.format("jdbc") .option("url", url1) .options(properties.asScala) @@ -268,10 +266,9 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { } test("save errors if wrong user/password combination") { - import scala.collection.JavaConverters._ val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) - var e = intercept[org.h2.jdbc.JdbcSQLException] { + val e = intercept[org.h2.jdbc.JdbcSQLException] { df.write.format("jdbc") .option("dbtable", "TEST.TRUNCATETEST") .option("url", url1) From 379e00f526fe1eda8e628f8e8db2f9f5b7b2fc50 Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 8 Sep 2016 
01:50:56 -0400 Subject: [PATCH 13/23] More insanity --- .../jdbc/JdbcRelationProvider.scala | 7 ++-- .../spark/sql/jdbc/JDBCWriteSuite.scala | 32 +++++++++++++------ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 9df7ed325583..4138264743c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -24,7 +24,6 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} -import org.apache.spark.sql.types.StructType class JdbcRelationProvider extends CreatableRelationProvider with RelationProvider with DataSourceRegister { @@ -40,8 +39,8 @@ class JdbcRelationProvider extends CreatableRelationProvider val upperBound = jdbcOptions.upperBound val numPartitions = jdbcOptions.numPartitions - val partitionInfo = if (partitionColumn == null) { - null + val partitionInfo = if (partitionColumn == null) { + null } else { JDBCPartitioningInfo( partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) @@ -82,7 +81,7 @@ class JdbcRelationProvider extends CreatableRelationProvider val (doCreate, doSave) = (mode, tableExists) match { case (SaveMode.Ignore, true) => (false, false) - case (SaveMode.ErrorIfExists, true) => throw new TableAlreadyExistsException( + case (SaveMode.ErrorIfExists, true) => throw new SQLException( s"Table $table already exists, and SaveMode is set to ErrorIfExists.") case (SaveMode.Overwrite, true) => if (jdbcOptions.isTruncate && JdbcUtils.isCascadingTruncateTable(url) == Some(false)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 799682aed3b3..0df1b370e27e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -215,12 +215,12 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) df.write.format("jdbc") - .options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST")) + .options(Map("url" -> url, "dbtable" -> "TEST.SAVETEST")) .save - assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count) + assert(2 === sqlContext.read.jdbc(url, "TEST.SAVETEST", new Properties).count) assert( - 2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length) + 2 === sqlContext.read.jdbc(url, "TEST.SAVETEST", new Properties).collect()(0).length) } test("save API with SaveMode.Overwrite") { @@ -229,16 +229,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { df.write.format("jdbc") .option("url", url1) - .option("dbtable", "TEST.TRUNCATETEST") + .option("dbtable", "TEST.SAVETEST") .options(properties.asScala) .save() df2.write.mode(SaveMode.Overwrite).format("jdbc") .option("url", url1) - .option("dbtable", "TEST.TRUNCATETEST") + .option("dbtable", "TEST.SAVETEST") .options(properties.asScala) .save() - assert(1 === 
spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count()) - assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length) + assert(1 === spark.read.jdbc(url1, "TEST.SAVETEST", properties).count()) + assert(2 === spark.read.jdbc(url1, "TEST.SAVETEST", properties).collect()(0).length) } test("save errors if url is not specified") { @@ -246,7 +246,7 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { val e = intercept[RuntimeException] { df.write.format("jdbc") - .option("dbtable", "TEST.TRUNCATETEST") + .option("dbtable", "TEST.SAVETEST") .options(properties.asScala) .save() }.getMessage @@ -270,10 +270,24 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { val e = intercept[org.h2.jdbc.JdbcSQLException] { df.write.format("jdbc") - .option("dbtable", "TEST.TRUNCATETEST") + .option("dbtable", "TEST.SAVETEST") .option("url", url1) .save() }.getMessage assert(e.contains("Wrong user name or password")) } + + test("save errors if partitionColumn and numPartitions and bounds not set") { + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + + val e = intercept[java.lang.IllegalArgumentException] { + df.write.format("jdbc") + .option("dbtable", "TEST.SAVETEST") + .option("url", url1) + .option("partitionColumn", "foo") + .save() + }.getMessage + assert(e.contains("If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," + + " and 'numPartitions' are required.")) + } } From ea9d2fea9b34678859eb5f17f9636699d2c85364 Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 8 Sep 2016 01:59:58 -0400 Subject: [PATCH 14/23] Analysis Exception --- .../execution/datasources/jdbc/JdbcRelationProvider.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 4138264743c2..ae04af2479c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -17,12 +17,11 @@ package org.apache.spark.sql.execution.datasources.jdbc -import java.sql.SQLException import java.util.Properties import scala.collection.JavaConverters.mapAsJavaMapConverter -import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} class JdbcRelationProvider extends CreatableRelationProvider @@ -81,8 +80,8 @@ class JdbcRelationProvider extends CreatableRelationProvider val (doCreate, doSave) = (mode, tableExists) match { case (SaveMode.Ignore, true) => (false, false) - case (SaveMode.ErrorIfExists, true) => throw new SQLException( - s"Table $table already exists, and SaveMode is set to ErrorIfExists.") + case (SaveMode.ErrorIfExists, true) => throw new AnalysisException( + s"Table or view '$table' already exists, and SaveMode is set to ErrorIfExists.") case (SaveMode.Overwrite, true) => if (jdbcOptions.isTruncate && JdbcUtils.isCascadingTruncateTable(url) == Some(false)) { JdbcUtils.truncateTable(conn, table) From c686b0e694406771e24cc3a0f93a3327eac3123e Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 8 Sep 2016 02:05:01 -0400 Subject: [PATCH 15/23] Simplify 
require --- .../spark/sql/execution/datasources/jdbc/JDBCOptions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 4cef50f9f55c..bcf65e53afa7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -47,7 +47,7 @@ class JDBCOptions( val numPartitions = parameters.getOrElse("numPartitions", null) require(partitionColumn == null || - (partitionColumn != null && lowerBound != null && upperBound != null && numPartitions != null), + (lowerBound != null && upperBound != null && numPartitions != null), "If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," + " and 'numPartitions' are required.") From 7ef7a489b27fa6bd5d79ee4d428874162fd813de Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Mon, 12 Sep 2016 15:25:06 -0400 Subject: [PATCH 16/23] Documentation --- .../examples/sql/SQLDataSourceExample.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala index dc3915a4882b..66f7cb1b53f4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.examples.sql +import java.util.Properties + import org.apache.spark.sql.SparkSession object SQLDataSourceExample { @@ -148,6 +150,8 @@ object SQLDataSourceExample { private def runJdbcDatasetExample(spark: SparkSession): Unit = { // $example on:jdbc_dataset$ + // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods + // Loading data from a JDBC source val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:postgresql:dbserver") @@ -155,6 +159,24 @@ object SQLDataSourceExample { .option("user", "username") .option("password", "password") .load() + + val connectionProperties = new Properties() + connectionProperties.put("user", "username") + connectionProperties.put("password", "password") + val jdbcDF2 = spark.read + .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) + + // Saving data to a JDBC source + jdbcDF.write + .format("jdbc") + .option("url", "jdbc:postgresql:dbserver") + .option("dbtable", "schema.tablename") + .option("user", "username") + .option("password", "password") + .save() + + jdbcDF2.write + .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // $example off:jdbc_dataset$ } } From c9dcdc4058c47a82c3dfa99b6e10aa034981c700 Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 15 Sep 2016 14:08:13 -0400 Subject: [PATCH 17/23] Documentation for java and python --- .../sql/JavaSQLDataSourceExample.java | 20 +++++++++++++++++++ examples/src/main/python/sql/datasource.py | 19 ++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index f9087e059385..598544b5bd30 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ 
b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -235,6 +235,8 @@ private static void runJsonDatasetExample(SparkSession spark) { private static void runJdbcDatasetExample(SparkSession spark) { // $example on:jdbc_dataset$ + // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods + // Loading data from a JDBC source Dataset jdbcDF = spark.read() .format("jdbc") .option("url", "jdbc:postgresql:dbserver") @@ -242,6 +244,24 @@ private static void runJdbcDatasetExample(SparkSession spark) { .option("user", "username") .option("password", "password") .load(); + + Properties connectionProperties = new Properties() + connectionProperties.put("user", "username") + connectionProperties.put("password", "password") + Dataset jdbcDF2 = spark.read + .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) + + // Saving data to a JDBC source + jdbcDF.write + .format("jdbc") + .option("url", "jdbc:postgresql:dbserver") + .option("dbtable", "schema.tablename") + .option("user", "username") + .option("password", "password") + .save() + + jdbcDF2.write + .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // $example off:jdbc_dataset$ } } diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py index b36c901d2b40..e9aa9d9ac258 100644 --- a/examples/src/main/python/sql/datasource.py +++ b/examples/src/main/python/sql/datasource.py @@ -143,6 +143,8 @@ def json_dataset_example(spark): def jdbc_dataset_example(spark): # $example on:jdbc_dataset$ + # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods + # Loading data from a JDBC source jdbcDF = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ @@ -150,6 +152,23 @@ def jdbc_dataset_example(spark): .option("user", "username") \ .option("password", "password") \ .load() + + jdbcDF2 = spark.read \ + .jdbc("jdbc:postgresql:dbserver", "schema.tablename", + properties={"user": "username", "password": "password"}) + + # Saving data to a JDBC source + jdbcDF.write \ + .format("jdbc") \ + .option("url", "jdbc:postgresql:dbserver") \ + .option("dbtable", "schema.tablename") \ + .option("user", "username") \ + .option("password", "password") \ + .save() + + jdbcDF2.write \ + .jdbc("jdbc:postgresql:dbserver", "schema.tablename", + properties={"user": "username", "password": "password"}) # $example off:jdbc_dataset$ From 447ab821deeb0500a78080c7d7597705664e3240 Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 15 Sep 2016 14:40:29 -0400 Subject: [PATCH 18/23] Add semicolons to my java --- .../spark/examples/sql/JavaSQLDataSourceExample.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index 598544b5bd30..e60d99ed57ff 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -245,11 +245,11 @@ private static void runJdbcDatasetExample(SparkSession spark) { .option("password", "password") .load(); - Properties connectionProperties = new Properties() - connectionProperties.put("user", "username") - connectionProperties.put("password", "password") + Properties connectionProperties = new Properties(); + 
connectionProperties.put("user", "username"); + connectionProperties.put("password", "password"); Dataset jdbcDF2 = spark.read - .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) + .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties); // Saving data to a JDBC source jdbcDF.write @@ -258,10 +258,10 @@ private static void runJdbcDatasetExample(SparkSession spark) { .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") - .save() + .save(); jdbcDF2.write - .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) + .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties); // $example off:jdbc_dataset$ } } From 4a02c8249ff224f4cbae7050e99f77d31601f4af Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 15 Sep 2016 15:07:12 -0400 Subject: [PATCH 19/23] Add import --- .../org/apache/spark/examples/sql/JavaSQLDataSourceExample.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index e60d99ed57ff..8c377500e064 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -23,6 +23,8 @@ import java.util.List; // $example off:schema_merging$ +import java.util.Properties + // $example on:basic_parquet_example$ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; From a2381567ebdf9ea8c15824da8e2c57561a6f4a66 Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 15 Sep 2016 16:53:06 -0400 Subject: [PATCH 20/23] Fixed up java and tested past it on my end --- .../spark/examples/sql/JavaSQLDataSourceExample.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index 8c377500e064..60ff16954594 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -23,7 +23,7 @@ import java.util.List; // $example off:schema_merging$ -import java.util.Properties +import java.util.Properties; // $example on:basic_parquet_example$ import org.apache.spark.api.java.JavaRDD; @@ -250,11 +250,11 @@ private static void runJdbcDatasetExample(SparkSession spark) { Properties connectionProperties = new Properties(); connectionProperties.put("user", "username"); connectionProperties.put("password", "password"); - Dataset jdbcDF2 = spark.read + Dataset jdbcDF2 = spark.read() .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties); // Saving data to a JDBC source - jdbcDF.write + jdbcDF.write() .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") @@ -262,7 +262,7 @@ private static void runJdbcDatasetExample(SparkSession spark) { .option("password", "password") .save(); - jdbcDF2.write + jdbcDF2.write() .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties); // $example off:jdbc_dataset$ } From 06c1cba1da5ab140d71c29f41afd608e863bfe1b Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Fri, 23 Sep 2016 23:33:49 -0400 Subject: [PATCH 21/23] R and SQL documentation --- 
docs/sql-programming-guide.md | 8 ++++++-- .../spark/examples/sql/JavaSQLDataSourceExample.java | 3 +-- examples/src/main/r/RSparkSQLExample.R | 4 ++++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 28cc88c322b7..7e5acab43a98 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1096,13 +1096,17 @@ the Data Sources API. The following options are supported: {% highlight sql %} -CREATE TEMPORARY VIEW jdbcTable +CREATE TEMPORARY TABLE jdbcTable USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:postgresql:dbserver", - dbtable "schema.tablename" + dbtable "schema.tablename", + user 'username', + password 'password' ) +INSERT INTO TABLE jdbcTable +SELECT * FROM resultTable {% endhighlight %} diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index 60ff16954594..3f4025f5da6e 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -21,9 +21,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -// $example off:schema_merging$ - import java.util.Properties; +// $example off:schema_merging$ // $example on:basic_parquet_example$ import org.apache.spark.api.java.JavaRDD; diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R index 4e0267a03851..373a36dba14f 100644 --- a/examples/src/main/r/RSparkSQLExample.R +++ b/examples/src/main/r/RSparkSQLExample.R @@ -204,7 +204,11 @@ results <- collect(sql("FROM src SELECT key, value")) # $example on:jdbc_dataset$ +# Loading data from a JDBC source df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password") + +# Saving data to a JDBC source +write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password") # $example off:jdbc_dataset$ # Stop the SparkSession now From 8fb86b482929e321f4ec8865124b8661f1a29bbf Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Sat, 24 Sep 2016 00:11:53 -0400 Subject: [PATCH 22/23] Move import back --- .../org/apache/spark/examples/sql/JavaSQLDataSourceExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index 3f4025f5da6e..1860594e8e54 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -21,8 +21,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Properties; // $example off:schema_merging$ +import java.util.Properties; // $example on:basic_parquet_example$ import org.apache.spark.api.java.JavaRDD; From 724bbe22b23050f3bdbf6d1bf14d4dabc52113b2 Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Sun, 25 Sep 2016 23:45:26 -0400 Subject: [PATCH 23/23] Address comments --- docs/sql-programming-guide.md | 2 +- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/sql-programming-guide.md 
b/docs/sql-programming-guide.md index 7e5acab43a98..1d10da12d79c 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1096,7 +1096,7 @@ the Data Sources API. The following options are supported: {% highlight sql %} -CREATE TEMPORARY TABLE jdbcTable +CREATE TEMPORARY VIEW jdbcTable USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:postgresql:dbserver", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 0ae445a5b5ba..eefafbd31dc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -424,7 +424,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { this.extraOptions = this.extraOptions ++ (connectionProperties.asScala) // explicit url and dbtable should override all this.extraOptions += ("url" -> url, "dbtable" -> table) - format("jdbc").save + format("jdbc").save() } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 0df1b370e27e..506971362f86 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -216,7 +216,7 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { df.write.format("jdbc") .options(Map("url" -> url, "dbtable" -> "TEST.SAVETEST")) - .save + .save() assert(2 === sqlContext.read.jdbc(url, "TEST.SAVETEST", new Properties).count) assert(
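
For reference, the end-user behavior this patch series documents — loading and saving JDBC tables either through the generic format("jdbc") / load() / save() path or through the read.jdbc / write.jdbc shortcuts — can be sketched in Scala roughly as follows. This is a minimal sketch, not part of the patches themselves; the connection URL, table name, and credentials are placeholders mirroring the example values used in the diffs above.

    import java.util.Properties

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("JdbcExample").getOrCreate()

    // Load via the generic data source API (the existing JDBC read path)
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load()

    // Save via the generic data source API (the write path this change enables)
    jdbcDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save()

    // The Properties-based shortcut; per the DataFrameWriter change above it now
    // delegates to format("jdbc").save() under the covers
    val connectionProperties = new Properties()
    connectionProperties.put("user", "username")
    connectionProperties.put("password", "password")
    jdbcDF.write.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

Both forms should behave identically with respect to SaveMode handling (Ignore, ErrorIfExists, Overwrite, Append), since the shortcut simply forwards its URL, table, and connection properties as options to the JDBC source.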