[HUDI-3356][HUDI-3142][HUDI-1492] Metadata column stats index - handling delta writes #4761
Conversation
Force-pushed 6f056e6 to 974c5ec.
Resolved review threads (outdated) on:
- hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java (2 threads)
- hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
  return newColumnStats;
}
return new HoodieMetadataColumnStats(
    newColumnStats.getFileName(),
So this field is called fileName in HoodieMetadataColumnStats but filePath in HoodieColumnRangeMetadata. If possible, can we keep the names consistent?
Resolved review thread (outdated) on:
- hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
    .stream().filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null),
Arrays.asList(oldColumnStats.getMaxValue(), newColumnStats.getMaxValue())
    .stream().filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null),
oldColumnStats.getNullCount() + newColumnStats.getNullCount(),
Is my understanding correct that, since this is the append handle and we don't expect duplicates, we can simply add these stats?
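The merge being discussed can be sketched as a self-contained example. The class below is a simplified, illustrative stand-in for Hudi's HoodieColumnRangeMetadata, not the actual API: in the append-only case with no duplicate keys, counts simply add up and the range widens to the min-of-mins and max-of-maxes.

```java
import java.util.Comparator;
import java.util.Objects;
import java.util.stream.Stream;

// Simplified stand-in for Hudi's per-column range metadata (illustrative only).
class ColumnRange<T extends Comparable<T>> {
    final String columnName;
    final T minValue;
    final T maxValue;
    final long nullCount;

    ColumnRange(String columnName, T minValue, T maxValue, long nullCount) {
        this.columnName = columnName;
        this.minValue = minValue;
        this.maxValue = maxValue;
        this.nullCount = nullCount;
    }

    // Append-only merge: counts add up, ranges widen.
    static <T extends Comparable<T>> ColumnRange<T> merge(ColumnRange<T> oldStats, ColumnRange<T> newStats) {
        T min = Stream.of(oldStats.minValue, newStats.minValue)
            .filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null);
        T max = Stream.of(oldStats.maxValue, newStats.maxValue)
            .filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null);
        return new ColumnRange<>(oldStats.columnName, min, max,
            oldStats.nullCount + newStats.nullCount);
    }
}

public class MergeDemo {
    public static void main(String[] args) {
        ColumnRange<Integer> merged = ColumnRange.merge(
            new ColumnRange<>("price", 5, 20, 1),
            new ColumnRange<>("price", 3, 15, 2));
        System.out.println(merged.minValue + " " + merged.maxValue + " " + merged.nullCount);
        // prints "3 20 3"
    }
}
```

Note the null filtering: a column whose values are all null in one file contributes nothing to the merged range, only to the null count.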
…lter construction from index based on the type param - Write stats are converted to metadata index records during the commit, making them use the HoodieData<HoodieRecord> type so that record generation scales with need. - When building the BloomFilter from the index records, use the type param stored in the payload instead of a hardcoded type.
…n stats partitions - When the metadata table is created and it decides to do the initialization from the filesystem for the user dataset, all the enabled partitions need to be initialized along with the FILES partition. This fix adds init support for the bloom filter and column stats partitions.
- Delta writes can change column ranges, and the column stats index needs to be properly updated with the new ranges to stay consistent with the table dataset. This fix adds column stats index update support for delta writes.
Force-pushed 6d40a75 to b0350aa.
  updateWriteStatus(stat, result);
}

Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
We should populate this only if the metadata table and the meta index are enabled, right? Or did we decide to serialize this info regardless? Because computing these stats will definitely add to write latency, I'm trying to see if we can avoid it when it's not required at all.
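The gating the reviewer is asking for could look roughly like the sketch below. The class and flag names here are illustrative assumptions, not Hudi's actual write-config keys; the point is simply that the per-record stats collection is skipped unless something downstream can consume it.

```java
// Hypothetical sketch: only pay the stats-collection cost when the
// column stats index can actually use the result. Names are illustrative.
class WriteConfigSketch {
    final boolean metadataTableEnabled;
    final boolean columnStatsIndexEnabled;

    WriteConfigSketch(boolean metadataTableEnabled, boolean columnStatsIndexEnabled) {
        this.metadataTableEnabled = metadataTableEnabled;
        this.columnStatsIndexEnabled = columnStatsIndexEnabled;
    }

    // Guard checked once per write handle, before any per-record work.
    boolean shouldCollectColumnStats() {
        return metadataTableEnabled && columnStatsIndexEnabled;
    }
}
```

A write handle would consult this guard once at construction time, so the per-record path carries no extra branching when stats are disabled.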
public static final BiFunction<HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>> COLUMN_RANGE_MERGE_FUNCTION =
    (oldColumnRange, newColumnRange) -> {
      ValidationUtils.checkArgument(oldColumnRange.getColumnName().equals(newColumnRange.getColumnName()));
Do we need to validate for every record? Can we move the validation one level up and avoid this? Since this is called for every record in an iterative manner, I'm trying to see if it is overkill.
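One way to act on this suggestion is to validate the column names once per batch, before iterating, and keep the hot loop free of per-element checks. The sketch below uses illustrative class and method names, not Hudi's actual signatures:

```java
import java.util.List;

// Illustrative sketch of hoisting validation out of the per-record path.
class ColumnStatSketch {
    final String columnName;
    final long nullCount;

    ColumnStatSketch(String columnName, long nullCount) {
        this.columnName = columnName;
        this.nullCount = nullCount;
    }

    // Validate the whole batch once, then run an unchecked reduction.
    static long mergeNullCounts(String expectedColumn, List<ColumnStatSketch> stats) {
        for (ColumnStatSketch s : stats) {
            if (!expectedColumn.equals(s.columnName)) {
                throw new IllegalArgumentException("Unexpected column: " + s.columnName);
            }
        }
        // Hot path: plain reduction, no per-element validation.
        return stats.stream().mapToLong(s -> s.nullCount).sum();
    }
}
```

The trade-off is that a bad element is detected before any merging starts rather than mid-stream, which is usually the more useful failure mode anyway.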
import java.io.Serializable;
import java.util.List;

public class MetadataRecordsGenerationParams implements Serializable {
Please add Javadocs for this class.
private MetadataRecordsGenerationParams getRecordsGenerationParams() {
  return new MetadataRecordsGenerationParams(
      dataMetaClient, enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
      dataWriteConfig.getBloomIndexParallelism(),
I feel we can't use the data table's bloom index parallelism here. Maybe we can reuse the file listing parallelism or introduce a new config.
  if (fileBloomFilter == null) {
    LOG.error("Failed to read bloom filter for " + appendedFilePath);
-   return;
+   return Stream.empty();
Shouldn't we be throwing an exception here? How can a base file not have a bloom filter?
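The fail-fast alternative the reviewer suggests would treat a missing bloom filter on a base file as corruption and throw, rather than silently producing no index records. A minimal sketch, with illustrative names rather than Hudi's actual signatures:

```java
import java.util.stream.Stream;

// Hypothetical sketch of the fail-fast alternative: a base file without a
// bloom filter signals corruption, so throw instead of yielding nothing.
class BloomFilterRecordsSketch {
    static Stream<String> recordsFor(String filePath, byte[] bloomFilterBytes) {
        if (bloomFilterBytes == null) {
            throw new IllegalStateException("Base file is missing its bloom filter: " + filePath);
        }
        return Stream.of(filePath); // placeholder for real record generation
    }
}
```

The design question is whether a missing filter is an expected state (e.g. files written before the feature existed, where skipping is legitimate) or always an invariant violation; the answer decides between logging-and-skipping and throwing.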
final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
    .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList,
    Math.max(partitionToAppendedFiles.size(), recordsGenerationParams.getBloomIndexParallelism()));
Why are we using the bloom index parallelism in column stats generation?
Added a config for col stats parallelism.
  }
  return Stream.empty();
}, 1).stream().collect(Collectors.toList());
final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient());
Sorry, with getLatestColumns() I see the 2nd arg is isMetaIndexColumnStatsForAllColumns. Shouldn't we be setting it appropriately? Why let it be false here?
processRestoreMetadata(metadataTableTimeline, restoreMetadata,
    partitionToAppendedFiles, partitionToDeletedFiles, lastSyncTs);

final HoodieData<HoodieRecord> filesPartitionRecordsRDD = engineContext.parallelize(
Not sure why FILES is here while BLOOM and col stats are in getMetadataPartitionTypeHoodieDataMap. Can we also move FILES record generation into getMetadataPartitionTypeHoodieDataMap?
  }

- return partitionToRecordsMap;
+ return getMetadataPartitionTypeHoodieDataMap(engineContext, recordsGenerationParams, instantTime,
Maybe I see the reason why. We could have a callback and call into that so that it's consistent.
if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getRecordsStats().isPresent()) {
  columnRangeMap = Option.of(((HoodieDeltaWriteStat) writeStat).getRecordsStats().get().getStats());
}
return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex,
I don't see much value in passing columnRangeMap into getColumnStats. We might as well do
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>(columnRangeMap.get().values());
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, isDeleted);
here within the if block, and call getColumnStats() only in the else block.
nsivabalan
left a comment
I see some new code being added, but no new tests. For example, AppendHandle has new code for which there aren't any tests. Can you try to add some UTs?
Closing it in favor of #4848
…lter construction from index based on the type param (#4848) Rework of #4761 This diff introduces the following changes: - Write stats are converted to metadata index records during the commit, making them use the HoodieData type so that record generation scales with need. - Metadata index init support for bloom filter and column stats partitions. - When building the BloomFilter from the index records, use the type param stored in the payload instead of a hardcoded type. - Delta writes can change column ranges, and the column stats index needs to be properly updated with new ranges to stay consistent with the table dataset. This fix adds column stats index update support for delta writes. Co-authored-by: Manoj Govindassamy <[email protected]>
What is the purpose of the pull request
Delta writes can change column ranges, and the column stats index needs to be properly updated with the new ranges to stay consistent with the table dataset. This fix adds column stats index update support for delta writes.
Brief change log
This PR is stacked on top of #4746
Commit to review: 974c5ec
Index initialization and the metadata conversion now call the metadata table util
to get the column range stats for the delta writes.
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.