From 802a4057e5d1a7333646f2c59a2ef29eca3ca057 Mon Sep 17 00:00:00 2001 From: "zengqiang.xu" Date: Fri, 1 Jul 2022 16:27:52 +0800 Subject: [PATCH 1/6] HDFS-16283. RBF: reducing the load of renewLease() rpc --- .../org/apache/hadoop/hdfs/DFSClient.java | 26 +++++++++- .../apache/hadoop/hdfs/DFSOutputStream.java | 7 +++ .../hadoop/hdfs/protocol/ClientProtocol.java | 2 +- .../hadoop/hdfs/protocol/HdfsFileStatus.java | 4 ++ .../hdfs/protocol/HdfsLocatedFileStatus.java | 12 +++++ .../hdfs/protocol/HdfsNamedFileStatus.java | 11 +++++ .../ClientNamenodeProtocolTranslatorPB.java | 12 +++-- .../hdfs/protocolPB/PBHelperClient.java | 9 +++- .../main/proto/ClientNamenodeProtocol.proto | 1 + .../src/main/proto/hdfs.proto | 1 + .../router/RouterClientProtocol.java | 48 +++++++++++++++++-- .../federation/router/RouterRpcClient.java | 2 +- .../federation/router/RouterRpcServer.java | 5 +- .../fairness/TestRouterHandlersFairness.java | 2 +- .../metrics/TestRouterClientMetrics.java | 2 +- .../router/TestDisableNameservices.java | 4 +- .../TestRouterClientRejectOverload.java | 4 +- .../router/TestRouterRPCClientRetries.java | 2 +- .../federation/router/TestRouterRpc.java | 39 +++++++++++++++ ...amenodeProtocolServerSideTranslatorPB.java | 2 +- .../server/namenode/NameNodeRpcServer.java | 4 +- .../hadoop/hdfs/TestDFSClientRetries.java | 10 ++-- .../org/apache/hadoop/hdfs/TestLease.java | 5 +- 23 files changed, 183 insertions(+), 31 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 2682549cba126..6f7855545facd 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -41,6 +41,7 @@ import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; +import 
java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; @@ -188,6 +189,7 @@ import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.Lists; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.tracing.TraceScope; import org.apache.hadoop.tracing.Tracer; @@ -579,6 +581,28 @@ void updateLastLeaseRenewal() { } } + /** + * Get all nsIdentifies of DFSOutputStreams. + */ + private String getRenewLeaseNSIdentifies() { + HashSet allNSIdentifies = new HashSet<>(); + synchronized (filesBeingWritten) { + if (filesBeingWritten.isEmpty()) { + return null; + } + for (DFSOutputStream outputStream : filesBeingWritten.values()) { + String nsIdentify = outputStream.getNsIdentify(); + if (nsIdentify != null && !nsIdentify.isEmpty()) { + allNSIdentifies.add(nsIdentify); + } + } + if (allNSIdentifies.isEmpty()) { + return null; + } + } + return StringUtils.join(",", allNSIdentifies); + } + /** * Renew leases. * @return true if lease was renewed. 
May return false if this @@ -587,7 +611,7 @@ void updateLastLeaseRenewal() { public boolean renewLease() throws IOException { if (clientRunning && !isFilesBeingWrittenEmpty()) { try { - namenode.renewLease(clientName); + namenode.renewLease(clientName, getRenewLeaseNSIdentifies()); updateLastLeaseRenewal(); return true; } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 0deaa41cf4175..15f027bce9bfa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -113,6 +113,7 @@ public class DFSOutputStream extends FSOutputSummer protected final String src; protected final long fileId; + private final String nsIdentify; protected final long blockSize; protected final int bytesPerChecksum; @@ -195,6 +196,7 @@ private DFSOutputStream(DFSClient dfsClient, String src, this.dfsClient = dfsClient; this.src = src; this.fileId = stat.getFileId(); + this.nsIdentify = stat.getNsIdentify(); this.blockSize = stat.getBlockSize(); this.blockReplication = stat.getReplication(); this.fileEncryptionInfo = stat.getFileEncryptionInfo(); @@ -1084,6 +1086,11 @@ public long getFileId() { return fileId; } + @VisibleForTesting + public String getNsIdentify() { + return nsIdentify; + } + /** * Return the source of stream. 
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index e9ae803a541b1..cd01eb471b807 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -763,7 +763,7 @@ SnapshotStatus[] getSnapshotListing(String snapshotRoot) * @throws IOException If an I/O error occurred */ @Idempotent - void renewLease(String clientName) throws IOException; + void renewLease(String clientName, String allNSIdentifies) throws IOException; /** * Start lease recovery. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java index 264e3f4050fd7..0dc26e8916412 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java @@ -490,6 +490,10 @@ default FileStatus makeQualified(URI defaultUri, Path parent) { */ int compareTo(FileStatus stat); + void setNsIdentify(String nsIdentify); + + String getNsIdentify(); + /** * Set redundant flags for compatibility with existing applications. 
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java index bf4e0d2f9f16e..fe7fe2c0c387d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java @@ -54,6 +54,8 @@ public class HdfsLocatedFileStatus // BlockLocations[] is the user-facing type private transient LocatedBlocks hdfsloc; + private String nsIdentify = null; + /** * Constructor. * @param length the number of bytes the file has @@ -217,4 +219,14 @@ public LocatedFileStatus makeQualifiedLocated(URI defaultUri, Path path) { return this; } + @Override + public String getNsIdentify() { + return nsIdentify; + } + + @Override + public void setNsIdentify(String nsIdentify) { + this.nsIdentify = nsIdentify; + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsNamedFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsNamedFileStatus.java index 9434423d721b9..6d863d73ea6dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsNamedFileStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsNamedFileStatus.java @@ -44,6 +44,8 @@ public class HdfsNamedFileStatus extends FileStatus implements HdfsFileStatus { private final int childrenNum; private final byte storagePolicy; + private String nsIdentify = null; + /** * Constructor. 
* @param length the number of bytes the file has @@ -177,4 +179,13 @@ public int hashCode() { return super.hashCode(); } + @Override + public String getNsIdentify() { + return nsIdentify; + } + + @Override + public void setNsIdentify(String nsIdentify) { + this.nsIdentify = nsIdentify; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 7b8ca42c50f7b..fee98dd42556e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -744,11 +744,15 @@ public BatchedDirectoryListing getBatchedListing( @Override - public void renewLease(String clientName) throws IOException { - RenewLeaseRequestProto req = RenewLeaseRequestProto.newBuilder() - .setClientName(clientName).build(); + public void renewLease(String clientName, String nsIdentifies) + throws IOException { + RenewLeaseRequestProto.Builder builder = RenewLeaseRequestProto + .newBuilder().setClientName(clientName); + if (nsIdentifies != null && !nsIdentifies.isEmpty()) { + builder.setNsIdentifies(nsIdentifies); + } try { - rpcProxy.renewLease(null, req); + rpcProxy.renewLease(null, builder.build()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 6097d2f495dac..10ba208547218 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -1764,7 +1764,7 @@ public static HdfsFileStatus convert(HdfsFileStatusProto fs) { EnumSet flags = fs.hasFlags() ? convertFlags(fs.getFlags()) : convertFlags(fs.getPermission()); - return new HdfsFileStatus.Builder() + HdfsFileStatus hdfsFileStatus = new HdfsFileStatus.Builder() .length(fs.getLength()) .isdir(fs.getFileType().equals(FileType.IS_DIR)) .replication(fs.getBlockReplication()) @@ -1794,6 +1794,10 @@ public static HdfsFileStatus convert(HdfsFileStatusProto fs) { ? convertErasureCodingPolicy(fs.getEcPolicy()) : null) .build(); + if (fs.hasNsIdentify()) { + hdfsFileStatus.setNsIdentify(fs.getNsIdentify()); + } + return hdfsFileStatus; } private static EnumSet convertFlags(int flags) { @@ -2399,6 +2403,9 @@ public static HdfsFileStatusProto convert(HdfsFileStatus fs) { flags |= fs.isSnapshotEnabled() ? HdfsFileStatusProto.Flags .SNAPSHOT_ENABLED_VALUE : 0; builder.setFlags(flags); + if (fs.getNsIdentify() != null && !fs.getNsIdentify().isEmpty()) { + builder.setNsIdentify(fs.getNsIdentify()); + } return builder.build(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 1e8d0b0a2667d..c5ad046e31f9f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -332,6 +332,7 @@ message GetSnapshotDiffReportListingResponseProto { } message RenewLeaseRequestProto { required string clientName = 1; + optional string nsIdentifies = 2; } message RenewLeaseResponseProto { //void response diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto index 163d3a49d3014..3fc896b03bc30 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto @@ -481,6 +481,7 @@ message HdfsFileStatusProto { // Set of flags optional uint32 flags = 18 [default = 0]; + optional string nsIdentify = 19; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index c1dafec92203b..8c151dc64f912 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -114,6 +114,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** @@ -129,6 +130,8 @@ public class RouterClientProtocol implements ClientProtocol { private final RouterFederationRename rbfRename; private final FileSubclusterResolver subclusterResolver; private final ActiveNamenodeResolver namenodeResolver; + private final Map nsNameSpaceInfoCache + = new ConcurrentHashMap<>(); /** * Caching server defaults so as to prevent redundant calls to namenode, @@ -291,8 +294,10 @@ public HdfsFileStatus create(String src, FsPermission masked, RemoteLocation createLocation = null; try { createLocation = rpcServer.getCreateLocation(src, locations); - return rpcClient.invokeSingle(createLocation, method, + HdfsFileStatus status = rpcClient.invokeSingle(createLocation, method, HdfsFileStatus.class); + status.setNsIdentify(createLocation.getNameserviceId()); + return status; } catch (IOException ioe) { final List newLocations = checkFaultTolerantRetry( method, src, ioe, createLocation, locations); @@ -759,14 +764,47 @@ public 
boolean mkdirs(String src, FsPermission masked, boolean createParent) } } + /** + * Try to get a list of FederationNamespaceInfo for renewLease RPC. + */ + private List getRewLeaseNSs(String nsIdentifies) + throws IOException { + // return all namespaces + if (nsIdentifies == null || nsIdentifies.isEmpty()) { + return new ArrayList<>(namenodeResolver.getNamespaces()); + } + String[] nsIdList = nsIdentifies.split(","); + List result = new ArrayList<>(); + for (String nsId : nsIdList) { + FederationNamespaceInfo namespaceInfo = nsNameSpaceInfoCache.get(nsId); + if (namespaceInfo == null) { + try { + rpcClient.getNamenodesForNameservice(nsId); + } catch (IOException ioe) { + // return all namespaces when parsing nsId failed. + return new ArrayList<>(namenodeResolver.getNamespaces()); + } + namespaceInfo = new FederationNamespaceInfo("", "", nsId); + nsNameSpaceInfoCache.put(nsId, namespaceInfo); + } + result.add(namespaceInfo); + } + return result; + } + @Override - public void renewLease(String clientName) throws IOException { + public void renewLease(String clientName, String nsIdentifies) + throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.WRITE); RemoteMethod method = new RemoteMethod("renewLease", - new Class[] {String.class}, clientName); - Set nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, false, false); + new Class[] {String.class, String.class}, clientName, null); + List nss = getRewLeaseNSs(nsIdentifies); + if (nss.size() == 1) { + rpcClient.invokeSingle(nss.get(0).getNameserviceId(), method); + } else { + rpcClient.invokeConcurrent(nss, method, false, false); + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index ff90854ebb7ec..836be3cee6d65 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1508,7 +1508,7 @@ private void transferThreadLocalContext( * @return A prioritized list of NNs to use for communication. * @throws IOException If a NN cannot be located for the nameservice ID. */ - private List getNamenodesForNameservice( + public List getNamenodesForNameservice( final String nsId) throws IOException { final List namenodes = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 58181dcc346cf..b8e1b34bbb037 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -980,8 +980,9 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) } @Override // ClientProtocol - public void renewLease(String clientName) throws IOException { - clientProto.renewLease(clientName); + public void renewLease(String clientName, String nsIdentifies) + throws IOException { + clientProto.renewLease(clientName, nsIdentifies); } @Override // ClientProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java index 2d27c66e37ee0..8fc9de0cb261f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java 
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java @@ -194,7 +194,7 @@ private void invokeSequential(ClientProtocol routerProto) throws IOException { private void invokeConcurrent(ClientProtocol routerProto, String clientName) throws IOException { - routerProto.renewLease(clientName); + routerProto.renewLease(clientName, null); } private int getTotalRejectedPermits(RouterContext routerContext) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestRouterClientMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestRouterClientMetrics.java index da16c05910785..3397718745ffb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestRouterClientMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestRouterClientMetrics.java @@ -156,7 +156,7 @@ public void testGetQuota() throws Exception { @Test public void testRenewLease() throws Exception { - router.getRpcServer().renewLease("test"); + router.getRpcServer().renewLease("test", null); assertCounter("RenewLeaseOps", 2L, getMetrics(ROUTER_METRICS)); assertCounter("ConcurrentRenewLeaseOps", 1L, getMetrics(ROUTER_METRICS)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableNameservices.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableNameservices.java index ae04150d70fa9..78f41c5d92a2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableNameservices.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableNameservices.java 
@@ -159,7 +159,7 @@ public void cleanup() throws IOException { public void testWithoutDisabling() throws IOException { // ns0 is slow and renewLease should take a long time long t0 = monotonicNow(); - routerProtocol.renewLease("client0"); + routerProtocol.renewLease("client0", null); long t = monotonicNow() - t0; assertTrue("It took too little: " + t + "ms", t > TimeUnit.SECONDS.toMillis(1)); @@ -178,7 +178,7 @@ public void testDisabling() throws Exception { // renewLease should be fast as we are skipping ns0 long t0 = monotonicNow(); - routerProtocol.renewLease("client0"); + routerProtocol.renewLease("client0", null); long t = monotonicNow() - t0; assertTrue("It took too long: " + t + "ms", t < TimeUnit.SECONDS.toMillis(1)); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java index 71ec747af4c30..04cfb5c9d903a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java @@ -215,7 +215,7 @@ public void run() { routerClient = new DFSClient(address, conf); String clientName = routerClient.getClientName(); ClientProtocol routerProto = routerClient.getNamenode(); - routerProto.renewLease(clientName); + routerProto.renewLease(clientName, null); } catch (RemoteException re) { IOException ioe = re.unwrapRemoteException(); assertTrue("Wrong exception: " + ioe, @@ -390,7 +390,7 @@ public void testAsyncCallerPoolMetrics() throws Exception { cluster.getRouterClientConf()); String clientName = routerClient.getClientName(); ClientProtocol routerProto = routerClient.getNamenode(); - routerProto.renewLease(clientName); + 
routerProto.renewLease(clientName, null); } catch (Exception e) { fail("Client request failed: " + e); } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java index 73803d9805203..039acbb5988df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java @@ -153,7 +153,7 @@ public void testRetryWhenOneNameServiceDown() throws Exception { DFSClient client = nnContext1.getClient(); // Renew lease for the DFS client, it will succeed. - routerProtocol.renewLease(client.getClientName()); + routerProtocol.renewLease(client.getClientName(), null); // Verify the retry times, it will retry one time for ns0. 
FederationRPCMetrics rpcMetrics = routerContext.getRouter() diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index 4aeb2ec9b8f30..6a99d04a585ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -58,6 +58,7 @@ import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -107,6 +108,7 @@ import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; import org.apache.hadoop.hdfs.server.federation.MockResolver; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics; import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; @@ -1450,6 +1452,43 @@ public void testProxyRestoreFailedStorage() throws Exception { assertEquals(nnSuccess, routerSuccess); } + @Test + public void testRewnewLease() throws Exception { + // Install a mount point to a different path to check + MockResolver resolver = + (MockResolver)router.getRouter().getSubclusterResolver(); + String ns0 = cluster.getNameservices().get(0); + String ns1 = cluster.getNameservices().get(1); + resolver.addLocation("/testRenewLease0", ns0, "/testRenewLease0"); + 
resolver.addLocation("/testRenewLease1", ns1, "/testRenewLease1"); + + // Stop LeaseRenewer + DistributedFileSystem dfsRouterFS = (DistributedFileSystem) routerFS; + dfsRouterFS.getClient().getLeaseRenewer().interruptAndJoin(); + + Path testPath = new Path("/testRenewLease0/test.txt"); + FSDataOutputStream fsDataOutputStream = routerFS.create(testPath); + + FederationRPCMetrics rpcMetrics = router.getRouterRpcServer().getRPCMetrics(); + long proxyOpBeforeRenewLease = rpcMetrics.getProxyOps(); + assertTrue(dfsRouterFS.getClient().renewLease()); + long proxyOpAfterRenewLease = rpcMetrics.getProxyOps(); + assertEquals((proxyOpBeforeRenewLease + 1), proxyOpAfterRenewLease); + fsDataOutputStream.close(); + + Path newTestPath0 = new Path("/testRenewLease0/test1.txt"); + Path newTestPath1 = new Path("/testRenewLease1/test1.txt"); + FSDataOutputStream fsDataOutputStream0 = routerFS.create(newTestPath0); + FSDataOutputStream fsDataOutputStream1 = routerFS.create(newTestPath1); + + long proxyOpBeforeRenewLease2 = rpcMetrics.getProxyOps(); + assertTrue(dfsRouterFS.getClient().renewLease()); + long proxyOpAfterRenewLease2 = rpcMetrics.getProxyOps(); + assertEquals((proxyOpBeforeRenewLease2 + 2), proxyOpAfterRenewLease2); + fsDataOutputStream0.close(); + fsDataOutputStream1.close(); + } + @Test public void testProxyExceptionMessages() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 0164f25460dc6..0f6ca8a136d47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -818,7 +818,7 @@ public 
GetBatchedListingResponseProto getBatchedListing( public RenewLeaseResponseProto renewLease(RpcController controller, RenewLeaseRequestProto req) throws ServiceException { try { - server.renewLease(req.getClientName()); + server.renewLease(req.getClientName(), req.getNsIdentifies()); return VOID_RENEWLEASE_RESPONSE; } catch (IOException e) { throw new ServiceException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index c86211b058bdd..b51f10509c110 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1174,8 +1174,10 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) } @Override // ClientProtocol - public void renewLease(String clientName) throws IOException { + public void renewLease(String clientName, String nsIdentifies) + throws IOException { checkNNStartup(); + // just ignore nsIdentifies namesystem.renewLease(clientName); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 970003b0e58cc..f5bdb948cff8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -384,7 +384,7 @@ public void testLeaseRenewSocketTimeout() throws Exception cluster.waitActive(); NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc()); Mockito.doThrow(new SocketTimeoutException()).when(spyNN).renewLease( - Mockito.anyString()); + Mockito.anyString(), null); DFSClient client = new 
DFSClient(null, spyNN, conf, null); // Get hold of the lease renewer instance used by the client final LeaseRenewer leaseRenewer1 = client.getLeaseRenewer(); @@ -392,7 +392,7 @@ public void testLeaseRenewSocketTimeout() throws Exception OutputStream out1 = client.create(file1, false); Mockito.verify(spyNN, timeout(10000).times(1)).renewLease( - Mockito.anyString()); + Mockito.anyString(), null); verifyEmptyLease(leaseRenewer1); GenericTestUtils.waitFor(() -> !(leaseRenewer1.isRunning()), 100, 10000); try { @@ -406,12 +406,12 @@ public void testLeaseRenewSocketTimeout() throws Exception // Verify DFSClient can do write operation after renewLease no longer // throws SocketTimeoutException. Mockito.doNothing().when(spyNN).renewLease( - Mockito.anyString()); + Mockito.anyString(), null); final LeaseRenewer leaseRenewer2 = client.getLeaseRenewer(); leaseRenewer2.setRenewalTime(100); OutputStream out2 = client.create(file2, false); Mockito.verify(spyNN, timeout(10000).times(2)).renewLease( - Mockito.anyString()); + Mockito.anyString(), null); out2.write(new byte[256]); out2.close(); verifyEmptyLease(leaseRenewer2); @@ -1309,7 +1309,7 @@ public void delayWhenRenewLeaseTimeout() { try { //1. trigger get LeaseRenewer lock Mockito.doThrow(new SocketTimeoutException()).when(spyNN) - .renewLease(Mockito.anyString()); + .renewLease(Mockito.anyString(), null); } catch (IOException e) { e.printStackTrace(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java index 5d7b62a42846a..efd26f2b2845f 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java @@ -90,7 +90,8 @@ public void testLeaseAbort() throws Exception { // stub the renew method. 
doThrow(new RemoteException(InvalidToken.class.getName(), - "Your token is worthless")).when(spyNN).renewLease(anyString()); + "Your token is worthless")).when(spyNN).renewLease( + anyString(), null); // We don't need to wait the lease renewer thread to act. // call renewLease() manually. @@ -131,7 +132,7 @@ public void testLeaseAbort() throws Exception { Assert.assertTrue(originalRenewer.isEmpty()); // unstub - doNothing().when(spyNN).renewLease(anyString()); + doNothing().when(spyNN).renewLease(anyString(), null); // existing input streams should work try { From de5120f96c70102df6271d0e59a5669fc1ac59cf Mon Sep 17 00:00:00 2001 From: "zengqiang.xu" Date: Sat, 2 Jul 2022 18:01:57 +0800 Subject: [PATCH 2/6] HDFS-16283. make some changes based on the comments --- .../org/apache/hadoop/hdfs/DFSClient.java | 23 +++-- .../apache/hadoop/hdfs/DFSOutputStream.java | 8 +- .../hadoop/hdfs/protocol/ClientProtocol.java | 10 ++- .../hadoop/hdfs/protocol/HdfsFileStatus.java | 4 +- .../hdfs/protocol/HdfsLocatedFileStatus.java | 10 +-- .../hdfs/protocol/HdfsNamedFileStatus.java | 10 +-- .../ClientNamenodeProtocolTranslatorPB.java | 6 +- .../hdfs/protocolPB/PBHelperClient.java | 8 +- .../main/proto/ClientNamenodeProtocol.proto | 2 +- .../src/main/proto/hdfs.proto | 2 +- .../router/RouterClientProtocol.java | 44 +++++----- .../federation/router/RouterRpcClient.java | 4 + .../federation/router/RouterRpcServer.java | 4 +- .../federation/router/TestRouterRpc.java | 86 +++++++++++++++---- ...amenodeProtocolServerSideTranslatorPB.java | 2 +- .../server/namenode/NameNodeRpcServer.java | 6 +- 16 files changed, 148 insertions(+), 81 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 6f7855545facd..3bfa6298af81a 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -582,25 +582,24 @@ void updateLastLeaseRenewal() { } /** - * Get all nsIdentifies of DFSOutputStreams. + * Get all namespaces of DFSOutputStreams. */ - private String getRenewLeaseNSIdentifies() { - HashSet allNSIdentifies = new HashSet<>(); + private List getNamespaces() { + HashSet namespaces = new HashSet<>(); synchronized (filesBeingWritten) { - if (filesBeingWritten.isEmpty()) { - return null; - } for (DFSOutputStream outputStream : filesBeingWritten.values()) { - String nsIdentify = outputStream.getNsIdentify(); - if (nsIdentify != null && !nsIdentify.isEmpty()) { - allNSIdentifies.add(nsIdentify); + String namespace = outputStream.getNamespace(); + if (namespace == null || namespace.isEmpty()) { + return null; + } else { + namespaces.add(namespace); } } - if (allNSIdentifies.isEmpty()) { + if (namespaces.isEmpty()) { return null; } } - return StringUtils.join(",", allNSIdentifies); + return new ArrayList<>(namespaces); } /** @@ -611,7 +610,7 @@ private String getRenewLeaseNSIdentifies() { public boolean renewLease() throws IOException { if (clientRunning && !isFilesBeingWrittenEmpty()) { try { - namenode.renewLease(clientName, getRenewLeaseNSIdentifies()); + namenode.renewLease(clientName, getNamespaces()); updateLastLeaseRenewal(); return true; } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 15f027bce9bfa..92df7c51b23e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -113,7 +113,7 @@ public class DFSOutputStream extends FSOutputSummer protected final String src; protected final long fileId; - private final 
String nsIdentify; + private final String namespace; protected final long blockSize; protected final int bytesPerChecksum; @@ -196,7 +196,7 @@ private DFSOutputStream(DFSClient dfsClient, String src, this.dfsClient = dfsClient; this.src = src; this.fileId = stat.getFileId(); - this.nsIdentify = stat.getNsIdentify(); + this.namespace = stat.getNamespace(); this.blockSize = stat.getBlockSize(); this.blockReplication = stat.getReplication(); this.fileEncryptionInfo = stat.getFileEncryptionInfo(); @@ -1087,8 +1087,8 @@ public long getFileId() { } @VisibleForTesting - public String getNsIdentify() { - return nsIdentify; + public String getNamespace() { + return namespace; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index cd01eb471b807..9395d97710972 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -759,11 +759,19 @@ SnapshotStatus[] getSnapshotListing(String snapshotRoot) * the last call to renewLease(), the NameNode assumes the * client has died. * + * @param namespaces The full Namespace list that the renewLease rpc + * should be forwarded to by RBF. + * Tips: NN side, this value should be null. + * RBF side, if this value is null, this rpc will + * be forwarded to all available namespaces, + * else this rpc will be forwarded to + * the specified namespaces. + * * @throws org.apache.hadoop.security.AccessControlException permission denied * @throws IOException If an I/O error occurred */ @Idempotent - void renewLease(String clientName, String allNSIdentifies) throws IOException; + void renewLease(String clientName, List namespaces) throws IOException; /** * Start lease recovery. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java index 0dc26e8916412..efc3b90b5a970 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java @@ -490,9 +490,9 @@ default FileStatus makeQualified(URI defaultUri, Path parent) { */ int compareTo(FileStatus stat); - void setNsIdentify(String nsIdentify); + void setNamespace(String namespace); - String getNsIdentify(); + String getNamespace(); /** * Set redundant flags for compatibility with existing applications. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java index fe7fe2c0c387d..a3d4867cff45d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java @@ -54,7 +54,7 @@ public class HdfsLocatedFileStatus // BlockLocations[] is the user-facing type private transient LocatedBlocks hdfsloc; - private String nsIdentify = null; + private String namespace = null; /** * Constructor. 
@@ -220,13 +220,13 @@ public LocatedFileStatus makeQualifiedLocated(URI defaultUri, Path path) { } @Override - public String getNsIdentify() { - return nsIdentify; + public String getNamespace() { + return namespace; } @Override - public void setNsIdentify(String nsIdentify) { - this.nsIdentify = nsIdentify; + public void setNamespace(String namespace) { + this.namespace = namespace; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsNamedFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsNamedFileStatus.java index 6d863d73ea6dd..4c90e17e4a5d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsNamedFileStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsNamedFileStatus.java @@ -44,7 +44,7 @@ public class HdfsNamedFileStatus extends FileStatus implements HdfsFileStatus { private final int childrenNum; private final byte storagePolicy; - private String nsIdentify = null; + private String namespace = null; /** * Constructor. 
@@ -180,12 +180,12 @@ public int hashCode() { } @Override - public String getNsIdentify() { - return nsIdentify; + public String getNamespace() { + return namespace; } @Override - public void setNsIdentify(String nsIdentify) { - this.nsIdentify = nsIdentify; + public void setNamespace(String namespace) { + this.namespace = namespace; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index fee98dd42556e..541a4361896dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -744,12 +744,12 @@ public BatchedDirectoryListing getBatchedListing( @Override - public void renewLease(String clientName, String nsIdentifies) + public void renewLease(String clientName, List namespaces) throws IOException { RenewLeaseRequestProto.Builder builder = RenewLeaseRequestProto .newBuilder().setClientName(clientName); - if (nsIdentifies != null && !nsIdentifies.isEmpty()) { - builder.setNsIdentifies(nsIdentifies); + if (namespaces != null && !namespaces.isEmpty()) { + builder.addAllNamespaces(namespaces); } try { rpcProxy.renewLease(null, builder.build()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 10ba208547218..496a5cf46146d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ 
-1794,8 +1794,8 @@ public static HdfsFileStatus convert(HdfsFileStatusProto fs) { ? convertErasureCodingPolicy(fs.getEcPolicy()) : null) .build(); - if (fs.hasNsIdentify()) { - hdfsFileStatus.setNsIdentify(fs.getNsIdentify()); + if (fs.hasNamespace()) { + hdfsFileStatus.setNamespace(fs.getNamespace()); } return hdfsFileStatus; } @@ -2403,8 +2403,8 @@ public static HdfsFileStatusProto convert(HdfsFileStatus fs) { flags |= fs.isSnapshotEnabled() ? HdfsFileStatusProto.Flags .SNAPSHOT_ENABLED_VALUE : 0; builder.setFlags(flags); - if (fs.getNsIdentify() != null && !fs.getNsIdentify().isEmpty()) { - builder.setNsIdentify(fs.getNsIdentify()); + if (fs.getNamespace() != null && !fs.getNamespace().isEmpty()) { + builder.setNamespace(fs.getNamespace()); } return builder.build(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index c5ad046e31f9f..60792b5b6c94c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -332,7 +332,7 @@ message GetSnapshotDiffReportListingResponseProto { } message RenewLeaseRequestProto { required string clientName = 1; - optional string nsIdentifies = 2; + repeated string namespaces = 2; } message RenewLeaseResponseProto { //void response diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto index 3fc896b03bc30..a4d36180c2c7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto @@ -481,7 +481,7 @@ message HdfsFileStatusProto { // Set of flags optional uint32 flags = 18 [default = 0]; - optional string nsIdentify = 19; + optional string namespace = 19; } /** diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 8c151dc64f912..cd6397794e1c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -130,8 +130,6 @@ public class RouterClientProtocol implements ClientProtocol { private final RouterFederationRename rbfRename; private final FileSubclusterResolver subclusterResolver; private final ActiveNamenodeResolver namenodeResolver; - private final Map nsNameSpaceInfoCache - = new ConcurrentHashMap<>(); /** * Caching server defaults so as to prevent redundant calls to namenode, @@ -296,7 +294,7 @@ public HdfsFileStatus create(String src, FsPermission masked, createLocation = rpcServer.getCreateLocation(src, locations); HdfsFileStatus status = rpcClient.invokeSingle(createLocation, method, HdfsFileStatus.class); - status.setNsIdentify(createLocation.getNameserviceId()); + status.setNamespace(createLocation.getNameserviceId()); return status; } catch (IOException ioe) { final List newLocations = checkFaultTolerantRetry( @@ -764,42 +762,44 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) } } + private Map getAvailableNamespaces() + throws IOException { + Map allAvailableNamespaces = + new HashMap<>(); + namenodeResolver.getNamespaces().forEach( + k -> allAvailableNamespaces.put(k.getNameserviceId(), k)); + return allAvailableNamespaces; + } + /** * Try to get a list of FederationNamespaceInfo for renewLease RPC. 
*/ - private List getRewLeaseNSs(String nsIdentifies) + private List getRewLeaseNSs(List namespaces) throws IOException { - // return all namespaces - if (nsIdentifies == null || nsIdentifies.isEmpty()) { + if (namespaces == null || namespaces.isEmpty()) { return new ArrayList<>(namenodeResolver.getNamespaces()); } - String[] nsIdList = nsIdentifies.split(","); List result = new ArrayList<>(); - for (String nsId : nsIdList) { - FederationNamespaceInfo namespaceInfo = nsNameSpaceInfoCache.get(nsId); - if (namespaceInfo == null) { - try { - rpcClient.getNamenodesForNameservice(nsId); - } catch (IOException ioe) { - // return all namespaces when parsing nsId failed. - return new ArrayList<>(namenodeResolver.getNamespaces()); - } - namespaceInfo = new FederationNamespaceInfo("", "", nsId); - nsNameSpaceInfoCache.put(nsId, namespaceInfo); + Map allAvailableNamespaces = + getAvailableNamespaces(); + for (String namespace : namespaces) { + if (!allAvailableNamespaces.containsKey(namespace)) { + return new ArrayList<>(namenodeResolver.getNamespaces()); + } else { + result.add(allAvailableNamespaces.get(namespace)); } - result.add(namespaceInfo); } return result; } @Override - public void renewLease(String clientName, String nsIdentifies) + public void renewLease(String clientName, List namespaces) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.WRITE); RemoteMethod method = new RemoteMethod("renewLease", - new Class[] {String.class, String.class}, clientName, null); - List nss = getRewLeaseNSs(nsIdentifies); + new Class[] {String.class, List.class}, clientName, null); + List nss = getRewLeaseNSs(namespaces); if (nss.size() == 1) { rpcClient.invokeSingle(nss.get(0).getNameserviceId(), method); } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 836be3cee6d65..6770706499b37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.SnapshotException; import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; @@ -1012,6 +1013,9 @@ public RemoteResult invokeSequential( // Valid result, stop here @SuppressWarnings("unchecked") R location = (R) loc; @SuppressWarnings("unchecked") T ret = (T) result; + if (ret instanceof LastBlockWithStatus) { + ((LastBlockWithStatus) ret).getFileStatus().setNamespace(ns); + } return new RemoteResult<>(location, ret); } if (firstResult == null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index b8e1b34bbb037..980d64a45d134 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -980,9 +980,9 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) } @Override // ClientProtocol - public void renewLease(String clientName, String 
nsIdentifies) + public void renewLease(String clientName, List namespaces) throws IOException { - clientProto.renewLease(clientName, nsIdentifies); + clientProto.renewLease(clientName, namespaces); } @Override // ClientProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index 6a99d04a585ec..af5a5e919e5c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -1465,28 +1465,80 @@ public void testRewnewLease() throws Exception { // Stop LeaseRenewer DistributedFileSystem dfsRouterFS = (DistributedFileSystem) routerFS; dfsRouterFS.getClient().getLeaseRenewer().interruptAndJoin(); + FederationRPCMetrics rpcMetrics = router.getRouterRpcServer().getRPCMetrics(); - Path testPath = new Path("/testRenewLease0/test.txt"); - FSDataOutputStream fsDataOutputStream = routerFS.create(testPath); + // Test Replica File + Path testPath = new Path("/testRenewLease0/test_replica.txt"); + FSDataOutputStream replicaOutputStream = null; + try { + replicaOutputStream = routerFS.create(testPath); + replicaOutputStream.write("hello world create. 
\n".getBytes()); + long proxyOpBeforeRenewLease = rpcMetrics.getProxyOps(); + assertTrue(dfsRouterFS.getClient().renewLease()); + long proxyOpAfterRenewLease = rpcMetrics.getProxyOps(); + assertEquals((proxyOpBeforeRenewLease + 1), proxyOpAfterRenewLease); + } finally { + if (replicaOutputStream != null) { + replicaOutputStream.close(); + } + } - FederationRPCMetrics rpcMetrics = router.getRouterRpcServer().getRPCMetrics(); - long proxyOpBeforeRenewLease = rpcMetrics.getProxyOps(); - assertTrue(dfsRouterFS.getClient().renewLease()); - long proxyOpAfterRenewLease = rpcMetrics.getProxyOps(); - assertEquals((proxyOpBeforeRenewLease + 1), proxyOpAfterRenewLease); - fsDataOutputStream.close(); + try { + replicaOutputStream = routerFS.append(testPath); + replicaOutputStream.write("hello world append. \n".getBytes()); + long proxyOpBeforeRenewLease = rpcMetrics.getProxyOps(); + assertTrue(dfsRouterFS.getClient().renewLease()); + long proxyOpAfterRenewLease = rpcMetrics.getProxyOps(); + assertEquals((proxyOpBeforeRenewLease + 1), proxyOpAfterRenewLease); + } finally { + if (replicaOutputStream != null) { + replicaOutputStream.close(); + } + } + // Test Replica File + Path testECPath = new Path("/testRenewLease0/ecDirectory/test_ec.txt"); + FSDataOutputStream ecOutputStream = null; + try { + routerFS.mkdirs(testECPath.getParent()); + ((DistributedFileSystem) routerFS).setErasureCodingPolicy( + testECPath.getParent(), "RS-6-3-1024k"); + ecOutputStream = routerFS.create(testECPath); + ecOutputStream.write("hello world ec file. 
\n".getBytes()); + ErasureCodingPolicy ecPolicy = ((DistributedFileSystem) routerFS) + .getErasureCodingPolicy(testECPath); + assertNotNull(ecPolicy); + assertEquals("RS-6-3-1024k", ecPolicy.getName()); + long proxyOpBeforeRenewLease = rpcMetrics.getProxyOps(); + assertTrue(dfsRouterFS.getClient().renewLease()); + long proxyOpAfterRenewLease = rpcMetrics.getProxyOps(); + assertEquals((proxyOpBeforeRenewLease + 1), proxyOpAfterRenewLease); + } finally { + if (ecOutputStream != null) { + ecOutputStream.close(); + } + } + + FSDataOutputStream fsDataOutputStream0 = null; + FSDataOutputStream fsDataOutputStream1 = null; Path newTestPath0 = new Path("/testRenewLease0/test1.txt"); Path newTestPath1 = new Path("/testRenewLease1/test1.txt"); - FSDataOutputStream fsDataOutputStream0 = routerFS.create(newTestPath0); - FSDataOutputStream fsDataOutputStream1 = routerFS.create(newTestPath1); - - long proxyOpBeforeRenewLease2 = rpcMetrics.getProxyOps(); - assertTrue(dfsRouterFS.getClient().renewLease()); - long proxyOpAfterRenewLease2 = rpcMetrics.getProxyOps(); - assertEquals((proxyOpBeforeRenewLease2 + 2), proxyOpAfterRenewLease2); - fsDataOutputStream0.close(); - fsDataOutputStream1.close(); + try { + fsDataOutputStream0 = routerFS.create(newTestPath0); + fsDataOutputStream1 = routerFS.create(newTestPath1); + + long proxyOpBeforeRenewLease2 = rpcMetrics.getProxyOps(); + assertTrue(dfsRouterFS.getClient().renewLease()); + long proxyOpAfterRenewLease2 = rpcMetrics.getProxyOps(); + assertEquals((proxyOpBeforeRenewLease2 + 2), proxyOpAfterRenewLease2); + } finally { + if (fsDataOutputStream0 != null) { + fsDataOutputStream0.close(); + } + if (fsDataOutputStream1 != null) { + fsDataOutputStream1.close(); + } + } } @Test diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 0f6ca8a136d47..79c122cf5bae0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -818,7 +818,7 @@ public GetBatchedListingResponseProto getBatchedListing( public RenewLeaseResponseProto renewLease(RpcController controller, RenewLeaseRequestProto req) throws ServiceException { try { - server.renewLease(req.getClientName(), req.getNsIdentifies()); + server.renewLease(req.getClientName(), req.getNamespacesList()); return VOID_RENEWLEASE_RESPONSE; } catch (IOException e) { throw new ServiceException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index b51f10509c110..1e53fa7e2f99e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1174,8 +1174,12 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) } @Override // ClientProtocol - public void renewLease(String clientName, String nsIdentifies) + public void renewLease(String clientName, List namespaces) throws IOException { + if (namespaces != null && namespaces.size() > 0) { + LOG.warn("namespaces({}) should be null or empty " + + "on NameNode side, please check it.", namespaces); + } checkNNStartup(); // just ignore nsIdentifies namesystem.renewLease(clientName); From 2d6e01e1da15f207d1e323fdcd0db16ef3f7eadb Mon Sep 17 00:00:00 2001 From: "zengqiang.xu" Date: Sun, 
3 Jul 2022 20:44:01 +0800 Subject: [PATCH 3/6] HDFS-16283. make some changes based on the comments --- .../org/apache/hadoop/hdfs/DFSClient.java | 1 - .../router/RouterClientProtocol.java | 8 +- .../federation/router/RouterRpcClient.java | 3 - .../federation/router/TestRouterRpc.java | 145 +++++++++--------- .../server/namenode/NameNodeRpcServer.java | 2 + .../hadoop/hdfs/TestDFSClientRetries.java | 10 +- 6 files changed, 88 insertions(+), 81 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 3bfa6298af81a..f314ac9c6e35d 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -189,7 +189,6 @@ import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.Lists; import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.tracing.TraceScope; import org.apache.hadoop.tracing.Tracer; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index cd6397794e1c4..51ea55c51bebc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -114,7 +114,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** @@ -380,8 +379,11 @@ public 
LastBlockWithStatus append(String src, final String clientName, RemoteMethod method = new RemoteMethod("append", new Class[] {String.class, String.class, EnumSetWritable.class}, new RemoteParam(), clientName, flag); - return rpcClient.invokeSequential( - locations, method, LastBlockWithStatus.class, null); + RemoteResult result = rpcClient.invokeSequential( + method, locations, LastBlockWithStatus.class, null); + LastBlockWithStatus lbws = (LastBlockWithStatus) result.getResult(); + lbws.getFileStatus().setNamespace(result.getLocation().getNameserviceId()); + return lbws; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 6770706499b37..7a250a53b58f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1013,9 +1013,6 @@ public RemoteResult invokeSequential( // Valid result, stop here @SuppressWarnings("unchecked") R location = (R) loc; @SuppressWarnings("unchecked") T ret = (T) result; - if (ret instanceof LastBlockWithStatus) { - ((LastBlockWithStatus) ret).getFileStatus().setNamespace(ns); - } return new RemoteResult<>(location, ret); } if (firstResult == null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index af5a5e919e5c4..f0f4e0e571bed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -1452,92 +1452,99 @@ public void testProxyRestoreFailedStorage() throws Exception { assertEquals(nnSuccess, routerSuccess); } + private void testRenewLeaseInternal(DistributedFileSystem dfs, + FederationRPCMetrics rpcMetrics, Path testPath, boolean createFlag) + throws Exception { + FSDataOutputStream outputStream = null; + try { + if (createFlag) { + outputStream = dfs.create(testPath); + } else { + outputStream = dfs.append(testPath); + } + outputStream.write("hello world. \n".getBytes()); + long proxyOpBeforeRenewLease = rpcMetrics.getProxyOps(); + assertTrue(dfs.getClient().renewLease()); + long proxyOpAfterRenewLease = rpcMetrics.getProxyOps(); + assertEquals((proxyOpBeforeRenewLease + 1), proxyOpAfterRenewLease); + } finally { + if (outputStream != null) { + outputStream.close(); + } + } + } + + @Test - public void testRewnewLease() throws Exception { + public void testRenewLeaseForECFile() throws Exception { + String ecName = "RS-6-3-1024k"; + FederationRPCMetrics metrics = router.getRouterRpcServer().getRPCMetrics(); // Install a mount point to a different path to check MockResolver resolver = (MockResolver)router.getRouter().getSubclusterResolver(); String ns0 = cluster.getNameservices().get(0); - String ns1 = cluster.getNameservices().get(1); resolver.addLocation("/testRenewLease0", ns0, "/testRenewLease0"); - resolver.addLocation("/testRenewLease1", ns1, "/testRenewLease1"); // Stop LeaseRenewer - DistributedFileSystem dfsRouterFS = (DistributedFileSystem) routerFS; - dfsRouterFS.getClient().getLeaseRenewer().interruptAndJoin(); - FederationRPCMetrics rpcMetrics = router.getRouterRpcServer().getRPCMetrics(); + DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS; + routerDFS.getClient().getLeaseRenewer().interruptAndJoin(); + + Path testECPath = new Path("/testRenewLease0/ecDirectory/test_ec.txt"); + 
routerDFS.mkdirs(testECPath.getParent()); + routerDFS.setErasureCodingPolicy( + testECPath.getParent(), ecName); + testRenewLeaseInternal(routerDFS, metrics, testECPath, true); + + ErasureCodingPolicy ecPolicy = routerDFS.getErasureCodingPolicy(testECPath); + assertNotNull(ecPolicy); + assertEquals(ecName, ecPolicy.getName()); + } + + + @Test + public void testRenewLeaseForReplicaFile() throws Exception { + FederationRPCMetrics metrics = router.getRouterRpcServer().getRPCMetrics(); + // Install a mount point to a different path to check + MockResolver resolver = + (MockResolver)router.getRouter().getSubclusterResolver(); + String ns0 = cluster.getNameservices().get(0); + resolver.addLocation("/testRenewLease0", ns0, "/testRenewLease0"); + + // Stop LeaseRenewer + DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS; + routerDFS.getClient().getLeaseRenewer().interruptAndJoin(); // Test Replica File Path testPath = new Path("/testRenewLease0/test_replica.txt"); - FSDataOutputStream replicaOutputStream = null; - try { - replicaOutputStream = routerFS.create(testPath); - replicaOutputStream.write("hello world create. \n".getBytes()); - long proxyOpBeforeRenewLease = rpcMetrics.getProxyOps(); - assertTrue(dfsRouterFS.getClient().renewLease()); - long proxyOpAfterRenewLease = rpcMetrics.getProxyOps(); - assertEquals((proxyOpBeforeRenewLease + 1), proxyOpAfterRenewLease); - } finally { - if (replicaOutputStream != null) { - replicaOutputStream.close(); - } - } + testRenewLeaseInternal(routerDFS, metrics, testPath, true); + testRenewLeaseInternal(routerDFS, metrics, testPath, false); + } - try { - replicaOutputStream = routerFS.append(testPath); - replicaOutputStream.write("hello world append. 
\n".getBytes()); - long proxyOpBeforeRenewLease = rpcMetrics.getProxyOps(); - assertTrue(dfsRouterFS.getClient().renewLease()); - long proxyOpAfterRenewLease = rpcMetrics.getProxyOps(); - assertEquals((proxyOpBeforeRenewLease + 1), proxyOpAfterRenewLease); - } finally { - if (replicaOutputStream != null) { - replicaOutputStream.close(); - } - } + @Test + public void testRenewLeaseWithMultiStream() throws Exception { + FederationRPCMetrics metrics = router.getRouterRpcServer().getRPCMetrics(); + // Install a mount point to a different path to check + MockResolver resolver = + (MockResolver)router.getRouter().getSubclusterResolver(); + String ns0 = cluster.getNameservices().get(0); + String ns1 = cluster.getNameservices().get(1); + resolver.addLocation("/testRenewLease0", ns0, "/testRenewLease0"); + resolver.addLocation("/testRenewLease1", ns1, "/testRenewLease1"); - // Test Replica File - Path testECPath = new Path("/testRenewLease0/ecDirectory/test_ec.txt"); - FSDataOutputStream ecOutputStream = null; - try { - routerFS.mkdirs(testECPath.getParent()); - ((DistributedFileSystem) routerFS).setErasureCodingPolicy( - testECPath.getParent(), "RS-6-3-1024k"); - ecOutputStream = routerFS.create(testECPath); - ecOutputStream.write("hello world ec file. 
\n".getBytes()); - ErasureCodingPolicy ecPolicy = ((DistributedFileSystem) routerFS) - .getErasureCodingPolicy(testECPath); - assertNotNull(ecPolicy); - assertEquals("RS-6-3-1024k", ecPolicy.getName()); - long proxyOpBeforeRenewLease = rpcMetrics.getProxyOps(); - assertTrue(dfsRouterFS.getClient().renewLease()); - long proxyOpAfterRenewLease = rpcMetrics.getProxyOps(); - assertEquals((proxyOpBeforeRenewLease + 1), proxyOpAfterRenewLease); - } finally { - if (ecOutputStream != null) { - ecOutputStream.close(); - } - } + // Stop LeaseRenewer + DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS; + routerDFS.getClient().getLeaseRenewer().interruptAndJoin(); - FSDataOutputStream fsDataOutputStream0 = null; - FSDataOutputStream fsDataOutputStream1 = null; Path newTestPath0 = new Path("/testRenewLease0/test1.txt"); Path newTestPath1 = new Path("/testRenewLease1/test1.txt"); - try { - fsDataOutputStream0 = routerFS.create(newTestPath0); - fsDataOutputStream1 = routerFS.create(newTestPath1); - - long proxyOpBeforeRenewLease2 = rpcMetrics.getProxyOps(); - assertTrue(dfsRouterFS.getClient().renewLease()); - long proxyOpAfterRenewLease2 = rpcMetrics.getProxyOps(); + try (FSDataOutputStream outStream1 = routerDFS.create(newTestPath0); + FSDataOutputStream outStream2 = routerDFS.create(newTestPath1)) { + outStream1.write("hello world \n".getBytes()); + outStream2.write("hello world \n".getBytes()); + long proxyOpBeforeRenewLease2 = metrics.getProxyOps(); + assertTrue(routerDFS.getClient().renewLease()); + long proxyOpAfterRenewLease2 = metrics.getProxyOps(); assertEquals((proxyOpBeforeRenewLease2 + 2), proxyOpAfterRenewLease2); - } finally { - if (fsDataOutputStream0 != null) { - fsDataOutputStream0.close(); - } - if (fsDataOutputStream1 != null) { - fsDataOutputStream1.close(); - } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 1e53fa7e2f99e..f6f40a38d6e8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1179,6 +1179,8 @@ public void renewLease(String clientName, List namespaces) if (namespaces != null && namespaces.size() > 0) { LOG.warn("namespaces({}) should be null or empty " + "on NameNode side, please check it.", namespaces); + throw new IOException("namespaces(" + namespaces + + ") should be null or empty"); } checkNNStartup(); // just ignore nsIdentifies diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index f5bdb948cff8d..c335d38f73311 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -384,7 +384,7 @@ public void testLeaseRenewSocketTimeout() throws Exception cluster.waitActive(); NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc()); Mockito.doThrow(new SocketTimeoutException()).when(spyNN).renewLease( - Mockito.anyString(), null); + Mockito.anyString(), any()); DFSClient client = new DFSClient(null, spyNN, conf, null); // Get hold of the lease renewer instance used by the client final LeaseRenewer leaseRenewer1 = client.getLeaseRenewer(); @@ -392,7 +392,7 @@ public void testLeaseRenewSocketTimeout() throws Exception OutputStream out1 = client.create(file1, false); Mockito.verify(spyNN, timeout(10000).times(1)).renewLease( - Mockito.anyString(), null); + Mockito.anyString(), any()); verifyEmptyLease(leaseRenewer1); GenericTestUtils.waitFor(() -> 
!(leaseRenewer1.isRunning()), 100, 10000); try { @@ -406,12 +406,12 @@ public void testLeaseRenewSocketTimeout() throws Exception // Verify DFSClient can do write operation after renewLease no longer // throws SocketTimeoutException. Mockito.doNothing().when(spyNN).renewLease( - Mockito.anyString(), null); + Mockito.anyString(), any()); final LeaseRenewer leaseRenewer2 = client.getLeaseRenewer(); leaseRenewer2.setRenewalTime(100); OutputStream out2 = client.create(file2, false); Mockito.verify(spyNN, timeout(10000).times(2)).renewLease( - Mockito.anyString(), null); + Mockito.anyString(), any()); out2.write(new byte[256]); out2.close(); verifyEmptyLease(leaseRenewer2); @@ -1309,7 +1309,7 @@ public void delayWhenRenewLeaseTimeout() { try { //1. trigger get LeaseRenewer lock Mockito.doThrow(new SocketTimeoutException()).when(spyNN) - .renewLease(Mockito.anyString(), null); + .renewLease(Mockito.anyString(), any()); } catch (IOException e) { e.printStackTrace(); } From 642f92050aa3877adef81dd8154cc55c8d5dff96 Mon Sep 17 00:00:00 2001 From: "zengqiang.xu" Date: Mon, 4 Jul 2022 08:43:47 +0800 Subject: [PATCH 4/6] HDFS-16283. 
Fix checkstyle and failed UTs --- .../hadoop/hdfs/server/federation/router/RouterRpcClient.java | 1 - .../src/test/java/org/apache/hadoop/hdfs/TestLease.java | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 7a250a53b58f9..836be3cee6d65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -63,7 +63,6 @@ import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.SnapshotException; import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java index efd26f2b2845f..8b527d07a29b8 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java @@ -91,7 +91,7 @@ public void testLeaseAbort() throws Exception { // stub the renew method. doThrow(new RemoteException(InvalidToken.class.getName(), "Your token is worthless")).when(spyNN).renewLease( - anyString(), null); + anyString(), any()); // We don't need to wait the lease renewer thread to act. // call renewLease() manually. 
@@ -132,7 +132,7 @@ public void testLeaseAbort() throws Exception { Assert.assertTrue(originalRenewer.isEmpty()); // unstub - doNothing().when(spyNN).renewLease(anyString(), null); + doNothing().when(spyNN).renewLease(anyString(), any()); // existing input streams should work try { From 86ad8912390d21661196511dcca51a76847abf8e Mon Sep 17 00:00:00 2001 From: "zengqiang.xu" Date: Fri, 8 Jul 2022 18:03:07 +0800 Subject: [PATCH 5/6] HDFS-16283. Modify the patch based on comment --- .../hadoop/hdfs/server/federation/router/RouterRpcClient.java | 2 +- .../apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 836be3cee6d65..ff90854ebb7ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1508,7 +1508,7 @@ private void transferThreadLocalContext( * @return A prioritized list of NNs to use for communication. * @throws IOException If a NN cannot be located for the nameservice ID. 
*/ - public List getNamenodesForNameservice( + private List getNamenodesForNameservice( final String nsId) throws IOException { final List namenodes = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index f6f40a38d6e8c..1d50bc5cb533d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1183,7 +1183,6 @@ public void renewLease(String clientName, List namespaces) + ") should be null or empty"); } checkNNStartup(); - // just ignore nsIdentifies namesystem.renewLease(clientName); } From 6739c2214b3a50de9200fcf6ee979a24777c376e Mon Sep 17 00:00:00 2001 From: "zengqiang.xu" Date: Fri, 8 Jul 2022 20:55:26 +0800 Subject: [PATCH 6/6] HDFS-16283. Modify this patch based on the comment --- .../java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java | 2 +- .../hdfs/server/federation/router/RouterClientProtocol.java | 4 ++-- .../hadoop/hdfs/server/federation/router/TestRouterRpc.java | 1 - 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 9395d97710972..4f2da496a1a3d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -759,7 +759,7 @@ SnapshotStatus[] getSnapshotListing(String snapshotRoot) * the last call to renewLease(), the NameNode assumes the * client has died. 
* - * @param namespaces The full Namespace list that the release rpc + * @param namespaces The full Namespace list that the renewLease rpc * should be forwarded by RBF. * Tips: NN side, this value should be null. * RBF side, if this value is null, this rpc will diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 51ea55c51bebc..73445595de7ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -776,7 +776,7 @@ private Map getAvailableNamespaces() /** * Try to get a list of FederationNamespaceInfo for renewLease RPC. */ - private List getRewLeaseNSs(List namespaces) + private List getRenewLeaseNSs(List namespaces) throws IOException { if (namespaces == null || namespaces.isEmpty()) { return new ArrayList<>(namenodeResolver.getNamespaces()); @@ -801,7 +801,7 @@ public void renewLease(String clientName, List namespaces) RemoteMethod method = new RemoteMethod("renewLease", new Class[] {String.class, List.class}, clientName, null); - List nss = getRewLeaseNSs(namespaces); + List nss = getRenewLeaseNSs(namespaces); if (nss.size() == 1) { rpcClient.invokeSingle(nss.get(0).getNameserviceId(), method); } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index f0f4e0e571bed..f71145a452292 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -1474,7 +1474,6 @@ private void testRenewLeaseInternal(DistributedFileSystem dfs, } } - @Test public void testRenewLeaseForECFile() throws Exception { String ecName = "RS-6-3-1024k";