Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class BackgroundService {
private ThreadGroup threadGroup;
private final String serviceName;
private long interval;
private final long serviceTimeoutInNanos;
private long serviceTimeoutInNanos;
private TimeUnit unit;
private final int threadPoolSize;
private final String threadNamePrefix;
Expand Down Expand Up @@ -86,6 +86,11 @@ public void setPoolSize(int size) {
exec.setCorePoolSize(size);
}

public synchronized void setServiceTimeoutInNanos(long newTimeout) {
LOG.info("{} timeout is {} {}", serviceName, newTimeout, TimeUnit.NANOSECONDS.name().toLowerCase());
this.serviceTimeoutInNanos = newTimeout;
}

@VisibleForTesting
public int getThreadCount() {
return threadGroup.activeCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientWithMaxRetry;
import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_WORKERS;
import static org.apache.hadoop.ozone.common.Storage.StorageState.INITIALIZED;
import static org.apache.hadoop.ozone.conf.OzoneServiceConfig.DEFAULT_SHUTDOWN_HOOK_PRIORITY;
Expand All @@ -40,6 +43,7 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.ObjectName;
import org.apache.hadoop.conf.Configurable;
Expand Down Expand Up @@ -284,6 +288,10 @@ public String getNamespace() {
this::reconfigBlockDeleteThreadMax)
.register(OZONE_BLOCK_DELETING_SERVICE_WORKERS,
this::reconfigDeletingServiceWorkers)
.register(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
this::reconfigBlockDeletingServiceInterval)
.register(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
this::reconfigBlockDeletingServiceTimeout)
.register(REPLICATION_STREAMS_LIMIT_KEY,
this::reconfigReplicationStreamsLimit);

Expand Down Expand Up @@ -671,6 +679,21 @@ private String reconfigReplicationStreamsLimit(String value) {
return value;
}

private String reconfigBlockDeletingServiceInterval(String value) {
getConf().set(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, value);
return value;
}

private String reconfigBlockDeletingServiceTimeout(String value) {
getConf().set(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, value);

long timeout = conf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.NANOSECONDS);
getDatanodeStateMachine().getContainer().getBlockDeletingService()
.setServiceTimeoutInNanos(timeout);
return value;
}

/**
* Returns the initial version of the datanode.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.hadoop.ozone.container.common.impl;

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.time.Duration;
Expand All @@ -26,6 +29,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
Expand Down Expand Up @@ -94,12 +98,31 @@ public BlockDeletingService(
dnConf = conf.getObject(DatanodeConfiguration.class);
if (reconfigurationHandler != null) {
reconfigurationHandler.register(dnConf);
registerReconfigCallbacks(reconfigurationHandler, (OzoneConfiguration) conf);
}
this.blockDeletingMaxLockHoldingTime =
dnConf.getBlockDeletingMaxLockHoldingTime();
metrics = BlockDeletingServiceMetrics.create();
}

public void registerReconfigCallbacks(ReconfigurationHandler handler, OzoneConfiguration ozoneConf) {
handler.registerCompleteCallback((changedKeys, newConf) -> {
if (changedKeys.containsKey(OZONE_BLOCK_DELETING_SERVICE_INTERVAL)) {
updateAndRestart(ozoneConf);
}
});
}

public synchronized void updateAndRestart(OzoneConfiguration ozoneConf) {
long newInterval = ozoneConf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.SECONDS);
LOG.info("Updating and restarting BlockDeletingService with interval: {} {}",
newInterval, TimeUnit.SECONDS.name().toLowerCase());
shutdown();
setInterval(newInterval, TimeUnit.SECONDS);
start();
}

/**
* Pair of container data and the number of blocks to delete.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.hadoop.ozone.reconfig;

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_WORKERS;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.HDDS_DATANODE_BLOCK_DELETE_THREAD_MAX;
import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY;
Expand Down Expand Up @@ -49,6 +51,8 @@ void reconfigurableProperties() {
Set<String> expected = ImmutableSet.<String>builder()
.add(HDDS_DATANODE_BLOCK_DELETE_THREAD_MAX)
.add(OZONE_BLOCK_DELETING_SERVICE_WORKERS)
.add(OZONE_BLOCK_DELETING_SERVICE_INTERVAL)
.add(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT)
.add(REPLICATION_STREAMS_LIMIT_KEY)
.addAll(new DatanodeConfiguration().reconfigurableProperties())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.common.collect.ImmutableSet;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
Expand All @@ -53,6 +55,7 @@ void reconfigurableProperties() {
.add(OZONE_OM_VOLUME_LISTALL_ALLOWED)
.add(OZONE_READONLY_ADMINISTRATORS)
.add(OZONE_DIR_DELETING_SERVICE_INTERVAL)
.add(OZONE_THREAD_NUMBER_DIR_DELETION)
.addAll(new OmConfig().reconfigurableProperties())
.build();

Expand Down Expand Up @@ -123,4 +126,16 @@ void unsetAllowListAllVolumes(String newValue) throws ReconfigurationException {
assertEquals(OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT, cluster().getOzoneManager().getAllowListAllVolumes());
}

@ParameterizedTest
@ValueSource(ints = { -1, +1 })
void threadNumberDirDeletion(int delta) throws ReconfigurationException {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor)
cluster().getOzoneManager().getKeyManager().getDirDeletingService().getExecutorService();
int newValue = executor.getCorePoolSize() + delta;

cluster().getOzoneManager().getReconfigurationHandler().reconfigurePropertyImpl(
OZONE_THREAD_NUMBER_DIR_DELETION, String.valueOf(newValue));
assertEquals(newValue, executor.getCorePoolSize());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FEATURE_NOT_ENABLED;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR;
Expand Down Expand Up @@ -523,7 +524,8 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
.register(OZONE_OM_VOLUME_LISTALL_ALLOWED, this::reconfigureAllowListAllVolumes)
.register(OZONE_KEY_DELETING_LIMIT_PER_TASK,
this::reconfOzoneKeyDeletingLimitPerTask)
.register(OZONE_DIR_DELETING_SERVICE_INTERVAL, this::reconfOzoneDirDeletingServiceInterval);
.register(OZONE_DIR_DELETING_SERVICE_INTERVAL, this::reconfOzoneDirDeletingServiceInterval)
.register(OZONE_THREAD_NUMBER_DIR_DELETION, this::reconfOzoneThreadNumberDirDeletion);

reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());

Expand Down Expand Up @@ -5159,6 +5161,12 @@ private String reconfOzoneDirDeletingServiceInterval(String newVal) {
return newVal;
}

private String reconfOzoneThreadNumberDirDeletion(String newVal) {
getConfiguration().set(OZONE_THREAD_NUMBER_DIR_DELETION, newVal);
getKeyManager().getDirDeletingService().setPoolSize(Integer.parseInt(newVal));
return newVal;
}

public void validateReplicationConfig(ReplicationConfig replicationConfig)
throws OMException {
try {
Expand Down
Loading