Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker]Fix ConcurrentModificationException when ModularLoadManagerImpl start #16953

Conversation

AnonHxy
Copy link
Contributor

@AnonHxy AnonHxy commented Aug 5, 2022

Fixes #16952

Motivation

  • Fix ConcurrentModificationException when ModularLoadManagerImpl start. The root cause is that ModularLoadManagerImpl#knownBrokers can be modified in multi-thread through ModularLoadManagerImpl#updateAll
    • Thread-1 is line290:
      public void handleDataNotification(Notification t) {
      if (t.getPath().startsWith(LoadManager.LOADBALANCE_BROKERS_ROOT)) {
      brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT)
      .thenAccept(brokers -> {
      reapDeadBrokerPreallocations(brokers);
      });
      try {
      scheduler.execute(ModularLoadManagerImpl.this::updateAll);
      } catch (RejectedExecutionException e) {
      // Executor is shutting down
    • Thread-2 is line969
      public void start() throws PulsarServerException {
      try {
      // At this point, the ports will be updated with the real port number that the server was assigned
      Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise();
      lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
      pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
      lastData.setProtocols(protocolData);
      // configure broker-topic mode
      lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
      lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
      localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
      pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
      localData.setProtocols(protocolData);
      localData.setBrokerVersionString(pulsar.getBrokerVersion());
      // configure broker-topic mode
      localData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
      localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
      String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
      + (conf.getWebServicePort().isPresent() ? conf.getWebServicePort().get()
      : conf.getWebServicePortTls().get());
      brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
      final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress;
      updateLocalBrokerData();
      brokerDataLock = brokersData.acquireLock(brokerZnodePath, localData).join();
      timeAverageBrokerDataCache.readModifyUpdateOrCreate(timeAverageZPath,
      __ -> new TimeAverageBrokerData()).join();
      updateAll();
      } catch (Exception e) {

Modifications

  • Changing ModularLoadManagerImpl#knownBrokers to ConcurrentHashMap.newKeySet()

Verifying this change

  • Make sure that the change passes the CI checks.

Documentation

Check the box below or label this PR directly.

Need to update docs?

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Aug 5, 2022
@AnonHxy
Copy link
Contributor Author

AnonHxy commented Aug 5, 2022

/pulsarbot run-failure-checks

@@ -197,7 +197,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
private long unloadBundleCount = 0;

private final Lock lock = new ReentrantLock();
private Set<String> knownBrokers = new HashSet<>();
private Set<String> knownBrokers = ConcurrentHashMap.newKeySet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can move this line below to method start and add some code comments, this could solves the concurrency problem.

pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleDataNotification also has other actions, so we'd better keep it in init method I think @poorbarcode

@mattisonchao mattisonchao modified the milestones: 2.12.0, 2.11.0 Aug 7, 2022
@mattisonchao
Copy link
Member

@poorbarcode Could you do the final review?

@poorbarcode
Copy link
Contributor

@poorbarcode Could you do the final review?

OK. Approved

@Technoboy- Technoboy- modified the milestones: 2.12.0, 2.11.0 Aug 8, 2022
@Technoboy- Technoboy- merged commit b516ed9 into apache:master Aug 8, 2022
@Technoboy- Technoboy- added type/bug The PR fixed a bug or issue reported a bug area/broker and removed type/flaky-tests labels Aug 8, 2022
nodece pushed a commit to nodece/pulsar that referenced this pull request Jul 23, 2024
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Jul 24, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker doc-not-needed Your PR changes do not impact docs type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Flaky-test: org.apache.pulsar.broker.admin.PersistentTopicsTest.setup
6 participants