diff --git a/test/integration/hds_integration_test.cc b/test/integration/hds_integration_test.cc index 92c78e14b4fff..bc7ab05db3025 100644 --- a/test/integration/hds_integration_test.cc +++ b/test/integration/hds_integration_test.cc @@ -118,7 +118,7 @@ class HdsIntegrationTest : public HttpIntegrationTest, // one HTTP health_check envoy::service::discovery::v2::HealthCheckSpecifier makeHttpHealthCheckSpecifier() { envoy::service::discovery::v2::HealthCheckSpecifier server_health_check_specifier_; - server_health_check_specifier_.mutable_interval()->set_seconds(1); + server_health_check_specifier_.mutable_interval()->set_nanos(100000000); // 0.1 seconds auto* health_check = server_health_check_specifier_.add_cluster_health_checks(); @@ -126,12 +126,12 @@ class HdsIntegrationTest : public HttpIntegrationTest, Network::Utility::addressToProtobufAddress( *host_upstream_->localAddress(), *health_check->add_locality_endpoints()->add_endpoints()->mutable_address()); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_region("some_region"); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_zone("some_zone"); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_sub_zone("crete"); + health_check->mutable_locality_endpoints(0)->mutable_locality()->set_region("middle_earth"); + health_check->mutable_locality_endpoints(0)->mutable_locality()->set_zone("shire"); + health_check->mutable_locality_endpoints(0)->mutable_locality()->set_sub_zone("hobbiton"); - health_check->add_health_checks()->mutable_timeout()->set_seconds(2); - health_check->mutable_health_checks(0)->mutable_interval()->set_seconds(1); + health_check->add_health_checks()->mutable_timeout()->set_seconds(MaxTimeout); + health_check->mutable_health_checks(0)->mutable_interval()->set_seconds(MaxTimeout); health_check->mutable_health_checks(0)->mutable_unhealthy_threshold()->set_value(2); health_check->mutable_health_checks(0)->mutable_healthy_threshold()->set_value(2); health_check->mutable_health_checks(0)->mutable_grpc_health_check(); @@ -145,7 +145,7 @@ class HdsIntegrationTest : public HttpIntegrationTest, // one TCP health_check envoy::service::discovery::v2::HealthCheckSpecifier makeTcpHealthCheckSpecifier() { envoy::service::discovery::v2::HealthCheckSpecifier server_health_check_specifier_; - server_health_check_specifier_.mutable_interval()->set_seconds(1); + server_health_check_specifier_.mutable_interval()->set_nanos(100000000); // 0.1 seconds auto* health_check = server_health_check_specifier_.add_cluster_health_checks(); @@ -153,12 +153,12 @@ class HdsIntegrationTest : public HttpIntegrationTest, Network::Utility::addressToProtobufAddress( *host_upstream_->localAddress(), *health_check->add_locality_endpoints()->add_endpoints()->mutable_address()); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_region("some_region"); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_zone("some_zone"); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_sub_zone("crete"); + health_check->mutable_locality_endpoints(0)->mutable_locality()->set_region("middle_earth"); + health_check->mutable_locality_endpoints(0)->mutable_locality()->set_zone("eriador"); + health_check->mutable_locality_endpoints(0)->mutable_locality()->set_sub_zone("rivendell"); - health_check->add_health_checks()->mutable_timeout()->set_seconds(2); - health_check->mutable_health_checks(0)->mutable_interval()->set_seconds(1); + health_check->add_health_checks()->mutable_timeout()->set_seconds(MaxTimeout); + health_check->mutable_health_checks(0)->mutable_interval()->set_seconds(MaxTimeout); health_check->mutable_health_checks(0)->mutable_unhealthy_threshold()->set_value(2); health_check->mutable_health_checks(0)->mutable_healthy_threshold()->set_value(2); auto* tcp_hc = health_check->mutable_health_checks(0)->mutable_tcp_health_check(); @@ -169,24 +169,39 @@ class HdsIntegrationTest : public HttpIntegrationTest, } // Checks if Envoy reported the health status of an endpoint correctly - void checkEndpointHealthResponse(envoy::service::discovery::v2::EndpointHealth endpoint, + bool checkEndpointHealthResponse(envoy::service::discovery::v2::EndpointHealth endpoint, envoy::api::v2::core::HealthStatus healthy, Network::Address::InstanceConstSharedPtr address) { - EXPECT_EQ(healthy, endpoint.health_status()); - EXPECT_EQ(address->ip()->port(), endpoint.endpoint().address().socket_address().port_value()); - EXPECT_EQ(address->ip()->addressAsString(), - endpoint.endpoint().address().socket_address().address()); + if (healthy != endpoint.health_status()) { + return false; + } + if (address->ip()->port() != endpoint.endpoint().address().socket_address().port_value()) { + return false; + } + if (address->ip()->addressAsString() != + endpoint.endpoint().address().socket_address().address()) { + return false; + } + return true; } // Checks if the cluster counters are correct - void checkCounters(int requests, int response_s, int successes, int failures) { + void checkCounters(int requests, int responses, int successes, int failures) { EXPECT_EQ(requests, test_server_->counter("hds_delegate.requests")->value()); - EXPECT_EQ(response_s, test_server_->counter("hds_delegate.responses")->value()); + EXPECT_LE(responses, test_server_->counter("hds_delegate.responses")->value()); EXPECT_EQ(successes, test_server_->counter("cluster.anna.health_check.success")->value()); EXPECT_EQ(failures, test_server_->counter("cluster.anna.health_check.failure")->value()); } + void waitForEndpointHealthResponse(envoy::api::v2::core::HealthStatus healthy) { + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + while (!checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), + healthy, host_upstream_->localAddress())) { + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + } + } + static constexpr uint32_t upstream_endpoints_ = 0; FakeHttpConnectionPtr hds_fake_connection_; @@ -201,6 +216,7 @@ class HdsIntegrationTest : public HttpIntegrationTest, FakeHttpConnectionPtr host2_fake_connection_; FakeRawConnectionPtr host_fake_raw_connection_; + static constexpr int MaxTimeout = 100; envoy::service::discovery::v2::HealthCheckRequest envoy_msg_; envoy::service::discovery::v2::HealthCheckRequestOrEndpointHealthResponse response_; envoy::service::discovery::v2::HealthCheckSpecifier server_health_check_specifier_; @@ -234,13 +250,9 @@ TEST_P(HdsIntegrationTest, SingleEndpointHealthyHttp) { host_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); host_stream_->encodeData(1024, true); - // Envoy reports back to server - ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + // Receive updates until the one we expect arrives + waitForEndpointHealthResponse(envoy::api::v2::core::HealthStatus::HEALTHY); - // Check that the response is correct - checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), - envoy::api::v2::core::HealthStatus::HEALTHY, - host_upstream_->localAddress()); checkCounters(1, 2, 1, 0); // Clean up connections @@ -253,6 +265,7 @@ TEST_P(HdsIntegrationTest, SingleEndpointHealthyHttp) { TEST_P(HdsIntegrationTest, SingleEndpointTimeoutHttp) { initialize(); server_health_check_specifier_ = makeHttpHealthCheckSpecifier(); + server_health_check_specifier_.mutable_cluster_health_checks(0) ->mutable_health_checks(0) ->mutable_timeout() @@ -260,7 +273,8 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutHttp) { server_health_check_specifier_.mutable_cluster_health_checks(0) ->mutable_health_checks(0) ->mutable_timeout() - ->set_nanos(800000000); + ->set_nanos(100000000); // 0.1 seconds + // Server <--> Envoy waitForHdsStream(); ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, envoy_msg_)); @@ -275,13 +289,9 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutHttp) { // Endpoint doesn't repond to the health check - // Envoy reports back to server - ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + // Receive updates until the one we expect arrives + waitForEndpointHealthResponse(envoy::api::v2::core::HealthStatus::TIMEOUT); - // Check that the response is correct - checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), - envoy::api::v2::core::HealthStatus::TIMEOUT, - host_upstream_->localAddress()); checkCounters(1, 2, 0, 1); // Clean up connections @@ -311,13 +321,9 @@ TEST_P(HdsIntegrationTest, SingleEndpointUnhealthyHttp) { host_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "404"}}, false); host_stream_->encodeData(1024, true); - // Envoy reports back to server - ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + // Receive updates until the one we expect arrives + waitForEndpointHealthResponse(envoy::api::v2::core::HealthStatus::UNHEALTHY); - // Check that the response is correct - checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), - envoy::api::v2::core::HealthStatus::UNHEALTHY, - host_upstream_->localAddress()); checkCounters(1, 2, 0, 1); // Clean up connections @@ -345,7 +351,8 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutTcp) { server_health_check_specifier_.mutable_cluster_health_checks(0) ->mutable_health_checks(0) ->mutable_timeout() - ->set_nanos(800000000); + ->set_nanos(100000000); // 0.1 seconds + hds_stream_->startGrpcStream(); hds_stream_->sendGrpcMessage(server_health_check_specifier_); test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); @@ -354,18 +361,12 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutTcp) { ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_)); ASSERT_TRUE( host_fake_raw_connection_->waitForData(FakeRawConnection::waitForInexactMatch("Ping"))); - host_upstream_->set_allow_unexpected_disconnects(true); - // No response from the endpoint - // Envoy reports back to server - ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + // No response from the endpoint - // Check that the response is correct - auto endpoint = response_.endpoint_health_response().endpoints_health(0); - EXPECT_EQ(envoy::api::v2::core::HealthStatus::TIMEOUT, endpoint.health_status()); - EXPECT_EQ(host_upstream_->localAddress()->ip()->port(), - endpoint.endpoint().address().socket_address().port_value()); + // Receive updates until the one we expect arrives + waitForEndpointHealthResponse(envoy::api::v2::core::HealthStatus::TIMEOUT); // Clean up connections cleanupHostConnections(); @@ -395,14 +396,8 @@ TEST_P(HdsIntegrationTest, SingleEndpointHealthyTcp) { RELEASE_ASSERT(result, result.message()); host_upstream_->set_allow_unexpected_disconnects(true); - // Envoy reports back to server - ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); - - // Check that the response is correct - auto endpoint = response_.endpoint_health_response().endpoints_health(0); - EXPECT_EQ(envoy::api::v2::core::HealthStatus::HEALTHY, endpoint.health_status()); - EXPECT_EQ(host_upstream_->localAddress()->ip()->port(), - endpoint.endpoint().address().socket_address().port_value()); + // Receive updates until the one we expect arrives + waitForEndpointHealthResponse(envoy::api::v2::core::HealthStatus::HEALTHY); // Clean up connections cleanupHostConnections(); @@ -436,14 +431,8 @@ TEST_P(HdsIntegrationTest, SingleEndpointUnhealthyTcp) { RELEASE_ASSERT(result, result.message()); host_upstream_->set_allow_unexpected_disconnects(true); - // Envoy reports back to server - ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); - - // Check that the response is correct - auto endpoint = response_.endpoint_health_response().endpoints_health(0); - EXPECT_EQ(envoy::api::v2::core::HealthStatus::UNHEALTHY, endpoint.health_status()); - EXPECT_EQ(host_upstream_->localAddress()->ip()->port(), - endpoint.endpoint().address().socket_address().port_value()); + // Receive updates until the one we expect arrives + waitForEndpointHealthResponse(envoy::api::v2::core::HealthStatus::UNHEALTHY); // Clean up connections cleanupHostConnections(); @@ -479,16 +468,17 @@ TEST_P(HdsIntegrationTest, TwoEndpointsSameLocality) { host2_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); host2_stream_->encodeData(1024, true); - // Envoy reports back to server + // Receive updates until the one we expect arrives ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + while (!(checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), + envoy::api::v2::core::HealthStatus::UNHEALTHY, + host_upstream_->localAddress()) && + checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(1), + envoy::api::v2::core::HealthStatus::HEALTHY, + host2_upstream_->localAddress()))) { + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + } - // Check that the response is correct - checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), - envoy::api::v2::core::HealthStatus::UNHEALTHY, - host_upstream_->localAddress()); - checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(1), - envoy::api::v2::core::HealthStatus::HEALTHY, - host2_upstream_->localAddress()); checkCounters(1, 2, 1, 1); // Clean up connections @@ -508,8 +498,8 @@ TEST_P(HdsIntegrationTest, TwoEndpointsDifferentLocality) { Network::Utility::addressToProtobufAddress( *host2_upstream_->localAddress(), *health_check->add_locality_endpoints()->add_endpoints()->mutable_address()); - health_check->mutable_locality_endpoints(1)->mutable_locality()->set_region("different_region"); - health_check->mutable_locality_endpoints(1)->mutable_locality()->set_zone("different_zone"); + health_check->mutable_locality_endpoints(1)->mutable_locality()->set_region("plakias"); + health_check->mutable_locality_endpoints(1)->mutable_locality()->set_zone("fragokastelo"); health_check->mutable_locality_endpoints(1)->mutable_locality()->set_sub_zone("emplisi"); // Server <--> Envoy @@ -530,16 +520,17 @@ TEST_P(HdsIntegrationTest, TwoEndpointsDifferentLocality) { host2_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); host2_stream_->encodeData(1024, true); - // Envoy reports back to server + // Receive updates until the one we expect arrives ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + while (!(checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), + envoy::api::v2::core::HealthStatus::UNHEALTHY, + host_upstream_->localAddress()) && + checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(1), + envoy::api::v2::core::HealthStatus::HEALTHY, + host2_upstream_->localAddress()))) { + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + } - // Check that the response is correct - checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), - envoy::api::v2::core::HealthStatus::UNHEALTHY, - host_upstream_->localAddress()); - checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(1), - envoy::api::v2::core::HealthStatus::HEALTHY, - host2_upstream_->localAddress()); checkCounters(1, 2, 1, 1); // Clean up connections @@ -560,12 +551,12 @@ TEST_P(HdsIntegrationTest, TwoEndpointsDifferentClusters) { Network::Utility::addressToProtobufAddress( *host2_upstream_->localAddress(), *health_check->add_locality_endpoints()->add_endpoints()->mutable_address()); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_region("peculiar_region"); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_zone("peculiar_zone"); + health_check->mutable_locality_endpoints(0)->mutable_locality()->set_region("kounopetra"); + health_check->mutable_locality_endpoints(0)->mutable_locality()->set_zone("emplisi"); health_check->mutable_locality_endpoints(0)->mutable_locality()->set_sub_zone("paris"); - health_check->add_health_checks()->mutable_timeout()->set_seconds(2); - health_check->mutable_health_checks(0)->mutable_interval()->set_seconds(1); + health_check->add_health_checks()->mutable_timeout()->set_seconds(MaxTimeout); + health_check->mutable_health_checks(0)->mutable_interval()->set_seconds(MaxTimeout); health_check->mutable_health_checks(0)->mutable_unhealthy_threshold()->set_value(2); health_check->mutable_health_checks(0)->mutable_healthy_threshold()->set_value(2); health_check->mutable_health_checks(0)->mutable_grpc_health_check(); @@ -590,16 +581,17 @@ TEST_P(HdsIntegrationTest, TwoEndpointsDifferentClusters) { host2_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); host2_stream_->encodeData(1024, true); - // Envoy reports back to server + // Receive updates until the one we expect arrives ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + while (!(checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), + envoy::api::v2::core::HealthStatus::UNHEALTHY, + host_upstream_->localAddress()) && + checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(1), + envoy::api::v2::core::HealthStatus::HEALTHY, + host2_upstream_->localAddress()))) { + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + } - // Check that the response is correct - checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), - envoy::api::v2::core::HealthStatus::UNHEALTHY, - host_upstream_->localAddress()); - checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(1), - envoy::api::v2::core::HealthStatus::HEALTHY, - host2_upstream_->localAddress()); checkCounters(1, 2, 0, 1); EXPECT_EQ(1, test_server_->counter("cluster.cat.health_check.success")->value()); EXPECT_EQ(0, test_server_->counter("cluster.cat.health_check.failure")->value()); @@ -631,20 +623,16 @@ TEST_P(HdsIntegrationTest, TestUpdateMessage) { host_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); host_stream_->encodeData(1024, true); - // Envoy reports back to server - ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + // Receive updates until the one we expect arrives + waitForEndpointHealthResponse(envoy::api::v2::core::HealthStatus::HEALTHY); - // Check that the response is correct - checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), - envoy::api::v2::core::HealthStatus::HEALTHY, - host_upstream_->localAddress()); checkCounters(1, 2, 1, 0); cleanupHostConnections(); // New HealthCheckSpecifier message envoy::service::discovery::v2::HealthCheckSpecifier new_message; - new_message.mutable_interval()->set_seconds(1); + new_message.mutable_interval()->set_nanos(100000000); // 0.1 seconds auto* health_check = new_message.add_cluster_health_checks(); @@ -653,12 +641,12 @@ TEST_P(HdsIntegrationTest, TestUpdateMessage) { *host2_upstream_->localAddress(), *health_check->add_locality_endpoints()->add_endpoints()->mutable_address()); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_region("peculiar_region"); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_zone("peculiar_zone"); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_sub_zone("paris"); + health_check->mutable_locality_endpoints(0)->mutable_locality()->set_region("matala"); + health_check->mutable_locality_endpoints(0)->mutable_locality()->set_zone("tilburg"); + health_check->mutable_locality_endpoints(0)->mutable_locality()->set_sub_zone("rivendell"); - health_check->add_health_checks()->mutable_timeout()->set_seconds(2); - health_check->mutable_health_checks(0)->mutable_interval()->set_seconds(1); + health_check->add_health_checks()->mutable_timeout()->set_seconds(MaxTimeout); + health_check->mutable_health_checks(0)->mutable_interval()->set_seconds(MaxTimeout); health_check->mutable_health_checks(0)->mutable_unhealthy_threshold()->set_value(2); health_check->mutable_health_checks(0)->mutable_healthy_threshold()->set_value(2); health_check->mutable_health_checks(0)->mutable_grpc_health_check(); @@ -674,17 +662,18 @@ TEST_P(HdsIntegrationTest, TestUpdateMessage) { ASSERT_TRUE(host2_fake_connection_->waitForNewStream(*dispatcher_, host2_stream_)); ASSERT_TRUE(host2_stream_->waitForEndStream(*dispatcher_)); host2_upstream_->set_allow_unexpected_disconnects(true); + // Endpoint responds to the health check host2_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "404"}}, false); host2_stream_->encodeData(1024, true); - // Envoy reports back to server + // Receive updates until the one we expect arrives ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); - - // Check that the response is correct - checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), - envoy::api::v2::core::HealthStatus::UNHEALTHY, - host2_upstream_->localAddress()); + while (!checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), + envoy::api::v2::core::HealthStatus::UNHEALTHY, + host2_upstream_->localAddress())) { + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + } // Clean up connections cleanupHostConnections();