
Conversation

@yuzhaojing
Contributor

…ce for Hudi

Tips

What is the purpose of the pull request

(For example: This pull request adds quick-start document.)

Brief change log

(for example:)

  • Modify AnnotationLocation checkstyle rule in checkstyle.xml

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@vinothchandar vinothchandar added the rfc Request for comments label Dec 15, 2021
@yuzhaojing yuzhaojing force-pushed the HUDI-3016 branch 2 times, most recently from b4c1b6a to fbe2769 on December 17, 2021 07:33
@yuzhaojing yuzhaojing changed the title [HUDI-3016][RFC-42] Proposal to implement Compaction/Clustering Servi… [HUDI-3016][RFC-43] Proposal to implement Compaction/Clustering Servi… Dec 17, 2021
@vinothchandar vinothchandar self-assigned this Dec 25, 2021
@vinothchandar
Member

cc @prashantwason @nbalajee and folks at uber, who are looking into similar things.

@vinothchandar
Member

@yuzhaojing Can we fold this under the #4718 proposal? It would be awesome to move

  • Metadata
  • Table service management
  • Locking

into a single, highly available metadata layer! This is very exciting for many reasons, which I can elaborate on.

@vinothchandar
Member

cc @minihippo what do you think?

See the License for the specific language governing permissions and
limitations under the License.
-->
# RFC-43: Implement Compaction/Clustering Service for Hudi
Member

Since this may cover other HUDI operations in the future (async indexing of metadata table partitions, cleaning, etc.), maybe rename this to "HUDI Table Management Service".


### Meta Table

The Meta Table is implemented as an internal HUDI MOR table that stores the required metadata for compaction/clustering messages. This table is internal to a dataset and is not exposed directly to the user to write or modify.
Member

Using the metadata table for this has some issues:

  1. Scale - since each write to the metadata table is a deltacommit, which may take many seconds (my tests showed 5 to 10 seconds at a minimum), this limits the maximum number of tables possible.
  2. Job state tracking (start job, complete job, job error) means further writes into the metadata table. This will further lower the number of requests/sec that can be served.

I feel there are better approaches for saving this information:

  1. Using RocksDB.
  2. Using simple "job files" in YAML format in the folder ".hoodie/service". Each job scheduling writes a new job file, and on completion we can simply delete the file or archive it. This would be more work than RocksDB.
  3. Using an external store like MySQL or MongoDB (the downside is the external dependency, of course).

We operate at a scale of 10,000+ tables, with ingestion running at least every 30 minutes.

In any case, this could be an interface with multiple implementations.
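To make that concrete, here is a minimal sketch of what such a pluggable store could look like; the interface name, methods, and implementation names below are illustrative assumptions, not the actual code:

```java
import java.util.List;

/** Hypothetical pluggable store for table-service job metadata (illustrative only). */
public interface TableServiceMetaStore {

  enum JobState { SCHEDULED, RUNNING, COMPLETED, FAILED }

  /** Persist a newly scheduled compaction/clustering job for a table. */
  void saveJob(String tableBasePath, String jobId, byte[] serializedPlan);

  /** Record a state transition (start, completion, error) for a job. */
  void updateJobState(String jobId, JobState state);

  /** List jobs that are waiting to be executed. */
  List<String> listPendingJobs();

  /** Delete or archive a job once it has finished. */
  void deleteJob(String jobId);
}

// Possible implementations matching the options above (names are hypothetical):
//   HoodieTableMetaStore - backed by an internal Hudi MOR table (the default in this RFC)
//   RocksDbMetaStore     - embedded RocksDB for a long-running service
//   JdbcMetaStore        - MySQL or another external database
```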

Contributor Author

Thanks for the review.
I will update this document later; the current implementation is already an interface with multiple implementations.

Contributor

+1. A real-time DB / key-value store like RocksDB or MySQL might be a good fit, as proposed.

Contributor Author (@yuzhaojing, Mar 14, 2022)

This is a great idea: we can use a HUDI table by default and provide a MySQL-based implementation selectable via configuration.


### Schema

Being an actual Hoodie Table, the Meta Table needs to have a schema to store the compaction/clustering message.
Member

The service itself will be a standalone Java program that runs all the time, so it does not have to be a HUDI table, and hence there is no need for a metadata table.

I feel that, since this is a long-running Java program, it should be "fast" and hence use an embedded database like RocksDB.

Contributor Author

The Hoodie table is only one implementation of this; I think we can add other databases for the service, like MySQL or RocksDB.


## Interface design

### Register
Member

What is the difference between register and schedule?

I feel we can merge the two into just register:
When you register a job, it is added to the list of jobs and scheduled as necessary. Some jobs may be periodic (e.g. clean and compaction) and some jobs one-shot (e.g. clustering).

Contributor

From my understanding, the actual scheduling (the strategy of when to compact and which file groups to compact) is not delegated to this service; that still lies with the original Hudi table. Once the compaction plan is finalized, we delegate it to this service.

So I'm not sure we need to overload this service with actual scheduling as well. I do see scope for improvement by adding that functionality to the service, but maybe in the first cut we can leave it to the original Hudi table.

Contributor Author

Registration means that this table is scheduled by the service. This interface reserves the scheme for subsequent calls through the service, which will be a follow-up. In the current first version we can leave the scheduling to the original HUDI table.

@prashantwason @nsivabalan What do you think?

"base_path":"/hoodie/base_path",
"compaction_config":"compaction_config",
"clustering_config":"clustering_config"
}
Member

Many other configs will also be required:

  1. retry_on_error: true/false (whether to automatically retry on error)
  2. periodic: true/false (whether the job should run periodically)
  3. timeout: timeout for job execution
  4. priority: some priority value
  5. context: key-value config which the user can add and retrieve. This lets us attach "tier", "SLA", "owner", "yarn_queue", etc., which are useful for the execution engine when scheduling the job.

We have over 10,000 tables belonging to different teams, so jobs scheduled on a table should be executed in the table owner's yarn_queue.
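For illustration, a sketch of what an extended registration request could carry, mirroring the list above; the class and field names are hypothetical, not an agreed schema:

```java
import java.time.Duration;
import java.util.Map;

/** Illustrative sketch of an extended registration request; fields mirror the list above. */
public final class RegisterTableRequest {
  public String basePath;                  // e.g. "/hoodie/base_path"
  public String compactionConfig;
  public String clusteringConfig;
  public boolean retryOnError = true;      // auto-retry failed jobs
  public boolean periodic = true;          // re-schedule periodically vs. one-shot
  public Duration timeout = Duration.ofHours(2);
  public int priority = 0;
  // Free-form context the execution engine can use, e.g. tier, SLA, owner, yarn_queue.
  public Map<String, String> context = Map.of("yarn_queue", "team-a-queue");
}
```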

Contributor

+1 for the above.
yarn_queue might definitely be useful.

Contributor

Don't we need the entire write configs for each table as well? For example, lock provider configurations?

Contributor Author

Yes, these parameters are necessary.

## Proposal

### Proposal For V1

Member

Maybe have a Python / Go CLI too, for the following:

  1. List all jobs
  2. Add a job (useful for testing as well as for manually adding operations on tables)
  3. Remove a job
  4. Clear jobs for a table (etc.)

Contributor Author

Can we put the implementation of the client in the next stage? I hope the goal of the first step is to be able to complete the execution of compaction & clustering jobs.

Member

@yuzhaojing please go ahead and incorporate/list out these future items in the RFC accordingly; then you may resolve the comment.


### Proposal For V2

- Scalable service
Member

What does scalable service imply? More than one node? More than one web server node? Primary-secondary replication?

Contributor Author

I want to implement stateless multi-instance nodes.

Member

@yuzhaojing please add the details in the RFC to clarify these questions and resolve comment after updating.


### Scheduler

Periodically scan the Meta Table and submit compaction/clustering jobs according to user-specified rules; the execution engine needs to be pluggable.
Member

The scheduler will need to consider various things:

  1. Priority of jobs
  2. Resources available to schedule jobs (e.g. a bin-packing algorithm to execute the maximum number of jobs within the given resources)

Contributor

We might have to introduce a scheduling strategy here. FIFO could be one; priority-based could be another. For example, compaction could have higher priority than cleaning, and cleaning higher priority than clustering.
That is just an example; users should be able to configure these properties.

Contributor Author

This is a great addition. I think it is necessary to provide a configurable scheduling strategy.
We can implement FIFO and priority-based scheduling strategies first, and leave an interface for improving the scheduling strategy as a follow-up (see the sketch below).
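A minimal sketch of such a pluggable strategy, under the assumption of hypothetical names (SchedulingStrategy, ScheduledJob) that are not part of the codebase:

```java
import java.util.Comparator;
import java.util.List;

/** Illustrative sketch of a pluggable scheduling strategy; names are assumptions. */
public interface SchedulingStrategy {

  /** Minimal view of a pending table-service job. */
  record ScheduledJob(String basePath, String action, int priority, long enqueueTimeMs) {}

  /** Return jobs in the order they should be executed. */
  List<ScheduledJob> order(List<ScheduledJob> pendingJobs);

  /** FIFO: execute in arrival order. */
  static SchedulingStrategy fifo() {
    return jobs -> jobs.stream()
        .sorted(Comparator.comparingLong(ScheduledJob::enqueueTimeMs))
        .toList();
  }

  /** Priority-based: e.g. compaction > cleaning > clustering, ties broken by arrival order. */
  static SchedulingStrategy priorityBased() {
    return jobs -> jobs.stream()
        .sorted(Comparator.comparingInt(ScheduledJob::priority).reversed()
            .thenComparingLong(ScheduledJob::enqueueTimeMs))
        .toList();
  }
}
```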


2. Using async compaction/clustering: in this mode the job executes asynchronously but still shares resources with the HUDI writing job, which may affect the stability of the writing job; this is not what the user wants to see.

3. Using an independent compaction/clustering job is a better way to schedule the work: in this mode the job executes asynchronously and does not share resources with the writing job, but it also raises some questions:
Contributor

Please do add a line here that users have to enable lock providers so that there is no data loss. Especially while compaction/clustering is being scheduled, no other writes should proceed concurrently, hence a lock is required.

Contributor Author

Yes, it is very critical!



4. Follows all the conventions of a HUDI Table

### Scheduler
Contributor

So, we are introducing two long-running services here, a request handler and a scheduler?
Any particular reason to decouple them? Why not have one service for both? I understand it's easy to logically decouple them, but then we need to ensure availability of both services.
Not too strong on this; just wanted to hear your thoughts.

Contributor Author (@yuzhaojing, Mar 16, 2022)

In my design, the scheduler is not a separate service; it is just a thread within the service.


### Scheduler

Periodically scan the Meta Table and submit compaction/clustering jobs according to user-specified rules; the execution engine needs to be pluggable.
Contributor

How do we ensure availability of these services? What if the service goes down? Do devs have to manually restart it? Until then, these jobs are never going to be tried, right?

Contributor Author

I think as long as job operations are handled appropriately, the availability of the service should be relatively high, because it just accepts requests and writes them to storage.
Even with 10,000+ Hudi jobs, the load will not exceed the QPS the service can handle; this has been verified with the timeline service.

Contributor Author

We can provide a solution that monitors the service and automatically restarts it when it goes down, or provide users with a way to run it under a supervisor. I would like to hear your thoughts.


Register with the Service, passing the CompactionConfig/ClusteringConfig, when the job starts. When generating a compaction/clustering plan, report it to the compaction/clustering Service address specified in HoodieConfig, and request the Service to stop the corresponding compaction/clustering job when the Service is not required or when rolling back a compaction/clustering instant.

### Request Handler
Contributor

If I understand correctly, the request handler is very lightweight and may not need any compute resources as such, whereas the scheduler will be using the cluster's compute resources.

Contributor

Because the meta table as such may not have more than a million entries, even for a large deployment (with a large number of tables).

Contributor Author

Yes, I think the request handler is very lightweight, and the meta table is also not large.




2. RequestHandler received request but the commit is not completed.

3. Client rollback plan after request to Compaction/Clustering Service.
Contributor

By client we mean a regular writer to the original Hudi table, right? Let's try to keep the overhead as low as possible.

I guess here we are employing a push-based mechanism. Alternatively, can we consider a pull-based one? Each Hudi table would register with this service for the table services it wants to delegate; for example, tableA can delegate just cleaning, while tableB can delegate cleaning and compaction.
Once registered, regular writers make no further communication with the service. The request handler would periodically poll the data table timeline of these tables, check for any requested instants for table services, and work on them, i.e. update the internal meta table, etc.
This way, the client does not need to worry about whether the request handler is available or not (and, if it is not available, whether it should abort and continue inline, etc.).

Contributor Author

This is a good proposal; such a design will be more flexible. My earlier concern was that regular polling by the service may concentrate the pressure on the service and cause delays. Performance optimization can be handled in a follow-up.

Member

The pull-based mechanism only works for a small number of tables. Scanning thousands of tables for possible services is going to induce a lot of listing load.

The push-based model is actually very elegant. Let me explain how I envision it:

Assume a regular table which has some cleaning settings. In the current code, we schedule the clean and then execute the clean in the same Spark application. When hoodie.service.enable=true, the scheduling of the clean will be as before, but the execution part (HoodieWriteClient.clean(...)) will detect that TableServices are enabled and, instead of running the clean, will schedule it with the TableService by calling its API endpoint (see the sketch after the benefits list).

Benefits:

  1. Easy integration with HUDI's current functionality
  2. No extra load from listing datasets to find operations to run
  3. Conforms to all HUDI write config settings (e.g. when to clean, when to compact)
  4. In case of TableServices downtime, one can simply disable hoodie.service.enable and revert to inline functionality.
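A minimal sketch of how that check could look in the write path, assuming hoodie.service.enable as the flag; the class, method, and client names here are illustrative, not the actual HoodieWriteClient APIs:

```java
import java.util.Properties;

// Illustrative sketch only; names and config keys are assumptions, not actual Hudi APIs.
public class CleanDelegationSketch {

  private final Properties config;
  private final TableServiceClient tableServiceClient;

  public CleanDelegationSketch(Properties config, TableServiceClient client) {
    this.config = config;
    this.tableServiceClient = client;
  }

  /** Schedule the clean locally, then either execute inline or hand off to the service. */
  public void clean(String basePath, String cleanInstantTime) {
    scheduleClean(basePath, cleanInstantTime); // scheduling stays with the writer, as today

    if (Boolean.parseBoolean(config.getProperty("hoodie.service.enable", "false"))) {
      // Push execution to the Table Management Service via its API endpoint.
      tableServiceClient.submitClean(basePath, cleanInstantTime);
    } else {
      executeCleanInline(basePath, cleanInstantTime); // current inline behavior
    }
  }

  private void scheduleClean(String basePath, String instantTime) { /* writes clean.requested */ }

  private void executeCleanInline(String basePath, String instantTime) { /* existing code path */ }

  /** Hypothetical thin REST client for the service. */
  public interface TableServiceClient {
    void submitClean(String basePath, String instantTime);
  }
}
```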

Member

Furthermore, we can implement a Java client for table services - HoodieTableServiceClient - which can enable scheduling any type of service from within the logic of a table operation.

E.g. assume the HoodieWriteConfig enables the record-level index. Since the index does not exist yet, it needs to be bootstrapped. There are two possibilities here:

  1. The user manually runs async indexing to initialize the record index.
  2. The metadata table code detects that the record index is enabled but not present and calls HoodieTableServiceClient.scheduleService(...) to kick off async indexing automatically.

The second scenario above can be extended to various intelligent cases:

  1. The ingestion-side code decides when it is time to compact the MOR table (based on some metric like read time / write time) rather than on a fixed schedule such as after 10 deltacommits.
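HoodieTableServiceClient is only proposed in this comment and does not exist yet; one possible shape of its API, purely as a sketch:

```java
import java.util.Map;

// Hypothetical client proposed above; the class and method signatures are illustrative only.
public interface HoodieTableServiceClient {

  enum ServiceType { COMPACTION, CLUSTERING, CLEANING, INDEXING }

  /**
   * Ask the Table Management Service to schedule (and later execute) a table service.
   *
   * @param basePath base path of the Hudi table
   * @param type     which table service to run
   * @param params   service-specific parameters
   */
  void scheduleService(String basePath, ServiceType type, Map<String, String> params);
}

// Hypothetical usage inside the metadata table code (scenario 2 above):
//   if (recordIndexEnabled && !recordIndexInitialized) {
//     tableServiceClient.scheduleService(basePath, ServiceType.INDEXING,
//         Map.of("index.type", "RECORD_INDEX"));
//   }
```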

@yuzhaojing
Contributor Author

@prashantwason @nsivabalan Thanks for the review, I'll be updating the RFC next week, looking forward to more comments from you.

Member

@prashantwason left a comment

Left some comments as per our requirements.

- Independent compaction/clustering job: execute an async compaction/clustering job in another application.

With the increase in the number of HUDI tables and the lack of management capabilities, maintenance costs will become
higher. This proposal is to implement an independent compaction/clustering Service to manage the Hudi
Member

I think we should generalize this PR to include all types of "services" on HUDI tables.

A service on a HUDI table can be defined as any operation which needs to run asynchronously. This includes compaction, clustering, async indexing, cleaning, validation, etc.

A service definition can take various key-value parameters, like:

  • basepath
  • priority
  • mainClass (the Java class to execute to run the service)
  • context (key=value params specific to the service execution)

This allows adding more services in the future and running various other management operations on HUDI datasets.
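A rough sketch of such a generalized definition, with hypothetical field names (none of this is finalized):

```java
import java.util.Map;

/** Illustrative sketch of a generalized service definition; field names are not finalized. */
public final class ServiceDefinition {
  public final String basePath;              // Hudi table base path
  public final int priority;                 // scheduling priority
  public final String mainClass;             // Java class to execute to run the service
  public final Map<String, String> context;  // service-specific params (tier, SLA, owner, yarn_queue, ...)

  public ServiceDefinition(String basePath, int priority, String mainClass, Map<String, String> context) {
    this.basePath = basePath;
    this.priority = priority;
    this.mainClass = mainClass;
    this.context = context;
  }
}
```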

Member

+1

Contributor Author

Totally agree, we will evolve in this direction!

Contributor

Trying to understand the comment "maintenance cost will become higher". Can you explain?




## Implementation

![](service.jpg)
Member

The following interfaces would be required:

  1. API (REST / gRPC) - we would need gRPC
    • The Request Handler would be common
  2. Execution engine (see the sketch below):
    • This is the component which executes a Spark job (e.g. via spark-submit) and returns the result
  3. Metrics / alerts may also need to be added
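A minimal sketch of such a pluggable execution engine; the interface, method, and result type are assumptions, not part of the proposal's code:

```java
import java.util.Map;

/** Illustrative sketch of a pluggable execution engine; names and signatures are assumptions. */
public interface ExecutionEngine {

  /** Outcome of one service run, reported back for metrics/alerts and retry decisions. */
  final class ExecutionResult {
    public final boolean success;
    public final String applicationId;   // e.g. the YARN/Spark application id
    public final String errorMessage;    // null on success

    public ExecutionResult(boolean success, String applicationId, String errorMessage) {
      this.success = success;
      this.applicationId = applicationId;
      this.errorMessage = errorMessage;
    }
  }

  /**
   * Launch the given service job (for example by building and invoking a spark-submit
   * command for mainClass) and block until it finishes.
   */
  ExecutionResult execute(String basePath, String mainClass, Map<String, String> context);
}
```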

Contributor Author

  1. We can implement a REST API for phase 1 and have a common Request Handler.
    2 and 3 are very necessary, which is also reflected in the RFC.

@yuzhaojing yuzhaojing changed the title [HUDI-3016][RFC-43] Proposal to implement Compaction/Clustering Servi… [HUDI-3016][RFC-43] Proposal to implement Table Management Service May 10, 2022
@yuzhaojing
Contributor Author

In a production environment with limited resources, the concept of priority is required, and different priorities are needed for different tables and different actions.
Should we provide a scheduling strategy interface and use FIFO as the default?

@yuzhaojing
Contributor Author

yuzhaojing commented May 11, 2022

@vinothchandar @prashantwason Would like to hear your thoughts on this!

@hudi-bot
Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-runs the last Azure build

@vinothchandar
Member

Taking this over, given @prashantwason is now on a break.

@vinothchandar vinothchandar added the priority:blocker Production down; release blocker label Jul 13, 2022
- Perfect the metrics and reuse HoodieMetrics to expose them externally.

- Provide automatic failure retry for compaction/clustering jobs.

Contributor

In addition to this, we should also think about pluggable triggering strategies for any table management service to run. Eventually, we can be more intelligent about when we trigger a service, I guess.

Contributor Author

Trying to understand the comment "maintenance cost will become higher". Can you explain?

If we have more than 1,000 tables, each configured with a compaction and a clustering task, then we need to manage more than 2,000 table-service tasks, and that management cost is huge: retrying failed tasks, troubleshooting the causes of exceptions, resource management, and so on.

In addition to this, we should also think about pluggable triggering strategies for any table management service to run.

We plan to support triggering service execution via an API; is that what you mean?

Member

pluggable triggering strategies

@yuzhaojing this refers to the scheduling strategy, like priority-based, FIFO, etc., made customizable via a pluggable interface.

Contributor Author

This is expected to provide pluggable interfaces to support various scheduling strategies.

Member

@yuzhaojing to resolve this comment, please add the details for "pluggable triggering strategies" in the doc

## Implementation

### Processing mode
Different processing modes depending on whether the meta server is enabled
Contributor

I agree that having the Table Management Service poll the datasets and determine what service to run is not a scalable design; we should never do this. We need a scalable, persistent queue implementation to schedule "Table Service Jobs" - this could be file-system based or RPC based.

Member

We need a scalable persistent queue implementation to schedule

I think this is the idea below: whether with the metaserver or not, TMS is to be triggered by pushed events. The metaserver contains an event bus to dispatch events. In the case of no metaserver, TMS can internally keep a queue to buffer the incoming requests, handling them asynchronously. This can be clarified in the section below.
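A tiny sketch of that internal buffering idea, using a plain in-memory queue; a real implementation would need persistence, and all names here are illustrative:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustrative sketch: buffer incoming table-service requests and drain them asynchronously. */
public class RequestBuffer {

  /** Minimal event carrying just the table path, the requested instant, and the action. */
  public record ServiceEvent(String basePath, String instantTime, String action) {}

  private final BlockingQueue<ServiceEvent> queue = new LinkedBlockingQueue<>();

  /** Called by the request handler (or by a metaserver event-bus listener). */
  public void accept(ServiceEvent event) {
    queue.offer(event);
  }

  /** Drained by a scheduler thread that turns events into scheduled/executed jobs. */
  public ServiceEvent take() throws InterruptedException {
    return queue.take();
  }
}
```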

@codope codope added priority:critical Production degraded; pipelines stalled and removed priority:blocker Production down; release blocker labels Jul 20, 2022
Member

@xushiyan left a comment

Overall I think more design details should be illustrated in the RFC document. For example, I would like to see an overview of the inter-component interactions/workflows for phase 1, phase 2 (and more future phases?), ideally illustrated via diagrams. Please also resolve previous comments wherever applicable.

An important aspect missing from the RFC is how users can package and deploy TMS. Are people expected to run a bundle jar? Or are we providing scripts to run the server? The deployment mode is crucial to promoting this feature.

Comment on lines 82 to 83
- The meta server provides a listener that takes as input the uris of the Table Management Service and triggers a callback through the hook at each instant commit, thereby calling the Table Management Service to do the scheduling/execution for the table.
![](service_with_meta_server.png)
Member

what is the design choice here? RPC or REST?

Contributor Author

The current design choice is REST.

Member

Please make it explicit.

Comment on lines 85 to 88
- Do not enable meta server
- for every write/commit on the table, the table management server is notified.
We can set a heartbeat timeout for each hoodie table, and if it exceeds it, we will actively pull it once to prevent the commit request from being lost
![](service_without_meta_server.png)
Member

What is the default heartbeat timeout? What does the configuration look like? How is TMS registered with this heartbeat info? Let's be explicit with the design details.

Contributor Author

At present, this design has been modified so that the client brings all pending instants each time it requests TMS; I will update the RFC document later.

Comment on lines +202 to +212
- | name | type | comment |
| ------------ | ------ | --------------------- |
Member

please fix the markdown table. it's not showing properly

hoodie.table.service.clustering.enable=true
hoodie.table.service.clean.enable=true

## Proposal
Member

This is not a proposal section; it is the implementation plan. The whole document is a proposal.

5. API(only REST)
6. Writer

**Landing plan: 0.12**
Member

Suggested change
**Landing plan: 0.12**
**Target: 0.12**

Comment on lines 313 to 316
This RFC aims to implement a new Service to manage Hudi table compaction and clustering actions. To test this
feature, there will be some test tables that trigger compaction and clustering actions against the Service, with unit tests for the
code. Since this is an entirely new feature, I am confident that this will not cause any regressions during and after
roll out.
Member (@xushiyan, Aug 29, 2022)

The test plan is not only about regressions or impact on existing services. For this feature itself, what kinds of tests can be carried out? What scenarios will be covered in the OSS CI? Please list the relevant items accordingly.


Comment on lines +82 to +83
- The pull-based mechanism works for fewer tables. Scanning 1000s of tables for possible services is going to induce
lots of a load of listing.
Member

So we're not doing pull; please make that explicit, as this confuses people and makes it sound like you propose pull-based for fewer tables. You can say "As the pull-based mechanism won't scale for 1000s of tables, we ...".

### Processing flow

- If hudi metaserver is used, after receiving the request, the table management server schedules the relevant table
service to the table's timeline
Member

schedules the relevant table service to the table's timeline

We need to make this explicit: this is the table timeline managed in the metaserver, right? It can be confused with the table timeline on storage. It should also be mentioned how the metaserver interacts with storage in this case.

- Persist each table service into an instance table of Table Management Service
- notify a separate execution component/thread can start executing it
- Monitor task execution status, update table information, and retry failed table services up to the maximum number of
times
Member

Why not combine this "Processing flow" section with the "Processing mode" section above? They are basically talking about the same thing. We need the end-to-end flow for both cases; the no-metaserver case is missing from the "Processing flow" section.

5. API(only REST)
6. Writer

**Target: 0.12**
Member

this needs update

@nsivabalan
Contributor

nsivabalan commented Oct 5, 2022

One high-level comment/ask; not sure if it's doable already. Can regular writers take care of scheduling table services by themselves and only delegate execution to TMS? I am thinking from a Deltastreamer standpoint. As of now, Hudi can achieve async, non-blocking compaction/clustering because the regular writer takes care of scheduling (while the table is frozen, with no other writes in flight) and delegates the execution to a separate thread. If we delegate the scheduling also to TMS, we can't guarantee that there won't be any other inflight operation while scheduling compaction (in which case scheduling could fail). So, instead of a separate thread executing (as on master), we can let TMS handle the execution alone. This will keep the Deltastreamer job lean (fewer resources), while TMS takes up the heavy compute job of executing the table service.

From what I glean, only if scheduling is done by TMS does it go into the backing storage which the scheduler polls; if scheduling is done by the regular writer, I'm not sure how this will be handled. Sorry, I haven't taken a look at the implementation yet.

Is this functionality supported? If not, we should look to support it.

@nsivabalan
Contributor

In fact, this could be a problem with other writers as well. When scheduling compaction, Hudi expects not to have any other delta commits in flight. So it is better to let one of the writers take care of scheduling and delegate only the execution to TMS. This is probably specific to compaction; just wanted to flag it, in case.

@xushiyan
Member

if we delegate the scheduling also to TMS, we can't guarantee that there won't be any other inflight operation while scheduling compaction

@nsivabalan TMS is a downstream listener to the metaserver (or to whatever timeline server is used if there is no metaserver), so TMS is aware of all inflight commits on the registered tables, and we should use that info when generating plans.

can regular writers take care of scheduling table services by themselves and only delegate execution to TMS?

Let's clarify: TMS is responsible for plan generation and scheduling, while execution is sent to a separate cluster and TMS monitors the execution. You're proposing to delegate basically only the execution; that's possible, but it doesn't really make use of TMS's capabilities. I'd suggest full delegation to TMS: we're starting a server, so making full use of it will justify the design and the cost.

@xushiyan xushiyan changed the title [HUDI-3016][RFC-43] Proposal to implement Table Management Service [HUDI-3016][RFC-43] Proposal to implement Table Service Manager May 10, 2023
@zyclove

zyclove commented Dec 24, 2023

@xushiyan @yuzhaojing @danny0405
Hi, can version 1.0 support this feature? It is very necessary; please push forward the progress.

@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Feb 26, 2024
@yihua yihua added the area:table-service Table services label Mar 26, 2024
@vinothchandar vinothchandar removed their assignment May 23, 2025
@vinothchandar
Member

@yuzhaojing are you still interested in pushing this?

There is some interest from @suryaprasanna and the Uber team to contribute.

I will wait a few days and close this if there is no response; we can open a new RFC.
