
[C++] Auto update topic partitions #6732

Merged 7 commits into apache:master on May 10, 2020

Conversation

@BewareMyPower (Contributor) commented Apr 13, 2020

Motivation

We need to increase the number of producers or consumers when partitions are updated.

The Java client has already implemented this feature, see #3513. This PR tries to implement the same feature in the C++ client.

Modifications

  • Add a boost::asio::deadline_timer to PartitionedConsumerImpl and PartitionedProducerImpl to register a lookup task that detects partition changes periodically (see the sketches below);
  • Add an unsigned int configuration parameter to specify the period, in seconds, of the partition-change detection (default: 60 seconds);
  • Unlock the mutex_ in PartitionedConsumerImpl::receive after state_ is checked.

Explanation: When new consumers are created, handleSinglePartitionConsumerCreated is eventually called, and it tries to lock mutex_. It may happen that receive acquires the lock again and again, so that handleSinglePartitionConsumerCreated is blocked in lock.lock() for a long time.
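
For illustration, here is a minimal standalone sketch of the periodic re-lookup pattern this PR adds: a boost::asio::deadline_timer that is re-armed after each check. The PartitionsUpdater class and the checkPartitions() placeholder are hypothetical names; the real client compares the partition metadata from a lookup against its current partition count and creates producers/consumers for any new partitions.

    #include <boost/asio.hpp>
    #include <iostream>

    class PartitionsUpdater {
      public:
        PartitionsUpdater(boost::asio::io_service& io, unsigned int periodSeconds)
            : timer_(io), periodSeconds_(periodSeconds) {}

        void start() { schedule(); }

      private:
        void schedule() {
            timer_.expires_from_now(boost::posix_time::seconds(periodSeconds_));
            timer_.async_wait([this](const boost::system::error_code& ec) {
                if (ec) {
                    return;  // timer cancelled, e.g. after closeAsync()
                }
                checkPartitions();  // in the real client: a partition metadata lookup
                schedule();         // re-arm the timer for the next period
            });
        }

        void checkPartitions() {
            // Placeholder: the real task fetches partition metadata and, if the
            // partition count grew, creates producers/consumers for new partitions.
            std::cout << "checking partitions..." << std::endl;
        }

        boost::asio::deadline_timer timer_;
        unsigned int periodSeconds_;
    };

A driver would construct it with the configured period (default 60 seconds) and run the io_service, e.g. boost::asio::io_service io; PartitionsUpdater updater(io, 60); updater.start(); io.run();

The mutex change can be sketched the same way. The shape below is an assumption based on the explanation above, not the PR's exact code: check state_ under the lock, then release the lock before blocking on the message queue, so that callbacks such as handleSinglePartitionConsumerCreated can acquire mutex_ without waiting on receive:

    Result PartitionedConsumerImpl::receive(Message& msg) {
        Lock lock(mutex_);     // Lock: a unique_lock alias used in the client
        if (state_ != Ready) {
            return ResultAlreadyClosed;
        }
        lock.unlock();         // don't hold mutex_ while blocking below
        messages_.pop(msg);    // blocks until a message is available
        return ResultOk;
    }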

Verifying this change

  • Make sure that the change passes the CI checks.

This change adds tests and can be verified as follows:

Run the PartitionsUpdateTest test suite after the CMake build.

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

@BewareMyPower (Contributor, Author)

/pulsarbot run-failure-checks

1 similar comment
@BewareMyPower (Contributor, Author)

/pulsarbot run-failure-checks

@BewareMyPower (Contributor, Author)

/pulsarbot run cpp-tests

1 similar comment
@BewareMyPower (Contributor, Author)

/pulsarbot run cpp-tests

@BewareMyPower (Contributor, Author)

/pulsarbot run unit-tests

@BewareMyPower (Contributor, Author)

/pulsarbot run cpp-tests

1 similar comment
@BewareMyPower (Contributor, Author)

/pulsarbot run cpp-tests

Review comment on this diff:

    assert(unsubscribedSoFar_ <= numPartitions_);
    assert(consumerIndex <= numPartitions_);
    Lock consumersLock(consumersMutex_);
    const auto numPartitions = numPartitions_;
Member:

Could this change be avoided? It seems to bring in a lot of unneeded changes, and it is a little confusing that numPartitions_ and numPartitions are mixed together in this file.

@BewareMyPower (Contributor, Author):

I'll change it soon; here I should have called a method like getNumPartitionsWithLock() (as I did in PartitionedConsumerImpl).
Because producers_/consumers_ may be modified in the timer's callback, the code that accesses producers_/consumers_ and some other members (see comments in the headers) should be protected by lock()/unlock(), except in start().
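
For illustration, a minimal sketch of such a lock-guarded getter, assuming the member names from this discussion (the PR's exact code may differ):

    int PartitionedProducerImpl::getNumPartitionsWithLock() {
        Lock producersLock(producersMutex_);  // guards against the timer's callback resizing producers_
        return numPartitions_;                // read only while holding the lock
    }

Callers that already hold the lock, such as the timer callback itself, would instead use a plain getNumPartitions() that reads the member without locking, matching the "with or without lock" getters in the commit list below.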

@jiazhai (Member) commented Apr 16, 2020

/pulsarbot run-failure-checks

@jiazhai (Member) commented Apr 16, 2020

@BewareMyPower thanks for the great work. The C++ test failures do not seem normal, though. Do the tests pass in your local environment?

@BewareMyPower (Contributor, Author)

> @BewareMyPower thanks for the great work. The C++ test failures do not seem normal, though. Do the tests pass in your local environment?

Yes. The test also passed in GitHub Actions; you can see the CI of commit 89ba471.

cpp-tests > run-tests (line 136)

[98/163] PartitionsUpdateTest.testPartitionsUpdate (13748 ms)

The strange thing is that the 163 tests never all complete. I've seen at most 141 tests completed, and sometimes only 121. The current run stops at test 98:

Thu, 16 Apr 2020 16:19:38 GMT
[96/163] BasicEndToEndTest.testPatternEmptyUnsubscribe (37 ms)
Thu, 16 Apr 2020 16:19:38 GMT
[97/163] BasicEndToEndTest.testpatternMultiTopicsHttpConsumerPubSub (2493 ms)
Thu, 16 Apr 2020 16:19:38 GMT
[98/163] PartitionsUpdateTest.testPartitionsUpdate (13718 ms)

My guess is that after another 90 minutes (when the 2-hour limit is reached), the number of completed tests will still be 98.

@BewareMyPower (Contributor, Author)

> @BewareMyPower thanks for the great work. The C++ test failures do not seem normal, though. Do the tests pass in your local environment?

Now I've found where the problem is. The tests get stuck in the loop in TEST(BasicEndToEndTest, testPartitionTopicUnAckedMessageTimeout):

    while (true) {
        // maximum wait time
        ASSERT_LE(timeWaited, unAckedMessagesTimeoutMs * 3);
        if (messagesReceived >= 10 * 2) {  // **Problem**: Never reached here
            break;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        timeWaited += 500;
    }

It seems that MessageListener doesn't work after my commits. I will try to solve it.

@BewareMyPower (Contributor, Author)

/pulsarbot run-failure-checks

2 similar comments
@BewareMyPower (Contributor, Author)

/pulsarbot run-failure-checks

@BewareMyPower (Contributor, Author)

/pulsarbot run-failure-checks

@BewareMyPower (Contributor, Author)

/pulsarbot run-failure-checks

@BewareMyPower (Contributor, Author)

/pulsarbot run-failure-checks

1 similar comment
@BewareMyPower (Contributor, Author)

/pulsarbot run-failure-checks

@BewareMyPower (Contributor, Author)

/pulsarbot run process

@BewareMyPower (Contributor, Author)

/pulsarbot run-failure-checks

@BewareMyPower (Contributor, Author)

/pulsarbot run process

@BewareMyPower (Contributor, Author)

Hi @codelipenghui @merlimat @jiazhai @sijie, could you help me with the unit test failures?

I've only changed files under the pulsar-client-cpp directory, but there are some unrelated tests that can't pass:

  • Integration - Process / process
  • Unit - Adaptors / unit-tests
  • Unit - Broker Auth SASL / unit-tests
  • Unit - Flaky / unit-test-flaky
  • Unit - Proxy / unit-tests
  • Unit / unit-tests

They all failed with No space left on device. It seems to be a GitHub Actions problem.

Failure message of Integration - Process / process:

[ERROR] Error processing tar file(exit status 1): write /pulsar/connectors/pulsar-io-redis-2.6.0-SNAPSHOT.nar: no space left on device

[ERROR] Failed to execute goal com.spotify:dockerfile-maven-plugin:1.4.13:build (default) on project pulsar-all-docker-image: Could not build image: Error processing tar file(exit status 1): write /pulsar/connectors/pulsar-io-redis-2.6.0-SNAPSHOT.nar: no space left on device -> [Help 1]

The other tests:

[ERROR] Failed to execute goal on project bc_2_0_1: Could not resolve dependencies for project org.apache.pulsar.tests:bc_2_0_1:jar:2.6.0-SNAPSHOT: Could not transfer artifact org.apache.pulsar:pulsar-client:jar:2.0.1-incubating from/to central (https://repo1.maven.org/maven2): GET request of: org/apache/pulsar/pulsar-client/2.0.1-incubating/pulsar-client-2.0.1-incubating.jar from central failed: No space left on device -> [Help 1]

I tried rolling back to an older commit that had passed these tests before, and rerunning the failure checks, but the error still happens. How should I deal with it? Should I close this PR and open a new one?

@BewareMyPower (Contributor, Author)

/pulsarbot run-failure-checks

@BewareMyPower (Contributor, Author)

/pulsarbot run unit-test-flaky

@sijie (Member) commented Apr 20, 2020

@BewareMyPower those are flaky tests due to an environmental issue. Typically it is because there is not enough disk space on GitHub, so it is not able to start the container-based integration tests. You can use /pulsarbot run-failure-checks to re-run those failures. But before you trigger that, it is good to check the failure logs to see whether they are related to your code change.

@BewareMyPower (Contributor, Author)

> @BewareMyPower those are flaky tests due to an environmental issue. Typically it is because there is not enough disk space on GitHub, so it is not able to start the container-based integration tests. You can use /pulsarbot run-failure-checks to re-run those failures. But before you trigger that, it is good to check the failure logs to see whether they are related to your code change.

Thanks for your reply. I've rerun those failures many times but only see the same No space left on device error. I've also seen some other PRs with the same failure, like #6760 and #6766. The problem is that I'm not sure whether the failure is recoverable. Did the frequent reruns generate so many dirty files that the test environment no longer has enough space?

@BewareMyPower (Contributor, Author)

/pulsarbot run-failure-checks

@sijie (Member) commented Apr 20, 2020

@BewareMyPower this usually indicates a problem in GitHub Actions. We might need to see how to clean up the disk space. /cc @tuteng @codelipenghui

@BewareMyPower (Contributor, Author)

/pulsarbot run-failure-checks

@BewareMyPower (Contributor, Author)

@merlimat @sijie Finally all checks have passed :) PTAL

@BewareMyPower BewareMyPower changed the title Auto update topic partitions for C++ client [C++] Auto update topic partitions May 4, 2020
@jiazhai jiazhai merged commit 30934e1 into apache:master May 10, 2020
@jiazhai (Member) commented May 12, 2020

Conflict with #6938; will mark this as 2.5.2.

jiazhai pushed a commit that referenced this pull request May 12, 2020
* auto update topic partitions extend for consumer and producer in c++ client

* add c++ unit test for partitions update

* format code with clang-format-5.0

* stop partitions update timer after producer/consumer called closeAsync()

* fix bugs when running gtest-parallel

* fix bug: Producer::flush() may cause deadlock

* use getters to read `numPartitions` with or without lock
(cherry picked from commit 30934e1)
Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this pull request May 27, 2020
addisonj pushed a commit to instructure/pulsar that referenced this pull request Jun 12, 2020
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020