diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index 72f8e29c9fa8e..4d5375894d7e3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -117,6 +117,11 @@ public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() { .withRemoteServerHost(hostAddr) .withRemoteServerPort(serverPort) .withRemoteTimelineClientTimeoutSecs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs()) + .withRemoteTimelineClientRetry(writeConfig.getClientSpecifiedViewStorageConfig().isRemoteTimelineClientRetryEnabled()) + .withRemoteTimelineClientMaxRetryNumbers(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers()) + .withRemoteTimelineInitialRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineInitialRetryIntervalMs()) + .withRemoteTimelineClientMaxRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryIntervalMs()) + .withRemoteTimelineClientRetryExceptions(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientRetryExceptions()) .build(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index 35fda6c416ac7..48023d50463d2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -214,8 +214,7 @@ private static RemoteHoodieTableFileSystemView createRemoteFileSystemView(Serial LOG.info("Creating remote 
view for basePath " + metaClient.getBasePath() + ". Server=" + viewConf.getRemoteViewServerHost() + ":" + viewConf.getRemoteViewServerPort() + ", Timeout=" + viewConf.getRemoteTimelineClientTimeoutSecs()); - return new RemoteHoodieTableFileSystemView(viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort(), - metaClient, viewConf.getRemoteTimelineClientTimeoutSecs()); + return new RemoteHoodieTableFileSystemView(metaClient, viewConf); } public static FileSystemViewManager createViewManager(final HoodieEngineContext context, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java index 63f10855bad84..92937f61e2c2c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java @@ -110,6 +110,37 @@ public class FileSystemViewStorageConfig extends HoodieConfig { .defaultValue(5 * 60) // 5 min .withDocumentation("Timeout in seconds, to wait for API requests against a remote file system view. e.g timeline server."); + public static final ConfigProperty REMOTE_RETRY_ENABLE = ConfigProperty + .key("hoodie.filesystem.view.remote.retry.enable") + .defaultValue("false") + .sinceVersion("0.12.1") + .withDocumentation("Whether to enable API request retry for remote file system view."); + + public static final ConfigProperty REMOTE_MAX_RETRY_NUMBERS = ConfigProperty + .key("hoodie.filesystem.view.remote.retry.max_numbers") + .defaultValue(3) // 3 times + .sinceVersion("0.12.1") + .withDocumentation("Maximum number of retries for API requests against a remote file system view. 
e.g timeline server."); + + public static final ConfigProperty REMOTE_INITIAL_RETRY_INTERVAL_MS = ConfigProperty + .key("hoodie.filesystem.view.remote.retry.initial_interval_ms") + .defaultValue(100L) + .sinceVersion("0.12.1") + .withDocumentation("Initial amount of time (in ms) to wait before retrying an API request against a remote file system view."); + + public static final ConfigProperty REMOTE_MAX_RETRY_INTERVAL_MS = ConfigProperty + .key("hoodie.filesystem.view.remote.retry.max_interval_ms") + .defaultValue(2000L) + .sinceVersion("0.12.1") + .withDocumentation("Maximum amount of time (in ms), to wait for next retry."); + + public static final ConfigProperty RETRY_EXCEPTIONS = ConfigProperty + .key("hoodie.filesystem.view.remote.retry.exceptions") + .defaultValue("") + .sinceVersion("0.12.1") + .withDocumentation("The class names of the exceptions that should be retried, separated by commas. " + + "Default is empty which means retry all the IOException and RuntimeException from Remote Request."); + public static final ConfigProperty REMOTE_BACKUP_VIEW_ENABLE = ConfigProperty .key("hoodie.filesystem.remote.backup.view.enable") .defaultValue("true") // Need to be disabled only for tests. 
@@ -144,6 +175,26 @@ public Integer getRemoteTimelineClientTimeoutSecs() { return getInt(REMOTE_TIMEOUT_SECS); } + public boolean isRemoteTimelineClientRetryEnabled() { + return getBoolean(REMOTE_RETRY_ENABLE); + } + + public Integer getRemoteTimelineClientMaxRetryNumbers() { + return getInt(REMOTE_MAX_RETRY_NUMBERS); + } + + public Long getRemoteTimelineInitialRetryIntervalMs() { + return getLong(REMOTE_INITIAL_RETRY_INTERVAL_MS); + } + + public Long getRemoteTimelineClientMaxRetryIntervalMs() { + return getLong(REMOTE_MAX_RETRY_INTERVAL_MS); + } + + public String getRemoteTimelineClientRetryExceptions() { + return getString(RETRY_EXCEPTIONS); + } + public long getMaxMemoryForFileGroupMap() { long totalMemory = getLong(SPILLABLE_MEMORY); return totalMemory - getMaxMemoryForPendingCompaction() - getMaxMemoryForBootstrapBaseFile(); @@ -245,6 +296,31 @@ public Builder withRemoteTimelineClientTimeoutSecs(Integer timelineClientTimeout return this; } + public Builder withRemoteTimelineClientRetry(boolean enableRetry) { + fileSystemViewStorageConfig.setValue(REMOTE_RETRY_ENABLE, Boolean.toString(enableRetry)); + return this; + } + + public Builder withRemoteTimelineClientMaxRetryNumbers(Integer maxRetryNumbers) { + fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_NUMBERS, maxRetryNumbers.toString()); + return this; + } + + public Builder withRemoteTimelineInitialRetryIntervalMs(Long initialRetryIntervalMs) { + fileSystemViewStorageConfig.setValue(REMOTE_INITIAL_RETRY_INTERVAL_MS, initialRetryIntervalMs.toString()); + return this; + } + + public Builder withRemoteTimelineClientMaxRetryIntervalMs(Long maxRetryIntervalMs) { + fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_INTERVAL_MS, maxRetryIntervalMs.toString()); + return this; + } + + public Builder withRemoteTimelineClientRetryExceptions(String retryExceptions) { + fileSystemViewStorageConfig.setValue(RETRY_EXCEPTIONS, retryExceptions); + return this; + } + public Builder 
withMemFractionForPendingCompaction(Double memFractionForPendingCompaction) { fileSystemViewStorageConfig.setValue(SPILLABLE_COMPACTION_MEM_FRACTION, memFractionForPendingCompaction.toString()); return this; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index 099b79cbba0ab..0b32211a5b6c5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -39,6 +39,7 @@ import org.apache.hudi.common.table.timeline.dto.InstantDTO; import org.apache.hudi.common.table.timeline.dto.TimelineDTO; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.RetryHelper; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; @@ -128,26 +129,36 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, private final HoodieTableMetaClient metaClient; private HoodieTimeline timeline; private final ObjectMapper mapper; - private final int timeoutSecs; + private final int timeoutMs; private boolean closed = false; + private RetryHelper retryHelper; + private enum RequestMethod { GET, POST } public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient) { - this(server, port, metaClient, 300); + this(metaClient, FileSystemViewStorageConfig.newBuilder().withRemoteServerHost(server).withRemoteServerPort(port).build()); } - public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient, int timeoutSecs) { + public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) { this.basePath = metaClient.getBasePath(); - 
this.serverHost = server; - this.serverPort = port; this.mapper = new ObjectMapper(); this.metaClient = metaClient; this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); - this.timeoutSecs = timeoutSecs; + this.serverHost = viewConf.getRemoteViewServerHost(); + this.serverPort = viewConf.getRemoteViewServerPort(); + this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000; + if (viewConf.isRemoteTimelineClientRetryEnabled()) { + retryHelper = new RetryHelper( + viewConf.getRemoteTimelineClientMaxRetryIntervalMs(), + viewConf.getRemoteTimelineClientMaxRetryNumbers(), + viewConf.getRemoteTimelineInitialRetryIntervalMs(), + viewConf.getRemoteTimelineClientRetryExceptions(), + "Sending request"); + } } private T executeRequest(String requestPath, Map queryParameters, TypeReference reference, @@ -165,17 +176,7 @@ private T executeRequest(String requestPath, Map queryParame String url = builder.toString(); LOG.info("Sending request : (" + url + ")"); - Response response; - int timeout = this.timeoutSecs * 1000; // msec - switch (method) { - case GET: - response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute(); - break; - case POST: - default: - response = Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute(); - break; - } + Response response = retryHelper != null ? 
retryHelper.start(() -> get(timeoutMs, url, method)) : get(timeoutMs, url, method); String content = response.returnContent().asString(); return (T) mapper.readValue(content, reference); } @@ -495,4 +496,14 @@ public Option getLatestBaseFile(String partitionPath, String fil throw new HoodieRemoteException(e); } } + + private Response get(int timeoutMs, String url, RequestMethod method) throws IOException { + switch (method) { + case GET: + return Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute(); + case POST: + default: + return Request.Post(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute(); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java index 067c5ee40dad7..2e82b548f0da7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java @@ -18,28 +18,27 @@ package org.apache.hudi.common.util; +import org.apache.hudi.exception.HoodieException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.stream.Collectors; -public class RetryHelper { +public class RetryHelper implements Serializable { private static final Logger LOG = LogManager.getLogger(RetryHelper.class); - private CheckedFunction func; - private int num; - private long maxIntervalTime; - private long initialIntervalTime = 100L; + private transient CheckedFunction func; + private final int num; + private final long maxIntervalTime; + private final long initialIntervalTime; private String taskInfo = "N/A"; private List> retryExceptionsClasses; - public RetryHelper() { - } - public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long 
initialRetryIntervalMs, String retryExceptions) { this.num = maxRetryNumbers; this.initialIntervalTime = initialRetryIntervalMs; @@ -47,23 +46,29 @@ public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRet if (StringUtils.isNullOrEmpty(retryExceptions)) { this.retryExceptionsClasses = new ArrayList<>(); } else { - this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(",")) - .map(exception -> (Exception) ReflectionUtils.loadClass(exception, "")) - .map(Exception::getClass) - .collect(Collectors.toList()); + try { + this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(",")) + .map(exception -> (Exception) ReflectionUtils.loadClass(exception, "")) + .map(Exception::getClass) + .collect(Collectors.toList()); + } catch (HoodieException e) { + LOG.error("Exception while loading retry exceptions classes '" + retryExceptions + "'.", e); + this.retryExceptionsClasses = new ArrayList<>(); + } } } - public RetryHelper(String taskInfo) { + public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions, String taskInfo) { + this(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptions); this.taskInfo = taskInfo; } - public RetryHelper tryWith(CheckedFunction func) { + public RetryHelper tryWith(CheckedFunction func) { this.func = func; return this; } - public T start() throws IOException { + public T start(CheckedFunction func) throws IOException { int retries = 0; T functionResult = null; @@ -77,14 +82,18 @@ public T start() throws IOException { throw e; } if (retries++ >= num) { - LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e); + String message = "Still failed to " + taskInfo + " after retried " + num + " times."; + LOG.error(message, e); + if (e instanceof IOException) { + throw new IOException(message, e); + } throw e; } - LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e); + 
LOG.warn("Catch Exception for " + taskInfo + ", will retry after " + waitTime + " ms.", e); try { Thread.sleep(waitTime); } catch (InterruptedException ex) { - // ignore InterruptedException here + // InterruptedException is deliberately swallowed: the interrupt flag is not restored and retrying continues } } } @@ -92,9 +101,14 @@ public T start() throws IOException { if (retries > 0) { LOG.info("Success to " + taskInfo + " after retried " + retries + " times."); } + return functionResult; } + public T start() throws IOException { + return start(this.func); + } + private boolean checkIfExceptionInRetryList(Exception e) { boolean inRetryList = false; @@ -123,7 +137,7 @@ private long getWaitTimeExp(int retryCount) { } @FunctionalInterface - public interface CheckedFunction { + public interface CheckedFunction extends Serializable { T get() throws IOException; } } \ No newline at end of file diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index b09d7ad8bfc37..4b93faeaf72d9 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -429,6 +429,11 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost()) .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()) .withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs()) + .withRemoteTimelineClientRetry(viewStorageConfig.isRemoteTimelineClientRetryEnabled()) + .withRemoteTimelineClientMaxRetryNumbers(viewStorageConfig.getRemoteTimelineClientMaxRetryNumbers()) + .withRemoteTimelineInitialRetryIntervalMs(viewStorageConfig.getRemoteTimelineInitialRetryIntervalMs()) + 
.withRemoteTimelineClientMaxRetryIntervalMs(viewStorageConfig.getRemoteTimelineClientMaxRetryIntervalMs()) + 
.withRemoteTimelineClientRetryExceptions(viewStorageConfig.getRemoteTimelineClientRetryExceptions()) .build(); ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf); return writeClient; diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java index f9a6172b5ec39..127bc51e95006 100644 --- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java @@ -28,12 +28,16 @@ import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView; +import org.apache.hudi.exception.HoodieRemoteException; import org.apache.hudi.timeline.service.TimelineService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.fail; /** * Bring up a remote Timeline Server and run all test-cases of TestHoodieTableFileSystemView against it. @@ -64,4 +68,46 @@ protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) { view = new RemoteHoodieTableFileSystemView("localhost", server.getServerPort(), metaClient); return view; } + + @Test + public void testRemoteHoodieTableFileSystemViewWithRetry() { + // Service is available. + view.getLatestBaseFiles(); + // Shut down the service. + server.close(); + try { + // Immediately fails and throws a connection refused exception. 
+ view.getLatestBaseFiles(); + fail("Should be catch Exception 'Connection refused (Connection refused)'"); + } catch (HoodieRemoteException e) { + assert e.getMessage().contains("Connection refused (Connection refused)"); + } + // Enable API request retry for remote file system view. + view = new RemoteHoodieTableFileSystemView(metaClient, FileSystemViewStorageConfig + .newBuilder() + .withRemoteServerHost("localhost") + .withRemoteServerPort(server.getServerPort()) + .withRemoteTimelineClientRetry(true) + .withRemoteTimelineClientMaxRetryIntervalMs(2000L) + .withRemoteTimelineClientMaxRetryNumbers(4) + .build()); + try { + view.getLatestBaseFiles(); + fail("Should be catch Exception 'Still failed to Sending request after retried 4 times.'"); + } catch (HoodieRemoteException e) { + assert e.getMessage().equalsIgnoreCase("Still failed to Sending request after retried 4 times."); + } + // NOTE(review): Thread.run() executes the restart below synchronously on the calling thread, so it completes before the next request is issued; start() would be needed for the retry path to observe a concurrent restart. + new Thread(() -> { + try { + Thread.sleep(5000L); + LOG.info("Restart server."); + server.startService(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }).run(); + view.getLatestBaseFiles(); + server.close(); + } }