diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
index 959bee8d8c5f..a5df9a1776e7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
@@ -41,12 +41,14 @@ public abstract class BackgroundService {
       LoggerFactory.getLogger(BackgroundService.class);
 
   // Executor to launch child tasks
-  private final ScheduledThreadPoolExecutor exec;
-  private final ThreadGroup threadGroup;
+  private ScheduledThreadPoolExecutor exec;
+  private ThreadGroup threadGroup;
   private final String serviceName;
-  private final long interval;
+  private long interval;
   private final long serviceTimeoutInNanos;
-  private final TimeUnit unit;
+  private TimeUnit unit;
+  private final int threadPoolSize;
+  private final String threadNamePrefix;
   private final PeriodicalTask service;
 
   public BackgroundService(String serviceName, long interval,
@@ -62,14 +64,9 @@ public BackgroundService(String serviceName, long interval,
     this.serviceName = serviceName;
     this.serviceTimeoutInNanos = TimeDuration.valueOf(serviceTimeout, unit)
         .toLong(TimeUnit.NANOSECONDS);
-    threadGroup = new ThreadGroup(serviceName);
-    ThreadFactory threadFactory = new ThreadFactoryBuilder()
-        .setThreadFactory(r -> new Thread(threadGroup, r))
-        .setDaemon(true)
-        .setNameFormat(threadNamePrefix + serviceName + "#%d")
-        .build();
-    exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
-        threadPoolSize, threadFactory);
+    this.threadPoolSize = threadPoolSize;
+    this.threadNamePrefix = threadNamePrefix;
+    initExecutorAndThreadGroup();
     service = new PeriodicalTask();
   }
 
@@ -103,10 +100,20 @@ public void runPeriodicalTaskNow() throws Exception {
   }
 
   // start service
-  public void start() {
+  public synchronized void start() {
+    if (exec == null || exec.isShutdown() || exec.isTerminated()) {
+      initExecutorAndThreadGroup();
+    }
+    LOG.info("Starting service {} with interval {} {}", serviceName,
+        interval, unit.name().toLowerCase());
     exec.scheduleWithFixedDelay(service, 0, interval, unit);
   }
 
+  protected synchronized void setInterval(long newInterval, TimeUnit newUnit) {
+    this.interval = newInterval;
+    this.unit = newUnit;
+  }
+
   public abstract BackgroundTaskQueue getTasks();
 
   /**
@@ -172,4 +179,14 @@ public void shutdown() {
       threadGroup.destroy();
     }
   }
+
+  private void initExecutorAndThreadGroup() {
+    threadGroup = new ThreadGroup(serviceName);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setThreadFactory(r -> new Thread(threadGroup, r))
+        .setDaemon(true)
+        .setNameFormat(threadNamePrefix + serviceName + "#%d")
+        .build();
+    exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 585cab9d38ab..31dab87935e6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -287,6 +287,8 @@ public String getNamespace() {
         .register(REPLICATION_STREAMS_LIMIT_KEY,
             this::reconfigReplicationStreamsLimit);
 
+    reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());
+
     datanodeStateMachine = new DatanodeStateMachine(this, datanodeDetails,
         conf, dnCertClient, secretKeyClient, this::terminateDatanode,
         reconfigurationHandler);
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java
new file mode 100644
index 000000000000..3d3f7e29662d
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.conf;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.ConfigRedactor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.conf.Reconfigurable;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class to support dynamic reconfiguration of configuration properties at runtime.
+ */
+public abstract class ReconfigurableBase extends Configured implements Reconfigurable {
+  private static final Logger LOG = LoggerFactory.getLogger(ReconfigurableBase.class);
+  private final ReconfigurationUtil reconfigurationUtil = new ReconfigurationUtil();
+  private Thread reconfigThread = null;
+  private volatile boolean shouldRun = true;
+  private final Object reconfigLock = new Object();
+  private long startTime = 0L;
+  private long endTime = 0L;
+  private Map<ReconfigurationUtil.PropertyChange, Optional<String>> status = null;
+  private final Collection<Consumer<ReconfigurationTaskStatus>> reconfigurationCompleteCallbacks = new ArrayList<>();
+
+  public ReconfigurableBase(Configuration conf) {
+    super(conf == null ? new Configuration() : conf);
+  }
+
+  protected abstract Configuration getNewConf();
+
+  @VisibleForTesting
+  public Collection<ReconfigurationUtil.PropertyChange> getChangedProperties(Configuration newConf,
+      Configuration oldConf) {
+    return this.reconfigurationUtil.parseChangedProperties(newConf, oldConf);
+  }
+
+  public void startReconfigurationTask() throws IOException {
+    synchronized (this.reconfigLock) {
+      String errorMessage;
+      if (!this.shouldRun) {
+        errorMessage = "The server is stopped.";
+        LOG.warn(errorMessage);
+        throw new IOException(errorMessage);
+      } else if (this.reconfigThread != null) {
+        errorMessage = "Another reconfiguration task is running.";
+        LOG.warn(errorMessage);
+        throw new IOException(errorMessage);
+      } else {
+        this.reconfigThread = new ReconfigurationThread(this);
+        this.reconfigThread.setDaemon(true);
+        this.reconfigThread.setName("Reconfiguration Task");
+        this.reconfigThread.start();
+        this.startTime = Time.now();
+      }
+    }
+  }
+
+  public ReconfigurationTaskStatus getReconfigurationTaskStatus() {
+    synchronized (this.reconfigLock) {
+      return this.reconfigThread != null ? new ReconfigurationTaskStatus(this.startTime, 0L, null) :
+          new ReconfigurationTaskStatus(this.startTime, this.endTime, this.status);
+    }
+  }
+
+  public void shutdownReconfigurationTask() {
+    Thread tempThread;
+    synchronized (this.reconfigLock) {
+      this.shouldRun = false;
+      if (this.reconfigThread == null) {
+        return;
+      }
+
+      tempThread = this.reconfigThread;
+      this.reconfigThread = null;
+    }
+
+    try {
+      tempThread.join();
+    } catch (InterruptedException ignored) {
+    }
+  }
+
+  @Override
+  public final void reconfigureProperty(String property, String newVal) throws ReconfigurationException {
+    if (this.isPropertyReconfigurable(property)) {
+      LOG.info("changing property " + property + " to " + newVal);
+      synchronized (this.getConf()) {
+        this.getConf().get(property);
+        String effectiveValue = this.reconfigurePropertyImpl(property, newVal);
+        if (newVal != null) {
+          this.getConf().set(property, effectiveValue);
+        } else {
+          this.getConf().unset(property);
+        }
+      }
+    } else {
+      throw new ReconfigurationException(property, newVal, this.getConf().get(property));
+    }
+  }
+
+  @Override
+  public abstract Collection<String> getReconfigurableProperties();
+
+  @Override
+  public boolean isPropertyReconfigurable(String property) {
+    return this.getReconfigurableProperties().contains(property);
+  }
+
+  protected abstract String reconfigurePropertyImpl(String property, String newVal) throws ReconfigurationException;
+
+  private static class ReconfigurationThread extends Thread {
+    private final ReconfigurableBase parent;
+
+    ReconfigurationThread(ReconfigurableBase base) {
+      this.parent = base;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Starting reconfiguration task.");
+      Configuration oldConf = this.parent.getConf();
+      Configuration newConf = this.parent.getNewConf();
+      Collection<ReconfigurationUtil.PropertyChange> changes = this.parent.getChangedProperties(newConf, oldConf);
+      Map<ReconfigurationUtil.PropertyChange, Optional<String>> results = Maps.newHashMap();
+      ConfigRedactor oldRedactor = new ConfigRedactor(oldConf);
+      ConfigRedactor newRedactor = new ConfigRedactor(newConf);
+
+      for (ReconfigurationUtil.PropertyChange change : changes) {
+        String errorMessage = null;
+        String oldValRedacted = oldRedactor.redact(change.prop, change.oldVal);
+        String newValRedacted = newRedactor.redact(change.prop, change.newVal);
+        if (!this.parent.isPropertyReconfigurable(change.prop)) {
+          LOG.info(String.format("Property %s is not configurable: old value: %s, new value: %s",
+              change.prop, oldValRedacted, newValRedacted));
+        } else {
LOG.info("Change property: " + change.prop + " from \"" + + (change.oldVal == null ? "" : oldValRedacted) + "\" to \"" + + (change.newVal == null ? "" : newValRedacted) + "\"."); + + try { + String effectiveValue = this.parent.reconfigurePropertyImpl(change.prop, change.newVal); + if (change.newVal != null) { + oldConf.set(change.prop, effectiveValue); + } else { + oldConf.unset(change.prop); + } + } catch (ReconfigurationException reconfException) { + Throwable cause = reconfException.getCause(); + errorMessage = cause == null ? reconfException.getMessage() : cause.getMessage(); + LOG.error("Failed to reconfigure property {}: {}", change.prop, errorMessage, reconfException); + } + + results.put(change, Optional.ofNullable(errorMessage)); + } + } + + synchronized (this.parent.reconfigLock) { + this.parent.endTime = Time.now(); + this.parent.status = Collections.unmodifiableMap(results); + this.parent.reconfigThread = null; + + for (Consumer callback : parent.reconfigurationCompleteCallbacks) { + try { + callback.accept(parent.getReconfigurationTaskStatus()); + } catch (Exception e) { + LOG.warn("Reconfiguration complete callback threw exception", e); + } + } + } + } + } + + public void addReconfigurationCompleteCallback(Consumer callback) { + synchronized (reconfigLock) { + this.reconfigurationCompleteCallbacks.add(callback); + } + } + +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java new file mode 100644 index 000000000000..810df7870d1d --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.conf; + +import java.util.Map; +import org.apache.hadoop.conf.Configuration; + +/** + * Callback interface to handle configuration changes after a reconfiguration task completes. 
+ */
+@FunctionalInterface
+public interface ReconfigurationChangeCallback {
+  void onPropertiesChanged(Map<String, Boolean> changedKeys, Configuration newConf);
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java
index a594bfa27605..979525f7a1a7 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java
@@ -22,18 +22,22 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 import java.util.function.UnaryOperator;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.ReconfigurableBase;
 import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil;
 import org.apache.hadoop.hdds.protocol.ReconfigureProtocol;
 import org.apache.ratis.util.function.CheckedConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Keeps track of reconfigurable properties and the corresponding functions
@@ -42,16 +46,64 @@ public class ReconfigurationHandler extends ReconfigurableBase
     implements ReconfigureProtocol {
+  private static final Logger LOG = LoggerFactory.getLogger(ReconfigurationHandler.class);
 
   private final String name;
   private final CheckedConsumer<String, IOException> requireAdminPrivilege;
 
   private final Map<String, UnaryOperator<String>> properties =
       new ConcurrentHashMap<>();
+  private final List<ReconfigurationChangeCallback> completeCallbacks = new ArrayList<>();
+  private BiConsumer<ReconfigurationTaskStatus, Configuration> reconfigurationStatusListener;
+
+  public void registerCompleteCallback(ReconfigurationChangeCallback callback) {
+    completeCallbacks.add(callback);
+  }
+
+  public void setReconfigurationCompleteCallback(BiConsumer<ReconfigurationTaskStatus, Configuration>
+      statusListener) {
+    this.reconfigurationStatusListener = statusListener;
+  }
+
+  public BiConsumer<ReconfigurationTaskStatus, Configuration> defaultLoggingCallback() {
+    return (status, conf) -> {
+      if (status.getStatus() != null && !status.getStatus().isEmpty()) {
+        LOG.info("Reconfiguration completed with {} updated properties.",
+            status.getStatus().size());
+      } else {
+        LOG.info("Reconfiguration complete. No properties were changed.");
+      }
+    };
+  }
+
+  private void triggerCompleteCallbacks(ReconfigurationTaskStatus status, Configuration newConf) {
+    if (status.getStatus() != null && !status.getStatus().isEmpty()) {
+      Map<String, Boolean> changedKeys = new HashMap<>();
+      for (ReconfigurationUtil.PropertyChange change : status.getStatus().keySet()) {
+        boolean deleted = change.newVal == null;
+        changedKeys.put(change.prop, !deleted);
+      }
+      for (ReconfigurationChangeCallback callback : completeCallbacks) {
+        callback.onPropertiesChanged(changedKeys, newConf);
+      }
+    }
+
+    if (reconfigurationStatusListener != null) {
+      reconfigurationStatusListener.accept(status, newConf);
+    }
+  }
 
   public ReconfigurationHandler(String name, OzoneConfiguration config,
       CheckedConsumer<String, IOException> requireAdminPrivilege) {
     super(config);
     this.name = name;
     this.requireAdminPrivilege = requireAdminPrivilege;
+
+    // Register callback on reconfiguration complete
+    addReconfigurationCompleteCallback(status -> {
+      Configuration newConf = getNewConf();
+      triggerCompleteCallbacks(status, newConf);
+    });
+
   }
 
   public ReconfigurationHandler register(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 7773a91fec91..36b29c6a0792 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -403,6 +403,8 @@ private StorageContainerManager(OzoneConfiguration conf,
         .register(OZONE_READONLY_ADMINISTRATORS,
             this::reconfOzoneReadOnlyAdmins);
 
+    reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());
+
     initializeSystemManagers(conf, configurator);
 
     if (isSecretKeyEnable(securityConfig)) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java
index 05e7e2f0f3e3..55172f78f000 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java
@@ -19,6 +19,7 @@
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_READONLY_ADMINISTRATORS;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT;
@@ -51,6 +52,7 @@ void reconfigurableProperties() {
         .add(OZONE_KEY_DELETING_LIMIT_PER_TASK)
         .add(OZONE_OM_VOLUME_LISTALL_ALLOWED)
         .add(OZONE_READONLY_ADMINISTRATORS)
+        .add(OZONE_DIR_DELETING_SERVICE_INTERVAL)
         .addAll(new OmConfig().reconfigurableProperties())
         .build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java
index b43536f5bd72..289044f89db0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java
@@ -18,13 +18,18 @@
 package org.apache.hadoop.ozone.shell;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import org.apache.hadoop.conf.ReconfigurableBase;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ReconfigurableBase;
+import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -33,7 +38,9 @@
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.admin.OzoneAdmin;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
 import org.apache.ozone.test.NonHATests;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -47,6 +54,8 @@ public abstract class TestReconfigShell implements NonHATests.TestCase {
 
   private OzoneAdmin ozoneAdmin;
+  private OzoneConfiguration conf;
+  private ReconfigurationHandler reconfigurationHandler;
   private GenericTestUtils.PrintStreamCapturer out;
   private GenericTestUtils.PrintStreamCapturer err;
 
@@ -55,6 +64,8 @@ void capture() {
     out = GenericTestUtils.captureOut();
     err = GenericTestUtils.captureErr();
     ozoneAdmin = new OzoneAdmin();
+    conf = new OzoneConfiguration();
+    reconfigurationHandler = cluster().getOzoneManager().getReconfigurationHandler();
   }
 
   @AfterEach
@@ -77,6 +88,35 @@ void testOzoneManagerGetReconfigurationProperties() {
     executeAndAssertProperties(om.getReconfigurationHandler(), "OM", socket);
   }
 
+  @Test
+  void testDirectoryDeletingServiceIntervalReconfiguration() throws ReconfigurationException {
+    OzoneManager om = cluster().getOzoneManager();
+    InetSocketAddress socket = om.getOmRpcServerAddr();
+    LogCapturer logCapturer = LogCapturer.captureLogs(DirectoryDeletingService.class);
+
+    String initialInterval = "1m";
+    String intervalFromXML = "2m"; // config value set in ozone-site.xml
+    long intervalFromXMLInSeconds = TimeUnit.MINUTES.toSeconds(2); // 120 seconds
+
+    reconfigurationHandler.reconfigurePropertyImpl(OZONE_DIR_DELETING_SERVICE_INTERVAL, initialInterval);
+    assertThat(reconfigurationHandler.getConf().get(OZONE_DIR_DELETING_SERVICE_INTERVAL)).isEqualTo(initialInterval);
+
+    // Start the reconfiguration task
+    executeAndAssertStart("OM", socket);
+    // A value set in ozone-site.xml is picked up during reconfiguration
+    assertThat(conf.get(OZONE_DIR_DELETING_SERVICE_INTERVAL)).isEqualTo(intervalFromXML);
+
+    executeAndAssertStatus("OM", socket);
+    assertThat(reconfigurationHandler.getConf().get(OZONE_DIR_DELETING_SERVICE_INTERVAL)).isEqualTo(intervalFromXML);
+    assertThat(out.get()).contains(
+        String.format("SUCCESS: Changed property %s", OZONE_DIR_DELETING_SERVICE_INTERVAL)
+    );
+    assertThat(logCapturer.getOutput()).contains(
+        String.format("Updating and restarting DirectoryDeletingService with interval: %d %s",
+            intervalFromXMLInSeconds, TimeUnit.SECONDS.name().toLowerCase())
+    );
+  }
+
   @Test
   void testStorageContainerManagerGetReconfigurationProperties() {
     StorageContainerManager scm = cluster().getStorageContainerManager();
@@ -130,4 +170,17 @@ private void executeForInServiceDatanodes(int expectedCount) {
   private String getAddress(InetSocketAddress socket) {
     return socket.getHostString() + ":" + socket.getPort();
   }
+
+  private void executeAndAssertStart(String service, InetSocketAddress socket) {
+    String address = socket.getHostString() + ":" + socket.getPort();
+    ozoneAdmin.getCmd().execute("reconfig", "--service", service, "--address", address, "start");
+    assertThat(out.get()).contains(service + ": Started reconfiguration task on node [" + address + "]");
+  }
+
+  private void executeAndAssertStatus(String service, InetSocketAddress socket) {
+    String address = socket.getHostString() + ":" + socket.getPort();
+    ozoneAdmin.getCmd().execute("reconfig", "--service", service, "--address", address, "status");
+    assertThat(out.get()).contains(service + ": Reconfiguring status for node [" + address + "]: started");
+  }
+
 }
diff --git a/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml b/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml
index 5ea2eb89dfa3..2b07b1d060dc 100644
--- a/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml
+++ b/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml
@@ -127,5 +127,17 @@
   <property>
     <name>ozone.client.datastream.window.size</name>
     <value>8MB</value>
   </property>
+  <property>
+    <name>ozone.readonly.administrators</name>
+    <value>admin</value>
+  </property>
+  <property>
+    <name>ozone.administrators</name>
+    <value>admin</value>
+  </property>
+  <property>
+    <name>ozone.directory.deleting.service.interval</name>
+    <value>2m</value>
+  </property>
 </configuration>
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index ad1741df0a5b..ec2b9964f53d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -52,6 +52,7 @@
 import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT;
@@ -280,6 +281,7 @@
 import org.apache.hadoop.ozone.om.s3.S3SecretCacheProvider;
 import org.apache.hadoop.ozone.om.s3.S3SecretStoreProvider;
 import org.apache.hadoop.ozone.om.service.CompactDBService;
+import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
 import org.apache.hadoop.ozone.om.service.OMRangerBGSyncService;
 import org.apache.hadoop.ozone.om.service.QuotaRepairTask;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
@@ -494,6 +496,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   // instance creation every single time.
   private UncheckedAutoCloseableSupplier<IOmMetadataReader> rcOmMetadataReader;
   private OmSnapshotManager omSnapshotManager;
+  private volatile DirectoryDeletingService dirDeletingService;
 
   @SuppressWarnings("methodlength")
   private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
@@ -519,7 +522,10 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
             this::reconfOzoneReadOnlyAdmins)
         .register(OZONE_OM_VOLUME_LISTALL_ALLOWED, this::reconfigureAllowListAllVolumes)
         .register(OZONE_KEY_DELETING_LIMIT_PER_TASK,
-            this::reconfOzoneKeyDeletingLimitPerTask);
+            this::reconfOzoneKeyDeletingLimitPerTask)
+        .register(OZONE_DIR_DELETING_SERVICE_INTERVAL, this::reconfOzoneDirDeletingServiceInterval);
+
+    reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());
 
     versionManager = new OMLayoutVersionManager(omStorage.getLayoutVersion());
     upgradeFinalizer = new OMUpgradeFinalizer(versionManager);
@@ -5148,6 +5154,11 @@ private String reconfigureAllowListAllVolumes(String newVal) {
     return String.valueOf(allowListAllVolumes);
   }
 
+  private String reconfOzoneDirDeletingServiceInterval(String newVal) {
+    getConfiguration().set(OZONE_DIR_DELETING_SERVICE_INTERVAL, newVal);
+    return newVal;
+  }
+
   public void validateReplicationConfig(ReplicationConfig replicationConfig)
       throws OMException {
     try {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 7451032492ea..ad90490101c4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -17,6 +17,9 @@
 
 package org.apache.hadoop.ozone.om.service;
 
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
+
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -29,6 +32,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
@@ -100,9 +104,28 @@ public DirectoryDeletingService(long interval, TimeUnit unit,
     this.isRunningOnAOS = new AtomicBoolean(false);
     this.dirDeletingCorePoolSize = dirDeletingServiceCorePoolSize;
     deletedDirSupplier = new DeletedDirSupplier();
+    registerReconfigCallbacks(ozoneManager.getReconfigurationHandler(), configuration);
     taskCount.set(0);
   }
 
+  public void registerReconfigCallbacks(ReconfigurationHandler handler, OzoneConfiguration conf) {
+    handler.registerCompleteCallback((changedKeys, newConf) -> {
+      if (changedKeys.containsKey(OZONE_DIR_DELETING_SERVICE_INTERVAL)) {
+        updateAndRestart(conf);
+      }
+    });
+  }
+
+  private synchronized void updateAndRestart(OzoneConfiguration conf) {
+    long newInterval = conf.getTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL,
+        OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.SECONDS);
+    LOG.info("Updating and restarting DirectoryDeletingService with interval: {} {}",
+        newInterval, TimeUnit.SECONDS.name().toLowerCase());
+    shutdown();
+    setInterval(newInterval, TimeUnit.SECONDS);
+    start();
+  }
+
   private boolean shouldRun() {
     if (getOzoneManager() == null) {
       // OzoneManager can be null for testing
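
A minimal sketch of the restart lifecycle the `BackgroundService` changes enable. `IntervalLoggingService` is a hypothetical subclass, and the argument order of the `super(...)` call is assumed, not confirmed by this patch; only `start()`, `shutdown()`, `setInterval(...)`, and `getTasks()` come from the class as modified above.

```java
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;

// Hypothetical subclass, for illustration only.
class IntervalLoggingService extends BackgroundService {
  IntervalLoggingService(long interval, TimeUnit unit, long timeout) {
    // Constructor parameter order is an assumption in this sketch.
    super("IntervalLoggingService", interval, unit, 1, timeout, "");
  }

  @Override
  public BackgroundTaskQueue getTasks() {
    return new BackgroundTaskQueue(); // empty queue: no work in this sketch
  }

  // Same pattern as DirectoryDeletingService.updateAndRestart(...).
  synchronized void restart(long newInterval, TimeUnit newUnit) {
    shutdown();                        // stops the old executor and thread group
    setInterval(newInterval, newUnit); // protected setter added by this patch
    start();                           // re-creates the executor, since the old one is shut down
  }
}
```

Before this change, `start()` assumed a live executor created once in the constructor, so a `shutdown()` was terminal; lazily re-initializing the executor and thread group in `start()` is what makes an interval change safe at runtime.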
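The wiring pattern this patch introduces, condensed into one place. The property key `my.service.interval`, `MyService`, and `restartFrom(...)` are hypothetical stand-ins; `register(...)`, `registerCompleteCallback(...)`, `setReconfigurationCompleteCallback(...)`, and `defaultLoggingCallback()` are the `ReconfigurationHandler` APIs from this patch.

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;

final class ReconfigWiring {
  interface MyService {
    void restartFrom(OzoneConfiguration conf); // hypothetical restart hook
  }

  static void wire(ReconfigurationHandler handler, OzoneConfiguration conf, MyService myService) {
    // 1. Accept new values for the property (cf. reconfOzoneDirDeletingServiceInterval).
    handler.register("my.service.interval", newVal -> {
      conf.set("my.service.interval", newVal); // mirror into the live config
      return newVal;                           // effective value stored by the framework
    });

    // 2. React once the whole reconfiguration task finishes
    //    (cf. DirectoryDeletingService.registerReconfigCallbacks).
    handler.registerCompleteCallback((Map<String, Boolean> changedKeys, Configuration newConf) -> {
      if (changedKeys.containsKey("my.service.interval")) {
        myService.restartFrom(conf);
      }
    });

    // 3. Optional one-shot status listener; here, the default logger.
    handler.setReconfigurationCompleteCallback(handler.defaultLoggingCallback());
  }
}
```

Note the split of responsibilities: the per-property function only records the new value, while the complete-callback performs the heavier restart exactly once per reconfiguration task, even if several properties changed together.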
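End-to-end, the flow exercised by `TestReconfigShell` is: `ozone admin reconfig --service OM --address <host:port> start` (the subcommand names mirror the picocli arguments in the test; exact shell syntax may vary by deployment) asks the OM's `ReconfigurationHandler` to spawn a `ReconfigurationThread`; the thread diffs the live `Configuration` against a freshly loaded one via `getNewConf()` (which is how the `2m` value from ozone-site.xml is picked up), applies each registered property function, and on completion fires the registered `ReconfigurationChangeCallback`s, at which point `DirectoryDeletingService` shuts down, updates its interval, and starts again. A follow-up `... reconfig --service OM --address <host:port> status` then reports the per-property outcome, including the "SUCCESS: Changed property" line the test asserts on.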