Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,33 +142,27 @@ protected void recover() throws IOException {
ReplicaState.RWR.getValue()) {
syncList.add(new BlockRecord(id, proxyDN, info));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Block recovery: Ignored replica with invalid " +
"original state: " + info + " from DataNode: " + id);
}
LOG.debug("Block recovery: Ignored replica with invalid " +
"original state: {} from DataNode: {}", info, id);
}
} else {
if (LOG.isDebugEnabled()) {
if (info == null) {
LOG.debug("Block recovery: DataNode: " + id + " does not have "
+ "replica for block: " + block);
} else {
LOG.debug("Block recovery: Ignored replica with invalid "
+ "generation stamp or length: " + info + " from " +
"DataNode: " + id);
}
if (info == null) {
LOG.debug("Block recovery: DataNode: {} does not have " +
"replica for block: {}", id, block);
} else {
LOG.debug("Block recovery: Ignored replica with invalid "
+ "generation stamp or length: {} from DataNode: {}", info, id);
}
}
} catch (RecoveryInProgressException ripE) {
InterDatanodeProtocol.LOG.warn(
"Recovery for replica " + block + " on data-node " + id
+ " is already in progress. Recovery id = "
+ rBlock.getNewGenerationStamp() + " is aborted.", ripE);
"Recovery for replica {} on data-node {} is already in progress. " +
"Recovery id = {} is aborted.", block, id, rBlock.getNewGenerationStamp(), ripE);
return;
} catch (IOException e) {
++errorCount;
InterDatanodeProtocol.LOG.warn("Failed to recover block (block="
+ block + ", datanode=" + id + ")", e);
InterDatanodeProtocol.LOG.warn("Failed to recover block (block={}, datanode={})",
block, id, e);
}
}

Expand Down Expand Up @@ -206,11 +200,9 @@ void syncBlock(List<BlockRecord> syncList) throws IOException {
// or their replicas have 0 length.
// The block can be deleted.
if (syncList.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("syncBlock for block " + block + ", all datanodes don't " +
"have the block or their replicas have 0 length. The block can " +
"be deleted.");
}
LOG.debug("syncBlock for block {}, all datanodes don't " +
"have the block or their replicas have 0 length. The block can " +
"be deleted.", block);
nn.commitBlockSynchronization(block, recoveryId, 0,
true, true, DatanodeID.EMPTY_ARRAY, null);
return;
Expand Down Expand Up @@ -249,12 +241,9 @@ void syncBlock(List<BlockRecord> syncList) throws IOException {
r.rInfo.getNumBytes() == finalizedLength) {
participatingList.add(r);
}
if (LOG.isDebugEnabled()) {
LOG.debug("syncBlock replicaInfo: block=" + block +
", from datanode " + r.id + ", receivedState=" + rState.name() +
", receivedLength=" + r.rInfo.getNumBytes() +
", bestState=FINALIZED, finalizedLength=" + finalizedLength);
}
LOG.debug("syncBlock replicaInfo: block={}, from datanode {}, receivedState={}, " +
"receivedLength={}, bestState=FINALIZED, finalizedLength={}",
block, r.id, rState.name(), r.rInfo.getNumBytes(), finalizedLength);
}
newBlock.setNumBytes(finalizedLength);
break;
Expand All @@ -267,12 +256,9 @@ void syncBlock(List<BlockRecord> syncList) throws IOException {
minLength = Math.min(minLength, r.rInfo.getNumBytes());
participatingList.add(r);
}
if (LOG.isDebugEnabled()) {
LOG.debug("syncBlock replicaInfo: block=" + block +
", from datanode " + r.id + ", receivedState=" + rState.name() +
", receivedLength=" + r.rInfo.getNumBytes() + ", bestState=" +
bestState.name());
}
LOG.debug("syncBlock replicaInfo: block={}, from datanode {}, receivedState={}, " +
"receivedLength={}, bestState={}", block, r.id, rState.name(),
r.rInfo.getNumBytes(), bestState.name());
}
// recover() guarantees syncList will have at least one replica with RWR
// or better state.
Expand Down Expand Up @@ -325,11 +311,8 @@ void syncBlock(List<BlockRecord> syncList) throws IOException {
storages[i] = r.storageID;
}

if (LOG.isDebugEnabled()) {
LOG.debug("Datanode triggering commitBlockSynchronization, block=" +
block + ", newGs=" + newBlock.getGenerationStamp() +
", newLength=" + newBlock.getNumBytes());
}
LOG.debug("Datanode triggering commitBlockSynchronization, block={}, newGs={}, " +
"newLength={}", block, newBlock.getGenerationStamp(), newBlock.getNumBytes());

nn.commitBlockSynchronization(block,
newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
Expand Down Expand Up @@ -406,14 +389,15 @@ protected void recover() throws IOException {
//check generation stamps
for (int i = 0; i < locs.length; i++) {
DatanodeID id = locs[i];
ExtendedBlock internalBlk = null;
try {
DatanodeID bpReg = getDatanodeID(bpid);
internalBlk = new ExtendedBlock(block);
final long blockId = block.getBlockId() + blockIndices[i];
internalBlk.setBlockId(blockId);
InterDatanodeProtocol proxyDN = bpReg.equals(id) ?
datanode : DataNode.createInterDataNodeProtocolProxy(id, conf,
dnConf.socketTimeout, dnConf.connectToDnViaHostname);
ExtendedBlock internalBlk = new ExtendedBlock(block);
final long blockId = block.getBlockId() + blockIndices[i];
internalBlk.setBlockId(blockId);
ReplicaRecoveryInfo info = callInitReplicaRecovery(proxyDN,
new RecoveringBlock(internalBlk, null, recoveryId));

Expand All @@ -427,27 +411,36 @@ protected void recover() throws IOException {
// simply choose the one with larger length.
// TODO: better usage of redundant replicas
syncBlocks.put(blockId, new BlockRecord(id, proxyDN, info));
} else {
LOG.debug("Block recovery: Ignored replica with invalid " +
"original state: {} from DataNode: {} by block: {}", info, id, block);
}
} else {
if (info == null) {
LOG.debug("Block recovery: DataNode: {} does not have " +
"replica for block: (block={}, internalBlk={})", id, block, internalBlk);
} else {
LOG.debug("Block recovery: Ignored replica with invalid "
+ "generation stamp or length: {} from DataNode: {} by block: {}",
info, id, block);
}
}
} catch (RecoveryInProgressException ripE) {
InterDatanodeProtocol.LOG.warn(
"Recovery for replica " + block + " on data-node " + id
+ " is already in progress. Recovery id = "
+ rBlock.getNewGenerationStamp() + " is aborted.", ripE);
"Recovery for replica (block={}, internalBlk={}) on data-node {} is already " +
"in progress. Recovery id = {} is aborted.", block, internalBlk, id,
rBlock.getNewGenerationStamp(), ripE);
return;
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to recover block (block="
+ block + ", datanode=" + id + ")", e);
InterDatanodeProtocol.LOG.warn("Failed to recover block (block={}, internalBlk={}, " +
"datanode={})", block, internalBlk, id, e);
}
}
checkLocations(syncBlocks.size());

final long safeLength = getSafeLength(syncBlocks);
if (LOG.isDebugEnabled()) {
LOG.debug("Recovering block " + block
+ ", length=" + block.getNumBytes() + ", safeLength=" + safeLength
+ ", syncList=" + syncBlocks);
}
LOG.debug("Recovering block {}, length={}, safeLength={}, syncList={}", block,
block.getNumBytes(), safeLength, syncBlocks);

// If some internal blocks reach the safe length, convert them to RUR
List<BlockRecord> rurList = new ArrayList<>(locs.length);
Expand Down Expand Up @@ -498,8 +491,8 @@ private void truncatePartialBlock(List<BlockRecord> rurList,
r.updateReplicaUnderRecovery(bpid, recoveryId, r.rInfo.getBlockId(),
newSize);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ ", datanode=" + r.id + ")", e);
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (block={}, internalBlk={}, " +
"datanode={})", block, r.rInfo, r.id, e);
failedList.add(r.id);
}
}
Expand Down Expand Up @@ -552,12 +545,9 @@ private static void logRecoverBlock(String who, RecoveringBlock rb) {
ExtendedBlock block = rb.getBlock();
DatanodeInfo[] targets = rb.getLocations();

LOG.info("BlockRecoveryWorker: " + who + " calls recoverBlock(" + block
+ ", targets=[" + Joiner.on(", ").join(targets) + "]"
+ ", newGenerationStamp=" + rb.getNewGenerationStamp()
+ ", newBlock=" + rb.getNewBlock()
+ ", isStriped=" + rb.isStriped()
+ ")");
LOG.info("BlockRecoveryWorker: {} calls recoverBlock({}, targets=[{}], newGenerationStamp={}"
+ ", newBlock={}, isStriped={})", who, block, Joiner.on(", ").join(targets),
rb.getNewGenerationStamp(), rb.getNewBlock(), rb.isStriped());
}

/**
Expand Down