diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index df24f9890db04..6d049f98bb674 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -906,6 +906,12 @@ ReplicaInfo getReplicaInfo(String bpid, long blkid) return info; } + String getStorageUuidForLock(ExtendedBlock b) + throws ReplicaNotFoundException { + return getReplicaInfo(b.getBlockPoolId(), b.getBlockId()) + .getStorageUuid(); + } + /** * Returns handles to the block file and its metadata file */ @@ -913,7 +919,7 @@ ReplicaInfo getReplicaInfo(String bpid, long blkid) public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long metaOffset) throws IOException { try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.VOLUME, - b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { + b.getBlockPoolId(), getStorageUuidForLock(b))) { ReplicaInfo info = getReplicaInfo(b); FsVolumeReference ref = info.getVolume().obtainReference(); try { @@ -1379,7 +1385,7 @@ static void computeChecksum(ReplicaInfo srcReplica, File dstMeta, public ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { + b.getBlockPoolId(), getStorageUuidForLock(b))) { // If the block was successfully finalized because all packets // were successfully processed at the Datanode but the ack for // some of the packets were not received by the client. The client @@ -1562,7 +1568,7 @@ public Replica recoverClose(ExtendedBlock b, long newGS, while (true) { try { try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { + b.getBlockPoolId(), getStorageUuidForLock(b))) { // check replica's state ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // bump the replica's GS @@ -1665,7 +1671,7 @@ public ReplicaHandler recoverRbw( while (true) { try { try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { + b.getBlockPoolId(), getStorageUuidForLock(b))) { ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); // check the replica's state @@ -1697,7 +1703,7 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw, ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { + b.getBlockPoolId(), getStorageUuidForLock(b))) { // check generation stamp long replicaGenerationStamp = rbw.getGenerationStamp(); if (replicaGenerationStamp < b.getGenerationStamp() || @@ -1759,7 +1765,7 @@ public ReplicaInPipeline convertTemporaryToRbw( final ExtendedBlock b) throws IOException { long startTimeMs = Time.monotonicNow(); try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { + b.getBlockPoolId(), getStorageUuidForLock(b))) { final long blockId = b.getBlockId(); final long expectedGs = b.getGenerationStamp(); final long visible = b.getNumBytes(); @@ -1957,7 +1963,7 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) ReplicaInfo finalizedReplicaInfo = null; long startTimeMs = Time.monotonicNow(); try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { + b.getBlockPoolId(), getStorageUuidForLock(b))) { if (Thread.interrupted()) { // Don't allow data modifications from interrupted threads throw new IOException("Cannot finalize block from Interrupted Thread"); @@ -2041,7 +2047,7 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo) public void unfinalizeBlock(ExtendedBlock b) throws IOException { long startTimeMs = Time.monotonicNow(); try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { + b.getBlockPoolId(), getStorageUuidForLock(b))) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); if (replicaInfo != null && @@ -2992,7 +2998,7 @@ public Replica updateReplicaUnderRecovery( final long newlength) throws IOException { long startTimeMs = Time.monotonicNow(); try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - oldBlock.getBlockPoolId(), getReplicaInfo(oldBlock).getStorageUuid())) { + oldBlock.getBlockPoolId(), getStorageUuidForLock(oldBlock))) { //get replica final String bpid = oldBlock.getBlockPoolId(); final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index fe78a0f2a41a0..202fb190f3d64 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -245,8 +245,10 @@ private void testAppend(String bpid, FsDatasetSpi dataSet, Assert.fail("Should not have appended to a non-existent replica " + blocks[NON_EXISTENT]); } catch (ReplicaNotFoundException e) { - Assert.assertEquals(ReplicaNotFoundException.NON_EXISTENT_REPLICA + - blocks[NON_EXISTENT], e.getMessage()); + String expectMessage = ReplicaNotFoundException.NON_EXISTENT_REPLICA + + blocks[NON_EXISTENT].getBlockPoolId() + ":" + + blocks[NON_EXISTENT].getBlockId(); + Assert.assertEquals(expectMessage, e.getMessage()); } newGS = blocks[FINALIZED].getGenerationStamp()+1;