Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ public synchronized void register(DatanodeDescriptor dn) {

private synchronized NodeData registerNode(DatanodeDescriptor dn) {
if (nodes.containsKey(dn.getDatanodeUuid())) {
LOG.info("Can't register DN {} because it is already registered.",
dn.getDatanodeUuid());
LOG.info("Can't register DN {} ({}) because it is already registered.",
dn.getDatanodeUuid(), dn.getXferAddr());
return null;
}
NodeData node = new NodeData(dn.getDatanodeUuid());
Expand All @@ -213,8 +213,8 @@ private synchronized void remove(NodeData node) {
public synchronized void unregister(DatanodeDescriptor dn) {
NodeData node = nodes.remove(dn.getDatanodeUuid());
if (node == null) {
LOG.info("Can't unregister DN {} because it is not currently " +
"registered.", dn.getDatanodeUuid());
LOG.info("Can't unregister DN {} ({}) because it is not currently " +
"registered.", dn.getDatanodeUuid(), dn.getXferAddr());
return;
}
remove(node);
Expand All @@ -224,17 +224,17 @@ public synchronized long requestLease(DatanodeDescriptor dn) {
NodeData node = nodes.get(dn.getDatanodeUuid());
if (node == null) {
LOG.warn("DN {} ({}) requested a lease even though it wasn't yet " +
"registered. Registering now.", dn.getDatanodeUuid(),
"registered. Registering now.", dn.getDatanodeUuid(),
dn.getXferAddr());
node = registerNode(dn);
}
if (node.leaseId != 0) {
// The DataNode wants a new lease, even though it already has one.
// This can happen if the DataNode is restarted in between requesting
// a lease and using it.
LOG.debug("Removing existing BR lease 0x{} for DN {} in order to " +
LOG.debug("Removing existing BR lease 0x{} for DN {} ({}) in order to " +
"issue a new one.", Long.toHexString(node.leaseId),
dn.getDatanodeUuid());
dn.getDatanodeUuid(), dn.getXferAddr());
}
remove(node);
long monotonicNowMs = Time.monotonicNow();
Expand All @@ -248,9 +248,9 @@ public synchronized long requestLease(DatanodeDescriptor dn) {
allLeases.append(prefix).append(cur.datanodeUuid);
prefix = ", ";
}
LOG.debug("Can't create a new BR lease for DN {}, because " +
"numPending equals maxPending at {}. Current leases: {}",
dn.getDatanodeUuid(), numPending, allLeases.toString());
LOG.debug("Can't create a new BR lease for DN {} ({}), because " +
"numPending equals maxPending at {}. Current leases: {}",
dn.getDatanodeUuid(), dn.getXferAddr(), numPending, allLeases);
}
return 0;
}
Expand All @@ -259,8 +259,8 @@ public synchronized long requestLease(DatanodeDescriptor dn) {
node.leaseTimeMs = monotonicNowMs;
pendingHead.addToEnd(node);
if (LOG.isDebugEnabled()) {
LOG.debug("Created a new BR lease 0x{} for DN {}. numPending = {}",
Long.toHexString(node.leaseId), dn.getDatanodeUuid(), numPending);
LOG.debug("Created a new BR lease 0x{} for DN {} ({}). numPending = {}",
Long.toHexString(node.leaseId), dn.getDatanodeUuid(), dn.getXferAddr(), numPending);
}
return node.leaseId;
}
Expand Down Expand Up @@ -293,57 +293,57 @@ private synchronized void pruneExpiredPending(long monotonicNowMs) {
public synchronized boolean checkLease(DatanodeDescriptor dn,
long monotonicNowMs, long id) {
if (id == 0) {
LOG.debug("Datanode {} is using BR lease id 0x0 to bypass " +
"rate-limiting.", dn.getDatanodeUuid());
LOG.debug("Datanode {} ({}) is using BR lease id 0x0 to bypass " +
"rate-limiting.", dn.getDatanodeUuid(), dn.getXferAddr());
return true;
}
NodeData node = nodes.get(dn.getDatanodeUuid());
if (node == null) {
LOG.info("BR lease 0x{} is not valid for unknown datanode {}",
Long.toHexString(id), dn.getDatanodeUuid());
LOG.info("BR lease 0x{} is not valid for unknown datanode {} ({})",
Long.toHexString(id), dn.getDatanodeUuid(), dn.getXferAddr());
return false;
}
if (node.leaseId == 0) {
LOG.warn("BR lease 0x{} is not valid for DN {}, because the DN " +
LOG.warn("BR lease 0x{} is not valid for DN {} ({}), because the DN " +
"is not in the pending set.",
Long.toHexString(id), dn.getDatanodeUuid());
Long.toHexString(id), dn.getDatanodeUuid(), dn.getXferAddr());
return false;
}
if (pruneIfExpired(monotonicNowMs, node)) {
LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " +
"has expired.", Long.toHexString(id), dn.getDatanodeUuid());
LOG.warn("BR lease 0x{} is not valid for DN {} ({}), because the lease " +
"has expired.", Long.toHexString(id), dn.getDatanodeUuid(), dn.getXferAddr());
return false;
}
if (id != node.leaseId) {
LOG.warn("BR lease 0x{} is not valid for DN {}. Expected BR lease 0x{}.",
Long.toHexString(id), dn.getDatanodeUuid(),
LOG.warn("BR lease 0x{} is not valid for DN {} ({}). Expected BR lease 0x{}.",
Long.toHexString(id), dn.getDatanodeUuid(), dn.getXferAddr(),
Long.toHexString(node.leaseId));
return false;
}
if (LOG.isTraceEnabled()) {
LOG.trace("BR lease 0x{} is valid for DN {}.",
Long.toHexString(id), dn.getDatanodeUuid());
LOG.trace("BR lease 0x{} is valid for DN {} ({}).",
Long.toHexString(id), dn.getDatanodeUuid(), dn.getXferAddr());
}
return true;
}

public synchronized long removeLease(DatanodeDescriptor dn) {
NodeData node = nodes.get(dn.getDatanodeUuid());
if (node == null) {
LOG.info("Can't remove lease for unknown datanode {}",
dn.getDatanodeUuid());
LOG.info("Can't remove lease for unknown datanode {} ({})",
dn.getDatanodeUuid(), dn.getXferAddr());
return 0;
}
long id = node.leaseId;
if (id == 0) {
LOG.debug("DN {} has no lease to remove.", dn.getDatanodeUuid());
LOG.debug("DN {} ({}) has no lease to remove.", dn.getDatanodeUuid(), dn.getXferAddr());
return 0;
}
remove(node);
deferredHead.addToEnd(node);
if (LOG.isTraceEnabled()) {
LOG.trace("Removed BR lease 0x{} for DN {}. numPending = {}",
Long.toHexString(id), dn.getDatanodeUuid(), numPending);
LOG.trace("Removed BR lease 0x{} for DN {} ({}). numPending = {}",
Long.toHexString(id), dn.getDatanodeUuid(), dn.getXferAddr(), numPending);
}
return id;
}
Expand Down