diff --git a/source/common/upstream/health_checker_impl.cc b/source/common/upstream/health_checker_impl.cc index 5afb9c14159ec..9efe775a754af 100644 --- a/source/common/upstream/health_checker_impl.cc +++ b/source/common/upstream/health_checker_impl.cc @@ -360,6 +360,7 @@ TcpHealthCheckerImpl::TcpActiveHealthCheckSession::~TcpActiveHealthCheckSession( void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onDeferredDelete() { if (client_) { + expect_close_ = true; client_->close(Network::ConnectionCloseType::NoFlush); } } @@ -371,6 +372,7 @@ void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onData(Buffer::Instance& data.drain(data.length()); handleSuccess(false); if (!parent_.reuse_connection_) { + expect_close_ = true; client_->close(Network::ConnectionCloseType::NoFlush); } } else { @@ -379,12 +381,11 @@ void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onData(Buffer::Instance& } void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onEvent(Network::ConnectionEvent event) { - if (event == Network::ConnectionEvent::RemoteClose) { - handleFailure(envoy::data::core::v2alpha::HealthCheckFailureType::NETWORK); - } - if (event == Network::ConnectionEvent::RemoteClose || event == Network::ConnectionEvent::LocalClose) { + if (!expect_close_) { + handleFailure(envoy::data::core::v2alpha::HealthCheckFailureType::NETWORK); + } parent_.dispatcher_.deferredDelete(std::move(client_)); } @@ -403,6 +404,7 @@ void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onEvent(Network::Connect // TODO(mattklein123): In the case that a user configured bytes to write, they will not be // be written, since we currently have no way to know if the bytes actually get written via // the connection interface. We might want to figure out how to handle this better later. 
+ expect_close_ = true; client_->close(Network::ConnectionCloseType::NoFlush); handleSuccess(false); } @@ -416,6 +418,7 @@ void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onInterval() { client_->addConnectionCallbacks(*session_callbacks_); client_->addReadFilter(session_callbacks_); + expect_close_ = false; client_->connect(); client_->noDelay(true); } @@ -431,6 +434,7 @@ void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onInterval() { } void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onTimeout() { + expect_close_ = true; host_->setActiveHealthFailureType(Host::ActiveHealthFailureType::TIMEOUT); client_->close(Network::ConnectionCloseType::NoFlush); } diff --git a/source/common/upstream/health_checker_impl.h b/source/common/upstream/health_checker_impl.h index a1d7b6fcf4071..f05f44ea84715 100644 --- a/source/common/upstream/health_checker_impl.h +++ b/source/common/upstream/health_checker_impl.h @@ -252,6 +252,9 @@ class TcpHealthCheckerImpl : public HealthCheckerImplBase { TcpHealthCheckerImpl& parent_; Network::ClientConnectionPtr client_; std::shared_ptr session_callbacks_; + // If true, the close was initiated by us, not by e.g. a remote close or TCP reset. + // In that case the health check status was already reported; only state cleanup is required. 
+ bool expect_close_{}; }; typedef std::unique_ptr TcpActiveHealthCheckSessionPtr; diff --git a/test/common/upstream/health_checker_impl_test.cc b/test/common/upstream/health_checker_impl_test.cc index 9455591ce54a4..d00342bcfb636 100644 --- a/test/common/upstream/health_checker_impl_test.cc +++ b/test/common/upstream/health_checker_impl_test.cc @@ -190,6 +190,28 @@ class HttpHealthCheckerImplTest : public testing::Test { }); } + void setupNoServiceValidationHCOneUnhealthy() { + const std::string yaml = R"EOF( + timeout: 1s + interval: 1s + no_traffic_interval: 5s + interval_jitter: 1s + unhealthy_threshold: 1 + healthy_threshold: 2 + http_health_check: + service_name: locations + path: /healthcheck + )EOF"; + + health_checker_.reset(new TestHttpHealthCheckerImpl(*cluster_, parseHealthCheckFromV2Yaml(yaml), + dispatcher_, runtime_, random_, + HealthCheckEventLoggerPtr(event_logger_))); + health_checker_->addHostCheckCompleteCb( + [this](HostSharedPtr host, HealthTransition changed_state) -> void { + onHostStatus(host, changed_state); + }); + } + void setupNoServiceValidationHCAlwaysLogFailure() { const std::string yaml = R"EOF( timeout: 1s @@ -214,20 +236,18 @@ class HttpHealthCheckerImplTest : public testing::Test { } void setupNoServiceValidationNoReuseConnectionHC() { - std::string json = R"EOF( - { - "type": "http", - "timeout_ms": 1000, - "interval_ms": 1000, - "interval_jitter_ms": 1000, - "unhealthy_threshold": 2, - "healthy_threshold": 2, - "reuse_connection": false, - "path": "/healthcheck" - } + std::string yaml = R"EOF( + timeout: 1s + interval: 1s + interval_jitter: 1s + unhealthy_threshold: 2 + healthy_threshold: 2 + reuse_connection: false + http_health_check: + path: /healthcheck )EOF"; - health_checker_.reset(new TestHttpHealthCheckerImpl(*cluster_, parseHealthCheckFromV1Json(json), + health_checker_.reset(new TestHttpHealthCheckerImpl(*cluster_, parseHealthCheckFromV2Yaml(yaml), dispatcher_, runtime_, random_, 
HealthCheckEventLoggerPtr(event_logger_))); health_checker_->addHostCheckCompleteCb( @@ -262,20 +282,18 @@ class HttpHealthCheckerImplTest : public testing::Test { } void setupServiceValidationHC() { - std::string json = R"EOF( - { - "type": "http", - "timeout_ms": 1000, - "interval_ms": 1000, - "service_name": "locations", - "interval_jitter_ms": 1000, - "unhealthy_threshold": 2, - "healthy_threshold": 2, - "path": "/healthcheck" - } + std::string yaml = R"EOF( + timeout: 1s + interval: 1s + interval_jitter: 1s + unhealthy_threshold: 2 + healthy_threshold: 2 + http_health_check: + service_name: locations + path: /healthcheck )EOF"; - health_checker_.reset(new TestHttpHealthCheckerImpl(*cluster_, parseHealthCheckFromV1Json(json), + health_checker_.reset(new TestHttpHealthCheckerImpl(*cluster_, parseHealthCheckFromV2Yaml(yaml), dispatcher_, runtime_, random_, HealthCheckEventLoggerPtr(event_logger_))); health_checker_->addHostCheckCompleteCb( @@ -1366,6 +1384,29 @@ TEST_F(HttpHealthCheckerImplTest, Disconnect) { } TEST_F(HttpHealthCheckerImplTest, Timeout) { + setupNoServiceValidationHCOneUnhealthy(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + expectSessionCreate(); + expectStreamCreate(0); + EXPECT_CALL(*test_sessions_[0]->timeout_timer_, enableTimer(_)); + health_checker_->start(); + + EXPECT_CALL(*this, onHostStatus(_, HealthTransition::Changed)); + EXPECT_CALL(*test_sessions_[0]->client_connection_, close(_)); + EXPECT_CALL(*test_sessions_[0]->interval_timer_, enableTimer(_)); + EXPECT_CALL(*test_sessions_[0]->timeout_timer_, disableTimer()); + EXPECT_CALL(*event_logger_, logUnhealthy(_, _, _, true)); + EXPECT_CALL(*event_logger_, logEjectUnhealthy(_, _, _)); + test_sessions_[0]->timeout_timer_->callback_(); + EXPECT_EQ(Host::Health::Unhealthy, + cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->health()); + + 
EXPECT_EQ(cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->getActiveHealthFailureType(), + Host::ActiveHealthFailureType::TIMEOUT); +} + +TEST_F(HttpHealthCheckerImplTest, TimeoutThenRemoteClose) { setupNoServiceValidationHC(); EXPECT_CALL(*event_logger_, logUnhealthy(_, _, _, true)); cluster_->prioritySet().getMockHostSet(0)->hosts_ = { @@ -2170,65 +2211,55 @@ class TcpHealthCheckerImplTest : public testing::Test { : cluster_(new NiceMock()), event_logger_(new MockHealthCheckEventLogger()) {} - void setupData() { - std::string json = R"EOF( - { - "type": "tcp", - "timeout_ms": 1000, - "interval_ms": 1000, - "unhealthy_threshold": 2, - "healthy_threshold": 2, - "send": [ - {"binary": "01"} - ], - "receive": [ - {"binary": "02"} - ] - } + void setupData(unsigned int unhealthy_threshold = 2) { + std::ostringstream yaml; + yaml << R"EOF( + timeout: 1s + interval: 1s + unhealthy_threshold: )EOF" + << unhealthy_threshold << R"EOF( + healthy_threshold: 2 + tcp_health_check: + send: + text: "01" + receive: + - text: "02" )EOF"; - health_checker_.reset(new TcpHealthCheckerImpl(*cluster_, parseHealthCheckFromV1Json(json), - dispatcher_, runtime_, random_, - HealthCheckEventLoggerPtr(event_logger_))); + health_checker_.reset( + new TcpHealthCheckerImpl(*cluster_, parseHealthCheckFromV2Yaml(yaml.str()), dispatcher_, + runtime_, random_, HealthCheckEventLoggerPtr(event_logger_))); } void setupNoData() { - std::string json = R"EOF( - { - "type": "tcp", - "timeout_ms": 1000, - "interval_ms": 1000, - "unhealthy_threshold": 2, - "healthy_threshold": 2, - "send": [], - "receive": [] - } + std::string yaml = R"EOF( + timeout: 1s + interval: 1s + unhealthy_threshold: 2 + healthy_threshold: 2 + tcp_health_check: {} )EOF"; - health_checker_.reset(new TcpHealthCheckerImpl(*cluster_, parseHealthCheckFromV1Json(json), + health_checker_.reset(new TcpHealthCheckerImpl(*cluster_, parseHealthCheckFromV2Yaml(yaml), dispatcher_, runtime_, random_, 
HealthCheckEventLoggerPtr(event_logger_))); } void setupDataDontReuseConnection() { - std::string json = R"EOF( - { - "type": "tcp", - "timeout_ms": 1000, - "interval_ms": 1000, - "unhealthy_threshold": 2, - "healthy_threshold": 2, - "reuse_connection": false, - "send": [ - {"binary": "01"} - ], - "receive": [ - {"binary": "02"} - ] - } - )EOF"; + std::string yaml = R"EOF( + timeout: 1s + interval: 1s + unhealthy_threshold: 2 + healthy_threshold: 2 + reuse_connection: false + tcp_health_check: + send: + text: "01" + receive: + - text: "02" + )EOF"; - health_checker_.reset(new TcpHealthCheckerImpl(*cluster_, parseHealthCheckFromV1Json(json), + health_checker_.reset(new TcpHealthCheckerImpl(*cluster_, parseHealthCheckFromV2Yaml(yaml), dispatcher_, runtime_, random_, HealthCheckEventLoggerPtr(event_logger_))); } @@ -2336,7 +2367,7 @@ TEST_F(TcpHealthCheckerImplTest, WrongData) { Host::ActiveHealthFailureType::UNHEALTHY); } -TEST_F(TcpHealthCheckerImplTest, Timeout) { +TEST_F(TcpHealthCheckerImplTest, TimeoutThenRemoteClose) { InSequence s; setupData(); @@ -2396,6 +2427,101 @@ TEST_F(TcpHealthCheckerImplTest, Timeout) { cluster_->prioritySet().getMockHostSet(0)->runCallbacks({}, removed); } +TEST_F(TcpHealthCheckerImplTest, Timeout) { + InSequence s; + + setupData(1); + health_checker_->start(); + + expectSessionCreate(); + expectClientCreate(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + EXPECT_CALL(*connection_, write(_, _)); + EXPECT_CALL(*timeout_timer_, enableTimer(_)); + + cluster_->prioritySet().getMockHostSet(0)->runCallbacks( + {cluster_->prioritySet().getMockHostSet(0)->hosts_.back()}, {}); + + connection_->raiseEvent(Network::ConnectionEvent::Connected); + + Buffer::OwnedImpl response; + add_uint8(response, 1); + read_filter_->onData(response, false); + + EXPECT_CALL(*connection_, close(_)); + EXPECT_CALL(*event_logger_, logEjectUnhealthy(_, _, _)); + EXPECT_CALL(*event_logger_, 
logUnhealthy(_, _, _, true)); + EXPECT_CALL(*timeout_timer_, disableTimer()); + EXPECT_CALL(*interval_timer_, enableTimer(_)); + timeout_timer_->callback_(); + EXPECT_EQ(cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->getActiveHealthFailureType(), + Host::ActiveHealthFailureType::TIMEOUT); + EXPECT_EQ(Host::Health::Unhealthy, + cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->health()); +} + +TEST_F(TcpHealthCheckerImplTest, DoubleTimeout) { + InSequence s; + + setupData(); + health_checker_->start(); + + expectSessionCreate(); + expectClientCreate(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + EXPECT_CALL(*connection_, write(_, _)); + EXPECT_CALL(*timeout_timer_, enableTimer(_)); + + cluster_->prioritySet().getMockHostSet(0)->runCallbacks( + {cluster_->prioritySet().getMockHostSet(0)->hosts_.back()}, {}); + + connection_->raiseEvent(Network::ConnectionEvent::Connected); + + Buffer::OwnedImpl response; + add_uint8(response, 1); + read_filter_->onData(response, false); + + EXPECT_CALL(*connection_, close(_)); + EXPECT_CALL(*event_logger_, logUnhealthy(_, _, _, true)); + EXPECT_CALL(*timeout_timer_, disableTimer()); + EXPECT_CALL(*interval_timer_, enableTimer(_)); + timeout_timer_->callback_(); + EXPECT_EQ(cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->getActiveHealthFailureType(), + Host::ActiveHealthFailureType::TIMEOUT); + EXPECT_EQ(Host::Health::Healthy, cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->health()); + + expectClientCreate(); + EXPECT_CALL(*connection_, write(_, _)); + EXPECT_CALL(*timeout_timer_, enableTimer(_)); + interval_timer_->callback_(); + + connection_->raiseEvent(Network::ConnectionEvent::Connected); + + EXPECT_CALL(*connection_, close(_)); + EXPECT_CALL(*event_logger_, logEjectUnhealthy(_, _, _)); + EXPECT_CALL(*timeout_timer_, disableTimer()); + EXPECT_CALL(*interval_timer_, enableTimer(_)); + timeout_timer_->callback_(); + 
EXPECT_EQ(cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->getActiveHealthFailureType(), + Host::ActiveHealthFailureType::TIMEOUT); + EXPECT_EQ(Host::Health::Unhealthy, + cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->health()); + + expectClientCreate(); + EXPECT_CALL(*connection_, write(_, _)); + EXPECT_CALL(*timeout_timer_, enableTimer(_)); + interval_timer_->callback_(); + + connection_->raiseEvent(Network::ConnectionEvent::Connected); + + HostVector removed{cluster_->prioritySet().getMockHostSet(0)->hosts_.back()}; + cluster_->prioritySet().getMockHostSet(0)->hosts_.clear(); + EXPECT_CALL(*connection_, close(_)); + cluster_->prioritySet().getMockHostSet(0)->runCallbacks({}, removed); +} + // Tests that when reuse_connection is false timeouts execute normally. TEST_F(TcpHealthCheckerImplTest, TimeoutWithoutReusingConnection) { InSequence s; @@ -2585,6 +2711,33 @@ TEST_F(TcpHealthCheckerImplTest, PassiveFailureCrossThreadRemoveClusterRace) { EXPECT_EQ(0UL, cluster_->info_->stats_store_.counter("health_check.passive_failure").value()); } +TEST_F(TcpHealthCheckerImplTest, ConnectionLocalFailure) { + InSequence s; + + setupData(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + expectSessionCreate(); + expectClientCreate(); + EXPECT_CALL(*connection_, write(_, _)); + EXPECT_CALL(*timeout_timer_, enableTimer(_)); + health_checker_->start(); + + // Expect the LocalClose to be handled as a health check failure. + EXPECT_CALL(*event_logger_, logUnhealthy(_, _, _, true)); + EXPECT_CALL(*timeout_timer_, disableTimer()); + EXPECT_CALL(*interval_timer_, enableTimer(_)); + + // Raise a LocalClose that is not triggered by the health checker itself, + // e.g. a failure to setsockopt(). 
+ connection_->raiseEvent(Network::ConnectionEvent::LocalClose); + + EXPECT_EQ(1UL, cluster_->info_->stats_store_.counter("health_check.attempt").value()); + EXPECT_EQ(0UL, cluster_->info_->stats_store_.counter("health_check.success").value()); + EXPECT_EQ(1UL, cluster_->info_->stats_store_.counter("health_check.failure").value()); + EXPECT_EQ(0UL, cluster_->info_->stats_store_.counter("health_check.passive_failure").value()); +} + class TestGrpcHealthCheckerImpl : public GrpcHealthCheckerImpl { public: using GrpcHealthCheckerImpl::GrpcHealthCheckerImpl; @@ -3210,8 +3363,26 @@ TEST_F(GrpcHealthCheckerImplTest, Disconnect) { expectHostHealthy(false); } -// Test timeouts produce network-type failures which does not lead to immediate unhealthy state. TEST_F(GrpcHealthCheckerImplTest, Timeout) { + setupHCWithUnhealthyThreshold(1); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + expectSessionCreate(); + + expectHealthcheckStart(0); + EXPECT_CALL(*event_logger_, logUnhealthy(_, _, _, true)); + health_checker_->start(); + + expectHealthcheckStop(0); + // The unhealthy threshold is 1, so the first timeout marks the host unhealthy. + EXPECT_CALL(*this, onHostStatus(_, HealthTransition::Changed)); + EXPECT_CALL(*event_logger_, logEjectUnhealthy(_, _, _)); + test_sessions_[0]->timeout_timer_->callback_(); + expectHostHealthy(false); +} + +// Test that timeouts produce network-type failures which do not lead to immediate unhealthy state. 
+TEST_F(GrpcHealthCheckerImplTest, DoubleTimeout) { setupHC(); cluster_->prioritySet().getMockHostSet(0)->hosts_ = { makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; diff --git a/test/common/upstream/utility.h b/test/common/upstream/utility.h index b5ef983aab6dc..658e871326ddc 100644 --- a/test/common/upstream/utility.h +++ b/test/common/upstream/utility.h @@ -126,14 +126,6 @@ parseHealthCheckFromV2Yaml(const std::string& yaml_string) { return health_check; } -inline envoy::api::v2::core::HealthCheck -parseHealthCheckFromV1Json(const std::string& json_string) { - envoy::api::v2::core::HealthCheck health_check; - auto json_object_ptr = Json::Factory::loadFromString(json_string); - Config::CdsJson::translateHealthCheck(*json_object_ptr, health_check); - return health_check; -} - inline PrioritySet::UpdateHostsParams updateHostsParams(HostVectorConstSharedPtr hosts, HostsPerLocalityConstSharedPtr hosts_per_locality, HealthyHostVectorConstSharedPtr healthy_hosts,