[HUDI-1897] Deltastreamer source for AWS S3 #3433
@hudi-bot run azure
0761233 to
33f7d78
Compare
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CloudObjectsMetaSource.java
...-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
...lities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
nsivabalan
left a comment
Reviewed the 2 stage pipeline
...lities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsMetaSelector.java
If we set numMessagesToProcess = min(approxMessagesAvailable, maxMessageEachBatch), we can avoid lines 159 to 161.
Done. But we will still need to check messages.isEmpty() and break out of the loop, because the value of ApproximateNumberOfMessages returned by SQS is eventually consistent. So, if it reports some positive value when there are actually no messages, we don't want to run the loop again.
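To make the point above concrete, here is a minimal sketch of the bounded polling loop being discussed. It is not the code in this PR: MessageQueue, BoundedQueuePoller, and maxPerRequest are hypothetical names used only for illustration, and the interface is a stand-in rather than the AWS SDK API. It shows the min(...) cap from the review suggestion together with the isEmpty() guard for a stale approximate count.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a queue client; this is NOT the AWS SDK API. */
interface MessageQueue {
  /** Eventually consistent estimate of the queue depth (may overstate it). */
  long approximateNumberOfMessages();

  /** Returns up to maxMessages messages; may return an empty list. */
  List<String> receive(int maxMessages);
}

final class BoundedQueuePoller {
  private BoundedQueuePoller() {}

  /**
   * Reads at most min(approximate depth, maxMessagesEachBatch) messages,
   * asking for at most maxPerRequest messages per call.
   */
  static List<String> poll(MessageQueue queue, long maxMessagesEachBatch, int maxPerRequest) {
    // Cap the target up front, as suggested in the review comment.
    long numMessagesToProcess =
        Math.min(queue.approximateNumberOfMessages(), maxMessagesEachBatch);
    List<String> processed = new ArrayList<>();
    while (processed.size() < numMessagesToProcess) {
      int want = (int) Math.min(maxPerRequest, numMessagesToProcess - processed.size());
      List<String> messages = queue.receive(want);
      if (messages.isEmpty()) {
        // The estimate was stale: stop instead of looping on an empty queue.
        break;
      }
      processed.addAll(messages);
    }
    return processed;
  }
}
```

With a queue that reports 10 messages but holds only 3, the loop returns those 3 and exits on the first empty receive rather than continuing to poll.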
...-utilities/src/main/java/org/apache/hudi/utilities/sources/CloudObjectsHoodieIncrSource.java
A few high-level questions/clarifications:
- Does this 2-stage pipeline handle (S3 object) deletes as well, or only immutable data?
- Some of the info can be out of sync at different points in time. Can you go through how we handle each of these cases?
  a. Events in SQS are out of sync with the actual S3 state. For example, you could find a PUT entry in SQS, but the actual object in S3 has been deleted.
  b. Similarly, suppose an object was updated twice, at t0 and t10, but SQS has info only about t0. I assume that when we process the t0 event, the 2nd stage fetches the entire file from S3 and writes it to Hudi, so it will likely take the latest state of the file of interest. Do we then ignore the event from SQS when we process the t10 event?
  c. The object of interest was active in S3 during the 1st stage, but during the 2nd stage the S3 object got deleted.
  d. Again, similar to (b): during the first stage the event referred to t0, and the hoodie cloud meta table also has the t0 event info. During the 2nd stage, let's say the file got updated and is now at version1 (but t0 referred to version0). What happens here?
- All cloud-provider-related code needs to be abstracted out. I guess we don't have much time, but once we land this patch, let's ensure we abstract it out so that we can support other cloud providers (GCS, etc.) as well.
- Can we abstract out the source format so that we can support reading any file format from S3? Basically, any file that can be read using spark.read.format("abc") should be doable. Or are there any tight couplings?
...-utilities/src/main/java/org/apache/hudi/utilities/sources/CloudObjectsHoodieIncrSource.java
Can you help me understand how exactly deletes in S3 are handled by these 2 sources?
We are not handling deletes right now; that will need some work. I was thinking of capturing the delete events and adding a column to the event meta table, something like is_deleted. Satish or I can take it up as a follow-up task.
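As a rough illustration of the follow-up idea above (not something this PR implements), the sketch below derives an is_deleted flag from an event name. ObjectEventRow and its fields are hypothetical, and it assumes event names follow the S3 notification convention of ObjectCreated:* and ObjectRemoved:*, optionally prefixed with "s3:".

```java
/** Hypothetical row of the event meta table; all field names are assumptions. */
final class ObjectEventRow {
  final String bucket;
  final String key;
  final String eventName;
  final boolean isDeleted;

  private ObjectEventRow(String bucket, String key, String eventName, boolean isDeleted) {
    this.bucket = bucket;
    this.key = key;
    this.eventName = eventName;
    this.isDeleted = isDeleted;
  }

  /** Builds a row and marks it deleted when the event is an ObjectRemoved:* event. */
  static ObjectEventRow fromEvent(String bucket, String key, String eventName) {
    // Assumption: names look like "ObjectRemoved:Delete" or "s3:ObjectRemoved:Delete".
    String name = eventName == null ? "" : eventName;
    if (name.startsWith("s3:")) {
      name = name.substring("s3:".length());
    }
    boolean deleted = name.startsWith("ObjectRemoved:");
    return new ObjectEventRow(bucket, key, eventName, deleted);
  }
}
```

A second-stage reader could then skip, or issue deletes for, rows where isDeleted is true; how that should behave is left open in the discussion above.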
...-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
vinothchandar
left a comment
Bunch of naming comments.
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CloudObjectsDfsSource.java
...-utilities/src/main/java/org/apache/hudi/utilities/sources/CloudObjectsHoodieIncrSource.java
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CloudObjectsMetaSource.java
* HUDI-1896 intial source for Cloud Dfs
* update with changes, added for fileMap support HUDI-1896
* update with changes, added for fileMap support HUDI-1896
* s3 meta source HUDI-1896
* adding hoodie cloud object source class
* adding hoodie cloud object source class
* [HUDI-1896] adding selector test cases
* [HUDI-1896] Intial source for Cloud Dfs and test cases
* [HUDI-1896] Intial source for Cloud Dfs and test cases
* [HUDI-1896] Intial source for Cloud Dfs and test cases
Resolve conflicts and rename opt keys
Minor refactoring in CloudObjectsDfsSelector
Add region config for cloud sources
Fix test failures
33f7d78 to
bd9b7dd
Compare
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk-sqs</artifactId>
  <version>${aws.sdk.version}</version>
We are not bundling this, so we should document that users need to add it via --jars for this to work at runtime. cc @codope
Ack.
https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/results?buildId=1719&view=results passed. So this can land per se