Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class BackgroundService {
private ThreadGroup threadGroup;
private final String serviceName;
private long interval;
private final long serviceTimeoutInNanos;
private volatile long serviceTimeoutInNanos;
private TimeUnit unit;
private final int threadPoolSize;
private final String threadNamePrefix;
Expand Down Expand Up @@ -92,6 +92,11 @@ public void setPoolSize(int size) {
exec.setCorePoolSize(size);
}

/**
 * Replaces the service timeout, logging the new value before it is stored.
 *
 * @param newTimeout the timeout to store, expressed in nanoseconds
 */
public synchronized void setServiceTimeoutInNanos(long newTimeout) {
  final String unitLabel = TimeUnit.NANOSECONDS.name().toLowerCase();
  LOG.info("{} timeout is set to {} {}", serviceName, newTimeout, unitLabel);
  serviceTimeoutInNanos = newTimeout;
}

@VisibleForTesting
public int getThreadCount() {
return threadGroup.activeCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientWithMaxRetry;
import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_WORKERS;
import static org.apache.hadoop.ozone.common.Storage.StorageState.INITIALIZED;
import static org.apache.hadoop.ozone.conf.OzoneServiceConfig.DEFAULT_SHUTDOWN_HOOK_PRIORITY;
Expand Down Expand Up @@ -284,6 +286,10 @@ public String getNamespace() {
this::reconfigBlockDeleteThreadMax)
.register(OZONE_BLOCK_DELETING_SERVICE_WORKERS,
this::reconfigDeletingServiceWorkers)
.register(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
this::reconfigBlockDeletingServiceInterval)
.register(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
this::reconfigBlockDeletingServiceTimeout)
.register(REPLICATION_STREAMS_LIMIT_KEY,
this::reconfigReplicationStreamsLimit);

Expand Down Expand Up @@ -656,10 +662,9 @@ private String reconfigBlockDeleteThreadMax(String value) {
}

/**
 * Reconfiguration handler for {@code OZONE_BLOCK_DELETING_SERVICE_WORKERS}.
 *
 * @param value the new worker count as a decimal string
 * @return {@code value}, unchanged
 * @throws NumberFormatException if {@code value} cannot be parsed by {@code Integer.parseInt}
 */
private String reconfigDeletingServiceWorkers(String value) {
// Negative counts are rejected here; zero passes this check.
Preconditions.checkArgument(Integer.parseInt(value) >= 0,
OZONE_BLOCK_DELETING_SERVICE_WORKERS + " cannot be negative.");
// Record the raw string in the configuration returned by getConf().
getConf().set(OZONE_BLOCK_DELETING_SERVICE_WORKERS, value);

// Hand the parsed count to the block deleting service via setPoolSize.
getDatanodeStateMachine().getContainer().getBlockDeletingService()
.setPoolSize(Integer.parseInt(value));
return value;
}

Expand All @@ -671,6 +676,16 @@ private String reconfigReplicationStreamsLimit(String value) {
return value;
}

/**
 * Reconfiguration handler for {@code OZONE_BLOCK_DELETING_SERVICE_INTERVAL}.
 *
 * <p>This method only records the new value in the configuration returned by
 * {@code getConf()}; it does not touch the running service itself.
 *
 * @param value the new interval as the raw configuration string
 * @return {@code value}, unchanged
 */
private String reconfigBlockDeletingServiceInterval(String value) {
  // Reject unusable input up front, in the same way reconfigDeletingServiceWorkers
  // validates its argument, rather than storing it for a later reader of the key to trip on.
  Preconditions.checkArgument(value != null && !value.trim().isEmpty(),
      OZONE_BLOCK_DELETING_SERVICE_INTERVAL + " cannot be empty.");
  getConf().set(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, value);
  return value;
}

/**
 * Reconfiguration handler for {@code OZONE_BLOCK_DELETING_SERVICE_TIMEOUT}.
 *
 * <p>This method only records the new value in the configuration returned by
 * {@code getConf()}; it does not touch the running service itself.
 *
 * @param value the new timeout as the raw configuration string
 * @return {@code value}, unchanged
 */
private String reconfigBlockDeletingServiceTimeout(String value) {
  // Reject unusable input up front, in the same way reconfigDeletingServiceWorkers
  // validates its argument, rather than storing it for a later reader of the key to trip on.
  Preconditions.checkArgument(value != null && !value.trim().isEmpty(),
      OZONE_BLOCK_DELETING_SERVICE_TIMEOUT + " cannot be empty.");
  getConf().set(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, value);
  return value;
}

/**
* Returns the initial version of the datanode.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

package org.apache.hadoop.ozone.container.common.impl;

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_WORKERS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_WORKERS_DEFAULT;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.time.Duration;
Expand All @@ -26,6 +33,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
Expand Down Expand Up @@ -94,12 +102,41 @@ public BlockDeletingService(
dnConf = conf.getObject(DatanodeConfiguration.class);
if (reconfigurationHandler != null) {
reconfigurationHandler.register(dnConf);
registerReconfigCallbacks(reconfigurationHandler);
}
this.blockDeletingMaxLockHoldingTime =
dnConf.getBlockDeletingMaxLockHoldingTime();
metrics = BlockDeletingServiceMetrics.create();
}

/**
 * Hooks this service into {@code handler} so that a completed reconfiguration touching
 * the interval, timeout or worker-count key triggers {@link #updateAndRestart}.
 *
 * @param handler the reconfiguration handler to register the completion callback on
 */
public void registerReconfigCallbacks(ReconfigurationHandler handler) {
  handler.registerCompleteCallback((changedKeys, newConf) -> {
    final boolean serviceSettingChanged =
        changedKeys.containsKey(OZONE_BLOCK_DELETING_SERVICE_INTERVAL)
            || changedKeys.containsKey(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT)
            || changedKeys.containsKey(OZONE_BLOCK_DELETING_SERVICE_WORKERS);
    if (!serviceSettingChanged) {
      // None of the keys this service cares about changed.
      return;
    }
    updateAndRestart((OzoneConfiguration) newConf);
  });
}

/**
 * Re-reads the interval, worker count and timeout from {@code ozoneConf} and restarts
 * the service with them.
 *
 * <p>The new values are checked before the service is shut down, so a rejected
 * configuration leaves the currently running service untouched.
 *
 * @param ozoneConf the configuration to read the three settings from
 * @throws IllegalArgumentException if the interval resolves to less than one second or
 *     the worker count is negative
 */
public synchronized void updateAndRestart(OzoneConfiguration ozoneConf) {
  long newInterval = ozoneConf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
      OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.SECONDS);
  int newCorePoolSize = ozoneConf.getInt(OZONE_BLOCK_DELETING_SERVICE_WORKERS,
      OZONE_BLOCK_DELETING_SERVICE_WORKERS_DEFAULT);
  long newTimeout = ozoneConf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
      OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.NANOSECONDS);

  // Fail before shutdown(): once the service is stopped, an exception from the setters
  // or start() would leave it down. The interval is read in whole seconds, so a
  // sub-second setting also lands here as 0.
  // NOTE(review): assumes start() schedules the task periodically with this interval,
  // which requires a positive value — confirm against BackgroundService.start().
  if (newInterval <= 0) {
    throw new IllegalArgumentException(OZONE_BLOCK_DELETING_SERVICE_INTERVAL
        + " must be at least one second, but resolved to " + newInterval + " seconds.");
  }
  if (newCorePoolSize < 0) {
    throw new IllegalArgumentException(
        OZONE_BLOCK_DELETING_SERVICE_WORKERS + " cannot be negative.");
  }

  LOG.info("Updating and restarting BlockDeletingService with interval {} {}" +
          ", core pool size {} and timeout {} {}",
      newInterval, TimeUnit.SECONDS.name().toLowerCase(), newCorePoolSize, newTimeout,
      TimeUnit.NANOSECONDS.name().toLowerCase());
  shutdown();
  setInterval(newInterval, TimeUnit.SECONDS);
  setPoolSize(newCorePoolSize);
  setServiceTimeoutInNanos(newTimeout);
  start();
}

/**
* Pair of container data and the number of blocks to delete.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.hadoop.ozone.reconfig;

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_WORKERS;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.HDDS_DATANODE_BLOCK_DELETE_THREAD_MAX;
import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.common.collect.ImmutableSet;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
Expand All @@ -49,6 +50,8 @@ void reconfigurableProperties() {
Set<String> expected = ImmutableSet.<String>builder()
.add(HDDS_DATANODE_BLOCK_DELETE_THREAD_MAX)
.add(OZONE_BLOCK_DELETING_SERVICE_WORKERS)
.add(OZONE_BLOCK_DELETING_SERVICE_INTERVAL)
.add(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT)
.add(REPLICATION_STREAMS_LIMIT_KEY)
.addAll(new DatanodeConfiguration().reconfigurableProperties())
.build();
Expand Down Expand Up @@ -79,19 +82,6 @@ void blockDeleteThreadMax(int delta) throws ReconfigurationException {
assertEquals(newValue, executor.getCorePoolSize());
}

/**
 * Reconfigures the block deleting service worker count one below and one above its
 * current value and expects the executor's core pool size to follow.
 */
@ParameterizedTest
@ValueSource(ints = { -1, +1 })
void blockDeletingServiceWorkers(int delta) throws ReconfigurationException {
  final ScheduledThreadPoolExecutor pool =
      (ScheduledThreadPoolExecutor) getFirstDatanode()
          .getDatanodeStateMachine()
          .getContainer()
          .getBlockDeletingService()
          .getExecutorService();
  final int expectedPoolSize = pool.getCorePoolSize() + delta;

  final ReconfigurationHandler handler = getFirstDatanode().getReconfigurationHandler();
  handler.reconfigurePropertyImpl(
      OZONE_BLOCK_DELETING_SERVICE_WORKERS, Integer.toString(expectedPoolSize));

  assertEquals(expectedPoolSize, pool.getCorePoolSize());
}

@ParameterizedTest
@ValueSource(ints = { -1, +1 })
void replicationStreamsLimit(int delta) throws ReconfigurationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -53,6 +54,7 @@ void reconfigurableProperties() {
.add(OZONE_OM_VOLUME_LISTALL_ALLOWED)
.add(OZONE_READONLY_ADMINISTRATORS)
.add(OZONE_DIR_DELETING_SERVICE_INTERVAL)
.add(OZONE_THREAD_NUMBER_DIR_DELETION)
.addAll(new OmConfig().reconfigurableProperties())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void testDirectoryDeletingServiceIntervalReconfiguration() throws Reconfiguratio
String.format("SUCCESS: Changed property %s", OZONE_DIR_DELETING_SERVICE_INTERVAL)
);
assertThat(logCapturer.getOutput()).contains(
String.format("Updating and restarting DirectoryDeletingService with interval: %d %s",
String.format("Updating and restarting DirectoryDeletingService with interval %d %s",
intervalFromXMLInSeconds, TimeUnit.SECONDS.name().toLowerCase())
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FEATURE_NOT_ENABLED;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR;
Expand Down Expand Up @@ -524,7 +525,8 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
.register(OZONE_OM_VOLUME_LISTALL_ALLOWED, this::reconfigureAllowListAllVolumes)
.register(OZONE_KEY_DELETING_LIMIT_PER_TASK,
this::reconfOzoneKeyDeletingLimitPerTask)
.register(OZONE_DIR_DELETING_SERVICE_INTERVAL, this::reconfOzoneDirDeletingServiceInterval);
.register(OZONE_DIR_DELETING_SERVICE_INTERVAL, this::reconfOzoneDirDeletingServiceInterval)
.register(OZONE_THREAD_NUMBER_DIR_DELETION, this::reconfOzoneThreadNumberDirDeletion);

reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());

Expand Down Expand Up @@ -5169,6 +5171,13 @@ private String reconfOzoneDirDeletingServiceInterval(String newVal) {
return newVal;
}

/**
 * Reconfiguration handler for {@code OZONE_THREAD_NUMBER_DIR_DELETION}: checks that the
 * new value parses to a non-negative integer and stores it in the OM configuration.
 *
 * @param newVal the new thread count as a decimal string
 * @return {@code newVal}, unchanged
 */
private String reconfOzoneThreadNumberDirDeletion(String newVal) {
  final int requestedThreads = Integer.parseInt(newVal);
  Preconditions.checkArgument(requestedThreads >= 0,
      OZONE_THREAD_NUMBER_DIR_DELETION + " cannot be negative.");

  getConfiguration().set(OZONE_THREAD_NUMBER_DIR_DELETION, newVal);
  return newVal;
}

public void validateReplicationConfig(ReplicationConfig replicationConfig)
throws OMException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -171,29 +173,34 @@ public DirectoryDeletingService(long interval, TimeUnit unit,

// Use only 90% of the max limit for the request, since other headers will be added to it.
this.ratisByteLimit = (int) (limit * 0.9);
registerReconfigCallbacks(ozoneManager.getReconfigurationHandler(), configuration);
registerReconfigCallbacks(ozoneManager.getReconfigurationHandler());
this.snapshotChainManager = ((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager();
this.deepCleanSnapshots = deepCleanSnapshots;
this.deletedDirsCount = new AtomicLong(0);
this.movedDirsCount = new AtomicLong(0);
this.movedFilesCount = new AtomicLong(0);
}

/**
 * Registers a completion callback on {@code handler} that calls {@code updateAndRestart}
 * when a watched key is among the changed keys.
 *
 * NOTE(review): two signatures and two callback bodies appear back to back below; this
 * looks like the before and after lines of one change shown together — confirm which
 * variant the real file contains.
 */
public void registerReconfigCallbacks(ReconfigurationHandler handler, OzoneConfiguration conf) {
public void registerReconfigCallbacks(ReconfigurationHandler handler) {
handler.registerCompleteCallback((changedKeys, newConf) -> {
// Variant 1: watches only the interval key and restarts with the conf passed at registration.
if (changedKeys.containsKey(OZONE_DIR_DELETING_SERVICE_INTERVAL)) {
updateAndRestart(conf);
// Variant 2: also watches the thread-count key and restarts with the callback's newConf.
if (changedKeys.containsKey(OZONE_DIR_DELETING_SERVICE_INTERVAL) ||
changedKeys.containsKey(OZONE_THREAD_NUMBER_DIR_DELETION)) {
updateAndRestart((OzoneConfiguration) newConf);
}
});
}

/**
 * Re-reads the service interval (in seconds) and the thread count from {@code conf},
 * then shuts the service down, applies both values and starts it again.
 *
 * NOTE(review): two LOG.info statements with slightly different wording appear below;
 * this looks like the before and after lines of one change shown together — confirm
 * which one the real file contains.
 *
 * @param conf the configuration to read the interval and thread count from
 */
private synchronized void updateAndRestart(OzoneConfiguration conf) {
long newInterval = conf.getTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL,
OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.SECONDS);
LOG.info("Updating and restarting DirectoryDeletingService with interval: {} {}",
newInterval, TimeUnit.SECONDS.name().toLowerCase());
int newCorePoolSize = conf.getInt(OZONE_THREAD_NUMBER_DIR_DELETION,
OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT);
LOG.info("Updating and restarting DirectoryDeletingService with interval {} {}" +
" and core pool size {}",
newInterval, TimeUnit.SECONDS.name().toLowerCase(), newCorePoolSize);
// Stop first, apply the new settings, then start again.
shutdown();
setInterval(newInterval, TimeUnit.SECONDS);
setPoolSize(newCorePoolSize);
start();
}

Expand Down
Loading