
Conversation

@mbutrovich (Contributor) commented Nov 19, 2024

Currently we take the scan schema from the plan node, serialize it back to a Parquet schema, and then parse that on the native side. The round trip is lossy, particularly for timestamps. For example:

schema: message root {
  optional int64 _0 (TIMESTAMP(MILLIS,true));
  optional int64 _1 (TIMESTAMP(MICROS,true));
  optional int64 _2 (TIMESTAMP(MILLIS,true));
  optional int64 _3 (TIMESTAMP(MILLIS,false));
  optional int64 _4 (TIMESTAMP(MICROS,true));
  optional int64 _5 (TIMESTAMP(MICROS,false));
  optional int64 _6 (INTEGER(64,true));
}

dataSchema: message spark_schema {
  optional int96 _0;
  optional int96 _1;
  optional int96 _2;
  optional int64 _3 (TIMESTAMP(MICROS,false));
  optional int96 _4;
  optional int64 _5 (TIMESTAMP(MICROS,false));
  optional int64 _6;
}

The former (schema) is the original Parquet footer; the latter (dataSchema) is what we get after round-tripping through Spark. We need the original to handle timestamps correctly in ParquetExec.
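
For reference, a minimal sketch of how this round trip could be reproduced on the JVM side (e.g. in a spark-shell), assuming Spark's internal ParquetToSparkSchemaConverter / SparkToParquetSchemaConverter and a hypothetical file path; constructors vary slightly across Spark versions:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.sql.execution.datasources.parquet.{ParquetToSparkSchemaConverter, SparkToParquetSchemaConverter}

// Illustration only: read the original footer, convert it to a Spark StructType,
// then convert that StructType back to a Parquet schema. The second message is
// what the scan currently sees, and it no longer matches the footer (e.g. int96
// vs. annotated int64 timestamps, as in the example above).
val conf = new Configuration()
val reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), conf))
val originalSchema = reader.getFooter.getFileMetaData.getSchema // the "schema" above
reader.close()
val sparkSchema = new ParquetToSparkSchemaConverter(conf).convert(originalSchema)
val roundTripped = new SparkToParquetSchemaConverter(conf).convert(sparkSchema) // the "dataSchema" above
println(originalSchema)
println(roundTripped)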

This PR extracts some code from elsewhere (CometParquetFileFormat, CometNativeScanExec) to read the footer from the Parquet file and serialize the original metadata. We also now generate the projection vector on the Spark side, because the required columns are in Spark schema format and will not match the Parquet schema 1:1. On the native side, we then regenerate the required schema from the Parquet schema using the projection vector (converted to a DataFusion ProjectionMask).
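
As a rough illustration (not necessarily the PR's exact logic), the projection vector can be thought of as mapping each required Spark column to its ordinal in the full data schema, for example by name:

import org.apache.spark.sql.types.StructType

object ProjectionVector {
  // Hypothetical sketch: resolve each required (Spark-side) column to its ordinal
  // in the full data schema. The native side can then rebuild the required schema
  // from the Parquet footer by turning these ordinals into a ProjectionMask.
  def fromSchemas(requiredSchema: StructType, dataSchema: StructType): Array[java.lang.Long] =
    requiredSchema.fields.map(f => java.lang.Long.valueOf(dataSchema.fieldIndex(f.name).toLong))
}

Matching purely by name glosses over case sensitivity and Parquet field IDs, which is essentially the concern raised in the first review comment below.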

mbutrovich marked this pull request as draft on November 19, 2024 21:08
mbutrovich changed the title from "Use Parquet schema for scan instead of Spark schema" to "[comet-parquet-exec] Use Parquet schema for scan instead of Spark schema" on Nov 19, 2024
mbutrovich marked this pull request as ready for review on November 19, 2024 22:44
new SparkToParquetSchemaConverter(conf).convert(scan.requiredSchema)
val dataSchemaParquet =
new SparkToParquetSchemaConverter(conf).convert(scan.relation.dataSchema)
val projection_vector: Array[java.lang.Long] = scan.requiredSchema.fields.map(field => {
Contributor

This change essentially means that any schema 'adaptation' done in SparkToParquetSchemaConverter.convert to support legacy timestamps and decimals will no longer be applied. If that adaptation matters, we will probably fail tests with incorrect results.
Also, Comet's Parquet file reader uses CometParquetReadSupport.clipParquetSchema to do a similar conversion, and it includes support for the Parquet field_id, which is desirable for sources such as Delta and Iceberg. Basically, a field_id, if present, identifies a field in a schema more precisely (in the event of field name changes).
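
To make the field_id point concrete, here is a hedged sketch (not Comet's actual clipParquetSchema logic) of resolving a required Spark field against the footer by field ID when one is present, falling back to the name:

import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.types.StructField

object FieldResolution {
  // Assumption: when field-id reading is enabled, Spark carries the Parquet field id
  // in the StructField metadata under this key.
  private val FieldIdKey = "parquet.field.id"

  def resolveOrdinal(field: StructField, footerSchema: MessageType): Option[Int] = {
    val ordinals = 0 until footerSchema.getFieldCount
    val byId =
      if (field.metadata.contains(FieldIdKey)) {
        val id = field.metadata.getLong(FieldIdKey).toInt
        ordinals.find(i => Option(footerSchema.getType(i).getId).exists(_.intValue == id))
      } else {
        None
      }
    // A renamed column still resolves via its id; otherwise fall back to the name.
    byId.orElse(ordinals.find(i => footerSchema.getType(i).getName == field.name))
  }
}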

Contributor Author

The SchemaConverter seems like it could be handled in DF's SchemaAdapter. I'll look at clipParquetSchema as well, thanks!

Member

If you just need Arrow types, can you just convert Spark types to Arrow types? For example, if a column is treated as a timestamp type in Spark, its Arrow type is a timestamp too.
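
For simple cases that direct mapping is indeed straightforward; a hedged sketch of such a Spark-to-Arrow type mapping using the Arrow Java API (Comet's real conversion covers many more types and follows its own rules):

import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.spark.sql.types._

object SparkToArrow {
  // Minimal illustrative mapping: a Spark timestamp becomes an Arrow timestamp,
  // a Spark long becomes a 64-bit signed integer, and so on.
  def toArrowType(dt: DataType, timeZoneId: String): ArrowType = dt match {
    case TimestampType    => new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId)
    case TimestampNTZType => new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
    case LongType         => new ArrowType.Int(64, true)
    case IntegerType      => new ArrowType.Int(32, true)
    case DoubleType       => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
    case StringType       => ArrowType.Utf8.INSTANCE
    case other            => throw new UnsupportedOperationException(s"Unhandled type: $other")
  }
}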

Contributor Author

My concern is that...

  1. Java side parses Parquet metadata, generates a Spark schema
  2. Java side converts Spark schema to Arrow schema (following Comet conversion rules)
  3. Serialize Arrow types, native side feeds this into ParquetExec as the data schema

...may yield different results than:

  1. Java side serializes original Parquet metadata
  2. Serialize schema message
  3. Native side parses message, generates Arrow schema and feeds this into ParquetExec as the data schema

I guess I could exhaustively test this hypothesis with all types.

val broadcastedHadoopConf =
sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val sharedConf = broadcastedHadoopConf.value.value
val footer = FooterReader.readFooter(sharedConf, file)
Contributor

You're right, this can never be in production code. For one, reading the footer again here is expensive.

Contributor Author (Nov 20, 2024)

In theory it's just replacing the call that currently takes place during fileReader instantiation, but yeah, I'm still curious whether the footer is already cached somewhere. I see references within Spark to a footersCache, so I want to look into that as well.

Contributor

I didn't know about a footersCache in Spark. Could you share a link maybe?

Contributor Author

(link shared; not captured in this excerpt)

Contributor

Ah, thanks. I don't think we ever travel this path.
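
On the cost concern above, if the extra footer read had to stay, one mitigation would be a small per-executor cache keyed on the file's path, length, and modification time, in the spirit of the footersCache mentioned earlier. A hedged sketch, not existing Comet code:

import java.util.concurrent.ConcurrentHashMap
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.metadata.ParquetMetadata

object FooterCache {
  // Key on (path, length, modification time) so a rewritten file is never served a
  // stale footer. A bounded cache (e.g. Caffeine) would be preferable to an
  // unbounded map in real code.
  private case class Key(path: String, length: Long, modTime: Long)
  private val cache = new ConcurrentHashMap[Key, ParquetMetadata]()

  def getOrRead(conf: Configuration, path: Path)(read: => ParquetMetadata): ParquetMetadata = {
    val status = path.getFileSystem(conf).getFileStatus(path)
    val key = Key(path.toString, status.getLen, status.getModificationTime)
    cache.computeIfAbsent(key, _ => read)
  }
}

Whether that is worth doing depends on whether the same footer is actually read more than once, which is what the comments above are trying to establish.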
