@@ -18,6 +18,7 @@

package org.apache.hudi.client;

import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.log4j.LogManager;
@@ -34,4 +35,12 @@ default boolean tableServicesEnabled(HoodieWriteConfig config) {
}
return enabled;
}

default boolean delegateToTableManagerService(HoodieWriteConfig config, ActionType actionType) {
boolean supportsAction = config.getTableManagerConfig().isTableManagerSupportsAction(actionType);
if (supportsAction) {
LOG.warn(actionType.name() + " is delegated to the table manager service");
}
return supportsAction;
}
}
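
For orientation, here is a hypothetical caller-side sketch of the delegateToTableManagerService hook added above. The enclosing interface's name and its call sites are not visible in this diff, so a stand-in interface is defined here and the skip-local-execution branching is an assumption, not code from the PR.

```java
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.config.HoodieWriteConfig;

final class DelegationCheckSketch {
  // Stand-in for the interface this hunk extends (same default-method shape, logging omitted).
  interface TableServiceRunner {
    default boolean delegateToTableManagerService(HoodieWriteConfig config, ActionType actionType) {
      return config.getTableManagerConfig().isTableManagerSupportsAction(actionType);
    }
  }

  static void maybeRunCompaction(TableServiceRunner runner, HoodieWriteConfig config, Runnable localCompaction) {
    if (runner.delegateToTableManagerService(config, ActionType.compaction)) {
      return; // assumed intent: the external table manager service owns compaction execution
    }
    localCompaction.run(); // otherwise run compaction in-process as before
  }
}
```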
@@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.table.manager;

import org.apache.hudi.common.config.HoodieTableManagerConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieRemoteException;

import org.apache.http.client.fluent.Request;
Member: Can we just align this with how the timeline server is built, i.e. via javalin, instead of the raw HTTP client?

Contributor Author: The Table Manager Service uses javalin; the implementation of HoodieTableManagerClient is aligned with RemoteHoodieTableFileSystemView.

import org.apache.http.client.fluent.Response;
import org.apache.http.client.utils.URIBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Client which sends the table service instants to the table management service.
*/
public class HoodieTableManagerClient {
Contributor: Hand-coding an HTTP REST client seems very painful and error-prone. Why don't we standardize on a framework for REST endpoints? Jersey is a good one: it lets you enable REST endpoints with annotations and provides exception-recovery hooks, testability, etc.

Contributor Author: Ditto (see the reply above).

Member: We already use javalin, and the REST endpoints are generated/wired that way. Can we just reuse that? It will save all the new bundling work needed for a new dependency. @prasannarajaperumal wdyt?

Contributor: javalin seems okay to me. We should use that here.

Contributor: Need to implement the common HoodieTableServiceClient - referring to my comment above.
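
For context on the javalin point, here is a hypothetical sketch of how the separate Table Manager Service (which is not part of this PR) might expose one of these endpoints; it assumes javalin 3.x/4.x-style APIs and an arbitrary port.

```java
import io.javalin.Javalin;

public class TableManagerEndpointSketch {
  public static void main(String[] args) {
    Javalin app = Javalin.create().start(9090); // port chosen only for the sketch

    // Mirrors EXECUTE_COMPACTION ("/v1/hoodie/service/compact/execute") used by HoodieTableManagerClient.
    app.get("/v1/hoodie/service/compact/execute", ctx -> {
      String basePath = ctx.queryParam("basepath"); // BASEPATH_PARAM sent by the client
      String instants = ctx.queryParam("instant");  // comma-separated pending compaction instants
      // ... enqueue a compaction job for `instants` of the table at `basePath` (elided) ...
      ctx.result("submitted");
    });
  }
}
```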


private static final String BASE_URL = "/v1/hoodie/service";

public static final String REGISTER_ENDPOINT = String.format("%s/%s", BASE_URL, "register");

public static final String EXECUTE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/execute");
public static final String DELETE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/delete");

public static final String EXECUTE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/execute");
public static final String DELETE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/delete");

public static final String EXECUTE_CLEAN = String.format("%s/%s", BASE_URL, "clean/execute");
public static final String DELETE_CLEAN = String.format("%s/%s", BASE_URL, "clean/delete");

public static final String DATABASE_NAME_PARAM = "db_name";
public static final String TABLE_NAME_PARAM = "table_name";
public static final String BASEPATH_PARAM = "basepath";
public static final String INSTANT_PARAM = "instant";
public static final String USERNAME = "username";
public static final String CLUSTER = "cluster";
public static final String QUEUE = "queue";
public static final String RESOURCE = "resource";
public static final String PARALLELISM = "parallelism";
public static final String EXTRA_PARAMS = "extra_params";
public static final String EXECUTION_ENGINE = "execution_engine";

private final HoodieTableManagerConfig config;
private final HoodieTableMetaClient metaClient;
private final String host;
private final int port;
private final String basePath;
private final String dbName;
private final String tableName;

private static final Logger LOG = LogManager.getLogger(HoodieTableManagerClient.class);
Member: Probably need to rebase this to log4j2 now.

Contributor Author: Will update it.

Contributor Author: We can do this in another PR.

Member: Let's actually fix this in this PR please, now that log4j2 has landed.

Contributor Author: Fixed.


public HoodieTableManagerClient(HoodieTableMetaClient metaClient, HoodieTableManagerConfig config) {
this.basePath = metaClient.getBasePathV2().toString();
this.dbName = metaClient.getTableConfig().getDatabaseName();
this.tableName = metaClient.getTableConfig().getTableName();
this.host = config.getTableManagerHost();
this.port = config.getTableManagerPort();
this.config = config;
this.metaClient = metaClient;
}

private String executeRequest(String requestPath, Map<String, String> queryParameters) throws IOException {
URIBuilder builder =
new URIBuilder().setHost(host).setPort(port).setPath(requestPath).setScheme("http");
queryParameters.forEach(builder::addParameter);

String url = builder.toString();
LOG.info("Sending request to table management service : (" + url + ")");
Response response;
int timeout = this.config.getConnectionTimeout() * 1000; // msec
int requestRetryLimit = config.getConnectionRetryLimit();
int retry = 0;

while (retry < requestRetryLimit) {
try {
response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
return response.returnContent().asString();
} catch (IOException e) {
retry++;
LOG.warn(String.format("Failed request to server %s, will retry %d more times", url, requestRetryLimit - retry), e);
if (requestRetryLimit == retry) {
throw e;
}
}

try {
TimeUnit.SECONDS.sleep(config.getConnectionRetryDelay());
} catch (InterruptedException e) {
// ignore
}
}

throw new IOException(String.format("Failed request to table management service %s after retry %d times", url, requestRetryLimit));
Contributor: Maybe it's better to put this exception on line 110.

Contributor Author: Sure, will fix.

}
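
As a side note on the comment above, here is a self-contained sketch of a single-exit retry shape (plain JDK, illustrative names; not this PR's code) where the failure is thrown from one place only:

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

final class RetryShapeSketch {
  // Retries `call` up to retryLimit times and throws from a single place once attempts are exhausted.
  static String executeWithRetry(Callable<String> call, int retryLimit, long retryDelaySeconds) throws IOException {
    Exception lastError = null;
    for (int attempt = 1; attempt <= retryLimit; attempt++) {
      try {
        return call.call();
      } catch (Exception e) {
        lastError = e;
        try {
          TimeUnit.SECONDS.sleep(retryDelaySeconds); // back off before the next attempt
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    }
    // The only failure exit, carrying the last cause.
    throw new IOException("Failed request after " + retryLimit + " attempts", lastError);
  }
}
```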

private Map<String, String> getParamsWithAdditionalParams(String[] paramNames, String[] paramVals) {
Map<String, String> paramsMap = new HashMap<>();
paramsMap.put(BASEPATH_PARAM, basePath);
ValidationUtils.checkArgument(paramNames.length == paramVals.length);
for (int i = 0; i < paramNames.length; i++) {
paramsMap.put(paramNames[i], paramVals[i]);
}
return paramsMap;
}

public void register() {
try {
executeRequest(REGISTER_ENDPOINT, getDefaultParams(null));
} catch (IOException e) {
throw new HoodieRemoteException(e);
}
}
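
A usage sketch for the client added in this file: only the constructor and register() come from this PR; the meta client and config construction use standard Hudi builders, and the overall wiring is an assumption.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.table.manager.HoodieTableManagerClient;
import org.apache.hudi.common.config.HoodieTableManagerConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;

import java.util.Properties;

final class TableManagerRegistrationSketch {
  static void registerTable(Configuration hadoopConf, String basePath, Properties props) {
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(hadoopConf)
        .setBasePath(basePath)
        .build();
    // Mirrors how this PR builds the config inside HoodieWriteConfig.
    HoodieTableManagerConfig tableManagerConfig =
        HoodieTableManagerConfig.newBuilder().fromProperties(props).build();
    // Register the table with the table manager service before delegating any table service actions to it.
    new HoodieTableManagerClient(metaClient, tableManagerConfig).register();
  }
}
```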

public void executeCompaction() {
try {
String instantRange = StringUtils.join(metaClient.reloadActiveTimeline()
.filterPendingCompactionTimeline()
.getInstants()
.map(HoodieInstant::getTimestamp)
.toArray(String[]::new), ",");

executeRequest(EXECUTE_COMPACTION, getDefaultParams(instantRange));
} catch (IOException e) {
throw new HoodieRemoteException(e);
}
}

public void executeClean() {
try {
String instantRange = StringUtils.join(metaClient.reloadActiveTimeline()
.getCleanerTimeline()
.filterInflightsAndRequested()
.getInstants()
.map(HoodieInstant::getTimestamp)
.toArray(String[]::new), ",");

executeRequest(EXECUTE_CLEAN, getDefaultParams(instantRange));
} catch (IOException e) {
throw new HoodieRemoteException(e);
}
}

public void executeClustering() {
try {
metaClient.reloadActiveTimeline();
String instantRange = StringUtils.join(ClusteringUtils.getPendingClusteringInstantTimes(metaClient)
Contributor: I noticed that whenever an action is scheduled, instead of sending that particular instant, we send all the pending instants for that action as part of the request. This means the remote service has to iterate over the instants and update their contents in the mysql/hudi db.

I am guessing this was done so that if an action is scheduled but the update to the table service fails, the .requested instant is still left on the timeline.

One way we work around this is by sending the request before creating the .requested file. The timeline of actions would look like this:

  1. Generate the clustering/compaction plan for an instant x.
  2. After the plan is generated, send the request to the table service for instant x.
  3. Create the x.clustering.requested or x.compaction.requested file on the timeline.

If the request is successfully sent to the table service but the .requested file creation fails, the table service treats it as a no-op when it schedules the job. There is a possibility that the table service executes right after step 2, but that can be handled by introducing some delay.

This approach is useful if we want to send Spark-job-specific parameters to the table service for executing those instants, for example the number of executors, since the executor count can vary depending on the plan.

Contributor Author: That would cause two problems:

  1. When the table service is unavailable, the timeline would not be able to generate the requested instant, so nothing could run once the table service is restored; this has a bigger impact on day-level write jobs.
  2. When the table service receives the request, it needs to scan the timeline of the corresponding Hudi table to determine whether the request is still valid, which adds a lot of extra operations for the table management service.

Considering these two points, I think sending all the pending instants is the better way. WDYT?

Contributor (@suryaprasanna, Jul 14, 2022): Here is my understanding: we have three components, the scheduler (the Hudi table via the write API), storage (a state manager or metaserver, which can be mysql or a Hudi table), and execution (table services).

> When the table service is unavailable, the timeline will not be able to generate the requested instant, so it will not be able to operate when the table service is restored, which has a greater impact on the day level write job.

  • If table services are down then there is no one to execute the plan, so there is no need to create the requested file. The table generated the plan initially, and it will similarly try to regenerate it every time until the table service accepts the request.
  • There is an advantage to this approach: the latest plan we generate will also include the latest changes that happened on the table while the table service was down.

> When the table service receives the request, it needs to scan the timeline corresponding to the hudi table to determine whether it is invalid, which will make table management service appear a lot of additional operations.

  • I agree, the table service has to be as dumb as possible. Here, its duty is to read from storage and execute the Spark job. So essentially we start the Spark job, and the job does nothing because it cannot find the .requested file for the instant, so it just quits.

Another approach I can think of is an async thread that starts on the write path and incrementally posts requests to the table service by reading through the timeline. We can even update the file's extraMetadata map with any information if needed.

I am inclined toward your approach because it is a clean way to do it, but we need to address the following:

  1. I have seen people configure clustering on non-partitioned tables. If we schedule clustering on them and fail to update the table service, then no other clustering job can be scheduled on them, because we don't schedule clustering on file groups with pending clustering. So the replacecommit.requested can be left orphaned.
  2. For passing job-specific parameters like executor count, either we pass that information when calling the table service APIs, or the table service has to read through the plan before scheduling a Spark job. Ideally we do not want the table service to read through clustering plans.
  3. Also, we can schedule clustering jobs on small groups of file groups one after the other, and these clustering plans can be smaller in size and more in number. If the table service is running slow, more and more .requested files will be created and more requests will be sent to the table service.

Member: +1 on keeping the table manager/service decoupled as discussed.

  1. Won't (1) be handled automatically, because we resend all the pending instants to the table manager?
  2. I would imagine (2) to be a config specified at the table manager service level, not passed through from the writer.
  3. There is no backpressure mechanism like what you are describing today. I think your point already exists with async clustering/compaction today, regardless of the table service?

Contributor Author: +1 from me on the three points above.

.stream()
.map(HoodieInstant::getTimestamp)
.toArray(String[]::new), ",");

executeRequest(EXECUTE_CLUSTERING, getDefaultParams(instantRange));
} catch (IOException e) {
throw new HoodieRemoteException(e);
}
}

private Map<String, String> getDefaultParams(String instantRange) {
return getParamsWithAdditionalParams(
new String[] {DATABASE_NAME_PARAM, TABLE_NAME_PARAM, INSTANT_PARAM, USERNAME, QUEUE, RESOURCE, PARALLELISM, EXTRA_PARAMS, EXECUTION_ENGINE},
new String[] {dbName, tableName, instantRange, config.getDeployUsername(), config.getDeployQueue(), config.getDeployResource(),
String.valueOf(config.getDeployParallelism()), config.getDeployExtraParams(), config.getDeployExecutionEngine()});
}
}
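
Coming back to the scheduling-order discussion in the executeClustering thread above, here is a minimal, self-contained sketch of the proposed "notify first, then write .requested" ordering. All names are invented for illustration; this is not code from the PR.

```java
import java.io.IOException;

final class ScheduleOrderSketch {
  interface TableManagerNotifier {
    void notifyInstant(String instantTime) throws IOException; // stand-in for a per-instant request
  }

  interface TimelineWriter {
    void saveRequested(String instantTime) throws IOException; // stand-in for writing <instant>.<action>.requested
  }

  // Step 1 (plan generation) is assumed to have already happened for `instantTime`.
  static void schedule(String instantTime, TableManagerNotifier notifier, TimelineWriter timeline) throws IOException {
    // Step 2: tell the table manager first; if this fails, no .requested file is left behind.
    notifier.notifyInstant(instantTime);
    // Step 3: only now persist the .requested file. If this write fails, the service's scheduled
    // job finds no .requested file for the instant and treats the request as a no-op.
    timeline.saveRequested(instantTime);
  }
}
```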
@@ -29,6 +29,7 @@
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieMetastoreConfig;
import org.apache.hudi.common.config.HoodieTableManagerConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
@@ -497,6 +498,7 @@ public class HoodieWriteConfig extends HoodieConfig {
private HoodiePayloadConfig hoodiePayloadConfig;
private HoodieMetadataConfig metadataConfig;
private HoodieMetastoreConfig metastoreConfig;
private HoodieTableManagerConfig tableManagerConfig;
private HoodieCommonConfig commonConfig;
private EngineType engineType;

@@ -889,6 +891,7 @@ protected HoodieWriteConfig(EngineType engineType, Properties props) {
this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build();
this.metastoreConfig = HoodieMetastoreConfig.newBuilder().fromProperties(props).build();
this.tableManagerConfig = HoodieTableManagerConfig.newBuilder().fromProperties(props).build();
this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build();
}

@@ -1948,6 +1951,10 @@ public HoodieMetadataConfig getMetadataConfig() {
return metadataConfig;
}

public HoodieTableManagerConfig getTableManagerConfig() {
Member: Should we have this as a separate module, built on top of hudi-client-common?

Contributor Author: I think TableManagerConfig is a public config that clients can get through HoodieWriteConfig, so I think keeping it in HoodieWriteConfig is better.

return tableManagerConfig;
}

public HoodieCommonConfig getCommonConfig() {
return commonConfig;
}
@@ -2160,6 +2167,13 @@ public boolean isMetastoreEnabled() {
return metastoreConfig.enableMetastore();
}

/**
* Table Manager configs.
*/
public boolean isTableManagerEnabled() {
return tableManagerConfig.enableTableManager();
}
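
A small sketch of consulting the new accessors from a writer-side component. isTableManagerEnabled, getTableManagerConfig, and isTableManagerSupportsAction come from this diff; the single withPath builder call is standard HoodieWriteConfig usage, and how the table manager properties reach the config is left out as an assumption.

```java
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.config.HoodieWriteConfig;

final class TableManagerGateSketch {
  // True when compaction execution should be handed to the table manager service.
  static boolean compactionDelegated(String basePath) {
    // Table manager settings are assumed to arrive via the writer properties elsewhere.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath).build();
    return writeConfig.isTableManagerEnabled()
        && writeConfig.getTableManagerConfig().isTableManagerSupportsAction(ActionType.compaction);
  }
}
```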

public static class Builder {

protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -22,7 +22,9 @@
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.table.manager.HoodieTableManagerClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -146,6 +148,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
*/
protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
final HoodieCleanerPlan cleanerPlan = requestClean(context);
Option<HoodieCleanerPlan> option = Option.empty();
if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null)
&& !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()
&& cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
@@ -159,9 +162,19 @@ protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
LOG.error("Got exception when saving cleaner requested file", e);
throw new HoodieIOException(e.getMessage(), e);
}
return Option.of(cleanerPlan);
option = Option.of(cleanerPlan);
}
return Option.empty();

if (config.getTableManagerConfig().isTableManagerSupportsAction(ActionType.clean)) {
delegateCleanExecutionToTableManager();
}

return option;
}

private void delegateCleanExecutionToTableManager() {
HoodieTableManagerClient tableManagerClient = new HoodieTableManagerClient(table.getMetaClient(), config.getTableManagerConfig());
tableManagerClient.executeClean();
}

@Override
@@ -20,7 +20,9 @@

import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.table.manager.HoodieTableManagerClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -105,6 +107,16 @@ public Option<HoodieClusteringPlan> execute() {
throw new HoodieIOException("Exception scheduling clustering", ioe);
}
}

if (config.getTableManagerConfig().isTableManagerSupportsAction(ActionType.replacecommit)) {
delegateClusteringExecutionToTableManager();
}

return planOption;
}

private void delegateClusteringExecutionToTableManager() {
HoodieTableManagerClient tableManagerClient = new HoodieTableManagerClient(table.getMetaClient(), config.getTableManagerConfig());
tableManagerClient.executeClustering();
}
}
@@ -19,8 +19,10 @@
package org.apache.hudi.table.action.compact;

import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.table.manager.HoodieTableManagerClient;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -91,6 +93,7 @@ public Option<HoodieCompactionPlan> execute() {
}

HoodieCompactionPlan plan = scheduleCompaction();
Option<HoodieCompactionPlan> option = Option.empty();
if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
extraMetadata.ifPresent(plan::setExtraMetadata);
HoodieInstant compactionInstant =
@@ -101,9 +104,14 @@ public Option<HoodieCompactionPlan> execute() {
} catch (IOException ioe) {
throw new HoodieIOException("Exception scheduling compaction", ioe);
}
return Option.of(plan);
option = Option.of(plan);
}
return Option.empty();

if (config.getTableManagerConfig().isTableManagerSupportsAction(ActionType.compaction)) {
delegateCompactionExecutionToTableManager();
}

return option;
}

private HoodieCompactionPlan scheduleCompaction() {
@@ -217,4 +225,9 @@ private Long parsedToSeconds(String time) {
}
return timestamp;
}

private void delegateCompactionExecutionToTableManager() {
HoodieTableManagerClient tableManagerClient = new HoodieTableManagerClient(table.getMetaClient(), config.getTableManagerConfig());
tableManagerClient.executeCompaction();
}
}