Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -899,8 +899,16 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(
}

auto existing_host = all_hosts_.find(host->address()->asString());
const bool existing_host_found = existing_host != all_hosts_.end();

if (existing_host != all_hosts_.end()) {
// If we have found a new host matched, based on address, to an existing one: check if the
// health check address of that host is different. If it is, we need to rebuild. Note that this
// checking matters only if the cluster's active health checker is activated.
const bool health_check_changed =
health_checker_ != nullptr && existing_host_found &&
*existing_host->second->healthCheckAddress() != *host->healthCheckAddress();

if (existing_host_found && !health_check_changed) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can still be simplified a bit, since later on, we might add another rebuild condition. I'd rename health_check_changed to no_inplace_host_update. Also, maybe add a comment to the top of this if block explaining what it's doing in terms of hosts insertion/deletion behavior.

existing_hosts_for_current_priority.emplace(existing_host->first);
// If we find a host matched based on address, we keep it. However we do change weight inline
// so do that here.
Expand Down
208 changes: 208 additions & 0 deletions test/common/upstream/eds_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,214 @@ TEST_F(EdsTest, PriorityAndLocalityWeighted) {
EXPECT_EQ(1UL, stats_.counter("cluster.name.update_no_rebuild").value());
}

TEST_F(EdsTest, EndpointUpdateHealthCheckConfig) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably, you could subclass the test fixture EdsTest and add in a bunch of helper methods to make these tests less verbose; there is a lot of boilerplate that could be refactored away to make the tests more readable that way.

resetCluster(R"EOF(
name: name
connect_timeout: 0.25s
type: EDS
lb_policy: ROUND_ROBIN
eds_cluster_config:
service_name: fare
eds_config:
api_config_source:
cluster_names:
- eds
refresh_delay: 1s
)EOF");

auto health_checker = std::make_shared<MockHealthChecker>();
EXPECT_CALL(*health_checker, start());
EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2);
cluster_->setHealthChecker(health_checker);

Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources;
auto* cluster_load_assignment = resources.Add();
cluster_load_assignment->set_cluster_name("fare");

auto add_endpoint = [cluster_load_assignment](int port) {
auto* endpoints = cluster_load_assignment->add_endpoints();

auto* socket_address = endpoints->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
socket_address->set_address("1.2.3.4");
socket_address->set_port_value(port);
};

auto update_health_check_port = [cluster_load_assignment](const uint32_t index,
const uint32_t port) {
cluster_load_assignment->mutable_endpoints(index)
->mutable_lb_endpoints(0)
->mutable_endpoint()
->mutable_health_check_config()
->set_port_value(port);
};

add_endpoint(80);
add_endpoint(81);

VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources, ""));
// Make sure the custer is rebuilt.
EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());

{
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
EXPECT_EQ(hosts.size(), 2);

EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));

// Mark the hosts as healthy
hosts[0]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
hosts[1]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
}

const uint32_t new_health_check_port = 8000;
update_health_check_port(0, new_health_check_port);

VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources, ""));
EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());

{
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
EXPECT_EQ(hosts.size(), 3);
// Make sure the first endpoint health check port is updated.
EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());

EXPECT_NE(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
EXPECT_NE(new_health_check_port, hosts[2]->healthCheckAddress()->ip()->port());
EXPECT_EQ(81, hosts[1]->healthCheckAddress()->ip()->port());
EXPECT_EQ(80, hosts[2]->healthCheckAddress()->ip()->port());

EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));

// The old hosts still active. The health checker continues to do health checking to these
// hosts, until they are removed.
EXPECT_FALSE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
EXPECT_FALSE(hosts[2]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
}

update_health_check_port(1, new_health_check_port);

VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources, ""));
EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());

{
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
EXPECT_EQ(hosts.size(), 4);
EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());

// Make sure the second endpoint health check port is updated.
EXPECT_EQ(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());

EXPECT_EQ(81, hosts[2]->healthCheckAddress()->ip()->port());
EXPECT_EQ(80, hosts[3]->healthCheckAddress()->ip()->port());

EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));

// The old hosts still active.
EXPECT_FALSE(hosts[2]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
EXPECT_FALSE(hosts[3]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
}
}

TEST_F(EdsTest, EndpointUpdateHealthCheckConfigWithDrainConnectionsOnRemoval) {
resetCluster(R"EOF(
name: name
connect_timeout: 0.25s
type: EDS
lb_policy: ROUND_ROBIN
drain_connections_on_host_removal: true
eds_cluster_config:
service_name: fare
eds_config:
api_config_source:
cluster_names:
- eds
refresh_delay: 1s
)EOF");

auto health_checker = std::make_shared<MockHealthChecker>();
EXPECT_CALL(*health_checker, start());
EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2);
cluster_->setHealthChecker(health_checker);

Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources;
auto* cluster_load_assignment = resources.Add();
cluster_load_assignment->set_cluster_name("fare");

auto add_endpoint = [cluster_load_assignment](int port) {
auto* endpoints = cluster_load_assignment->add_endpoints();

auto* socket_address = endpoints->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
socket_address->set_address("1.2.3.4");
socket_address->set_port_value(port);
};

auto update_health_check_port = [cluster_load_assignment](const uint32_t index,
const uint32_t port) {
cluster_load_assignment->mutable_endpoints(index)
->mutable_lb_endpoints(0)
->mutable_endpoint()
->mutable_health_check_config()
->set_port_value(port);
};

add_endpoint(80);
add_endpoint(81);

VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources, ""));
// Make sure the custer is rebuilt.
EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());

{
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
EXPECT_EQ(hosts.size(), 2);

EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
// Mark the hosts as healthy
hosts[0]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
hosts[1]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
}

const uint32_t new_health_check_port = 8000;
update_health_check_port(0, new_health_check_port);

VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources, ""));
EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());

{
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
// Since drain_connections_on_host_removal is set to true, the old hosts are removed
// immediately.
EXPECT_EQ(hosts.size(), 2);
// Make sure the first endpoint health check port is updated.
EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());

EXPECT_NE(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
}

update_health_check_port(1, new_health_check_port);

VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources, ""));
EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());

{
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
EXPECT_EQ(hosts.size(), 2);
EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());

// Make sure the second endpoint health check port is updated.
EXPECT_EQ(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
}
}

// Throw on adding a new resource with an invalid endpoint (since the given address is invalid).
TEST_F(EdsTest, MalformedIP) {
Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources;
Expand Down