Conversation

@YuweiXiao
Contributor

Tips

What is the purpose of the pull request

(For example: This pull request adds quick-start document.)

Brief change log

(for example:)

  • Modify AnnotationLocation checkstyle rule in checkstyle.xml

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@vinothchandar vinothchandar self-assigned this Dec 21, 2021
@vinothchandar vinothchandar added the rfc Request for comments label Dec 21, 2021
@vinothchandar vinothchandar (Member) left a comment

+1 on this. Overall approach looks good to me. We can land this and iterate.

@garyli1019 @leesf any comments?

(Folks, terribly sorry for the delay here. Got bogged down in the last two months. I was hoping someone would have reviewed this :()


However, there are also some limitations.
As described in [RFC-29](https://cwiki.apache.org/confluence/display/HUDI/RFC+-+29%3A+Hash+Index), the one-one mapping between buckets and file groups may cause data skew and doesn't scale well.
One solution to address these problems is allowing one bucket to have multiple file groups, which in turn requires indexing to be performed inside each bucket.
Member

this is indeed problematic.


### Hashing Metadata

The hashing metadata will be persisted as files named `<instant>.hashing_meta` under each partition, as we manage hashing for each partition independently.
Member

this is also a candidate to be moved into the metadata table ultimately? Can we track a JIRA for this under the EPIC?

Contributor Author

Agree, will create a JIRA to track this down.

Member

Please resolve once you have filed them!

Assuming a case with thousands of buckets, the metadata file is on the order of several MB in size.
Only three operations will modify the hashing metadata:

- Initial write to a partition: a fresh hashing metadata file will be created with timestamp `00000000000000`, a default equally divided range mapping, and randomly generated file group UUIDs.
Member

there are existing constants around this kind of initial timestamp for the bootstrap operation. Worth checking out and ensuring everything works correctly.

Contributor Author

Yes, and we indeed use the same constant INIT_INSTANT_TS in the current implementation.

Member

ack
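
As a concrete (but hypothetical) illustration of the per-partition hashing metadata and the initial-write path discussed above, the sketch below models a `<instant>.hashing_meta` file with an equally divided range mapping and random file group UUIDs. The class and field names are assumptions, not the RFC's actual schema.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical model of one per-partition hashing metadata file, e.g. 00000000000000.hashing_meta.
// Names and fields are illustrative only; the actual Avro/JSON schema may differ.
public class HashingMetadata {
  static class Node {
    final int rangeEnd;       // upper bound (inclusive) of the hash range owned by this bucket
    final String fileGroupId; // randomly generated UUID of the backing file group
    Node(int rangeEnd, String fileGroupId) {
      this.rangeEnd = rangeEnd;
      this.fileGroupId = fileGroupId;
    }
  }

  final String instant;     // instant that produced this version; "00000000000000" for the initial one
  final List<Node> nodes;   // sorted by rangeEnd; together they cover the whole hash space

  HashingMetadata(String instant, List<Node> nodes) {
    this.instant = instant;
    this.nodes = nodes;
  }

  // Initial metadata: the hash space [0, maxHash] is divided into equal ranges,
  // one randomly named file group per bucket.
  static HashingMetadata initial(int numBuckets, int maxHash) {
    List<Node> nodes = new ArrayList<>();
    long step = ((long) maxHash + 1) / numBuckets;
    for (int i = 0; i < numBuckets; i++) {
      int end = (i == numBuckets - 1) ? maxHash : (int) (step * (i + 1) - 1);
      nodes.add(new Node(end, UUID.randomUUID().toString()));
    }
    return new HashingMetadata("00000000000000", nodes);
  }
}
```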


### Bucket Resizing (Splitting & Merging)

Considering there is a semantic similarity between bucket resizing and clustering (i.e., re-organizing small data files), this proposal plans to integrate the resizing process as a subtask into the clustering service.
Member

+1. Maybe add a new operation type though, to distinguish it from the various other write operations that use the replacecommit path?

Contributor Author

We were planning to add a new operation type, but decided to reuse clustering in order not to make the whole thing too complex. So in the current implementation, clustering on a table using the bucket index will 1) split a file group into two, 2) merge two file groups into a single one, and 3) rewrite a file group based on a given sorting policy.

Member

Works for me.
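
For readers of this thread, here is a rough sketch of how a clustering planner could choose among the three subtasks mentioned above (split, merge, sort-rewrite). The size thresholds, class names, and selection rule are assumptions for illustration, not the actual Hudi implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative planner for a consistent-hashing table: pick one of the three
// clustering subtasks per bucket based on file group size. Hypothetical names.
public class BucketClusteringPlanner {
  enum Op { SPLIT, MERGE, SORT_REWRITE }

  static class Task {
    final Op op;
    final List<String> fileGroupIds;
    Task(Op op, List<String> fileGroupIds) { this.op = op; this.fileGroupIds = fileGroupIds; }
  }

  // ids and sizes are in bucket (hash-range) order; sizes are bytes.
  static List<Task> plan(List<String> ids, List<Long> sizes, long splitThreshold, long mergeThreshold) {
    List<Task> tasks = new ArrayList<>();
    for (int i = 0; i < ids.size(); i++) {
      long size = sizes.get(i);
      if (size > splitThreshold) {
        tasks.add(new Task(Op.SPLIT, List.of(ids.get(i))));                  // too big: split into two children
      } else if (i + 1 < ids.size() && size + sizes.get(i + 1) < mergeThreshold) {
        tasks.add(new Task(Op.MERGE, List.of(ids.get(i), ids.get(i + 1))));  // two small neighbours: merge
        i++;                                                                 // skip the merged neighbour
      } else {
        tasks.add(new Task(Op.SORT_REWRITE, List.of(ids.get(i))));           // otherwise rewrite with the sorting policy
      }
    }
    return tasks;
  }
}
```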

![bucket resizing](bucket_resizing.png)

The figure above shows a concurrent write process during the bucket resizing process (i.e., splitting).
Dual-write is activated for buckets that are splitting.
Member

is this dual-write only happening when we are generating the clustering plan?

Contributor Author

It happens when we are executing the clustering. As in the single-writer case, scheduling a clustering (i.e., generating the plan) is serialized with the writing.

Member

if we can block the writer when generating the clustering plan, do we still need to do the dual-write?

Contributor Author

Yes. The dual-write is necessary to allow asynchronous clustering. Regarding the generation of the plan, it is actually similar to compaction, where we also block the writer while scheduling.

@garyli1019 garyli1019 (Member) Mar 10, 2022

Let's say we are splitting bucket2 into bucket2.1 and bucket2.2. If we use dual-write, we will write bucket2.log as well as bucket2.1.log + bucket2.2.log, but bucket2.log could be obtained by merging bucket2.1.log + bucket2.2.log. Is it possible to treat the two files as one, just like we treat the base file + log file as a single base file before a pending compaction? Then we wouldn't need the dual-write anymore.

Member

yes, the ultimate solution is to support this through the read path, or to support it at the file group level. Treating the child buckets as one file group (like the virtual log file you mentioned) before the clustering finishes, and separating them after the clustering is committed, would work, because we have the timeline information when constructing the file group. We have similar logic for pending compaction, I believe, where a base file + a log file are treated as a single base file.
What I was concerned about is that the dual-write might cause some extra operational workload, and it's not streaming friendly. It's possible that the clustering plan would not be executed in a short amount of time.

Contributor Author

With resizing enabled, file group size will be controlled under a specified threshold (like the max parquet file size). So in a normal case (reasonable initial bucket size and write speed), there would be an upper bound on the clustering execution time.

Contributor Author

Thanks for the discussion, Gary! I'll put the non-dual-write solution in the RFC.

Contributor

IIUC, the main point of the non-dual-write is to support reading the two file groups bucket2.1.log / bucket2.2.log + bucket2.parquet as bucket2, right? One of the RFC-29 follow-ups is to read data by bucket rather than by file group, to get better read performance by leveraging the hash distribution.

Contributor Author

The non-dual-write is meant to minimize the impact of resizing on write performance. And it requires modification of the read path so that it understands the children buckets' logs. Reading based on hash distribution may be more complex in the context of consistent hashing, because partitions now have their own hashing metadata (we cannot align bucket ids directly). I haven't dug into read-path optimization yet, but I believe it is a great feature to put effort into.
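
To make the dual-write behaviour debated in this thread concrete, here is a minimal sketch of the routing decision a writer could make for an incoming record while a split is pending. All names are hypothetical; the non-dual-write alternative discussed above would instead write only to the owning child and merge the views on the read path.

```java
import java.util.List;

// Hypothetical sketch of dual-write routing while a bucket split is pending.
// The record is appended to the log of the old (parent) file group and to the
// log of whichever child owns its hash, so both the pre- and post-resize views
// of the bucket stay complete.
public class DualWriteRouter {
  static class Child {
    final String fileGroupId;
    final int rangeEnd; // inclusive upper bound of the child's hash sub-range
    Child(String fileGroupId, int rangeEnd) { this.fileGroupId = fileGroupId; this.rangeEnd = rangeEnd; }
  }

  /** Returns the file groups the record's log block should be written to. */
  static List<String> route(int recordHash, String parentFileGroup, List<Child> pendingChildren) {
    if (pendingChildren == null || pendingChildren.isEmpty()) {
      return List.of(parentFileGroup); // no resizing in flight: normal single write
    }
    for (Child child : pendingChildren) {
      if (recordHash <= child.rangeEnd) {
        return List.of(parentFileGroup, child.fileGroupId); // dual-write: parent + owning child
      }
    }
    // hash beyond the last declared range end: fall back to the last child
    return List.of(parentFileGroup, pendingChildren.get(pendingChildren.size() - 1).fileGroupId);
  }
}
```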

However, the record's new location can only be determined once the clustering finishes, which means the writer will be blocked as it has to wait for the location information.

For tables using Bucket Index, the above conflicting condition can be avoided because record locations are calculated by the hash algorithm.
So even before the clustering finishes, the writer can calculate record locations as long as it knows the latest hash algorithm (in our Consistent Hashing case, it is stored in the clustering plan).
Member

If we store the resizing plan in the clustering plan, we will need to combine the hash metadata + all the clustering plans to find out the final hashing algorithm, right?

Contributor Author

Yes. And we also constrain each partition to have only a single clustering ongoing, to reduce some complexity here.

Contributor

Does a partition have more than one pending clustering plan? If yes, can we update the hash metadata to the latest version to make the following upsert/insert easier?

Contributor Author

No, a partition is only allowed to schedule one clustering, to make sure there is no race condition on the update to the metadata.
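
A small sketch of the layout derivation discussed here, with hypothetical names: the committed hashing metadata for a partition is overlaid with the single pending clustering plan that partition is allowed to have, so a writer can always resolve the effective hashing layout.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Illustrative derivation of the hashing layout a writer should use before the
// clustering commits: committed metadata plus the one pending plan for the partition.
public class EffectiveHashingLayout {
  static TreeMap<Integer, String> overlay(TreeMap<Integer, String> committed,   // rangeEnd -> fileGroupId
                                          Map<Integer, String> pendingAdds,     // new child range ends from splits/merges
                                          Set<Integer> pendingRemovals) {       // range ends absorbed by a merge
    TreeMap<Integer, String> effective = new TreeMap<>(committed);
    pendingRemovals.forEach(effective::remove);   // a merge drops the absorbed bucket's range end
    effective.putAll(pendingAdds);                // a split adds the new mid-point and remaps the parent's range end
    return effective;
  }
}
```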


| | Consistent Hashing Index | Bucket Index |
|-------------------|--------------------------|--------------|
| lookup a record | O(log N) | O(1) |
Member

why would the cost be log(N)? IIUC it would be log(the total number of sub-buckets of a root bucket). Resizing would be a rare case if we set a reasonable initial bucket number, so I expect this algorithm would be very close to the bucket index in lookup performance in most cases.

Contributor Author

N here denotes the total number of buckets for a partition. Yeah, in our tests, the performance is close to the bucket index.
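
To illustrate where the O(log N) bound comes from, here is a minimal binary-search lookup over the sorted bucket range ends of a partition. Names and the fallback handling are illustrative assumptions.

```java
import java.util.Arrays;

// The per-partition metadata keeps bucket range ends sorted, so finding the bucket
// that owns a record's hash is a binary search over the N buckets: O(log N).
public class ConsistentHashLookup {
  /**
   * @param sortedRangeEnds inclusive upper bounds of each bucket's hash range, ascending
   * @param recordHash      hash of the record key, already mapped into the same space
   * @return index of the owning bucket
   */
  static int lookupBucket(int[] sortedRangeEnds, int recordHash) {
    int pos = Arrays.binarySearch(sortedRangeEnds, recordHash);
    if (pos >= 0) {
      return pos;                       // exact match on a range end
    }
    int insertion = -pos - 1;           // first range end greater than the hash
    return Math.min(insertion, sortedRangeEnds.length - 1); // clamp if the hash exceeds every range end
  }
}
```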

@garyli1019 garyli1019 (Member) left a comment

LGTM! Thanks for the RFC, looking forward to this feature!

@hudi-bot (Collaborator)

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@garyli1019 garyli1019 merged commit 18cdad9 into apache:master Mar 11, 2022
vingov pushed a commit to vingov/hudi that referenced this pull request Apr 3, 2022
* [HUDI-2999] rfc for consistent hashing index

* [HUDI-2999] review: add metadata table & non-dual-write solution (virtual log file) for resizing

Co-authored-by: xiaoyuwei <[email protected]>
stayrascal pushed a commit to stayrascal/hudi that referenced this pull request Apr 12, 2022
* [HUDI-2999] rfc for consistent hashing index

* [HUDI-2999] review: add metadata table & non-dual-write solution (virtual log file) for resizing

Co-authored-by: xiaoyuwei <[email protected]>
@nsivabalan nsivabalan (Contributor) left a comment

nice feature 👍


The figure above shows a concurrent write process during the bucket resizing process (i.e., splitting).
Dual-write is activated for buckets that are splitting.
Each incoming record will be written to log files of both the old file group (i.e., FG1) and children file groups (i.e., FG1.1 or FG1.2).
Contributor

so, with this, a regular writer could end up initializing the new file group, is that right? As of today (w/o this feature), new file groups for clustering are never touched by regular writers, so initialization does not need to happen upfront. But with the proposal, if clustering/resizing is scheduled but the new files are not initialized, and a regular write comes through, will it take care of instantiating the file group?
If yes, do we guard the initialization using a lock? If not, what happens if there are two concurrent writers trying to write to a new file group that is not initialized yet?

Contributor Author

By 'initializing the new file group', do you mean generating the file id? During the scheduling of the clustering, we have already generated the file group ids and the new hashing mapping. So when the concurrent writer arrives, it can see all the necessary information to perform the dual write. By the way, in the implementation, we only allow the clustering to be scheduled when there are no inflight writers, so that all writers see a consistent table state (with or without a resizing/clustering task in the background).

Concurrent cases also happen when multiple writers try to write to a fresh new table (or a table that has not been created yet). In this case, we rely on the fact that only one client can write the hashing metadata successfully (the hashing metadata will have the initial name 00000000.hashing_meta). Other writers that failed to create the hashing metadata will wait until they can see it (because there may be visibility latency in the underlying object store). Concurrency control will be carried out as usual, which means only one writer can commit when writes happen to the same file group.
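
The initialization race described above could be sketched as follows, using a local filesystem's atomic create-new as a stand-in for whatever atomic-create and visibility semantics the real storage layer provides. Names, the file path layout, and the retry policy are assumptions, not the actual implementation.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of "only one writer creates the initial hashing metadata":
// exactly one concurrent writer succeeds in the atomic create; the others
// wait until the file becomes visible, then proceed with normal writes.
public class HashingMetadataBootstrap {
  static final String INITIAL_FILE = "00000000000000.hashing_meta";

  static void ensureInitialized(Path partitionMetaDir, byte[] initialMetadata)
      throws IOException, InterruptedException {
    Path metaFile = partitionMetaDir.resolve(INITIAL_FILE);
    try {
      // Atomic "create only if absent": one writer wins the race.
      Files.write(metaFile, initialMetadata, StandardOpenOption.CREATE_NEW);
    } catch (FileAlreadyExistsException alreadyExists) {
      // Another writer won; wait until the metadata is visible
      // (object stores may expose the file with some latency).
      while (!Files.exists(metaFile)) {
        Thread.sleep(100);
      }
    }
  }
}
```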
