Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -758,12 +758,14 @@ private[sql] object ParquetRelation2 extends Logging {
|${parquetSchema.prettyJson}
""".stripMargin

assert(metastoreSchema.size <= parquetSchema.size, schemaConflictMessage)
val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)

assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)

val ordinalMap = metastoreSchema.zipWithIndex.map {
case (field, index) => field.name.toLowerCase -> index
}.toMap
val reorderedParquetSchema = parquetSchema.sortBy(f =>
val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))

StructType(metastoreSchema.zip(reorderedParquetSchema).map {
Expand All @@ -775,6 +777,32 @@ private[sql] object ParquetRelation2 extends Logging {
})
}

/**
* Returns the original schema from the Parquet file with any missing nullable fields from the
* Hive Metastore schema merged in.
*
* When constructing a DataFrame from a collection of structured data, the resulting object has
* a schema corresponding to the union of the fields present in each element of the collection.
* Spark SQL simply assigns a null value to any field that isn't present for a particular row.
* In some cases, it is possible that a given table partition stored as a Parquet file doesn't
* contain a particular nullable field in its schema despite that field being present in the
* table schema obtained from the Hive Metastore. This method returns a schema representing the
* Parquet file schema along with any additional nullable fields from the Metastore schema
* merged in.
*/
private[parquet] def mergeMissingNullableFields(
metastoreSchema: StructType,
parquetSchema: StructType): StructType = {
val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
val missingFields = metastoreSchema
.map(_.name.toLowerCase)
.diff(parquetSchema.map(_.name.toLowerCase))
.map(fieldMap(_))
.filter(_.nullable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually fields in metastore schema are always nullable. Hive always assume all fields are nullable and doesn't preserve nullability information.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above comment is just a note. Having it as a defensive check is generally good here.

StructType(parquetSchema ++ missingFields)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid diff and ++ are not OK here. For example, if the metastore schema has fields <a, b, c>, the Parquet schema has fields <a, c>, then the result schema would be <a, c, b>.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the expected order of fields in a schema? Is is lexicographic? Should we maintain the order of the metastore schema?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not lexicographic, the order of fields in the result schema should be the same as the metastore schema.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How should we deal with potential ambiguities that may be introduced due to #5141? For instance, say we are merging the following schemas:

Metastores schema Parquet schema
Foo Foo
Bar Bar
Baz Bop
Bat Bat

The following options come to mind:

  • Attempt to merge the orderings and accept any possibility when there are ambiguities (e.g. both Foo Bar Baz Bop Bat and Foo Bar Bop Baz Bat are acceptable).
  • The fields defined in the metastore schema always begin in order, followed by any additional fields defined in the Parquet schema (e.g. Foo Bar Baz Bat Bop is the only accepted ordering).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the metastore schema is available, we are actually converting a metastore Parquet table into ParquetRelation2. Thus, the final reconciled schema should have exactly the same fields as the metastore schema, and simply drop any fields only appear in the Parquet data file.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Based on the change made in #5141, it looks like the schema returned by mergeMissingNullableFields() will still contain any additional fields defined in parquetSchema (lines 766-767). How would you feel about simply removing the additional parquetSchema fields in the mergeMissingNullableFields() method?

Execution would look something like this:

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, now that I consider it, I'm not convinced that having the mergeNullableFields() method return the fields in non-metastore order is a problem here. Lines 766-767 of mergeMetastoreParquetSchema() should handle putting them in the proper order.

Removing the additional fields is still an option to consider, however.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yeah, you're right :) Totally forgot that mergeMetastoreParquetSchema already handles field reordering here. And all additional Parquet fields are removed via this zip call.

}


// TODO Data source implementations shouldn't touch Catalyst types (`Literal`).
// However, we are already using Catalyst expressions for partition pruning and predicate
// push-down here...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,22 +226,54 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructField("UPPERCase", IntegerType, nullable = true))))
}

// Conflicting field count
// Metastore schema contains additional non-nullable fields.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't happen as Hive always assumes all fields are nullable. But leaving this test here should still be OK.

assert(intercept[Throwable] {
ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false),
StructField("lowerCase", BinaryType))),
StructField("lowerCase", BinaryType, nullable = false))),

StructType(Seq(
StructField("UPPERCase", IntegerType, nullable = true))))
}.getMessage.contains("detected conflicting schemas"))

// Conflicting field names
// Conflicting non-nullable field names
intercept[Throwable] {
ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(StructField("lower", StringType))),
StructType(Seq(StructField("lower", StringType, nullable = false))),
StructType(Seq(StructField("lowerCase", BinaryType))))
}
}

test("merge missing nullable fields from Metastore schema") {
// Standard case: Metastore schema contains additional nullable fields not present
// in the Parquet file schema.
assertResult(
StructType(Seq(
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true)))) {
ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true))),
StructType(Seq(
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true))))
}

// Merge should fail if the Metastore contains any additional fields that are not
// nullable.
assert(intercept[Throwable] {
ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = false))),
StructType(Seq(
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true))))
}.getMessage.contains("detected conflicting schemas"))
}
}