Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand Down Expand Up @@ -579,6 +580,27 @@ void updateLastLeaseRenewal() {
}
}

/**
* Get all namespaces of DFSOutputStreams.
*/
private List<String> getNamespaces() {
HashSet<String> namespaces = new HashSet<>();
synchronized (filesBeingWritten) {
for (DFSOutputStream outputStream : filesBeingWritten.values()) {
String namespace = outputStream.getNamespace();
if (namespace == null || namespace.isEmpty()) {
return null;
} else {
namespaces.add(namespace);
}
}
if (namespaces.isEmpty()) {
return null;
}
}
return new ArrayList<>(namespaces);
}

/**
* Renew leases.
* @return true if lease was renewed. May return false if this
Expand All @@ -587,7 +609,7 @@ void updateLastLeaseRenewal() {
public boolean renewLease() throws IOException {
if (clientRunning && !isFilesBeingWrittenEmpty()) {
try {
namenode.renewLease(clientName);
namenode.renewLease(clientName, getNamespaces());
updateLastLeaseRenewal();
return true;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public class DFSOutputStream extends FSOutputSummer

protected final String src;
protected final long fileId;
private final String namespace;
protected final long blockSize;
protected final int bytesPerChecksum;

Expand Down Expand Up @@ -195,6 +196,7 @@ private DFSOutputStream(DFSClient dfsClient, String src,
this.dfsClient = dfsClient;
this.src = src;
this.fileId = stat.getFileId();
this.namespace = stat.getNamespace();
this.blockSize = stat.getBlockSize();
this.blockReplication = stat.getReplication();
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
Expand Down Expand Up @@ -1084,6 +1086,11 @@ public long getFileId() {
return fileId;
}

@VisibleForTesting
public String getNamespace() {
return namespace;
}

/**
* Return the source of stream.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,11 +759,19 @@ SnapshotStatus[] getSnapshotListing(String snapshotRoot)
* the last call to renewLease(), the NameNode assumes the
* client has died.
*
* @param namespaces The full Namespace list that the renewLease rpc
* should be forwarded by RBF.
* Tips: NN side, this value should be null.
* RBF side, if this value is null, this rpc will
* be forwarded to all available namespaces,
* else this rpc will be forwarded to
* the special namespaces.
*
* @throws org.apache.hadoop.security.AccessControlException permission denied
* @throws IOException If an I/O error occurred
*/
@Idempotent
void renewLease(String clientName) throws IOException;
void renewLease(String clientName, List<String> namespaces) throws IOException;

/**
* Start lease recovery.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,10 @@ default FileStatus makeQualified(URI defaultUri, Path parent) {
*/
int compareTo(FileStatus stat);

void setNamespace(String namespace);

String getNamespace();

/**
* Set redundant flags for compatibility with existing applications.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class HdfsLocatedFileStatus
// BlockLocations[] is the user-facing type
private transient LocatedBlocks hdfsloc;

private String namespace = null;

/**
* Constructor.
* @param length the number of bytes the file has
Expand Down Expand Up @@ -217,4 +219,14 @@ public LocatedFileStatus makeQualifiedLocated(URI defaultUri, Path path) {
return this;
}

@Override
public String getNamespace() {
return namespace;
}

@Override
public void setNamespace(String namespace) {
this.namespace = namespace;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class HdfsNamedFileStatus extends FileStatus implements HdfsFileStatus {
private final int childrenNum;
private final byte storagePolicy;

private String namespace = null;

/**
* Constructor.
* @param length the number of bytes the file has
Expand Down Expand Up @@ -177,4 +179,13 @@ public int hashCode() {
return super.hashCode();
}

@Override
public String getNamespace() {
return namespace;
}

@Override
public void setNamespace(String namespace) {
this.namespace = namespace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -744,11 +744,15 @@ public BatchedDirectoryListing getBatchedListing(


@Override
public void renewLease(String clientName) throws IOException {
RenewLeaseRequestProto req = RenewLeaseRequestProto.newBuilder()
.setClientName(clientName).build();
public void renewLease(String clientName, List<String> namespaces)
throws IOException {
RenewLeaseRequestProto.Builder builder = RenewLeaseRequestProto
.newBuilder().setClientName(clientName);
if (namespaces != null && !namespaces.isEmpty()) {
builder.addAllNamespaces(namespaces);
}
try {
rpcProxy.renewLease(null, req);
rpcProxy.renewLease(null, builder.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1764,7 +1764,7 @@ public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
EnumSet<HdfsFileStatus.Flags> flags = fs.hasFlags()
? convertFlags(fs.getFlags())
: convertFlags(fs.getPermission());
return new HdfsFileStatus.Builder()
HdfsFileStatus hdfsFileStatus = new HdfsFileStatus.Builder()
.length(fs.getLength())
.isdir(fs.getFileType().equals(FileType.IS_DIR))
.replication(fs.getBlockReplication())
Expand Down Expand Up @@ -1794,6 +1794,10 @@ public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
? convertErasureCodingPolicy(fs.getEcPolicy())
: null)
.build();
if (fs.hasNamespace()) {
hdfsFileStatus.setNamespace(fs.getNamespace());
}
return hdfsFileStatus;
}

private static EnumSet<HdfsFileStatus.Flags> convertFlags(int flags) {
Expand Down Expand Up @@ -2399,6 +2403,9 @@ public static HdfsFileStatusProto convert(HdfsFileStatus fs) {
flags |= fs.isSnapshotEnabled() ? HdfsFileStatusProto.Flags
.SNAPSHOT_ENABLED_VALUE : 0;
builder.setFlags(flags);
if (fs.getNamespace() != null && !fs.getNamespace().isEmpty()) {
builder.setNamespace(fs.getNamespace());
}
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ message GetSnapshotDiffReportListingResponseProto {
}
message RenewLeaseRequestProto {
required string clientName = 1;
repeated string namespaces = 2;
}

message RenewLeaseResponseProto { //void response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ message HdfsFileStatusProto {

// Set of flags
optional uint32 flags = 18 [default = 0];
optional string namespace = 19;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,10 @@ public HdfsFileStatus create(String src, FsPermission masked,
RemoteLocation createLocation = null;
try {
createLocation = rpcServer.getCreateLocation(src, locations);
return rpcClient.invokeSingle(createLocation, method,
HdfsFileStatus status = rpcClient.invokeSingle(createLocation, method,
HdfsFileStatus.class);
status.setNamespace(createLocation.getNameserviceId());
return status;
} catch (IOException ioe) {
final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
method, src, ioe, createLocation, locations);
Expand Down Expand Up @@ -377,8 +379,11 @@ public LastBlockWithStatus append(String src, final String clientName,
RemoteMethod method = new RemoteMethod("append",
new Class<?>[] {String.class, String.class, EnumSetWritable.class},
new RemoteParam(), clientName, flag);
return rpcClient.invokeSequential(
locations, method, LastBlockWithStatus.class, null);
RemoteResult result = rpcClient.invokeSequential(
method, locations, LastBlockWithStatus.class, null);
LastBlockWithStatus lbws = (LastBlockWithStatus) result.getResult();
lbws.getFileStatus().setNamespace(result.getLocation().getNameserviceId());
return lbws;
}

@Override
Expand Down Expand Up @@ -759,14 +764,49 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
}
}

private Map<String, FederationNamespaceInfo> getAvailableNamespaces()
throws IOException {
Map<String, FederationNamespaceInfo> allAvailableNamespaces =
new HashMap<>();
namenodeResolver.getNamespaces().forEach(
k -> allAvailableNamespaces.put(k.getNameserviceId(), k));
return allAvailableNamespaces;
}

/**
* Try to get a list of FederationNamespaceInfo for renewLease RPC.
*/
private List<FederationNamespaceInfo> getRenewLeaseNSs(List<String> namespaces)
throws IOException {
if (namespaces == null || namespaces.isEmpty()) {
return new ArrayList<>(namenodeResolver.getNamespaces());
}
List<FederationNamespaceInfo> result = new ArrayList<>();
Map<String, FederationNamespaceInfo> allAvailableNamespaces =
getAvailableNamespaces();
Comment on lines +785 to +786
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have some caching here:
Like:
Initially initialise availableNamespace and for every call check from this, if some entry isn't found in the stored/cached availableNamespace, In that case call getAvailableNamespaces() and update the value of availableNamespace,
if still we don't find the entry after then we can return all the namespace what we are doing now

for (String namespace : namespaces) {
if (!allAvailableNamespaces.containsKey(namespace)) {
return new ArrayList<>(namenodeResolver.getNamespaces());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use result directly rather than create another ArrayList again here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

namenodeResolver.getNamespaces() is a hashSet, I want to a List so that we can use invokeSingle method to forward this rpc when there is only one namespace.

List<FederationNamespaceInfo> nss = getRenewLeaseNSs(namespaces);
    if (nss.size() == 1) {
      rpcClient.invokeSingle(nss.get(0).getNameserviceId(), method);
    } else {
      rpcClient.invokeConcurrent(nss, method, false, false);
 }

Of course, Set can also achieve this goal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get it. make sense to me.

} else {
result.add(allAvailableNamespaces.get(namespace));
}
}
return result;
}

@Override
public void renewLease(String clientName) throws IOException {
public void renewLease(String clientName, List<String> namespaces)
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

RemoteMethod method = new RemoteMethod("renewLease",
new Class<?>[] {String.class}, clientName);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
rpcClient.invokeConcurrent(nss, method, false, false);
new Class<?>[] {String.class, List.class}, clientName, null);
List<FederationNamespaceInfo> nss = getRenewLeaseNSs(namespaces);
if (nss.size() == 1) {
rpcClient.invokeSingle(nss.get(0).getNameserviceId(), method);
Comment on lines +805 to +806
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nsId is getting passed from the client, if we get an array or so, you can figure out initially itself whether you have only one entry or not. so you can get rid of getRewLeaseNSs(nsIdentifies); completely in that case?

} else {
rpcClient.invokeConcurrent(nss, method, false, false);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -980,8 +980,9 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
}

@Override // ClientProtocol
public void renewLease(String clientName) throws IOException {
clientProto.renewLease(clientName);
public void renewLease(String clientName, List<String> namespaces)
throws IOException {
clientProto.renewLease(clientName, namespaces);
}

@Override // ClientProtocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private void invokeSequential(ClientProtocol routerProto) throws IOException {

private void invokeConcurrent(ClientProtocol routerProto, String clientName)
throws IOException {
routerProto.renewLease(clientName);
routerProto.renewLease(clientName, null);
}

private int getTotalRejectedPermits(RouterContext routerContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void testGetQuota() throws Exception {

@Test
public void testRenewLease() throws Exception {
router.getRpcServer().renewLease("test");
router.getRpcServer().renewLease("test", null);
assertCounter("RenewLeaseOps", 2L, getMetrics(ROUTER_METRICS));
assertCounter("ConcurrentRenewLeaseOps", 1L, getMetrics(ROUTER_METRICS));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void cleanup() throws IOException {
public void testWithoutDisabling() throws IOException {
// ns0 is slow and renewLease should take a long time
long t0 = monotonicNow();
routerProtocol.renewLease("client0");
routerProtocol.renewLease("client0", null);
long t = monotonicNow() - t0;
assertTrue("It took too little: " + t + "ms",
t > TimeUnit.SECONDS.toMillis(1));
Expand All @@ -178,7 +178,7 @@ public void testDisabling() throws Exception {

// renewLease should be fast as we are skipping ns0
long t0 = monotonicNow();
routerProtocol.renewLease("client0");
routerProtocol.renewLease("client0", null);
long t = monotonicNow() - t0;
assertTrue("It took too long: " + t + "ms",
t < TimeUnit.SECONDS.toMillis(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void run() {
routerClient = new DFSClient(address, conf);
String clientName = routerClient.getClientName();
ClientProtocol routerProto = routerClient.getNamenode();
routerProto.renewLease(clientName);
routerProto.renewLease(clientName, null);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException();
assertTrue("Wrong exception: " + ioe,
Expand Down Expand Up @@ -390,7 +390,7 @@ public void testAsyncCallerPoolMetrics() throws Exception {
cluster.getRouterClientConf());
String clientName = routerClient.getClientName();
ClientProtocol routerProto = routerClient.getNamenode();
routerProto.renewLease(clientName);
routerProto.renewLease(clientName, null);
} catch (Exception e) {
fail("Client request failed: " + e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void testRetryWhenOneNameServiceDown() throws Exception {

DFSClient client = nnContext1.getClient();
// Renew lease for the DFS client, it will succeed.
routerProtocol.renewLease(client.getClientName());
routerProtocol.renewLease(client.getClientName(), null);

// Verify the retry times, it will retry one time for ns0.
FederationRPCMetrics rpcMetrics = routerContext.getRouter()
Expand Down
Loading