diff --git a/hadoop-ozone/dist/src/shell/hdds/hadoop-functions.sh b/hadoop-ozone/dist/src/shell/hdds/hadoop-functions.sh index b46045b2d8c0..d544bdbc074b 100755 --- a/hadoop-ozone/dist/src/shell/hdds/hadoop-functions.sh +++ b/hadoop-ozone/dist/src/shell/hdds/hadoop-functions.sh @@ -2791,6 +2791,14 @@ function hadoop_assembly_classpath() { fi hadoop_add_classpath "${MAIN_ARTIFACT}" + # Add two Ozone FileSystem jars to CLASSPATH for trash cleanup thread + OZONE_FS_ARTIFACT_NAME="hadoop-ozone-filesystem" + FS_ARTIFACTS=$(find "$ARTIFACT_LIB_DIR" -name "${OZONE_FS_ARTIFACT_NAME}-*.jar") + while IFS= read -r FS_ARTIFACT; do + hadoop_debug "Adding $OZONE_FS_ARTIFACT_NAME jar to CLASSPATH: $FS_ARTIFACT" + hadoop_add_classpath "${FS_ARTIFACT}" + done <<< "$FS_ARTIFACTS" + #Add optional jars to the classpath OPTIONAL_CLASSPATH_DIR="${HADOOP_HDFS_HOME}/share/ozone/lib/${ARTIFACT_NAME}" if [[ -d "$OPTIONAL_CLASSPATH_DIR" ]]; then diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index de0de0b542d7..c264866a74ff 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -50,6 +50,8 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Trash; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.annotation.InterfaceAudience; @@ -170,6 +172,9 @@ import com.google.protobuf.BlockingService; import com.google.protobuf.ProtocolMessageEnum; import org.apache.commons.lang3.StringUtils; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT; import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients; @@ -250,6 +255,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private final ProtocolMessageMetrics omClientProtocolMetrics; private OzoneManagerHttpServer httpServer; + private Thread emptier; private final OMStorage omStorage; private final ScmBlockLocationProtocol scmBlockClient; private final StorageContainerLocationProtocol scmContainerClient; @@ -1123,6 +1129,11 @@ public void start() throws IOException { // Allow OM to start as Http Server failure is not fatal. LOG.error("OM HttpServer failed to start.", ex); } + try { + startTrashEmptier(configuration); + } catch (Exception ex) { + LOG.error("Trash emptier failed to start.", ex); + } registerMXBean(); startJVMPauseMonitor(); @@ -1169,7 +1180,6 @@ public void restart() throws IOException { omRpcServer = getRpcServer(configuration); omRpcServer.start(); isOmRpcServerRunning = true; - try { httpServer = new OzoneManagerHttpServer(configuration, this); httpServer.start(); @@ -1177,6 +1187,12 @@ public void restart() throws IOException { // Allow OM to start as Http Server failure is not fatal. LOG.error("OM HttpServer failed to start.", ex); } + // TODO: Check is trash emptier thread will be stopped when OM is restarted + try { + startTrashEmptier(configuration); + } catch (Exception ex) { + LOG.error("Trash emptier failed to start.", ex); + } registerMXBean(); startJVMPauseMonitor(); @@ -1265,6 +1281,7 @@ public void stop() { if (httpServer != null) { httpServer.stop(); } + stopTrashEmptier(); metadataManager.stop(); metrics.unRegister(); omClientProtocolMetrics.unregister(); @@ -3312,4 +3329,26 @@ private void startJVMPauseMonitor() { jvmPauseMonitor.init(configuration); jvmPauseMonitor.start(); } + + private void startTrashEmptier(final OzoneConfiguration conf) + throws IOException { + long trashInterval = + conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT); + if (trashInterval <= 0) { + return; + } + // TODO: Also check if fs.defaultFS is set to ofs or o3fs? + FileSystem fs = SecurityUtil.doAsLoginUser(() -> FileSystem.get(conf)); + this.emptier = + new Thread(new Trash(fs, conf).getEmptier(), "Trash Emptier"); + this.emptier.setDaemon(true); + this.emptier.start(); + } + + private void stopTrashEmptier() { + if (this.emptier != null) { + emptier.interrupt(); + emptier = null; + } + } }