[HUDI-53] Implementation of a native DFS based index based on the metadata table. #5581
Conversation
…ta table. 1. Complete implementation 2. Metrics 3. Performance tuning of metadata table
@vinothchandar @nsivabalan This is the complete code from my internal branch ported to 0.10.1. I will update HUDI-53 to list the various enhancements that are part of this large PR. I will need your help to break down the various enhancements into separate PRs to apply to master after discussion. Some code had merge conflicts, so I removed some testing code. Disregard the failing tests for now.
prasannarajaperumal
left a comment
This is super-exciting to see the implementation in a PR :). Thanks for doing this.
I am about half-way through, need to look into the read/write part a bit more closely. Will do it very soon.
}

@CliCommand(value = "metadata init", help = "Update the metadata table from commits since the creation")
public String init(@CliOption(key = {"readonly"}, unspecifiedDefaultValue = "false",
Is init being removed?
 * @param maxFileGroupSizeBytes Maximum size of the file group.
 * @return The estimated number of file groups.
 */
private int estimateFileGroupCount(String partitionName, long recordCount, int averageRecordSize, int minFileGroupCount,
Is the plan to generate file group count for other metadata partitions (bloom, column stats) also using some heuristics like this?
Yes, we should. It definitely helps.
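For illustration, a minimal sketch of such a size-based heuristic. This is a hypothetical standalone helper (names and the clamping behavior are assumptions), not the code in this PR:

```java
/**
 * Hypothetical sketch of a size-based heuristic, not the code in this PR.
 * Estimates how many file groups a metadata partition needs so that each file
 * group stays under maxFileGroupSizeBytes, clamped to [min, max].
 */
static int estimateFileGroupCount(long recordCount, int averageRecordSizeBytes,
                                  int minFileGroupCount, int maxFileGroupCount,
                                  long maxFileGroupSizeBytes) {
  long estimatedTotalSizeBytes = recordCount * averageRecordSizeBytes;
  int estimatedCount = (int) Math.ceil((double) estimatedTotalSizeBytes / maxFileGroupSizeBytes);
  return Math.max(minFileGroupCount, Math.min(maxFileGroupCount, estimatedCount));
}
```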
  List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
  commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
}

private void commitFileListingUpdate(String instantTime, List<HoodieRecord> recordList, boolean canTriggerTableService) {
Thanks for renaming this to make it more clear.
// File groups in each partition are fixed at creation time and we do not want them to be split into multiple files
// ever. Hence we use a very large base file size in the metadata table. The actual size of the HFiles created will
// eventually depend on the number of file groups selected for each partition (see the estimateFileGroupCount function).
final long maxHFileSizeBytes = 10 * 1024 * 1024 * 1024L; // 10GB
Does the mapping of row key to file group break down if the HFile actually gets to this size and a new base file gets created? Should we just disable the max file size check here?
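For context, the metadata table locates a key by hashing it into one of a fixed number of file groups, so an extra base file appearing in a partition would indeed break lookups. A minimal illustrative sketch (hypothetical helper, not the PR's code):

```java
// Hypothetical sketch: record keys are mapped to one of N fixed file groups by
// hashing, so N must never change after the partition is initialized.
static int mapRecordKeyToFileGroupIndex(String recordKey, int fileGroupCount) {
  int hash = recordKey.hashCode() & Integer.MAX_VALUE; // force non-negative
  return hash % fileGroupCount;
}
```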
public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
    String commitActionType) {
  return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap());

public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStatuses, List<HoodieWriteStat> stats,
Just clarifying: WriteStatus contains only deflated HoodieRecords, right? Making sure we don't hold on to the data in memory until we update the metadata table.
/**
 * If a valid location is specified, a copy of the write config is saved before each operation.
 */
public static final ConfigProperty<String> CONFIG_EXPORT_DIR = ConfigProperty
It is not clear where this is used. Are write configs persisted after each operation?
// We need to track written records within WriteStatus in two cases:
// 1. When the HoodieIndex being used is not implicit with storage
// 2. If any of the metadata table partitions (record index, etc.) which require written record tracking are enabled
final boolean trackSuccessRecords = !hoodieTable.getIndex().isImplicitWithStorage()
nit: move !hoodieTable.getIndex().isImplicitWithStorage() also into HoodieTableMetadataUtil.needsWriteStatusTracking
 * These partitions need the list of written records so that they can update their metadata.
 */
public static List<MetadataPartitionType> needWriteStatusTracking() {
  return Arrays.asList(MetadataPartitionType.RECORD_INDEX);
nit: just simplify this to a boolean and check if it's RECORD_INDEX?
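A minimal sketch of what the two nits above could look like when combined. The method name follows the suggestion, while the record-index accessor on the config is an assumption, not the PR's actual code:

```java
// Hypothetical sketch combining the two suggestions above, not the PR's code.
// Returns true if written records must be tracked in WriteStatus, i.e. when the
// index is not implicit with storage, or when the record index partition is enabled.
public static boolean needsWriteStatusTracking(HoodieTable hoodieTable, HoodieWriteConfig config) {
  return !hoodieTable.getIndex().isImplicitWithStorage()
      || config.getMetadataConfig().isRecordIndexEnabled(); // assumed accessor name
}
```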
  taggedRecords.add(rec);
}

List<String> recordKeys = keyToIndexMap.keySet().stream().sorted().collect(Collectors.toList());
Hi, why do we need to sort here? And it seems we can directly maintain a mapping from key to records, instead of another list-index indirection.
I assume this sorting is to ensure that lookups in the HFile do not incur unnecessary seeks.
But we already sort all lookups within HFileDataBlock:
if (!enableFullScan) {
// HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks.
Collections.sort(keys);
}
So, I don't see a need to sort here. Or do you mean to sort for some other reason?
 * A {@code BulkInsertPartitioner} implementation for Metadata Table to improve performance of initialization of metadata
 * table partition when a very large number of records are inserted.
 *
 * This partitioner requires the records to tbe already tagged with the appropriate file slice.
typo: "tbe" should be "to be".
this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
finalizeWrite(table, clusteringCommitTime, writeStats);
writeTableMetadataForTableServices(table, metadata, clusteringInstant);
preCommit(clusteringInstant, metadata);
Looking at this fix, I was bummed that we did not have preCommit even for compaction (preCommit is where we do conflict resolution).
Probably it was designed that way.
- For compaction: conflict resolution is done during compaction planning, so there is no other condition on which we might abort compaction during completion; that is probably why it was left out.
- For clustering: in the most common scenario (update reject strategy), an incoming write that overlaps with the clustering file groups (to be replaced) will be aborted right away, and the new file groups being created by clustering are not known to any new writes until clustering completes. So there is no real necessity to do conflict resolution during clustering completion.
Did you identify any other gaps w.r.t. conflict resolution?
protected void commit(HoodieData<HoodieRecord> records, String instantTime, String partitionName,
    int fileGroupCount) {
  LOG.info("Performing bulk insert for partition " + partitionName + " with " + fileGroupCount + " file groups");
  SparkHoodieMetadataBulkInsertPartitioner partitioner = new SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount);
good optimization 👏
HoodieRecord record = recordItr.next();
final String fileID = record.getCurrentLocation().getFileId();
// Remove the write-token from the fileID as we need to return only the prefix
int index = fileID.lastIndexOf("-");
Let's move this to FSUtils or something like PathUtils, i.e. parsing the fileID prefix given a file name.
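A minimal sketch of the suggested helper (hypothetical name and placement, not the PR's code):

```java
/**
 * Hypothetical FSUtils/PathUtils-style helper, not the code in this PR:
 * strips the trailing write-token from a fileId of the form "<prefix>-<writeToken>".
 */
public static String getFileIdPrefix(String fileId) {
  int index = fileId.lastIndexOf("-");
  return index < 0 ? fileId : fileId.substring(0, index);
}
```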
}

@Override
public List<String> generateFileIDPfxs(int numPartitions) {
I am not sure where this is used. Can you shed some light?
Because, within SparkBulkInsertHelper, we are not leveraging this new method; it looks like we are generating the prefixes inline there (L110 in SparkBulkInsertHelper):
// generate new file ID prefixes for each output partition
final List<String> fileIDPrefixes =
    IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
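If the intent is for SparkBulkInsertHelper to use the new method, one possible wiring would be the hypothetical one-liner below (it assumes the helper has access to the partitioner instance; this is only an illustration, not code from the PR):

```java
// Hypothetical sketch: let the bulk-insert helper ask the partitioner for prefixes
// instead of generating them inline with FSUtils.createNewFileIdPfx().
final List<String> fileIDPrefixes = partitioner.generateFileIDPfxs(parallelism);
```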
.withDocumentation("Enable the internal metadata table which serves table metadata like level file listings");
.withDocumentation("Create and use the metadata table (MDT) which serves table metadata like file listings "
    + "and indexes. If set to true, MDT will be created. Once created, this setting only controls if the MDT "
    + "will be used on reader side. MDT cannot be disabled/removed by setting this to false.");
Actually, we have fixed this flow in master, i.e. on the write path, if the metadata table was disabled and Hudi detects that the metadata table exists, it will delete the metadata table.
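For context, the flag being documented here is set through the write config; a hedged sketch of how it is typically enabled (builder method names as commonly used in Hudi, exact signatures may differ across versions):

```java
// Hedged sketch of enabling the metadata table (hoodie.metadata.enable) via the
// write config; exact builder methods may differ across Hudi versions.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
    .build();
```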
}

@Override
public String next() {
We can probably use parquetUtils.readRowKeys() here.
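A hedged usage sketch of that suggestion; the exact readRowKeys signature (Hadoop Configuration plus the base file Path, returning the set of record keys) is an assumption here:

```java
// Hypothetical usage of the suggestion above; the exact readRowKeys signature is assumed.
Set<String> recordKeys = parquetUtils.readRowKeys(hadoopConf, new Path(baseFilePath));
```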
 * @param recordKeys The list of record keys to read
 */
@Override
public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> recordKeys) {
The list of record keys is going to scale up; we need to change this to HoodieData.
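A sketch of the signature shape being asked for here (hypothetical, not the PR's code):

```java
// Hypothetical signature sketch: operate on distributed collections so record
// index lookups are not bounded by driver memory.
HoodieData<Pair<String, HoodieRecordGlobalLocation>> readRecordIndex(HoodieData<String> recordKeys);
```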
}
// TODO: This does not work with clustering
if (!previousRecord.getRecordGlobalLocation().equals(getRecordGlobalLocation())) {
  throw new HoodieMetadataException(String.format("Location for %s should not change from %s to %s", previousRecord.key,
Why do we need this check? With clustering, records could move from one location to another, right? Any harm in removing this?
|
Closing this as a new PR is being raised.
Tips
What is the purpose of the pull request
(For example: This pull request adds quick-start document.)
Brief change log
(for example:)
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.