Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,17 @@ Replica getReplica() {
return replicaInfo;
}

public void releaseAnyRemainingReservedSpace() {
if (replicaInfo != null) {
if (replicaInfo.getReplicaInfo().getBytesReserved() > 0) {
LOG.warn("Block {} has not released the reserved bytes. "
+ "Releasing {} bytes as part of close.", replicaInfo.getBlockId(),
replicaInfo.getReplicaInfo().getBytesReserved());
replicaInfo.releaseAllBytesReserved();
}
}
}

/**
* close files and release volume reference.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,9 @@ public void writeBlock(final ExtendedBlock block,
IOUtils.closeStream(mirrorIn);
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
if (blockReceiver != null) {
blockReceiver.releaseAnyRemainingReservedSpace();
}
IOUtils.closeStream(blockReceiver);
setCurrentBlockReceiver(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ public void releaseAllBytesReserved() {
getVolume().releaseLockedMemory(bytesReserved);
bytesReserved = 0;
}
@Override
public void releaseReplicaInfoBytesReserved() {
bytesReserved = 0;
}

@Override
public void setLastChecksumAndDataLen(long dataLength, byte[] checksum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public interface ReplicaInPipeline extends Replica {
*/
public void releaseAllBytesReserved();

/**
* Release the reserved space from the ReplicaInfo.
*/
void releaseReplicaInfoBytesReserved();

/**
* store the checksum for the last chunk along with the data length
* @param dataLength number of bytes on disk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2022,6 +2022,9 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)

newReplicaInfo = v.addFinalizedBlock(
bpid, replicaInfo, replicaInfo, replicaInfo.getBytesReserved());
if (replicaInfo instanceof ReplicaInPipeline) {
((ReplicaInPipeline) replicaInfo).releaseReplicaInfoBytesReserved();
}
if (v.isTransientStorage()) {
releaseLockedMemory(
replicaInfo.getOriginalBytesReserved()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ synchronized public void setBytesAcked(long bytesAcked) {
public void releaseAllBytesReserved() {
}

@Override
public void releaseReplicaInfoBytesReserved() {
}

@Override
synchronized public long getBytesOnDisk() {
if (finalized) {
Expand Down Expand Up @@ -418,7 +422,6 @@ public void waitForMinLength(long minLength, long time, TimeUnit unit)
} while (deadLine > System.currentTimeMillis());
throw new IOException("Minimum length was not achieved within timeout");
}

@Override
public FsVolumeSpi getVolume() {
return getStorage(theBlock).getVolume();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public long getBytesAcked() {
public void setBytesAcked(long bytesAcked) {
}

@Override
public void releaseReplicaInfoBytesReserved() {
}

@Override
public void releaseAllBytesReserved() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@

package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.util.Collection;
import java.util.EnumSet;
import java.util.function.Supplier;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -746,4 +752,48 @@ public Boolean get() {
}, 500, 30000);
checkReservedSpace(0);
}

/**
* Ensure that bytes reserved of ReplicaInfo gets cleared
* during finalize.
*
* @throws IOException
*/
@Test(timeout = 300000)
public void testReplicaInfoBytesReservedReleasedOnFinalize() throws IOException {
short replication = 3;
int bufferLength = 4096;
startCluster(BLOCK_SIZE, replication, -1);

String methodName = GenericTestUtils.getMethodName();
Path path = new Path("/" + methodName + ".01.dat");

FSDataOutputStream fos =
fs.create(path, FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE), bufferLength,
replication, BLOCK_SIZE, null);
// Allocate a block.
fos.write(new byte[bufferLength]);
fos.hsync();

DataNode dataNode = cluster.getDataNodes().get(0);
FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
long expectedReservedSpace = BLOCK_SIZE - bufferLength;

String bpid = cluster.getNamesystem().getBlockPoolId();
Collection<ReplicaInfo> replicas = FsDatasetTestUtil.getReplicas(fsDataSetImpl, bpid);
ReplicaInfo r = replicas.iterator().next();

// Verify Initial Bytes Reserved for Replica and Volume are correct
assertEquals(fsDataSetImpl.getVolumeList().get(0).getReservedForReplicas(),
expectedReservedSpace);
assertEquals(r.getBytesReserved(), expectedReservedSpace);

// Verify Bytes Reserved for Replica and Volume are correct after finalize
fsDataSetImpl.finalizeNewReplica(r, new ExtendedBlock(bpid, r));

assertEquals(fsDataSetImpl.getVolumeList().get(0).getReservedForReplicas(), 0L);
assertEquals(r.getBytesReserved(), 0L);

fos.close();
}
}