diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 4446e6ea36c53..1f191c007f73b 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -905,9 +905,22 @@ bool BaseDynamicClusterImpl::updateDynamicHostList( continue; } + // To match a new host with an existing host means comparing their addresses. auto existing_host = all_hosts_.find(host->address()->asString()); - - if (existing_host != all_hosts_.end()) { + const bool existing_host_found = existing_host != all_hosts_.end(); + + // Check if in-place host update should be skipped, i.e. when the following criteria are met + // (currently there is only one criterion, but we might add more in the future): + // - The cluster health checker is activated and a new host is matched with the existing one, + // but the health check address is different. + const bool skip_inplace_host_update = + health_checker_ != nullptr && existing_host_found && + *existing_host->second->healthCheckAddress() != *host->healthCheckAddress(); + + // When there is a match and we decided to do in-place update, we potentially update the host's + // health check flag and metadata. Afterwards, the host is pushed back into the final_hosts, + // i.e. hosts that should be preserved in the current priority. + if (existing_host_found && !skip_inplace_host_update) { existing_hosts_for_current_priority.emplace(existing_host->first); // If we find a host matched based on address, we keep it. However we do change weight inline // so do that here. 
diff --git a/test/common/upstream/eds_test.cc b/test/common/upstream/eds_test.cc
index 7300a9eb82851..a9db413e6afa8 100644
--- a/test/common/upstream/eds_test.cc
+++ b/test/common/upstream/eds_test.cc
@@ -75,6 +75,102 @@ class EdsTest : public testing::Test {
   NiceMock local_info_;
 };
 
+// Fixture for EDS clusters that have a health checker attached. It provides helpers to build a
+// cluster from a list of endpoint ports and to change the health check port of one endpoint.
+class EdsWithHealthCheckUpdateTest : public EdsTest {
+protected:
+  EdsWithHealthCheckUpdateTest() = default;
+
+  // Builds the initial cluster with one endpoint per entry of endpoint_ports, installs a health
+  // checker on it and applies the first config update. Afterwards every host is checked to carry
+  // FAILED_ACTIVE_HC and is then marked healthy by clearing that flag.
+  void initializeCluster(const std::vector<uint32_t>& endpoint_ports,
+                         const bool drain_connections_on_host_removal) {
+    resetCluster(drain_connections_on_host_removal);
+
+    // NOTE(review): the template argument of make_shared was lost when this patch was extracted;
+    // MockHealthChecker is assumed here -- confirm against the original change.
+    auto health_checker = std::make_shared<MockHealthChecker>();
+    EXPECT_CALL(*health_checker, start());
+    EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2);
+    cluster_->setHealthChecker(health_checker);
+
+    cluster_load_assignment_ = resources_.Add();
+    cluster_load_assignment_->set_cluster_name("fare");
+
+    for (const uint32_t port : endpoint_ports) {
+      addEndpoint(port);
+    }
+
+    VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources_, ""));
+
+    // Make sure the cluster is rebuilt.
+    EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());
+
+    // Expect one host per requested endpoint rather than a hard-coded count, so the helper stays
+    // valid for any number of ports.
+    const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
+    EXPECT_EQ(endpoint_ports.size(), hosts.size());
+    for (const auto& host : hosts) {
+      EXPECT_TRUE(host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
+
+      // Mark the host as healthy.
+      host->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
+    }
+  }
+
+  // Recreates the cluster from a YAML EDS config in which drain_connections_on_host_removal is
+  // set to the given value.
+  void resetCluster(const bool drain_connections_on_host_removal) {
+    const std::string config = R"EOF(
+      name: name
+      connect_timeout: 0.25s
+      type: EDS
+      lb_policy: ROUND_ROBIN
+      drain_connections_on_host_removal: {}
+      eds_cluster_config:
+        service_name: fare
+        eds_config:
+          api_config_source:
+            cluster_names:
+            - eds
+            refresh_delay: 1s
+    )EOF";
+    EdsTest::resetCluster(fmt::format(config, drain_connections_on_host_removal));
+  }
+
+  // Appends a new entry to the load assignment's endpoints, holding a single lb_endpoint at
+  // 1.2.3.4:<port>. The change is not applied to the cluster here.
+  void addEndpoint(const uint32_t port) {
+    auto* endpoints = cluster_load_assignment_->add_endpoints();
+    auto* socket_address = endpoints->add_lb_endpoints()
+                               ->mutable_endpoint()
+                               ->mutable_address()
+                               ->mutable_socket_address();
+    socket_address->set_address("1.2.3.4");
+    socket_address->set_port_value(port);
+  }
+
+  // Sets the health check port of the endpoint stored at the given index of the load assignment
+  // and applies the updated assignment to the cluster.
+  void updateEndpointHealthCheckPortAtIndex(const uint32_t index, const uint32_t port) {
+    cluster_load_assignment_->mutable_endpoints(index)
+        ->mutable_lb_endpoints(0)
+        ->mutable_endpoint()
+        ->mutable_health_check_config()
+        ->set_port_value(port);
+
+    VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources_, ""));
+
+    // Always rebuild if health check config is changed.
+    EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());
+  }
+
+  Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources_;
+  // Points at the element added to resources_ by initializeCluster(); null until then.
+  envoy::api::v2::ClusterLoadAssignment* cluster_load_assignment_{nullptr};
+};
+
 // Negative test for protoc-gen-validate constraints.
TEST_F(EdsTest, ValidateFail) { Protobuf::RepeatedPtrField resources;
@@ -1152,6 +1235,90 @@ TEST_F(EdsTest, PriorityAndLocalityWeighted) {
   EXPECT_EQ(1UL, stats_.counter("cluster.name.update_no_rebuild").value());
 }
 
+// Changing an endpoint's health check port is expected to add a new host for that endpoint
+// instead of updating the existing one in place. Draining on host removal is off here, so the
+// previous hosts are expected to stay in the host set alongside the new ones.
+TEST_F(EdsWithHealthCheckUpdateTest, EndpointUpdateHealthCheckConfig) {
+  const std::vector<uint32_t> endpoint_ports = {80, 81};
+  const uint32_t new_health_check_port = 8000;
+
+  // Initialize the cluster with two endpoints without draining connections on host removal.
+  initializeCluster(endpoint_ports, false);
+
+  updateEndpointHealthCheckPortAtIndex(0, new_health_check_port);
+  {
+    const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
+    // Assert rather than expect the size, because the hosts are indexed right below.
+    ASSERT_EQ(3UL, hosts.size());
+    // Make sure the first endpoint health check port is updated.
+    EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());
+
+    EXPECT_NE(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
+    EXPECT_NE(new_health_check_port, hosts[2]->healthCheckAddress()->ip()->port());
+    EXPECT_EQ(endpoint_ports[1], hosts[1]->healthCheckAddress()->ip()->port());
+    EXPECT_EQ(endpoint_ports[0], hosts[2]->healthCheckAddress()->ip()->port());
+
+    EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
+
+    // The old hosts are still active. The health checker continues to do health checking to these
+    // hosts, until they are removed.
+    EXPECT_FALSE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
+    EXPECT_FALSE(hosts[2]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
+  }
+
+  updateEndpointHealthCheckPortAtIndex(1, new_health_check_port);
+  {
+    const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
+    ASSERT_EQ(4UL, hosts.size());
+    EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());
+
+    // Make sure the second endpoint health check port is updated.
+    EXPECT_EQ(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
+
+    EXPECT_EQ(endpoint_ports[1], hosts[2]->healthCheckAddress()->ip()->port());
+    EXPECT_EQ(endpoint_ports[0], hosts[3]->healthCheckAddress()->ip()->port());
+
+    EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
+    EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
+
+    // The old hosts are still active.
+    EXPECT_FALSE(hosts[2]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
+    EXPECT_FALSE(hosts[3]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
+  }
+}
+
+// Same scenario with drain_connections_on_host_removal enabled: the host set is expected to hold
+// exactly two hosts after each update, with the updated endpoint's host carrying the new port.
+TEST_F(EdsWithHealthCheckUpdateTest, EndpointUpdateHealthCheckConfigWithDrainConnectionsOnRemoval) {
+  const std::vector<uint32_t> endpoint_ports = {80, 81};
+  const uint32_t new_health_check_port = 8000;
+
+  // Initialize the cluster with two endpoints with draining connections on host removal.
+  initializeCluster(endpoint_ports, true);
+
+  updateEndpointHealthCheckPortAtIndex(0, new_health_check_port);
+  {
+    const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
+    // Since drain_connections_on_host_removal is set to true, the old hosts are removed
+    // immediately.
+    ASSERT_EQ(2UL, hosts.size());
+    // Make sure the first endpoint health check port is updated.
+    EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());
+
+    EXPECT_NE(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
+  }
+
+  updateEndpointHealthCheckPortAtIndex(1, new_health_check_port);
+  {
+    const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
+    ASSERT_EQ(2UL, hosts.size());
+    EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());
+
+    // Make sure the second endpoint health check port is updated.
+    EXPECT_EQ(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
+  }
+}
+
 // Throw on adding a new resource with an invalid endpoint (since the given address is invalid).
TEST_F(EdsTest, MalformedIP) { Protobuf::RepeatedPtrField resources;