
Conversation

@codope (Member) commented Feb 18, 2022

What is the purpose of the pull request

Rework of #4761
This diff introduces the following changes:

  • Write stats are converted to metadata index records during the commit, and they now use the HoodieData type so that record generation scales with need.
  • Metadata index init support for bloom filter and column stats partitions.
  • When building the BloomFilter from the index records, use the type param stored in the payload instead of a hardcoded type.
  • Delta writes can change column ranges, and the column stats index needs to be updated with the new ranges to stay consistent with the table dataset. This fix adds column stats index update support for delta writes.

Brief change log

(for example:)

  • Modify AnnotationLocation checkstyle rule in checkstyle.xml

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

* @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing enabled for all columns
*/
private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
private static List<String> getColumnsToIndex(HoodieTableMetaClient datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
Contributor:

A comment about L834: I feel we can't directly take RecordKeyFieldProp as is; it may not work for all key generators. Maybe we have to split on "," and then set the columns to index. Can you check whether there are any other places where we have this dependency, and whether we have done the right thing there?
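For illustration, a minimal sketch of the split being suggested, assuming the record key fields arrive as a single comma-separated string; the accessor below is illustrative, not a confirmed API:

// Hypothetical accessor; composite key generators may return e.g. "uuid,ts".
String recordKeyFieldsProp = datasetMetaClient.getTableConfig().getRecordKeyFieldProp();
List<String> columnsToIndex = Arrays.stream(recordKeyFieldsProp.split(","))
    .map(String::trim)
    .filter(field -> !field.isEmpty())
    .collect(Collectors.toList());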

Member Author:

Fixed here. But there are a couple of other places; I'll create a separate patch.

Contributor:

Sure. Do we have a tracking JIRA?

Member Author:

I did not create a separate JIRA; this is already being tracked in https://issues.apache.org/jira/browse/HUDI-3411.

@codope force-pushed the feature/HUDI-1492-deltawrite-record-stats-1 branch from 63aac43 to 97f253e on February 22, 2022 02:28
@nsivabalan self-assigned this on Feb 22, 2022
@codope force-pushed the feature/HUDI-1492-deltawrite-record-stats-1 branch 2 times, most recently from 19ba560 to 125d2cd on February 23, 2022 06:22
return HoodieMetadataPayload.createBloomFilterMetadataRecord(
deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, ByteBuffer.allocate(0), true);
}, 1).stream().collect(Collectors.toList());
HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList,
Contributor:

Do we really need to route this action through an RDD? Do we envision that this will scale past the point where we can no longer handle it on the driver?

I'm worried about the serialization cost we incur for every record we handle through an RDD (serializing/deserializing the closure) just to create a single object.

}, 1).stream().collect(Collectors.toList());
HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList,
Math.max(deleteFileList.size(), recordsGenerationParams.getBloomIndexParallelism()));
return deleteFileListRDD.map(deleteFileInfo -> HoodieMetadataPayload.createBloomFilterMetadataRecord(
Contributor:

Let's create a common override for this method (it seems to be used in at least 3 more places).
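As a rough illustration of what a shared helper could look like, mirroring the diff above; the method name and parameter list are assumptions, not the final shape:

private static HoodieData<HoodieRecord> createBloomFilterDeleteRecords(HoodieEngineContext engineContext,
                                                                       List<Pair<String, String>> deleteFileList,
                                                                       String instantTime,
                                                                       int parallelism) {
  // Parallelize the deleted-file list and map each entry to a delete-marker bloom filter record.
  HoodieData<Pair<String, String>> deleteFileListRDD =
      engineContext.parallelize(deleteFileList, Math.max(deleteFileList.size(), parallelism));
  return deleteFileListRDD.map(deleteFileInfo -> HoodieMetadataPayload.createBloomFilterMetadataRecord(
      deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, ByteBuffer.allocate(0), true));
}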

return recordsStats;
}

public static class RecordsStats<T> implements Serializable {
Contributor:

What do we need this wrapper for?

Member Author:

The wrapper abstracts away the underlying metadata. I think the write stat should be aware that it saves the record stats, but not necessarily what those stats are composed of. Are you concerned about serde cost here? It shouldn't add much overhead over keeping it as a private field.

Contributor:

@codope I'm concerned about it as an abstraction that isn't bringing much value while increasing complexity: it adds cognitive load for anybody interacting with it to understand what it does.

In general, I'd suggest following the principle of keeping things as simple as possible, but no simpler than needed to solve the problem. It helps on many fronts:

  1. Makes the code easier to comprehend
  2. Makes component evolution easier (the simpler things are, the easier they are to evolve)
  3. Makes components age better: if things change and we need to refactor, the simpler the system is, the easier the refactoring will be

@codope changed the title from "[HUDI-3356][HUDI-3203] HoodieData for metadata index records, bloom and colstats init" to "[HUDI-3258] HoodieData for metadata index records, bloom and colstats init" on Feb 28, 2022
@nsivabalan (Contributor) left a comment:

Apart from addressing feedback from me and Alexey, were there any additional changes? I did not review the entire patch, but did a pointed review of the feedback given and how it was addressed. Let me know if you have made any other additional changes, or point me to commit hashes that I need to look into specifically.

@codope force-pushed the feature/HUDI-1492-deltawrite-record-stats-1 branch 2 times, most recently from 21dc93b to ff1f746 on March 7, 2022 06:36
@nsivabalan (Contributor) commented Mar 8, 2022

@codope: I am good with the patch. Can you rebase with the latest master? We can land once CI is green. Sorry, let's get this landed by tomorrow.

…lter construction from index based on the type param

 - Write stats are converted to metadata index records during the commit,
   using the HoodieData<HoodieRecord> type so that record generation
   scales with need.

 - When building the BloomFilter from the index records, use the type param
   stored in the payload instead of a hardcoded type.

[HUDI-3142] Metadata index initialization for bloom filters and column stats partitions

 - When the metadata table is created and it decides to initialize
   from the filesystem for the user dataset, all the enabled partitions need
   to be initialized along with the FILES partition. This fix adds init support
   for the bloom filter and column stats partitions.

[HUDI-1492] Metadata column stats index - handling delta writes

 - Delta writes can change column ranges, and the column stats index needs
   to be properly updated with the new ranges to stay consistent with the table
   dataset. This fix adds column stats index update support for delta writes.

Fix a test and a few minor refactorings

Make HoodieColumnRangeMetadata serializable

Add a config for colstat parallelism and address review

Fix a UT and address review comments

Union will fail if we choose the min parallelism, which could be 0

Fix payload construction

Added HoodieIndex UTs based on apache#4516

Minor refactoring to address review comments

Change the logic of building column range metadata

Improve the way column stats is collected for avro records

Take min index parallelism

Fix conversion to long

@codope force-pushed the feature/HUDI-1492-deltawrite-record-stats-1 branch from ff1f746 to fa193b7 on March 8, 2022 03:35
@hudi-bot (Collaborator) commented Mar 8, 2022

CI report:

Bot commands supported by @hudi-bot:
  • @hudi-bot run azure: re-run the last Azure build

@nsivabalan (Contributor) left a comment:

LGTM. Thanks for the perseverance. This is definitely at a much better place right now.

@nsivabalan merged commit 575bc63 into apache:master on Mar 8, 2022

*/
public class MetadataRecordsGenerationParams implements Serializable {

private final HoodieTableMetaClient dataMetaClient;
Contributor:

Let's limit the scope of this component to just the parameters for index generation. Otherwise it has the potential to become a dependency magnet, where random dependencies get added here to avoid threading them through.

Contributor:

BTW, I see it is Serializable; how are we serializing the metaClient?

}

private MetadataRecordsGenerationParams getRecordsGenerationParams() {
return new MetadataRecordsGenerationParams(
Contributor:

BTW, why do we even need this component if we can just get all of this from the Writer Config?


if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) {
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
? stat.getRecordsStats().get().getStats() : new HashMap<>();
Contributor:

@codope that's what I was referring to with my comments about increased complexity with respect to RecordsStats. Why not just have stat.getRecordsStats().get() instead?

Now, when reading this code, the reader actually needs to understand what this additional getStats() call is about and why it's needed, whereas without it the call site is crystal clear and doesn't require scanning through getRecordsStats to understand what's going on.

Map<String, Map<String, Object>> columnToStats = new HashMap<>();
writeSchemaWithMetaFields.getFields().forEach(field -> columnToStats.putIfAbsent(field.name(), new HashMap<>()));
// collect stats for columns at once per record and keep iterating through every record to eventually find col stats for all fields.
recordList.forEach(record -> aggregateColumnStats(record, writeSchemaWithMetaFields, columnToStats, config.isConsistentLogicalTimestampEnabled()));
Contributor:

Can we, instead of placing iteration and aggregation into separate methods, consolidate them in aggregateColumnStats so that its signature actually is:

Map<String, Map<...>> aggregateColumnStats(records, writeSchema, ...)
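A minimal sketch of that consolidation, assuming Avro GenericRecords; aggregateColumnStatsForRecord below is a stand-in name for the existing per-record aggregation logic:

public static Map<String, Map<String, Object>> aggregateColumnStats(List<GenericRecord> records,
                                                                    Schema writeSchemaWithMetaFields,
                                                                    boolean consistentLogicalTimestampEnabled) {
  Map<String, Map<String, Object>> columnToStats = new HashMap<>();
  writeSchemaWithMetaFields.getFields().forEach(field -> columnToStats.putIfAbsent(field.name(), new HashMap<>()));
  // Single pass over the records: fold each record's values into the per-column stats maps.
  records.forEach(record ->
      aggregateColumnStatsForRecord(record, writeSchemaWithMetaFields, columnToStats, consistentLogicalTimestampEnabled));
  return columnToStats;
}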

* @param columnRangeMap - old column range statistics, which will be merged in this computation
* @param columnToStats - map of column to map of each stat and its value
*/
public static void accumulateColumnRanges(Schema.Field field, String filePath,
Contributor:

Can we unify both of these methods into one?

public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
HoodieEngineContext engineContext,
String instantTime) {
public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
Contributor:

nit: There's a general convention that "context" objects are passed as the first arg.

Contributor:

Just FYI, no need to fix this

if (filePathWithPartition.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>();
final Path fullFilePath = new Path(datasetMetaClient.getBasePath(), filePathWithPartition);
if (!isDeleted) {
Contributor:

Deleted files handling is invariant of the file format, right?

}
// set the max value of the field
if (fieldVal.compareTo(String.valueOf(columnStats.getOrDefault(MAX, ""))) > 0) {
columnStats.put(MAX, fieldVal);
Contributor:

We don't need a Map for that, right? Let's instead create a mutable object with all the statistics that we're collecting:

class FileColumnStats {
  Object min, max;
  long count, totalSize;
  // ...
}
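Expanding that idea slightly, one possible shape with in-place updates; field and method names are purely illustrative:

class FileColumnStats {
  Comparable min;
  Comparable max;
  long valueCount;
  long nullCount;
  long totalSize;

  @SuppressWarnings("unchecked")
  void update(Comparable value, long sizeInBytes) {
    // Track min/max only for non-null values; count nulls separately.
    if (value == null) {
      nullCount++;
    } else {
      min = (min == null || value.compareTo(min) < 0) ? value : min;
      max = (max == null || value.compareTo(max) > 0) ? value : max;
    }
    valueCount++;
    totalSize += sizeInBytes;
  }
}

A typed holder like this also avoids the stringly-typed MIN/MAX lookups and String.valueOf comparisons seen in the map-based snippet above.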

return getLatestColumns(datasetMetaClient, false);
public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats oldColumnStats, HoodieMetadataColumnStats newColumnStats) {
ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName()));
if (newColumnStats.getIsDeleted()) {
Contributor:

We need to handle the inverse case as well, when the existing record is a deleted one; otherwise we will merge incorrectly.
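A minimal sketch of one way to cover both directions, assuming the newer stats should win whenever either side is a delete; mergeLiveColumnStats is a hypothetical stand-in for the existing range/count merging logic:

public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats oldColumnStats,
                                                         HoodieMetadataColumnStats newColumnStats) {
  ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName()));
  // If the incoming record is a delete, it supersedes whatever we had before.
  // If the existing record is a delete, the incoming stats describe a fresh file
  // and must not be merged with the stale ranges.
  if (newColumnStats.getIsDeleted() || oldColumnStats.getIsDeleted()) {
    return newColumnStats;
  }
  return mergeLiveColumnStats(oldColumnStats, newColumnStats); // hypothetical helper
}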

vingov pushed a commit to vingov/hudi that referenced this pull request Apr 3, 2022
…lter construction from index based on the type param (apache#4848)

Rework of apache#4761 
This diff introduces the following changes:

- Write stats are converted to metadata index records during the commit, and they now use the HoodieData type so that record generation scales with need.
- Metadata index init support for bloom filter and column stats partitions.
- When building the BloomFilter from the index records, use the type param stored in the payload instead of a hardcoded type.
- Delta writes can change column ranges, and the column stats index needs to be updated with the new ranges to stay consistent with the table dataset. This fix adds column stats index update support for delta writes.

Co-authored-by: Manoj Govindassamy <[email protected]>
stayrascal pushed a commit to stayrascal/hudi that referenced this pull request Apr 12, 2022
