[improve][broker] Make ExtensibleLoadManagerImpl's broker filter pure async #20666

Merged

Demogorgon314
Member

Motivation

Currently, the broker filter calls some blocking methods, e.g., the getIsolationDataPolicies method. Calling a blocking method on the metadata-store thread can cause a deadlock. For more detail, see #20130. The stack trace below shows the blocking call being reached from the metadata-store thread:

"configuration-metadata-store-186-1@14606" prio=5 tid=0x175 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at org.apache.pulsar.broker.resources.BaseResources.get(BaseResources.java:88)
	  at org.apache.pulsar.broker.resources.NamespaceResources$IsolationPolicyResources.getIsolationDataPolicies(NamespaceResources.java:177)
	  at org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies.getIsolationPolicies(SimpleResourceAllocationPolicies.java:49)
	  at org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies.areIsolationPoliciesPresent(SimpleResourceAllocationPolicies.java:59)
	  at org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.applyNamespacePolicies(LoadManagerShared.java:103)
	  at org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper.applyIsolationPolicies(IsolationPoliciesHelper.java:52)
	  at org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerIsolationPoliciesFilter.filter(BrokerIsolationPoliciesFilter.java:53)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$selectAsync$12(ExtensibleLoadManagerImpl.java:394)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl$$Lambda$1347/0x0000000801645428.apply(Unknown Source:-1)
	  at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
	  at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.selectAsync(ExtensibleLoadManagerImpl.java:385)
	  at java.lang.invoke.LambdaForm$DMH/0x0000000800e1d000.invokeVirtual(LambdaForm$DMH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x0000000800fac800.invoke(LambdaForm$MH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x0000000800f46000.invoke(LambdaForm$MH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x0000000800c14400.invokeExact_MT(LambdaForm$MH:-1)
	  at java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732)
	  at org.mockito.internal.util.reflection.InstrumentationMemberAccessor$Dispatcher$ByteBuddy$MBY4HIiM.invokeWithArguments(Unknown Source:-1)
	  at org.mockito.internal.util.reflection.InstrumentationMemberAccessor.invoke(InstrumentationMemberAccessor.java:239)
	  at org.mockito.internal.util.reflection.ModuleMemberAccessor.invoke(ModuleMemberAccessor.java:55)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.tryInvoke(MockMethodAdvice.java:333)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.access$500(MockMethodAdvice.java:60)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice$RealMethodCall.invoke(MockMethodAdvice.java:253)
	  at org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:142)
	  at org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:45)
	  at org.mockito.Answers.answer(Answers.java:99)
	  at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:110)
	  at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
	  at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
	  at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.selectAsync(ExtensibleLoadManagerImpl.java:383)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$6(ExtensibleLoadManagerImpl.java:336)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl$$Lambda$1343/0x0000000801644b18.apply(Unknown Source:-1)
	  at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
	  at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$10(ExtensibleLoadManagerImpl.java:333)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl$$Lambda$1342/0x00000008016446c8.apply(Unknown Source:-1)
	  at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:409)
	  at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:243)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.assign(ExtensibleLoadManagerImpl.java:327)
	  at java.lang.invoke.LambdaForm$DMH/0x00000008013ac800.invokeVirtual(LambdaForm$DMH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x00000008013ad400.invoke(LambdaForm$MH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x0000000800f44800.invoke(LambdaForm$MH:-1)
	  at java.lang.invoke.LambdaForm$MH/0x0000000800c14400.invokeExact_MT(LambdaForm$MH:-1)
	  at java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732)
	  at org.mockito.internal.util.reflection.InstrumentationMemberAccessor$Dispatcher$ByteBuddy$MBY4HIiM.invokeWithArguments(Unknown Source:-1)
	  at org.mockito.internal.util.reflection.InstrumentationMemberAccessor.invoke(InstrumentationMemberAccessor.java:239)
	  at org.mockito.internal.util.reflection.ModuleMemberAccessor.invoke(ModuleMemberAccessor.java:55)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.tryInvoke(MockMethodAdvice.java:333)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.access$500(MockMethodAdvice.java:60)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice$RealMethodCall.invoke(MockMethodAdvice.java:253)
	  at org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:142)
	  at org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:45)
	  at org.mockito.Answers.answer(Answers.java:99)
	  at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:110)
	  at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
	  at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
	  at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
	  at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.assign(ExtensibleLoadManagerImpl.java:325)
	  at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper.findBrokerServiceUrl(ExtensibleLoadManagerWrapper.java:66)
	  at org.apache.pulsar.broker.namespace.NamespaceService.lambda$internalGetWebServiceUrl$9(NamespaceService.java:302)
	  at org.apache.pulsar.broker.namespace.NamespaceService$$Lambda$1337/0x000000080163bb68.apply(Unknown Source:-1)
	  at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
	  at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
	  at org.apache.pulsar.broker.namespace.NamespaceService.internalGetWebServiceUrl(NamespaceService.java:285)
	  at org.apache.pulsar.broker.namespace.NamespaceService.getWebServiceUrlAsync(NamespaceService.java:265)

Modifications

  • Make the broker filter fully asynchronous to avoid blocking operations.
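
To illustrate the problem described in the motivation, here is a minimal sketch, not the Pulsar code. It assumes a hypothetical single-threaded executor standing in for the metadata-store thread and a hypothetical `readAsync` helper. The blocking variant waits, on the store thread, for a second read that can only be served by that same thread. A timeout is used only so that the demonstration terminates; without it the wait would never return. The async variant composes the two reads and never blocks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingVsAsyncFilterSketch {

    // Hypothetical stand-in for an async metadata read served by a single thread.
    static CompletableFuture<String> readAsync(ExecutorService store, String key) {
        return CompletableFuture.supplyAsync(() -> "policy-for-" + key, store);
    }

    // Blocking style: the callback runs on the store thread and then waits for
    // another read that is queued behind it on the same thread.
    static CompletableFuture<String> blockingFilter(ExecutorService store) {
        return readAsync(store, "brokers").thenApplyAsync(brokers -> {
            try {
                // The timeout exists only so this sketch terminates.
                return readAsync(store, "isolation").get(500, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                throw new IllegalStateException("blocked on the store thread", e);
            }
        }, store);
    }

    // Async style: the callback returns a future immediately and frees the thread.
    static CompletableFuture<String> asyncFilter(ExecutorService store) {
        return readAsync(store, "brokers")
                .thenCompose(brokers -> readAsync(store, "isolation"));
    }

    // Returns true when the blocking variant fails with a timeout.
    static boolean blockingVariantTimesOut() {
        ExecutorService store = Executors.newSingleThreadExecutor();
        try {
            blockingFilter(store).join();
            return false;
        } catch (CompletionException e) {
            Throwable cause = e.getCause();
            return cause instanceof IllegalStateException
                    && cause.getCause() instanceof TimeoutException;
        } finally {
            store.shutdownNow();
        }
    }

    // Returns the value produced by the async variant.
    static String asyncVariantResult() {
        ExecutorService store = Executors.newSingleThreadExecutor();
        try {
            return asyncFilter(store).join();
        } finally {
            store.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking variant timed out: " + blockingVariantTimesOut());
        System.out.println("async variant result: " + asyncVariantResult());
    }
}
```

The names `readAsync`, `blockingFilter`, and `asyncFilter` are illustrative only and do not exist in Pulsar.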

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@Demogorgon314 added the type/enhancement and area/broker labels Jun 28, 2023
@Demogorgon314 self-assigned this Jun 28, 2023
@github-actions bot added the doc-not-needed label Jun 28, 2023
Contributor

@eolivelli left a comment

+1

Contributor

@lifepuzzlefun left a comment

hi, i left some comments here 👋

return CompletableFuture.completedFuture(Optional.empty());
CompletableFuture<Map<String, BrokerLookupData>> future =
filter.filter(availableBrokerCandidates, bundle, context);
futures.add(future);
Contributor

Do we need to apply the filters in a specific order here? The k8s scheduler applies its filters in order. I wonder if we need the same.

Member Author

No, we do not need to guarantee the order.

CompletableFuture<Optional<String>> result = new CompletableFuture<>();
FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to filter out brokers when select bundle: {}", bundle, ex);
Contributor

It seems we ignore all filter exceptions here. Will this interfere with the filter logic? Or could we decide that exceptions from some soft-limit filters can be ignored while exceptions from other filters cannot?

Contributor

AFAIK, this is the current behavior in the modular load manager. I think we can add a TODO or a backlog issue to follow up.

Member Author

Yes, I added a TODO comment here.
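
For readers following this thread, here is a minimal sketch of the pattern under discussion: start every filter without waiting for the previous one, wait for all of them, and log rather than propagate a failure. It is not the PR's implementation. `AsyncFilter` is a hypothetical interface, brokers are reduced to plain strings, `CompletableFuture.allOf` is used as a stand-in for `FutureUtil.waitForAll`, and intersecting the successful results is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

public class ConcurrentFilterSketch {

    // Hypothetical async filter: returns the subset of brokers it accepts.
    interface AsyncFilter {
        CompletableFuture<Set<String>> filter(Set<String> brokers);
    }

    // Starts all filters, waits for all of them, and keeps only the brokers
    // accepted by every filter that completed successfully.
    static CompletableFuture<Set<String>> applyAll(Set<String> brokers, List<AsyncFilter> filters) {
        List<CompletableFuture<Set<String>>> futures = new ArrayList<>();
        for (AsyncFilter filter : filters) {
            futures.add(filter.filter(Set.copyOf(brokers)));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .handle((ignored, ex) -> {
                    if (ex != null) {
                        // Assumption for this sketch: a failed filter is logged and skipped.
                        System.err.println("A filter failed and is ignored: " + ex);
                    }
                    Set<String> result = new TreeSet<>(brokers);
                    for (CompletableFuture<Set<String>> future : futures) {
                        if (!future.isCompletedExceptionally()) {
                            result.retainAll(future.join());
                        }
                    }
                    return result;
                });
    }

    // Small demonstration with one working filter and one failing filter.
    static Set<String> demo() {
        Set<String> brokers = Set.of("broker-a", "broker-b", "broker-c");
        AsyncFilter dropB = in -> CompletableFuture.supplyAsync(() -> {
            Set<String> out = new TreeSet<>(in);
            out.remove("broker-b");
            return out;
        });
        AsyncFilter failing = in ->
                CompletableFuture.failedFuture(new IllegalStateException("metadata unavailable"));
        return applyAll(brokers, List.of(dropB, failing)).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the filters in this sketch do not depend on each other's output, the order in which they complete does not change the result, which matches the answer given earlier in the thread.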

@@ -53,6 +54,20 @@ private Optional<NamespaceIsolationPolicies> getIsolationPolicies(String cluster
}
}

private CompletableFuture<Optional<NamespaceIsolationPolicies>> getIsolationPoliciesAsync(String clusterName) {
return this.pulsar.getPulsarResources().getNamespaceResources()
.getIsolationPolicies().getIsolationDataPoliciesAsync(clusterName);
Contributor

I'm not sure whether the callback of this CompletableFuture still executes on the metadataStore thread. If it does, do we need to switch the callback to another thread? WDYT?

@@ -477,7 +477,7 @@ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle,
Set<String> excludeBrokerSet) {
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
- .thenCompose(availableBrokers -> {
+ .thenComposeAsync(availableBrokers -> {
Member Author

@lifepuzzlefun I think we can switch to another thread here.

Contributor

agree
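
As a rough illustration of the thread switch agreed on above, the sketch below compares where a `thenCompose` callback and a `thenComposeAsync` callback run when the source future is completed by a thread named "metadata-store". The executor names and the explicit `worker` executor are assumptions for this example; the diff excerpt does not show which executor, if any, the PR passes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadSwitchSketch {

    // Returns the thread names on which the two callbacks ran:
    // index 0 for thenCompose, index 1 for thenComposeAsync.
    static String[] callbackThreads() {
        ExecutorService store =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "metadata-store"));
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "lookup-worker"));
        try {
            CompletableFuture<String> source = new CompletableFuture<>();

            // Registered before completion, so it runs on the completing thread.
            CompletableFuture<String> sameThread = source.thenCompose(
                    v -> CompletableFuture.completedFuture(Thread.currentThread().getName()));

            // Always handed off to the supplied executor.
            CompletableFuture<String> switched = source.thenComposeAsync(
                    v -> CompletableFuture.completedFuture(Thread.currentThread().getName()),
                    worker);

            // Complete the source from the simulated metadata-store thread.
            store.execute(() -> source.complete("brokers"));

            return new String[] {sameThread.join(), switched.join()};
        } finally {
            store.shutdown();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = callbackThreads();
        System.out.println("thenCompose ran on: " + threads[0]);
        System.out.println("thenComposeAsync ran on: " + threads[1]);
    }
}
```

With the one-argument form of `thenComposeAsync`, the callback would run on the JDK's default asynchronous executor instead of a named worker thread.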

@Demogorgon314 force-pushed the Demogorgon314/make-broker-filter-async branch from 173503a to 1447f41 Compare June 30, 2023 05:27
@Technoboy- added this to the 3.1.0 milestone Jun 30, 2023
@Demogorgon314 requested a review from Technoboy- June 30, 2023 09:14
@codecov-commenter

Codecov Report

Merging #20666 (c91ca8d) into master (0e60340) will increase coverage by 0.46%.
The diff coverage is 77.93%.

@@             Coverage Diff              @@
##             master   #20666      +/-   ##
============================================
+ Coverage     72.60%   73.06%   +0.46%     
- Complexity    32018    32110      +92     
============================================
  Files          1855     1871      +16     
  Lines        138569   139056     +487     
  Branches      15250    15305      +55     
============================================
+ Hits         100605   101605    +1000     
+ Misses        29945    29399     -546     
- Partials       8019     8052      +33     
Flag Coverage Δ
inttests 24.11% <25.50%> (+<0.01%) ⬆️
systests 24.96% <13.75%> (?)
unittests 72.35% <77.65%> (-0.01%) ⬇️

Impacted Files Coverage Δ
...ava/org/apache/pulsar/admin/cli/CmdNamespaces.java 76.17% <0.00%> (-0.08%) ⬇️
.../service/SystemTopicBasedTopicPoliciesService.java 76.48% <50.00%> (-1.19%) ⬇️
...sar/broker/loadbalance/impl/LoadManagerShared.java 76.57% <55.22%> (-5.53%) ⬇️
...xtensions/channel/ServiceUnitStateChannelImpl.java 84.65% <66.66%> (+0.34%) ⬆️
...ar/common/naming/TopicBundleAssignmentFactory.java 66.66% <66.66%> (ø)
...n/java/org/apache/pulsar/admin/cli/CliCommand.java 57.95% <71.42%> (-1.60%) ⬇️
.../metadata/bookkeeper/PulsarRegistrationClient.java 80.85% <81.11%> (-4.11%) ⬇️
.../pulsar/common/topics/TopicCompactionStrategy.java 90.90% <83.33%> (-9.10%) ⬇️
...balance/impl/SimpleResourceAllocationPolicies.java 59.52% <85.71%> (+5.23%) ⬆️
...he/pulsar/broker/admin/v2/NonPersistentTopics.java 62.55% <86.95%> (+0.63%) ⬆️
... and 20 more

... and 172 files with indirect coverage changes

Contributor

@Technoboy- left a comment

LGTM

@Technoboy-
Contributor

Do we need to cherry pick to 3.0.2?

@Demogorgon314
Member Author

> Do we need to cherry pick to 3.0.2?

@Technoboy- Yes

Comment on lines -45 to +47
- Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
-         ServiceUnitId serviceUnit,
-         LoadManagerContext context)
-         throws BrokerFilterException;
+ CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, BrokerLookupData> brokers,
+         ServiceUnitId serviceUnit,
+         LoadManagerContext context);
Contributor

We should not cherry-pick this public API change to branch-3.0 @Demogorgon314

Member Author

This method has yet to be exposed to users. In this case, can we cherry-pick?

Member Author

@BewareMyPower I pushed a PR to add the broker filter sync method back: #20826. Is this way acceptable?

Contributor

Yes. It's okay. But since a new method is added into the BrokerFilter, we should still avoid cherry-picking it to release branches.
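
To make the compatibility concern concrete, here is one possible shape for an interface that keeps a synchronous method while also offering an asynchronous one. This is a sketch under stated assumptions and is not taken from #20826: `SketchFilter`, `FilterException`, and `filterAsync` are hypothetical names, and the default-method bridge is only one way such an interface could be arranged.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CompatibleFilterSketch {

    // Hypothetical checked exception, standing in for a filter-specific exception type.
    static class FilterException extends Exception {
        FilterException(String message) {
            super(message);
        }
    }

    // Hypothetical interface: existing implementers only provide the sync method,
    // and async callers go through the default bridge.
    interface SketchFilter {
        Map<String, String> filter(Map<String, String> brokers) throws FilterException;

        default CompletableFuture<Map<String, String>> filterAsync(Map<String, String> brokers) {
            try {
                return CompletableFuture.completedFuture(filter(brokers));
            } catch (FilterException e) {
                // Surface the sync failure through the returned future.
                return CompletableFuture.failedFuture(e);
            }
        }
    }

    // A sync-only filter still works through the async entry point.
    static boolean demoSuccess() {
        SketchFilter dropB = brokers -> {
            Map<String, String> out = new HashMap<>(brokers);
            out.remove("broker-b");
            return out;
        };
        Map<String, String> result =
                dropB.filterAsync(Map.of("broker-a", "url-a", "broker-b", "url-b")).join();
        return result.equals(Map.of("broker-a", "url-a"));
    }

    // A sync failure becomes an exceptionally completed future.
    static boolean demoFailure() {
        SketchFilter failing = brokers -> {
            throw new FilterException("no broker matches");
        };
        return failing.filterAsync(Map.of()).isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println("sync filter via async entry point: " + demoSuccess());
        System.out.println("sync failure surfaces in future: " + demoFailure());
    }
}
```

Even with a bridge like this, adding a method to a public interface changes its surface, which is the reason given above for keeping such changes out of release branches.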

Labels
area/broker, cherry-picked/branch-3.0, doc-not-needed, release/3.0.1, type/enhancement
8 participants