
Conversation

@prashantwason
Member

  • [HUDI-6098] Use bulk insert prepped for the initial write into MDT.

Change Logs

  1. Added a flag to HoodieTableMetadataWriter.commit to specify whether the commit is the initial commit.
  2. For the initial commit, the bulkInsertPrepped API is used.
  3. Added a partitioner for MDT bulk insert which partitions the records based on their file group. Since the records are already tagged before commit is called, this partitioner can retrieve the fileID and partition from the current location of each record (see the sketch after this list).
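
The sketch below illustrates item 3, assuming Spark's Java RDD API and the FileGroupPartitioner and keyComparator referenced in the diff excerpts further down; it is a minimal illustration, not the exact code in this PR. The Serializable cast is only there so Spark can ship the comparator to executors.

    // Sketch only: key each prepped record by <file group index, record key>;
    // the index drives the partitioning, the record key drives the in-partition sort.
    Comparator<Tuple2<Integer, String>> keyComparator =
        (Comparator<Tuple2<Integer, String>> & Serializable) (a, b) -> a._2().compareTo(b._2());

    JavaRDD<HoodieRecord> partitionedRDD = records
        .keyBy(r -> {
          // Records are pre-tagged, so the target file group can be read off the current location.
          int fileGroupIndex = HoodieTableMetadataUtil.getFileGroupIndexFromFileId(
              r.getCurrentLocation().getFileId());
          return new Tuple2<Integer, String>(fileGroupIndex, r.getRecordKey());
        })
        // One Spark partition per MDT file group, sorted by record key within each partition.
        .repartitionAndSortWithinPartitions(new FileGroupPartitioner(), keyComparator)
        .values();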

Impact

Massive increase in read performance after the initial creation of an index.
Reduces the large read/write IO requirement for the first compaction in the MDT.
Reduces duplicate storage from the initial log files, which keep redundant initial-commit data around until they are cleaned.
Faster initial commit, as bulkInsert is more performant for billions of records than upsert, which has a workload profiling stage.

Risk level (write none, low medium or high below)

None

Already covered by existing unit tests.

Documentation Update

None

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

    public static int getFileGroupIndexFromFileId(String fileId) {
      // 0.10 version MDT code added -0 (0th fileIndex) to the fileID
      int endIndex = fileId.endsWith("-0") ? fileId.length() - 2 : fileId.length();
      int fromIndex = fileId.lastIndexOf("-", endIndex);
Contributor


Can we abstract this code into a separate method:

    // 0.10 version MDT code added -0 (0th fileIndex) to the fileID
    int endIndex = fileId.endsWith("-0") ? fileId.length() - 2 : fileId.length();
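
For illustration, the extracted helper could be as small as the sketch below (the method name here is hypothetical, not necessarily what the PR ended up using):

    // Hypothetical helper: drops the "-0" (0th fileIndex) suffix that 0.10 MDT code appended to fileIDs.
    private static int getFileIdLengthWithoutFileIndex(String fileId) {
      return fileId.endsWith("-0") ? fileId.length() - 2 : fileId.length();
    }

The caller then reduces to int endIndex = getFileIdLengthWithoutFileIndex(fileId);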

Member Author


Done

Contributor

@danny0405 danny0405 left a comment


+1

@danny0405 danny0405 self-assigned this Apr 19, 2023
@danny0405 danny0405 added metadata engine:spark Spark integration engine:flink Flink integration labels Apr 19, 2023
@danny0405
Contributor

Oops, code conflicts with your previous change.

@prashantwason
Member Author

Rebased and fixed conflict.

@danny0405
Contributor

@hudi-bot run azure

1 similar comment
@prashantwason
Member Author

@hudi-bot run azure

@hudi-bot
Collaborator

CI report:

Bot commands
@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@danny0405
Contributor

@prashantwason You need to rebase onto the latest master to get the tests to pass.

*
* This partitioner requires the records to be already tagged with location.
*/
public class SparkHoodieMetadataBulkInsertPartitioner implements BulkInsertPartitioner<JavaRDD<HoodieRecord>> {
Contributor


Do we have a UT for this?


    // Partition the records by their file group
    JavaRDD<HoodieRecord> partitionedRDD = records
        // Key by <file group index, record key>: the file group index is used to partition
        // and the record key is used to sort within the partition.
Contributor


From what I glean, the partitioning is based on fileGroupIndex, disregarding the MDT partition. So tell me: if we have 2 file groups in col stats and 2 file groups for RLI, does the 1st file group of both col stats and RLI land in the same partition in this repartition call?

Should the partitioning be based on the fileId itself, with sorting within each partition based on the record keys? Or am I missing something?

Contributor


Reason being, the argument JavaRDD<HoodieRecord> records to this method could contain records for N MDT partitions. Not sure if we are making any assumptions about that.

        fileIds.add(fileID);
      } else {
        // Empty partition
        fileIds.add("");
Contributor


Can you help me understand when we might hit this?

* @param index Index of the file group within the partition
* @return The fileID
*/
public static String getFileIDForFileGroup(MetadataPartitionType metadataPartition, int index) {
Contributor


Do we have UTs for these?
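
For reference, a round-trip style test could cover both helpers. This is only a sketch; it assumes both methods live in HoodieTableMetadataUtil (as the later excerpt suggests for the parser) and that the index encoded by getFileIDForFileGroup can be parsed back by getFileGroupIndexFromFileId. MetadataPartitionType.FILES is just an example partition type.

    @Test
    public void testFileGroupIndexRoundTrip() {
      for (int index = 0; index < 16; index++) {
        // Assumed round-trip property: the index encoded into the fileID is recoverable from it.
        String fileId = HoodieTableMetadataUtil.getFileIDForFileGroup(MetadataPartitionType.FILES, index);
        assertEquals(index, HoodieTableMetadataUtil.getFileGroupIndexFromFileId(fileId));
      }
    }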

          int fileGroupIndex = HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId());
          return new Tuple2<Integer, String>(fileGroupIndex, r.getRecordKey());
        })
        .repartitionAndSortWithinPartitions(new FileGroupPartitioner(), keyComparator)
Contributor


We know the total number of partitions is going to equal the total number of file groups. Can we override numPartitions for FileGroupPartitioner?
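
For illustration, a partitioner with the partition count fixed up front could look like the sketch below; the class name and constructor are hypothetical, and the PR's FileGroupPartitioner may be wired differently.

    import org.apache.spark.Partitioner;
    import scala.Tuple2;

    // Hypothetical sketch: route each <file group index, record key> key to the Spark
    // partition matching its file group, with numPartitions set to the known file group count.
    public class FileGroupIndexPartitioner extends Partitioner {
      private final int numFileGroups;

      public FileGroupIndexPartitioner(int numFileGroups) {
        this.numFileGroups = numFileGroups;
      }

      @Override
      public int numPartitions() {
        return numFileGroups;
      }

      @Override
      @SuppressWarnings("unchecked")
      public int getPartition(Object key) {
        // The key is the <file group index, record key> tuple produced upstream.
        return ((Tuple2<Integer, String>) key)._1();
      }
    }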

@nsivabalan nsivabalan self-assigned this May 2, 2023
@nsivabalan nsivabalan added release-0.14.0 priority:critical Production degraded; pipelines stalled labels May 2, 2023
@nsivabalan
Contributor

Synced up directly. I am OK with the assumption that we will initialize one metadata partition at a time.
Let's see if we can address the other feedback.

@prashantwason
Member Author

Closing this as I have added the changes in another PR: #8684
