Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,11 +43,12 @@ public abstract class BackgroundService {

// Executor to launch child tasks
private final ScheduledThreadPoolExecutor exec;
private volatile ScheduledFuture<?> scheduledHandle;
private final ThreadGroup threadGroup;
private final String serviceName;
private final long interval;
private long interval;
private final long serviceTimeoutInNanos;
private final TimeUnit unit;
private TimeUnit unit;
private final PeriodicalTask service;

public BackgroundService(String serviceName, long interval,
Expand Down Expand Up @@ -103,8 +105,25 @@ public void runPeriodicalTaskNow() throws Exception {
}

// start service
public void start() {
exec.scheduleWithFixedDelay(service, 0, interval, unit);
public synchronized void start() {
if (scheduledHandle != null && !scheduledHandle.isCancelled()) {
LOG.warn("Background service {} is already running", serviceName);
return;
}
scheduledHandle = exec.scheduleWithFixedDelay(service, 0, interval, unit);
}

protected synchronized void setInterval(long newInterval, TimeUnit newUnit) {
this.interval = newInterval;
this.unit = newUnit;
}

public synchronized void stop() {
LOG.info("Stopping {}", serviceName);
if (scheduledHandle != null) {
scheduledHandle.cancel(false); // don't interrupt running tasks
scheduledHandle = null;
}
}

public abstract BackgroundTaskQueue getTasks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,14 @@ public String getNamespace() {
.register(REPLICATION_STREAMS_LIMIT_KEY,
this::reconfigReplicationStreamsLimit);

reconfigurationHandler.setReconfigurationCompleteCallback((status, newConf) -> {
if (status.getStatus() != null && !status.getStatus().isEmpty()) {
LOG.info("Reconfiguration completed with {} updated properties.", status.getStatus().size());
} else {
LOG.info("Reconfiguration complete. No properties were changed.");
}
});

datanodeStateMachine = new DatanodeStateMachine(this, datanodeDetails, conf,
dnCertClient, secretKeyClient, this::terminateDatanode,
reconfigurationHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.conf;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.ConfigRedactor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Reconfigurable;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Base class to support dynamic reconfiguration of configuration properties at runtime.
*/
public abstract class ReconfigurableBase extends Configured implements Reconfigurable {
private static final Logger LOG = LoggerFactory.getLogger(ReconfigurableBase.class);
private final ReconfigurationUtil reconfigurationUtil = new ReconfigurationUtil();
private Thread reconfigThread = null;
private volatile boolean shouldRun = true;
private final Object reconfigLock = new Object();
private long startTime = 0L;
private long endTime = 0L;
private Map<ReconfigurationUtil.PropertyChange, Optional<String>> status = null;
private final Collection<Consumer<ReconfigurationTaskStatus>> reconfigurationCompleteCallbacks = new ArrayList<>();

public ReconfigurableBase(Configuration conf) {
super(conf == null ? new Configuration() : conf);
}

protected abstract Configuration getNewConf();

@VisibleForTesting
public Collection<ReconfigurationUtil.PropertyChange> getChangedProperties(Configuration newConf,
Configuration oldConf) {
return this.reconfigurationUtil.parseChangedProperties(newConf, oldConf);
}

public void startReconfigurationTask() throws IOException {
synchronized (this.reconfigLock) {
String errorMessage;
if (!this.shouldRun) {
errorMessage = "The server is stopped.";
LOG.warn(errorMessage);
throw new IOException(errorMessage);
} else if (this.reconfigThread != null) {
errorMessage = "Another reconfiguration task is running.";
LOG.warn(errorMessage);
throw new IOException(errorMessage);
} else {
this.reconfigThread = new ReconfigurationThread(this);
this.reconfigThread.setDaemon(true);
this.reconfigThread.setName("Reconfiguration Task");
this.reconfigThread.start();
this.startTime = Time.now();
}
}
}

public ReconfigurationTaskStatus getReconfigurationTaskStatus() {
synchronized (this.reconfigLock) {
return this.reconfigThread != null ? new ReconfigurationTaskStatus(this.startTime, 0L, null) :
new ReconfigurationTaskStatus(this.startTime, this.endTime, this.status);
}
}

public void shutdownReconfigurationTask() {
Thread tempThread;
synchronized (this.reconfigLock) {
this.shouldRun = false;
if (this.reconfigThread == null) {
return;
}

tempThread = this.reconfigThread;
this.reconfigThread = null;
}

try {
tempThread.join();
} catch (InterruptedException ignored) {
}

}

@Override
public final void reconfigureProperty(String property, String newVal) throws ReconfigurationException {
if (this.isPropertyReconfigurable(property)) {
LOG.info("changing property " + property + " to " + newVal);
synchronized (this.getConf()) {
this.getConf().get(property);
String effectiveValue = this.reconfigurePropertyImpl(property, newVal);
if (newVal != null) {
this.getConf().set(property, effectiveValue);
} else {
this.getConf().unset(property);
}

}
} else {
throw new ReconfigurationException(property, newVal, this.getConf().get(property));
}
}

@Override
public abstract Collection<String> getReconfigurableProperties();

@Override
public boolean isPropertyReconfigurable(String property) {
return this.getReconfigurableProperties().contains(property);
}

protected abstract String reconfigurePropertyImpl(String var1, String var2) throws ReconfigurationException;

private static class ReconfigurationThread extends Thread {
private final ReconfigurableBase parent;

ReconfigurationThread(ReconfigurableBase base) {
this.parent = base;
}

@Override
public void run() {
LOG.info("Starting reconfiguration task.");
Configuration oldConf = this.parent.getConf();
Configuration newConf = this.parent.getNewConf();
Collection<ReconfigurationUtil.PropertyChange> changes = this.parent.getChangedProperties(newConf, oldConf);
Map<ReconfigurationUtil.PropertyChange, Optional<String>> results = Maps.newHashMap();
ConfigRedactor oldRedactor = new ConfigRedactor(oldConf);
ConfigRedactor newRedactor = new ConfigRedactor(newConf);

for (ReconfigurationUtil.PropertyChange change : changes) {
String errorMessage = null;
String oldValRedacted = oldRedactor.redact(change.prop, change.oldVal);
String newValRedacted = newRedactor.redact(change.prop, change.newVal);
if (!this.parent.isPropertyReconfigurable(change.prop)) {
LOG.info(String.format("Property %s is not configurable: old value: %s, new value: %s",
change.prop, oldValRedacted, newValRedacted));
} else {
LOG.info("Change property: " + change.prop + " from \"" +
(change.oldVal == null ? "<default>" : oldValRedacted) + "\" to \"" +
(change.newVal == null ? "<default>" : newValRedacted) + "\".");

try {
String effectiveValue = this.parent.reconfigurePropertyImpl(change.prop, change.newVal);
if (change.newVal != null) {
oldConf.set(change.prop, effectiveValue);
} else {
oldConf.unset(change.prop);
}
} catch (ReconfigurationException reconfException) {
Throwable cause = reconfException.getCause();
errorMessage = cause == null ? reconfException.getMessage() : cause.getMessage();
LOG.error("Failed to reconfigure property {}: {}", change.prop, errorMessage, reconfException);
}

results.put(change, Optional.ofNullable(errorMessage));
}
}

synchronized (this.parent.reconfigLock) {
this.parent.endTime = Time.now();
this.parent.status = Collections.unmodifiableMap(results);
this.parent.reconfigThread = null;

LOG.info("Reconfiguration completed. {} properties were updated.", results.size());

for (Consumer<ReconfigurationTaskStatus> callback : parent.reconfigurationCompleteCallbacks) {
try {
callback.accept(parent.getReconfigurationTaskStatus());
} catch (Exception e) {
LOG.warn("Reconfiguration complete callback threw exception", e);
}
}
}
}
}

public void addReconfigurationCompleteCallback(Consumer<ReconfigurationTaskStatus> callback) {
synchronized (reconfigLock) {
this.reconfigurationCompleteCallbacks.add(callback);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.conf;

import java.util.Map;
import org.apache.hadoop.conf.Configuration;

/**
* Callback interface to handle configuration changes after a reconfiguration task completes.
*/
@FunctionalInterface
public interface ReconfigurationChangeCallback {
void onPropertiesChanged(Map<String, Boolean> changedKeys, Configuration newConf);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurableBase;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil;
import org.apache.hadoop.hdds.protocol.ReconfigureProtocol;
import org.apache.ratis.util.function.CheckedConsumer;

Expand All @@ -47,11 +49,47 @@ public class ReconfigurationHandler extends ReconfigurableBase
private final Map<String, UnaryOperator<String>> properties =
new ConcurrentHashMap<>();

private final List<ReconfigurationChangeCallback> completeCallbacks = new ArrayList<>();
private BiConsumer<ReconfigurationTaskStatus, Configuration> reconfigurationStatusListener;

public void registerCompleteCallback(ReconfigurationChangeCallback callback) {
completeCallbacks.add(callback);
}

public void setReconfigurationCompleteCallback(BiConsumer<ReconfigurationTaskStatus, Configuration>
statusListener) {
this.reconfigurationStatusListener = statusListener;
}

private void triggerCompleteCallbacks(ReconfigurationTaskStatus status, Configuration newConf) {
if (status.getStatus() != null && !status.getStatus().isEmpty()) {
Map<String, Boolean> changedKeys = new HashMap<>();
for (ReconfigurationUtil.PropertyChange change : status.getStatus().keySet()) {
boolean deleted = change.newVal == null;
changedKeys.put(change.prop, !deleted);
}
for (ReconfigurationChangeCallback callback : completeCallbacks) {
callback.onPropertiesChanged(changedKeys, newConf);
}
}

if (reconfigurationStatusListener != null) {
reconfigurationStatusListener.accept(status, newConf);
}
}

public ReconfigurationHandler(String name, OzoneConfiguration config,
CheckedConsumer<String, IOException> requireAdminPrivilege) {
super(config);
this.name = name;
this.requireAdminPrivilege = requireAdminPrivilege;

// Register callback on reconfiguration complete
addReconfigurationCompleteCallback(status -> {
Configuration newConf = getNewConf();
triggerCompleteCallbacks(status, newConf);
});

}

public ReconfigurationHandler register(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,14 @@ private StorageContainerManager(OzoneConfiguration conf,
.register(OZONE_READONLY_ADMINISTRATORS,
this::reconfOzoneReadOnlyAdmins);

reconfigurationHandler.setReconfigurationCompleteCallback((status, newConf) -> {
if (status.getStatus() != null && !status.getStatus().isEmpty()) {
LOG.info("Reconfiguration completed with {} updated properties.", status.getStatus().size());
} else {
LOG.info("Reconfiguration complete. No properties were changed.");
}
});

initializeSystemManagers(conf, configurator);

if (isSecretKeyEnable(securityConfig)) {
Expand Down
Loading