Skip to content

Commit

Permalink
[fix] [broker] Fix nothing changed after removing dynamic configs (#2…
Browse files Browse the repository at this point in the history
…2673)

(cherry picked from commit ada31a9)
  • Loading branch information
poorbarcode committed May 8, 2024
1 parent 704630b commit 02a8f81
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public Map<String, String> getAllDynamicConfigurations() throws Exception {
@ApiResponse(code = 403, message = "You don't have admin permission to get configuration")})
public List<String> getDynamicConfigurationName() {
validateSuperUserAccess();
return BrokerService.getDynamicConfiguration();
return pulsar().getBrokerService().getDynamicConfiguration();
}

@GET
Expand All @@ -253,11 +253,11 @@ public Map<String, String> getRuntimeConfiguration() {
*/
private synchronized CompletableFuture<Void> persistDynamicConfigurationAsync(
String configName, String configValue) {
if (!BrokerService.validateDynamicConfiguration(configName, configValue)) {
if (!pulsar().getBrokerService().validateDynamicConfiguration(configName, configValue)) {
return FutureUtil
.failedFuture(new RestException(Status.PRECONDITION_FAILED, " Invalid dynamic-config value"));
}
if (BrokerService.isDynamicConfiguration(configName)) {
if (pulsar().getBrokerService().isDynamicConfiguration(configName)) {
return dynamicConfigurationResources().setDynamicConfigurationWithCreateAsync(old -> {
Map<String, String> configurationMap = old.orElseGet(Maps::newHashMap);
configurationMap.put(configName, configValue);
Expand Down Expand Up @@ -451,7 +451,7 @@ private CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> read
}

private CompletableFuture<Void> internalDeleteDynamicConfigurationOnMetadataAsync(String configName) {
if (!BrokerService.isDynamicConfiguration(configName)) {
if (!pulsar().getBrokerService().isDynamicConfiguration(configName)) {
throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
} else {
return dynamicConfigurationResources().setDynamicConfigurationAsync(old -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
Expand Down Expand Up @@ -220,8 +221,7 @@ public class BrokerService implements Closeable {
private final OrderedExecutor topicOrderedExecutor;
// offline topic backlog cache
private final ConcurrentOpenHashMap<TopicName, PersistentOfflineTopicStats> offlineTopicStatCache;
private static final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
prepareDynamicConfigurationMap();
private final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap;
private final ConcurrentOpenHashMap<String, Consumer<?>> configRegisteredListeners;

private final ConcurrentLinkedQueue<TopicLoadingContext> pendingTopicLoadingQueue;
Expand Down Expand Up @@ -289,6 +289,7 @@ public class BrokerService implements Closeable {

public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception {
this.pulsar = pulsar;
this.dynamicConfigurationMap = prepareDynamicConfigurationMap();
this.preciseTopicPublishRateLimitingEnable =
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
Expand Down Expand Up @@ -2190,38 +2191,85 @@ private void handlePoliciesUpdates(NamespaceName namespace) {
}

private void handleDynamicConfigurationUpdates() {
pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfigurationAsync()
DynamicConfigurationResources dynamicConfigResources = null;
try {
dynamicConfigResources = pulsar()
.getPulsarResources()
.getDynamicConfigResources();
} catch (Exception e) {
log.warn("Failed to read dynamic broker configuration", e);
}

if (dynamicConfigResources != null) {
dynamicConfigResources.getDynamicConfigurationAsync()
.thenAccept(optMap -> {
// Case some dynamic configs have been removed.
dynamicConfigurationMap.forEach((configKey, fieldWrapper) -> {
boolean configRemoved = !optMap.isPresent() || !optMap.get().containsKey(configKey);
if (fieldWrapper.lastDynamicValue != null && configRemoved) {
configValueChanged(configKey, null);
}
});
// Some configs have been changed.
if (!optMap.isPresent()) {
return;
}
Map<String, String> data = optMap.get();
data.forEach((configKey, value) -> {
ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey);
if (configFieldWrapper == null) {
log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey);
return;
}
Field configField = configFieldWrapper.field;
Object newValue = FieldParser.value(data.get(configKey), configField);
if (configField != null) {
Consumer listener = configRegisteredListeners.get(configKey);
try {
Object existingValue = configField.get(pulsar.getConfiguration());
configField.set(pulsar.getConfiguration(), newValue);
log.info("Successfully updated configuration {}/{}", configKey,
data.get(configKey));
if (listener != null && !existingValue.equals(newValue)) {
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}/{}", configKey, newValue);
}
} else {
log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue);
}
configValueChanged(configKey, value);
});
});
}
}

private void configValueChanged(String configKey, String newValueStr) {
ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey);
if (configFieldWrapper == null) {
log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey);
return;
}
Consumer listener = configRegisteredListeners.get(configKey);
try {
// Convert existingValue and newValue.
final Object existingValue;
final Object newValue;
if (configFieldWrapper.field != null) {
if (StringUtils.isBlank(newValueStr)) {
newValue = configFieldWrapper.defaultValue;
} else {
newValue = FieldParser.value(newValueStr, configFieldWrapper.field);
}
existingValue = configFieldWrapper.field.get(pulsar.getConfiguration());
configFieldWrapper.field.set(pulsar.getConfiguration(), newValue);
} else {
// This case only occurs when it is a customized item.
// Since https://github.com/apache/pulsar/blob/master/pip/pip-300.md has not been cherry-picked, this
// case should never occur.
log.error("Skip update customized dynamic configuration {}/{} in memory, only trigger an event"
+ " listeners. Since PIP-300 has net been cherry-picked, this case should never occur",
configKey, newValueStr);
existingValue = configFieldWrapper.lastDynamicValue;
newValue = newValueStr == null ? configFieldWrapper.defaultValue : newValueStr;
}
// Record the latest dynamic config.
configFieldWrapper.lastDynamicValue = newValueStr;

if (newValueStr == null) {
log.info("Successfully remove the dynamic configuration {}, and revert to the default value",
configKey);
} else {
log.info("Successfully updated configuration {}/{}", configKey, newValueStr);
}

if (listener != null && !Objects.equals(existingValue, newValue)) {
// So far, all config items that related to configuration listeners, their default value is not null.
// And the customized config can be null before.
// So call "listener.accept(null)" is okay.
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}", configKey, e);
}
}

/**
Expand Down Expand Up @@ -2591,6 +2639,9 @@ private void updateManagedLedgerConfig() {
* On notification, listener should first check if config value has been changed and after taking appropriate
* action, listener should update config value with new value if it has been changed (so, next time listener can
* compare values on configMap change).
*
* Note: The new value that the {@param listener} may accept could be a null value.
*
* @param <T>
*
* @param configKey
Expand Down Expand Up @@ -2666,7 +2717,7 @@ public DelayedDeliveryTrackerFactory getDelayedDeliveryTrackerFactory() {
return delayedDeliveryTrackerFactory;
}

public static List<String> getDynamicConfiguration() {
public List<String> getDynamicConfiguration() {
return dynamicConfigurationMap.keys();
}

Expand All @@ -2679,27 +2730,34 @@ public Map<String, String> getRuntimeConfiguration() {
return configMap;
}

public static boolean isDynamicConfiguration(String key) {
public boolean isDynamicConfiguration(String key) {
return dynamicConfigurationMap.containsKey(key);
}

public static boolean validateDynamicConfiguration(String key, String value) {
public boolean validateDynamicConfiguration(String key, String value) {
if (dynamicConfigurationMap.containsKey(key) && dynamicConfigurationMap.get(key).validator != null) {
return dynamicConfigurationMap.get(key).validator.test(value);
}
return true;
}

private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
private ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
ConcurrentOpenHashMap.<String, ConfigField>newBuilder().build();
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
if (field.getAnnotation(FieldContext.class).dynamic()) {
dynamicConfigurationMap.put(field.getName(), new ConfigField(field));
try {
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
if (field.getAnnotation(FieldContext.class).dynamic()) {
Object defaultValue = field.get(pulsar.getConfiguration());
dynamicConfigurationMap.put(field.getName(), new ConfigField(field, defaultValue));
}
}
}
} catch (IllegalArgumentException | IllegalAccessException ex) {
// This error never occurs.
log.error("Failed to initialize dynamic configuration map", ex);
throw new RuntimeException(ex);
}
return dynamicConfigurationMap;
}
Expand Down Expand Up @@ -2973,11 +3031,21 @@ public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleC

private static class ConfigField {
final Field field;

// It is the dynamic config value if set.
// It is null if has does not set a dynamic config, even if the value of "pulsar.config" is present.
volatile String lastDynamicValue;

// The default value of "pulsar.config", which is initialized when the broker is starting.
// After the dynamic config has been removed, revert the config to this default value.
final Object defaultValue;

Predicate<String> validator;

public ConfigField(Field field) {
public ConfigField(Field field, Object defaultValue) {
super();
this.field = field;
this.defaultValue = defaultValue;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -69,4 +70,21 @@ public void TestDeleteInvalidDynamicConfiguration() {
}
}
}

@Test
public void testDeleteIntDynamicConfig() throws PulsarAdminException {
// Record the default value;
int defaultValue = pulsar.getConfig().getMaxConcurrentTopicLoadRequest();
// Set dynamic config.
int newValue = defaultValue + 1000;
admin.brokers().updateDynamicConfiguration("maxConcurrentTopicLoadRequest", newValue + "");
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), newValue);
});
// Verify: it has been reverted to the default value.
admin.brokers().deleteDynamicConfiguration("maxConcurrentTopicLoadRequest");
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), defaultValue);
});
}
}

0 comments on commit 02a8f81

Please sign in to comment.