Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions source/common/upstream/health_checker_base_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ HealthCheckerImplBase::HealthCheckerImplBase(const Cluster& cluster,
});
}

HealthCheckerImplBase::~HealthCheckerImplBase() {
// Make sure that any sessions that were deferred deleted are cleared before we destruct.
dispatcher_.clearDeferredDeleteList();
}

void HealthCheckerImplBase::decHealthy() {
ASSERT(local_process_healthy_ > 0);
local_process_healthy_--;
Expand Down Expand Up @@ -135,6 +140,9 @@ void HealthCheckerImplBase::onClusterMemberUpdate(const HostVector& hosts_added,
for (const HostSharedPtr& host : hosts_removed) {
auto session_iter = active_sessions_.find(host);
ASSERT(active_sessions_.end() != session_iter);
// This deletion can happen inline in response to a host failure, so we deferred delete.
session_iter->second->onDeferredDeleteBase();
dispatcher_.deferredDelete(std::move(session_iter->second));
active_sessions_.erase(session_iter);
}
}
Expand Down Expand Up @@ -220,6 +228,14 @@ HealthCheckerImplBase::ActiveHealthCheckSession::~ActiveHealthCheckSession() {
}
}

void HealthCheckerImplBase::ActiveHealthCheckSession::onDeferredDeleteBase() {
// The session is about to be deferred deleted. Make sure all timers are gone and any
// implementation specific state is destroyed.
interval_timer_.reset();
timeout_timer_.reset();
onDeferredDelete();
}

void HealthCheckerImplBase::ActiveHealthCheckSession::handleSuccess(bool degraded) {
// If we are healthy, reset the # of unhealthy to zero.
num_unhealthy_ = 0;
Expand Down Expand Up @@ -310,8 +326,14 @@ HealthTransition HealthCheckerImplBase::ActiveHealthCheckSession::setUnhealthy(
void HealthCheckerImplBase::ActiveHealthCheckSession::handleFailure(
envoy::data::core::v2alpha::HealthCheckFailureType type) {
HealthTransition changed_state = setUnhealthy(type);
timeout_timer_->disableTimer();
interval_timer_->enableTimer(parent_.interval(HealthState::Unhealthy, changed_state));
// It's possible that the previous call caused this session to be deferred deleted.
if (timeout_timer_ != nullptr) {
timeout_timer_->disableTimer();
}

if (interval_timer_ != nullptr) {
interval_timer_->enableTimer(parent_.interval(HealthState::Unhealthy, changed_state));
}
}

void HealthCheckerImplBase::ActiveHealthCheckSession::onIntervalBase() {
Expand Down
5 changes: 4 additions & 1 deletion source/common/upstream/health_checker_base_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ class HealthCheckerImplBase : public HealthChecker,
void start() override;

protected:
class ActiveHealthCheckSession {
class ActiveHealthCheckSession : public Event::DeferredDeletable {
public:
virtual ~ActiveHealthCheckSession();
HealthTransition setUnhealthy(envoy::data::core::v2alpha::HealthCheckFailureType type);
void start() { onIntervalBase(); }
void onDeferredDeleteBase();

protected:
ActiveHealthCheckSession(HealthCheckerImplBase& parent, HostSharedPtr host);
Expand All @@ -66,6 +67,7 @@ class HealthCheckerImplBase : public HealthChecker,
void onIntervalBase();
virtual void onTimeout() PURE;
void onTimeoutBase();
virtual void onDeferredDelete() PURE;

HealthCheckerImplBase& parent_;
Event::TimerPtr interval_timer_;
Expand All @@ -80,6 +82,7 @@ class HealthCheckerImplBase : public HealthChecker,
HealthCheckerImplBase(const Cluster& cluster, const envoy::api::v2::core::HealthCheck& config,
Event::Dispatcher& dispatcher, Runtime::Loader& runtime,
Runtime::RandomGenerator& random, HealthCheckEventLoggerPtr&& event_logger);
~HealthCheckerImplBase();

virtual ActiveHealthCheckSessionPtr makeSession(HostSharedPtr host) PURE;
virtual envoy::data::core::v2alpha::HealthCheckerType healthCheckerType() const PURE;
Expand Down
15 changes: 15 additions & 0 deletions source/common/upstream/health_checker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ HttpHealthCheckerImpl::HttpActiveHealthCheckSession::HttpActiveHealthCheckSessio
local_address_(std::make_shared<Network::Address::Ipv4Instance>("127.0.0.1")) {}

HttpHealthCheckerImpl::HttpActiveHealthCheckSession::~HttpActiveHealthCheckSession() {
onDeferredDelete();
ASSERT(client_ == nullptr);
}

void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::onDeferredDelete() {
if (client_) {
// If there is an active request it will get reset, so make sure we ignore the reset.
expect_reset_ = true;
Expand Down Expand Up @@ -349,6 +354,11 @@ TcpHealthCheckerImpl::TcpHealthCheckerImpl(const Cluster& cluster,
receive_bytes_(TcpHealthCheckMatcher::loadProtoBytes(config.tcp_health_check().receive())) {}

TcpHealthCheckerImpl::TcpActiveHealthCheckSession::~TcpActiveHealthCheckSession() {
onDeferredDelete();
ASSERT(client_ == nullptr);
}

void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onDeferredDelete() {
if (client_) {
client_->close(Network::ConnectionCloseType::NoFlush);
}
Expand Down Expand Up @@ -448,6 +458,11 @@ GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::GrpcActiveHealthCheckSessio
: ActiveHealthCheckSession(parent, host), parent_(parent) {}

GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::~GrpcActiveHealthCheckSession() {
onDeferredDelete();
ASSERT(client_ == nullptr);
}

void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onDeferredDelete() {
if (client_) {
// If there is an active request it will get reset, so make sure we ignore the reset.
expect_reset_ = true;
Expand Down
3 changes: 3 additions & 0 deletions source/common/upstream/health_checker_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class HttpHealthCheckerImpl : public HealthCheckerImplBase {
// ActiveHealthCheckSession
void onInterval() override;
void onTimeout() override;
void onDeferredDelete() final;

// Http::StreamDecoder
void decode100ContinueHeaders(Http::HeaderMapPtr&&) override {}
Expand Down Expand Up @@ -246,6 +247,7 @@ class TcpHealthCheckerImpl : public HealthCheckerImplBase {
// ActiveHealthCheckSession
void onInterval() override;
void onTimeout() override;
void onDeferredDelete() final;

TcpHealthCheckerImpl& parent_;
Network::ClientConnectionPtr client_;
Expand Down Expand Up @@ -292,6 +294,7 @@ class GrpcHealthCheckerImpl : public HealthCheckerImplBase {
// ActiveHealthCheckSession
void onInterval() override;
void onTimeout() override;
void onDeferredDelete() final;

// Http::StreamDecoder
void decode100ContinueHeaders(Http::HeaderMapPtr&&) override {}
Expand Down
6 changes: 6 additions & 0 deletions source/extensions/health_checkers/redis/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ RedisHealthChecker::RedisActiveHealthCheckSession::RedisActiveHealthCheckSession
: ActiveHealthCheckSession(parent, host), parent_(parent) {}

RedisHealthChecker::RedisActiveHealthCheckSession::~RedisActiveHealthCheckSession() {
onDeferredDelete();
ASSERT(current_request_ == nullptr);
ASSERT(client_ == nullptr);
}

void RedisHealthChecker::RedisActiveHealthCheckSession::onDeferredDelete() {
if (current_request_) {
current_request_->cancel();
current_request_ = nullptr;
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/health_checkers/redis/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase {
public Network::ConnectionCallbacks {
RedisActiveHealthCheckSession(RedisHealthChecker& parent, const Upstream::HostSharedPtr& host);
~RedisActiveHealthCheckSession();

// ActiveHealthCheckSession
void onInterval() override;
void onTimeout() override;
void onDeferredDelete() final;

// Extensions::NetworkFilters::Common::Redis::Client::Config
bool disableOutlierEvents() const override { return true; }
Expand Down
3 changes: 3 additions & 0 deletions test/common/upstream/hds_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ TEST_F(HdsTest, TestProcessMessageHealthChecks) {
// Check Correctness
EXPECT_EQ(hds_delegate_->hdsClusters()[0]->healthCheckers().size(), 2);
EXPECT_EQ(hds_delegate_->hdsClusters()[1]->healthCheckers().size(), 3);
EXPECT_CALL(dispatcher_, clearDeferredDeleteList()).Times(5);
}

// Tests OnReceiveMessage given a minimal HealthCheckSpecifier message
Expand Down Expand Up @@ -324,6 +325,8 @@ TEST_F(HdsTest, TestSendResponseOneEndpointTimeout) {
.socket_address()
.port_value(),
1234);

EXPECT_CALL(dispatcher_, clearDeferredDeleteList());
}

} // namespace Upstream
Expand Down
25 changes: 24 additions & 1 deletion test/common/upstream/health_checker_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ TEST(HealthCheckerFactoryTest, GrpcHealthCheckHTTP2NotConfiguredException) {
"fake_cluster cluster must support HTTP/2 for gRPC healthchecking");
}

TEST(HealthCheckerFactoryTest, createGrpc) {
TEST(HealthCheckerFactoryTest, CreateGrpc) {

NiceMock<Upstream::MockClusterMockPrioritySet> cluster;
EXPECT_CALL(*cluster.info_, features())
Expand All @@ -81,6 +81,7 @@ TEST(HealthCheckerFactoryTest, createGrpc) {
Event::MockDispatcher dispatcher;
AccessLog::MockAccessLogManager log_manager;

EXPECT_CALL(dispatcher, clearDeferredDeleteList());
EXPECT_NE(nullptr, dynamic_cast<GrpcHealthCheckerImpl*>(
HealthCheckerFactory::create(createGrpcHealthCheckConfig(), cluster,
runtime, random, dispatcher, log_manager)
Expand Down Expand Up @@ -1197,6 +1198,28 @@ TEST_F(HttpHealthCheckerImplTest, SuccessStartFailedFailFirstLogError) {
expectSuccessStartFailedFailFirst();
}

// Verify that removal during a failure callback works.
TEST_F(HttpHealthCheckerImplTest, HttpFailRemoveHostInCallback) {
setupNoServiceValidationHC();
cluster_->prioritySet().getMockHostSet(0)->hosts_ = {
makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")};
expectSessionCreate();
expectStreamCreate(0);
EXPECT_CALL(*test_sessions_[0]->timeout_timer_, enableTimer(_));
health_checker_->start();

EXPECT_CALL(*this, onHostStatus(_, HealthTransition::Changed))
.WillOnce(Invoke([&](HostSharedPtr host, HealthTransition) {
cluster_->prioritySet().getMockHostSet(0)->hosts_ = {};
cluster_->prioritySet().runUpdateCallbacks(0, {}, {host});
}));
EXPECT_CALL(*event_logger_, logEjectUnhealthy(_, _, _));
EXPECT_CALL(*test_sessions_[0]->interval_timer_, enableTimer(_)).Times(0);
EXPECT_CALL(*test_sessions_[0]->timeout_timer_, disableTimer()).Times(0);
EXPECT_CALL(*event_logger_, logUnhealthy(_, _, _, true));
respond(0, "503", false);
}

TEST_F(HttpHealthCheckerImplTest, HttpFail) {
setupNoServiceValidationHC();
cluster_->prioritySet().getMockHostSet(0)->hosts_ = {
Expand Down
10 changes: 6 additions & 4 deletions test/extensions/health_checkers/redis/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace {

typedef Extensions::HealthCheckers::RedisHealthChecker::RedisHealthChecker CustomRedisHealthChecker;

TEST(HealthCheckerFactoryTest, createRedis) {
TEST(HealthCheckerFactoryTest, CreateRedis) {
const std::string yaml = R"EOF(
timeout: 1s
interval: 1s
Expand All @@ -40,7 +40,7 @@ TEST(HealthCheckerFactoryTest, createRedis) {
.get()));
}

TEST(HealthCheckerFactoryTest, createRedisWithoutKey) {
TEST(HealthCheckerFactoryTest, CreateRedisWithoutKey) {
const std::string yaml = R"EOF(
timeout: 1s
interval: 1s
Expand All @@ -63,7 +63,7 @@ TEST(HealthCheckerFactoryTest, createRedisWithoutKey) {
.get()));
}

TEST(HealthCheckerFactoryTest, createRedisWithLogHCFailure) {
TEST(HealthCheckerFactoryTest, CreateRedisWithLogHCFailure) {
const std::string yaml = R"EOF(
timeout: 1s
interval: 1s
Expand All @@ -87,7 +87,7 @@ TEST(HealthCheckerFactoryTest, createRedisWithLogHCFailure) {
.get()));
}

TEST(HealthCheckerFactoryTest, createRedisViaUpstreamHealthCheckerFactory) {
TEST(HealthCheckerFactoryTest, CreateRedisViaUpstreamHealthCheckerFactory) {
const std::string yaml = R"EOF(
timeout: 1s
interval: 1s
Expand All @@ -106,6 +106,8 @@ TEST(HealthCheckerFactoryTest, createRedisViaUpstreamHealthCheckerFactory) {
Runtime::MockRandomGenerator random;
Event::MockDispatcher dispatcher;
AccessLog::MockAccessLogManager log_manager;

EXPECT_CALL(dispatcher, clearDeferredDeleteList());
EXPECT_NE(nullptr, dynamic_cast<CustomRedisHealthChecker*>(
Upstream::HealthCheckerFactory::create(
Upstream::parseHealthCheckFromV2Yaml(yaml), cluster, runtime, random,
Expand Down
Loading