diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6e23deaa6fa50..c1b2b9e1da974 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -229,8 +229,7 @@ public class BrokerService implements Closeable { private final OrderedExecutor topicOrderedExecutor; // offline topic backlog cache private final ConcurrentOpenHashMap offlineTopicStatCache; - private final ConcurrentOpenHashMap dynamicConfigurationMap = - prepareDynamicConfigurationMap(); + private final ConcurrentOpenHashMap dynamicConfigurationMap; private final ConcurrentOpenHashMap> configRegisteredListeners; private final ConcurrentLinkedQueue pendingTopicLoadingQueue; @@ -313,6 +312,7 @@ public class BrokerService implements Closeable { public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception { this.pulsar = pulsar; + this.dynamicConfigurationMap = prepareDynamicConfigurationMap(); this.brokerPublishRateLimiter = new PublishRateLimiterImpl(pulsar.getMonotonicSnapshotClock()); this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable(); @@ -2496,40 +2496,71 @@ private void handleDynamicConfigurationUpdates() { if (dynamicConfigResources != null) { dynamicConfigResources.getDynamicConfigurationAsync() - .thenAccept(optMap -> { - if (!optMap.isPresent()) { - return; + .thenAccept(optMap -> { + // Case some dynamic configs have been removed. + dynamicConfigurationMap.forEach((configKey, fieldWrapper) -> { + boolean configRemoved = optMap.isEmpty() || !optMap.get().containsKey(configKey); + if (fieldWrapper.lastDynamicValue != null && configRemoved) { + configValueChanged(configKey, null); } - Map data = optMap.get(); - data.forEach((configKey, value) -> { - ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey); - if (configFieldWrapper == null) { - log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey); - return; - } - Field configField = configFieldWrapper.field; - Consumer listener = configRegisteredListeners.get(configKey); - try { - final Object existingValue; - final Object newValue; - if (configField != null) { - newValue = FieldParser.value(data.get(configKey), configField); - existingValue = configField.get(pulsar.getConfiguration()); - configField.set(pulsar.getConfiguration(), newValue); - } else { - newValue = value; - existingValue = configFieldWrapper.customValue; - configFieldWrapper.customValue = newValue == null ? null : String.valueOf(newValue); - } - log.info("Successfully updated configuration {}/{}", configKey, data.get(configKey)); - if (listener != null && !Objects.equals(existingValue, newValue)) { - listener.accept(newValue); - } - } catch (Exception e) { - log.error("Failed to update config {}", configKey, e); - } - }); }); + // Some configs have been changed. + if (!optMap.isPresent()) { + return; + } + Map data = optMap.get(); + data.forEach((configKey, value) -> { + configValueChanged(configKey, value); + }); + }); + } + } + + private void configValueChanged(String configKey, String newValueStr) { + ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey); + if (configFieldWrapper == null) { + log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey); + return; + } + Consumer listener = configRegisteredListeners.get(configKey); + try { + // Convert existingValue and newValue. + final Object existingValue; + final Object newValue; + if (configFieldWrapper.field != null) { + if (StringUtils.isBlank(newValueStr)) { + newValue = configFieldWrapper.defaultValue; + } else { + newValue = FieldParser.value(newValueStr, configFieldWrapper.field); + } + existingValue = configFieldWrapper.field.get(pulsar.getConfiguration()); + configFieldWrapper.field.set(pulsar.getConfiguration(), newValue); + } else { + // This case only occurs when it is a customized item. + // See: https://github.com/apache/pulsar/blob/master/pip/pip-300.md. + log.info("Skip update customized dynamic configuration {}/{} in memory, only trigger an event" + + " listeners.", configKey, newValueStr); + existingValue = configFieldWrapper.lastDynamicValue; + newValue = newValueStr == null ? configFieldWrapper.defaultValue : newValueStr; + } + // Record the latest dynamic config. + configFieldWrapper.lastDynamicValue = newValueStr; + + if (newValueStr == null) { + log.info("Successfully remove the dynamic configuration {}, and revert to the default value", + configKey); + } else { + log.info("Successfully updated configuration {}/{}", configKey, newValueStr); + } + + if (listener != null && !Objects.equals(existingValue, newValue)) { + // So far, all config items that related to configuration listeners, their default value is not null. + // And the customized config can be null before. + // So call "listener.accept(null)" is okay. + listener.accept(newValue); + } + } catch (Exception e) { + log.error("Failed to update config {}", configKey, e); } } @@ -2936,6 +2967,9 @@ private void updateManagedLedgerConfig() { * On notification, listener should first check if config value has been changed and after taking appropriate * action, listener should update config value with new value if it has been changed (so, next time listener can * compare values on configMap change). + * + * Note: The new value that the {@param listener} may accept could be a null value. + * * @param * * @param configKey @@ -3057,16 +3091,23 @@ public boolean validateDynamicConfiguration(String key, String value) { return true; } - private static ConcurrentOpenHashMap prepareDynamicConfigurationMap() { + private ConcurrentOpenHashMap prepareDynamicConfigurationMap() { ConcurrentOpenHashMap dynamicConfigurationMap = ConcurrentOpenHashMap.newBuilder().build(); - for (Field field : ServiceConfiguration.class.getDeclaredFields()) { - if (field != null && field.isAnnotationPresent(FieldContext.class)) { - field.setAccessible(true); - if (field.getAnnotation(FieldContext.class).dynamic()) { - dynamicConfigurationMap.put(field.getName(), new ConfigField(field)); + try { + for (Field field : ServiceConfiguration.class.getDeclaredFields()) { + if (field != null && field.isAnnotationPresent(FieldContext.class)) { + field.setAccessible(true); + if (field.getAnnotation(FieldContext.class).dynamic()) { + Object defaultValue = field.get(pulsar.getConfiguration()); + dynamicConfigurationMap.put(field.getName(), new ConfigField(field, defaultValue)); + } } } + } catch (IllegalArgumentException | IllegalAccessException ex) { + // This error never occurs. + log.error("Failed to initialize dynamic configuration map", ex); + throw new RuntimeException(ex); } return dynamicConfigurationMap; } @@ -3348,19 +3389,25 @@ private static class ConfigField { // field holds the pulsar dynamic configuration. final Field field; - // customValue holds the external dynamic configuration. - volatile String customValue; + // It is the dynamic config value if set. + // It is null if has does not set a dynamic config, even if the value of "pulsar.config" is present. + volatile String lastDynamicValue; + + // The default value of "pulsar.config", which is initialized when the broker is starting. + // After the dynamic config has been removed, revert the config to this default value. + final Object defaultValue; Predicate validator; - public ConfigField(Field field) { + public ConfigField(Field field, Object defaultValue) { super(); this.field = field; + this.defaultValue = defaultValue; } public static ConfigField newCustomConfigField(String customValue) { - ConfigField configField = new ConfigField(null); - configField.customValue = customValue; + ConfigField configField = new ConfigField(null, null); + configField.lastDynamicValue = customValue; return configField; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java index 12f231a4d2ce3..aa7c2d720e353 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java @@ -18,15 +18,18 @@ */ package org.apache.pulsar.broker.admin; +import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; import static org.testng.Assert.fail; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; import org.awaitility.Awaitility; @@ -107,4 +110,69 @@ public void testRegisterCustomDynamicConfiguration() throws PulsarAdminException allDynamicConfigurations = admin.brokers().getAllDynamicConfigurations(); assertThat(allDynamicConfigurations).doesNotContainKey(key); } + + @Test + public void testDeleteStringDynamicConfig() throws PulsarAdminException { + String syncEventTopic = BrokerTestUtil.newUniqueName(SYSTEM_NAMESPACE + "/tp"); + // The default value is null; + Awaitility.await().untilAsserted(() -> { + assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic()); + }); + // Set dynamic config. + admin.brokers().updateDynamicConfiguration("configurationMetadataSyncEventTopic", syncEventTopic); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar.getConfig().getConfigurationMetadataSyncEventTopic(), syncEventTopic); + }); + // Remove dynamic config. + admin.brokers().deleteDynamicConfiguration("configurationMetadataSyncEventTopic"); + Awaitility.await().untilAsserted(() -> { + assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic()); + }); + } + + @Test + public void testDeleteIntDynamicConfig() throws PulsarAdminException { + // Record the default value; + int defaultValue = pulsar.getConfig().getMaxConcurrentTopicLoadRequest(); + // Set dynamic config. + int newValue = defaultValue + 1000; + admin.brokers().updateDynamicConfiguration("maxConcurrentTopicLoadRequest", newValue + ""); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), newValue); + }); + // Verify: it has been reverted to the default value. + admin.brokers().deleteDynamicConfiguration("maxConcurrentTopicLoadRequest"); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), defaultValue); + }); + } + + @Test + public void testDeleteCustomizedDynamicConfig() throws PulsarAdminException { + // Record the default value; + String customizedConfigName = "a123"; + pulsar.getBrokerService().registerCustomDynamicConfiguration(customizedConfigName, v -> true); + + AtomicReference currentValue = new AtomicReference<>(); + pulsar.getBrokerService().registerConfigurationListener(customizedConfigName, v -> { + currentValue.set(v); + }); + + // The default value is null; + Awaitility.await().untilAsserted(() -> { + assertNull(currentValue.get()); + }); + + // Set dynamic config. + admin.brokers().updateDynamicConfiguration(customizedConfigName, "xxx"); + Awaitility.await().untilAsserted(() -> { + assertEquals(currentValue.get(), "xxx"); + }); + + // Remove dynamic config. + admin.brokers().deleteDynamicConfiguration(customizedConfigName); + Awaitility.await().untilAsserted(() -> { + assertNull(currentValue.get()); + }); + } }