Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() {
.withRemoteServerHost(hostAddr)
.withRemoteServerPort(serverPort)
.withRemoteTimelineClientTimeoutSecs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs())
.withRemoteTimelineClientRetry(writeConfig.getClientSpecifiedViewStorageConfig().isRemoteTimelineClientRetryEnabled())
.withRemoteTimelineClientMaxRetryNumbers(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers())
.withRemoteTimelineInitialRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineInitialRetryIntervalMs())
.withRemoteTimelineClientMaxRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryIntervalMs())
.withRemoteTimelineClientRetryExceptions(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientRetryExceptions())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ private static RemoteHoodieTableFileSystemView createRemoteFileSystemView(Serial
LOG.info("Creating remote view for basePath " + metaClient.getBasePath() + ". Server="
+ viewConf.getRemoteViewServerHost() + ":" + viewConf.getRemoteViewServerPort() + ", Timeout="
+ viewConf.getRemoteTimelineClientTimeoutSecs());
return new RemoteHoodieTableFileSystemView(viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort(),
metaClient, viewConf.getRemoteTimelineClientTimeoutSecs());
return new RemoteHoodieTableFileSystemView(metaClient, viewConf);
}

public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,37 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
.defaultValue(5 * 60) // 5 min
.withDocumentation("Timeout in seconds, to wait for API requests against a remote file system view. e.g timeline server.");

/**
 * Switch that turns retrying of API requests against a remote file system view on or off.
 * The value is held as a String ("false" by default) and is read back as a boolean by
 * {@code isRemoteTimelineClientRetryEnabled()} through {@code getBoolean}.
 */
public static final ConfigProperty<String> REMOTE_RETRY_ENABLE = ConfigProperty
.key("hoodie.filesystem.view.remote.retry.enable")
.defaultValue("false")
.sinceVersion("0.12.1")
.withDocumentation("Whether to enable API request retry for remote file system view.");

/**
 * Upper bound on the number of retries for a single API request against a remote file system view.
 * Defaults to 3 and is exposed through {@code getRemoteTimelineClientMaxRetryNumbers()}.
 */
public static final ConfigProperty<Integer> REMOTE_MAX_RETRY_NUMBERS = ConfigProperty
.key("hoodie.filesystem.view.remote.retry.max_numbers")
.defaultValue(3) // 3 times
.sinceVersion("0.12.1")
.withDocumentation("Maximum number of retry for API requests against a remote file system view. e.g timeline server.");

/**
 * Initial wait, in milliseconds, between a failed API request against a remote file system view
 * and its retry. Defaults to 100 ms and is exposed through
 * {@code getRemoteTimelineInitialRetryIntervalMs()}.
 *
 * <p>The user-facing documentation text describes remote view requests rather than storage
 * operations, because this key only configures the remote file system view client.
 */
public static final ConfigProperty<Long> REMOTE_INITIAL_RETRY_INTERVAL_MS = ConfigProperty
    .key("hoodie.filesystem.view.remote.retry.initial_interval_ms")
    .defaultValue(100L)
    .sinceVersion("0.12.1")
    .withDocumentation("Initial amount of time (in ms) to wait before retrying an API request "
        + "against a remote file system view. e.g timeline server.");

/**
 * Maximum wait, in milliseconds, before the next retry of a remote file system view request.
 * Defaults to 2000 ms and is exposed through {@code getRemoteTimelineClientMaxRetryIntervalMs()}.
 */
public static final ConfigProperty<Long> REMOTE_MAX_RETRY_INTERVAL_MS = ConfigProperty
.key("hoodie.filesystem.view.remote.retry.max_interval_ms")
.defaultValue(2000L)
.sinceVersion("0.12.1")
.withDocumentation("Maximum amount of time (in ms), to wait for next retry.");

/**
 * Comma-separated list of exception class names for which a remote file system view request
 * should be retried. Defaults to the empty string and is exposed through
 * {@code getRemoteTimelineClientRetryExceptions()}.
 * NOTE(review): the names are presumably expected to be fully-qualified class names that can be
 * loaded reflectively - confirm against the consumer of this value.
 */
public static final ConfigProperty<String> RETRY_EXCEPTIONS = ConfigProperty
.key("hoodie.filesystem.view.remote.retry.exceptions")
.defaultValue("")
.sinceVersion("0.12.1")
.withDocumentation("The class name of the Exception that needs to be re-tryed, separated by commas. "
+ "Default is empty which means retry all the IOException and RuntimeException from Remote Request.");

public static final ConfigProperty<String> REMOTE_BACKUP_VIEW_ENABLE = ConfigProperty
.key("hoodie.filesystem.remote.backup.view.enable")
.defaultValue("true") // Need to be disabled only for tests.
Expand Down Expand Up @@ -144,6 +175,26 @@ public Integer getRemoteTimelineClientTimeoutSecs() {
return getInt(REMOTE_TIMEOUT_SECS);
}

/**
 * Reports whether request retry is switched on for the remote file system view client.
 *
 * @return the boolean value of {@code REMOTE_RETRY_ENABLE}
 */
public boolean isRemoteTimelineClientRetryEnabled() {
  final boolean retryEnabled = getBoolean(REMOTE_RETRY_ENABLE);
  return retryEnabled;
}

/**
 * Looks up the configured retry limit for remote file system view requests.
 *
 * @return the value of {@code REMOTE_MAX_RETRY_NUMBERS}
 */
public Integer getRemoteTimelineClientMaxRetryNumbers() {
  final Integer maxRetryNumbers = getInt(REMOTE_MAX_RETRY_NUMBERS);
  return maxRetryNumbers;
}

/**
 * Looks up the configured initial retry interval for remote file system view requests.
 *
 * @return the value of {@code REMOTE_INITIAL_RETRY_INTERVAL_MS}, in milliseconds
 */
public Long getRemoteTimelineInitialRetryIntervalMs() {
  final Long initialIntervalMs = getLong(REMOTE_INITIAL_RETRY_INTERVAL_MS);
  return initialIntervalMs;
}

/**
 * Looks up the configured maximum retry interval for remote file system view requests.
 *
 * @return the value of {@code REMOTE_MAX_RETRY_INTERVAL_MS}, in milliseconds
 */
public Long getRemoteTimelineClientMaxRetryIntervalMs() {
  final Long maxIntervalMs = getLong(REMOTE_MAX_RETRY_INTERVAL_MS);
  return maxIntervalMs;
}

/**
 * Looks up the configured list of retryable exception class names.
 *
 * @return the raw string value of {@code RETRY_EXCEPTIONS}
 */
public String getRemoteTimelineClientRetryExceptions() {
  final String retryExceptions = getString(RETRY_EXCEPTIONS);
  return retryExceptions;
}

public long getMaxMemoryForFileGroupMap() {
long totalMemory = getLong(SPILLABLE_MEMORY);
return totalMemory - getMaxMemoryForPendingCompaction() - getMaxMemoryForBootstrapBaseFile();
Expand Down Expand Up @@ -245,6 +296,31 @@ public Builder withRemoteTimelineClientTimeoutSecs(Integer timelineClientTimeout
return this;
}

/**
 * Records whether remote file system view requests should be retried.
 *
 * @param enableRetry value to store under {@code REMOTE_RETRY_ENABLE}
 * @return this builder
 */
public Builder withRemoteTimelineClientRetry(boolean enableRetry) {
  final String encoded = Boolean.toString(enableRetry);
  fileSystemViewStorageConfig.setValue(REMOTE_RETRY_ENABLE, encoded);
  return this;
}

/**
 * Records the retry limit for remote file system view requests.
 *
 * @param maxRetryNumbers value to store under {@code REMOTE_MAX_RETRY_NUMBERS}; must not be null
 * @return this builder
 */
public Builder withRemoteTimelineClientMaxRetryNumbers(Integer maxRetryNumbers) {
  final String encoded = maxRetryNumbers.toString();
  fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_NUMBERS, encoded);
  return this;
}

/**
 * Records the initial retry interval for remote file system view requests.
 *
 * @param initialRetryIntervalMs value to store under {@code REMOTE_INITIAL_RETRY_INTERVAL_MS};
 *                               must not be null
 * @return this builder
 */
public Builder withRemoteTimelineInitialRetryIntervalMs(Long initialRetryIntervalMs) {
  final String encoded = initialRetryIntervalMs.toString();
  fileSystemViewStorageConfig.setValue(REMOTE_INITIAL_RETRY_INTERVAL_MS, encoded);
  return this;
}

/**
 * Records the maximum retry interval for remote file system view requests.
 *
 * @param maxRetryIntervalMs value to store under {@code REMOTE_MAX_RETRY_INTERVAL_MS};
 *                           must not be null
 * @return this builder
 */
public Builder withRemoteTimelineClientMaxRetryIntervalMs(Long maxRetryIntervalMs) {
  final String encoded = maxRetryIntervalMs.toString();
  fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_INTERVAL_MS, encoded);
  return this;
}

/**
 * Records the list of retryable exception class names.
 *
 * @param retryExceptions value to store, unchanged, under {@code RETRY_EXCEPTIONS}
 * @return this builder
 */
public Builder withRemoteTimelineClientRetryExceptions(String retryExceptions) {
  final Builder self = this;
  self.fileSystemViewStorageConfig.setValue(RETRY_EXCEPTIONS, retryExceptions);
  return self;
}

public Builder withMemFractionForPendingCompaction(Double memFractionForPendingCompaction) {
fileSystemViewStorageConfig.setValue(SPILLABLE_COMPACTION_MEM_FRACTION, memFractionForPendingCompaction.toString());
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hudi.common.table.timeline.dto.InstantDTO;
import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.RetryHelper;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
Expand Down Expand Up @@ -128,26 +129,36 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
private final HoodieTableMetaClient metaClient;
private HoodieTimeline timeline;
private final ObjectMapper mapper;
private final int timeoutSecs;
private final int timeoutMs;

private boolean closed = false;

private RetryHelper<Response> retryHelper;

private enum RequestMethod {
GET, POST
}

public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient) {
this(server, port, metaClient, 300);
this(metaClient, FileSystemViewStorageConfig.newBuilder().withRemoteServerHost(server).withRemoteServerPort(port).build());
}

public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient, int timeoutSecs) {
public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) {
this.basePath = metaClient.getBasePath();
this.serverHost = server;
this.serverPort = port;
this.mapper = new ObjectMapper();
this.metaClient = metaClient;
this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
this.timeoutSecs = timeoutSecs;
this.serverHost = viewConf.getRemoteViewServerHost();
this.serverPort = viewConf.getRemoteViewServerPort();
this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000;
if (viewConf.isRemoteTimelineClientRetryEnabled()) {
retryHelper = new RetryHelper(
viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
viewConf.getRemoteTimelineClientMaxRetryNumbers(),
viewConf.getRemoteTimelineInitialRetryIntervalMs(),
viewConf.getRemoteTimelineClientRetryExceptions(),
"Sending request");
}
}

private <T> T executeRequest(String requestPath, Map<String, String> queryParameters, TypeReference reference,
Expand All @@ -165,17 +176,7 @@ private <T> T executeRequest(String requestPath, Map<String, String> queryParame

String url = builder.toString();
LOG.info("Sending request : (" + url + ")");
Response response;
int timeout = this.timeoutSecs * 1000; // msec
switch (method) {
case GET:
response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
break;
case POST:
default:
response = Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute();
break;
}
Response response = retryHelper != null ? retryHelper.start(() -> get(timeoutMs, url, method)) : get(timeoutMs, url, method);
String content = response.returnContent().asString();
return (T) mapper.readValue(content, reference);
}
Expand Down Expand Up @@ -495,4 +496,14 @@ public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fil
throw new HoodieRemoteException(e);
}
}

/**
 * Issues a single HTTP request to the given URL and returns the raw response.
 *
 * @param timeoutMs value applied as both the connect timeout and the socket timeout
 * @param url       fully built request URL
 * @param method    {@code GET} sends a GET request; any other value sends a POST request
 * @return the response produced by executing the request
 * @throws IOException if executing the request fails
 */
private Response get(int timeoutMs, String url, RequestMethod method) throws IOException {
  // Pick the request first, then apply the shared timeout settings once.
  final Request request;
  switch (method) {
    case GET:
      request = Request.Get(url);
      break;
    case POST:
    default:
      request = Request.Post(url);
      break;
  }
  return request.connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,57 @@

package org.apache.hudi.common.util;

import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class RetryHelper<T> {
public class RetryHelper<T> implements Serializable {
private static final Logger LOG = LogManager.getLogger(RetryHelper.class);
private CheckedFunction<T> func;
private int num;
private long maxIntervalTime;
private long initialIntervalTime = 100L;
private transient CheckedFunction<T> func;
private final int num;
private final long maxIntervalTime;
private final long initialIntervalTime;
private String taskInfo = "N/A";
private List<? extends Class<? extends Exception>> retryExceptionsClasses;

public RetryHelper() {
}

public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) {
this.num = maxRetryNumbers;
this.initialIntervalTime = initialRetryIntervalMs;
this.maxIntervalTime = maxRetryIntervalMs;
if (StringUtils.isNullOrEmpty(retryExceptions)) {
this.retryExceptionsClasses = new ArrayList<>();
} else {
this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
.map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
.map(Exception::getClass)
.collect(Collectors.toList());
try {
this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
.map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
.map(Exception::getClass)
.collect(Collectors.toList());
} catch (HoodieException e) {
LOG.error("Exception while loading retry exceptions classes '" + retryExceptions + "'.", e);
this.retryExceptionsClasses = new ArrayList<>();
}
}
}

public RetryHelper(String taskInfo) {
/**
 * Creates a retry helper with the given retry settings and a label for the task being retried.
 *
 * @param maxRetryIntervalMs     forwarded to the four-argument constructor
 * @param maxRetryNumbers        forwarded to the four-argument constructor
 * @param initialRetryIntervalMs forwarded to the four-argument constructor
 * @param retryExceptions        forwarded to the four-argument constructor
 * @param taskInfo               label stored in {@code taskInfo}; it appears in this class's
 *                               retry log messages
 */
public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions, String taskInfo) {
// Delegate the retry settings, then record the label on top of them.
this(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptions);
this.taskInfo = taskInfo;
}

public RetryHelper tryWith(CheckedFunction<T> func) {
/**
 * Stores the function that a later call to {@code start()} will execute.
 *
 * @param func the function to remember
 * @return this helper, to allow call chaining
 */
public RetryHelper<T> tryWith(CheckedFunction<T> func) {
  final RetryHelper<T> self = this;
  self.func = func;
  return self;
}

public T start() throws IOException {
public T start(CheckedFunction<T> func) throws IOException {
int retries = 0;
T functionResult = null;

Expand All @@ -77,24 +82,33 @@ public T start() throws IOException {
throw e;
}
if (retries++ >= num) {
LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e);
String message = "Still failed to " + taskInfo + " after retried " + num + " times.";
LOG.error(message, e);
if (e instanceof IOException) {
throw new IOException(message, e);
}
throw e;
}
LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e);
LOG.warn("Catch Exception for " + taskInfo + ", will retry after " + waitTime + " ms.", e);
try {
Thread.sleep(waitTime);
} catch (InterruptedException ex) {
// ignore InterruptedException here
// ignore InterruptedException here
}
}
}

if (retries > 0) {
LOG.info("Success to " + taskInfo + " after retried " + retries + " times.");
}

return functionResult;
}

/**
 * Runs the function previously registered through {@code tryWith} by delegating to
 * {@code start(CheckedFunction)}.
 *
 * @return whatever {@code start(CheckedFunction)} returns for the stored function
 * @throws IOException if {@code start(CheckedFunction)} throws it
 */
public T start() throws IOException {
  final CheckedFunction<T> registered = this.func;
  return start(registered);
}

private boolean checkIfExceptionInRetryList(Exception e) {
boolean inRetryList = false;

Expand Down Expand Up @@ -123,7 +137,7 @@ private long getWaitTimeExp(int retryCount) {
}

@FunctionalInterface
public interface CheckedFunction<T> {
public interface CheckedFunction<T> extends Serializable {
T get() throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,11 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw
.withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort())
.withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs())
.withRemoteTimelineClientRetry(viewStorageConfig.isRemoteTimelineClientRetryEnabled())
.withRemoteTimelineClientMaxRetryNumbers(viewStorageConfig.getRemoteTimelineClientMaxRetryNumbers())
.withRemoteTimelineInitialRetryIntervalMs(viewStorageConfig.getRemoteTimelineInitialRetryIntervalMs())
.withRemoteTimelineClientMaxRetryIntervalMs(viewStorageConfig.getRemoteTimelineClientMaxRetryIntervalMs())
.withRemoteTimelineClientRetryExceptions(viewStorageConfig.getRemoteTimelineClientRetryExceptions())
.build();
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf);
return writeClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView;
import org.apache.hudi.exception.HoodieRemoteException;
import org.apache.hudi.timeline.service.TimelineService;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.fail;

/**
* Bring up a remote Timeline Server and run all test-cases of TestHoodieTableFileSystemView against it.
Expand Down Expand Up @@ -64,4 +68,46 @@ protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) {
view = new RemoteHoodieTableFileSystemView("localhost", server.getServerPort(), metaClient);
return view;
}

/**
 * Checks the remote file system view client against a timeline server that is stopped and later
 * restarted:
 * <ol>
 *   <li>without retry, a request against the stopped server fails with a connection error;</li>
 *   <li>with retry enabled, the request fails once the retry limit is exhausted;</li>
 *   <li>with retry enabled, the request succeeds when the server comes back while the client is
 *       still retrying.</li>
 * </ol>
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the restart thread
 */
@Test
public void testRemoteHoodieTableFileSystemViewWithRetry() throws InterruptedException {
  // Service is available.
  view.getLatestBaseFiles();
  // Shut down the service.
  server.close();
  try {
    // Immediately fails and throws a connection refused exception.
    view.getLatestBaseFiles();
    fail("Should be catch Exception 'Connection refused (Connection refused)'");
  } catch (HoodieRemoteException e) {
    // Match only "Connection refused": the exact wording of the socket error differs between
    // JDK versions. fail(...) is used instead of the assert keyword so the check cannot be
    // silently disabled when the JVM runs without -ea.
    if (!e.getMessage().contains("Connection refused")) {
      fail("Unexpected failure message: " + e.getMessage());
    }
  }
  // Enable API request retry for remote file system view.
  view = new RemoteHoodieTableFileSystemView(metaClient, FileSystemViewStorageConfig
      .newBuilder()
      .withRemoteServerHost("localhost")
      .withRemoteServerPort(server.getServerPort())
      .withRemoteTimelineClientRetry(true)
      .withRemoteTimelineClientMaxRetryIntervalMs(2000L)
      .withRemoteTimelineClientMaxRetryNumbers(4)
      .build());
  try {
    view.getLatestBaseFiles();
    fail("Should be catch Exception 'Still failed to Sending request after retried 4 times.'");
  } catch (HoodieRemoteException e) {
    if (!e.getMessage().equalsIgnoreCase("Still failed to Sending request after retried 4 times.")) {
      fail("Unexpected failure message: " + e.getMessage());
    }
  }
  // Retry succeeds once the server is back. Use an explicit, generous retry budget so the client
  // is still retrying when the restart happens.
  // NOTE(review): assumes the waits between attempts are on the order of the configured
  // initial/max intervals, so ten retries comfortably outlast the one-second restart delay -
  // confirm against RetryHelper's back-off computation.
  view = new RemoteHoodieTableFileSystemView(metaClient, FileSystemViewStorageConfig
      .newBuilder()
      .withRemoteServerHost("localhost")
      .withRemoteServerPort(server.getServerPort())
      .withRemoteTimelineClientRetry(true)
      .withRemoteTimelineInitialRetryIntervalMs(500L)
      .withRemoteTimelineClientMaxRetryIntervalMs(2000L)
      .withRemoteTimelineClientMaxRetryNumbers(10)
      .build());
  Thread restarter = new Thread(() -> {
    try {
      Thread.sleep(1000L);
      LOG.info("Restart server.");
      server.startService();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  });
  // start(), not run(): run() would execute the restart on this thread, so the server would
  // already be up before the request below is sent and no retry would ever be exercised.
  restarter.start();
  try {
    view.getLatestBaseFiles();
  } finally {
    // Wait for the restart to finish before closing, so the server is shut down even if the
    // request above failed.
    restarter.join();
    server.close();
  }
}
}