Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Fix nothing changed after removing dynamic configs #22673

Merged
merged 3 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ public class BrokerService implements Closeable {
private final OrderedExecutor topicOrderedExecutor;
// offline topic backlog cache
private final ConcurrentOpenHashMap<TopicName, PersistentOfflineTopicStats> offlineTopicStatCache;
private final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
prepareDynamicConfigurationMap();
private final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap;
private final ConcurrentOpenHashMap<String, Consumer<?>> configRegisteredListeners;

private final ConcurrentLinkedQueue<TopicLoadingContext> pendingTopicLoadingQueue;
Expand Down Expand Up @@ -313,6 +312,7 @@ public class BrokerService implements Closeable {

public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception {
this.pulsar = pulsar;
this.dynamicConfigurationMap = prepareDynamicConfigurationMap();
this.brokerPublishRateLimiter = new PublishRateLimiterImpl(pulsar.getMonotonicSnapshotClock());
this.preciseTopicPublishRateLimitingEnable =
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
Expand Down Expand Up @@ -2488,40 +2488,71 @@ private void handleDynamicConfigurationUpdates() {

if (dynamicConfigResources != null) {
dynamicConfigResources.getDynamicConfigurationAsync()
.thenAccept(optMap -> {
if (!optMap.isPresent()) {
return;
.thenAccept(optMap -> {
// Case some dynamic configs have been removed.
dynamicConfigurationMap.forEach((configKey, fieldWrapper) -> {
boolean configRemoved = optMap.isEmpty() || !optMap.get().containsKey(configKey);
if (fieldWrapper.lastDynamicValue != null && configRemoved) {
configValueChanged(configKey, null);
}
Map<String, String> data = optMap.get();
data.forEach((configKey, value) -> {
ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey);
if (configFieldWrapper == null) {
log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey);
return;
}
Field configField = configFieldWrapper.field;
Consumer listener = configRegisteredListeners.get(configKey);
try {
final Object existingValue;
final Object newValue;
if (configField != null) {
newValue = FieldParser.value(data.get(configKey), configField);
existingValue = configField.get(pulsar.getConfiguration());
configField.set(pulsar.getConfiguration(), newValue);
} else {
newValue = value;
existingValue = configFieldWrapper.customValue;
configFieldWrapper.customValue = newValue == null ? null : String.valueOf(newValue);
}
log.info("Successfully updated configuration {}/{}", configKey, data.get(configKey));
if (listener != null && !Objects.equals(existingValue, newValue)) {
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}", configKey, e);
}
});
});
// Some configs have been changed.
if (!optMap.isPresent()) {
return;
}
Map<String, String> data = optMap.get();
data.forEach((configKey, value) -> {
configValueChanged(configKey, value);
});
});
}
}

private void configValueChanged(String configKey, String newValueStr) {
ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey);
if (configFieldWrapper == null) {
log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey);
return;
}
Consumer listener = configRegisteredListeners.get(configKey);
try {
// Convert existingValue and newValue.
final Object existingValue;
final Object newValue;
if (configFieldWrapper.field != null) {
if (StringUtils.isBlank(newValueStr)) {
newValue = configFieldWrapper.defaultValue;
} else {
newValue = FieldParser.value(newValueStr, configFieldWrapper.field);
}
existingValue = configFieldWrapper.field.get(pulsar.getConfiguration());
configFieldWrapper.field.set(pulsar.getConfiguration(), newValue);
} else {
// This case only occurs when it is a customized item.
// See: https://github.com/apache/pulsar/blob/master/pip/pip-300.md.
log.info("Skip update customized dynamic configuration {}/{} in memory, only trigger an event"
+ " listeners.", configKey, newValueStr);
existingValue = configFieldWrapper.lastDynamicValue;
newValue = newValueStr == null ? configFieldWrapper.defaultValue : newValueStr;
}
// Record the latest dynamic config.
configFieldWrapper.lastDynamicValue = newValueStr;

if (newValueStr == null) {
log.info("Successfully remove the dynamic configuration {}, and revert to the default value",
configKey);
} else {
log.info("Successfully updated configuration {}/{}", configKey, newValueStr);
}

if (listener != null && !Objects.equals(existingValue, newValue)) {
// So far, all config items that related to configuration listeners, their default value is not null.
// And the customized config can be null before.
// So call "listener.accept(null)" is okay.
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}", configKey, e);
}
}

Expand Down Expand Up @@ -2928,6 +2959,9 @@ private void updateManagedLedgerConfig() {
* On notification, listener should first check if config value has been changed and after taking appropriate
* action, listener should update config value with new value if it has been changed (so, next time listener can
* compare values on configMap change).
*
* Note: The new value that the {@param listener} may accept could be a null value.
*
* @param <T>
*
* @param configKey
Expand Down Expand Up @@ -3049,16 +3083,23 @@ public boolean validateDynamicConfiguration(String key, String value) {
return true;
}

private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
private ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
ConcurrentOpenHashMap.<String, ConfigField>newBuilder().build();
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
if (field.getAnnotation(FieldContext.class).dynamic()) {
dynamicConfigurationMap.put(field.getName(), new ConfigField(field));
try {
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
if (field.getAnnotation(FieldContext.class).dynamic()) {
Object defaultValue = field.get(pulsar.getConfiguration());
dynamicConfigurationMap.put(field.getName(), new ConfigField(field, defaultValue));
}
}
}
} catch (IllegalArgumentException | IllegalAccessException ex) {
// This error never occurs.
log.error("Failed to initialize dynamic configuration map", ex);
throw new RuntimeException(ex);
}
return dynamicConfigurationMap;
}
Expand Down Expand Up @@ -3340,19 +3381,25 @@ private static class ConfigField {
// field holds the pulsar dynamic configuration.
final Field field;

// customValue holds the external dynamic configuration.
volatile String customValue;
// It is the dynamic config value if set.
// It is null if has does not set a dynamic config, even if the value of "pulsar.config" is present.
volatile String lastDynamicValue;

// The default value of "pulsar.config", which is initialized when the broker is starting.
// After the dynamic config has been removed, revert the config to this default value.
final Object defaultValue;

Predicate<String> validator;

public ConfigField(Field field) {
public ConfigField(Field field, Object defaultValue) {
super();
this.field = field;
this.defaultValue = defaultValue;
}

public static ConfigField newCustomConfigField(String customValue) {
ConfigField configField = new ConfigField(null);
configField.customValue = customValue;
ConfigField configField = new ConfigField(null, null);
configField.lastDynamicValue = customValue;
return configField;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
*/
package org.apache.pulsar.broker.admin;

import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -107,4 +110,69 @@ public void testRegisterCustomDynamicConfiguration() throws PulsarAdminException
allDynamicConfigurations = admin.brokers().getAllDynamicConfigurations();
assertThat(allDynamicConfigurations).doesNotContainKey(key);
}

@Test
public void testDeleteStringDynamicConfig() throws PulsarAdminException {
String syncEventTopic = BrokerTestUtil.newUniqueName(SYSTEM_NAMESPACE + "/tp");
// The default value is null;
Awaitility.await().untilAsserted(() -> {
assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic());
});
// Set dynamic config.
admin.brokers().updateDynamicConfiguration("configurationMetadataSyncEventTopic", syncEventTopic);
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar.getConfig().getConfigurationMetadataSyncEventTopic(), syncEventTopic);
});
// Remove dynamic config.
admin.brokers().deleteDynamicConfiguration("configurationMetadataSyncEventTopic");
Awaitility.await().untilAsserted(() -> {
assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic());
});
}

@Test
public void testDeleteIntDynamicConfig() throws PulsarAdminException {
// Record the default value;
int defaultValue = pulsar.getConfig().getMaxConcurrentTopicLoadRequest();
// Set dynamic config.
int newValue = defaultValue + 1000;
admin.brokers().updateDynamicConfiguration("maxConcurrentTopicLoadRequest", newValue + "");
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), newValue);
});
// Verify: it has been reverted to the default value.
admin.brokers().deleteDynamicConfiguration("maxConcurrentTopicLoadRequest");
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), defaultValue);
});
}

@Test
public void testDeleteCustomizedDynamicConfig() throws PulsarAdminException {
// Record the default value;
String customizedConfigName = "a123";
pulsar.getBrokerService().registerCustomDynamicConfiguration(customizedConfigName, v -> true);

AtomicReference<Object> currentValue = new AtomicReference<>();
pulsar.getBrokerService().registerConfigurationListener(customizedConfigName, v -> {
currentValue.set(v);
});

// The default value is null;
Awaitility.await().untilAsserted(() -> {
assertNull(currentValue.get());
});

// Set dynamic config.
admin.brokers().updateDynamicConfiguration(customizedConfigName, "xxx");
Awaitility.await().untilAsserted(() -> {
assertEquals(currentValue.get(), "xxx");
});

// Remove dynamic config.
admin.brokers().deleteDynamicConfiguration(customizedConfigName);
Awaitility.await().untilAsserted(() -> {
assertNull(currentValue.get());
});
}
}
Loading