@@ -432,6 +432,7 @@ class BlockSender implements java.io.Closeable {
         ris = new ReplicaInputStreams(
             blockIn, checksumIn, volumeRef, fileIoProvider);
       } catch (IOException ioe) {
+        IOUtils.cleanupWithLogger(null, volumeRef);
         IOUtils.closeStream(this);
         org.apache.commons.io.IOUtils.closeQuietly(blockIn);
         org.apache.commons.io.IOUtils.closeQuietly(checksumIn);
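The added line above releases the volume reference on BlockSender's failure path: when the ReplicaInputStreams constructor throws, ris is never assigned, so the IOUtils.closeStream(this) call that follows cannot release the reference through it. Below is a minimal, self-contained sketch of this release-on-failure pattern; ReleaseOnFailure, acquireRef, and openStreams are illustrative stand-ins, not HDFS APIs.

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class ReleaseOnFailure {
  static final AtomicInteger refCount = new AtomicInteger();

  // Acquire a counted reference; closing it decrements the count.
  static Closeable acquireRef() {
    refCount.incrementAndGet();
    return refCount::decrementAndGet;
  }

  // Stand-in for a constructor that takes ownership of the reference on
  // success and releases it when the returned wrapper is closed.
  static Closeable openStreams(Closeable ref, boolean fail) throws IOException {
    if (fail) {
      throw new IOException("simulated constructor failure");
    }
    return ref;
  }

  public static void main(String[] args) throws IOException {
    Closeable ref = acquireRef();
    try {
      openStreams(ref, true);
    } catch (IOException ioe) {
      // The fix: the owner-to-be never materialized, so release here.
      ref.close();
    }
    System.out.println("refCount = " + refCount.get()); // prints 0, not 1
  }
}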
@@ -167,18 +167,26 @@ synchronized long countPendingDeletions() {
    * Execute the task sometime in the future, using ThreadPools.
    */
   synchronized void execute(FsVolumeImpl volume, Runnable task) {
-    if (executors == null) {
-      throw new RuntimeException("AsyncDiskService is already shutdown");
-    }
-    if (volume == null) {
-      throw new RuntimeException("A null volume does not have an executor");
-    }
-    ThreadPoolExecutor executor = executors.get(volume.getStorageID());
-    if (executor == null) {
-      throw new RuntimeException("Cannot find volume " + volume
-          + " for execution of task " + task);
-    } else {
-      executor.execute(task);
-    }
+    try {
+      if (executors == null) {
+        throw new RuntimeException("AsyncDiskService is already shutdown");
+      }
+      if (volume == null) {
+        throw new RuntimeException("A null volume does not have an executor");
+      }
+      ThreadPoolExecutor executor = executors.get(volume.getStorageID());
+      if (executor == null) {
+        throw new RuntimeException("Cannot find volume " + volume
+            + " for execution of task " + task);
+      } else {
+        executor.execute(task);
+      }
+    } catch (RuntimeException re) {
+      if (task instanceof ReplicaFileDeleteTask) {
+        IOUtils.cleanupWithLogger(null,
+            ((ReplicaFileDeleteTask) task).volumeRef);
+      }
+      throw re;
+    }
   }
 
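This guard handles submission failure: if execute() throws before the task is queued (the service is shut down, the volume is unknown, or the pool rejects the task), the task's run(), and with it the finally block added in the next hunk, never executes, so the submitter itself releases the ReplicaFileDeleteTask's volume reference before rethrowing. A sketch of the submit-or-release idiom, under illustrative names (SubmitOrRelease and submit are not HDFS APIs):

import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SubmitOrRelease {

  // Hand a task that owns a closeable resource to a pool; if submission
  // fails, release the resource here, because run() will never do it.
  static void submit(ExecutorService pool, Runnable task, Closeable resource) {
    try {
      pool.execute(task);
    } catch (RuntimeException re) { // includes RejectedExecutionException
      try {
        resource.close();
      } catch (Exception e) {
        // Swallow, so the original submission failure stays primary.
      }
      throw re;
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.shutdown(); // later execute() calls are rejected
    try {
      submit(pool, () -> { }, () -> System.out.println("resource released"));
    } catch (RuntimeException expected) {
      // "resource released" was printed before the exception propagated.
    }
  }
}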
@@ -314,28 +322,31 @@ private boolean moveFiles() {
 
     @Override
     public void run() {
-      final long blockLength = replicaToDelete.getBlockDataLength();
-      final long metaLength = replicaToDelete.getMetadataLength();
-      boolean result;
+      try {
+        final long blockLength = replicaToDelete.getBlockDataLength();
+        final long metaLength = replicaToDelete.getMetadataLength();
+        boolean result;
 
-      result = (trashDirectory == null) ? deleteFiles() : moveFiles();
+        result = (trashDirectory == null) ? deleteFiles() : moveFiles();
 
-      if (!result) {
-        LOG.warn("Unexpected error trying to "
-            + (trashDirectory == null ? "delete" : "move")
-            + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
-            + " at file " + replicaToDelete.getBlockURI() + ". Ignored.");
-      } else {
-        if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
-          datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
-        }
-        volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
-        volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
-        LOG.info("Deleted " + block.getBlockPoolId() + " "
-            + block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI());
-      }
-      updateDeletedBlockId(block);
-      IOUtils.cleanupWithLogger(null, volumeRef);
+        if (!result) {
+          LOG.warn("Unexpected error trying to "
+              + (trashDirectory == null ? "delete" : "move")
+              + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
+              + " at file " + replicaToDelete.getBlockURI() + ". Ignored.");
+        } else {
+          if (block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK) {
+            datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
+          }
+          volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
+          volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
+          LOG.info("Deleted " + block.getBlockPoolId() + " " +
+              block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI());
+        }
+        updateDeletedBlockId(block);
+      } finally {
+        IOUtils.cleanupWithLogger(null, this.volumeRef);
+      }
     }
 
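Moving the release into a finally block makes it unconditional: previously the trailing cleanupWithLogger call ran only on the straight-line path, so an exception from deleteFiles(), moveFiles(), or the NameNode notification would have leaked the reference; now it is released however run() exits.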
@@ -319,7 +319,7 @@ private void checkReference() {
   }
 
   @VisibleForTesting
-  int getReferenceCount() {
+  public int getReferenceCount() {
     return this.reference.getReferenceCount();
   }
 
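getReferenceCount() is widened from package-private to public so that tests outside the fsdataset.impl package can sample reference counts; the TestDataTransferProtocol case added below does exactly that before and after a failed block copy. The existing @VisibleForTesting annotation already marks the method as test-only surface.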
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -153,16 +154,24 @@ synchronized boolean queryVolume(FsVolumeImpl volume) {
    * Execute the task sometime in the future, using ThreadPools.
    */
   synchronized void execute(String storageId, Runnable task) {
-    if (executors == null) {
-      throw new RuntimeException(
-          "AsyncLazyPersistService is already shutdown");
-    }
-    ThreadPoolExecutor executor = executors.get(storageId);
-    if (executor == null) {
-      throw new RuntimeException("Cannot find root storage volume with id " +
-          storageId + " for execution of task " + task);
-    } else {
-      executor.execute(task);
-    }
+    try {
+      if (executors == null) {
+        throw new RuntimeException(
+            "AsyncLazyPersistService is already shutdown");
+      }
+      ThreadPoolExecutor executor = executors.get(storageId);
+      if (executor == null) {
+        throw new RuntimeException("Cannot find root storage volume with id " +
+            storageId + " for execution of task " + task);
+      } else {
+        executor.execute(task);
+      }
+    } catch (RuntimeException re) {
+      if (task instanceof ReplicaLazyPersistTask) {
+        IOUtils.cleanupWithLogger(null,
+            ((ReplicaLazyPersistTask) task).targetVolume);
+      }
+      throw re;
+    }
   }
 
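The lazy-persist service gets the same submit-or-release guard as the disk service above: if a ReplicaLazyPersistTask cannot be scheduled, its targetVolume reference is released by the submitter before the exception propagates.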
@@ -33,6 +33,9 @@
 import java.nio.ByteBuffer;
 import java.util.Random;
 
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -562,4 +565,56 @@ void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
         checksum, CachingStrategy.newDefaultStrategy(), false, false,
         null, null, new String[0]);
   }
+
+  @Test(timeout = 30000)
+  public void testReleaseVolumeRefIfExceptionThrown()
+      throws IOException, InterruptedException {
+    Path file = new Path("dataprotocol.dat");
+    int numDataNodes = 1;
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        numDataNodes).build();
+    try {
+      cluster.waitActive();
+      datanode = cluster.getFileSystem().getDataNodeStats(
+          DatanodeReportType.LIVE)[0];
+      dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
+      FileSystem fileSys = cluster.getFileSystem();
+
+      int fileLen = Math.min(
+          conf.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096), 4096);
+
+      DFSTestUtil.createFile(fileSys, file, fileLen, fileLen,
+          fileSys.getDefaultBlockSize(file),
+          fileSys.getDefaultReplication(file), 0L);
+
+      // Get the first block of the file.
+      final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+
+      String bpid = cluster.getNamesystem().getBlockPoolId();
+      ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
+      sendBuf.reset();
+      recvBuf.reset();
+
+      // Delete the meta file to force an exception in the BlockSender
+      // constructor.
+      DataNode dn = cluster.getDataNodes().get(0);
+      cluster.getMaterializedReplica(0, blk).deleteMeta();
+
+      FsVolumeImpl volume = (FsVolumeImpl) DataNodeTestUtils.getFSDataset(
+          dn).getVolume(blk);
+      int beforeCnt = volume.getReferenceCount();
+
+      sender.copyBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN);
+      sendRecvData("Copy a block.", false);
+      Thread.sleep(3000);
+
+      int afterCnt = volume.getReferenceCount();
+      assertEquals(beforeCnt, afterCnt);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
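This test drives the BlockSender failure path directly: deleting the replica's meta file makes the constructor throw during copyBlock, and the assertion checks that the volume's reference count returns to its pre-copy value rather than leaking one reference per failed copy.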
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
 
 import java.io.OutputStream;
@@ -1805,4 +1806,37 @@ public void testNotifyNamenodeMissingOrNewBlock() throws Exception {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout = 20000)
+  public void testReleaseVolumeRefIfExceptionThrown() throws IOException {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(
+        new HdfsConfiguration()).build();
+    cluster.waitActive();
+    FsVolumeImpl vol = (FsVolumeImpl) dataset.getFsVolumeReferences().get(0);
+    ExtendedBlock eb;
+    ReplicaInfo info;
+    int beforeCnt = 0;
+    try {
+      List<Block> blockList = new ArrayList<Block>();
+      eb = new ExtendedBlock(BLOCKPOOL, 1, 1, 1001);
+      info = new FinalizedReplica(
+          eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
+      dataset.volumeMap.add(BLOCKPOOL, info);
+      ((LocalReplica) info).getBlockFile().createNewFile();
+      ((LocalReplica) info).getMetaFile().createNewFile();
+      blockList.add(info);
+
+      // Create a runtime exception.
+      dataset.asyncDiskService.shutdown();
+
+      beforeCnt = vol.getReferenceCount();
+      dataset.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
+
+    } catch (RuntimeException re) {
+      int afterCnt = vol.getReferenceCount();
+      assertEquals(beforeCnt, afterCnt);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
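Here the RuntimeException comes from invalidate() trying to schedule a deletion on the already shut down asyncDiskService; the catch block then verifies that the reference count is unchanged, meaning the reference taken for the ReplicaFileDeleteTask was released when submission failed.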
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ThreadUtil;
 import org.junit.Assert;
@@ -280,4 +283,38 @@ public void run() {
       }
     }
   }
+
+  @Test(timeout = 20000)
+  public void testReleaseVolumeRefIfExceptionThrown()
+      throws IOException, InterruptedException {
+    getClusterBuilder().setRamDiskReplicaCapacity(2).build();
+    final String methodName = GenericTestUtils.getMethodName();
+    final int seed = 0xFADED;
+    Path path = new Path("/" + methodName + ".Writer.File.dat");
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetSpi.FsVolumeReferences volumes =
+        DataNodeTestUtils.getFSDataset(dn).getFsVolumeReferences();
+    int[] beforeCnts = new int[volumes.size()];
+    FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+
+    // Create a runtime exception by shutting down the persist service.
+    ds.asyncLazyPersistService.shutdown();
+    for (int i = 0; i < volumes.size(); ++i) {
+      beforeCnts[i] = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
+    }
+
+    makeRandomTestFile(path, BLOCK_SIZE, true, seed);
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    for (int i = 0; i < volumes.size(); ++i) {
+      int afterCnt = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
+      // The LazyWriter keeps trying to save copies even after
+      // asyncLazyPersistService has been shut down. If references were
+      // not released, the count would grow without bound.
+      Assert.assertTrue(
+          beforeCnts[i] == afterCnt || beforeCnts[i] == (afterCnt - 1));
+    }
+  }
 }
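The assertion tolerates afterCnt exceeding the baseline by exactly one, presumably to allow for a single reference held by an in-flight lazy-persist attempt at the moment the count is sampled; a real leak would add a reference on every retry and quickly exceed that bound.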