@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.sps;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -227,15 +228,18 @@ public synchronized void clearQueuesWithNotification() {
    * ID's to process for satisfy the policy.
    */
   private class SPSPathIdProcessor implements Runnable {
+    private static final int MAX_RETRY_COUNT = 3;
 
     @Override
     public void run() {
       LOG.info("Starting SPSPathIdProcessor!.");
       Long startINode = null;
+      int retryCount = 0;
Review thread on the added line "int retryCount = 0;":

tasanuma (Member) commented:
Do we still need to implement the retry logic after catching the FileNotFoundException?

liubingxing (Contributor, author) replied:
@tasanuma Thanks for your review.
I'm not sure whether there are other exceptions that may cause this thread to work incorrectly. The retry logic may help keep this thread from falling into an infinite loop if 'startINode' fails to be set back to null.

tasanuma (Member) replied:
@liubingxing Thanks for the explanation. I got it.

liubingxing (Contributor, author) replied:
Thanks @tasanuma

       while (ctxt.isRunning()) {
         try {
           if (!ctxt.isInSafeMode()) {
             if (startINode == null) {
+              retryCount = 0;
               startINode = ctxt.getNextSPSPath();
             } // else same id will be retried
             if (startINode == null) {
@@ -248,7 +252,12 @@ public void run() {
                 pendingWorkForDirectory.get(startINode);
             if (dirPendingWorkInfo != null
                 && dirPendingWorkInfo.isDirWorkDone()) {
-              ctxt.removeSPSHint(startINode);
+              try {
+                ctxt.removeSPSHint(startINode);
+              } catch (FileNotFoundException e) {
+                // ignore if the file no longer exists
+                startINode = null;
+              }
               pendingWorkForDirectory.remove(startINode);
             }
           }
@@ -268,6 +277,11 @@ public void run() {
           LOG.info("Interrupted while waiting in SPSPathIdProcessor", t);
           break;
         }
+        retryCount++;
+        if (retryCount >= MAX_RETRY_COUNT) {
+          LOG.warn("Skipping this inode {} due to too many retries.", startINode);
+          startINode = null;
+        }
       }
     }
   }
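The retry logic debated in the review thread above is a bounded-retry loop: the counter is reset whenever a fresh path ID is picked up, the same ID is retried while it stays non-null, and the ID is dropped once the cap is hit so the processor cannot spin forever on a single inode. Below is a condensed, self-contained sketch of that pattern; WorkSource and every other name in it are illustrative stand-ins for ctxt and its methods, not Hadoop APIs.

// Condensed sketch of the bounded-retry pattern from SPSPathIdProcessor.
// WorkSource and all names below are illustrative stand-ins, not Hadoop APIs.
import java.util.ArrayDeque;
import java.util.Queue;

public class BoundedRetrySketch {
  private static final int MAX_RETRY_COUNT = 3;

  interface WorkSource {
    Long next();                          // stand-in for ctxt.getNextSPSPath()
    void process(long id) throws Exception;
  }

  static void run(WorkSource source) {
    Long current = null;
    int retryCount = 0;
    while (true) {
      if (current == null) {
        retryCount = 0;                   // fresh item: reset the counter
        current = source.next();
        if (current == null) {
          break;                          // no more work
        }
      } // else the same id is retried, as in the patch
      try {
        source.process(current);
        current = null;                   // success: advance to the next item
      } catch (Exception e) {
        // leave current non-null so the same id is retried next iteration
      }
      retryCount++;                       // counted per pass, as in the patch
      if (current != null && retryCount >= MAX_RETRY_COUNT) {
        System.err.println("Skipping item " + current + " due to too many retries.");
        current = null;                   // give up so the loop cannot spin forever
      }
    }
  }

  public static void main(String[] args) {
    Queue<Long> ids = new ArrayDeque<>(java.util.List.of(1L, 2L, 3L));
    run(new WorkSource() {
      @Override public Long next() { return ids.poll(); }
      @Override public void process(long id) throws Exception {
        if (id == 2L) {
          throw new Exception("simulated persistent failure for id 2");
        }
        System.out.println("processed " + id);
      }
    });
  }
}

Because the counter is reset only when a new ID is fetched, the cap bounds consecutive passes over one inode rather than total retries across the run.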
@@ -189,6 +189,20 @@ private void shutdownCluster() {
     }
   }
 
+  private void stopExternalSps() {
+    if (externalSps != null) {
+      externalSps.stopGracefully();
+    }
+  }
+
+  private void startExternalSps() {
+    externalSps = new StoragePolicySatisfier(getConf());
+    externalCtxt = new ExternalSPSContext(externalSps, nnc);
+
+    externalSps.init(externalCtxt);
+    externalSps.start(StoragePolicySatisfierMode.EXTERNAL);
+  }
+
   private void createCluster() throws IOException {
     getConf().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     setCluster(startCluster(getConf(), allDiskTypes, NUM_OF_DATANODES,
@@ -1370,6 +1384,45 @@ public void testSPSWithDirectoryTreeWithoutFile() throws Exception {
     }
   }
 
+  /**
+   * Test SPS when the file is deleted after satisfyStoragePolicy is called
+   * but before SPS starts.
+   */
+  @Test(timeout = 300000)
+  public void testSPSSatisfyAndThenDeleteFileBeforeStartSPS() throws Exception {
+    try {
+      createCluster();
+      HdfsAdmin hdfsAdmin =
+          new HdfsAdmin(FileSystem.getDefaultUri(config), config);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+              {StorageType.DISK, StorageType.ARCHIVE},
+              {StorageType.DISK, StorageType.ARCHIVE}};
+      startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+
+      stopExternalSps();
+
+      dfs.setStoragePolicy(new Path(FILE), COLD);
+      hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
+      dfs.delete(new Path(FILE), true);
+
+      startExternalSps();
+
+      String file1 = "/testMoveToSatisfyStoragePolicy_1";
+      writeContent(file1);
+      dfs.setStoragePolicy(new Path(file1), COLD);
+      hdfsAdmin.satisfyStoragePolicy(new Path(file1));
+
+      hdfsCluster.triggerHeartbeats();
+      DFSTestUtil.waitExpectedStorageType(file1, StorageType.ARCHIVE, 3, 30000,
+          dfs);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   /**
    * Test SPS for directory which has multilevel directories.
    */