Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
public class OMClientConfig {

public static final String OM_CLIENT_RPC_TIME_OUT = "rpc.timeout";
public static final String OM_TRASH_EMPTIER_CORE_POOL_SIZE
= "trash.core.pool.size";

@Config(key = OM_CLIENT_RPC_TIME_OUT,
defaultValue = "15m",
Expand All @@ -51,6 +53,21 @@ public class OMClientConfig {
)
private long rpcTimeOut = 15 * 60 * 1000;

@Config(key = OM_TRASH_EMPTIER_CORE_POOL_SIZE,
defaultValue = "5",
type = ConfigType.INT,
tags = {OZONE, OM, CLIENT},
description = "Total number of threads in pool for the Trash Emptier")
private int trashEmptierPoolSize = 5;


/**
 * Returns the configured number of worker threads for the Trash Emptier
 * thread pool (config key {@code trash.core.pool.size}).
 *
 * @return the trash emptier thread pool size
 */
public int getTrashEmptierPoolSize() {
  return this.trashEmptierPoolSize;
}

/**
 * Sets the number of worker threads for the Trash Emptier thread pool.
 *
 * @param poolSize the new thread pool size
 */
public void setTrashEmptierPoolSize(int poolSize) {
  this.trashEmptierPoolSize = poolSize;
}

public long getRpcTimeOut() {
return rpcTimeOut;
Expand All @@ -64,4 +81,4 @@ public void setRpcTimeOut(long timeOut) {
}
this.rpcTimeOut = timeOut;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.TrashPolicy;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
Expand All @@ -45,6 +46,7 @@
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.TrashPolicyOzone;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -239,6 +241,7 @@ public void testFileSystem() throws Exception {
testDeleteRoot();

testRecursiveDelete();
testTrash();
}

@After
Expand All @@ -252,9 +255,9 @@ public void tearDown() {
private void setupOzoneFileSystem()
throws IOException, TimeoutException, InterruptedException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(FS_TRASH_INTERVAL_KEY, 1);
conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
enabledFileSystemPaths);
conf.setInt(FS_TRASH_INTERVAL_KEY, 1);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
.build();
Expand Down Expand Up @@ -776,4 +779,58 @@ public void testRenameToTrashEnabled() throws Exception {
// Cleanup
o3fs.delete(trashRoot, true);
}

/**
 * Verifies end-to-end trash behavior against the running mini cluster:
 * 1. Move a key to trash.
 * 2. Verify that the key gets purged by the trash emptier.
 * 3. Verify that the leftover checkpoint directory is removed as well.
 *
 * @throws Exception if key creation or the move to trash fails
 */
public void testTrash() throws Exception {
  String testKeyName = "testKey2";
  Path path = new Path(OZONE_URI_DELIMITER, testKeyName);
  ContractTestUtils.touch(fs, path);
  // NOTE(review): this assertion also passes when only the default
  // TrashPolicy is configured, because
  // TrashPolicy.class.isAssignableFrom(TrashPolicyOzone.class) is true.
  // Consider asserting that the configured class IS TrashPolicyOzone
  // (or reversing the isAssignableFrom check) — confirm cluster config.
  Assert.assertTrue(trash.getConf().getClass(
      "fs.trash.classname", TrashPolicy.class).
      isAssignableFrom(TrashPolicyOzone.class));
  // JUnit's assertEquals takes the EXPECTED value first; the original had
  // the arguments reversed, which produces misleading failure messages.
  Assert.assertEquals(1, trash.getConf().getInt(FS_TRASH_INTERVAL_KEY, 0));
  // Call moveToTrash. We can't call protected fs.rename() directly
  trash.moveToTrash(path);

  // Construct the expected trash location of the key:
  // /<TRASH_PREFIX>/<username>/Current/<key>
  String username = UserGroupInformation.getCurrentUser().getShortUserName();
  Path trashRoot = new Path(OZONE_URI_DELIMITER, TRASH_PREFIX);
  Path userTrash = new Path(trashRoot, username);
  Path userTrashCurrent = new Path(userTrash, "Current");
  Path trashPath = new Path(userTrashCurrent, testKeyName);

  // Wait until the TrashEmptier purges the key
  GenericTestUtils.waitFor(() -> {
    try {
      return !o3fs.exists(trashPath);
    } catch (IOException e) {
      LOG.error("Delete from Trash Failed");
      Assert.fail("Delete from Trash Failed");
      return false;
    }
  }, 1000, 120000);

  // userTrash path will contain the checkpoint folder
  Assert.assertEquals(1, fs.listStatus(userTrash).length);

  // wait for deletion of checkpoint dir
  GenericTestUtils.waitFor(() -> {
    try {
      return o3fs.listStatus(userTrash).length == 0;
    } catch (IOException e) {
      LOG.error("Delete from Trash Failed");
      Assert.fail("Delete from Trash Failed");
      return false;
    }
  }, 1000, 120000);

  // Cleanup
  fs.delete(trashRoot, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,20 @@
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.TrashPolicyDefault;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.conf.OMClientConfig;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -98,16 +103,21 @@ public void initialize(Configuration conf, FileSystem fs) {

/**
 * Builds the background task that periodically empties trash directories.
 *
 * The rendered diff left BOTH the pre-change and post-change return
 * statements in place (two consecutive returns — uncompilable); only the
 * post-change version is kept here.
 *
 * @return a Runnable trash emptier task
 * @throws IOException if the emptier cannot be constructed
 */
@Override
public Runnable getEmptier() throws IOException {
  // NOTE(review): assumes initialize() was handed an OzoneConfiguration;
  // the cast throws ClassCastException otherwise — confirm all callers.
  return new TrashPolicyOzone.Emptier((OzoneConfiguration) configuration,
      emptierInterval);
}


protected class Emptier implements Runnable {

private Configuration conf;
// same as checkpoint interval
private long emptierInterval;

Emptier(Configuration conf, long emptierInterval) throws IOException {

private ThreadPoolExecutor executor;

Emptier(OzoneConfiguration conf, long emptierInterval) throws IOException {
this.conf = conf;
this.emptierInterval = emptierInterval;
if (emptierInterval > deletionInterval || emptierInterval <= 0) {
Expand All @@ -118,24 +128,31 @@ protected class Emptier implements Runnable {
" minutes that is used for deletion instead");
this.emptierInterval = deletionInterval;
}
int trashEmptierCorePoolSize = conf.getObject(OMClientConfig.class)
.getTrashEmptierPoolSize();
LOG.info("Ozone Manager trash configuration: Deletion interval = "
+ (deletionInterval / MSECS_PER_MINUTE)
+ " minutes, Emptier interval = "
+ (this.emptierInterval / MSECS_PER_MINUTE) + " minutes.");
executor = new ThreadPoolExecutor(trashEmptierCorePoolSize,
trashEmptierCorePoolSize, 1,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy());
}

@Override
public void run() {
// if this is not the leader node,don't run the emptier
if (emptierInterval == 0 || !om.isLeaderReady()) {
if (emptierInterval == 0) {
return; // trash disabled
}
long now, end;
while (true) {
now = Time.now();
end = ceiling(now, emptierInterval);
try { // sleep for interval
try {
// sleep for interval
Thread.sleep(end - now);
// if not leader, thread will always be sleeping
if (!om.isLeaderReady()){
continue;
}
Expand All @@ -147,20 +164,24 @@ public void run() {
now = Time.now();
if (now >= end) {
Collection<FileStatus> trashRoots;
trashRoots = fs.getTrashRoots(true); // list all trash dirs

for (FileStatus trashRoot : trashRoots) { // dump each trash
trashRoots = fs.getTrashRoots(true); // list all trash dirs
LOG.debug("Trash root Size: " + trashRoots.size());
for (FileStatus trashRoot : trashRoots) { // dump each trash
LOG.info("Trashroot:" + trashRoot.getPath().toString());
if (!trashRoot.isDirectory()) {
continue;
}
try {
TrashPolicyOzone trash = new TrashPolicyOzone(fs, conf, om);
trash.deleteCheckpoint(trashRoot.getPath(), false);
trash.createCheckpoint(trashRoot.getPath(), new Date(now));
} catch (IOException e) {
LOG.warn("Trash caught: "+e+". Skipping " +
trashRoot.getPath() + ".");
}
TrashPolicyOzone trash = new TrashPolicyOzone(fs, conf, om);
Runnable task = ()->{
try {
trash.deleteCheckpoint(trashRoot.getPath(), false);
trash.createCheckpoint(trashRoot.getPath(),
new Date(Time.now()));
} catch (Exception e) {
LOG.info("Unable to checkpoint");
}
};
executor.submit(task);
}
}
} catch (Exception e) {
Expand All @@ -171,6 +192,13 @@ public void run() {
fs.close();
} catch(IOException e) {
LOG.warn("Trash cannot close FileSystem: ", e);
} finally {
executor.shutdown();
try {
executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.error("Error attempting to shutdown");
}
}
}

Expand Down