Conversation

@flashJd (Contributor) commented Aug 7, 2022

When we run an incremental query on a Hudi table, the query falls back to a full table scan if:

1. files referenced in the metadata have been deleted;
2. we read from the earliest instant;
3. the start commit is archived;
4. the end commit is archived.

In that case, the endInstant parameter in getInputSplits() is the latest instant, which causes the scan to read the latest file slice (which may grow larger as time goes by), open it, and filter the records using instantRange.

Consider a query where read.start-commit is archived and read.end-commit is in the active timeline: today this becomes a full table scan. But we can set the endInstant parameter to read.end-commit instead of the latest instant, so that less data is read. Moreover, if there is an upsert between read.end-commit and the latest instant and we use the latest instant as endInstant, we lose the inserted data between read.start-commit and read.end-commit (the data was upserted, so the original records are missing from the latest instant).

Consider another query where read.start-commit and read.end-commit are both archived: this also becomes a full table scan. If read.end-commit is long gone and has been cleaned, but there is a savepoint after it, we can use that savepoint to incrementally query the table, without caring about the data inserted or upserted after the savepoint.

The core idea is to make the file slice we search for adjacent to read.end-commit.
P.S. This PR is based on #6096.
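The first scenario above (clamping endInstant to read.end-commit instead of always using the latest instant) can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual getInputSplits() code: the helper name and timestamp values are hypothetical; the only real premise is that Hudi instant times are timestamp strings that order lexicographically.

```java
// Hypothetical sketch: since Hudi instant times are timestamp strings
// that compare lexicographically, clamping endInstant to read.end-commit
// reduces to a string comparison.
public class EndInstantChoice {
    static String chooseEndInstant(String readEndCommit, String latestInstant) {
        // Use read.end-commit when it precedes the latest instant, so the
        // scan targets the smaller, older file slice instead of the newest one.
        return readEndCommit.compareTo(latestInstant) <= 0 ? readEndCommit : latestInstant;
    }

    public static void main(String[] args) {
        System.out.println(chooseEndInstant("20220807120000", "20220809090000"));
    }
}
```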

@flashJd (Contributor, Author) commented Aug 7, 2022

@danny0405 can you help review it? Thanks.

@danny0405 (Contributor)

If the end commit is active and is used as the filtering instant for the scan, shouldn't getLatestFileSliceBeforeOrOn work here?

@flashJd (Contributor, Author) commented Aug 8, 2022

> If the end commit is active and is used as the filtering instant for the scan, shouldn't getLatestFileSliceBeforeOrOn work here?

Yes, it works, but it can't handle the case where the end commit is archived: with getLatestFileSliceBeforeOrOn the result may be a savepointed slice, or no file slice at all (already cleaned), which is not correct. getLatestFileSliceAfterOrOn can handle it.

If the end commit is active, getLatestFileSliceAfterOrOn may return either the right file slice or the next one.

If getLatestFileSliceAfterOrOn finds no file slice, we must then search backwards, since there may be an earlier file slice inside the incremental query range (it must be the latest file slice in that file group). We should check whether that file slice is before read.start-commit: if so, we skip it; otherwise we use it. Hence the name getLatestFileSliceAfterOrOnThenBefore.
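The search just described can be sketched like this. It is an illustrative model, not the actual Hudi FileSystemView code: each file slice in a file group is represented only by the base instant time that created it, sorted ascending, and instant times compare lexicographically.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch (all names hypothetical) of the "after-or-on, then
// before" search: prefer the first slice at or after the end commit, then
// fall back to the last slice before it when that slice still overlaps
// the incremental query range.
public class SliceSearch {
    static String afterOrOnThenBefore(List<String> slices, String startCommit, String endCommit) {
        for (String s : slices) {
            if (s.compareTo(endCommit) >= 0) {
                return s;                      // the "after-or-on" branch
            }
        }
        if (slices.isEmpty()) {
            return null;                       // file group has been fully cleaned
        }
        String last = slices.get(slices.size() - 1);
        // the "then before" branch: skip the slice if it precedes read.start-commit,
        // i.e. it lies entirely outside the incremental query range
        return last.compareTo(startCommit) < 0 ? null : last;
    }

    public static void main(String[] args) {
        List<String> slices = Arrays.asList("10", "30", "58");
        System.out.println(afterOrOnThenBefore(slices, "20", "40"));
    }
}
```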

@danny0405 (Contributor) commented Aug 9, 2022

> > If the end commit is active and is used as the filtering instant for the scan, shouldn't getLatestFileSliceBeforeOrOn work here?
>
> Yes, it works, but it can't handle the case where the end commit is archived: with getLatestFileSliceBeforeOrOn the result may be a savepointed slice, or no file slice at all (already cleaned), which is not correct. getLatestFileSliceAfterOrOn can handle it.
>
> If the end commit is active, getLatestFileSliceAfterOrOn may return either the right file slice or the next one.
>
> If getLatestFileSliceAfterOrOn finds no file slice, we must then search backwards, since there may be an earlier file slice inside the incremental query range (it must be the latest file slice in that file group). We should check whether that file slice is before read.start-commit: if so, we skip it; otherwise we use it. Hence the name getLatestFileSliceAfterOrOnThenBefore.

In general, let's not make things complex here. The only right way to do multi-versioning is through the timeline instant: one instant has its counterpart fs view. We should not dig into file slices of different versions for one snapshot query, even if there is a performance gain.

If the start/end commits are both archived, they have very probably been cleaned as well; we have two choices here:

1. read the savepoint commit with a greater timestamp, if one exists
2. read the latest commit

If the start commit is archived but the end commit is active, it is also possible that active instants with timestamps smaller than the end commit have been cleaned. We should check for that before using the end commit as the fs view version filter, which makes things more complex too.

So I would give -1 if this is only an improvement and not a bug fix.

@flashJd (Contributor, Author) commented Aug 9, 2022

> > > If the end commit is active and is used as the filtering instant for the scan, shouldn't getLatestFileSliceBeforeOrOn work here?
> >
> > Yes, it works, but it can't handle the case where the end commit is archived: with getLatestFileSliceBeforeOrOn the result may be a savepointed slice, or no file slice at all (already cleaned), which is not correct. getLatestFileSliceAfterOrOn can handle it.
> > If the end commit is active, getLatestFileSliceAfterOrOn may return either the right file slice or the next one.
> > If getLatestFileSliceAfterOrOn finds no file slice, we must then search backwards, since there may be an earlier file slice inside the incremental query range (it must be the latest file slice in that file group). We should check whether that file slice is before read.start-commit: if so, we skip it; otherwise we use it. Hence the name getLatestFileSliceAfterOrOnThenBefore.
>
> In general, let's not make things complex here. The only right way to do multi-versioning is through the timeline instant: one instant has its counterpart fs view. We should not dig into file slices of different versions for one snapshot query, even if there is a performance gain.
>
> If the start/end commits are both archived, they have very probably been cleaned as well; we have two choices here:
>
> 1. read the savepoint commit with a greater timestamp, if one exists
> 2. read the latest commit
>
> If the start commit is archived but the end commit is active, it is also possible that active instants with timestamps smaller than the end commit have been cleaned. We should check for that before using the end commit as the fs view version filter, which makes things more complex too.
>
> So I would give -1 if this is only an improvement and not a bug fix.

*(image: sketch of the example timeline)*
If we read the latest commit, the upsert between read.end-commit and the latest commit will shadow the insert between read.start-commit and read.end-commit, so the incremental query loses data. Is this a bug or not?

I drew a sketch above: suppose instant35 is a savepoint and we retain 40 commits for cleaning. Given that timeline:

1) if read.end-commit <= instant30, we can use instant35 (the savepoint) for the incremental query
2) if instant30 < read.end-commit <= instant35, we use [slice35, slice58, slice35, slice30] for the query
3) if instant35 < read.end-commit <= instant58, we can use instant60 (the earliest uncleaned instant) for the query
4) if instant58 < read.end-commit <= instant60, we use [slice60, slice61?, slice60, slice30] for the query
5) if read.end-commit > instant60, we keep the original getLatestFileSliceBeforeOrOn logic

As you said, "we should not dig into file slices for different versions for one snapshot query". I understand this to mean we should use one instant's fs view (file-slice combination), not recombine slices as in cases 2) and 4).

Then consider another solution: take the savepoint instant or the earliest uncleaned instant just after read.end-commit, and use that instant as the parameter of getLatestFileSliceBeforeOrOn. The problem is how to easily get the earliest uncleaned instant just after read.end-commit, since we don't have a timeline that is data-active.
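Under the assumptions above, that alternative could look like this. It is a hypothetical helper, not a Hudi API: it presumes the set of savepointed plus uncleaned active instants is already available, which is exactly the open question in the paragraph above.

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch: merge savepointed instants with uncleaned active
// instants, then pick the earliest one at or after read.end-commit and
// hand it to getLatestFileSliceBeforeOrOn.
public class QueryInstantResolver {
    static Optional<String> instantForEndCommit(SortedSet<String> savepointsAndUncleaned,
                                                String readEndCommit) {
        // tailSet(x) holds every instant >= x, in timestamp order
        return savepointsAndUncleaned.tailSet(readEndCommit).stream().findFirst();
    }

    public static void main(String[] args) {
        // Candidates from the sketch: savepoint 35, uncleaned instants 60 and 61.
        SortedSet<String> candidates = new TreeSet<>(Arrays.asList("35", "60", "61"));
        System.out.println(instantForEndCommit(candidates, "30").orElse("none"));
    }
}
```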

Can we just align FirstNonSavepointActiveCommit with EarliestNonSavepointUncleanedCommit? That would simplify much of the processing logic.

@hudi-bot (Collaborator) commented Aug 9, 2022

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@flashJd (Contributor, Author) commented Aug 11, 2022

@danny0405 looking forward to your reply.

@yihua added the engine:flink (Flink integration), reader-core, and priority:high (Significant impact; potential bugs) labels on Sep 6, 2022
@github-actions bot added the size:M (PR with lines of changes in (100, 300]) label on Feb 26, 2024
@yihua (Contributor) left a comment


@danny0405 it seems that Hudi 1.0 already solves the problem with completion time and new file slicing logic. Let me know if this is something specific to Hudi on Flink.

@danny0405 (Contributor) commented Sep 11, 2024

Yeah, we have switched to completion-time semantics for Flink since 1.0, here:

`String maxInstantTime = timeline.getInstantsAsStream()`

but Spark still has this issue.

@yihua (Contributor) commented Sep 12, 2024

Makes sense. We have HUDI-8141 and HUDI-7227 to track the fix for incremental queries on Spark. Closing this PR, which targets Flink.


Labels

engine:flink (Flink integration) · priority:high (Significant impact; potential bugs) · size:M (PR with lines of changes in (100, 300])

Projects

Status: 🚧 Needs Repro
Status: ✅ Done


4 participants