From ad0fb03e818bf05df299e3b6836531767c195da8 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 8 May 2024 21:41:22 +0800 Subject: [PATCH] [fix] [broker] Fix nothing changed after removing dynamic configs (#22673) (cherry picked from commit ada31a96db9aabbb071f65229be746e61f954696) --- .../pulsar/broker/admin/impl/BrokersBase.java | 8 +- .../pulsar/broker/service/BrokerService.java | 140 +++++++++++++----- .../AdminApiDynamicConfigurationsTest.java | 18 +++ 3 files changed, 126 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 5f73bc949defc..57fdf6f284974 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -230,7 +230,7 @@ public Map getAllDynamicConfigurations() throws Exception { @ApiResponse(code = 403, message = "You don't have admin permission to get configuration")}) public List getDynamicConfigurationName() { validateSuperUserAccess(); - return BrokerService.getDynamicConfiguration(); + return pulsar().getBrokerService().getDynamicConfiguration(); } @GET @@ -253,11 +253,11 @@ public Map getRuntimeConfiguration() { */ private synchronized CompletableFuture persistDynamicConfigurationAsync( String configName, String configValue) { - if (!BrokerService.validateDynamicConfiguration(configName, configValue)) { + if (!pulsar().getBrokerService().validateDynamicConfiguration(configName, configValue)) { return FutureUtil .failedFuture(new RestException(Status.PRECONDITION_FAILED, " Invalid dynamic-config value")); } - if (BrokerService.isDynamicConfiguration(configName)) { + if (pulsar().getBrokerService().isDynamicConfiguration(configName)) { return dynamicConfigurationResources().setDynamicConfigurationWithCreateAsync(old -> { Map configurationMap = old.orElseGet(Maps::newHashMap); configurationMap.put(configName, configValue); @@ -451,7 +451,7 @@ private CompletableFuture healthCheckRecursiveReadNext(Reader read } private CompletableFuture internalDeleteDynamicConfigurationOnMetadataAsync(String configName) { - if (!BrokerService.isDynamicConfiguration(configName)) { + if (!pulsar().getBrokerService().isDynamicConfiguration(configName)) { throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration"); } else { return dynamicConfigurationResources().setDynamicConfigurationAsync(old -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d21f397ec38dd..6612f26946604 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -107,6 +107,7 @@ import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.resources.DynamicConfigurationResources; import org.apache.pulsar.broker.resources.LocalPoliciesResources; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources; @@ -222,8 +223,7 @@ public class BrokerService implements Closeable { private final OrderedExecutor topicOrderedExecutor; // offline topic backlog cache private final ConcurrentOpenHashMap offlineTopicStatCache; - private static final ConcurrentOpenHashMap dynamicConfigurationMap = - prepareDynamicConfigurationMap(); + private final ConcurrentOpenHashMap dynamicConfigurationMap; private final ConcurrentOpenHashMap> configRegisteredListeners; private final ConcurrentLinkedQueue pendingTopicLoadingQueue; @@ -293,6 +293,7 @@ public class BrokerService implements Closeable { public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception { this.pulsar = pulsar; + this.dynamicConfigurationMap = prepareDynamicConfigurationMap(); this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable(); this.managedLedgerFactory = pulsar.getManagedLedgerFactory(); @@ -2244,38 +2245,85 @@ private void handlePoliciesUpdates(NamespaceName namespace) { } private void handleDynamicConfigurationUpdates() { - pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfigurationAsync() + DynamicConfigurationResources dynamicConfigResources = null; + try { + dynamicConfigResources = pulsar() + .getPulsarResources() + .getDynamicConfigResources(); + } catch (Exception e) { + log.warn("Failed to read dynamic broker configuration", e); + } + + if (dynamicConfigResources != null) { + dynamicConfigResources.getDynamicConfigurationAsync() .thenAccept(optMap -> { + // Case some dynamic configs have been removed. + dynamicConfigurationMap.forEach((configKey, fieldWrapper) -> { + boolean configRemoved = !optMap.isPresent() || !optMap.get().containsKey(configKey); + if (fieldWrapper.lastDynamicValue != null && configRemoved) { + configValueChanged(configKey, null); + } + }); + // Some configs have been changed. if (!optMap.isPresent()) { return; } Map data = optMap.get(); data.forEach((configKey, value) -> { - ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey); - if (configFieldWrapper == null) { - log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey); - return; - } - Field configField = configFieldWrapper.field; - Object newValue = FieldParser.value(data.get(configKey), configField); - if (configField != null) { - Consumer listener = configRegisteredListeners.get(configKey); - try { - Object existingValue = configField.get(pulsar.getConfiguration()); - configField.set(pulsar.getConfiguration(), newValue); - log.info("Successfully updated configuration {}/{}", configKey, - data.get(configKey)); - if (listener != null && !existingValue.equals(newValue)) { - listener.accept(newValue); - } - } catch (Exception e) { - log.error("Failed to update config {}/{}", configKey, newValue); - } - } else { - log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue); - } + configValueChanged(configKey, value); }); }); + } + } + + private void configValueChanged(String configKey, String newValueStr) { + ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey); + if (configFieldWrapper == null) { + log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey); + return; + } + Consumer listener = configRegisteredListeners.get(configKey); + try { + // Convert existingValue and newValue. + final Object existingValue; + final Object newValue; + if (configFieldWrapper.field != null) { + if (StringUtils.isBlank(newValueStr)) { + newValue = configFieldWrapper.defaultValue; + } else { + newValue = FieldParser.value(newValueStr, configFieldWrapper.field); + } + existingValue = configFieldWrapper.field.get(pulsar.getConfiguration()); + configFieldWrapper.field.set(pulsar.getConfiguration(), newValue); + } else { + // This case only occurs when it is a customized item. + // Since https://github.com/apache/pulsar/blob/master/pip/pip-300.md has not been cherry-picked, this + // case should never occur. + log.error("Skip update customized dynamic configuration {}/{} in memory, only trigger an event" + + " listeners. Since PIP-300 has net been cherry-picked, this case should never occur", + configKey, newValueStr); + existingValue = configFieldWrapper.lastDynamicValue; + newValue = newValueStr == null ? configFieldWrapper.defaultValue : newValueStr; + } + // Record the latest dynamic config. + configFieldWrapper.lastDynamicValue = newValueStr; + + if (newValueStr == null) { + log.info("Successfully remove the dynamic configuration {}, and revert to the default value", + configKey); + } else { + log.info("Successfully updated configuration {}/{}", configKey, newValueStr); + } + + if (listener != null && !Objects.equals(existingValue, newValue)) { + // So far, all config items that related to configuration listeners, their default value is not null. + // And the customized config can be null before. + // So call "listener.accept(null)" is okay. + listener.accept(newValue); + } + } catch (Exception e) { + log.error("Failed to update config {}", configKey, e); + } } /** @@ -2645,6 +2693,9 @@ private void updateManagedLedgerConfig() { * On notification, listener should first check if config value has been changed and after taking appropriate * action, listener should update config value with new value if it has been changed (so, next time listener can * compare values on configMap change). + * + * Note: The new value that the {@param listener} may accept could be a null value. + * * @param * * @param configKey @@ -2720,7 +2771,7 @@ public DelayedDeliveryTrackerFactory getDelayedDeliveryTrackerFactory() { return delayedDeliveryTrackerFactory; } - public static List getDynamicConfiguration() { + public List getDynamicConfiguration() { return dynamicConfigurationMap.keys(); } @@ -2733,27 +2784,34 @@ public Map getRuntimeConfiguration() { return configMap; } - public static boolean isDynamicConfiguration(String key) { + public boolean isDynamicConfiguration(String key) { return dynamicConfigurationMap.containsKey(key); } - public static boolean validateDynamicConfiguration(String key, String value) { + public boolean validateDynamicConfiguration(String key, String value) { if (dynamicConfigurationMap.containsKey(key) && dynamicConfigurationMap.get(key).validator != null) { return dynamicConfigurationMap.get(key).validator.test(value); } return true; } - private static ConcurrentOpenHashMap prepareDynamicConfigurationMap() { + private ConcurrentOpenHashMap prepareDynamicConfigurationMap() { ConcurrentOpenHashMap dynamicConfigurationMap = ConcurrentOpenHashMap.newBuilder().build(); - for (Field field : ServiceConfiguration.class.getDeclaredFields()) { - if (field != null && field.isAnnotationPresent(FieldContext.class)) { - field.setAccessible(true); - if (field.getAnnotation(FieldContext.class).dynamic()) { - dynamicConfigurationMap.put(field.getName(), new ConfigField(field)); + try { + for (Field field : ServiceConfiguration.class.getDeclaredFields()) { + if (field != null && field.isAnnotationPresent(FieldContext.class)) { + field.setAccessible(true); + if (field.getAnnotation(FieldContext.class).dynamic()) { + Object defaultValue = field.get(pulsar.getConfiguration()); + dynamicConfigurationMap.put(field.getName(), new ConfigField(field, defaultValue)); + } } } + } catch (IllegalArgumentException | IllegalAccessException ex) { + // This error never occurs. + log.error("Failed to initialize dynamic configuration map", ex); + throw new RuntimeException(ex); } return dynamicConfigurationMap; } @@ -3027,11 +3085,21 @@ public void unblockDispatchersOnUnAckMessages(List validator; - public ConfigField(Field field) { + public ConfigField(Field field, Object defaultValue) { super(); this.field = field; + this.defaultValue = defaultValue; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java index 3f913c7931f18..0cfa0721c2dae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java @@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -69,4 +70,21 @@ public void TestDeleteInvalidDynamicConfiguration() { } } } + + @Test + public void testDeleteIntDynamicConfig() throws PulsarAdminException { + // Record the default value; + int defaultValue = pulsar.getConfig().getMaxConcurrentTopicLoadRequest(); + // Set dynamic config. + int newValue = defaultValue + 1000; + admin.brokers().updateDynamicConfiguration("maxConcurrentTopicLoadRequest", newValue + ""); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), newValue); + }); + // Verify: it has been reverted to the default value. + admin.brokers().deleteDynamicConfiguration("maxConcurrentTopicLoadRequest"); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), defaultValue); + }); + } }