[HUDI-3315] RFC-35 Part-1 Support bucket index in Flink writer #4679
Conversation
cc: @yihua FYI

Wow~ Great

@hudi-bot run azure
public Boolean buffer(HoodieRecord<T> record, String instantTime, HoodieFlinkTable<T> table) {
  final HoodieRecordLocation loc = record.getCurrentLocation();
  final String fileID = loc.getFileId();
  final String partitionPath = record.getPartitionPath();
Can we find a way to move the buffering logic into the write function?
IMO we need to move the buffering logic into the hoodie common client, so other engines can reuse it (a streaming API later). The Kafka Connect sink is also looking for a streaming way to write. Is there any advantage of putting the buffering logic into the write function that I am not aware of?
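To illustrate what "moving the buffering into a common, engine-agnostic client" could mean, here is a minimal sketch of a streaming-style buffered writer interface. Everything in it is hypothetical and does not exist in Hudi today; it only shows the buffer/flush split that a shared streaming API might expose:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a buffering abstraction that could live in the common
// client so Flink, Kafka Connect, etc. can share it. All names are illustrative.
public interface StreamingBufferedWriter<R> {

  // Buffer a single record for the given instant; returns true if accepted.
  boolean buffer(R record, String instantTime);

  // Flush everything buffered so far, e.g. on a checkpoint barrier.
  List<WriteResult> flush(String instantTime);

  // Minimal result holder for the sketch.
  class WriteResult {
    public final String fileId;
    public final long numWritten;

    public WriteResult(String fileId, long numWritten) {
      this.fileId = fileId;
      this.numWritten = numWritten;
    }
  }
}
```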
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
Guess this class can be avoided if we move the buffering logic into the write function?
.key(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())
.intType()
.defaultValue(256) // default 256 buckets per partition
.withDescription("Hudi bucket number per partition. Only affected if using Hudi bucket index.");
Is there any reason the default value is 256 here? It seems to generate many small files for small data sets.
Sure, will change it to a smaller number.
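For anyone who wants a smaller bucket count right away, here is a minimal sketch of overriding the default from a Flink job. It assumes the option keys quoted above ("index.type" from FlinkOptions and "hoodie.bucket.index.num.buckets" from HoodieIndexConfig) and that "BUCKET" is the value that enables the bucket index; verify the exact keys and values against the code in this PR:

```java
import org.apache.flink.configuration.Configuration;

// Sketch only: override the 256-bucket default for a small data set.
public class BucketIndexConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString("index.type", "BUCKET");                 // assumption: value that switches the writer to the bucket index
    conf.setInteger("hoodie.bucket.index.num.buckets", 16); // assumption: key from HoodieIndexConfig, 16 buckets per partition
    System.out.println(conf);
  }
}
```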
If people change the BUCKET_INDEX_NUM_BUCKETS or the write function parallelism, does the hash index still work?
pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
if (OptionsResolver.isBucketIndexTable(conf)) {
  if (!OptionsResolver.isMorTable(conf)) {
    throw new HoodieNotSupportedException("Bucket index only support MOR table type.");
We can move the whole if ... else ... code block into the Pipelines.hoodieStreamWrite function.
sure
cc @yihua could you also please review this from the angle of making the write client abstractions more friendly?

I'll review this.

@hudi-bot run azure

@hudi-bot run azure

As discussed with @danny0405, the changes to the write client will not be included in this PR, because they would make the write client look ugly. We will include those once we have a streaming API. So this PR will only include the bucket index for the Flink writer. cc: @yihua

@minihippo would you review this PR if you have time? Thanks~

@hudi-bot run azure

@hudi-bot run azure
danny0405 left a comment:
+1, thanks for the contribution, I have left some comments ~
hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java (resolved)
public static final ConfigOption<String> INDEX_KEY_FIELD = ConfigOptions
    .key(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key())
    .stringType()
Must this key be the same as the primary key? Because all the changes of a key must belong to one data bucket.
This could be a subset of the primary keys, e.g. if the primary key is "id1,id2", the index key could be "id1", "id2", or "id1,id2".
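A simplified sketch of the idea (not the actual BucketIdentifier code): only the configured index key fields are extracted from the record key and hashed, so every change of a given key lands in the same bucket regardless of the other key fields. The "field:value,field:value" record key layout is an assumption for the example:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: bucket assignment driven by a subset of the primary key fields.
public class BucketIdSketch {

  // recordKey is assumed to look like "id1:v1,id2:v2"; indexKeyFields like "id1" or "id1,id2".
  static int bucketId(String recordKey, String indexKeyFields, int numBuckets) {
    List<String> wanted = Arrays.asList(indexKeyFields.split(","));
    String hashKey = Arrays.stream(recordKey.split(","))
        .filter(kv -> wanted.contains(kv.split(":")[0]))
        .collect(Collectors.joining(","));
    return (hashKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }

  public static void main(String[] args) {
    // Primary key "id1,id2", index key "id1": both versions of the same id1 hash to the same bucket.
    System.out.println(bucketId("id1:42,id2:a", "id1", 16));
    System.out.println(bucketId("id1:42,id2:b", "id1", 16));
  }
}
```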
hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java (resolved)
hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java (resolved)
} else {
  LOG.info(String.format("Adding fileID %s to the partition bucket %s.", fileID, partitionBucketId));
  bucketToFileIDMap.put(partitionBucketId, fileID);
}
The bucketToFileIDMap seems to never be cleared; is there a possibility that this map should be put into the state?
Yes, this could be stored in the state, but we need to think about the consistency issue between the state and the actual file system view. Any diff could lead to incorrect data.
The checkpoint can keep the correctness, I think.
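A sketch of one way to sidestep the state-vs-storage consistency concern: rebuild the mapping from the table's file listing when the task opens, so the file system view stays the single source of truth. TableView, FileGroupInfo, and the fileId prefix layout below are assumptions for illustration, not the classes used in this PR:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: bootstrap the bucket -> fileId mapping from storage on task (re)start
// instead of checkpointing it.
public class BucketMappingBootstrap {

  // Hypothetical stand-ins for Hudi's file system view classes.
  interface TableView {
    Iterable<FileGroupInfo> latestFileGroups(String partition);
  }

  static class FileGroupInfo {
    final String fileId; // assumption: the bucket id is encoded as the fileId prefix, e.g. "00000003-..."
    FileGroupInfo(String fileId) {
      this.fileId = fileId;
    }
  }

  static Map<String, String> bootstrap(TableView view, Iterable<String> partitions) {
    Map<String, String> bucketToFileId = new HashMap<>();
    for (String partition : partitions) {
      for (FileGroupInfo fg : view.latestFileGroups(partition)) {
        int bucketId = Integer.parseInt(fg.fileId.substring(0, 8)); // assumption about the prefix layout
        bucketToFileId.put(partition + "_" + bucketId, fg.fileId);
      }
    }
    return bucketToFileId;
  }
}
```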
hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java (resolved)
final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, hiveBucketNum);
final String partitionBucketId = BucketIdentifier.partitionBucketIdStr(hoodieKey.getPartitionPath(), bucketNum);

if (bucketToFileIDMap.containsKey(partitionBucketId)) {
Partition changes are not supported?
you can change the job parallelism, but you can't change the bucket index number at this point.
I guess @loukey-lj wants to ask: when a record switches to a new partition, how do we send a delete record to the old partition?
@danny0405 If people change the write parallelism, it will still work because we load the parallelism-bucketID mapping at runtime, but the parallelism should be less than the bucket number to avoid empty tasks.
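A minimal sketch of what such a runtime mapping could look like, assuming a simple modulo assignment of buckets to subtasks (the real operator may differ): changing the write parallelism only reshuffles ownership, while a parallelism larger than the bucket number would leave some subtasks with no bucket at all:

```java
// Illustrative only: modulo-based bucket-to-subtask ownership.
public class BucketTaskAssignment {

  static boolean ownsBucket(int bucketId, int taskId, int parallelism) {
    return bucketId % parallelism == taskId;
  }

  public static void main(String[] args) {
    int numBuckets = 8;
    int parallelism = 3;
    for (int task = 0; task < parallelism; task++) {
      StringBuilder owned = new StringBuilder();
      for (int bucket = 0; bucket < numBuckets; bucket++) {
        if (ownsBucket(bucket, task, parallelism)) {
          owned.append(bucket).append(' ');
        }
      }
      System.out.println("subtask " + task + " owns buckets: " + owned);
    }
  }
}
```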
@hudi-bot run azure
hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java (resolved)
hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java (outdated, resolved)
hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java (outdated, resolved)
    table.getMetaClient().getBasePath()));

// Iterate through all existing partitions to load the existing fileIDs that belong to this task
List<String> partitions = table.getMetadata().getAllPartitionPaths();
It may have poor performance at application start when the partition count is huge. Loading it at runtime may be better, especially when most of the partitions in the table are frozen.
let's do the optimization part in a separate PR.
Thanks, can we file an issue to address this improvement?
fix it with #5093
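A sketch of the lazy-loading idea, independent of the actual fix in #5093: the fileId mapping for a partition is looked up only when the first record of that partition arrives. The loadPartition callback is a hypothetical stand-in for listing the partition's file groups:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Illustrative only: load each partition's bucket -> fileId entries on first use
// instead of scanning every partition at startup.
public class LazyBucketLoader {

  private final Set<String> loadedPartitions = new HashSet<>();
  private final Map<String, String> bucketToFileId = new HashMap<>();
  private final Function<String, Map<String, String>> loadPartition; // hypothetical listing callback

  public LazyBucketLoader(Function<String, Map<String, String>> loadPartition) {
    this.loadPartition = loadPartition;
  }

  public String fileIdFor(String partition, int bucketId) {
    if (loadedPartitions.add(partition)) { // first record of this partition triggers the load
      bucketToFileId.putAll(loadPartition.apply(partition));
    }
    return bucketToFileId.get(partition + "_" + bucketId);
  }
}
```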
@hudi-bot run azure

public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
  WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
  return dataStream
Can we have a separate method for the hash index and not modify these two methods? The methods are already too complex, I think.
Man, you forgot your previous comment lol. It used to be separated and you suggested putting them into hoodieStreamWrite. Let me know if I misunderstood.
Yeah, if we can make both the code in Pipelines and HoodieTableSink clean ~
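A sketch of what that could look like (not the final code in Pipelines): a single hoodieStreamWrite entry point that dispatches internally, so HoodieTableSink needs no index-specific branching. bucketIndexWrite and defaultWrite below are illustrative stubs only:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.OptionsResolver;

// Illustrative only: keep the bucket-index branching inside Pipelines.
public class PipelinesSketch {

  public static DataStream<Object> hoodieStreamWrite(Configuration conf, int parallelism, DataStream<HoodieRecord> dataStream) {
    return OptionsResolver.isBucketIndexTable(conf)
        ? bucketIndexWrite(conf, parallelism, dataStream)   // shuffle by bucket id, bucket write operator
        : defaultWrite(conf, parallelism, dataStream);      // existing bucket-assigner based path
  }

  private static DataStream<Object> bucketIndexWrite(Configuration conf, int parallelism, DataStream<HoodieRecord> dataStream) {
    throw new UnsupportedOperationException("illustrative stub");
  }

  private static DataStream<Object> defaultWrite(Configuration conf, int parallelism, DataStream<HoodieRecord> dataStream) {
    throw new UnsupportedOperationException("illustrative stub");
  }
}
```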
danny0405 left a comment:
+1, thanks for the contribution ~
🆒 I will try out this PR soon. Thanks to the big guys. 🙏
I have tried it and got an exception:

Caused by: java.util.NoSuchElementException: No value present in Option
  at org.apache.hudi.common.util.Option.get(Option.java:88) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
  at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:116) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
  at org.apache.hudi.io.FlinkMergeHandle.<init>(FlinkMergeHandle.java:70) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
  at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:485) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
  at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:142) ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]

@garyli1019
@wxplovecc hi, this issue does not look related to this PR; would you submit an issue so we can take a look?

Seems to be a bug 🐛 of the PR
…e#4679) * Support bucket index in Flink writer * Use record key as default index key
What is the purpose of the pull request
This pull request implements RFC-35 (https://cwiki.apache.org/confluence/display/HUDI/RFC-35%3A+Make+Flink+MOR+table+writing+streaming+friendly)
Brief change log
Verify this pull request
This change added tests and can be verified as follows:
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.