-
Notifications
You must be signed in to change notification settings - Fork 5.3k
cluster: unstuck cluster manager when update the initializing cluster #13875
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c867cb1
9d3849a
77c85c1
4dbde08
c04f9da
8b1f033
533fb44
07a8ce0
07250bb
125ea72
7dd7bf5
395e525
fa02576
b2965c3
74e9a0d
7fb0b15
43bb75d
7f991cd
278b80a
9927f6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1233,6 +1233,105 @@ TEST_F(ClusterManagerImplTest, RemoveWarmingCluster) { | |
| EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get())); | ||
| } | ||
|
|
||
| TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) { | ||
| const std::string json = fmt::sprintf( | ||
| R"EOF( | ||
| { | ||
| "dynamic_resources": { | ||
| "cds_config": { | ||
| "api_config_source": { | ||
| "api_type": "0", | ||
| "refresh_delay": "30s", | ||
| "cluster_names": ["cds_cluster"] | ||
| } | ||
| } | ||
| }, | ||
| "static_resources": { | ||
| %s | ||
| } | ||
| } | ||
| )EOF", | ||
| clustersJson({ | ||
| defaultStaticClusterJson("cds_cluster"), | ||
| })); | ||
|
|
||
| MockCdsApi* cds = new MockCdsApi(); | ||
| std::shared_ptr<MockClusterMockPrioritySet> cds_cluster( | ||
| new NiceMock<MockClusterMockPrioritySet>()); | ||
| cds_cluster->info_->name_ = "cds_cluster"; | ||
|
|
||
| // This part tests static init. | ||
| InSequence s; | ||
| EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) | ||
| .WillOnce(Return(std::make_pair(cds_cluster, nullptr))); | ||
| ON_CALL(*cds_cluster, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); | ||
| EXPECT_CALL(factory_, createCds_()).WillOnce(Return(cds)); | ||
| EXPECT_CALL(*cds, setInitializedCb(_)); | ||
| EXPECT_CALL(*cds_cluster, initialize(_)); | ||
|
|
||
| create(parseBootstrapFromV3Json(json)); | ||
|
|
||
| ReadyWatcher cm_initialized; | ||
| cluster_manager_->setInitializedCb([&]() -> void { cm_initialized.ready(); }); | ||
|
|
||
| const std::string ready_cluster_yaml = R"EOF( | ||
| name: fake_cluster | ||
| connect_timeout: 0.250s | ||
| type: STATIC | ||
| lb_policy: ROUND_ROBIN | ||
| load_assignment: | ||
| cluster_name: fake_cluster | ||
| endpoints: | ||
| - lb_endpoints: | ||
| - endpoint: | ||
| address: | ||
| socket_address: | ||
| address: 127.0.0.1 | ||
| port_value: 11001 | ||
| )EOF"; | ||
|
|
||
| const std::string warming_cluster_yaml = R"EOF( | ||
| name: fake_cluster | ||
| connect_timeout: 0.250s | ||
| type: STRICT_DNS | ||
| lb_policy: ROUND_ROBIN | ||
| load_assignment: | ||
| cluster_name: fake_cluster | ||
| endpoints: | ||
| - lb_endpoints: | ||
| - endpoint: | ||
| address: | ||
| socket_address: | ||
| address: foo.com | ||
| port_value: 11001 | ||
| )EOF"; | ||
|
|
||
| { | ||
| SCOPED_TRACE("Add a primary cluster staying in warming."); | ||
| EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)); | ||
| EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(warming_cluster_yaml), | ||
| "warming")); | ||
|
|
||
| // Mark all the rest of the clusters ready. Now the only warming cluster is the above one. | ||
| EXPECT_CALL(cm_initialized, ready()).Times(0); | ||
| cds_cluster->initialize_callback_(); | ||
| } | ||
|
|
||
| { | ||
| SCOPED_TRACE("Modify the only warming primary cluster to immediate ready."); | ||
| EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)); | ||
| EXPECT_CALL(*cds, initialize()); | ||
| EXPECT_TRUE( | ||
| cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(ready_cluster_yaml), "ready")); | ||
| } | ||
| { | ||
| SCOPED_TRACE("All clusters are ready."); | ||
| EXPECT_CALL(cm_initialized, ready()); | ||
| cds->initialized_callback_(); | ||
| } | ||
| EXPECT_TRUE(Mock::VerifyAndClearExpectations(cds_cluster.get())); | ||
| } | ||
|
|
||
| TEST_F(ClusterManagerImplTest, ModifyWarmingCluster) { | ||
| time_system_.setSystemTime(std::chrono::milliseconds(1234567891234)); | ||
| create(defaultConfig()); | ||
|
|
@@ -2984,6 +3083,33 @@ TEST_F(ClusterManagerInitHelperTest, StaticSdsInitialize) { | |
| cluster1.initialize_callback_(); | ||
| } | ||
|
|
||
| // Verify that primary cluster can be updated in warming state. | ||
| TEST_F(ClusterManagerInitHelperTest, TestUpdateWarming) { | ||
| InSequence s; | ||
|
|
||
| auto sds = std::make_unique<NiceMock<MockClusterMockPrioritySet>>(); | ||
| ON_CALL(*sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); | ||
| EXPECT_CALL(*sds, initialize(_)); | ||
| init_helper_.addCluster(*sds); | ||
| init_helper_.onStaticLoadComplete(); | ||
|
|
||
| NiceMock<MockClusterMockPrioritySet> updated_sds; | ||
| ON_CALL(updated_sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); | ||
| EXPECT_CALL(updated_sds, initialize(_)); | ||
| init_helper_.addCluster(updated_sds); | ||
|
|
||
| // The override cluster is added. Manually drop the previous cluster. In production flow this is | ||
| // achieved by ClusterManagerImpl. | ||
| sds.reset(); | ||
|
Comment on lines
+3101
to
+3103
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible to add a cluster manager test of this case that uses "real" DNS clusters? I think the main sequence that can actually happen here is:
I think you could also pretty easily test this in your integration test below by doing:
?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for the DNS with health check! After a quick search I didn't find an existing example but this test case Writing a test
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A slight difference is that DNS cluster is always depending on resolver. I modified the cluster to STATIC type to make it immediate ready.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks this all looks good, but can you add a real integration test for this in ADS integration test? I think it's really easy based on what you already have. Just use a static cluster with health checking, it will be stuck initializing, then you can update it? /wait
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had the impression that static cluster with bad health check doesn't block initialization...
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added. Also confirmed that failed health check in static cluster also turn the warm cluster to ready. I end up not accept the tcp connection but not write back to hold the DNS request. |
||
|
|
||
| ReadyWatcher primary_initialized; | ||
| init_helper_.setPrimaryClustersInitializedCb([&]() -> void { primary_initialized.ready(); }); | ||
|
|
||
| EXPECT_CALL(*this, onClusterInit(Ref(updated_sds))); | ||
| EXPECT_CALL(primary_initialized, ready()); | ||
| updated_sds.initialize_callback_(); | ||
| } | ||
|
|
||
| TEST_F(ClusterManagerInitHelperTest, UpdateAlreadyInitialized) { | ||
| InSequence s; | ||
|
|
||
|
|
@@ -3087,6 +3213,7 @@ TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) { | |
| init_helper_.addCluster(cluster1); | ||
|
|
||
| NiceMock<MockClusterMockPrioritySet> cluster2; | ||
| cluster2.info_->name_ = "cluster2"; | ||
| ON_CALL(cluster2, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); | ||
| init_helper_.addCluster(cluster2); | ||
|
|
||
|
|
@@ -3099,6 +3226,8 @@ TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) { | |
| init_helper_.startInitializingSecondaryClusters(); | ||
|
|
||
| NiceMock<MockClusterMockPrioritySet> cluster3; | ||
| cluster3.info_->name_ = "cluster3"; | ||
|
|
||
| ON_CALL(cluster3, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); | ||
| EXPECT_CALL(cluster3, initialize(_)); | ||
| init_helper_.addCluster(cluster3); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.