diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/conf/OMClientConfig.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/conf/OMClientConfig.java index 37cd67e0af58..cbae089bd6df 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/conf/OMClientConfig.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/conf/OMClientConfig.java @@ -36,6 +36,8 @@ public class OMClientConfig { public static final String OM_CLIENT_RPC_TIME_OUT = "rpc.timeout"; + public static final String OM_TRASH_EMPTIER_CORE_POOL_SIZE + = "trash.core.pool.size"; @Config(key = OM_CLIENT_RPC_TIME_OUT, defaultValue = "15m", @@ -51,6 +53,21 @@ public class OMClientConfig { ) private long rpcTimeOut = 15 * 60 * 1000; + @Config(key = OM_TRASH_EMPTIER_CORE_POOL_SIZE, + defaultValue = "5", + type = ConfigType.INT, + tags = {OZONE, OM, CLIENT}, + description = "Total number of threads in pool for the Trash Emptier") + private int trashEmptierPoolSize = 5; + + + public int getTrashEmptierPoolSize() { + return trashEmptierPoolSize; + } + + public void setTrashEmptierPoolSize(int trashEmptierPoolSize) { + this.trashEmptierPoolSize = trashEmptierPoolSize; + } public long getRpcTimeOut() { return rpcTimeOut; @@ -64,4 +81,4 @@ public void setRpcTimeOut(long timeOut) { } this.rpcTimeOut = timeOut; } -} +} \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java index bd8d45cd4fd8..3b93592d8c40 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Trash; +import org.apache.hadoop.fs.TrashPolicy; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -45,6 +46,7 @@ import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneKeyDetails; import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.TrashPolicyOzone; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.security.UserGroupInformation; @@ -239,6 +241,7 @@ public void testFileSystem() throws Exception { testDeleteRoot(); testRecursiveDelete(); + testTrash(); } @After @@ -252,9 +255,9 @@ public void tearDown() { private void setupOzoneFileSystem() throws IOException, TimeoutException, InterruptedException { OzoneConfiguration conf = new OzoneConfiguration(); - conf.setInt(FS_TRASH_INTERVAL_KEY, 1); conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS, enabledFileSystemPaths); + conf.setInt(FS_TRASH_INTERVAL_KEY, 1); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(3) .build(); @@ -776,4 +779,58 @@ public void testRenameToTrashEnabled() throws Exception { // Cleanup o3fs.delete(trashRoot, true); } + + /** + * 1.Move a Key to Trash + * 2.Verify that the key gets deleted by the trash emptier. + * @throws Exception + */ + + public void testTrash() throws Exception { + String testKeyName = "testKey2"; + Path path = new Path(OZONE_URI_DELIMITER, testKeyName); + ContractTestUtils.touch(fs, path); + Assert.assertTrue(trash.getConf().getClass( + "fs.trash.classname", TrashPolicy.class). + isAssignableFrom(TrashPolicyOzone.class)); + Assert.assertEquals(trash.getConf().getInt(FS_TRASH_INTERVAL_KEY, 0), 1); + // Call moveToTrash. We can't call protected fs.rename() directly + trash.moveToTrash(path); + + // Construct paths + String username = UserGroupInformation.getCurrentUser().getShortUserName(); + Path trashRoot = new Path(OZONE_URI_DELIMITER, TRASH_PREFIX); + Path userTrash = new Path(trashRoot, username); + Path userTrashCurrent = new Path(userTrash, "Current"); + Path trashPath = new Path(userTrashCurrent, testKeyName); + + // Wait until the TrashEmptier purges the key + GenericTestUtils.waitFor(()-> { + try { + return !o3fs.exists(trashPath); + } catch (IOException e) { + LOG.error("Delete from Trash Failed"); + Assert.fail("Delete from Trash Failed"); + return false; + } + }, 1000, 120000); + + // userTrash path will contain the checkpoint folder + Assert.assertEquals(1, fs.listStatus(userTrash).length); + + // wait for deletion of checkpoint dir + GenericTestUtils.waitFor(()-> { + try { + return o3fs.listStatus(userTrash).length==0; + } catch (IOException e) { + LOG.error("Delete from Trash Failed"); + Assert.fail("Delete from Trash Failed"); + return false; + } + }, 1000, 120000); + + // Cleanup + fs.delete(trashRoot, true); + + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashPolicyOzone.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashPolicyOzone.java index 91ad4afc0a71..7006ea742ae4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashPolicyOzone.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashPolicyOzone.java @@ -24,15 +24,20 @@ import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Date; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.TrashPolicyDefault; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.conf.OMClientConfig; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,16 +103,21 @@ public void initialize(Configuration conf, FileSystem fs) { @Override public Runnable getEmptier() throws IOException { - return new TrashPolicyOzone.Emptier(configuration, emptierInterval); + return new TrashPolicyOzone.Emptier((OzoneConfiguration)configuration, + emptierInterval); } + protected class Emptier implements Runnable { private Configuration conf; // same as checkpoint interval private long emptierInterval; - Emptier(Configuration conf, long emptierInterval) throws IOException { + + private ThreadPoolExecutor executor; + + Emptier(OzoneConfiguration conf, long emptierInterval) throws IOException { this.conf = conf; this.emptierInterval = emptierInterval; if (emptierInterval > deletionInterval || emptierInterval <= 0) { @@ -118,24 +128,31 @@ protected class Emptier implements Runnable { " minutes that is used for deletion instead"); this.emptierInterval = deletionInterval; } + int trashEmptierCorePoolSize = conf.getObject(OMClientConfig.class) + .getTrashEmptierPoolSize(); LOG.info("Ozone Manager trash configuration: Deletion interval = " + (deletionInterval / MSECS_PER_MINUTE) + " minutes, Emptier interval = " + (this.emptierInterval / MSECS_PER_MINUTE) + " minutes."); + executor = new ThreadPoolExecutor(trashEmptierCorePoolSize, + trashEmptierCorePoolSize, 1, + TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), + new ThreadPoolExecutor.CallerRunsPolicy()); } @Override public void run() { - // if this is not the leader node,don't run the emptier - if (emptierInterval == 0 || !om.isLeaderReady()) { + if (emptierInterval == 0) { return; // trash disabled } long now, end; while (true) { now = Time.now(); end = ceiling(now, emptierInterval); - try { // sleep for interval + try { + // sleep for interval Thread.sleep(end - now); + // if not leader, thread will always be sleeping if (!om.isLeaderReady()){ continue; } @@ -147,20 +164,24 @@ public void run() { now = Time.now(); if (now >= end) { Collection trashRoots; - trashRoots = fs.getTrashRoots(true); // list all trash dirs - - for (FileStatus trashRoot : trashRoots) { // dump each trash + trashRoots = fs.getTrashRoots(true); // list all trash dirs + LOG.debug("Trash root Size: " + trashRoots.size()); + for (FileStatus trashRoot : trashRoots) { // dump each trash + LOG.info("Trashroot:" + trashRoot.getPath().toString()); if (!trashRoot.isDirectory()) { continue; } - try { - TrashPolicyOzone trash = new TrashPolicyOzone(fs, conf, om); - trash.deleteCheckpoint(trashRoot.getPath(), false); - trash.createCheckpoint(trashRoot.getPath(), new Date(now)); - } catch (IOException e) { - LOG.warn("Trash caught: "+e+". Skipping " + - trashRoot.getPath() + "."); - } + TrashPolicyOzone trash = new TrashPolicyOzone(fs, conf, om); + Runnable task = ()->{ + try { + trash.deleteCheckpoint(trashRoot.getPath(), false); + trash.createCheckpoint(trashRoot.getPath(), + new Date(Time.now())); + } catch (Exception e) { + LOG.info("Unable to checkpoint"); + } + }; + executor.submit(task); } } } catch (Exception e) { @@ -171,6 +192,13 @@ public void run() { fs.close(); } catch(IOException e) { LOG.warn("Trash cannot close FileSystem: ", e); + } finally { + executor.shutdown(); + try { + executor.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.error("Error attempting to shutdown"); + } } }