[SPARK-42690][CONNECT] Implement CSV/JSON parsing functions for Scala client #40332
@@ -27,6 +27,9 @@ import org.apache.commons.io.output.TeeOutputStream
import org.scalactic.TolerantNumerics

import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
import org.apache.spark.sql.functions.{aggregate, array, broadcast, col, count, lit, rand, sequence, shuffle, struct, transform, udf}
import org.apache.spark.sql.types._

@@ -644,6 +647,67 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
      .collect()
    assert(result sameElements expected)
  }

  test("json from Dataset[String] inferSchema") {
    val session = spark
    import session.implicits._
    val expected = Seq(
      new GenericRowWithSchema(
        Array(73, "Shandong", "Kong"),
        new StructType().add("age", LongType).add("city", StringType).add("name", StringType)))
    val ds = Seq("""{"name":"Kong","age":73,"city":'Shandong'}""").toDS()
    val result = spark.read.option("allowSingleQuotes", "true").json(ds)
    checkSameResult(expected, result)
|
Contributor (Author):
spark/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala, lines 404 to 417 in 69dd20b
From the code of the server side,

Contributor:
Makes sense, you are right.

Contributor (Author):
Thanks ~

Contributor:
Probably we should add the user-provided schema in the message? Or always discard it?

Contributor:
Will inferFromDataset trigger a job? If so, I think we'd better skip it if possible.

Contributor (Author):
Yes, I think you are right, we should add

  }
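The thread above asks whether inference triggers a job. As a hedged illustration (not part of the PR; assumes a live Spark session and the test suite's implicits): when no schema is given, the server has to scan the `Dataset[String]` to infer one, so supplying a DDL schema up front avoids that extra pass.

```scala
// Sketch only: `ds` mirrors the test data above.
val ds = Seq("""{"name":"Kong","age":73}""").toDS()

// No schema supplied: the server scans `ds` to infer one (an extra job).
val inferred = spark.read.json(ds)

// Schema supplied as DDL: inference is skipped entirely.
val explicit = spark.read.schema("name STRING, age BIGINT").json(ds)
```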

  test("json from Dataset[String] with schema") {
    val session = spark
    import session.implicits._
    val schema = new StructType().add("city", StringType).add("name", StringType)
    val expected = Seq(new GenericRowWithSchema(Array("Shandong", "Kong"), schema))
    val ds = Seq("""{"name":"Kong","age":73,"city":'Shandong'}""").toDS()
    val result = spark.read.schema(schema).option("allowSingleQuotes", "true").json(ds)
    checkSameResult(expected, result)
  }

  test("json from Dataset[String] with invalid schema") {
    val message = intercept[ParseException] {
      spark.read.schema("123").json(spark.createDataset(Seq.empty[String])(StringEncoder))
    }.getMessage
    assert(message.contains("PARSE_SYNTAX_ERROR"))
  }

  test("csv from Dataset[String] inferSchema") {
    val session = spark
    import session.implicits._
    val expected = Seq(
      new GenericRowWithSchema(
        Array("Meng", 84, "Shandong"),
        new StructType().add("name", StringType).add("age", LongType).add("city", StringType)))
    val ds = Seq("name,age,city", """"Meng",84,"Shandong"""").toDS()
    val result = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(ds)
    checkSameResult(expected, result)
  }

  test("csv from Dataset[String] with schema") {
    val session = spark
    import session.implicits._
    val schema = new StructType().add("name", StringType).add("age", LongType)
    val expected = Seq(new GenericRowWithSchema(Array("Meng", 84), schema))
    val ds = Seq(""""Meng",84,"Shandong"""").toDS()
    val result = spark.read.schema(schema).csv(ds)
    checkSameResult(expected, result)
  }

  test("csv from Dataset[String] with invalid schema") {
    val message = intercept[ParseException] {
      spark.read.schema("123").csv(spark.createDataset(Seq.empty[String])(StringEncoder))
    }.getMessage
    assert(message.contains("PARSE_SYNTAX_ERROR"))
  }
}

private[sql] case class MyType(id: Long, a: Double, b: Double)
@@ -62,6 +62,7 @@ message Relation {
    RepartitionByExpression repartition_by_expression = 27;
    FrameMap frame_map = 28;
    CollectMetrics collect_metrics = 29;
    Parse parse = 30;

    // NA functions
    NAFill fill_na = 90;

@@ -798,3 +799,23 @@ message CollectMetrics {
  // (Required) The metric sequence.
  repeated Expression metrics = 3;
}
message Parse {
  // (Required) Input relation to Parse. The input is expected to have a single text column.
  Relation input = 1;

  // (Required) The expected format of the text.
  ParseFormat format = 2;

  // (Optional) If not set, Spark will infer the schema.
  //
  // This schema string should be either DDL-formatted or JSON-formatted.
  optional string schema = 3;

  // Options for the csv/json parser. The map key is case insensitive.
  map<string, string> options = 4;

  enum ParseFormat {
    PARSE_FORMAT_UNSPECIFIED = 0;
    PARSE_FORMAT_CSV = 1;
    PARSE_FORMAT_JSON = 2;
  }
}
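As an illustration, a client could populate this message roughly as follows. This is a hedged sketch, not code from the PR: the builder methods follow standard protobuf-java conventions for the generated `Parse` class, and `inputRelation` is a hypothetical placeholder.

```scala
import org.apache.spark.connect.proto

// Hypothetical: `inputRelation` is a proto.Relation carrying the single
// text column to parse (e.g. a LocalRelation of strings).
def csvParsePlan(inputRelation: proto.Relation): proto.Parse =
  proto.Parse
    .newBuilder()
    .setInput(inputRelation)
    .setFormat(proto.Parse.ParseFormat.PARSE_FORMAT_CSV)
    .setSchema("c1 STRING,c2 INT") // optional; omit to let Spark infer
    .putOptions("header", "true")  // parser options; keys are case insensitive
    .build()
```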
@@ -0,0 +1 @@
LogicalRDD [c1#0, c2#0], false
|
Contributor:
Oh this makes me sad. Why are we using RDDs here?

Contributor (Author):
spark/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala, lines 424 to 433 in 39a5512
spark/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala, lines 503 to 521 in 39a5512
spark/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala, lines 560 to 571 in 39a5512

Contributor (Author):
On the server side, the input
@@ -0,0 +1 @@
LogicalRDD [c1#0, c2#0], false
@@ -0,0 +1,20 @@
{
  "common": {
    "planId": "1"
  },
  "parse": {
    "input": {
      "common": {
        "planId": "0"
      },
      "localRelation": {
        "schema": "{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
      }
    },
    "format": "PARSE_FORMAT_CSV",
    "schema": "c1 STRING,c2 INT",
    "options": {
      "header": "true"
    }
  }
}
@@ -0,0 +1,20 @@
{
  "common": {
    "planId": "1"
  },
  "parse": {
    "input": {
      "common": {
        "planId": "0"
      },
      "localRelation": {
        "schema": "{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
      }
    },
    "format": "PARSE_FORMAT_JSON",
    "schema": "c1 STRING,c2 INT",
    "options": {
      "allowsinglequotes": "true"
    }
  }
}
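For orientation, a client call along these lines would presumably produce the JSON-format golden plan above (a sketch assuming the suite's implicits; the sample row is invented to match the schema `c1 STRING,c2 INT`):

```scala
val session = spark
import session.implicits._

// A Dataset[String] becomes the LocalRelation with a single "value" column.
val ds = Seq("""{"c1":'a',"c2":1}""").toDS()
spark.read
  .schema("c1 STRING,c2 INT")
  .option("allowSingleQuotes", "true")
  .json(ds)
```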
As to this PR itself, I think we should probably use a `DataType schema` in the proto message; `schema.toDDL` always discards the metadata.

What do you think about this? @hvanhovell

20f1722 changed it to pass a DataType @zhengruifeng

ca6ec7b renames `data_type` to `schema` in the proto message

In this scenario, I think `DataType schema` is used and `schema.toDDL` is no longer needed
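The metadata concern can be made concrete with a small sketch (illustrative, not from the PR; assumes only the Spark SQL types API): a DDL round trip keeps name, type, and nullability but drops arbitrary per-column metadata, which is why sending the `DataType` itself preserves strictly more information.

```scala
import org.apache.spark.sql.types._

// Attach arbitrary metadata to a column; it lives on the StructField.
val md = new MetadataBuilder().putString("origin", "csv-header").build()
val schema = StructType(Seq(StructField("id", LongType, nullable = false, metadata = md)))

// toDDL emits only "id BIGINT NOT NULL" style text, so fromDDL(toDDL)
// reconstructs the type but not the custom metadata.
val roundTripped = StructType.fromDDL(schema.toDDL)
assert(roundTripped("id").metadata != schema("id").metadata)
```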