
Conversation

@LuciferYang (Contributor) commented Mar 8, 2023

What changes were proposed in this pull request?

This PR adds a new proto message:

```
message Parse {
  // (Required) Input relation to Parse. The input is expected to have a single text column.
  Relation input = 1;
  // (Required) The expected format of the text.
  ParseFormat format = 2;

  // (Optional) DataType representing the schema. If not set, Spark will infer the schema.
  optional DataType schema = 3;

  // Options for the csv/json parser. The map key is case insensitive.
  map<string, string> options = 4;
  enum ParseFormat {
    PARSE_FORMAT_UNSPECIFIED = 0;
    PARSE_FORMAT_CSV = 1;
    PARSE_FORMAT_JSON = 2;
  }
}
```

and implements CSV/JSON parsing functions for the Scala client.
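
For orientation, here is a minimal, hypothetical sketch of how this surfaces on the Scala client (the connection string and sample data are illustrative, not taken from this PR):

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

// Hypothetical Spark Connect endpoint.
val spark = SparkSession.builder().remote("sc://localhost").getOrCreate()

val ds = spark.createDataset(Seq("""{"name":"Kong","age":73}"""))(Encoders.STRING)

// No schema supplied: the server infers one from the text column.
val inferred = spark.read.json(ds)

// Explicit schema: carried to the server in the new Parse message.
val parsed = spark.read.schema("name STRING, age BIGINT").json(ds)
```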

Why are the changes needed?

This adds API coverage for the Spark Connect JVM client.

Does this PR introduce any user-facing change?

No

How was this patch tested?

  • Passed GitHub Actions
  • Manually checked with Scala 2.13

@github-actions bot added the PYTHON label Mar 8, 2023
@zhengruifeng (Contributor)

Why not use from_json and from_csv to do this?

@LuciferYang (Contributor, Author)

> Why not use from_json and from_csv to do this?

How would we get the schema?

@zhengruifeng (Contributor)

Not sure whether I'm missing something, but isn't the schema already provided by users?

```scala
        new StructType().add("age", LongType).add("city", StringType).add("name", StringType)))
val ds = Seq("""{"name":"Kong","age":73,"city":'Shandong'}""").toDS()
val result = spark.read.option("allowSingleQuotes", "true").json(ds)
checkSameResult(expected, result)
```
@LuciferYang (Contributor, Author) Mar 8, 2023

```scala
def json(jsonDataset: Dataset[String]): DataFrame = {
  val parsedOptions = new JSONOptions(
    extraOptions.toMap,
    sparkSession.sessionState.conf.sessionLocalTimeZone,
    sparkSession.sessionState.conf.columnNameOfCorruptRecord)
  userSpecifiedSchema.foreach(checkJsonSchema)
  val schema = userSpecifiedSchema.map {
    case s if !SQLConf.get.getConf(
        SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) => s.asNullable
    case other => other
  }.getOrElse {
    TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
  }
```

From the server-side code, userSpecifiedSchema is an Option[StructType] that defaults to None, so I think we can call this function without specifying userSpecifiedSchema? Or is my test case not the correct scenario?

@zhengruifeng (Contributor)

Makes sense, you are right.

@LuciferYang (Contributor, Author)

Thanks ~

(Contributor)

Probably we should add the user-provided schema to the message? Or always discard it?

(Contributor)

Will inferFromDataset trigger a job? If so, I think we'd better skip it if possible.
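
(For context, a hedged sketch using standard Spark reader APIs — spark and jsonDs: Dataset[String] are assumed — of why an explicit schema lets the server skip that inference pass:)

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Without a schema, the server falls back to
// TextInputJsonDataSource.inferFromDataset, which scans the text column --
// the extra job being discussed here.
val inferred = spark.read.json(jsonDs)

// With an explicit schema, inference (and its job) is skipped entirely.
val parsed = spark.read
  .schema(new StructType().add("name", StringType).add("age", LongType))
  .json(jsonDs)
```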

@LuciferYang (Contributor, Author)

Yes, I think you are right; we should add the schema to the message if it exists, thanks ~ I will update it later.

@LuciferYang marked this pull request as draft March 8, 2023 13:14
@LuciferYang changed the title to [WIP][SPARK-42690][CONNECT] Implement CSV/JSON parsing functions for Scala client Mar 8, 2023
@LuciferYang changed the title to [SPARK-42690][CONNECT] Implement CSV/JSON parsing functions for Scala client Mar 8, 2023
@LuciferYang marked this pull request as ready for review March 8, 2023 14:18
```scala
session.read
  .schema(new StructType().add("c1", StringType).add("c2", IntegerType))
  .option("allowSingleQuotes", "true")
  .json(session.createDataset(Seq.empty[String])(StringEncoder))
```
(Contributor)

session.emptyDataset(StringEncoder)?
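
(i.e., a sketch of the suggested simplification, with StringEncoder as in the test above:)

```scala
// Build the empty Dataset[String] directly rather than via Seq.empty.
val emptyDs = session.emptyDataset(StringEncoder)
```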

```
// (Optional) If not set, Spark will infer the schema.
//
// This schema string should be either DDL-formatted or JSON-formatted.
optional string schema = 3;
```
(Contributor)

Can't we use the actual data type?

@LuciferYang (Contributor, Author) Mar 8, 2023

#40332 (comment) & #40332 (comment)

and I think the userSpecifiedSchema can be different from the inferred schema

@LuciferYang (Contributor, Author)

So if userSpecifiedSchema is set, we should pass it

```
@@ -0,0 +1 @@
LogicalRDD [c1#0, c2#0], false
```
(Contributor)

Oh, this makes me sad. Why are we using RDDs here?

@LuciferYang (Contributor, Author) Mar 8, 2023

```scala
val parsed = jsonDataset.rdd.mapPartitions { iter =>
  val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
  val parser = new FailureSafeParser[String](
    input => rawParser.parse(input, createParser, UTF8String.fromString),
    parsedOptions.parseMode,
    schema,
    parsedOptions.columnNameOfCorruptRecord)
  iter.flatMap(parser.parse)
}
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
```

```scala
val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
  val headerChecker = new CSVHeaderChecker(
    actualSchema,
    parsedOptions,
    source = s"CSV source: $csvDataset")
  headerChecker.checkHeaderColumnNames(firstLine)
  filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
}.getOrElse(filteredLines.rdd)
val parsed = linesWithoutHeader.mapPartitions { iter =>
  val rawParser = new UnivocityParser(actualSchema, parsedOptions)
  val parser = new FailureSafeParser[String](
    input => rawParser.parse(input),
    parsedOptions.parseMode,
    schema,
    parsedOptions.columnNameOfCorruptRecord)
  iter.flatMap(parser.parse)
}
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming)
```

```scala
private[sql] def internalCreateDataFrame(
    catalystRows: RDD[InternalRow],
    schema: StructType,
    isStreaming: Boolean = false): DataFrame = {
  // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
  // schema differs from the existing schema on any field data type.
  val logicalPlan = LogicalRDD(
    schema.toAttributes,
    catalystRows,
    isStreaming = isStreaming)(self)
  Dataset.ofRows(self, logicalPlan)
}
```

@LuciferYang (Contributor, Author) Mar 8, 2023

On the server side, the input csvDataset and jsonDataset are still LocalRelations, and the code path above (sparkSession.internalCreateDataFrame) converts them to LogicalRDD.

@hvanhovell (Contributor) left a comment

LGTM

@zhengruifeng (Contributor)

Probably not related to this PR:

```scala
/**
 * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON)
 * can infer the input schema automatically from data. By specifying the schema here, the
 * underlying data source can skip the schema inference step, and thus speed up data loading.
 *
 * {{{
 *   spark.read.schema("a INT, b STRING, c DOUBLE").csv("test.csv")
 * }}}
 *
 * @since 3.4.0
 */
def schema(schemaString: String): DataFrameReader = {
  schema(StructType.fromDDL(schemaString))
}
```

When the user provides a DDL string, it invokes the parser. Here I think we should keep both the StructType and the DDL string, and pass them to the server side.
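
For reference, a minimal illustration of the client-side parsing being discussed (StructType.fromDDL is the standard Spark API the string overload delegates to):

```scala
import org.apache.spark.sql.types.StructType

// The string overload of schema() runs the DDL parser on the client,
// before anything is sent to the server.
val parsed: StructType = StructType.fromDDL("a INT, b STRING, c DOUBLE")
```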

@LuciferYang (Contributor, Author) commented Mar 9, 2023

> Probably not related to this PR:
>
> [the schema(schemaString: String) snippet quoted above]
>
> When the user provides a DDL string, it invokes the parser. Here I think we should keep both the StructType and the DDL string, and pass them to the server side.

The message Read seems to also need to consider this? I think we can discuss this problem further in a separate PR.

```scala
val parseBuilder = builder.getParseBuilder
  .setInput(ds.plan.getRoot)
  .setFormat(format)
userSpecifiedSchema.foreach(schema => parseBuilder.setSchema(schema.toDDL))
```
(Contributor)

As to this PR itself, I think we should probably use DataType schema in the proto message, since schema.toDDL always discards the metadata.
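
A small sketch of that lossiness (standard Spark types API; the metadata key here is illustrative):

```scala
import org.apache.spark.sql.types._

val md = new MetadataBuilder().putString("origin", "upstream").build()
val schema = new StructType().add(StructField("age", LongType, nullable = true, md))

// toDDL renders only names and types; the arbitrary metadata above does not
// survive a round trip through the DDL string.
schema.toDDL                      // "age BIGINT"
StructType.fromDDL(schema.toDDL)  // StructField("age", LongType) with empty metadata
```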

@LuciferYang (Contributor, Author)

What do you think about this? @hvanhovell

@LuciferYang (Contributor, Author)

20f1722 changed this to pass a DataType @zhengruifeng

@LuciferYang (Contributor, Author)

ca6ec7b renamed data_type to schema in the proto message

@LuciferYang (Contributor, Author)

In this scenario, I think DataType schema is used and schema.toDDL is no longer needed.
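
(A hedged sketch of what the builder presumably looks like after ca6ec7b — DataTypeProtoConverter.toConnectProtoType is the converter used elsewhere in the connect client, and the surrounding names come from the snippet quoted earlier, so this is an approximation, not the exact patch:)

```scala
import org.apache.spark.sql.connect.common.DataTypeProtoConverter

val parseBuilder = builder.getParseBuilder
  .setInput(ds.plan.getRoot)
  .setFormat(format)
// Send the user-specified schema as a proto DataType rather than a DDL
// string, so field metadata is preserved.
userSpecifiedSchema.foreach { schema =>
  parseBuilder.setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
}
```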

zhengruifeng pushed a commit that referenced this pull request Mar 9, 2023
… client

### What changes were proposed in this pull request?
This PR adds a new proto message

```
message Parse {
  // (Required) Input relation to Parse. The input is expected to have a single text column.
  Relation input = 1;
  // (Required) The expected format of the text.
  ParseFormat format = 2;

  // (Optional) DataType representing the schema. If not set, Spark will infer the schema.
  optional DataType schema = 3;

  // Options for the csv/json parser. The map key is case insensitive.
  map<string, string> options = 4;
  enum ParseFormat {
    PARSE_FORMAT_UNSPECIFIED = 0;
    PARSE_FORMAT_CSV = 1;
    PARSE_FORMAT_JSON = 2;
  }
}
```

and implements CSV/JSON parsing functions for the Scala client.

### Why are the changes needed?
This adds API coverage for the Spark Connect JVM client.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Passed GitHub Actions
- Manually checked with Scala 2.13

Closes #40332 from LuciferYang/SPARK-42690.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
(cherry picked from commit 07f71d2)
Signed-off-by: Ruifeng Zheng <[email protected]>
@zhengruifeng (Contributor)

@LuciferYang thank you for working on this.

Merged into master/branch-3.4.

@LuciferYang (Contributor, Author)

Thanks @zhengruifeng @hvanhovell @HyukjinKwon ~

snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
… client

(Same commit message as the commit above.)

Closes apache#40332 from LuciferYang/SPARK-42690.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
(cherry picked from commit 07f71d2)
Signed-off-by: Ruifeng Zheng <[email protected]>