-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-251] JDBC incremental load to HUDI DeltaStreamer #962
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…correct bin packing
…he GenericRecord Removing Converter hierarchy as we now depend purely on JavaSerialization and require the payload to be java serializable
…ory as partition paths
(1) Apply transformation when using delta-streamer to ingest data. (2) Add Hudi Incremental Source for Delta Streamer (3) Allow delta-streamer config-property to be passed as command-line (4) Add Hive Integration to Delta-Streamer and address Review comments (5) Ensure MultiPartKeysValueExtractor handle hive style partition description (6) Reuse same spark session on both source and transformer (7) Support extracting partition fields from _hoodie_partition_path for HoodieIncrSource (8) Reuse Binary Avro coders (9) Add push down filter for Incremental source (10) Add Hoodie DeltaStreamer metrics to track total time taken
… at any time to avoid race conditions
typo: bundle jar with unrecognized variables
Exclude common dependencies that are available in Presto
… during on-the-fly merge of Real Time tables (#956) * Fix issue with incorrect column mapping casusing bad data, during on-the-fly merge of Real Time tables
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will reimplement this section with the last comment, we should do hoodie.datasource.jdbc.extra.options.fetchsize, hoodie.datasource.jdbc.extra.options.timestampFormat ..etc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way we do jdbc incremental pull is that we will run use the checkpoint value and prepare a ppd query, the query we prepare inside the JDBCSource with the incremental value is "(select * from table where incremental_column > '2019-09-09 15:45:01.53' ) rdbms_table" and we except that the incremental column is either a timestamp or an int/long val.
This worked well when we tried it with MYSQL and mysql was ok converting the string column into int or long or timestamp based on whichever column we compared it with. However, derby is not doing that and is throwing exceptions. if the ppd query is changed to "(select * from table where incremental_column >Timestamp( '2019-09-09 15:45:01.53') ) rdbms_table" then it works as the incremental column value has been casted/converted to timestamp, but normal strings do not work here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any help on this will be appreciated!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to get the type of columns? If it is a timestamp, just convert or cast to Sting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To keep with the current log framework, use LogManager.getLogger(JDBCSource.class) instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found that that logger doesn't allow me to do {}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, please use + or StringFormat yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
System.out.println => LOG.info?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was for test purposes. The PR is not yet fully done. Work is still in progress
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this line ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That too is there just for test purposes as I was facing issues with derby which u have highlighted above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need indentation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the indentation is perfect as it is ide indented and follows checkstyle
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (lastCheckpoint.isPresent()) {
if (StringUtils.isNullOrEmpty(lastCheckpoint.get())) {
LOG.warn("Previous checkpoint entry was null or empty. Falling back to full jdbc pull.");
return full(isIncremental);
} else {
return incremental(ppdQuery, lastCheckpoint, isIncremental);
}
} else {
LOG.info("No previous checkpoints found.. ");
return full(isIncremental);
}maybe would be changed to the code snippet since it is more clear and readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo internall
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could remove when use Assert.assertEquals
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any Asserts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR is not final yet. WIP
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be removed too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change this?
|
Thanks for opening the PR @taherk77 . I have left some comments. |
|
@taherk77 why closing this? |
While pushing some new code i messed my git repo, so closed this one. Will reopen again |
No description provided.