[HUDI-6288] Create IngestionPrimaryWriterBasedConflictResolutionStrategy to prioritize ingestion writers over other writers #8832
Conversation
Force-pushed from 874cfa4 to 00897f8
Force-pushed from 00897f8 to 3fa4e68
   * @return
   */
-  Stream<HoodieInstant> getCandidateInstants(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);
+  Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient metaClient, HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);
This is a public interface; there is a chance that someone outside has implemented their own resolution strategy. So let's deprecate this method and introduce a new one.
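The compatibility pattern being suggested could look like the following sketch. The class names mirror the Hudi types under discussion but are simplified stand-ins, not the real classes; the delegation direction (new default method calling the deprecated one) is one possible way to keep legacy implementations working.

```java
import java.util.stream.Stream;

// Simplified stand-ins for the Hudi types discussed above (NOT the real classes).
class HoodieInstant { }
class HoodieActiveTimeline { }
class HoodieTableMetaClient {
    HoodieActiveTimeline getActiveTimeline() { return new HoodieActiveTimeline(); }
}

interface ConflictResolutionStrategy {
    // Old API: kept and deprecated so external implementations keep compiling.
    @Deprecated
    Stream<HoodieInstant> getCandidateInstants(HoodieActiveTimeline activeTimeline,
                                               HoodieInstant currentInstant);

    // New API: a default that delegates to the old method, so a legacy
    // implementation automatically satisfies the new contract.
    default Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient metaClient,
                                                       HoodieInstant currentInstant) {
        return getCandidateInstants(metaClient.getActiveTimeline(), currentInstant);
    }
}

// A legacy implementation that only knows the old signature still works.
class LegacyStrategy implements ConflictResolutionStrategy {
    @Override
    public Stream<HoodieInstant> getCandidateInstants(HoodieActiveTimeline t, HoodieInstant c) {
        return Stream.of(new HoodieInstant());
    }
}
```

A new strategy would instead override the metaClient overload and can reload the timeline internally, which is the point argued later in this thread.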
But I am curious: why this change?
From the implementation, I see that we are reloading the active timeline. So why can't we do that at the caller, before calling this method?
In fact, resolveWriteConflictIfAny already takes an argument indicating whether to reload the active timeline. So can't we leverage that?
It is hard-coded to false now. I think it is hard to maintain if the invoker has to know whether to refresh these timelines.
- The caller does not know which implementation is in use, so whether to refresh the timeline should be up to the implementation classes. For example, IngestionPrimaryWriterBasedConflictResolutionStrategy.java requires a reload while others may not.
- Also, if others have a custom implementation of the base class that is tied to HoodieTimeline, they can still easily migrate to the new API with HoodieTableMetaClient, whereas the reverse breaks usability. By replacing the timeline with the metaClient object we are providing more functionality to users.
   // since the last successful write based on the transition times.
   // We need to check for write conflicts since they may have mutated the same files
   // that are being newly created by the current write.
   List<HoodieInstant> completedCommitsInstants = activeTimeline
Let's avoid code duplication. We can move the code from within SimpleConcurrentFileWritesConflictResolutionStrategy, make those methods protected, and re-use them here.
Created a follow-up task: https://issues.apache.org/jira/browse/HUDI-6353
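The de-duplication suggested above could be sketched as follows. The class and method names here are illustrative, not the real Hudi code: the shared filtering logic is hoisted into a protected helper on the base strategy so the subclass reuses it instead of copying it.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical base strategy: formerly private/duplicated logic is now a
// protected helper, callable from subclasses.
class SimpleConcurrentStrategySketch {
    protected List<String> completedInstantsAfter(Stream<String> instantTimes, String after) {
        // Keep only instants strictly later than the given instant time.
        return instantTimes.filter(t -> t.compareTo(after) > 0).collect(Collectors.toList());
    }
}

// Hypothetical subclass: reuses the base helper, then would layer
// ingestion-specific ordering/filtering on top.
class IngestionPrimaryStrategySketch extends SimpleConcurrentStrategySketch {
    List<String> candidateInstants(Stream<String> instantTimes, String lastSuccessful) {
        return completedInstantsAfter(instantTimes, lastSuccessful);
    }
}
```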
   .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, REPLACE_COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION))
   .filterCompletedInstants()
   .findInstantsModifiedAfter(currentInstant.getTimestamp())
   .getInstantsOrderedByStateTransitionTime()
Using state transition time is driven by a config, if I am not wrong.
@danny0405: have we started using state transition everywhere? If not, we should avoid these APIs.
It's okay to add some APIs on the timeline; the timeline API is kind of internal, and users should not be exposed to or aware of these changes. In any case, we need some completion-time filtering on the timeline, and these APIs can be reused when we migrate to a more elegant implementation for the real-time solution.
Yeah, a completion-time ordering API would be ideal to use. For now, renaming the findInstantsModifiedAfter method to findInstantsModifiedAfterByStateTransitionTime.
   finalizeWrite(table, clusteringCommitTime, writeStats);
   // Do conflict resolution checks for clustering if SparkAllowUpdateStrategy is used.
   // By using this UpdateStrategy implementation, Ingestion writers are given preference over clustering re-writers.
   if (this.config.getWriteConflictResolutionStrategy() instanceof IngestionPrimaryWriterBasedConflictResolutionStrategy) {
This is very tight coupling; let's try to abstract this out.
Maybe we can introduce a new API like isPreCommitRequired() within the ConflictResolutionStrategy interface,
and call preCommit when it returns true.
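The suggested abstraction could be sketched like this. Only the method name isPreCommitRequired() comes from the review comment; the surrounding types are hypothetical simplified stand-ins. The caller checks a capability flag instead of doing an instanceof check against a concrete strategy class.

```java
// Hypothetical sketch of the proposed interface method; default false keeps
// existing strategies unchanged.
interface ConflictResolutionStrategySketch {
    default boolean isPreCommitRequired() { return false; }
}

class SimpleStrategySketch implements ConflictResolutionStrategySketch { }

class IngestionPrimaryWriterStrategySketch implements ConflictResolutionStrategySketch {
    @Override
    public boolean isPreCommitRequired() { return true; }
}

class CommitPathSketch {
    // The commit path no longer needs to know about any concrete strategy
    // class; it just asks the strategy whether preCommit should run.
    static boolean shouldRunPreCommit(ConflictResolutionStrategySketch strategy) {
        return strategy.isPreCommitRequired();
    }
}
```

This is the shape that removes the instanceof coupling flagged above, and it would let engine-specific clients (Spark, Flink) share the same check.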
+1, and can we also fix the HoodieFlinkTableServiceClient
Good idea. Implemented it.
   @Override
   public HoodieDefaultTimeline findInstantsModifiedAfter(String instantTime) {
     return new HoodieDefaultTimeline(instants.stream()
       .filter(s -> HoodieTimeline.compareTimestamps(s.getStateTransitionTime(),
Let's be wary of using state transition time.
Wherever state transition times are used, I have added "state transition" to the API names themselves, so users are well aware of what they are using.
     return stateTransitionTime;
   }

   /*
Why is this commented out? If it is not required, we can remove it.
Done.
   Set<Pair<String, String>> partitionTofileId = new HashSet<>();
   // list all partitions paths
   for (Map.Entry<String, List<String>> entry : partitionToReplaceFileIds.entrySet()) {
     entry.getValue().forEach(replaceFileId -> partitionTofileId.add(Pair.of(entry.getKey(), replaceFileId)));
Does this work for you:

partitionToReplaceFileIds.entrySet()
    .stream()
    .flatMap(partitionFileIds -> partitionFileIds.getValue().stream().map(replaceFileId -> Pair.of(partitionFileIds.getKey(), replaceFileId)))
    .collect(Collectors.toSet());

   private final String timestamp;
-  private final String stateTransitionTime;
+  private String stateTransitionTime;
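A runnable form of that flatMap suggestion, with java.util.Map.entry standing in for Hudi's Pair type (an assumption for the sake of a self-contained example):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class FlattenReplaceFileIds {
    // Flattens a partition -> replaced-file-ids map into a set of
    // (partition, fileId) pairs, replacing the explicit for-loop version.
    static Set<Map.Entry<String, String>> flatten(Map<String, List<String>> partitionToReplaceFileIds) {
        return partitionToReplaceFileIds.entrySet().stream()
            .flatMap(e -> e.getValue().stream().map(fileId -> Map.entry(e.getKey(), fileId)))
            .collect(Collectors.toSet());
    }
}
```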
Revert this change.
…iters over table service writers

Summary: Added a new conflict resolution strategy that gives ingestion commits precedence over table services. It uses HoodieInstant's modifiedTime to fetch instants with conflicts. For table services, during the completion phase this resolution also looks into inflight ingestion commits and self-evicts if a conflict is found.
- Enables conflict resolution for clustering during commit. The 0.8 version does not have support for transactions during the commit phase.
- A few test scenarios around clustering behavior when there are inflight ingestion commits are also added.
- Handling insert-overwrite cases is a bit tricky: even though support was added for some inflight replace metadata, the logic should be revisited. For now insert overwrite is not considered primary ingestion, so clustering jobs won't fail during the commit phase if there are inflight insert-overwrite operations.
- The latest changes from 0.10 for the class HoodieTablePreCommitFileSystemView are also fetched, which should solve replaced-files issues when both clustering and insert overwrite operate on the same partition.

Test Plan: Tested this using unit tests. Need to do integration tests.
Reviewers: balajee, jsbali, meenalb
Reviewed By: meenalb
JIRA Issues: HUDI-1703, HUDI-1735, HUDI-1598, HUDI-1805
Differential Revision: https://code.uberinternal.com/D7088075
…tant's modified time

Summary: Modified times are getting rounded down to the previous value, which makes the instant look as if it went back in time. So, creating a 1-second delay to spot-fix the unit tests.
Reviewers: pwason, balajee, O955 Project Hoodie Project Reviewer
Reviewed By: pwason, O955 Project Hoodie Project Reviewer
JIRA Issues: HUDI-2075
Differential Revision: https://code.uberinternal.com/D7514949
Force-pushed from 3fa4e68 to 3d40a14
danny0405
left a comment
+1
Wow, a bit late to see this PR, will read it carefully
     .getInstantsAsStream();

   // Fetch list of ingestion inflight commits.
   Stream<HoodieInstant> inflightIngestionCommitsStream =
@suryaprasanna @danny0405 One reminder: not all inflight commits have metadata. For example, bulk insert's inflight metadata is empty; in that case IngestionPrimaryWriterBasedConflictResolutionStrategy will be invalid.
Flink does not even save metadata in the inflight file.
@Zouxxyy The idea behind creating IngestionPrimaryWriterBasedConflictResolutionStrategy is to give preference to ingestion writers over clustering writers.
For example, you can have clustering plans as mutable plans, which can be created and removed by rollback; this is achievable by using SparkAllowUpdateStrategy. Clustering writers and ingestion writers can run simultaneously on the same partition. If the clustering writer were to finish writing first and is about to create a completed replacecommit, at that stage it will check whether there are any pending inflight ingestion instants in the timeline and fail if it finds an overlap.
Bulk insert operations create new files, so there won't be any conflict with clustering writers; in that case the replacecommit is allowed to complete. There could be a case where users are using the insert API and it still writes to existing files, if small-file handling is enabled. Even then, the possibility of overlap between clustering and ingestion is reduced, and if there is an overlap you can fail the clustering commit and allow ingestion to commit even if it arrives late.
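The self-eviction check described above could be sketched as follows. This is illustrative, not Hudi's actual implementation: before completing its replacecommit, the clustering writer intersects the file IDs it rewrote with the file IDs that pending ingestion commits plan to modify, and aborts on any overlap.

```java
import java.util.HashSet;
import java.util.Set;

class ClusteringConflictCheckSketch {
    // Returns true when clustering must abort (self-evict) because a pending
    // ingestion commit plans to modify a file group clustering has rewritten.
    static boolean shouldAbortClustering(Set<String> clusteringFileIds,
                                         Set<String> inflightIngestionFileIds) {
        Set<String> overlap = new HashSet<>(clusteringFileIds);
        overlap.retainAll(inflightIngestionFileIds);
        // Non-empty overlap: the ingestion writer wins, clustering self-evicts.
        return !overlap.isEmpty();
    }
}
```

This also illustrates the bulk-insert point: a writer that creates only new file groups contributes an empty intersection, so clustering is free to commit.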
> in that stage it will check if there are any pending inflight ingestion instants in the timeline and fail if it finds an overlap.

Thanks for the reply. What I mean is that we now use the metadata of the pending ingestion instant for conflict detection here, but not all inflight commits have metadata, e.g. the .inflight file written by Flink is empty.
Also, can you help review #9220, which improves the current strategies? Thanks.
Ok. I am not very familiar with Flink, but on the Spark engine the inflight commits doing upsert operations should have the partition-to-fileIds mapping (for the files that need to be modified) present in the .inflight file; this is written after the WorkloadProfile object is loaded. So conflict resolution will only see the file ids that are getting modified, not new ones. During conflict resolution that is all we care about, since there won't be any overlap between new files. Sure, I will review the RFC.

Change Logs
Creating a new ConflictResolutionStrategy called IngestionPrimaryWriterBasedConflictResolutionStrategy, to prioritize ingestion writers over other writers. This strategy also requires the replacecommit to be a mutable plan, and requires SparkAllowUpdateStrategy to be passed in HoodieWriteConfig.
Impact
No impact on the existing flow; this change introduces a new conflict resolution strategy.
Risk level (write none, low medium or high below)
Low. Created unit tests to test various scenarios.
Documentation Update
Did not create new configs; used existing configs. Documentation for the strategy class is provided in code comments.
Contributor's checklist