Skip to content

Commit 3af4ed6

Browse files
mattisonchaoliangyuanpeng
authored andcommitted
[fix][broker] Fix inconsistent topic policy (apache#21231)
1 parent 6df6a99 commit 3af4ed6

30 files changed

+478
-197
lines changed

pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.pulsar.client.api.Schema;
5050
import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
5151
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
52+
import org.apache.pulsar.common.util.ObjectMapperFactory;
5253
import org.apache.pulsar.proxy.server.ProxyConfiguration;
5354
import org.apache.pulsar.proxy.server.ProxyService;
5455
import org.slf4j.Logger;
@@ -193,15 +194,17 @@ protected void setup() throws Exception {
193194
conf.setAuthenticationProviders(providers);
194195
conf.setClusterName("test");
195196
conf.setSuperUserRoles(ImmutableSet.of("client/" + localHostname + "@" + kdc.getRealm()));
196-
197-
super.init();
198-
199-
lookupUrl = new URI(pulsar.getBrokerServiceUrl());
200-
201197
// set admin auth, to verify admin web resources
202198
Map<String, String> clientSaslConfig = new HashMap<>();
203199
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
204200
clientSaslConfig.put("serverType", "broker");
201+
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
202+
conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
203+
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
204+
205+
super.init();
206+
207+
lookupUrl = new URI(pulsar.getBrokerServiceUrl());
205208
log.info("set client jaas section name: PulsarClient");
206209
admin = PulsarAdmin.builder()
207210
.serviceHttpUrl(brokerUrl.toString())

pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
5959
import org.apache.pulsar.common.api.AuthData;
6060
import org.apache.pulsar.common.sasl.SaslConstants;
61+
import org.apache.pulsar.common.util.ObjectMapperFactory;
6162
import org.testng.Assert;
6263
import org.testng.annotations.AfterClass;
6364
import org.testng.annotations.AfterMethod;
@@ -186,7 +187,12 @@ protected void setup() throws Exception {
186187
conf.setAuthenticationProviders(providers);
187188
conf.setClusterName("test");
188189
conf.setSuperUserRoles(ImmutableSet.of("client" + "@" + kdc.getRealm()));
189-
190+
Map<String, String> clientSaslConfig = new HashMap<>();
191+
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
192+
clientSaslConfig.put("serverType", "broker");
193+
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
194+
conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
195+
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
190196
super.init();
191197

192198
lookupUrl = new URI(pulsar.getWebServiceAddress());
@@ -197,9 +203,6 @@ protected void setup() throws Exception {
197203
.authentication(authSasl));
198204

199205
// set admin auth, to verify admin web resources
200-
Map<String, String> clientSaslConfig = new HashMap<>();
201-
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
202-
clientSaslConfig.put("serverType", "broker");
203206
log.info("set client jaas section name: PulsarClient");
204207
admin = PulsarAdmin.builder()
205208
.serviceHttpUrl(brokerUrl.toString())

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

+151-144
Large diffs are not rendered by default.

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java

+73-27
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static java.util.Objects.requireNonNull;
2122
import com.google.common.annotations.VisibleForTesting;
2223
import com.google.common.collect.Sets;
2324
import java.util.HashSet;
2425
import java.util.List;
2526
import java.util.Map;
2627
import java.util.Objects;
28+
import java.util.Optional;
2729
import java.util.concurrent.CompletableFuture;
2830
import java.util.concurrent.ConcurrentHashMap;
2931
import java.util.concurrent.CopyOnWriteArrayList;
@@ -54,6 +56,7 @@
5456
import org.apache.pulsar.common.naming.TopicName;
5557
import org.apache.pulsar.common.policies.data.TopicPolicies;
5658
import org.apache.pulsar.common.util.FutureUtil;
59+
import org.jetbrains.annotations.NotNull;
5760
import org.slf4j.Logger;
5861
import org.slf4j.LoggerFactory;
5962

@@ -78,8 +81,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
7881

7982
private final Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
8083
readerCaches = new ConcurrentHashMap<>();
81-
@VisibleForTesting
82-
final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();
84+
85+
final Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap = new ConcurrentHashMap<>();
8386

8487
@VisibleForTesting
8588
final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
@@ -219,12 +222,12 @@ public TopicPolicies getTopicPolicies(TopicName topicName,
219222
boolean isGlobal) throws TopicPoliciesCacheNotInitException {
220223
if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
221224
NamespaceName namespace = topicName.getNamespaceObject();
222-
prepareInitPoliciesCache(namespace, new CompletableFuture<>());
225+
prepareInitPoliciesCacheAsync(namespace);
223226
}
224227

225228
MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result = new MutablePair<>();
226229
policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, initialized) -> {
227-
if (initialized == null || !initialized) {
230+
if (initialized == null || !initialized.isDone()) {
228231
result.setLeft(new TopicPoliciesCacheNotInitException());
229232
} else {
230233
TopicPolicies topicPolicies =
@@ -242,6 +245,34 @@ public TopicPolicies getTopicPolicies(TopicName topicName,
242245
}
243246
}
244247

248+
@NotNull
249+
@Override
250+
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName,
251+
boolean isGlobal) {
252+
requireNonNull(topicName);
253+
final CompletableFuture<Void> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
254+
return preparedFuture.thenApply(__ -> {
255+
final TopicPolicies candidatePolicies = isGlobal
256+
? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
257+
: policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
258+
return Optional.ofNullable(candidatePolicies);
259+
});
260+
}
261+
262+
@NotNull
263+
@Override
264+
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName) {
265+
requireNonNull(topicName);
266+
final CompletableFuture<Void> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
267+
return preparedFuture.thenApply(__ -> {
268+
final TopicPolicies localPolicies = policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
269+
if (localPolicies != null) {
270+
return Optional.of(localPolicies);
271+
}
272+
return Optional.ofNullable(globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())));
273+
});
274+
}
275+
245276
@Override
246277
public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
247278
return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
@@ -265,39 +296,48 @@ public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicNa
265296

266297
@Override
267298
public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
268-
CompletableFuture<Void> result = new CompletableFuture<>();
269299
NamespaceName namespace = namespaceBundle.getNamespaceObject();
270300
if (NamespaceService.isHeartbeatNamespace(namespace)) {
271-
result.complete(null);
272-
return result;
301+
return CompletableFuture.completedFuture(null);
273302
}
274303
synchronized (this) {
275304
if (readerCaches.get(namespace) != null) {
276305
ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
277-
result.complete(null);
306+
return CompletableFuture.completedFuture(null);
278307
} else {
279-
prepareInitPoliciesCache(namespace, result);
308+
return prepareInitPoliciesCacheAsync(namespace);
280309
}
281310
}
282-
return result;
283311
}
284312

285-
private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
286-
if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
287-
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
313+
private @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
314+
requireNonNull(namespace);
315+
return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
316+
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
288317
createSystemTopicClientWithRetry(namespace);
289318
readerCaches.put(namespace, readerCompletableFuture);
290319
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
291-
readerCompletableFuture.thenAccept(reader -> {
292-
initPolicesCache(reader, result);
293-
result.thenRun(() -> readMorePolicies(reader));
294-
}).exceptionally(ex -> {
295-
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
296-
cleanCacheAndCloseReader(namespace, false);
297-
result.completeExceptionally(ex);
320+
final CompletableFuture<Void> initFuture = readerCompletableFuture
321+
.thenCompose(reader -> {
322+
final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
323+
initPolicesCache(reader, stageFuture);
324+
return stageFuture
325+
// Read policies in background
326+
.thenAccept(__ -> readMorePoliciesAsync(reader));
327+
});
328+
initFuture.exceptionally(ex -> {
329+
try {
330+
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
331+
cleanCacheAndCloseReader(namespace, false);
332+
} catch (Throwable cleanupEx) {
333+
// Adding this catch to avoid break callback chain
334+
log.error("[{}] Failed to cleanup reader on __change_events topic", namespace, cleanupEx);
335+
}
298336
return null;
299337
});
300-
}
338+
// let caller know we've got an exception.
339+
return initFuture;
340+
});
301341
}
302342

303343
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClientWithRetry(
@@ -381,8 +421,7 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
381421
if (log.isDebugEnabled()) {
382422
log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName());
383423
}
384-
policyCacheInitMap.computeIfPresent(
385-
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
424+
386425
// replay policy message
387426
policiesCache.forEach(((topicName, topicPolicies) -> {
388427
if (listeners.get(topicName) != null) {
@@ -395,6 +434,7 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
395434
}
396435
}
397436
}));
437+
398438
future.complete(null);
399439
}
400440
});
@@ -420,15 +460,21 @@ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean
420460
});
421461
}
422462

423-
private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
463+
/**
464+
* This is an async method for the background reader to continue syncing new messages.
465+
*
466+
* Note: You should not do any blocking call here. because it will affect
467+
* #{@link SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method to block loading topic.
468+
*/
469+
private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader) {
424470
reader.readNextAsync()
425471
.thenAccept(msg -> {
426472
refreshTopicPoliciesCache(msg);
427473
notifyListener(msg);
428474
})
429475
.whenComplete((__, ex) -> {
430476
if (ex == null) {
431-
readMorePolicies(reader);
477+
readMorePoliciesAsync(reader);
432478
} else {
433479
Throwable cause = FutureUtil.unwrapCompletionException(ex);
434480
if (cause instanceof PulsarClientException.AlreadyClosedException) {
@@ -437,7 +483,7 @@ private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
437483
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
438484
} else {
439485
log.warn("Read more topic polices exception, read again.", ex);
440-
readMorePolicies(reader);
486+
readMorePoliciesAsync(reader);
441487
}
442488
}
443489
});
@@ -605,7 +651,7 @@ boolean checkReaderIsCached(NamespaceName namespaceName) {
605651
}
606652

607653
@VisibleForTesting
608-
public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
654+
public CompletableFuture<Void> getPoliciesCacheInit(NamespaceName namespaceName) {
609655
return policyCacheInitMap.get(namespaceName);
610656
}
611657

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java

+41
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.CompletableFuture;
2323
import java.util.concurrent.ScheduledExecutorService;
2424
import java.util.concurrent.TimeUnit;
25+
import javax.annotation.Nonnull;
2526
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
2627
import org.apache.pulsar.client.impl.Backoff;
2728
import org.apache.pulsar.client.impl.BackoffBuilder;
@@ -31,6 +32,7 @@
3132
import org.apache.pulsar.common.naming.TopicName;
3233
import org.apache.pulsar.common.policies.data.TopicPolicies;
3334
import org.apache.pulsar.common.util.FutureUtil;
35+
import org.jetbrains.annotations.NotNull;
3436

3537
/**
3638
* Topic policies service.
@@ -109,6 +111,32 @@ default CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetr
109111
return response;
110112
}
111113

114+
/**
115+
* Asynchronously retrieves topic policies.
116+
* This triggers the Pulsar broker's internal client to load policies from the
117+
* system topic `persistent://tenant/namespace/__change_event`.
118+
*
119+
* @param topicName The name of the topic.
120+
* @param isGlobal Indicates if the policies are global.
121+
* @return A CompletableFuture containing an Optional of TopicPolicies.
122+
* @throws NullPointerException If the topicName is null.
123+
*/
124+
@Nonnull
125+
CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal);
126+
127+
/**
128+
* Asynchronously retrieves topic policies.
129+
* This triggers the Pulsar broker's internal client to load policies from the
130+
* system topic `persistent://tenant/namespace/__change_event`.
131+
*
132+
* NOTE: If local policies are not available, it will fallback to using topic global policies.
133+
* @param topicName The name of the topic.
134+
* @return A CompletableFuture containing an Optional of TopicPolicies.
135+
* @throws NullPointerException If the topicName is null.
136+
*/
137+
@Nonnull
138+
CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName);
139+
112140
/**
113141
* Get policies for a topic without cache async.
114142
* @param topicName topic name
@@ -162,6 +190,19 @@ public TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal)
162190
return null;
163191
}
164192

193+
@NotNull
194+
@Override
195+
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName,
196+
boolean isGlobal) {
197+
return CompletableFuture.completedFuture(Optional.empty());
198+
}
199+
200+
@NotNull
201+
@Override
202+
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName) {
203+
return CompletableFuture.completedFuture(Optional.empty());
204+
}
205+
165206
@Override
166207
public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
167208
return null;

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ public void initPersistentTopics() throws Exception {
127127
@Override
128128
@BeforeMethod
129129
protected void setup() throws Exception {
130+
conf.setTopicLevelPoliciesEnabled(false);
130131
super.internalSetup();
131132
persistentTopics = spy(PersistentTopics.class);
132133
persistentTopics.setServletContext(new MockServletContext());

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws Excep
180180
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
181181
//make sure namespace policy reader is fully started.
182182
Awaitility.await().untilAsserted(()-> {
183-
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
183+
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()).isDone());
184184
});
185185

186186
//load the topic.

0 commit comments

Comments
 (0)