Skip to content

Commit

Permalink
[fix] [broker] Fix nothing changed after removing dynamic configs (#2…
Browse files Browse the repository at this point in the history
…2673)

(cherry picked from commit ada31a9)
  • Loading branch information
poorbarcode committed May 8, 2024
1 parent a8cd7be commit 28680d3
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void getAllDynamicConfigurations(@Suspended AsyncResponse asyncResponse)
@ApiResponse(code = 403, message = "You don't have admin permission to get configuration")})
public void getDynamicConfigurationName(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccessAsync()
.thenAccept(__ -> asyncResponse.resume(BrokerService.getDynamicConfiguration()))
.thenAccept(__ -> asyncResponse.resume(pulsar().getBrokerService().getDynamicConfiguration()))
.exceptionally(ex -> {
LOG.error("[{}] Failed to get all dynamic configuration names.", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -287,11 +287,11 @@ public void getRuntimeConfiguration(@Suspended AsyncResponse asyncResponse) {
*/
private synchronized CompletableFuture<Void> persistDynamicConfigurationAsync(
String configName, String configValue) {
if (!BrokerService.validateDynamicConfiguration(configName, configValue)) {
if (!pulsar().getBrokerService().validateDynamicConfiguration(configName, configValue)) {
return FutureUtil
.failedFuture(new RestException(Status.PRECONDITION_FAILED, " Invalid dynamic-config value"));
}
if (BrokerService.isDynamicConfiguration(configName)) {
if (pulsar().getBrokerService().isDynamicConfiguration(configName)) {
return dynamicConfigurationResources().setDynamicConfigurationWithCreateAsync(old -> {
Map<String, String> configurationMap = old.orElseGet(Maps::newHashMap);
configurationMap.put(configName, configValue);
Expand Down Expand Up @@ -512,7 +512,7 @@ private CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> read
}

private CompletableFuture<Void> internalDeleteDynamicConfigurationOnMetadataAsync(String configName) {
if (!BrokerService.isDynamicConfiguration(configName)) {
if (!pulsar().getBrokerService().isDynamicConfiguration(configName)) {
throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
} else {
return dynamicConfigurationResources().setDynamicConfigurationAsync(old -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ public class BrokerService implements Closeable {
private final OrderedExecutor topicOrderedExecutor;
// offline topic backlog cache
private final ConcurrentOpenHashMap<TopicName, PersistentOfflineTopicStats> offlineTopicStatCache;
private static final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
prepareDynamicConfigurationMap();
private final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap;
private final ConcurrentOpenHashMap<String, Consumer<?>> configRegisteredListeners;

private final ConcurrentLinkedQueue<TopicLoadingContext> pendingTopicLoadingQueue;
Expand Down Expand Up @@ -299,6 +298,7 @@ public class BrokerService implements Closeable {

public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception {
this.pulsar = pulsar;
this.dynamicConfigurationMap = prepareDynamicConfigurationMap();
this.preciseTopicPublishRateLimitingEnable =
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
Expand Down Expand Up @@ -2310,39 +2310,75 @@ private void handleDynamicConfigurationUpdates() {
if (dynamicConfigResources != null) {
dynamicConfigResources.getDynamicConfigurationAsync()
.thenAccept(optMap -> {
// Case some dynamic configs have been removed.
dynamicConfigurationMap.forEach((configKey, fieldWrapper) -> {
boolean configRemoved = optMap.isEmpty() || !optMap.get().containsKey(configKey);
if (fieldWrapper.lastDynamicValue != null && configRemoved) {
configValueChanged(configKey, null);
}
});
// Some configs have been changed.
if (!optMap.isPresent()) {
return;
}
Map<String, String> data = optMap.get();
data.forEach((configKey, value) -> {
ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey);
if (configFieldWrapper == null) {
log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey);
return;
}
Field configField = configFieldWrapper.field;
Object newValue = FieldParser.value(data.get(configKey), configField);
if (configField != null) {
Consumer listener = configRegisteredListeners.get(configKey);
try {
Object existingValue = configField.get(pulsar.getConfiguration());
configField.set(pulsar.getConfiguration(), newValue);
log.info("Successfully updated configuration {}/{}", configKey,
data.get(configKey));
if (listener != null && !existingValue.equals(newValue)) {
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}/{}", configKey, newValue);
}
} else {
log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue);
}
configValueChanged(configKey, value);
});
});
}
}

private void configValueChanged(String configKey, String newValueStr) {
ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey);
if (configFieldWrapper == null) {
log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey);
return;
}
Consumer listener = configRegisteredListeners.get(configKey);
try {
// Convert existingValue and newValue.
final Object existingValue;
final Object newValue;
if (configFieldWrapper.field != null) {
if (StringUtils.isBlank(newValueStr)) {
newValue = configFieldWrapper.defaultValue;
} else {
newValue = FieldParser.value(newValueStr, configFieldWrapper.field);
}
existingValue = configFieldWrapper.field.get(pulsar.getConfiguration());
configFieldWrapper.field.set(pulsar.getConfiguration(), newValue);
} else {
// This case only occurs when it is a customized item.
// Since https://github.com/apache/pulsar/blob/master/pip/pip-300.md has not been cherry-picked, this
// case should never occur.
log.error("Skip update customized dynamic configuration {}/{} in memory, only trigger an event"
+ " listeners. Since PIP-300 has net been cherry-picked, this case should never occur",
configKey, newValueStr);
existingValue = configFieldWrapper.lastDynamicValue;
newValue = newValueStr == null ? configFieldWrapper.defaultValue : newValueStr;
}
// Record the latest dynamic config.
configFieldWrapper.lastDynamicValue = newValueStr;

if (newValueStr == null) {
log.info("Successfully remove the dynamic configuration {}, and revert to the default value",
configKey);
} else {
log.info("Successfully updated configuration {}/{}", configKey, newValueStr);
}

if (listener != null && !Objects.equals(existingValue, newValue)) {
// So far, all config items that related to configuration listeners, their default value is not null.
// And the customized config can be null before.
// So call "listener.accept(null)" is okay.
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}", configKey, e);
}
}

/**
* Unloads the namespace bundles if local cluster is not part of replication-cluster list into the namespace.
* So, broker that owns the bundle and doesn't receive the zk-watch will unload the namespace.
Expand Down Expand Up @@ -2748,6 +2784,9 @@ private void updateManagedLedgerConfig() {
* On notification, listener should first check if config value has been changed and after taking appropriate
* action, listener should update config value with new value if it has been changed (so, next time listener can
* compare values on configMap change).
*
* Note: The new value that the {@param listener} may accept could be a null value.
*
* @param <T>
*
* @param configKey
Expand Down Expand Up @@ -2838,7 +2877,7 @@ public DelayedDeliveryTrackerFactory getDelayedDeliveryTrackerFactory() {
return delayedDeliveryTrackerFactory;
}

public static List<String> getDynamicConfiguration() {
public List<String> getDynamicConfiguration() {
return dynamicConfigurationMap.keys();
}

Expand All @@ -2851,27 +2890,34 @@ public Map<String, String> getRuntimeConfiguration() {
return configMap;
}

public static boolean isDynamicConfiguration(String key) {
public boolean isDynamicConfiguration(String key) {
return dynamicConfigurationMap.containsKey(key);
}

public static boolean validateDynamicConfiguration(String key, String value) {
public boolean validateDynamicConfiguration(String key, String value) {
if (dynamicConfigurationMap.containsKey(key) && dynamicConfigurationMap.get(key).validator != null) {
return dynamicConfigurationMap.get(key).validator.test(value);
}
return true;
}

private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
private ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
ConcurrentOpenHashMap.<String, ConfigField>newBuilder().build();
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
if (field.getAnnotation(FieldContext.class).dynamic()) {
dynamicConfigurationMap.put(field.getName(), new ConfigField(field));
try {
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
if (field.getAnnotation(FieldContext.class).dynamic()) {
Object defaultValue = field.get(pulsar.getConfiguration());
dynamicConfigurationMap.put(field.getName(), new ConfigField(field, defaultValue));
}
}
}
} catch (IllegalArgumentException | IllegalAccessException ex) {
// This error never occurs.
log.error("Failed to initialize dynamic configuration map", ex);
throw new RuntimeException(ex);
}
return dynamicConfigurationMap;
}
Expand Down Expand Up @@ -3145,11 +3191,21 @@ public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleC

private static class ConfigField {
final Field field;

// It is the dynamic config value if set.
// It is null if has does not set a dynamic config, even if the value of "pulsar.config" is present.
volatile String lastDynamicValue;

// The default value of "pulsar.config", which is initialized when the broker is starting.
// After the dynamic config has been removed, revert the config to this default value.
final Object defaultValue;

Predicate<String> validator;

public ConfigField(Field field) {
public ConfigField(Field field, Object defaultValue) {
super();
this.field = field;
this.defaultValue = defaultValue;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;
import java.util.Map;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -69,4 +73,40 @@ public void TestDeleteInvalidDynamicConfiguration() {
}
}
}

@Test
public void testDeleteStringDynamicConfig() throws PulsarAdminException {
String syncEventTopic = BrokerTestUtil.newUniqueName(SYSTEM_NAMESPACE + "/tp");
// The default value is null;
Awaitility.await().untilAsserted(() -> {
assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic());
});
// Set dynamic config.
admin.brokers().updateDynamicConfiguration("configurationMetadataSyncEventTopic", syncEventTopic);
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar.getConfig().getConfigurationMetadataSyncEventTopic(), syncEventTopic);
});
// Remove dynamic config.
admin.brokers().deleteDynamicConfiguration("configurationMetadataSyncEventTopic");
Awaitility.await().untilAsserted(() -> {
assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic());
});
}

@Test
public void testDeleteIntDynamicConfig() throws PulsarAdminException {
// Record the default value;
int defaultValue = pulsar.getConfig().getMaxConcurrentTopicLoadRequest();
// Set dynamic config.
int newValue = defaultValue + 1000;
admin.brokers().updateDynamicConfiguration("maxConcurrentTopicLoadRequest", newValue + "");
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), newValue);
});
// Verify: it has been reverted to the default value.
admin.brokers().deleteDynamicConfiguration("maxConcurrentTopicLoadRequest");
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), defaultValue);
});
}
}

0 comments on commit 28680d3

Please sign in to comment.