
Conversation

@alexr17
Contributor

@alexr17 alexr17 commented Mar 7, 2025

Change Logs

Adds RFC-91 for a storage-based lock provider using conditional writes

Impact

none

Risk level (write none, low medium or high below)

none

Documentation Update

n/a

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Mar 7, 2025
Collaborator

@cshuo cshuo left a comment


hi @alexr17, thanks for bringing up this highly anticipated RFC. On the Flink integration side, the FS-based lock provider has been used by default since release 1.0, so for S3 users running Flink ingestion this is a must-have feature. Left some comments.

- Mechanism: Update the lock file’s expiration using a conditional write that verifies the unique tag from the current lock state. If the tag does not match, the renewal fails, indicating that the lock has been lost.
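The renewal step above can be sketched with an in-memory stand-in for the object store. `ConditionalStore`, `put_if_tag_matches`, and the payload shape are hypothetical illustrations of a compare-and-swap on the object's unique tag (e.g. an S3 ETag or a GCS generation number), not Hudi or RFC APIs:

```python
import uuid

# Hypothetical in-memory stand-in for a cloud object store that supports
# compare-and-swap on an object's unique tag.
class ConditionalStore:
    def __init__(self):
        self.objects = {}  # path -> (tag, payload)

    def put_if_tag_matches(self, path, expected_tag, payload):
        current = self.objects.get(path)
        if current is None or current[0] != expected_tag:
            return None  # precondition failed: another writer changed the file
        new_tag = uuid.uuid4().hex
        self.objects[path] = (new_tag, payload)
        return new_tag

def renew_lock(store, path, held_tag, new_expiration_ms):
    """Extend the lock's expiration; returns the new tag on success, or
    None if the tag no longer matches (i.e. the lock has been lost)."""
    return store.put_if_tag_matches(
        path, held_tag, {"expires_at_ms": new_expiration_ms})

# Demo: renewing with the current tag succeeds; retrying with the now-stale
# tag fails, signalling lost ownership.
store = ConditionalStore()
store.objects["table/lock.json"] = ("tag-1", {"expires_at_ms": 0})
fresh_tag = renew_lock(store, "table/lock.json", "tag-1", 60_000)
stale_result = renew_lock(store, "table/lock.json", "tag-1", 120_000)
```

The key property is that every successful write produces a new tag, so any holder still presenting the old tag is fenced out.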

`unlock()`
- Purpose: Safely release the lock.
Collaborator


unlock would not delete the lock file? So once the lock file is created by the first lock, it will always be there and will just be updated with conditional writes?

Contributor Author


That's correct. Deleting the lock file is an unnecessary operation; we can just keep updating it in place.

`tryLock()`
- No Existing Lock: If the lock file doesn’t exist, a new lock file is created with the current instance’s details using a conditional write that only succeeds if the file is absent.
- Existing Lock – Not Expired: If a valid (non-expired) lock exists, the process refrains from taking the lock.
- Existing Lock – Expired: If the lock file exists but is expired, a new lock file is conditionally written. This write uses a precondition based on the current file’s unique tag from cloud storage to ensure that no other process has updated it in the meantime.
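The three `tryLock()` cases above can be sketched against the same kind of in-memory conditional store. All names (`LockStore`, `try_lock`, the payload fields) are illustrative, not actual Hudi classes:

```python
import uuid

# Hypothetical store offering the two conditional writes the RFC relies on:
# put-if-absent and put-if-tag-matches.
class LockStore:
    def __init__(self):
        self.objects = {}  # path -> (tag, lock_payload)

    def put_if_absent(self, path, payload):
        if path in self.objects:
            return None
        tag = uuid.uuid4().hex
        self.objects[path] = (tag, payload)
        return tag

    def put_if_tag_matches(self, path, expected_tag, payload):
        current = self.objects.get(path)
        if current is None or current[0] != expected_tag:
            return None
        tag = uuid.uuid4().hex
        self.objects[path] = (tag, payload)
        return tag

def try_lock(store, path, owner, ttl_ms, now_ms):
    payload = {"owner": owner, "expires_at_ms": now_ms + ttl_ms}
    existing = store.objects.get(path)
    if existing is None:
        # Case 1: no lock file -- create it, succeeding only if still absent.
        return store.put_if_absent(path, payload)
    tag, held = existing
    if held["expires_at_ms"] > now_ms:
        return None  # Case 2: a valid lock is held -- back off.
    # Case 3: expired lock -- overwrite in place, preconditioned on the
    # stale file's tag so a concurrent taker cannot race us.
    return store.put_if_tag_matches(path, tag, payload)

# Demo: writer-a acquires, writer-b is refused while valid, then takes over
# once the lock has expired.
store = LockStore()
t1 = try_lock(store, "lock.json", "writer-a", 60_000, now_ms=0)
t2 = try_lock(store, "lock.json", "writer-b", 60_000, now_ms=10_000)
t3 = try_lock(store, "lock.json", "writer-b", 60_000, now_ms=70_000)
```

Note that case 3 overwrites the existing file rather than creating a brand-new one, matching the clarification in the thread below.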
Collaborator


a new lock file is conditionally written

"new lock file" do you mean a brand new lock file will be created, or just update the current lock file with new lock payload? If it's the former, how is the "Existing lock" file dealt with.

Contributor Author


This is vague, I will change it. We are updating the current lock file with the new lock payload; essentially, it overwrites the existing lock file.

### Edge cases
- If the thread which acquired the lock dies, we stop the heartbeat.
- If the renewal fails past the expiration, we log an error and stop the heartbeat. Other Hudi lock provider implementations are susceptible to this behavior: if a writer somehow loses access to Zookeeper, there is no way to tell the writer to exit gracefully.
- If we are unable to start the heartbeat (renewal), we throw an exception.
Collaborator


Will the lock holder perceive the exception in this case, or continue to finish the write/txn?

Contributor Author


If we cannot start the heartbeat, we throw an exception but also release the lock, so the writer will never start a txn.

For renewal failures, the writer will continue finishing the txn.
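The acquisition behavior described in this exchange can be sketched as follows. If the heartbeat cannot be started, the lock is released and an exception is raised, so the writer never begins a transaction with an unrenewable lock; renewal failures after a successful start would only log and stop the heartbeat, letting the in-flight txn finish. All names here are illustrative, not Hudi's:

```python
# Hypothetical wrapper showing the start-failure path only.
class StorageLock:
    def __init__(self, start_heartbeat, release):
        self._start_heartbeat = start_heartbeat  # callable; may raise
        self._release = release                  # releases the held lock file

    def lock(self):
        try:
            self._start_heartbeat()
        except Exception as e:
            self._release()  # don't hold a lock we cannot keep alive
            raise RuntimeError("failed to start lock renewal heartbeat") from e

# Demo: a heartbeat that fails to start forces release + exception,
# so the caller never enters the transaction.
def failing_heartbeat():
    raise OSError("renewal scheduler unavailable")

released = []
lock = StorageLock(failing_heartbeat, lambda: released.append(True))
try:
    lock.lock()
    acquired = True
except RuntimeError:
    acquired = False
```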


## Implementation

This design implements a leader election algorithm for Apache Hudi using a single lock file per table stored in cloud storage. Each table’s lock is represented by a JSON file with the following fields:
Contributor


So the design mainly focuses on cloud storage; what about HDFS, which also has a large user base?

Contributor Author


It looks like conditional write support is on the way for s3a: apache/hadoop#7011

However, there does not seem to be any development in progress for GCS.

The code will be written in a way to abstract the storage format from the conditional write logic. Once HDFS supports conditional writes we can add an implementation as well.
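One way to abstract the storage format from the conditional-write logic, as described above, is a small interface that each backend implements. The interface and class names here are a hypothetical sketch, not part of the RFC:

```python
from abc import ABC, abstractmethod
from typing import Optional

class ConditionalWriteClient(ABC):
    """Hypothetical pluggable interface; the lock provider depends only on
    these two operations, so an S3, GCS, HDFS, or OpenDAL-backed client can
    be swapped in without touching the locking logic."""

    @abstractmethod
    def create_if_absent(self, path: str, data: bytes) -> Optional[str]:
        """Write only if the object does not exist; return its tag or None."""

    @abstractmethod
    def overwrite_if_tag(self, path: str, tag: str, data: bytes) -> Optional[str]:
        """Overwrite only if the current tag matches; return the new tag or None."""

class InMemoryClient(ConditionalWriteClient):
    """Test double standing in for a real storage-backed implementation."""

    def __init__(self):
        self._store = {}   # path -> (tag, data)
        self._counter = 0

    def _next_tag(self):
        self._counter += 1
        return f"tag-{self._counter}"

    def create_if_absent(self, path, data):
        if path in self._store:
            return None
        tag = self._next_tag()
        self._store[path] = (tag, data)
        return tag

    def overwrite_if_tag(self, path, tag, data):
        current = self._store.get(path)
        if current is None or current[0] != tag:
            return None
        new_tag = self._next_tag()
        self._store[path] = (new_tag, data)
        return new_tag

# Demo: create once, refuse a second create, then renew against the tag.
client = InMemoryClient()
first = client.create_if_absent("lock.json", b"{}")
second = client.create_if_absent("lock.json", b"{}")
updated = client.overwrite_if_tag("lock.json", first, b'{"renewed": true}')
```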

Contributor


Have we investigated whether Apache OpenDAL can be integrated here? By using it as an abstraction layer, we should be able to simplify the lock provider code without worrying too much about the underlying storage.

Contributor Author


Wow, looks like they just added java support: apache/opendal#5664

So this might be doable. I think it still needs to be a pluggable implementation, in case OpenDAL does not support conditional writes in a storage type that someone wants to add a lock provider for.

Contributor


I think it still needs to be a pluggable implementation

+1 as of now.

Contributor Author


For HDFS and OpenDAL we will track them here:

Since both of these implementations are not available yet, we will proceed with the current S3/GCS client library approach to unblock Flink for the 1.0.2 release, but can hopefully refactor in the future. cc @yihua

alexr17 and others added 11 commits March 25, 2025 16:43
Co-authored-by: Y Ethan Guo <[email protected]>
Contributor

@yihua yihua left a comment


LGTM

@yihua yihua merged commit bef9827 into apache:master Mar 26, 2025
1 check passed
@alexr17 alexr17 deleted the HUDI-9123/add-rfc-91 branch March 26, 2025 03:07