Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions hadoop-ozone/dist/src/shell/hdds/hadoop-functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2791,6 +2791,14 @@ function hadoop_assembly_classpath() {
fi
hadoop_add_classpath "${MAIN_ARTIFACT}"

# Add two Ozone FileSystem jars to CLASSPATH for trash cleanup thread
OZONE_FS_ARTIFACT_NAME="hadoop-ozone-filesystem"
FS_ARTIFACTS=$(find "$ARTIFACT_LIB_DIR" -name "${OZONE_FS_ARTIFACT_NAME}-*.jar")
while IFS= read -r FS_ARTIFACT; do
hadoop_debug "Adding $OZONE_FS_ARTIFACT_NAME jar to CLASSPATH: $FS_ARTIFACT"
hadoop_add_classpath "${FS_ARTIFACT}"
done <<< "$FS_ARTIFACTS"

#Add optional jars to the classpath
OPTIONAL_CLASSPATH_DIR="${HADOOP_HDFS_HOME}/share/ozone/lib/${ARTIFACT_NAME}"
if [[ -d "$OPTIONAL_CLASSPATH_DIR" ]]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
Expand Down Expand Up @@ -170,6 +172,9 @@
import com.google.protobuf.BlockingService;
import com.google.protobuf.ProtocolMessageEnum;
import org.apache.commons.lang3.StringUtils;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
Expand Down Expand Up @@ -250,6 +255,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private final ProtocolMessageMetrics<ProtocolMessageEnum>
omClientProtocolMetrics;
private OzoneManagerHttpServer httpServer;
private Thread emptier;
private final OMStorage omStorage;
private final ScmBlockLocationProtocol scmBlockClient;
private final StorageContainerLocationProtocol scmContainerClient;
Expand Down Expand Up @@ -1123,6 +1129,11 @@ public void start() throws IOException {
// Allow OM to start as Http Server failure is not fatal.
LOG.error("OM HttpServer failed to start.", ex);
}
try {
startTrashEmptier(configuration);
} catch (Exception ex) {
LOG.error("Trash emptier failed to start.", ex);
}
registerMXBean();

startJVMPauseMonitor();
Expand Down Expand Up @@ -1169,14 +1180,19 @@ public void restart() throws IOException {
omRpcServer = getRpcServer(configuration);
omRpcServer.start();
isOmRpcServerRunning = true;

try {
httpServer = new OzoneManagerHttpServer(configuration, this);
httpServer.start();
} catch (Exception ex) {
// Allow OM to start as Http Server failure is not fatal.
LOG.error("OM HttpServer failed to start.", ex);
}
// TODO: Check is trash emptier thread will be stopped when OM is restarted
try {
startTrashEmptier(configuration);
} catch (Exception ex) {
LOG.error("Trash emptier failed to start.", ex);
}
registerMXBean();

startJVMPauseMonitor();
Expand Down Expand Up @@ -1265,6 +1281,7 @@ public void stop() {
if (httpServer != null) {
httpServer.stop();
}
stopTrashEmptier();
metadataManager.stop();
metrics.unRegister();
omClientProtocolMetrics.unregister();
Expand Down Expand Up @@ -3312,4 +3329,26 @@ private void startJVMPauseMonitor() {
jvmPauseMonitor.init(configuration);
jvmPauseMonitor.start();
}

private void startTrashEmptier(final OzoneConfiguration conf)
throws IOException {
long trashInterval =
conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
if (trashInterval <= 0) {
return;
}
// TODO: Also check if fs.defaultFS is set to ofs or o3fs?
FileSystem fs = SecurityUtil.doAsLoginUser(() -> FileSystem.get(conf));
this.emptier =
new Thread(new Trash(fs, conf).getEmptier(), "Trash Emptier");
this.emptier.setDaemon(true);
this.emptier.start();
}

private void stopTrashEmptier() {
if (this.emptier != null) {
emptier.interrupt();
emptier = null;
}
}
}