diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 8434addec920..b9d1fefb105e 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql
-import java.util.Locale
+import java.util.{Locale, Properties}
import scala.collection.JavaConverters._
@@ -228,7 +228,9 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
// Set path or table
f(builder)
- require(builder.hasPath != builder.hasTable) // Only one can be set
+
+ // Path and table cannot both be set
+ require(!(builder.hasPath && builder.hasTable))
builder.setMode(mode match {
case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
@@ -345,6 +347,37 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
})
}
+ /**
+ * Saves the content of the `DataFrame` to an external database table via JDBC. If the table
+ * already exists in the external database, the behavior of this function depends on the save
+ * mode specified by the `mode` function (which defaults to throwing an exception).
+ *
+ * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+ * your external database systems.
+ *
+ * See the JDBC-specific option and parameter documentation for storing tables via JDBC in
+ * <a href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+ * Data Source Option</a> in the version you use.
+ *
+ * @param url
+ * JDBC database url of the form `jdbc:subprotocol:subname`.
+ * @param table
+ * Name of the table in the external database.
+ * @param connectionProperties
+ * JDBC database connection arguments, a list of arbitrary string tag/value pairs. Normally at
+ * least a "user" and "password" property should be included. "batchsize" can be used to control
+ * the number of rows per insert. "isolationLevel" can be one of "NONE", "READ_COMMITTED",
+ * "READ_UNCOMMITTED", "REPEATABLE_READ", or "SERIALIZABLE", corresponding to standard
+ * transaction isolation levels defined by JDBC's Connection object, with a default of
+ * "READ_UNCOMMITTED".
+ * @since 3.4.0
+ */
+ def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
+ // connectionProperties should override settings in extraOptions.
+ this.extraOptions ++= connectionProperties.asScala
+ // explicit url and dbtable should override all
+ this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
+ format("jdbc").save()
+ }
+
/**
* Saves the content of the `DataFrame` in JSON format ( JSON
* Lines text format or newline-delimited JSON) at the specified path. This is equivalent
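
For context, a minimal usage sketch of the writer method added above. Everything beyond the `jdbc(url, table, connectionProperties)` signature (the Connect endpoint, the in-memory Derby URL, the table name, and the credentials) is illustrative only, not part of this change:

```scala
import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

// Sketch only: assumes a reachable Spark Connect endpoint and a Derby driver
// available on the server; URL, table name and credentials are placeholders.
val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()

val props = new Properties()
props.put("user", "app")                      // placeholder credentials
props.put("password", "secret")
props.put("batchsize", "1000")                // rows per INSERT batch
props.put("isolationLevel", "READ_COMMITTED") // write-side transaction isolation

spark
  .range(100)
  .write
  .mode(SaveMode.Append)                      // Append creates the table if it does not exist
  .jdbc("jdbc:derby:memory:demo;create=true", "people", props)
```

Because the method simply folds `connectionProperties`, `url`, and `dbtable` into the writer's options and calls `format("jdbc").save()`, the save-mode and option semantics are identical to the existing option-based JDBC path.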
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 466a51841d42..8b687c67a206 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -22,6 +22,7 @@ import java.nio.file.Files
import scala.collection.JavaConverters._
import io.grpc.StatusRuntimeException
+import java.util.Properties
import org.apache.commons.io.FileUtils
import org.apache.commons.io.output.TeeOutputStream
import org.scalactic.TolerantNumerics
@@ -175,6 +176,26 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
}
}
+ test("write without table or path") {
+ // The noop source needs neither a path nor a table, so this write should succeed.
+ spark.range(10).write.format("noop").mode("append").save()
+ }
+
+ test("write jdbc") {
+ val url = "jdbc:derby:memory:1234"
+ val table = "t1"
+ try {
+ spark.range(10).write.jdbc(url = s"$url;create=true", table, new Properties())
+ val result = spark.read.jdbc(url = url, table, new Properties()).collect()
+ assert(result.length == 10)
+ } finally {
+ // Clean up: ";drop=true" drops the in-memory Derby database,
+ // so the read below is expected to fail.
+ assertThrows[StatusRuntimeException] {
+ spark.read.jdbc(url = s"$url;drop=true", table, new Properties()).collect()
+ }
+ }
+ }
+
test("writeTo with create and using") {
// TODO (SPARK-42519): Add more test after we can set configs. See more WriteTo test cases
// in SparkConnectProtoSuite.
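
The end-to-end test above round-trips through an in-memory Derby database: `;create=true` creates it and `;drop=true` destroys it, which is why the clean-up read is expected to fail. For comparison, a sketch of the same round trip using the generic option-based API that `jdbc()` delegates to (same illustrative URL and table as the test):

```scala
// Write: equivalent to spark.range(10).write.jdbc(url, "t1", new Properties()).
spark
  .range(10)
  .write
  .format("jdbc")
  .option("url", "jdbc:derby:memory:1234;create=true")
  .option("dbtable", "t1")
  .save()

// Read back and verify.
val rows = spark.read
  .format("jdbc")
  .option("url", "jdbc:derby:memory:1234")
  .option("dbtable", "t1")
  .load()
  .collect()
assert(rows.length == 10)
```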
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 43b0cd2674cc..a55839110f20 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
import io.grpc.Server
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import java.util.Properties
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.connect.proto
@@ -100,6 +101,33 @@ class DatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
assert(actualPlan.equals(expectedPlan))
}
+ test("write jdbc") {
+ val df = ss.newDataFrame(_ => ()).limit(10)
+
+ val builder = proto.WriteOperation.newBuilder()
+ builder
+ .setInput(df.plan.getRoot)
+ .setMode(proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS)
+ .setSource("jdbc")
+ .putOptions("a", "b")
+ .putOptions("1", "2")
+ .putOptions("url", "url")
+ .putOptions("dbtable", "table")
+
+ val expectedPlan = proto.Plan
+ .newBuilder()
+ .setCommand(proto.Command.newBuilder().setWriteOperation(builder))
+ .build()
+
+ val connectionProperties = new Properties
+ connectionProperties.put("a", "b")
+ connectionProperties.put("1", "2")
+ df.write.jdbc("url", "table", connectionProperties)
+
+ val actualPlan = service.getAndClearLatestInputPlan()
+ assert(actualPlan.equals(expectedPlan))
+ }
+
test("write V2") {
val df = ss.newDataFrame(_ => ()).limit(10)
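
The expected plan in this test also documents the option precedence that `jdbc()` implements: options set through `option()` are applied first, entries from `connectionProperties` overwrite them, and the explicit `url`/`dbtable` arguments win last. A small sketch under that assumption (given some DataFrame `df`; the option values are made up):

```scala
import java.util.Properties

val props = new Properties()
props.put("isolationLevel", "READ_COMMITTED") // overwrites the option() value below

df.write
  .option("isolationLevel", "NONE")  // overridden by connectionProperties
  .option("url", "jdbc:ignored")     // overridden by the explicit url argument
  .jdbc("jdbc:derby:memory:demo;create=true", "t1", props)

// Resulting WriteOperation options:
//   isolationLevel -> READ_COMMITTED
//   url            -> jdbc:derby:memory:demo;create=true
//   dbtable        -> t1
```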
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index ae6c6c86fec2..97d130421a24 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -130,9 +130,7 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
// DataFrame Reader & Writer
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.jdbc"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameWriter.jdbc"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"), // deprecated
// DataFrameNaFunctions
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameNaFunctions.this"),
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 00ff6ac2fb6c..d619782ac3d8 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import com.google.protobuf.ByteString
-import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.{SparkClassNotFoundException, SparkIllegalArgumentException}
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.Expression
import org.apache.spark.connect.proto.Join.JoinType
@@ -554,13 +554,16 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
parameters = Map("columnName" -> "`duplicatedcol`"))
}
- // TODO(SPARK-42733): Writes without path or table should work.
- ignore("Writes fails without path or table") {
- assertThrows[UnsupportedOperationException] {
+ test("Writes fails without path or table") {
+ assertThrows[SparkIllegalArgumentException] {
transform(localRelation.write())
}
}
+ test("Writes without path or table") {
+ transform(localRelation.write(format = Some("noop"), mode = Some("Append")))
+ }
+
test("Write fails with unknown table - AnalysisException") {
val cmd = readRel.write(tableName = Some("dest"))
assertThrows[AnalysisException] {
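
From the client side, the relaxed check means a write without a path or table is now accepted whenever the chosen source needs neither, while a write that still lacks a required target fails on the server and surfaces as an error on the client. A brief sketch, assuming a Connect session `spark`:

```scala
// Succeeds: the "noop" source needs neither a path nor a table.
spark.range(5).write.format("noop").mode("append").save()

// Fails on the server and is reported back to the client: the default
// file-based source still requires a path (or a table via saveAsTable).
// spark.range(5).write.save()
```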