[HUDI-651] Fix incremental queries in hive for MOR tables #1817
Conversation
- HoodieTestUtils changes to include fake write stats per affected file.
- Refactor AbstractRealtimeRecordReader to support additional FileSplit types. Currently it only takes HoodieRealtimeFileSplit; in the future we can add another constructor with FileSplit for incremental queries.
- This commit addresses two issues:
  1. Honors the end time if it is less than the most recent completed commit time.
  2. Does not require a base parquet file to be present when the begin and end times match only the delta commits.
  To achieve this:
  - Created a separate FileSplit for handling incremental queries.
  - New RecordReader to handle the new FileSplit.
  - FileSlice scanner to scan the files in a file slice. It first takes the base parquet file (if present) and applies the merged records from all log files in that slice; if the base file is not present, it returns the merged records from the log files on scanning (a rough sketch of this merge logic follows below).
  - HoodieParquetRealtimeInputFormat modified to switch to HoodieMORIncrementalFileSplit and HoodieMORIncrementalRecordReader from getSplit(..) and getRecordReader(..) in the case of incremental queries.
  - Includes a unit test covering different incremental queries.
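The following is a minimal sketch of the merge behavior described above, not the PR's actual implementation; the class and the readBaseRecords/scanLogRecords helpers are hypothetical placeholders standing in for the parquet and log readers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class FileSliceMergeSketch {

  // Merge the records of one file slice: start from the base parquet file if it
  // exists, then overlay the merged log records keyed by record key.
  static Map<String, Object> mergeFileSlice(Optional<String> baseFilePath,
                                            Iterable<String> logFilePaths) {
    Map<String, Object> merged = new HashMap<>();

    // 1. Base file records, if a base parquet file is present in the slice.
    baseFilePath.ifPresent(path -> merged.putAll(readBaseRecords(path)));

    // 2. Log records override/append; when no base file exists, the result is
    //    simply the merged log records.
    merged.putAll(scanLogRecords(logFilePaths));
    return merged;
  }

  // Hypothetical placeholders for the actual parquet and log readers.
  static Map<String, Object> readBaseRecords(String path) { return new HashMap<>(); }
  static Map<String, Object> scanLogRecords(Iterable<String> paths) { return new HashMap<>(); }
}
```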
@garyli1019 Please take a look at this and provide feedback.
private final List<FileSlice> fileSlices;

// Final map of compacted/merged records
// TODO change map to external spillable map. But ArrayWritable is not implementing Serializable
We are merging parquet and log files and need to assume that either of them can be absent. I kept the map interface as Map<String, ArrayWritable>, but since ArrayWritable does not implement Serializable, that prevents the use of ExternalSpillableMap. Appreciate any ideas (a hypothetical workaround is sketched below).
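One possible workaround, offered only as an idea and not part of this PR: ArrayWritable is not java.io.Serializable, but it is a Hadoop Writable, so it can be round-tripped through bytes, and a spillable map could store those bytes instead of the object. The Text value class below is just an example assumption.

```java
import java.io.IOException;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;

class ArrayWritableBytes {

  static byte[] toBytes(ArrayWritable value) throws IOException {
    DataOutputBuffer out = new DataOutputBuffer();
    value.write(out);                                  // Writable serialization
    byte[] bytes = new byte[out.getLength()];
    System.arraycopy(out.getData(), 0, bytes, 0, out.getLength());
    return bytes;
  }

  static ArrayWritable fromBytes(byte[] bytes) throws IOException {
    // The value class must be known up front; Text is only an example here.
    ArrayWritable value = new ArrayWritable(Text.class);
    DataInputBuffer in = new DataInputBuffer();
    in.reset(bytes, bytes.length);
    value.readFields(in);                              // Writable deserialization
    return value;
  }
}
```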
// is this an incremental query
List<String> incrementalTables = getIncrementalTableNames(Job.getInstance(job));
if (!incrementalTables.isEmpty()) {
  //TODO For now assuming the query can be either incremental or snapshot and NOT both.
This is an assumption for now: we are not touching the snapshot query path, and the new incremental path is taken whenever the list of incremental tables is non-empty. In the future this might have to change. Or maybe there is a better way to apply this constraint than relying simply on the incremental table names? (One possible check is sketched below.)
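Just as an illustration of how the "incremental XOR snapshot" assumption could be enforced explicitly rather than implied by the presence of incremental table names; this is a hypothetical helper, not code from the PR.

```java
import java.util.List;

class QueryModeCheckSketch {

  // Returns true if the query should take the incremental path; rejects queries
  // that mix incremental and snapshot tables, which is not supported yet.
  static boolean isIncrementalQuery(List<String> queriedTables, List<String> incrementalTables) {
    long incrementalCount = queriedTables.stream().filter(incrementalTables::contains).count();
    if (incrementalCount > 0 && incrementalCount < queriedTables.size()) {
      throw new IllegalArgumentException(
          "Query mixes incremental and snapshot tables; this is not supported");
    }
    return incrementalCount > 0;
  }
}
```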
private Map<String, List<FileStatus>> listStatusForAffectedPartitions(
    Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline timeline) throws IOException {
  // Extract files touched by these commits.
  // TODO This might need to be done in parallel like listStatus parallelism ?
There is scope for parallelizing the listing here, but it should not be an immediate blocker (a minimal sketch of one approach follows below).
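A minimal sketch of parallelizing the per-commit work with a Java parallel stream, assuming the per-commit metadata extraction is independent; extractFilesTouchedByCommit is a hypothetical placeholder, and real code may prefer an engine-managed thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ParallelListingSketch {

  static Map<String, List<String>> listAffectedFiles(List<String> commitsToCheck) {
    Map<String, List<String>> partitionToFiles = new ConcurrentHashMap<>();
    // Process each commit's metadata in parallel and merge the per-partition results.
    commitsToCheck.parallelStream().forEach(commit ->
        extractFilesTouchedByCommit(commit).forEach((partition, files) ->
            partitionToFiles.merge(partition, files, (existing, incoming) -> {
              List<String> combined = new ArrayList<>(existing);
              combined.addAll(incoming);
              return combined;
            })));
    return partitionToFiles;
  }

  // Placeholder for reading one commit's metadata and extracting touched files per partition.
  static Map<String, List<String>> extractFilesTouchedByCommit(String commit) {
    return new ConcurrentHashMap<>();
  }
}
```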
String relativeFilePath = stat.getPath();
Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
if (fullPath != null) {
  //TODO Should the length of file be totalWriteBytes or fileSizeInBytes?
@bhasudha : totalWriteBytes = fileSizeInBytes for base files (parquet). For log files this is not the case, as we use a heuristic to estimate the bytes written per delta commit.
Getting the actual file size would require an RPC call and would be costly here. Also, looking at where the file size is useful in the read path, it is only needed for combining and splitting file splits, which is based on the base file for Realtime. So it should be fine to use stat.getTotalWriteBytes(), with one change.
Since the same log file can appear in multiple delta commits (for HDFS and other file systems supporting appends), the logic below needs to handle that. You can simply accumulate the cumulative write bytes for each log file appearing across delta commits to get a better approximate size of the log files (a sketch of this aggregation follows below).
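A rough sketch of the suggested aggregation, assuming HoodieWriteStat exposes getPath() and getTotalWriteBytes() as used in the diff above and lives under org.apache.hudi.common.model; everything else is illustrative.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hudi.common.model.HoodieWriteStat;

class CumulativeLogSizeSketch {

  static Map<String, Long> cumulativeSizes(List<HoodieWriteStat> statsAcrossCommits) {
    Map<String, Long> pathToBytes = new HashMap<>();
    for (HoodieWriteStat stat : statsAcrossCommits) {
      // The same relative path may appear in multiple delta commits (appends);
      // summing the per-commit write bytes approximates the final log file size
      // and also deduplicates the file into a single entry.
      pathToBytes.merge(stat.getPath(), stat.getTotalWriteBytes(), Long::sum);
    }
    return pathToBytes;
  }
}
```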
//TODO Should the length of file be totalWriteBytes or fileSizeInBytes?
FileStatus fs = new FileStatus(stat.getTotalWriteBytes(), false, 0, 0,
    0, fullPath);
partitionToFileStatusesMap.get(entry.getKey()).add(fs);
need to handle duplicate log files here.
garyli1019 left a comment:
Hi @bhasudha, I took an initial pass and have a few questions.
I thought about the scenario where the base file is not included but the log files should be included in the incremental query for the Spark Datasource as well. The two approaches I have been thinking about:
- If the starting commit time of the incremental query is a delta commit without compaction, we can search for the previous compaction commit and read from there, then do the filtering later. That way we avoid missing the base file, but it is not the optimal solution since we read some redundant parquet files.
- We can improve the existing HoodieRealtimeFormat or HoodieRealtimeRecordReader to handle the missing base file scenario. Maybe create a HoodieRealtimeFileSlice with an empty baseFile during the file listing (roughly as illustrated after this comment)? Do you think this is doable?
WDYT?
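A hypothetical illustration of the second approach: let the split/slice carry an optional base file so an uncompacted file group (log files only) can still be represented. The names here are placeholders, not existing Hudi classes.

```java
import java.util.List;
import java.util.Optional;

class RealtimeSplitWithOptionalBaseSketch {

  final Optional<String> baseFilePath;   // empty when the file group has no base parquet file yet
  final List<String> logFilePaths;

  RealtimeSplitWithOptionalBaseSketch(Optional<String> baseFilePath, List<String> logFilePaths) {
    this.baseFilePath = baseFilePath;
    this.logFilePaths = logFilePaths;
  }

  // Readers would fall back to merging only the log files for log-only splits.
  boolean isLogOnly() {
    return !baseFilePath.isPresent();
  }
}
```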
@garyli1019 Are you talking about corner cases not handled in this PR? Can you review the PR once for the intended functionality? I am trying to see if this can help MOR incremental queries on Spark SQL in some form.
@vinothchandar I think the Spark Datasource will use a different approach. IIUC, this PR is trying to solve the case where the incremental query starts from an uncompacted delta commit, which doesn't have a base file for some file groups and leads to missing the log records. For the Spark Datasource, we can create a
Yes, this is trying to fix things for Hive primarily. We can take a separate approach for Spark incremental queries.
cc @satishkotha to review and suggest what to do in the 0.6.0 timeline
satishkotha left a comment:
@n3nash said he is going to take a look at this. I'm just leaving one high-level question to understand the approach.
| + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); | ||
| // sanity check | ||
| ValidationUtils.checkArgument(split instanceof HoodieRealtimeFileSplit, | ||
| ValidationUtils.checkArgument(split instanceof HoodieRealtimeFileSplit || split instanceof HoodieMORIncrementalFileSplit, |
High-level question: is it possible to make baseFile optional in HoodieRealtimeFileSplit instead of creating the new class HoodieMORIncrementalFileSplit? We may also have to make changes in the RecordReader classes if baseFile is not present.
@satishkotha There are a few requirements we need to satisfy in order to support this in HoodieRealtimeFileSplit:
- The start and end time should be honored by the incremental query. If the end time is not specified, it can be assumed to be the min commit from (maxNumberOfCommits, mostRecentCommit). Currently this is not happening as intended (a rough sketch of the end-time handling follows below).
- The base file and log files can each be optional. This can be the case when the boundaries of the incremental query filter are such that the start commit time matches only a log file and/or the end commit time matches only the base file across file slices, or when the incremental query touches a FileSlice that is not compacted yet.
When I initially started, I was not sure how big the refactor and its testing would be to achieve both of the above requirements in the same HoodieRealtimeFileSplit. It would also require regression testing of snapshot queries in all query engines, plus the new incremental query path in all query engines. So, instead of impacting the snapshot query code path that is running fine, I conservatively branched out to make these changes applicable only to the incremental query path, with the intent to consolidate them in the long term after stabilizing and gaining more confidence.
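A rough sketch of the first requirement only, under the assumption that commit times are compared as the usual lexicographically ordered timestamp strings: clamp the effective end time to the latest completed commit and honor a smaller user-supplied end time. The maxNumberOfCommits bound mentioned above is deliberately omitted, and all names are placeholders.

```java
import java.util.Optional;

class IncrementalWindowSketch {

  static String resolveEndTime(Optional<String> requestedEndTime, String latestCompletedCommitTime) {
    if (!requestedEndTime.isPresent()) {
      // No end time supplied: default to the most recent completed commit.
      return latestCompletedCommitTime;
    }
    // Honor the requested end time only if it is earlier than the latest completed commit.
    String requested = requestedEndTime.get();
    return requested.compareTo(latestCompletedCommitTime) < 0 ? requested : latestCompletedCommitTime;
  }
}
```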
@garyli1019 Is this PR addressed as part of #1938, or should we revive this?
Hi @n3nash, #1938 adds support for MOR incremental queries on the Spark DataSource. This PR seems to aim at fixing Hive.
@garyli1019 Thanks for the context. @bhasudha Are you able to revive and rebase this PR? Once you do that, I can help land this.
@bhasudha I will try to rebase this PR and get it to a working state tomorrow; if you are working on it already, please let me know.
@garyli1019 and @n3nash: can we have an owner for this? If not, I can take it up; I haven't worked on the incremental path before, but I'm willing to expand my knowledge :)
@nsivabalan I am too busy to land Hudi at work, so I may not be able to work on this anytime soon. Please feel free to pick this up if you are interested.
@alexeykudinkin I believe
@yihua it does
Sounds good. Given
What is the purpose of the pull request
This commit addresses two issues:
1. Honors the end time if it is less than the most recent completed commit time.
2. Does not require a base parquet file to be present when the begin and end times match only the delta commits.
Brief change log
Pending action items before moving to full PR
Verify this pull request
(Please pick one of the following options)
- This pull request is a trivial rework / code cleanup without any test coverage.
- This pull request is already covered by existing tests, such as (please describe tests).
- This change added tests and can be verified as follows:
Committer checklist
- Has a corresponding JIRA in PR title & commit
- Commit message is descriptive of the change
- CI is green
- Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.