Merged
Changes from 6 commits
9 changes: 6 additions & 3 deletions source/common/upstream/upstream_impl.cc
@@ -864,9 +864,12 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,

     bool found = false;
     for (auto i = current_hosts.begin(); i != current_hosts.end();) {
-      // If we find a host matched based on address, we keep it. However we do change weight inline
-      // so do that here.
-      if (*(*i)->address() == *host->address()) {
+      const bool address_matched = *(*i)->address() == *host->address();
+      const bool health_check_changed =
+          health_checker_ != nullptr && *(*i)->healthCheckAddress() != *host->healthCheckAddress();
+      // If we find a host matched based on address and the health check address is not changed, we
+      // keep it. However we do change weight inline so do that here.
+      if (address_matched && !health_check_changed) {

@htuch htuch Aug 8, 2018

Member

It's not your fault, but this entire method is completely unreadable to me; it's hard to keep track of what current_hosts represents, which hosts are to be removed, and under which conditions we're going to do a rebuild, in-place modify, or remove/add. Do you think there's a way to explain to the layman how this works? Each time I reread this method, my head hurts more and more (and I'm partly to blame for sure).

Member

Yes, I agree. This has been an iterative process over time, and it's now not very readable; it hurts my head too. I think it would be worth having a lot more comments in here, possibly variable name changes, and maybe even functions broken out. @dio since you are in here now do you mind taking a look?

Member Author

> since you are in here now do you mind taking a look?

Sure, I'd love to have a look 🙂. I will spend some time diving into this.

Member Author

It seems there is quite an intensive refactoring of this code underway in #3959.
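
For readers following this thread, the matching rule introduced in this hunk can be modeled in isolation. The sketch below is a minimal, self-contained illustration and not Envoy's code: `HostInfo` and `canReuseHost` are hypothetical names, and addresses are plain strings rather than Envoy address objects. It only shows the decision the diff adds: an existing host is kept when the address matches and, if a health checker is configured, the health check address is unchanged.

```cpp
#include <string>

// Hypothetical stand-in for an upstream host; this is not Envoy's Host interface.
struct HostInfo {
  std::string address;              // serving address, e.g. "1.2.3.4:80"
  std::string health_check_address; // address probed by active health checking
};

// Returns true when an existing host can be kept for an incoming host.
// Mirrors the condition in the diff: address_matched && !health_check_changed.
bool canReuseHost(const HostInfo& existing, const HostInfo& incoming, bool has_health_checker) {
  const bool address_matched = existing.address == incoming.address;
  // A changed health check address only matters when a health checker is configured.
  const bool health_check_changed =
      has_health_checker && existing.health_check_address != incoming.health_check_address;
  return address_matched && !health_check_changed;
}
```

Under this model, a host whose health check port changes is treated as a new host whenever a health checker is present, which is the behavior the new tests in eds_test.cc exercise.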

         if (host->weight() > max_host_weight) {
           max_host_weight = host->weight();
         }
208 changes: 208 additions & 0 deletions test/common/upstream/eds_test.cc
@@ -1059,6 +1059,214 @@ TEST_F(EdsTest, PriorityAndLocalityWeighted) {
  EXPECT_EQ(1UL, stats_.counter("cluster.name.update_no_rebuild").value());
}

TEST_F(EdsTest, EndpointUpdateHealthCheckConfig) {

Member

Arguably, you could subclass the test fixture EdsTest and add in a bunch of helper methods to make these tests less verbose; there is a lot of boilerplate that could be refactored away to make the tests more readable that way.
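
One possible shape for such helpers is sketched below. To keep the sketch self-contained, gtest and Envoy's protobuf types are left out: `EndpointSpec` and `EndpointListBuilder` are hypothetical names, and in the real test these would more likely be methods on a fixture derived from `EdsTest` that edit the `ClusterLoadAssignment` directly. The sketch assumes that a health check port of 0 means "not set, use the serving port".

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for one lb_endpoint entry in the load assignment.
struct EndpointSpec {
  std::string address;
  uint32_t port;
  uint32_t health_check_port; // 0 is assumed to mean "not set"
};

// Hypothetical helper collecting the boilerplate that both tests repeat as lambdas.
class EndpointListBuilder {
public:
  // Adds an endpoint at the fixed test address with no health check override.
  void addEndpoint(uint32_t port) { endpoints_.push_back(EndpointSpec{"1.2.3.4", port, 0}); }

  // Overrides the health check port of the endpoint at `index`.
  void updateHealthCheckPort(std::size_t index, uint32_t port) {
    endpoints_.at(index).health_check_port = port;
  }

  // Port that health checking would probe: the override if set, else the serving port.
  uint32_t effectiveHealthCheckPort(std::size_t index) const {
    const EndpointSpec& endpoint = endpoints_.at(index);
    return endpoint.health_check_port != 0 ? endpoint.health_check_port : endpoint.port;
  }

  std::size_t size() const { return endpoints_.size(); }

private:
  std::vector<EndpointSpec> endpoints_;
};
```

With helpers like these on a shared fixture, each test body would shrink to the sequence of updates and the assertions that differ between the two cases.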

  resetCluster(R"EOF(
      name: name
      connect_timeout: 0.25s
      type: EDS
      lb_policy: ROUND_ROBIN
      eds_cluster_config:
        service_name: fare
        eds_config:
          api_config_source:
            cluster_names:
            - eds
            refresh_delay: 1s
    )EOF");

  auto health_checker = std::make_shared<MockHealthChecker>();
  EXPECT_CALL(*health_checker, start());
  EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2);
  cluster_->setHealthChecker(health_checker);

  Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources;
  auto* cluster_load_assignment = resources.Add();
  cluster_load_assignment->set_cluster_name("fare");

  auto add_endpoint = [cluster_load_assignment](int port) {
    auto* endpoints = cluster_load_assignment->add_endpoints();

    auto* socket_address = endpoints->add_lb_endpoints()
                               ->mutable_endpoint()
                               ->mutable_address()
                               ->mutable_socket_address();
    socket_address->set_address("1.2.3.4");
    socket_address->set_port_value(port);
  };

  auto update_health_check_port = [cluster_load_assignment](const uint32_t index,
                                                            const uint32_t port) {
    cluster_load_assignment->mutable_endpoints(index)
        ->mutable_lb_endpoints(0)
        ->mutable_endpoint()
        ->mutable_health_check_config()
        ->set_port_value(port);
  };

  add_endpoint(80);
  add_endpoint(81);

  VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources, ""));
  // Make sure the cluster is rebuilt.
  EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());

  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    EXPECT_EQ(hosts.size(), 2);

    EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
    EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));

    // Mark the hosts as healthy.
    hosts[0]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
    hosts[1]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
  }

  const uint32_t new_health_check_port = 8000;
  update_health_check_port(0, new_health_check_port);

  VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources, ""));
  EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());

  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    EXPECT_EQ(hosts.size(), 3);
    // Make sure the first endpoint health check port is updated.
    EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());

    EXPECT_NE(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
    EXPECT_NE(new_health_check_port, hosts[2]->healthCheckAddress()->ip()->port());
    EXPECT_EQ(81, hosts[1]->healthCheckAddress()->ip()->port());
    EXPECT_EQ(80, hosts[2]->healthCheckAddress()->ip()->port());

    EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));

    // The old hosts are still active. The health checker continues to health check these hosts
    // until they are removed.
    EXPECT_FALSE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
    EXPECT_FALSE(hosts[2]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
  }

  update_health_check_port(1, new_health_check_port);

  VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources, ""));
  EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());

  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    EXPECT_EQ(hosts.size(), 4);
    EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());

    // Make sure the second endpoint health check port is updated.
    EXPECT_EQ(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());

    EXPECT_EQ(81, hosts[2]->healthCheckAddress()->ip()->port());
    EXPECT_EQ(80, hosts[3]->healthCheckAddress()->ip()->port());

    EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
    EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));

    // The old hosts are still active.
    EXPECT_FALSE(hosts[2]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
    EXPECT_FALSE(hosts[3]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
  }
}

TEST_F(EdsTest, EndpointUpdateHealthCheckConfigWithDrainConnectionsOnRemoval) {
  resetCluster(R"EOF(
      name: name
      connect_timeout: 0.25s
      type: EDS
      lb_policy: ROUND_ROBIN
      drain_connections_on_host_removal: true
      eds_cluster_config:
        service_name: fare
        eds_config:
          api_config_source:
            cluster_names:
            - eds
            refresh_delay: 1s
    )EOF");

  auto health_checker = std::make_shared<MockHealthChecker>();
  EXPECT_CALL(*health_checker, start());
  EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2);
  cluster_->setHealthChecker(health_checker);

  Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources;
  auto* cluster_load_assignment = resources.Add();
  cluster_load_assignment->set_cluster_name("fare");

  auto add_endpoint = [cluster_load_assignment](int port) {
    auto* endpoints = cluster_load_assignment->add_endpoints();

    auto* socket_address = endpoints->add_lb_endpoints()
                               ->mutable_endpoint()
                               ->mutable_address()
                               ->mutable_socket_address();
    socket_address->set_address("1.2.3.4");
    socket_address->set_port_value(port);
  };

  auto update_health_check_port = [cluster_load_assignment](const uint32_t index,
                                                            const uint32_t port) {
    cluster_load_assignment->mutable_endpoints(index)
        ->mutable_lb_endpoints(0)
        ->mutable_endpoint()
        ->mutable_health_check_config()
        ->set_port_value(port);
  };

  add_endpoint(80);
  add_endpoint(81);

  VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources, ""));
  // Make sure the cluster is rebuilt.
  EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());

  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    EXPECT_EQ(hosts.size(), 2);

    EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
    EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
    // Mark the hosts as healthy.
    hosts[0]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
    hosts[1]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
  }

  const uint32_t new_health_check_port = 8000;
  update_health_check_port(0, new_health_check_port);

  VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources, ""));
  EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());

  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    // Since drain_connections_on_host_removal is set to true, the old hosts are removed
    // immediately.
    EXPECT_EQ(hosts.size(), 2);
    // Make sure the first endpoint health check port is updated.
    EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());

    EXPECT_NE(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
  }

  update_health_check_port(1, new_health_check_port);

  VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources, ""));
  EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());

  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    EXPECT_EQ(hosts.size(), 2);
    EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());

    // Make sure the second endpoint health check port is updated.
    EXPECT_EQ(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
  }
}

// Throw on adding a new resource with an invalid endpoint (since the given address is invalid).
TEST_F(EdsTest, MalformedIP) {
  Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources;
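
The host counts asserted in the two EndpointUpdateHealthCheckConfig tests (2, 3, 4 without draining; 2, 2, 2 with `drain_connections_on_host_removal`) can be reproduced with a small model of the update. This is a simplified sketch rather than Envoy's `updateDynamicHostList`: `SimHost` and `applyUpdate` are hypothetical names, weights and duplicate handling are omitted, a health checker is assumed to be present, and hosts are identified by port only because the tests use a single IP. The retention rule for old hosts (kept while not failing active health checks, unless draining on removal is enabled) is an assumption read off the comments in the tests.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical, simplified host record; this is not Envoy's Host interface.
struct SimHost {
  uint32_t port;              // serving port (the IP is fixed in these tests)
  uint32_t health_check_port; // port probed by the active health checker
  bool failed_active_hc;      // models Host::HealthFlag::FAILED_ACTIVE_HC
};

// Applies one endpoint update and returns the resulting host list.
std::vector<SimHost> applyUpdate(std::vector<SimHost> current,
                                 const std::vector<SimHost>& incoming, bool drain_on_removal) {
  std::vector<SimHost> result;
  for (const SimHost& host : incoming) {
    // Reuse an existing host only if both the address and the health check address match.
    auto it = std::find_if(current.begin(), current.end(), [&host](const SimHost& existing) {
      return existing.port == host.port && existing.health_check_port == host.health_check_port;
    });
    if (it != current.end()) {
      result.push_back(*it); // kept as-is, including its current health state
      current.erase(it);
    } else {
      SimHost added = host;
      added.failed_active_hc = true; // new hosts start as failed until a check passes
      result.push_back(added);
    }
  }
  // Hosts left in `current` are no longer in the update. Without draining, the ones
  // that are still healthy stay in the list after the new hosts.
  if (!drain_on_removal) {
    for (const SimHost& old_host : current) {
      if (!old_host.failed_active_hc) {
        result.push_back(old_host);
      }
    }
  }
  return result;
}
```

Walking the first test through this model: the initial update yields two failed hosts; after they are marked healthy, changing the first endpoint's health check port adds a new failed host at the front and keeps the old healthy one at the back, giving three hosts, and changing the second endpoint's port gives four.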