Conversation

@alexeykudinkin
Contributor

What is the purpose of the pull request

Refactoring MergeOnReadRDD to

  • Avoid duplication
  • Enable an optimization that avoids reading the base file with the full schema, fetching only the projected columns

Brief change log

  • Unified MOR record iteration logic within 3 iterators extending each other
  • Added an optimization that considers a) whether virtual keys are used and b) whether a non-default RecordPayload class is used, to determine whether we can do a projected read from the base file instead of a full-schema one
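The decision in the second bullet can be sketched as follows. This is an illustrative sketch only, not Hudi's actual API: `TableState`, `populatesMetaFields`, and `canDoProjectedRead` are hypothetical names, though `OverwriteWithLatestAvroPayload` is Hudi's default payload class.

```scala
// Illustrative sketch (not Hudi's actual API): decide whether the base file
// can be read with just the projected columns instead of the full schema.
object ProjectedReadSketch {

  // Hypothetical, simplified stand-in for the table state discussed in this PR
  final case class TableState(populatesMetaFields: Boolean, // false => virtual keys
                              recordPayloadClassName: String)

  // The default payload merges by overwriting with the latest record, so
  // merging never needs columns outside the projection.
  val whitelistedPayloadClasses: Set[String] =
    Set("org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")

  def canDoProjectedRead(ts: TableState): Boolean =
    // a) meta fields must be populated (with virtual keys the record key would
    //    have to be re-derived from columns outside the projection), and
    // b) the payload class must be one whose merge semantics are known not to
    //    touch any column outside the projected schema
    ts.populatesMetaFields &&
      whitelistedPayloadClasses.contains(ts.recordPayloadClassName)
}
```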

Verify this pull request

This pull request is already covered by existing tests.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@alexeykudinkin alexeykudinkin changed the title [HUDI-3396] Refactoring MergeOnReadRDD to avoid duplication, fetch only projected columns [HUDI-3396][Stacked on 4877] Refactoring MergeOnReadRDD to avoid duplication, fetch only projected columns Feb 23, 2022
@alexeykudinkin alexeykudinkin force-pushed the ak/spkds-ref-3 branch 7 times, most recently from c6b213e to c7321c5 Compare March 16, 2022 19:06
@nsivabalan nsivabalan added the priority:blocker Production down; release blocker label Mar 16, 2022
@alexeykudinkin alexeykudinkin force-pushed the ak/spkds-ref-3 branch 2 times, most recently from 1f7cca1 to 59ff930 Compare March 17, 2022 21:27
@alexeykudinkin alexeykudinkin changed the title [HUDI-3396][Stacked on 4877] Refactoring MergeOnReadRDD to avoid duplication, fetch only projected columns [HUDI-3396] Refactoring MergeOnReadRDD to avoid duplication, fetch only projected columns Mar 21, 2022
@alexeykudinkin
Contributor Author

@hudi-bot run azure

Member

@vinothchandar vinothchandar left a comment


yet to read HoodieMergeOnReadRDD

case class HoodieTableState(recordKeyField: String,
preCombineFieldOpt: Option[String])
case class HoodieTableState(tablePath: String,
latestCommit: String,
Member


latestCommitTime

override type FileSplit = HoodieBaseFileSplit

override lazy val mandatoryColumns: Seq[String] = {
  if (isMetadataTable(metaClient)) {
Member


Let's remove this special-casing for the metadata table? It's dealing with an abstraction a few layers deeper than here.

Contributor Author


Good call, this is actually not required here.


// If meta fields are enabled, always prefer key from the meta field as opposed to user-specified one
// NOTE: This is historical behavior which is preserved as is
// NOTE: Record key-field is assumed singular here due to the either of
Member


Same. Can we avoid references to the metadata table from this layer?

Contributor Author


This actually refers to metadata fields, not the metadata table. Will amend.
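The historical behavior described in the quoted code comment (preferring the key from the meta field over the user-specified one whenever meta fields are enabled) can be sketched like so. This is a simplified illustration: `effectiveRecordKeyField` is a hypothetical helper, though `_hoodie_record_key` is the real name of Hudi's record-key meta field.

```scala
// Illustrative only: pick the field used as the record key.
object RecordKeySketch {
  val HoodieRecordKeyMetaField = "_hoodie_record_key"

  def effectiveRecordKeyField(metaFieldsEnabled: Boolean,
                              userSpecifiedKeyField: String): String =
    // Historical behavior preserved as-is: if meta fields are enabled,
    // always prefer the key from the meta field over the user-specified one.
    if (metaFieldsEnabled) HoodieRecordKeyMetaField else userSpecifiedKeyField
}
```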

@alexeykudinkin
Contributor Author

@hudi-bot run azure

Member

@xushiyan xushiyan left a comment


nice improvements!

val iter = mergeOnReadPartition.split match {
  case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
    requiredSchemaFileReader(dataFileOnlySplit.dataFile.get)
    requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
Member


Let's always call it baseFile instead of dataFile, while you're at it.

Comment on lines +303 to +307
maxCompactionMemoryInBytes: Long,
hadoopConf: Configuration): HoodieMergedLogRecordScanner = {
Member


Why still have maxCompactionMemoryInBytes as an arg, when it can be retrieved from hadoopConf? Having more than 5 args makes the API harder to use.

Contributor Author


Good call!

Member


this comment seems not addressed?

Contributor Author


I inlined maxCompactionMemoryInBytes into HoodieMergeOnReadRDD (previously it was passed in as an arg), but I don't think it makes sense to eliminate it from the arg list here -- it's not a simple field access but requires quite some computation.
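The trade-off the author describes (compute the memory budget once per RDD, then pass the result down as a plain argument, rather than re-deriving it from the configuration inside the scanner factory) can be illustrated with a hypothetical sketch. The config key, formula, and class names below are stand-ins, not Hudi's actual logic.

```scala
// Hypothetical sketch: a value that is non-trivial to derive from config is
// computed once (lazily) at the call site and then passed as a plain argument.
object MemoryBudgetSketch {

  // Stand-in for the "quite some computation" mentioned above;
  // the config key and formula here are illustrative only.
  def deriveMaxCompactionMemory(conf: Map[String, String]): Long = {
    val fraction = conf.getOrElse("memory.compaction.fraction", "0.6").toDouble
    (Runtime.getRuntime.maxMemory() * fraction).toLong
  }

  class MergeOnReadRddSketch(conf: Map[String, String]) {
    // Computed once per RDD, not once per scanner instantiation
    private lazy val maxCompactionMemoryInBytes: Long =
      deriveMaxCompactionMemory(conf)

    def scannerFor(logFiles: Seq[String]): String =
      buildScanner(logFiles, maxCompactionMemoryInBytes)
  }

  // The factory just consumes the precomputed value.
  def buildScanner(logFiles: Seq[String], maxMemoryBytes: Long): String =
    s"scanner(${logFiles.size} log files, budget=$maxMemoryBytes bytes)"
}
```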

Member

@xushiyan xushiyan left a comment


LGTM. I suggest testing with Spark 3.x in read/write scenarios. Also rebase on master so GitHub Actions covers some Spark 3.x quickstart tests.

// a) It does use one of the standard (and whitelisted) Record Payload classes
// then we can avoid reading and parsing the records w/ _full_ schema, and instead only
// rely on projected one, nevertheless being able to perform merging correctly
if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
Member


nit: I'd prefer flipping the condition to avoid the logical negation; less mental processing.

Contributor Author


Not sure I follow how you propose to change this expr?

Member


Suggested change
- if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
+ if (whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))

OK, I just meant that it reads better without the negation.
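In other words, dropping the negation while preserving semantics means swapping the branches, not just removing the `!`. A small sketch; the read functions here are placeholders for what the PR's branches actually do:

```scala
// Illustrative only: two equivalent forms of the same branch.
object NegationFlipSketch {
  val whitelistedPayloadClasses: Set[String] =
    Set("org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")

  // Placeholder actions standing in for the PR's real branches.
  def doProjectedRead(): String = "projected"
  def fallBackToFullSchemaRead(): String = "full-schema"

  // Negated form, as in the PR:
  def before(payloadClass: String): String =
    if (!whitelistedPayloadClasses.contains(payloadClass)) fallBackToFullSchemaRead()
    else doProjectedRead()

  // Reviewer's preference: same semantics, branches swapped, no negation.
  def after(payloadClass: String): String =
    if (whitelistedPayloadClasses.contains(payloadClass)) doProjectedRead()
    else fallBackToFullSchemaRead()
}
```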

Comment on lines +303 to +307
maxCompactionMemoryInBytes: Long,
hadoopConf: Configuration): HoodieMergedLogRecordScanner = {
Member


this comment seems not addressed?

@hudi-bot
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@xushiyan xushiyan merged commit 51034fe into apache:master Mar 25, 2022
@xushiyan
Member

@alexeykudinkin Landing this as CI passed before the "Tidying up" commit. Also, basic multi-Spark-version tests passed in GitHub Actions.
