[HUDI-4148] Preparations and client for hudi table manager service #5681
Conversation
Force-pushed 6566d73 to f486cdd
Can you please update the description and other elements in the template for the PR?
Sure, I will add a description for the PR later.
@Override
public HoodieCleanMetadata execute() {
  if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.clean.name())) {
    LOG.warn("Compaction delegate to table management service, do not clean for client!");
typo, -> 'clean delegate to'
private void submitClusteringToService() {
  HoodieTableMetaClient metaClient = table.getMetaClient();
  List<String> instantsToSubmit = metaClient.getActiveTimeline()
      .filterPendingReplaceTimeline()
A replace instant may also include an insert overwrite operation. Will the manager service double-check to ensure it is a clustering instant?
Yes, we will verify it is a clustering instant via the operationType in the replaceMetadata. In the table manager service we only need to deserialize the replaceMetadata once.
}
return Option.empty();

if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.compaction.name())) {
Maybe we could abstract this as an interface in config, e.g., isTableManagerIncludeAction(xxx)
Good suggestion, update later.
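For illustration, the suggested abstraction might look like the following sketch. Class and method names here are assumptions for the discussion, not the actual Hudi API:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the suggested config helper: the action list is
// parsed once and every membership check goes through a single method.
class TableManagerConfigSketch {
  private final boolean tableManagerEnabled;
  private final Set<String> actions;

  TableManagerConfigSketch(boolean enabled, String csvActions) {
    this.tableManagerEnabled = enabled;
    this.actions = new HashSet<>(Arrays.asList(csvActions.split(",")));
  }

  // Replaces the repeated inline check:
  // config.isTableManagerEnabled() && config.getTableManagerConfig()
  //       .getTableManagerActions().contains(actionType)
  boolean isTableManagerIncludeAction(String actionType) {
    return tableManagerEnabled && actions.contains(actionType);
  }
}
```

Callers would then write `config.isTableManagerIncludeAction(ActionType.clean.name())` instead of duplicating the two-part condition at every call site.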
// Will be set when auto-commit happens
private boolean isCommitted;
// Will be set when table management service enable
private boolean isEmpty;
isEmpty seems a little bit confusing. How about isSkipped?
Good suggestion, update later.
return option;
}

private void submitCleanToService() {
These submit functions submit all pending clean/compaction/clustering instants to the service. What happens if we submit the same instant multiple times?
In the table management service, received instants are deduplicated, so a repeatedly submitted instant will not be processed twice.
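As a toy illustration of the server-side deduplication described in that reply (this is a sketch, not the actual table management service code): an instant is accepted the first time it arrives and ignored afterwards.

```java
import java.util.HashSet;
import java.util.Set;

// Minimal sketch of idempotent instant intake: accept() returns true only on
// the first submission of a given instant time.
class InstantDedupSketch {
  private final Set<String> accepted = new HashSet<>();

  boolean accept(String instantTime) {
    // Set.add returns false when the element was already present.
    return accepted.add(instantTime);
  }
}
```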
@yuzhaojing Can you please fill out the pull request template so we have more context heading into the review?
vinothchandar left a comment
Left some comments. Overall, I would prefer if most changes were at the client level rather than in the action executors. Overloading code paths there will make it harder to maintain going forward, IMO.
return metadataConfig;
}

public HoodieTableManagerConfig getTableManagerConfig() {
Should we have this as a separate module? That is, one built on top of hudi-client-common.
I think TableManagerConfig is a public config that clients can get through HoodieWriteConfig, so keeping the config in HoodieWriteConfig is better.
public class HoodieTableManagerClient {

  private static final String BASE_URL = "/v1/hoodie/serivce";
/v1/hudi?
ok, will update.
Please fix the typo in the word 'service'.
private static final String BASE_URL = "/v1/hoodie/serivce";

public static final String REGISTER = String.format("%s/%s", BASE_URL, "register");
Add a _ENDPOINT suffix to the names, so it's clear what this is? e.g., REGISTER_ENDPOINT
ok, will update it.
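Combining the suggestions in this thread (the corrected base path and the _ENDPOINT suffix), the constants might end up looking like this sketch; the exact path segments are illustrative, not final:

```java
// Sketch of the renamed endpoint constants. "/v1/hudi" and the path segments
// are assumptions based on the review comments above.
class EndpointsSketch {
  private static final String BASE_URL = "/v1/hudi";

  static final String REGISTER_ENDPOINT = String.format("%s/%s", BASE_URL, "register");
  static final String CREATE_COMPACTION_ENDPOINT = String.format("%s/%s", BASE_URL, "compact/create");
}
```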
private static int failTimes;

private static final Logger LOG = LogManager.getLogger(HoodieTableManagerClient.class);
probably need to rebase this to log4j2 now
will update it.
We can do this in another PR.
Let's actually fix this in this PR please, now that log4j2 has landed.
fixed.
try {
  executeRequest(SUBMIT_CLEAN, StringUtils.join(instantsToSubmit.toArray(new String[0]), ","));
} catch (Exception e) {
  LOG.error("Failed to schedule clean to service", e);
Can we reuse all this code around exception handling? These ~5 lines are duplicated across the submitXXX methods.
Sure, I'll reuse this code.
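One hedged shape for that consolidation: each submitXXX method passes its action name and request to a shared helper, which keeps the log-and-swallow behavior from the PR (a failed submission must not fail the writer). Names here are illustrative:

```java
// Hypothetical helper replacing the duplicated try/catch in the submitXXX
// methods.
class SubmitHelperSketch {
  interface ServiceRequest {
    void execute() throws Exception;
  }

  // Returns true when the submission succeeded; logs and swallows otherwise.
  static boolean submitToService(String action, ServiceRequest request) {
    try {
      request.execute();
      return true;
    } catch (Exception e) {
      System.err.println("Failed to schedule " + action + " to service: " + e.getMessage());
      return false;
    }
  }
}
```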
failTimes = 0;
}

private void checkTolerableSchedule(Exception e) {
Overall, I wonder if there is a simpler way of implementing retryable requests.
There is already a retry process for request failures; checkTolerableSchedule just prevents all tasks from failing in case the table manager service has an exception.
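The guard described in that reply can be sketched as a simple consecutive-failure counter; requests already retry internally, and this only fails the writer once the service has been unreachable for more than a tolerable number of schedules. Names and the threshold are assumptions:

```java
// Sketch of a checkTolerableSchedule-style guard: tolerate a bounded number
// of consecutive scheduling failures, reset on success.
class ScheduleToleranceSketch {
  private int failTimes = 0;
  private final int tolerableFailures;

  ScheduleToleranceSketch(int tolerableFailures) {
    this.tolerableFailures = tolerableFailures;
  }

  // Called after a failed schedule; returns true when the writer should fail.
  boolean recordFailure() {
    return ++failTimes > tolerableFailures;
  }

  // Called after a successful schedule.
  void recordSuccess() {
    failTimes = 0;
  }
}
```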
@Override
public HoodieCleanMetadata execute() {
  if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.clean.name())) {
    LOG.warn("Compaction delegate to table management service, do not clean for client!");
This is cleaning, not compaction? Can we check all log messages like this?
}
return Option.empty();

if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.clean.name())) {
Instead of leaking the table management service into the action executors, can we layer it on top? Introduce a higher-level write config, say hoodie.enable.table.services (default: true), which when set to false will not do any scheduling/execution of any table services. This check can be added at the BaseHoodieWriteClient level when the calls are made for clean, scheduleClean, compact, scheduleCompact, etc.
We can then instruct the users to set hoodie.enable.table.services=false when running with Table management services. Another point here is that we should check for some lock manager being configured when operating in this mode, so that there is no race between delta commits and compaction scheduling.
This is a cleaner implementation; I will modify mine accordingly.
But I don't want to introduce a lock manager in the current version, as it is too heavy for the user.
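The layering suggested above can be sketched as a single flag checked once at the write-client level, so action executors never see the table manager at all. The flag name mirrors the suggested hoodie.enable.table.services; everything else is illustrative:

```java
// Sketch of gating table services at the client level rather than inside the
// action executors.
class WriteClientGateSketch {
  private final boolean enableTableServices;

  WriteClientGateSketch(boolean enableTableServices) {
    this.enableTableServices = enableTableServices;
  }

  // Stand-in for the clean()/compact()/cluster() entry points.
  String clean() {
    if (!enableTableServices) {
      // Table services are delegated to the external manager; no-op locally.
      return "delegated";
    }
    return "cleaned";
  }
}
```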
private void submitCleanToService() {
  HoodieTableMetaClient metaClient = table.getMetaClient();
  List<String> instantsToSubmit = metaClient.getActiveTimeline()
Hmmm, based on this, do you intend to still have the scheduling done by the writer and only move execution to the table manager service?
In TMS phase 1 we plan to have the writer schedule the plan and move only execution to TMS; in phase 2 we will support scheduling in TMS as well, because the current concurrency control would need to introduce external dependencies. We can reserve parameters in the core API. WDYT?
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
  if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.replacecommit.name())) {
    LOG.warn("Clustering delegate to table management service, do not cluster for client!");
Can we avoid invoking this code path from the client level, rather than returning an empty writeMetadata object?
Sure, will update it.
@vinothchandar Thanks for the review. I'll fill in the template later and rework the approach based on the comments.
@vinothchandar Modified some code. Major changes:

@suryaprasanna Could you please take a look?
Force-pushed 75571c4 to f50bb15
} else {
  runAnyPendingCompactions(table);
  inlineCompaction(extraMetadata);
}
L552-L557 Extract into a method?
will update.
scheduleClustering(extraMetadata);
} else {
  runAnyPendingClustering(table);
  inlineClustering(extraMetadata);
Ditto
will update.
}

if (delegateToTableManagerService(config, ActionType.clean)) {
  return null;
Why is null returned here?
Because the clean action has not been carried out at this point, the cleanMetadata is null.
public static final String SUBMIT_COMPACTION = String.format("%s/%s", BASE_URL, "compact/submit");
public static final String REMOVE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/remove");

public static final String SUBMIT_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/submit");
submit -> create, remove -> delete? See https://cloud.google.com/apis/design/standard_methods
will update.
public class HoodieTableManagerClient {

  private static final String BASE_URL = "/v1/hoodie/serivce";
Please fix the typo in the word 'service'.
public void submitClustering() {
  try {
    metaClient.reloadActiveTimeline();
    String instantRange = StringUtils.join(ClusteringUtils.getPendingClusteringInstantTimes(metaClient)
I noticed that whenever an action is scheduled, instead of sending that particular instant, we are sending all the pending instants for that action as part of the request. This means the remote client has to iterate over the instants and update the contents in the mysql/hudi db.
I am guessing this was done so that, in case an action is scheduled but fails to update the table service, the .requested instant will be left on the timeline.
One way we are trying to work around this is by sending the request before creating the .requested files.
The timeline of actions would look like this:
1. Generate a clustering/compaction plan for an instant x.
2. After the plan is generated, send a request to the table service for instant x.
3. Create the x.clustering.requested or x.compaction.requested file on the timeline.
In case the request is successfully sent to the table service but the .requested file creation fails, it will be treated as a no-op by the table service when it schedules the job. There is a possibility that the table service can execute right after step 2, but that can be handled by introducing some delay.
This approach is useful if we want to send any Spark-job-specific parameters to the table service for executing the instants, for example the number of executors, since the number of executors can vary depending on the plan.
This will cause two problems:
1. When the table service is unavailable, the timeline will not be able to generate the requested instant, so the action cannot run when the table service is restored; this has a greater impact on day-level write jobs.
2. When the table service receives the request, it needs to scan the timeline of the corresponding hudi table to determine whether the instant is invalid, which adds a lot of extra operations to the table management service.
Considering these two points, I think sending all the pending instants is a better way. WDYT?
Here is my understanding: we have 3 components, scheduler (Hudi table via write API), storage (state manager or metaserver, which can be mysql or a hudi table), and execution (table services).
Re: "When the table service is unavailable, the timeline will not be able to generate the requested instant, so it will not be able to operate when the table service is restored, which has a greater impact on the day level write job."
- If the table service is down then there is no one to execute the plan, so there is no need to create the requested file. The table generated the plan initially; similarly, it will try to regenerate it every time until the table service accepts the request.
- There is an advantage to this approach: the latest plan we generate will also include the latest changes that happened on the table while the table service was down.
Re: "When the table service receives the request, it needs to scan the timeline corresponding to the hudi table to determine whether it is invalid, which will make table management service appear a lot of additional operations."
- I agree, the table service has to be as dumb as possible. Here its duty is to read from storage and execute the Spark job. So basically we start the Spark job, and the job will not do anything because it cannot find the .requested file for the instant; it will just quit.
Another approach I can think of is to have an async thread that starts on the write path and incrementally posts requests to the table service by reading through the timeline. We can even update the file's extraMetadata map with any information if needed.
I am inclined toward your approach because it is a clean way to do it, but we need to address the following:
1. I have seen people configure clustering on non-partitioned tables. If we schedule clustering on them and fail to update the table service, then no other clustering job can be scheduled on them, because we don't schedule clustering on file groups with pending clustering. So the replacecommit.requested can be left orphaned.
2. For passing job-specific parameters like executor count, either we need to pass this information while calling the table service APIs, or the table service needs to read through the plan before scheduling a Spark job. Ideally we do not want the table service to read through clustering plans.
3. Also, we can schedule clustering jobs on small groups of file groups one after the other, and these clustering plans can be smaller in size and larger in number. If the table service is running slow, more and more .requested files will be created and more requests will be sent to the table service.
+1 on keeping the table manager/service decoupled as discussed.
1. Won't 1 be handled automatically, because we resend all the pending instants to the table manager?
2. I would imagine 2 to be a config specified at the table manager service level and not passed through from the writer.
3. There is no backpressure mechanism like what you are describing today. I think your point exists in async clustering/compaction today, regardless of table service or not?
+1 from me.
  }
}

throw new IOException(String.format("Failed request to table management service %s after retry %d times", url, requestRetryLimit));
Maybe it's better to throw this exception on line 110.
Sure, will fix.
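The retry shape under discussion can be sketched as follows; the terminal exception matches the message quoted above, while the method and variable names are assumptions, not the actual executeRequest signature:

```java
import java.io.IOException;

// Illustrative retry wrapper: attempt the call up to requestRetryLimit times
// and raise the terminal exception once the retry budget is exhausted.
class RetrySketch {
  interface HttpCall {
    String run() throws IOException;
  }

  static String executeWithRetry(HttpCall call, String url, int requestRetryLimit) throws IOException {
    IOException last = null;
    for (int attempt = 1; attempt <= requestRetryLimit; attempt++) {
      try {
        return call.run();
      } catch (IOException e) {
        last = e; // keep the last failure as the cause
      }
    }
    throw new IOException(String.format(
        "Failed request to table management service %s after retry %d times", url, requestRetryLimit), last);
  }
}
```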
  }
}

private void inlineCompaction(HoodieTable table, Option<Map<String, String>> extraMetadata) {
Should we create a clean TableServiceClient abstraction and move all the compaction/clustering/cleaning APIs into that? We can have multiple engine-specific implementations. That way we don't combine the write APIs and table service APIs, and we avoid multiple if-checks here.
+1 on this.
It's a good suggestion. Can we refactor this later?
We just need to coordinate with RDD API users like Uber, since it's a breaking change. We can file a code-cleanup follow-up JIRA, orthogonal to this PR?
Reading this again, a separate client impl is probably cleaner even for this PR. These if checks will grow harder to maintain over time.
I have filed a JIRA to track this so we can do the refactoring later: https://issues.apache.org/jira/browse/HUDI-4407. WDYT?
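The TableServiceClient abstraction discussed in this thread could be sketched roughly as follows: one interface, one implementation per mode, so the write path holds a single reference instead of scattered if-checks. All names are illustrative, not the eventual Hudi API:

```java
// Sketch of the proposed abstraction boundary between write APIs and table
// service APIs.
interface TableServiceClientSketch {
  String scheduleCompaction(String instantTime);
}

class InlineTableServiceClient implements TableServiceClientSketch {
  @Override
  public String scheduleCompaction(String instantTime) {
    return "inline:" + instantTime; // run compaction in the writer process
  }
}

class DelegatingTableServiceClient implements TableServiceClientSketch {
  @Override
  public String scheduleCompaction(String instantTime) {
    return "delegated:" + instantTime; // hand off to the table manager service
  }
}
```

The write client would pick one implementation at construction time based on config, keeping the per-action if-checks out of the executors.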
 * @return Collection of WriteStatus to inspect errors and counts
 */
public HoodieWriteMetadata<O> cluster(String clusteringInstantTime) {
  if (delegateToTableManagerService(config, ActionType.replacecommit)) {
Refer to my comment above. I would rather have an interface with multiple implementations than the if-conditions here.
Ok, will update.
}

default boolean delegateToTableManagerService(HoodieWriteConfig config, ActionType actionType) {
  boolean includeAction = config.isTableManagerIncludeAction(actionType);
isTableManagerIncludeAction -> isTableManagerSupportAction to make it more readable
will update it.
/**
 * Client which send the table service instants to the table management service.
 */
public class HoodieTableManagerClient {
Hand-coding an HTTP REST client seems very painful and error-prone. Why don't we standardize and pick a framework to support REST endpoints? Jersey is a good one: it lets you enable REST endpoints with annotations, and provides exception-recovery hooks, testability, etc.
ditto.
We already use Javalin, with the REST endpoints generated/wired that way. Can we just reuse that? It will save all the bundling work needed for a new dependency. @prasannarajaperumal wdyt?
Javalin seems okay to me. We should use that here.
/**
 * Client which send the table service instants to the table management service.
 */
public class HoodieTableManagerClient {
Need to implement the common HoodieTableServiceClient, referring to my comment above.
vinothchandar left a comment
Left some comments. Overall, much much better. Thanks for making the changes!
}

default boolean delegateToTableManagerService(HoodieWriteConfig config, ActionType actionType) {
  boolean supportAction = config.isTableManagerSupportAction(actionType);
Rename: isTableManagerSupportedAction?
Also, can this check move out of WriteConfig? We want to keep methods there pretty light: small validations on data-type conversions.
Sure, will move it to TableManagementConfig.
}

default boolean delegateToTableManagerService(HoodieWriteConfig config, ActionType actionType) {
  boolean supportAction = config.isTableManagerSupportAction(actionType);
Rename: supportsAction
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieRemoteException;

import org.apache.http.client.fluent.Request;
Can we just align this with how the timeline server is built, i.e., via Javalin, instead of the raw HTTP client?
The table manager service uses Javalin; the implementation of HoodieTableManagerClient aligns with RemoteHoodieTableFileSystemView.
}

if (config.isTableManagerSupportAction(ActionType.replacecommit)) {
  createClusteringToService();
Rename all such createXXX to delegateXXX to clearly capture what's going on.
will update.
}
return Option.empty();

if (config.isTableManagerSupportAction(ActionType.compaction)) {
@prasannarajaperumal can you think of any way we can avoid this if-switch here? The ActionExecutor hierarchy is pretty deep, so I wonder if there is a better way to do this without writing a bunch of scaffolding/small subclasses and such.
LOG.info("Executing clustering instance " + instant);
SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>) clusteringClient;
- Option<HoodieCommitMetadata> commitMetadata = writeClient.cluster(instant.getTimestamp(), true).getCommitMetadata();
+ Option<HoodieCommitMetadata> commitMetadata = writeClient.cluster(instant.getTimestamp()).getCommitMetadata();
Who calls this today? Are there side effects from not forcing the completion?
This is to prevent the user from directly calling this behavior, if we consider that no user can use it without modification.
description = "Configurations used by the Hudi Table Management Service.")
public class HoodieTableManagerConfig extends HoodieConfig {

  public static final String TABLE_MANAGEMENT_SERVICE_PREFIX = "hoodie.table.management.service";
Same feedback: consistent naming, table.manager.service vs table.management.service. IMHO we can just keep it short as table.manager (it's conceivable that the manager is a service).
Got it.
.setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA))
.setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
.setTableName(conf.getString(FlinkOptions.TABLE_NAME))
.setDatabaseName(conf.getString(FlinkOptions.DATABASE_NAME))
side effects?
The table service uses db_name.tbl_name to uniquely identify a hoodie table, but currently Flink does not set db_name by default.
Force-pushed dad49f1 to 4da8bf7
@hudi-bot run azure
Discussed with @yuzhaojing, who has a draft PR abstracting the TMS client into a member variable of the write client. It is not a top-level client like WriteClient, but it serves the purpose of encapsulating the if-checks in one place. I think it is a practical step towards full abstraction of the TMS client to the same level as BaseWriteClient, which would involve many more code changes and would not fit the TMS MVP release timeline.
Force-pushed a9e0129 to 195a76d
@yuzhaojing let's make sure we link PRs so reviewers know where to follow up.
Sure, my mistake.
What is the purpose of the pull request
The preparations which change existing modules for the table management service.
The following commits are in #5926.
Brief change log
hoodie.table.management.service.enable = true
Verify this pull request
This pull request is already covered by existing tests.
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.