
Conversation

TJX2014 (Contributor) commented Sep 5, 2022

Change Logs

Make hudi-flink for MOR tables also generate a CreateHandle when the base file of a bucket does not exist.
Enable the deduplicate function for MOR tables.

Impact

The duplicate issue comes from the hudi-flink MOR table, which appends to log files first without compacting right away, so the bucket number is not present in any base file.
When Spark uses loadPartitionBucketIdFileIdMapping of org.apache.hudi.index.bucket.HoodieSimpleBucketIndex, it cannot find the bucket number written by hudi-flink, so it generates a new one that is inconsistent with hudi-flink's.
After this change, when hudi-flink writes a MOR table using the bucket index, it first tries to write a base parquet file after deduplication; if the base file already exists, it switches to writing a log file. Following the Spark way seems more stable for MOR tables.
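To make the mismatch concrete, here is a minimal, self-contained Java sketch (the class, method names, and file-id format are illustrative stand-ins, not Hudi's actual API): loading the bucketId-fileId mapping from base files alone misses buckets that so far exist only as log files, which is exactly the Flink log-first case described above.

```java
import java.util.*;

// Simplified sketch of the bucketId -> fileId mapping issue. The file-id
// format (zero-padded bucket id prefix) is an illustrative stand-in for
// what the bucket index encodes, not Hudi's exact layout.
public class BucketMappingSketch {

    // Parse the bucket id from a file id like "00000003-some-uuid".
    public static int bucketIdFromFileId(String fileId) {
        return Integer.parseInt(fileId.substring(0, 8));
    }

    // Behavior described above: build the mapping from base files only,
    // so log-only buckets written by Flink are invisible to Spark.
    public static Map<Integer, String> loadFromBaseFilesOnly(List<String> baseFileIds) {
        Map<Integer, String> mapping = new HashMap<>();
        for (String f : baseFileIds) {
            mapping.put(bucketIdFromFileId(f), f);
        }
        return mapping;
    }

    // The alternative discussed in this thread: also consider log-only
    // file slices, so both writers agree on the bucket -> fileId mapping.
    public static Map<Integer, String> loadFromAllFileSlices(List<String> baseFileIds,
                                                             List<String> logFileIds) {
        Map<Integer, String> mapping = loadFromBaseFilesOnly(baseFileIds);
        for (String f : logFileIds) {
            mapping.putIfAbsent(bucketIdFromFileId(f), f);
        }
        return mapping;
    }

    public static void main(String[] args) {
        List<String> base = Arrays.asList("00000000-aaaa");
        List<String> logs = Arrays.asList("00000001-bbbb"); // Flink wrote the log first
        System.out.println(loadFromBaseFilesOnly(base).containsKey(1));       // false
        System.out.println(loadFromAllFileSlices(base, logs).containsKey(1)); // true
    }
}
```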

Risk level: none

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

minihippo (Contributor) commented Sep 5, 2022

As discussed in the closed PR, I think considering log-only file slices when loading the bucketId-fileId mapping is better, because:

  1. for Flink writing, log-first is more intuitive.
  2. the bucket index can theoretically support writing to logs first (although it is not supported right now), for both Spark and Flink. So the problem would also be hit when writing with Spark.

TJX2014 (Contributor, Author) commented Sep 5, 2022

There are two considerations that make me think following Spark is more graceful:

  1. The log file of a MOR table is a temporary state. Both MOR and COW have base files, but only MOR has log files, so log-first is not the common case; we should consider the common base file first, right?
  2. Logs are not stable and should be merged into base files. If we consider log files first, it will make our Hudi system less stable. Our target is to make Hudi stable, right?

minihippo (Contributor)

There is a property named canIndexLogFile in HoodieIndex, which means the index can write to logs first. It is true by default for HBaseIndex and ConsistentBucketIndex, and it could also be true by default for HoodieSimpleBucketIndex. So a log-file-only state is common when writing to a new partition/table until compaction, whether using Spark or Flink. Fixing the way Spark loads the mapping is a once-and-for-all solution.
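The two positions in this thread can be sketched as a single decision point. The enum and method below are hypothetical illustrations of the logic, not Hudi's actual API:

```java
// Hypothetical sketch of the write-handle choice discussed here; the names
// are illustrative stand-ins, not Hudi's actual classes.
public class HandleChoiceSketch {

    public enum HandleType { CREATE_BASE_FILE, APPEND_LOG_FILE }

    // If the bucket's file slice already exists, a MOR write appends a log
    // file. For a brand-new bucket, canIndexLogFile decides whether the
    // index tolerates a log-only slice (log-first) or requires a base file.
    public static HandleType chooseHandle(boolean bucketExists, boolean canIndexLogFile) {
        if (bucketExists) {
            return HandleType.APPEND_LOG_FILE;
        }
        return canIndexLogFile ? HandleType.APPEND_LOG_FILE : HandleType.CREATE_BASE_FILE;
    }

    public static void main(String[] args) {
        // New bucket, index can read logs: log-first (minihippo's preference).
        System.out.println(chooseHandle(false, true));
        // New bucket, index needs base files: create base first (this PR's approach).
        System.out.println(chooseHandle(false, false));
    }
}
```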

TJX2014 (Contributor, Author) commented Sep 5, 2022

canIndexLogFile

Thanks for your suggestion. I will give another PR fix on the Spark side too: when loading the index, consider both log and base files, taking canIndexLogFile into account.

hudi-bot (Collaborator) commented Sep 5, 2022

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

yihua added the labels priority:high (Significant impact; potential bugs), engine:flink (Flink integration), and index on Sep 5, 2022
danny0405 (Contributor)

When Spark uses loadPartitionBucketIdFileIdMapping of org.apache.hudi.index.bucket.HoodieSimpleBucketIndex, it cannot find the bucket number written by hudi-flink

Seems we should fix the code on the Spark side, right?

TJX2014 (Contributor, Author) commented Sep 8, 2022

I will give a PR fix on the Spark side too, but on the Flink side I think deduplication should also be enabled as the default option for MOR tables. When duplicates are written to log files, it is very hard for compaction to read them, and it also makes the MOR table less stable because duplicate records are read into memory twice.

minihippo (Contributor)

but on the Flink side I think deduplication should also be enabled as the default option for MOR tables. When duplicates are written to log files, it is very hard for compaction to read them, and it also makes the MOR table less stable because duplicate records are read into memory twice.

Do you mean that there are two clients writing to the same partition at the same time?

danny0405 (Contributor)

I will give a PR fix on the Spark side too, but on the Flink side I think deduplication should also be enabled as the default option for MOR tables. When duplicates are written to log files, it is very hard for compaction to read them, and it also makes the MOR table less stable because duplicate records are read into memory twice.

The initial idea is to keep the details of the log records, such as in the CDC change log feed.

TJX2014 (Contributor, Author) commented Sep 9, 2022

but on the Flink side I think deduplication should also be enabled as the default option for MOR tables. When duplicates are written to log files, it is very hard for compaction to read them, and it also makes the MOR table less stable because duplicate records are read into memory twice.

Do you mean that there are two clients writing to the same partition at the same time?

Not exactly. If we deduplicate the records in memory and then write to the log, it is elegant for MOR because the result is the same. As @danny0405 says, in the CDC situation we need to retain the original records rather than pre-combine them in memory, which is acceptable.
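The in-memory deduplication being discussed can be sketched as a pre-combine step: keep only the record with the largest ordering value per key before appending to the log. The Record class and method names below are hypothetical simplifications, not Hudi's payload classes; in the CDC case this step would simply be skipped so all records survive.

```java
import java.util.*;

// Hypothetical pre-combine sketch: deduplicate records in memory by key,
// keeping the one with the largest ordering value, before writing the log
// file. Record is a simplified stand-in for Hudi's record/payload classes.
public class DedupSketch {

    public static class Record {
        public final String key;
        public final long orderingVal;

        public Record(String key, long orderingVal) {
            this.key = key;
            this.orderingVal = orderingVal;
        }
    }

    // Keep the record with the largest ordering value for each key; on a
    // tie, the earlier record wins.
    public static Collection<Record> deduplicate(List<Record> records) {
        Map<String, Record> latest = new HashMap<>();
        for (Record r : records) {
            latest.merge(r.key, r, (a, b) -> a.orderingVal >= b.orderingVal ? a : b);
        }
        return latest.values();
    }

    public static void main(String[] args) {
        List<Record> input = Arrays.asList(
            new Record("k1", 1), new Record("k1", 2), new Record("k2", 1));
        // Two records survive: k1 with orderingVal 2, and k2.
        System.out.println(deduplicate(input).size()); // 2
    }
}
```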

danny0405 (Contributor)

Guess we can close this PR now; feel free to reopen it if you still have questions.



Projects

Status: Done


5 participants