DataFrameWriter.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql

import java.util.Locale
import java.util.{Locale, Properties}

import scala.collection.JavaConverters._

@@ -228,7 +228,9 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {

// Set path or table
f(builder)
require(builder.hasPath != builder.hasTable) // Only one can be set

// Cannot both be set
require(!(builder.hasPath && builder.hasTable))

builder.setMode(mode match {
case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
@@ -345,6 +347,37 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
})
}

/**
* Saves the content of the `DataFrame` to an external database table via JDBC. If the table
* already exists in the external database, the behavior of this function depends on the save
* mode, specified by the `mode` function (defaults to throwing an exception).
*
* Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
* your external database systems.
*
* You can find the JDBC-specific option and parameter documentation for storing tables via JDBC
* in <a href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
* Data Source Option</a> in the version you use.
*
* @param table
* Name of the table in the external database.
* @param connectionProperties
* JDBC database connection arguments, a list of arbitrary string tag/value pairs. Normally at
* least a "user" and "password" property should be included. "batchsize" can be used to control
* the number of rows per insert. "isolationLevel" can be one of "NONE", "READ_COMMITTED",
* "READ_UNCOMMITTED", "REPEATABLE_READ", or "SERIALIZABLE", corresponding to standard
* transaction isolation levels defined by JDBC's Connection object, with a default of
* "READ_UNCOMMITTED".
* @since 3.4.0
*/
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
// connectionProperties should override settings in extraOptions.
this.extraOptions ++= connectionProperties.asScala
// explicit url and dbtable should override all
this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
format("jdbc").save()
}
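
As a quick illustration of the API added above (not part of the diff): a minimal usage sketch, assuming a SparkSession named `spark` and the embedded Derby driver on the classpath; the URL, table name, and credentials are illustrative, mirroring the e2e test further down.

import java.util.Properties

val props = new Properties()
props.put("user", "sa") // illustrative; embedded Derby does not require credentials
props.put("password", "")

// Write 10 rows into a freshly created in-memory Derby table, then read them back.
spark.range(10).write.jdbc("jdbc:derby:memory:demo;create=true", "t1", props)
val rows = spark.read.jdbc("jdbc:derby:memory:demo", "t1", props).collect()
assert(rows.length == 10)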

/**
* Saves the content of the `DataFrame` in JSON format (<a href="http://jsonlines.org/"> JSON
* Lines text format or newline-delimited JSON</a>) at the specified path. This is equivalent
ClientE2ETestSuite.scala
@@ -22,6 +22,7 @@ import java.nio.file.Files
import scala.collection.JavaConverters._

import io.grpc.StatusRuntimeException
import java.util.Properties
import org.apache.commons.io.FileUtils
import org.apache.commons.io.output.TeeOutputStream
import org.scalactic.TolerantNumerics
@@ -175,6 +176,26 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
}
}

test("write without table or path") {
// Writing with the noop source should succeed even without a path or table
spark.range(10).write.format("noop").mode("append").save()
}

test("write jdbc") {

Member

This seems to break branch-3.4 somehow. I'm checking it now.

[info] - write jdbc *** FAILED *** (527 milliseconds)
[info]   io.grpc.StatusRuntimeException: INTERNAL: No suitable driver
[info]   at io.grpc.Status.asRuntimeException(Status.java:535)

Contributor Author

@dongjoon-hyun I checked branch-3.4 locally and I can run the following without error:

build/sbt -Phive -Pconnect package
build/sbt "connect-client-jvm/test"

Member

To @zhenlineo, I reproduced the error locally in this way on branch-3.4, while the same command works on master.

$ build/sbt -Phive -Phadoop-3 assembly/package "protobuf/test" "connect-common/test" "connect/test" "connect-client-jvm/test"
...
[info] ClientE2ETestSuite:
[info] - spark result schema (319 milliseconds)
[info] - spark result array (350 milliseconds)
[info] - eager execution of sql (18 seconds, 3 milliseconds)
[info] - simple dataset (1 second, 194 milliseconds)
[info] - SPARK-42665: Ignore simple udf test until the udf is fully implemented. !!! IGNORED !!!
[info] - read and write (1 second, 32 milliseconds)
[info] - read path collision (32 milliseconds)
[info] - write table (5 seconds, 349 milliseconds)
[info] - write without table or path (170 milliseconds)
[info] - write jdbc *** FAILED *** (325 milliseconds)
[info]   io.grpc.StatusRuntimeException: INTERNAL: No suitable driver

Member

In addition, please see the CI. It's broken consistently.

[Screenshot of the CI failure, 2023-03-14 at 10:26:22 AM]

Contributor Author

@dongjoon-hyun Feel free to revert it in 3.4 and I will take a closer look later today or tomorrow. Thanks. I can send a PR targeting 3.4 directly, or whatever is easiest.

Member

Thank you, @zhenlineo . Let me check more.

Member
@dongjoon-hyun Mar 14, 2023

From my side, #40358 (comment) also failed.

$ build/sbt -Phive -Pconnect package
$ build/sbt "connect-client-jvm/test"
...
[info] ClientE2ETestSuite:
[info] - spark result schema (290 milliseconds)
[info] - spark result array (290 milliseconds)
[info] - eager execution of sql (15 seconds, 819 milliseconds)
[info] - simple dataset (1 second, 28 milliseconds)
[info] - SPARK-42665: Ignore simple udf test until the udf is fully implemented. !!! IGNORED !!!
[info] - read and write (929 milliseconds)
[info] - read path collision (31 milliseconds)
[info] - write table (4 seconds, 540 milliseconds)
[info] - write without table or path (348 milliseconds)
[info] - write jdbc *** FAILED *** (365 milliseconds)
[info]   io.grpc.StatusRuntimeException: INTERNAL: No suitable driver
...

In this case, the usual suspect is the Java version. GitHub Actions CI and I are both using Java 8.

Contributor Author

@dongjoon-hyun I saw the 3.4 build went back to green. Is bf9c4b9 the fix? Is there still a problem that I should fix?

Contributor Author

@dongjoon-hyun Aha, I saw the fix ab7c4f8. Thanks a lot!

Member
@dongjoon-hyun Mar 15, 2023

Yes, it's resolved for now. Thank you for checking, @zhenlineo !

val url = "jdbc:derby:memory:1234"
val table = "t1"
try {
spark.range(10).write.jdbc(url = s"$url;create=true", table, new Properties())
val result = spark.read.jdbc(url = url, table, new Properties()).collect()
assert(result.length == 10)
} finally {
// clean up
assertThrows[StatusRuntimeException] {
spark.read.jdbc(url = s"$url;drop=true", table, new Properties()).collect()
}
}
}

test("writeTo with create and using") {
// TODO (SPARK-42519): Add more test after we can set configs. See more WriteTo test cases
// in SparkConnectProtoSuite.
DatasetSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong

import io.grpc.Server
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
import java.util.Properties
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.connect.proto
@@ -100,6 +101,33 @@ class DatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
assert(actualPlan.equals(expectedPlan))
}

test("write jdbc") {
val df = ss.newDataFrame(_ => ()).limit(10)

val builder = proto.WriteOperation.newBuilder()
builder
.setInput(df.plan.getRoot)
.setMode(proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS)
.setSource("jdbc")
.putOptions("a", "b")
.putOptions("1", "2")
.putOptions("url", "url")
.putOptions("dbtable", "table")

val expectedPlan = proto.Plan
.newBuilder()
.setCommand(proto.Command.newBuilder().setWriteOperation(builder))
.build()

val connectionProperties = new Properties
connectionProperties.put("a", "b")
connectionProperties.put("1", "2")
df.write.jdbc("url", "table", connectionProperties)

val actualPlan = service.getAndClearLatestInputPlan()
assert(actualPlan.equals(expectedPlan))
}

test("write V2") {
val df = ss.newDataFrame(_ => ()).limit(10)

CheckConnectJvmClientCompatibility.scala
@@ -130,9 +130,7 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),

// DataFrame Reader & Writer
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.jdbc"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameWriter.jdbc"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"), // deprecated

// DataFrameNaFunctions
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameNaFunctions.this"),
SparkConnectProtoSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._

import com.google.protobuf.ByteString

import org.apache.spark.SparkClassNotFoundException
import org.apache.spark.{SparkClassNotFoundException, SparkIllegalArgumentException}
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.Expression
import org.apache.spark.connect.proto.Join.JoinType
@@ -554,13 +554,16 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
parameters = Map("columnName" -> "`duplicatedcol`"))
}

// TODO(SPARK-42733): Writes without path or table should work.
ignore("Writes fails without path or table") {
assertThrows[UnsupportedOperationException] {
test("Writes fails without path or table") {
assertThrows[SparkIllegalArgumentException] {
transform(localRelation.write())
}
}

test("Writes without path or table") {
transform(localRelation.write(format = Some("noop"), mode = Some("Append")))
}

test("Write fails with unknown table - AnalysisException") {
val cmd = readRel.write(tableName = Some("dest"))
assertThrows[AnalysisException] {