Conversation

@zhengruifeng
Contributor

What changes were proposed in this pull request?

1. Support schema.
2. Support more types: ndarray and list.

Why are the changes needed?

for API coverage

Does this PR introduce any user-facing change?

yes

How was this patch tested?

added tests for the new types

@zhengruifeng
Contributor Author

zhengruifeng commented Dec 8, 2022

PySpark's createDataFrame infers and validates the data types, creates an RDD from the list, and directly assigns the SQL schema in the JVM. There are many related configurations, including:

self._jconf.inferDictAsStruct()
self._jconf.sessionLocalTimeZone()
self._jconf.arrowPySparkEnabled()
self._jconf.arrowPySparkFallbackEnabled()
self._jconf.arrowMaxRecordsPerBatch()
self._jconf.arrowSafeTypeConversion()
self._jconf.legacyInferArrayTypeFromFirstElement()
is_timestamp_ntz_preferred()
...

In Connect, datasets are always converted to a pandas DataFrame (internally a PyArrow Table). I simply use pd.DataFrame(list(data)) to infer the data types, and cast if the user provides a schema.

The two approaches are so different that I am afraid it is hard to 100% match PySpark's createDataFrame.
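
For illustration, the Connect-side flow is roughly the following (a hedged sketch; local_relation_table is a made-up name, not the actual pyspark.sql.connect implementation):

    import pandas as pd
    import pyarrow as pa

    def local_relation_table(data, schema=None):
        # Let pandas infer the column types from the raw rows.
        table = pa.Table.from_pandas(pd.DataFrame(list(data)))
        if schema is not None:
            # Rename to the user-provided column names, then cast the types;
            # casting stands in for PySpark's per-row type verification.
            table = table.rename_columns(schema.names).cast(schema)
        return table

    # pandas infers int64 for both columns; the cast turns col2 into float64.
    schema = pa.schema([("col1", pa.int32()), ("col2", pa.float64())])
    tbl = local_relation_table([[1, 2]], schema)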

@zhengruifeng
Contributor Author

cc @HyukjinKwon

Contributor Author


I attempted to make _to_corrected_pandas_type support StringType by returning np.str_; then the createDataFrame-related tests passed as expected, but some other pyspark tests started behaving oddly. So I check isinstance(dt, StringType) here.

In the future, I think we should directly create a PyArrow Table from ndarray and list, to skip the intermediate conversions to/from pandas.
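
The check is roughly this (a simplified sketch, not the actual _to_corrected_pandas_type code; only two branches shown):

    import numpy as np
    from pyspark.sql.types import DataType, IntegerType, StringType

    def corrected_pandas_type(dt: DataType):
        if isinstance(dt, StringType):
            # Returning np.str_ here made unrelated pyspark tests misbehave,
            # so strings are left as plain Python objects.
            return None
        if isinstance(dt, IntegerType):
            return np.int32
        return None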

Contributor

@tomvanbussel Dec 8, 2022


I'm not sure if this can be used here, as _parse_datatype_string internally calls into the JVM. I think we have to add a field to the LocalRelation message to store the schema string instead, so that the driver can parse it.

Contributor Author


you are right, we should not call _parse_datatype_string

Contributor


This will ignore the names of nested fields, and it will ignore the types. To me it seems that we should leave the pandas DataFrame untouched here, and instead pass the schema struct in the LocalRelation message to the driver.

@zhengruifeng
Contributor Author

Difference in casting:
this PR leverages Dataset.to(schema) to cast data types, which is very different from PySpark's approach, which relies on the _acceptable_types list.

createDataFrame([[1, 2, 3, 4]], schema="col1 int, col2 int, col3 int, col4 double") runs successfully in Connect, while it fails in PySpark:

Traceback (most recent call last):
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/tests/connect/test_connect_basic.py", line 299, in test_with_local_list
    self.spark.createDataFrame([[1, 2, 3, 4]], schema="col1 int, col2 int, col3 int, col4 double")
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/session.py", line 1164, in createDataFrame
    return self._create_dataframe(
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/session.py", line 1206, in _create_dataframe
    rdd, struct = self._createFromLocal(map(prepare, data), schema)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/session.py", line 850, in _createFromLocal
    data = list(data)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/session.py", line 1180, in prepare
    verify_func(obj)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/types.py", line 2003, in verify
    verify_value(obj)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/types.py", line 1981, in verify_struct
    verifier(v)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/types.py", line 2003, in verify
    verify_value(obj)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/types.py", line 1997, in verify_default
    verify_acceptable_types(obj)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/types.py", line 1873, in verify_acceptable_types
    raise TypeError(
TypeError: field col4: DoubleType() can not accept object 4 in type <class 'int'>

Contributor Author


pyspark_types_to_proto_types does not support StructType yet; I'm going to fix it in a separate PR.

@zhengruifeng
Contributor Author

if _schema is not None:
    return DataFrame.withPlan(LocalRelation(table, schema=_schema), self)
elif _schema_str is not None:
    return DataFrame.withPlan(LocalRelation(table, schema=_schema_str), self)
Contributor Author


If we can have an RPC for parseTableSchema in AnalyzePlan and implement DataFrame.to, then we do not need to add the schema to LocalRelation's proto, and can simplify this to DataFrame.withPlan(LocalRelation(table), self).toDF(...).to(...)

  .asInstanceOf[StructType]
} else {
  session.sessionState.sqlParser
    .parseTableSchema(rel.getDatatypeStr)
Contributor


We need to call parseDataType here if parseTableSchema fails, according to the implementation of _parse_datatype_string.
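
For reference, the fallback chain in _parse_datatype_string looks roughly like this (a paraphrased sketch; parse_table_schema, parse_data_type, and ParseException stand in for the JVM-backed parser calls and their error type):

    def parse_datatype_string(s: str):
        try:
            # DDL table schema, e.g. "col1 int, col2 double"
            return parse_table_schema(s)
        except ParseException as e:
            try:
                # a single data type, e.g. "int" or "array<int>"
                return parse_data_type(s)
            except ParseException:
                try:
                    # unwrapped struct fields, e.g. "a: int, b: string"
                    return parse_data_type(f"struct<{s.strip()}>")
                except ParseException:
                    raise e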

Member


Yeah, let's handle DDL-formatted strings together if it's not tricky

Contributor Author


thanks, updated

DataType datatype = 2;

// Server will use Catalyst parser to parse this string to DataType.
string datatype_str = 3;
Member

@HyukjinKwon Dec 9, 2022


Or we can always pass a string for now, by turning the DataType into a JSON representation with DataType.json()
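
For example, the JSON form is easy to produce on the client and can be parsed back on the server (Scala DataType.fromJson); output shown for illustration:

    from pyspark.sql.types import IntegerType, StructField, StructType

    StructType([StructField("a", IntegerType())]).json()
    # '{"fields":[{"metadata":{},"name":"a","nullable":true,"type":"integer"}],"type":"struct"}'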

Contributor Author


I'm thinking of adding support for _parse_datatype_string in AnalyzePlan; then we don't need to add datatype and datatype_str in LocalRelation at all.

Then the implementation will be like this (after we implement DataFrame.to):

schema = _parse_datatype_string(schema_str)
return DataFrame.withPlan(LocalRelation(table), self).toDF(*schema.fieldNames).to(schema)

}
Dataset
  .ofRows(session, logicalPlan = relation)
  .toDF(schema.names: _*)
Contributor


This throws away the names of nested fields. Perhaps we can do something like the following instead:

    val (rows, inferredSchema) = ArrowConverters.fromBatchWithSchemaIterator(
      Iterator(rel.getData.toByteArray),
      TaskContext.get())
    if (inferredSchema == null) {
      throw InvalidPlanInput(s"Input data for LocalRelation does not produce a schema.")
    }

    val schemaType = if (rel.hasDataType) {
      DataTypeProtoConverter.toCatalystType(rel.getDataType)
    } else if (rel.hasDataTypeString) {
      parseDatatypeString(rel.getDataTypeString)
    } else {
      inferredSchema
    }

    val schemaStruct = schemaType match {
      case s: StructType => s
      case d => StructType(Seq(StructField("value", d)))
    }

    val attributes = schemaStruct.toAttributes
    val proj = UnsafeProjection.create(attributes, attributes)
    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)

Contributor Author


we need to use inferredSchema in proj; otherwise it may fail due to a datatype mismatch

Contributor

@tomvanbussel Dec 9, 2022


Will it work if we use val proj = UnsafeProjection.create(attributes, inferredSchema.toAttributes) instead? We will likely also have to add some validation similar to the validation that pyspark currently performs using _make_type_verifier.

Contributor Author


still fails:

grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "Couldn't find 0#1215L in [0#1219L,1#1220L,2#1221L,3#1222L]"
        debug_error_string = "{"created":"@1670584625.321243000","description":"Error received from peer ipv6:[::1]:15002","file":"src/core/lib/surface/call.cc","file_line":1064,"grpc_message":"Couldn't find 0#1215L in [0#1219L,1#1220L,2#1221L,3#1222L]","grpc_status":2}"

Contributor Author


seems to be due to unresolved attributes

Contributor


Hmmm, this is going to be more difficult than I anticipated. We will have to check if the inferred type and the provided type are compatible, and then we'll have to use something similar to EvaluatePython.makeFromJava to perform the conversion. Let's do this in a follow-up :)

Comment on lines 375 to 387
var dataType: DataType = null
try {
  dataType = session.sessionState.sqlParser.parseTableSchema(sqlText)
} catch {
  case e1: ParseException =>
    try {
      dataType = session.sessionState.sqlParser.parseDataType(sqlText)
    } catch {
      case e2: ParseException =>
        dataType = session.sessionState.sqlParser.parseDataType(s"struct<${sqlText.strip}>")
    }
}
dataType
Contributor

@tomvanbussel Dec 9, 2022


Nit: This can be simplified, and should throw the original exception.

    val parser = session.sessionState.sqlParser
    try {
      parser.parseTableSchema(sqlText)
    } catch {
      case e: ParseException =>
        try {
          parser.parseDataType(sqlText)
        } catch {
          case _: ParseException =>
            try {
              parser.parseDataType(s"struct<${sqlText.strip}>")
            } catch {
              case _: ParseException =>
                throw e
            }
        }
    }

Contributor Author


done

val schema = if (rel.hasDatatype) {
  DataTypeProtoConverter
    .toCatalystType(rel.getDatatype)
    .asInstanceOf[StructType]
Contributor


This is not guaranteed to be a StructType. createDataFrame also allows AtomicType to be used, and in that case "value" will be used as the column name.
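
For example (repr abbreviated and version-dependent):

    spark.createDataFrame([1, 2, 3], "int").schema
    # StructType([StructField('value', IntegerType(), True)])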

Contributor Author


yes, I wanted to support AtomicType in a follow-up, but let me update it.

  .asInstanceOf[StructType]
} else {
  parseDatatypeString(rel.getDatatypeStr)
    .asInstanceOf[StructType]
Contributor


Same issue here, this can be any DataType, not just a StructType.

Member

@HyukjinKwon left a comment


I am fine with merging it as is and doing it in a follow-up.

@zhengruifeng
Contributor Author

all tests passed, let me merge it now. Thanks for the reviews

@zhengruifeng
Contributor Author

merged into master

@zhengruifeng zhengruifeng deleted the connect_create_df branch December 9, 2022 13:03
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022
…and more input dataset types

### What changes were proposed in this pull request?
1. Support schema.
2. Support more types: ndarray and list.

### Why are the changes needed?
for API coverage

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
added tests for the new types

Closes apache#38979 from zhengruifeng/connect_create_df.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
