-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HDFS-17601. [ARR] RouterRpcServer supports asynchronous rpc. #7108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
915f8d3
HDFS-17601. [ARR] RouterRpcServer supports asynchronous rpc.
hfutatzhanghb 49d7587
use CatchFunction.
hfutatzhanghb 7d6bff2
fix blank lines.
hfutatzhanghb c5f26e2
fix return value of getExistingLocationAsync
hfutatzhanghb 5123f66
fix javadoc
hfutatzhanghb cb4d99a
use getRPCClient() instead of rpcClient in async methods.
hfutatzhanghb ee52b3c
remove unnecessary condition.
hfutatzhanghb e0ca34b
add getDatanodeStorageReportMapAsync with one parameter.
hfutatzhanghb a703b89
fix unused import
hfutatzhanghb bee8a8d
add UT.
KeeProMise File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,6 +37,13 @@ | |
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient.isExpectedClass; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry; | ||
| import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI; | ||
|
|
||
| import java.io.FileNotFoundException; | ||
|
|
@@ -49,6 +56,7 @@ | |
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.EnumSet; | ||
| import java.util.Iterator; | ||
| import java.util.LinkedHashMap; | ||
| import java.util.LinkedHashSet; | ||
| import java.util.List; | ||
|
|
@@ -68,6 +76,8 @@ | |
| import org.apache.hadoop.hdfs.HAUtil; | ||
| import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; | ||
| import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil; | ||
| import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; | ||
| import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction; | ||
| import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder; | ||
| import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader; | ||
| import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache; | ||
|
|
@@ -791,6 +801,46 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz) | |
| return invokeOnNs(method, clazz, io, nss); | ||
| } | ||
|
|
||
| /** | ||
| * Invokes the method at default namespace, if default namespace is not | ||
| * available then at the other available namespaces. | ||
| * If the namespace is unavailable, retry with other namespaces. | ||
| * Asynchronous version of invokeAtAvailableNs method. | ||
| * @param <T> expected return type. | ||
| * @param method the remote method. | ||
| * @return the response received after invoking method. | ||
| * @throws IOException | ||
| */ | ||
| <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz) | ||
| throws IOException { | ||
| String nsId = subclusterResolver.getDefaultNamespace(); | ||
| // If default Ns is not present return result from first namespace. | ||
| Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); | ||
| // If no namespace is available, throw IOException. | ||
| IOException io = new IOException("No namespace available."); | ||
|
|
||
| asyncComplete(null); | ||
| if (!nsId.isEmpty()) { | ||
| asyncTry(() -> { | ||
| rpcClient.invokeSingle(nsId, method, clazz); | ||
| }); | ||
|
|
||
| asyncCatch((AsyncCatchFunction<T, IOException>)(res, ioe) -> { | ||
| if (!clientProto.isUnavailableSubclusterException(ioe)) { | ||
| LOG.debug("{} exception cannot be retried", | ||
| ioe.getClass().getSimpleName()); | ||
| throw ioe; | ||
| } | ||
| nss.removeIf(n -> n.getNameserviceId().equals(nsId)); | ||
| invokeOnNs(method, clazz, io, nss); | ||
| }, IOException.class); | ||
| } else { | ||
| // If not have default NS. | ||
| invokeOnNsAsync(method, clazz, io, nss); | ||
| } | ||
| return asyncReturn(clazz); | ||
| } | ||
|
|
||
| /** | ||
| * Invoke the method sequentially on available namespaces, | ||
| * throw no namespace available exception, if no namespaces are available. | ||
|
|
@@ -824,6 +874,60 @@ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe, | |
| throw ioe; | ||
| } | ||
|
|
||
| /** | ||
| * Invoke the method sequentially on available namespaces, | ||
| * throw no namespace available exception, if no namespaces are available. | ||
| * Asynchronous version of invokeOnNs method. | ||
| * @param method the remote method. | ||
| * @param clazz Class for the return type. | ||
| * @param ioe IOException . | ||
| * @param nss List of name spaces in the federation | ||
| * @return the response received after invoking method. | ||
| * @throws IOException | ||
| */ | ||
| <T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe, | ||
| Set<FederationNamespaceInfo> nss) throws IOException { | ||
| if (nss.isEmpty()) { | ||
| throw ioe; | ||
| } | ||
|
|
||
| asyncComplete(null); | ||
| Iterator<FederationNamespaceInfo> nsIterator = nss.iterator(); | ||
| asyncForEach(nsIterator, (foreach, fnInfo) -> { | ||
| String nsId = fnInfo.getNameserviceId(); | ||
| LOG.debug("Invoking {} on namespace {}", method, nsId); | ||
| asyncTry(() -> { | ||
| rpcClient.invokeSingle(nsId, method, clazz); | ||
| asyncApply(result -> { | ||
| if (result != null && isExpectedClass(clazz, result)) { | ||
| foreach.breakNow(); | ||
| return result; | ||
| } | ||
| return null; | ||
| }); | ||
| }); | ||
|
|
||
| asyncCatch((AsyncCatchFunction<T, IOException>)(ret, ex) -> { | ||
|
||
| LOG.debug("Failed to invoke {} on namespace {}", method, nsId, ex); | ||
| // Ignore the exception and try on other namespace, if the tried | ||
| // namespace is unavailable, else throw the received exception. | ||
| if (!clientProto.isUnavailableSubclusterException(ex)) { | ||
| throw ex; | ||
| } | ||
| }, IOException.class); | ||
| }); | ||
|
|
||
| asyncApply(obj -> { | ||
| if (obj == null) { | ||
| // Couldn't get a response from any of the namespace, throw ioe. | ||
| throw ioe; | ||
| } | ||
| return obj; | ||
| }); | ||
|
|
||
| return asyncReturn(clazz); | ||
| } | ||
|
|
||
| @Override // ClientProtocol | ||
| public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) | ||
| throws IOException { | ||
|
|
@@ -875,6 +979,10 @@ public HdfsFileStatus create(String src, FsPermission masked, | |
| */ | ||
| RemoteLocation getCreateLocation(final String src) throws IOException { | ||
| final List<RemoteLocation> locations = getLocationsForPath(src, true); | ||
| if (isAsync()) { | ||
| getCreateLocationAsync(src, locations); | ||
| return asyncReturn(RemoteLocation.class); | ||
| } | ||
| return getCreateLocation(src, locations); | ||
| } | ||
|
|
||
|
|
@@ -911,6 +1019,43 @@ RemoteLocation getCreateLocation( | |
| return createLocation; | ||
| } | ||
|
|
||
| /** | ||
| * Get the location to create a file. It checks if the file already existed | ||
| * in one of the locations. | ||
| * | ||
| * @param src Path of the file to check. | ||
| * @param locations Prefetched locations for the file. | ||
| * @return The remote location for this file. | ||
| * @throws IOException If the file has no creation location. | ||
| */ | ||
| RemoteLocation getCreateLocationAsync( | ||
| final String src, final List<RemoteLocation> locations) | ||
| throws IOException { | ||
|
|
||
| if (locations == null || locations.isEmpty()) { | ||
| throw new IOException("Cannot get locations to create " + src); | ||
| } | ||
|
|
||
| RemoteLocation createLocation = locations.get(0); | ||
| if (locations.size() > 1) { | ||
| asyncTry(() -> { | ||
| getExistingLocationAsync(src, locations); | ||
| asyncApply((ApplyFunction<RemoteLocation, RemoteLocation>) existingLocation -> { | ||
| if (existingLocation != null) { | ||
| LOG.debug("{} already exists in {}.", src, existingLocation); | ||
| return existingLocation; | ||
| } | ||
| return createLocation; | ||
| }); | ||
| }); | ||
| asyncCatch((o, e) -> createLocation, FileNotFoundException.class); | ||
| } else { | ||
| asyncComplete(createLocation); | ||
| } | ||
|
|
||
| return asyncReturn(RemoteLocation.class); | ||
| } | ||
|
|
||
| /** | ||
| * Gets the remote location where the file exists. | ||
| * @param src the name of file. | ||
|
|
@@ -932,6 +1077,31 @@ private RemoteLocation getExistingLocation(String src, | |
| return null; | ||
| } | ||
|
|
||
| /** | ||
| * Gets the remote location where the file exists. | ||
| * Asynchronous version of getExistingLocation method. | ||
| * @param src the name of file. | ||
| * @param locations all the remote locations. | ||
| * @return the remote location of the file if it exists, else null. | ||
| * @throws IOException in case of any exception. | ||
| */ | ||
| private RemoteLocation getExistingLocationAsync(String src, | ||
| List<RemoteLocation> locations) throws IOException { | ||
| RemoteMethod method = new RemoteMethod("getFileInfo", | ||
| new Class<?>[] {String.class}, new RemoteParam()); | ||
| rpcClient.invokeConcurrent( | ||
| locations, method, true, false, HdfsFileStatus.class); | ||
| asyncApply((ApplyFunction<Map<RemoteLocation, HdfsFileStatus>, Object>) results -> { | ||
| for (RemoteLocation loc : locations) { | ||
| if (results.get(loc) != null) { | ||
| return loc; | ||
| } | ||
| } | ||
| return null; | ||
| }); | ||
| return asyncReturn(null); | ||
| } | ||
|
|
||
| @Override // ClientProtocol | ||
| public LastBlockWithStatus append(String src, final String clientName, | ||
| final EnumSetWritable<CreateFlag> flag) throws IOException { | ||
|
|
@@ -1186,6 +1356,38 @@ public DatanodeInfo[] getDatanodeReport( | |
| return toArray(datanodes, DatanodeInfo.class); | ||
| } | ||
|
|
||
| /** | ||
| * Get the datanode report with a timeout. | ||
| * Asynchronous version of the getDatanodeReport method. | ||
| * @param type Type of the datanode. | ||
| * @param requireResponse If we require all the namespaces to report. | ||
| * @param timeOutMs Time out for the reply in milliseconds. | ||
| * @return List of datanodes. | ||
| * @throws IOException If it cannot get the report. | ||
| */ | ||
| public DatanodeInfo[] getDatanodeReportAsync( | ||
| DatanodeReportType type, boolean requireResponse, long timeOutMs) | ||
| throws IOException { | ||
| checkOperation(OperationCategory.UNCHECKED); | ||
|
|
||
| Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>(); | ||
| RemoteMethod method = new RemoteMethod("getDatanodeReport", | ||
| new Class<?>[] {DatanodeReportType.class}, type); | ||
|
|
||
| Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); | ||
| rpcClient.invokeConcurrent(nss, method, requireResponse, false, | ||
| timeOutMs, DatanodeInfo[].class); | ||
|
|
||
| asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>, | ||
| DatanodeInfo[]>) results -> { | ||
| updateDnMap(results, datanodesMap); | ||
| // Map -> Array | ||
| Collection<DatanodeInfo> datanodes = datanodesMap.values(); | ||
| return toArray(datanodes, DatanodeInfo.class); | ||
| }); | ||
| return asyncReturn(DatanodeInfo[].class); | ||
| } | ||
|
|
||
| @Override // ClientProtocol | ||
| public DatanodeStorageReport[] getDatanodeStorageReport( | ||
| DatanodeReportType type) throws IOException { | ||
|
|
@@ -1236,6 +1438,42 @@ public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap( | |
| return ret; | ||
| } | ||
|
|
||
| /** | ||
| * Get the list of datanodes per subcluster. | ||
| * Asynchronous version of getDatanodeStorageReportMap method. | ||
| * @param type Type of the datanodes to get. | ||
| * @param requireResponse If true an exception will be thrown if all calls do | ||
| * not complete. If false exceptions are ignored and all data results | ||
| * successfully received are returned. | ||
| * @param timeOutMs Time out for the reply in milliseconds. | ||
| * @return nsId to datanode list. | ||
| * @throws IOException If the method cannot be invoked remotely. | ||
| */ | ||
| public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMapAsync( | ||
| DatanodeReportType type, boolean requireResponse, long timeOutMs) | ||
| throws IOException { | ||
|
|
||
| Map<String, DatanodeStorageReport[]> ret = new LinkedHashMap<>(); | ||
| RemoteMethod method = new RemoteMethod("getDatanodeStorageReport", | ||
| new Class<?>[] {DatanodeReportType.class}, type); | ||
| Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); | ||
| rpcClient.invokeConcurrent( | ||
| nss, method, requireResponse, false, timeOutMs, DatanodeStorageReport[].class); | ||
|
|
||
| asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeStorageReport[]>, | ||
| Map<String, DatanodeStorageReport[]>>) results -> { | ||
| for (Entry<FederationNamespaceInfo, DatanodeStorageReport[]> entry : | ||
| results.entrySet()) { | ||
| FederationNamespaceInfo ns = entry.getKey(); | ||
| String nsId = ns.getNameserviceId(); | ||
| DatanodeStorageReport[] result = entry.getValue(); | ||
| ret.put(nsId, result); | ||
| } | ||
| return ret; | ||
| }); | ||
| return asyncReturn(ret.getClass()); | ||
| } | ||
|
|
||
| @Override // ClientProtocol | ||
| public boolean setSafeMode(SafeModeAction action, boolean isChecked) | ||
| throws IOException { | ||
|
|
@@ -2051,6 +2289,37 @@ public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOu | |
| return toArray(datanodes, DatanodeInfo.class); | ||
| } | ||
|
|
||
| /** | ||
| * Get the slow running datanodes report with a timeout. | ||
| * Asynchronous version of the getSlowDatanodeReport method. | ||
| * | ||
| * @param requireResponse If we require all the namespaces to report. | ||
| * @param timeOutMs Time out for the reply in milliseconds. | ||
| * @return List of datanodes. | ||
| * @throws IOException If it cannot get the report. | ||
| */ | ||
| public DatanodeInfo[] getSlowDatanodeReportAsync(boolean requireResponse, long timeOutMs) | ||
| throws IOException { | ||
| checkOperation(OperationCategory.UNCHECKED); | ||
|
|
||
| Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>(); | ||
| RemoteMethod method = new RemoteMethod("getSlowDatanodeReport"); | ||
|
|
||
| Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); | ||
| rpcClient.invokeConcurrent(nss, method, requireResponse, false, | ||
| timeOutMs, DatanodeInfo[].class); | ||
|
|
||
| asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>, | ||
| DatanodeInfo[]>) results -> { | ||
| updateDnMap(results, datanodesMap); | ||
| // Map -> Array | ||
| Collection<DatanodeInfo> datanodes = datanodesMap.values(); | ||
| return toArray(datanodes, DatanodeInfo.class); | ||
| }); | ||
|
|
||
| return asyncReturn(DatanodeInfo[].class); | ||
| } | ||
|
|
||
| private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results, | ||
| Map<String, DatanodeInfo> datanodesMap) { | ||
| for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry : | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @hfutatzhanghb should use invokeOnNsAsync.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, fixed.