Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
Expand Down Expand Up @@ -467,7 +466,6 @@ private OzoneConfiguration getClientConfForOFS(
}

@Test
@Ignore("HDDS-3982. Disable moveToTrash in o3fs and ofs temporarily")
public void testDeleteToTrashOrSkipTrash() throws Exception {
final String hostPrefix = OZONE_OFS_URI_SCHEME + "://" + omServiceId;
OzoneConfiguration clientConf = getClientConfForOFS(hostPrefix, conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.TrashPolicy;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
Expand Down Expand Up @@ -1187,8 +1185,6 @@ public void start() throws IOException {
}
omRpcServer.start();
isOmRpcServerRunning = true;
// TODO: Start this thread only on the leader node.
// Should be fixed after HDDS-4451.
startTrashEmptier(configuration);

registerMXBean();
Expand Down Expand Up @@ -1247,8 +1243,6 @@ public void restart() throws IOException {
omRpcServer.start();
isOmRpcServerRunning = true;

// TODO: Start this thread only on the leader node.
// Should be fixed after HDDS-4451.
startTrashEmptier(configuration);
registerMXBean();

Expand Down Expand Up @@ -1289,9 +1283,7 @@ public FileSystem run() throws IOException {
return FileSystem.get(fsconf);
}
});
conf.setClass("fs.trash.classname", TrashPolicyOzone.class,
TrashPolicy.class);
this.emptier = new Thread(new Trash(fs, conf).
this.emptier = new Thread(new OzoneTrash(fs, conf, this).
getEmptier(), "Trash Emptier");
this.emptier.setDaemon(true);
this.emptier.start();
Expand Down Expand Up @@ -1400,12 +1392,7 @@ public void stop() {
if (httpServer != null) {
httpServer.stop();
}
// TODO:Also stop this thread if an OM switches from leader to follower.
// Should be fixed after HDDS-4451.
if (this.emptier != null) {
emptier.interrupt();
emptier = null;
}
stopTrashEmptier();
metadataManager.stop();
metrics.unRegister();
omClientProtocolMetrics.unregister();
Expand Down Expand Up @@ -3285,6 +3272,7 @@ TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
// During stopServices, if KeyManager was stopped successfully and
// OMMetadataManager stop failed, we should restart the KeyManager.
keyManager.start(configuration);
startTrashEmptier(configuration);
throw e;
}

Expand Down Expand Up @@ -3375,6 +3363,14 @@ void stopServices() throws Exception {
keyManager.stop();
stopSecretManager();
metadataManager.stop();
stopTrashEmptier();
}

private void stopTrashEmptier() {
if (this.emptier != null) {
emptier.interrupt();
emptier = null;
}
}

/**
Expand Down Expand Up @@ -3443,6 +3439,7 @@ void reloadOMState(long newSnapshotIndex, long newSnapshotTermIndex)
// Restart required services
metadataManager.start(configuration);
keyManager.start(configuration);
startTrashEmptier(configuration);

// Set metrics and start metrics back ground thread
metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.TrashPolicy;

import java.io.IOException;

/**
* OzoneTrash which takes an OM as parameter .
*/
public class OzoneTrash extends Trash {

private TrashPolicy trashPolicy;
public OzoneTrash(FileSystem fs, Configuration conf, OzoneManager om)
throws IOException {
super(fs, conf);
this.trashPolicy = new TrashPolicyOzone(fs, conf, om);
}
@Override
public Runnable getEmptier() throws IOException {
return this.trashPolicy.getEmptier();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT;

/**
* TrashPolicy for Ozone Specific Trash Operations.Through this implementation
* of TrashPolicy ozone-specific trash optimizations are/will be made such as
Expand All @@ -61,16 +66,39 @@ public class TrashPolicyOzone extends TrashPolicyDefault {

private long emptierInterval;

private Configuration configuration;

private OzoneManager om;

public TrashPolicyOzone(){
}

private TrashPolicyOzone(FileSystem fs, Configuration conf){
super.initialize(conf, fs);
@Override
public void initialize(Configuration conf, FileSystem fs) {
this.fs = fs;
this.configuration = conf;
this.deletionInterval = (long)(conf.getFloat(
FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT)
* MSECS_PER_MINUTE);
this.emptierInterval = (long)(conf.getFloat(
FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
* MSECS_PER_MINUTE);
if (deletionInterval < 0) {
LOG.warn("Invalid value {} for deletion interval,"
+ " deletion interaval can not be negative."
+ "Changing to default value 0", deletionInterval);
this.deletionInterval = 0;
}
}

TrashPolicyOzone(FileSystem fs, Configuration conf, OzoneManager om){
initialize(conf, fs);
this.om = om;
}

@Override
public Runnable getEmptier() throws IOException {
return new TrashPolicyOzone.Emptier(getConf(), emptierInterval);
return new TrashPolicyOzone.Emptier(configuration, emptierInterval);
}

protected class Emptier implements Runnable {
Expand Down Expand Up @@ -98,7 +126,8 @@ protected class Emptier implements Runnable {

@Override
public void run() {
if (emptierInterval == 0) {
// if this is not the leader node,don't run the emptier
if (emptierInterval == 0 || !om.isLeaderReady()) {
return; // trash disabled
}
long now, end;
Expand All @@ -107,6 +136,9 @@ public void run() {
end = ceiling(now, emptierInterval);
try { // sleep for interval
Thread.sleep(end - now);
if (!om.isLeaderReady()){
continue;
}
} catch (InterruptedException e) {
break; // exit on interrupt
}
Expand All @@ -122,7 +154,7 @@ public void run() {
continue;
}
try {
TrashPolicyOzone trash = new TrashPolicyOzone(fs, conf);
TrashPolicyOzone trash = new TrashPolicyOzone(fs, conf, om);
trash.deleteCheckpoint(trashRoot.getPath(), false);
trash.createCheckpoint(trashRoot.getPath(), new Date(now));
} catch (IOException e) {
Expand Down