
Conversation

@alexeykudinkin (Contributor)

What is the purpose of the pull request

After we fell back to HadoopFsRelation in #5352, the old issues from HUDI-3204 re-surfaced because Spark forcibly appends partition values parsed from the actual partition path to the fetched dataset.

Unfortunately, this behavior is not configurable. To address the problem we have to override the default ParquetFileFormat with our own modified instance that makes this behavior configurable -- the caller can choose whether it prefers (a rough sketch of the two modes follows the list):

  • To append partition values parsed from the actual partition path
  • To avoid appending such partition values and instead rely on reading the source columns being used as partition values (Hudi-specific behavior)
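
As a rough illustration of the two modes above (the enum below is purely illustrative, not part of the PR):

  sealed trait PartitionValuesSource
  // Spark's default: values parsed from the partition path are appended to every row
  case object ParsedFromPartitionPath extends PartitionValuesSource
  // Hudi-specific: values are read from the source columns persisted in the data file
  case object ReadFromDataFileColumns extends PartitionValuesSource

  def resolvePartitionValuesSource(shouldAppendPartitionValues: Boolean): PartitionValuesSource =
    if (shouldAppendPartitionValues) ParsedFromPartitionPath else ReadFromDataFileColumns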

Brief change log

  • Scaffolded Spark24HoodieParquetFileFormat extending ParquetFileFormat and overriding the behavior of adding partition columns to every row
  • Amended the SparkAdapter's createHoodieParquetFileFormat API so callers can configure whether to append partition values or not (a sketch of the method follows the list)
  • Fall back to appending partition values in cases when the source columns are not persisted in the data file
  • Fixed HoodieBaseRelation incorrectly handling mandatory columns
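
For reference, a minimal sketch of the amended adapter API (only the relevant method is shown; the signature mirrors the Spark 3.1 adapter snippet quoted further down in this thread):

  import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

  trait SparkAdapter {
    // Returns a Spark-version-specific Hudi ParquetFileFormat (None when not supported);
    // `appendPartitionValues` controls whether values parsed from the partition path are
    // appended to every row, or read from the persisted source columns instead.
    def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat]
  }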

Verify this pull request

This pull request is already covered by existing tests.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

* <li>Avoiding appending partition values to the rows read from the data file</li>
* </ol>
*/
class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
@alexeykudinkin (Contributor Author):

The only aspects that diverge from the source are the ones using shouldAppendPartitionValues.

Alexey Kudinkin added 2 commits April 19, 2022 12:24
…one (to make sure partition values appending is handled correctly)

@yihua left a comment


Overall LGTM. Left a few nits.


val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
-  case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
+  case HoodieFileFormat.PARQUET => (sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, "hoodie-parquet")
Contributor:

nit: create a constant for "hoodie-parquet" so it can be referenced everywhere.
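
A possible shape for that constant (object and member names are illustrative only):

  object HoodieFormatNames {
    // Single source of truth for the Hudi Parquet format identifier used across adapters
    val HoodieParquet: String = "hoodie-parquet"
  }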

// NOTE: There are currently 2 ways partition values could be fetched:
// - Source columns (producing the values used for physical partitioning) will be read
// from the data file
// - Values parsed from the actual partition pat would be appended to the final dataset
Contributor:

typo: "pat"

@alexeykudinkin (Contributor Author):

Addressed in a follow-up

sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
val formatClassName = metaClient.getTableConfig.getBaseFileFormat match {
-  case HoodieFileFormat.PARQUET => if (!internalSchema.isEmptySchema) "HoodieParquet" else "parquet"
+  case HoodieFileFormat.PARQUET => "hoodie-parquet"
Contributor:

same here for constant


class SparkHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
override def shortName(): String = "HoodieParquet"
override def shortName(): String = "hoodie-parquet"
Contributor:

I assume this is used by Spark to identify the format?

@alexeykudinkin (Contributor Author):

Correct
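
For context, a hedged sketch of where the short name shows up, assuming the format class is registered through Spark's DataSourceRegister service loader (the table path below is a placeholder):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("hoodie-parquet-short-name-demo")
    .master("local[*]")
    .getOrCreate()

  // Spark matches the string below against DataSourceRegister.shortName() of the
  // implementations listed under META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
  val df = spark.read.format("hoodie-parquet").load("/tmp/hudi/table")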

@yihua self-assigned this Apr 19, 2022
*/
class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {

override def buildReaderWithPartitionValues(sparkSession: SparkSession,
@alexeykudinkin (Contributor Author):

GitHub UI has a hard time reflecting the changes properly:

  1. Had to remove the top-level conditional (since this FileFormat is now used to control whether partition values will be appended)
  2. Did minor cleanup for things related to handling of InternalSchema to make sure those are not failing w/ NPEs
  3. Added changes to handle shouldAppendPartitionValues

NOTE: Copy both of these into an IDEA scratchpad to compare them side by side in a more meaningful way

* <li>Schema on-read</li>
* </ol>
*/
class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
@alexeykudinkin (Contributor Author):

Same comments as for Spark 3.1

@alexeykudinkin (Contributor Author)

@xiarixiaoyao please take a look as well

@nsivabalan added the priority:blocker (Production down; release blocker) label Apr 19, 2022
None
}
override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
Some(new Spark312HoodieParquetFileFormat(appendPartitionValues))
Contributor:

Is there any reason why the class loader was used before, instead of directly creating a new instance of the class? @xushiyan do you have any context here, to make sure there is no historical workaround and we're not breaking any logic?

@alexeykudinkin (Contributor Author):

My hunch is that @xiarixiaoyao was using reflection to load this component to handle the case of Spark 3.0. But given that we're dropping support for it in 0.11, I just dropped the reflection and instantiate it directly.

Contributor:

Got it

hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
// fallback to origin parquet File read
super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
Contributor:

If shouldAppendPartitionValues is true and the existing if condition is true, can we still fall back to the original parquet file read?

@alexeykudinkin (Contributor Author):

shouldAppendPartitionValues is almost never true now (only in cases when we drop the source columns)

Contributor:

Sounds good.
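
To make the interplay explicit, a small sketch of the condition being discussed (hypothetical helper, not the PR code): falling back to the vanilla Parquet read path is only safe when Spark's default appending of partition values is acceptable and no internal (evolved) schema has to be applied.

  // Hypothetical helper summarizing the guard discussed above
  def canFallBackToVanillaParquetRead(shouldAppendPartitionValues: Boolean,
                                      internalSchemaStr: String): Boolean =
    shouldAppendPartitionValues && internalSchemaStr.isEmpty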

filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
Contributor:

similar here

@alexeykudinkin (Contributor Author):

Responded above

def toHadoopFsRelation: HadoopFsRelation = {
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
val shouldAppendPartitionColumns = omitPartitionColumnsInFile
Contributor:

minor: instead of "omitPartitionColumnsInFile" (present tense), maybe we can name the variable "isPartitionColumnPersistedInDataFile" (past tense).

@alexeykudinkin (Contributor Author):

Good call!

@alexeykudinkin (Contributor Author):

@nsivabalan on second thought -- this flag actually directs whether we should omit partition columns when we persist data files, so I kept it as omitPartitionColumns to be aligned with the config value.
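
A simplified sketch of that wiring (the function is hypothetical; `createFormat` stands in for sparkAdapter.createHoodieParquetFileFormat):

  import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

  def chooseFileFormat(createFormat: Boolean => Option[ParquetFileFormat],
                       omitPartitionColumnsInFile: Boolean): ParquetFileFormat = {
    // When the source columns were dropped from the data files at write time, Spark has to
    // append partition values parsed from the partition path; otherwise they are read from
    // the persisted source columns.
    val shouldAppendPartitionColumns = omitPartitionColumnsInFile
    createFormat(shouldAppendPartitionColumns).getOrElse(new ParquetFileFormat)
  }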


object Spark32HoodieParquetFileFormat {

def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
Contributor:

Feel free to fix this in a follow-up PR if need be. Maybe we can move this to a util class and use it across adapters? I see the same exact method in the Spark312HoodieParquetFileFormat class as well.

@alexeykudinkin (Contributor Author):

Yeah, there's quite a bit of duplication we can eliminate. We can take it up as a follow-up for the sake of moving forward with RC3 ASAP.
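
A sketch of what the suggested extraction could look like (the object name is illustrative; the body would be the implementation currently duplicated across the Spark 3.1 and 3.2 formats):

  import org.apache.spark.sql.types.StructType

  object HoodieParquetFileFormatHelper {
    // Shared by Spark312HoodieParquetFileFormat and Spark32HoodieParquetFileFormat
    def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
      // ... the existing, currently duplicated implementation goes here ...
      internalSchemaStr // placeholder return so the sketch compiles
    }
  }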

val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
val vectorizedReader = new Spark312HoodieVectorizedParquetRecordReader(
convertTz.orNull,
Contributor:

Please use shouldUseInternalSchema to fall back to the original VectorizedParquetRecordReader. Spark312HoodieVectorizedParquetRecordReader is used for schema evolution.

@xiarixiaoyao (Contributor)

@alexeykudinkin now that we use the Hoodie Parquet file format,
we may need to modify this function to reduce the impact on schema evolution:
HoodieDataSourceHelper.getConfigurationWithInternalSchema

  def getConfigurationWithInternalSchema(conf: Configuration, internalSchema: InternalSchema, tablePath: String, validCommits: String): Configuration = {
    val querySchemaString = SerDeHelper.toJson(internalSchema)
    if (!querySchemaString.isEmpty) {
      conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, querySchemaString)
      conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath)
      conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
    }
    conf
  }
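
An illustrative call site with placeholder arguments (Hudi-specific imports and values are assumed): the helper only sets HOODIE_QUERY_SCHEMA when the serialized internal schema is non-empty, which is exactly what the reader-side isEmpty check quoted earlier falls back on.

  import org.apache.hadoop.conf.Configuration

  // Placeholder arguments; `internalSchema` would be the table's InternalSchema instance
  val readerConf: Configuration = HoodieDataSourceHelper.getConfigurationWithInternalSchema(
    new Configuration(),
    internalSchema,
    "/tmp/hudi/table",   // placeholder table base path
    "20220419000000")    // placeholder valid-commits list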

@alexeykudinkin (Contributor Author)

@xiarixiaoyao addressed

@xiarixiaoyao (Contributor)

@alexeykudinkin thanks for addressing this.
@alexeykudinkin @nsivabalan @yihua schema evolution is currently not working with this PR; let me find the reason, please wait a moment.


@xiarixiaoyao (Contributor)

@alexeykudinkin @nsivabalan @yihua
LGTM

I will put up another patch to deal with schema evolution.

@xushiyan merged commit f7544e2 into apache:master Apr 20, 2022
xushiyan pushed a commit that referenced this pull request Apr 20, 2022
… instead of source columns (#5364)

 - Scaffolded `Spark24HoodieParquetFileFormat` extending `ParquetFileFormat` and overriding the behavior of adding partition columns to every row
 - Amended `SparkAdapter`s `createHoodieParquetFileFormat` API to be able to configure whether to append partition values or not
 - Fallback to append partition values in cases when the source columns are not persisted in data-file
 - Fixing HoodieBaseRelation incorrectly handling mandatory columns

val projectedRDD = if (prunedRequiredSchema.structTypeSchema != requiredSchema.structTypeSchema) {
rdd.mapPartitions { it =>
val fullPrunedSchema = StructType(prunedRequiredSchema.structTypeSchema.fields ++ partitionSchema.fields)
val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema)
@alexeykudinkin (Contributor Author):

@YannByron this is the problem you're hitting with mandatory columns: when you filter out partition columns from the schema, you actually re-order the columns relative to what the caller (Spark) was expecting. Spark simply projects the schema assuming that the BaseRelation will return rows adhering to it, while the relation was returning rows with the columns reordered (partition columns appended at the end).

The proper fix is to do the projection here, back into the schema that the caller expects.
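
A minimal, self-contained sketch of that projection step (the helper name and shape are illustrative, not the PR's code):

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection}
  import org.apache.spark.sql.types.StructType

  // Reorder rows produced as "pruned data columns ++ partition columns" back into the
  // column order of the schema the caller (Spark) asked for.
  def projectToExpectedSchema(rows: Iterator[InternalRow],
                              producedSchema: StructType,
                              expectedSchema: StructType): Iterator[InternalRow] = {
    // For every field the caller expects, bind a reference to its ordinal in the schema
    // we actually produced, then generate an unsafe projection over those references.
    val boundRefs = expectedSchema.fields.map { f =>
      BoundReference(producedSchema.fieldIndex(f.name), f.dataType, f.nullable)
    }
    val projection = UnsafeProjection.create(boundRefs.toSeq)
    // NOTE: UnsafeProjection reuses the returned row buffer; consumers that buffer rows
    // need to copy them.
    rows.map(projection)
  }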
