From c089e89de43282b404f3533dd6ba9dd6b4dce17f Mon Sep 17 00:00:00 2001
From: Zhen Li
Date: Thu, 9 Mar 2023 20:18:35 -0800
Subject: [PATCH 1/3] Write without path or table

---
 .../apache/spark/sql/DataFrameWriter.scala    | 37 ++++++++++++++++++-
 .../apache/spark/sql/ClientE2ETestSuite.scala |  5 +++
 .../org/apache/spark/sql/DatasetSuite.scala   | 28 ++++++++++++++
 .../CheckConnectJvmClientCompatibility.scala  |  4 +-
 .../planner/SparkConnectProtoSuite.scala      | 11 ++++--
 5 files changed, 76 insertions(+), 9 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 8434addec920..d7582f0e74eb 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql

-import java.util.Locale
+import java.util.{Locale, Properties}

 import scala.collection.JavaConverters._

@@ -228,7 +228,9 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {

     // Set path or table
     f(builder)
-    require(builder.hasPath != builder.hasTable) // Only one can be set
+
+    // Cannot both be set, hasPath or hasTable or nothing
+    require(builder.hasPath || builder.hasTable || (!builder.hasPath && !builder.hasTable))

     builder.setMode(mode match {
       case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
@@ -345,6 +347,37 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     })
   }

+  /**
+   * Saves the content of the `DataFrame` to an external database table via JDBC. In the case the
+   * table already exists in the external database, behavior of this function depends on the save
+   * mode, specified by the `mode` function (default to throwing an exception).
+   *
+   * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+   * your external database systems.
+   *
+   * JDBC-specific option and parameter documentation for storing tables via JDBC in
+   * <a href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+   *   Data Source Option</a> in the version you use.
+   *
+   * @param table
+   *   Name of the table in the external database.
+   * @param connectionProperties
+   *   JDBC database connection arguments, a list of arbitrary string tag/value. Normally at least
+   *   a "user" and "password" property should be included. "batchsize" can be used to control the
+   *   number of rows per insert. "isolationLevel" can be one of "NONE", "READ_COMMITTED",
+   *   "READ_UNCOMMITTED", "REPEATABLE_READ", or "SERIALIZABLE", corresponding to standard
+   *   transaction isolation levels defined by JDBC's Connection object, with default of
+   *   "READ_UNCOMMITTED".
+   * @since 1.4.0
+   */
+  def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
+    // connectionProperties should override settings in extraOptions.
+    this.extraOptions ++= connectionProperties.asScala
+    // explicit url and dbtable should override all
+    this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
+    format("jdbc").save()
+  }
+
   /**
    * Saves the content of the `DataFrame` in JSON format (<a href="http://jsonlines.org/"> JSON
    * Lines text format or newline-delimited JSON</a>) at the specified path. This is equivalent
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 466a51841d42..dc61a27a0934 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -175,6 +175,11 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
     }
   }

+  test("write without table or path") {
+    // Should receive no error to write noop
+    spark.range(10).write.format("noop").mode("append").save()
+  }
+
   test("writeTo with create and using") {
     // TODO (SPARK-42519): Add more test after we can set configs. See more WriteTo test cases
     // in SparkConnectProtoSuite.
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 43b0cd2674cc..a55839110f20 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong

 import io.grpc.Server
 import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import java.util.Properties
 import org.scalatest.BeforeAndAfterEach

 import org.apache.spark.connect.proto
@@ -100,6 +101,33 @@ class DatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
     assert(actualPlan.equals(expectedPlan))
   }

+  test("write jdbc") {
+    val df = ss.newDataFrame(_ => ()).limit(10)
+
+    val builder = proto.WriteOperation.newBuilder()
+    builder
+      .setInput(df.plan.getRoot)
+      .setMode(proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS)
+      .setSource("jdbc")
+      .putOptions("a", "b")
+      .putOptions("1", "2")
+      .putOptions("url", "url")
+      .putOptions("dbtable", "table")
+
+    val expectedPlan = proto.Plan
+      .newBuilder()
+      .setCommand(proto.Command.newBuilder().setWriteOperation(builder))
+      .build()
+
+    val connectionProperties = new Properties
+    connectionProperties.put("a", "b")
+    connectionProperties.put("1", "2")
+    df.write.jdbc("url", "table", connectionProperties)
+
+    val actualPlan = service.getAndClearLatestInputPlan()
+    assert(actualPlan.equals(expectedPlan))
+  }
+
   test("write V2") {
     val df = ss.newDataFrame(_ => ()).limit(10)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index ae6c6c86fec2..97d130421a24 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -130,9 +130,7 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),

       // DataFrame Reader & Writer
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.jdbc"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameWriter.jdbc"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"), // deprecated

       // DataFrameNaFunctions
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameNaFunctions.this"),
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 00ff6ac2fb6c..d619782ac3d8 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._

 import com.google.protobuf.ByteString

-import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.{SparkClassNotFoundException, SparkIllegalArgumentException}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.Expression
 import org.apache.spark.connect.proto.Join.JoinType
@@ -554,13 +554,16 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
       parameters = Map("columnName" -> "`duplicatedcol`"))
   }

-  // TODO(SPARK-42733): Writes without path or table should work.
-  ignore("Writes fails without path or table") {
-    assertThrows[UnsupportedOperationException] {
+  test("Writes fails without path or table") {
+    assertThrows[SparkIllegalArgumentException] {
       transform(localRelation.write())
     }
   }

+  test("Writes without path or table") {
+    transform(localRelation.write(format = Some("noop"), mode = Some("Append")))
+  }
+
   test("Write fails with unknown table - AnalysisException") {
     val cmd = readRel.write(tableName = Some("dest"))
     assertThrows[AnalysisException] {

From 982e14334258840921dbc3eb2af60227372c9234 Mon Sep 17 00:00:00 2001
From: Zhen Li
Date: Fri, 10 Mar 2023 07:07:59 -0800
Subject: [PATCH 2/3] fix

---
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d7582f0e74eb..90961e4b1277 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -368,7 +368,7 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
    * "READ_UNCOMMITTED", "REPEATABLE_READ", or "SERIALIZABLE", corresponding to standard
    * transaction isolation levels defined by JDBC's Connection object, with default of
    * "READ_UNCOMMITTED".
-   * @since 1.4.0
+   * @since 3.4.0
    */
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
     // connectionProperties should override settings in extraOptions.

From b98a3ec326a62c977b7a45fd6ab7afe580b6a79a Mon Sep 17 00:00:00 2001
From: Zhen Li
Date: Fri, 10 Mar 2023 10:56:49 -0800
Subject: [PATCH 3/3] fix

---
 .../org/apache/spark/sql/DataFrameWriter.scala |  4 ++--
 .../apache/spark/sql/ClientE2ETestSuite.scala  | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 90961e4b1277..b9d1fefb105e 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -229,8 +229,8 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     // Set path or table
     f(builder)

-    // Cannot both be set, hasPath or hasTable or nothing
-    require(builder.hasPath || builder.hasTable || (!builder.hasPath && !builder.hasTable))
+    // Cannot both be set
+    require(!(builder.hasPath && builder.hasTable))

     builder.setMode(mode match {
       case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index dc61a27a0934..8b687c67a206 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -22,6 +22,7 @@ import java.nio.file.Files
 import scala.collection.JavaConverters._

 import io.grpc.StatusRuntimeException
+import java.util.Properties
 import org.apache.commons.io.FileUtils
 import org.apache.commons.io.output.TeeOutputStream
 import org.scalactic.TolerantNumerics
@@ -180,6 +181,21 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
     spark.range(10).write.format("noop").mode("append").save()
   }

+  test("write jdbc") {
+    val url = "jdbc:derby:memory:1234"
+    val table = "t1"
+    try {
+      spark.range(10).write.jdbc(url = s"$url;create=true", table, new Properties())
+      val result = spark.read.jdbc(url = url, table, new Properties()).collect()
+      assert(result.length == 10)
+    } finally {
+      // clean up
+      assertThrows[StatusRuntimeException] {
+        spark.read.jdbc(url = s"$url;drop=true", table, new Properties()).collect()
+      }
+    }
+  }
+
   test("writeTo with create and using") {
     // TODO (SPARK-42519): Add more test after we can set configs. See more WriteTo test cases
     // in SparkConnectProtoSuite.