
Conversation

@zhangyue19921010
Contributor

https://issues.apache.org/jira/browse/HUDI-2648

What is the purpose of the pull request

Hudi performs many list/get/put/delete operations on the filesystem.
Sometimes these operations hit filesystem performance issues or brief service interruptions, for example:

When running Hudi on S3, jobs may throw many exceptions like this:

com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 503, AWS Service: Amazon S3, AWS Request ID: J58R9W4E324DWJ1Y, AWS Error Code: SlowDown, AWS Error Message: Please reduce your request rate., S3 Extended Request ID: Pkd6VCJKTuq3NJ6vUQJB1TUznKGvAfjIZkmJOxt+j6oAD5roz+ojKf9rtcBOgUBw4sWeDwxdTv4=
	at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
	at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1393)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:1191)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.createFakeDirectory(S3AFileSystem.java:1168)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:871)
	at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1881)
	at org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$mkdirs$25(HoodieWrapperFileSystem.java:632)
	at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:100)
	at org.apache.hudi.common.fs.HoodieWrapperFileSystem.mkdirs(HoodieWrapperFileSystem.java:631)
	at org.apache.hudi.table.MarkerFiles.create(MarkerFiles.java:195)
	at org.apache.hudi.io.HoodieWriteHandle.createMarkerFile(HoodieWriteHandle.java:129)
	at org.apache.hudi.io.HoodieCreateHandle.lambda$new$1(HoodieCreateHandle.java:97)
	at org.apache.hudi.common.util.RetryHelper.start(RetryHelper.java:61)
	at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:100)
	at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:67)
	at org.apache.hudi.io.CreateHandleFactory.create(CreateHandleFactory.java:34)
	at org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:83)
	at org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:40)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

This happens when requests hit S3's list/get/put/delete throttling limits.
https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/

This exception crashes the application, which can be avoided with a retry wrapper.

Brief change log

  • Add a retry mechanism (RetryHelper plus a retry-enabled FileSystem wrapper held by HoodieWrapperFileSystem) around filesystem operations such as list/get/put/delete.
  • Add a FileSystemRetryConfig (initially named FileSystemGuardConfig) with hoodie.filesystem.operation.retry.* options to enable and tune this behavior.

Verify this pull request

This change added tests and can be verified as follows:

  • Added unit tests covering the filesystem retry behavior (for example, a test where the filesystem keeps throwing exceptions until the retry limit is exhausted).

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@nsivabalan
Contributor

@hudi-bot azure run

@zhangyue19921010
Contributor Author

Hi @nsivabalan, could you please help review this PR at your convenience? Thanks a lot if you could give me a hand!

.setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(true)
.setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setFileSystemGuardConfig(config.getFileSystemGuardConfig())
Contributor

@minchowang minchowang Nov 7, 2021

Hi @zhangyue19921010, I think we also need to add setFileSystemGuardConfig in the HoodieFlinkTable class.

Contributor Author

Sure thing. Will get it done.

Contributor Author

@zhangyue19921010 zhangyue19921010 Nov 19, 2021

cc @danny0405 :) Hope you are interested in this patch.

@zhangyue19921010
Contributor Author

Hi @vinothchandar and @bvaradar, friendly ping. I notice that you are very familiar with the Hudi filesystem wrapper, so could you please take a look at this patch if possible? In our experience running Hudi on S3, hitting the S3 request limits is a common issue, and this retry wrapper may be needed to solve it.
Looking forward to your reply. Thanks in advance! :)

@nsivabalan nsivabalan added the priority:blocker Production down; release blocker label Nov 12, 2021
@nsivabalan
Contributor

Looks like a good thing to have. I have marked it as a release blocker and will try to review it soon.

Contributor

@nsivabalan nsivabalan left a comment

Left some minor comments.

public class FileSystemGuardConfig extends HoodieConfig {

public static final ConfigProperty<String> FILESYSTEM_RETRY_ENABLE = ConfigProperty
.key("hoodie.filesystem.action.retry.enabled")
Contributor

"enable" would suffice; we don't need a "d" at the end, to be in line with other configs.

Contributor Author

Sure, changed.

.withDocumentation("Maximum amount of time (in ms), to wait for next retry.");

public static final ConfigProperty<Integer> MAX_RETRY_NUMBERS = ConfigProperty
.key("hoodie.filesystem.action.retry.max_numbers")
Contributor

Can you help me understand why we need both a max retry number and a max interval in ms? I thought either one would be enough.
So, either 100 ms * 4 = 400 ms total for 4 retries with a 100 ms delay,
or 2000 ms / 100 ms = 20 retries with a 100 ms delay.

Contributor Author

We use (long) Math.pow(2, retryCount) * initialIntervalTime + random.nextInt(100); to calculate the sleep time before each retry. We need MAX_RETRY_INTERVAL_MS to cap the duration of any single sleep in case it grows too long: Math.min(getWaitTimeExp(retries), maxIntervalTime).

We also use MAX_RETRY_NUMBERS to cap the number of retries, which bounds the total retry time.
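For illustration, a minimal sketch of this backoff calculation, with illustrative field and method names rather than the exact ones in the patch:

import java.util.Random;

// Exponential backoff with a small random jitter; each sleep is capped at
// maxIntervalMs, and the number of retries is bounded separately.
final class BackoffSketch {
  private final long initialIntervalMs; // e.g. 100
  private final long maxIntervalMs;     // cap for a single sleep, e.g. 2000
  private final Random random = new Random();

  BackoffSketch(long initialIntervalMs, long maxIntervalMs) {
    this.initialIntervalMs = initialIntervalMs;
    this.maxIntervalMs = maxIntervalMs;
  }

  long nextWaitMs(int retryCount) {
    long exponential = (long) Math.pow(2, retryCount) * initialIntervalMs + random.nextInt(100);
    return Math.min(exponential, maxIntervalMs);
  }
}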

int retries = 0;
boolean success = false;
RuntimeException exception = null;
T t = null;
Contributor

Can we please name the variables more descriptively?

Contributor Author

Sure thing, changed.

}
retries++;
}
} while (retries <= num);
Contributor

we can probably remove L88 and do retries++ here

Contributor Author

Hmm, we only increment retries when an exception is caught, so we can't really move it out of the catch {} block.
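A minimal sketch of the loop shape in question (names are illustrative, not the exact RetryHelper code); the counter is only bumped when an attempt fails, so a successful call never counts as a retry:

import java.util.function.Supplier;

final class RetryLoopSketch {
  static <T> T runWithRetries(Supplier<T> action, int maxRetries, long sleepMs) {
    int retries = 0;
    RuntimeException lastFailure;
    do {
      try {
        return action.get();        // success: return immediately, no increment
      } catch (RuntimeException e) {
        lastFailure = e;
        try {
          Thread.sleep(sleepMs);    // the real code uses capped exponential backoff here
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(ie);
        }
        retries++;                  // incremented only inside the catch block
      }
    } while (retries <= maxRetries);
    throw lastFailure;
  }
}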

@nsivabalan nsivabalan self-assigned this Nov 23, 2021
Member

@vinothchandar vinothchandar left a comment

Wouldn't the AWS S3 client already do this, i.e. the retries? Can you please explain why we need this at the Hudi layer?

@nsivabalan nsivabalan removed the priority:blocker Production down; release blocker label Nov 23, 2021
@zhangyue19921010
Contributor Author

Hi @vinothchandar, thanks for your attention.
Actually, the S3 Java SDK already takes care of this retry logic through the config fs.s3a.attempts.maximum.
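For reference, this S3A-level retry count is typically set on the Hadoop configuration like this (only a sketch; the value 10 is just an example):

import org.apache.hadoop.conf.Configuration;

public class S3ARetryConfigExample {
  public static Configuration withS3aRetries() {
    Configuration hadoopConf = new Configuration();
    // How many times the S3A connector / AWS SDK retries a failed S3 request on its own.
    hadoopConf.set("fs.s3a.attempts.maximum", "10");
    return hadoopConf;
  }
}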

The point of building a retry layer in Hudi would be to let Hudi handle different kinds of filesystems, not only S3.
After thinking it over carefully, maybe this patch is not urgent, so I will close it.
We can reopen it if another filesystem needs Hudi to take care of retrying.

Anyway, I really appreciate @nsivabalan's and @vinothchandar's attention. Sorry to bother you guys.

@minchowang
Contributor

COS (Cloud Object Storage) needs this patch to help with retries: https://cloud.tencent.com/product/cos

@zhangyue19921010
Contributor Author

COS (Cloud Object Storage) needs this patch to help with retries: https://cloud.tencent.com/product/cos

Thanks for @mincwang's info. It looks like Tencent COS (https://intl.cloud.tencent.com/?lang=en&pg=) or some other DFS needs Hudi to take care of the retry logic, so could we please continue with this patch and get it done?

What's your opinion, @nsivabalan and @vinothchandar? Looking forward to your reply :>

@nsivabalan
Contributor

Sure, makes sense if there are other cloud stores that need this retry. Can you please address the feedback already given?

@ConfigClassProperty(name = "FileSystem Guard Configurations",
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "The filesystem guard related config options, to help deal with runtime exception like s3 list/get/put/delete performance issues.")
public class FileSystemGuardConfig extends HoodieConfig {
Contributor

do you think naming this "FileSystemRetryConfig" would be more appropriate?

Contributor Author

Sure, "FileSystemRetryConfig" is more appropriate. Changed.

@zhangyue19921010
Contributor Author

zhangyue19921010 commented Dec 10, 2021

Sure, makes sense if there are other cloud stores that need this retry. Can you please address the feedback already given?

Sure, I will address the comments ASAP. Thanks a lot for your attention.

@vinothchandar vinothchandar self-assigned this Dec 21, 2021
@vinothchandar
Member

@zhangyue19921010 Thanks for patiently working through this with us. I am wondering if we can guard this behavior for each storage scheme.

Member

@vinothchandar vinothchandar left a comment

Left some high-level restructuring comments. Let me know what you think! We can take it from there.

return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
return HoodieTableMetaClient.builder()
Member

Could we please avoid non-essential style fixes in this PR? It makes it harder to review.

Contributor Author

Changed :)

HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
HoodieTableMetaClient.builder()
Member

same.

Contributor Author

changed :)

public class FileSystemRetryConfig extends HoodieConfig {

public static final ConfigProperty<String> FILESYSTEM_RETRY_ENABLE = ConfigProperty
.key("hoodie.filesystem.action.retry.enable")
Member

Can we drop "action" from the property name, given that "action" has a different meaning inside Hudi? Maybe filesystem.operation?

Contributor Author

Okay, Changed.

public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) {
return HoodieTableMetaClient.builder().setConf(oldMetaClient.hadoopConf.get()).setBasePath(oldMetaClient.basePath).setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad)
.setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null).build();
return HoodieTableMetaClient.builder()
Member

Can you please comment on these reformatted blocks to highlight what has actually changed?

Contributor Author

Changed.

if (fs == null) {
FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.newCopy());
FileSystem fileSystem;
if (fileSystemRetryConfig.isFileSystemActionRetryEnable()) {
Member

could we make this a property of the StorageScheme?

Contributor Author

Hmm, it's easy to make this a property of StorageScheme, something like FILE("file", append: false, retry: true).
What I am more worried about is that this could mean the retry behavior is turned on by default, and it might be hard for users to disable it if they don't need it.
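For illustration, a hypothetical sketch of what such an enum entry could look like (it simply mirrors the FILE("file", append: false, retry: true) example above and is not the actual StorageSchemes enum):

// Hypothetical shape only, mirroring the example in the comment above.
enum StorageSchemeSketch {
  FILE("file", false /* appendSupported */, true /* supportsRetry */);

  private final String scheme;
  private final boolean appendSupported;
  private final boolean supportsRetry;

  StorageSchemeSketch(String scheme, boolean appendSupported, boolean supportsRetry) {
    this.scheme = scheme;
    this.appendSupported = appendSupported;
    this.supportsRetry = supportsRetry;
  }

  boolean supportsRetry() {
    return supportsRetry;
  }
}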

Contributor

@nsivabalan nsivabalan Jan 24, 2022

Yeah, I am not sure if we can make it a property of StorageScheme. We should let users enable it on an as-needed basis.

Contributor

I guess what Vinoth refers to is that we should add a property named supportsRetry (or similar) to each StorageScheme. Then, only for those storage schemes where retries are supported, we check the config (fileSystemRetryConfig.isFileSystemActionRetryEnable()) and invoke our retry helper.

@vinothchandar: can you please clarify your suggestion above?

Member

In that case, I think we can just have the config alone and let the user decide.

Contributor Author

Ack, and changed as you suggested. Thanks a lot.


public class RetryHelper<T> {
private static final Logger LOG = LogManager.getLogger(RetryHelper.class);
private HoodieWrapperFileSystem.CheckedFunction<T> func;
Member

Could we keep this generic? If we cannot avoid referencing any wrapper filesystem stuff here, then we should move this back to the fs package.

Contributor Author

Sure, Changed.

@nsivabalan nsivabalan self-assigned this Dec 22, 2021
@zhangyue19921010
Contributor Author

zhangyue19921010 commented Dec 23, 2021

I am wondering if we can guard this behavior for each storage scheme.

Hi @vinothchandar, thanks a lot for your attention. Let me explain the structure of this retry wrapper.
[diagram: HoodieWrapperFileSystem holds the retry wrapper, which in turn holds the underlying FileSystem]

As the picture shows, we designed a new wrapper, hoodieRetryWrapper, that wraps a plain FileSystem and retries specific filesystem operations.
We also let HoodieWrapperFileSystem hold this RetryWrapperFileSystem just as it holds a plain FileSystem, so metrics collection etc. is not affected.

In summary, hoodieRetryWrapper can wrap and guard any kind of storage scheme, as long as it holds that storage scheme's client.
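A hedged sketch of this wiring; FileSystemRetryConfig and isFileSystemActionRetryEnable() appear in the diff, while the retry wrapper's constructor arguments and the helper method here are assumptions for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: the retry wrapper sits between HoodieWrapperFileSystem and the
// raw FileSystem, so existing metrics collection is untouched.
public class RetryWiringSketch {
  public static FileSystem fileSystemFor(Path metaPath, Configuration hadoopConf,
                                          FileSystemRetryConfig retryConfig) throws Exception {
    FileSystem rawFs = metaPath.getFileSystem(hadoopConf);
    if (retryConfig.isFileSystemActionRetryEnable()) {
      // hypothetical args: (delegate, maxIntervalMs, maxRetryNumbers, initialIntervalMs, retryExceptions)
      return new HoodieRetryWrapperFileSystem(rawFs, 2000L, 4, 100L, "");
    }
    // HoodieWrapperFileSystem then wraps whatever FileSystem is returned here,
    // whether or not retry is enabled.
    return rawFs;
  }
}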

Contributor

@nsivabalan nsivabalan left a comment

LGTM. Let's wait to hear from Vinoth on his suggestion.


@nsivabalan
Contributor

@zhangyue19921010: let's not wait for Vinoth; he has been busy recently. Let's go ahead for now.
Can we do this:
Add a property named supportsRetry (or something similar) to each StorageScheme. Then, only for those storage schemes where retries are supported, we check the config (fileSystemRetryConfig.isFileSystemActionRetryEnable()) and invoke our retry helper. For example, in S3 this can be disabled.

@zhangyue19921010
Contributor Author

@zhangyue19921010: let's not wait for Vinoth; he has been busy recently. Let's go ahead for now. Can we do this: add a property named supportsRetry (or something similar) to each StorageScheme. Then, only for those storage schemes where retries are supported, we check the config (fileSystemRetryConfig.isFileSystemActionRetryEnable()) and invoke our retry helper. For example, in S3 this can be disabled.

Sure thing. Will do it ASAP :)

@nsivabalan
Contributor

thanks!

@danny0405
Contributor

@zhangyue19921010: let's not wait for Vinoth; he has been busy recently. Let's go ahead for now. Can we do this: add a property named supportsRetry (or something similar) to each StorageScheme. Then, only for those storage schemes where retries are supported, we check the config (fileSystemRetryConfig.isFileSystemActionRetryEnable()) and invoke our retry helper. For example, in S3 this can be disabled.

One thing we should be very careful about is which exceptions/errors we should retry on. If it is a normal user-triggered exception, should we still retry?

@zhangyue19921010
Contributor Author

zhangyue19921010 commented Feb 9, 2022

Hi @nsivabalan and @danny0405, thanks a lot for your review.
We have now added a property named supportsRetry to each StorageScheme and check it before retrying.

We also added a new config, hoodie.filesystem.operation.retry.exceptions, to let users decide which exceptions need to be retried. This addresses the over-protection concern.

In that case, I think we can just have the config alone and let the user decide. (from @vinothchandar)

Updated: we now only check hoodie.filesystem.operation.retry.enable before using the retry helper.

PTAL :) Thanks a lot.
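For reference, a minimal sketch of how these options could be set on the writer properties; the key names are the ones shown in this thread's diff, and the values plus the helper class are just examples:

import java.util.Properties;

public class RetryOptionsExample {
  public static Properties retryProps() {
    Properties props = new Properties();
    props.setProperty("hoodie.filesystem.operation.retry.enable", "true");
    // Example values; key names are taken from this thread and may differ in follow-up patches.
    props.setProperty("hoodie.filesystem.operation.retry.max_numbers", "4");
    // Only retry on the throttling exception seen in the stack trace at the top of this PR.
    props.setProperty("hoodie.filesystem.operation.retry.exceptions",
        "com.amazonaws.services.s3.model.AmazonS3Exception");
    return props;
  }
}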

@hudi-bot
Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

Contributor

@nsivabalan nsivabalan left a comment

LGTM. A few minor comments; I will go ahead and land the patch. Please take them up as a follow-up patch.
Thanks for the contribution!

.withDocumentation("Maximum amount of time (in ms), to wait for next retry.");

public static final ConfigProperty<Integer> MAX_RETRY_NUMBERS = ConfigProperty
.key("hoodie.filesystem.operation.retry.max_numbers")
Contributor

Maybe we can name this "hoodie.filesystem.operation.retry.max.count"?

Contributor

max_times seems better ?

import java.util.Random;
import java.util.stream.Collectors;

public class RetryHelper<T> {
Contributor

Can we add Javadocs here?

private static final Logger LOG = LogManager.getLogger(RetryHelper.class);
private CheckedFunction<T> func;
private int num;
private long maxIntervalTime;
Contributor

@nsivabalan nsivabalan Feb 20, 2022

can some of these be final ?


// Test the scenario that fs keeps retrying until it fails.
@Test
public void testProcessFilesWithExceptions() throws Exception {
Contributor

Can we add one more test to cover this scenario:

  • to assert that the call succeeds after 2 retries (i.e., before hitting the max retry count); see the sketch below.
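A hedged sketch of what that test could look like; the RetryHelper constructor, tryWith(), and the JUnit 5 wiring here are assumptions based on the snippets in this review, not the exact API:

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;

public class TestRetrySucceedsBeforeMaxRetries {
  @Test
  public void testSucceedsBeforeMaxRetries() throws Exception {
    AtomicInteger attempts = new AtomicInteger();
    // hypothetical constructor: (maxIntervalMs, maxRetryNumbers, initialIntervalMs, retryExceptions)
    RetryHelper<Boolean> retryHelper = new RetryHelper<>(1000L, 3, 100L, "");
    Boolean result = retryHelper.tryWith(() -> {
      if (attempts.incrementAndGet() <= 2) {
        throw new IOException("transient failure");
      }
      return true;
    }).start();
    assertTrue(result);
    assertEquals(3, attempts.get()); // two failures, then success before the limit
  }
}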

@nsivabalan nsivabalan merged commit 359fbfd into apache:master Feb 20, 2022
@zhangyue19921010
Contributor Author

LGTM. A few minor comments; I will go ahead and land the patch. Please take them up as a follow-up patch.
Thanks for the contribution!

Thanks a lot for your review and the merge. Sure, I will do it ASAP.

vingov pushed a commit to vingov/hudi that referenced this pull request Apr 3, 2022