FileUtil.java
@@ -38,6 +38,7 @@
import java.nio.file.AccessDeniedException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
@@ -533,6 +534,26 @@ private static Path checkDest(String srcName, FileSystem dstFS, Path dst,
return dst;
}

public static boolean isRegularFile(File file) {
return isRegularFile(file, true);
}

/**
* Check if the file is regular.
* @param file The file being checked.
* @param allowLinks Whether symbolic links are followed when checking.
* @return true if the file is a regular file (when allowLinks is false, a
*         symbolic link itself does not qualify); true if the file is null.
*/
public static boolean isRegularFile(File file, boolean allowLinks) {
if (file != null) {
if (allowLinks) {
return Files.isRegularFile(file.toPath());
}
return Files.isRegularFile(file.toPath(), LinkOption.NOFOLLOW_LINKS);
}
return true;
}

/**
* Convert a os-native filename to a path that works for the shell.
* @param filename The filename to convert
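A quick standalone sketch (not part of the patch) showing how the two isRegularFile overloads above behave; the class and temporary file names are illustrative, and creating the symlink assumes the platform and permissions allow it.

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.hadoop.fs.FileUtil;

public class IsRegularFileSketch {
  public static void main(String[] args) throws IOException {
    Path data = Files.createTempFile("blk_demo", ".data");
    Path link = Files.createSymbolicLink(data.resolveSibling("blk_demo_link"), data);
    File linkFile = link.toFile();

    // The default overload follows the link, so it reports the type of the target file.
    System.out.println(FileUtil.isRegularFile(linkFile));        // true
    // allowLinks=false uses LinkOption.NOFOLLOW_LINKS, so the symlink itself fails the check.
    System.out.println(FileUtil.isRegularFile(linkFile, false)); // false

    Files.delete(link);
    Files.delete(data);
  }
}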
TestFileUtil.java
@@ -1433,6 +1433,23 @@ public void testReadSymlink() throws IOException {
Assert.assertEquals(file.getAbsolutePath(), result);
}

@Test
public void testRegularFile() throws IOException {
byte[] data = "testRegularData".getBytes();
File tmpFile = new File(del, "reg1");

// write some data to the file
FileOutputStream os = new FileOutputStream(tmpFile);
os.write(data);
os.close();
assertTrue(FileUtil.isRegularFile(tmpFile));

// create a symlink to file
File link = new File(del, "reg2");
FileUtil.symLink(tmpFile.toString(), link.toString());
assertFalse(FileUtil.isRegularFile(link, false));
}

/**
* This test validates the correctness of {@link FileUtil#readLink(File)} when
* it gets a file in input.
DirectoryScanner.java
@@ -41,6 +41,7 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -540,21 +541,30 @@ private void scan() {
m++;
continue;
}

// Block and meta must be regular file
boolean isRegular = FileUtil.isRegularFile(info.getBlockFile(), false) &&
    FileUtil.isRegularFile(info.getMetaFile(), false);
if (!isRegular) {
  statsRecord.mismatchBlocks++;
  addDifference(diffRecord, statsRecord, info);
} else {
  // Block file and/or metadata file exists on the disk
  // Block exists in memory
  if (info.getBlockFile() == null) {
    // Block metadata file exits and block file is missing
    addDifference(diffRecord, statsRecord, info);
  } else if (info.getGenStamp() != memBlock.getGenerationStamp()
      || info.getBlockLength() != memBlock.getNumBytes()) {
    // Block metadata file is missing or has wrong generation stamp,
    // or block file length is different than expected
    statsRecord.mismatchBlocks++;
    addDifference(diffRecord, statsRecord, info);
  } else if (memBlock.compareWith(info) != 0) {
    // volumeMap record and on-disk files do not match.
    statsRecord.duplicateBlocks++;
    addDifference(diffRecord, statsRecord, info);
  }
}
d++;

FsDatasetImpl.java
@@ -49,6 +49,7 @@
import javax.management.ObjectName;
import javax.management.StandardMBean;

import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.classification.VisibleForTesting;

@@ -2645,6 +2646,9 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
Block.getGenerationStamp(diskMetaFile.getName()) :
HdfsConstants.GRANDFATHER_GENERATION_STAMP;

final boolean isRegular = FileUtil.isRegularFile(diskMetaFile, false) &&
FileUtil.isRegularFile(diskFile, false);

if (vol.getStorageType() == StorageType.PROVIDED) {
if (memBlockInfo == null) {
// replica exists on provided store but not in memory
@@ -2812,6 +2816,9 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
+ memBlockInfo.getNumBytes() + " to "
+ memBlockInfo.getBlockDataLength());
memBlockInfo.setNumBytes(memBlockInfo.getBlockDataLength());
} else if (!isRegular) {
jojochuang (Contributor):
Unrelated: the checkAndUpdate() is way too long. We should refactor it in the future.

jianghuazhu (Contributor, author):
Thanks for your comment and review, @jojochuang.
I'll add some unit tests later.

This is indeed a bit too long for the checkAndUpdate() method. I'll fine-tune it later if I can, but will handle that in a new JIRA.

jojochuang (Contributor):
Absolutely. Let's not worry about the refactor now. Thanks.

corruptBlock = new Block(memBlockInfo);
LOG.warn("Block:{} is not a regular file.", corruptBlock.getBlockId());
tomscut (Contributor):
Maybe we should print the absolute path so that we can deal with these abnormal files?

jianghuazhu (Contributor, author):
Thanks @tomscut for the comment and review.
This happens occasionally; I've been monitoring it for a long time and still haven't found the root cause.
I think it may be related to the Linux environment. When the normal data flow is working, no exception occurs. (I will continue to monitor this situation.)
These are more rigorous checks to prevent further, worse cases on the cluster, which is a good thing for clusters.

When the file is actually cleaned up, the specific path is printed. Here is an example from an online cluster:
2022-02-15 11:24:12,856 INFO org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService: Deleted BP-xxxx blk_xxxx file /mnt/dfs/11/data/current/BP-xxxx.xxxx.xxxx/current/finalized/subdir0/subdir0/blk_xxxx
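
For illustration, the reviewer's suggestion could look roughly like the sketch below. This is only a sketch, not part of the merged patch; diskFile and diskMetaFile are the variables already visible in the checkAndUpdate() hunk above.

} else if (!isRegular) {
  corruptBlock = new Block(memBlockInfo);
  // Sketch: include the on-disk paths so the abnormal files can be located directly.
  LOG.warn("Block:{} is not a regular file. Block file: {}, meta file: {}",
      corruptBlock.getBlockId(),
      diskFile == null ? null : diskFile.getAbsolutePath(),
      diskMetaFile == null ? null : diskMetaFile.getAbsolutePath());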

tomscut (Contributor), Feb 17, 2022:
Thank you for explaining this. It looks like a mount-related operation was performed. Did HDFS successfully clean up the abnormal file on the online cluster you mentioned after this change?

jianghuazhu (Contributor, author), Feb 17, 2022:
Yes, these abnormal files are cleaned up.
When the NameNode learns of such abnormal files, it treats them as invalid blocks.
When the DataNode sends a heartbeat to the NameNode, the NameNode notifies the DataNode to clean them up.
The specific cleanup is performed by FsDatasetAsyncDiskService.

tomscut (Contributor):
This is good.

}
} finally {
if (dataNodeMetrics != null) {
TestDirectoryScanner.java
@@ -49,6 +49,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSClient;
@@ -58,6 +59,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -71,6 +73,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
@@ -507,6 +510,53 @@ public void testDeleteBlockOnTransientStorage() throws Exception {
}
}

@Test(timeout = 600000)
public void testRegularBlock() throws Exception {
Configuration conf = getConfiguration();
cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
// log trace
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.
captureLogs(NameNode.stateChangeLog);
// Add files with 5 blocks
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 5, false);

List<ReplicaInfo> infos = new ArrayList<>(FsDatasetTestUtil.getReplicas(fds, bpid));
ReplicaInfo lastReplica = infos.get(infos.size() - 1);
ReplicaInfo penultimateReplica = infos.get(infos.size() - 2);

String blockParent = new File(lastReplica.getBlockURI().getPath()).getParent();
File lastBlockFile = new File(blockParent, getBlockFile(lastReplica.getBlockId()));
File penultimateBlockFile = new File(blockParent,
getBlockFile(penultimateReplica.getBlockId()));
FileUtil.symLink(lastBlockFile.toString(), penultimateBlockFile.toString());
ExtendedBlock block = new ExtendedBlock(bpid, penultimateReplica.getBlockId());

scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
scanner.reconcile();
DirectoryScanner.Stats stats = scanner.stats.get(bpid);
assertNotNull(stats);
assertEquals(1, stats.mismatchBlocks);

// check nn log
String msg = "*DIR* reportBadBlocks for block: " + bpid + ":" +
getBlockFile(block.getBlockId());
assertTrue(logCapturer.getOutput().contains(msg));
} finally {
if (scanner != null) {
scanner.shutdown();
scanner = null;
}
cluster.shutdown();
}
}

@Test(timeout = 600000)
public void testDirectoryScanner() throws Exception {
// Run the test with and without parallel scanning