diff --git a/RAW_RELEASE_NOTES.md b/RAW_RELEASE_NOTES.md index f0c675e1349d2..0764b5f7b5d1c 100644 --- a/RAW_RELEASE_NOTES.md +++ b/RAW_RELEASE_NOTES.md @@ -8,6 +8,7 @@ will make it substantially easier for the releaser to "linkify" all of the relea final version. ## 1.6.0 +* Added gRPC healthcheck based on [grpc.health.v1.Health](https://github.com/grpc/grpc/blob/master/src/proto/grpc/health/v1/health.proto) service. * Added Metrics Service implementation. * Added gRPC access logging. * Added DOWNSTREAM_REMOTE_ADDRESS, DOWNSTREAM_REMOTE_ADDRESS_WITHOUT_PORT, and diff --git a/bazel/BUILD b/bazel/BUILD index 9d27d1f2d0ac1..aff1a8ab8a503 100644 --- a/bazel/BUILD +++ b/bazel/BUILD @@ -41,3 +41,8 @@ config_setting( name = "disable_google_grpc", values = {"define": "google_grpc=disabled"}, ) + +cc_proto_library( + name = "grpc_health_proto", + deps = ["@com_github_grpc_grpc//src/proto/grpc/health/v1:_health_proto_only"], +) diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index f74e147cd33e1..88520c78ff6de 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -411,3 +411,8 @@ def _com_github_grpc_grpc(): name = "grpc", actual = "@com_github_grpc_grpc//:grpc++" ) + + native.bind( + name = "grpc_health_proto", + actual = "//bazel:grpc_health_proto", + ) diff --git a/source/common/upstream/BUILD b/source/common/upstream/BUILD index ff86018dd041e..ed3adf030e53d 100644 --- a/source/common/upstream/BUILD +++ b/source/common/upstream/BUILD @@ -4,6 +4,7 @@ load( "//bazel:envoy_build_system.bzl", "envoy_cc_library", "envoy_package", + "envoy_proto_library", ) envoy_package() @@ -88,11 +89,15 @@ envoy_cc_library( name = "health_checker_lib", srcs = ["health_checker_impl.cc"], hdrs = ["health_checker_impl.h"], - external_deps = ["envoy_health_check"], + external_deps = [ + "envoy_health_check", + "grpc_health_proto", + ], deps = [ ":host_utility_lib", "//include/envoy/event:dispatcher_interface", "//include/envoy/event:timer_interface", + 
"//include/envoy/grpc:status", "//include/envoy/http:codec_interface", "//include/envoy/http:codes_interface", "//include/envoy/network:connection_interface", @@ -101,11 +106,14 @@ envoy_cc_library( "//include/envoy/stats:stats_interface", "//include/envoy/upstream:health_checker_interface", "//source/common/buffer:buffer_lib", + "//source/common/buffer:zero_copy_input_stream_lib", "//source/common/common:empty_string", "//source/common/common:enum_to_int", "//source/common/common:hex_lib", "//source/common/common:logger_lib", "//source/common/common:utility_lib", + "//source/common/grpc:codec_lib", + "//source/common/grpc:common_lib", "//source/common/http:codec_client_lib", "//source/common/http:header_map_lib", "//source/common/http:headers_lib", diff --git a/source/common/upstream/health_checker_impl.cc b/source/common/upstream/health_checker_impl.cc index 688b3cf578fe6..15e547d25d13f 100644 --- a/source/common/upstream/health_checker_impl.cc +++ b/source/common/upstream/health_checker_impl.cc @@ -12,10 +12,12 @@ #include "envoy/stats/stats.h" #include "common/buffer/buffer_impl.h" +#include "common/buffer/zero_copy_input_stream_impl.h" #include "common/common/empty_string.h" #include "common/common/enum_to_int.h" #include "common/common/hex.h" #include "common/common/utility.h" +#include "common/grpc/common.h" #include "common/http/codec_client.h" #include "common/http/header_map_impl.h" #include "common/http/headers.h" @@ -41,6 +43,13 @@ HealthCheckerSharedPtr HealthCheckerFactory::create(const envoy::api::v2::Health case envoy::api::v2::HealthCheck::HealthCheckerCase::kRedisHealthCheck: return std::make_shared(cluster, hc_config, dispatcher, runtime, random, Redis::ConnPool::ClientFactoryImpl::instance_); + case envoy::api::v2::HealthCheck::HealthCheckerCase::kGrpcHealthCheck: + if (!(cluster.info()->features() & Upstream::ClusterInfo::Features::HTTP2)) { + throw EnvoyException(fmt::format("{} cluster must support HTTP/2 for gRPC healthchecking", + 
cluster.info()->name())); + } + return std::make_shared(cluster, hc_config, dispatcher, runtime, + random); default: // TODO(htuch): This should be subsumed eventually by the constraint checking in #1308. throw EnvoyException("Health checker type not set"); @@ -281,7 +290,7 @@ HttpHealthCheckerImpl::HttpHealthCheckerImpl(const Cluster& cluster, } HttpHealthCheckerImpl::HttpActiveHealthCheckSession::HttpActiveHealthCheckSession( - HttpHealthCheckerImpl& parent, HostSharedPtr host) + HttpHealthCheckerImpl& parent, const HostSharedPtr& host) : ActiveHealthCheckSession(parent, host), parent_(parent) {} HttpHealthCheckerImpl::HttpActiveHealthCheckSession::~HttpActiveHealthCheckSession() { @@ -520,7 +529,7 @@ RedisHealthCheckerImpl::RedisHealthCheckerImpl(const Cluster& cluster, client_factory_(client_factory) {} RedisHealthCheckerImpl::RedisActiveHealthCheckSession::RedisActiveHealthCheckSession( - RedisHealthCheckerImpl& parent, HostSharedPtr host) + RedisHealthCheckerImpl& parent, const HostSharedPtr& host) : ActiveHealthCheckSession(parent, host), parent_(parent) {} RedisHealthCheckerImpl::RedisActiveHealthCheckSession::~RedisActiveHealthCheckSession() { @@ -586,5 +595,267 @@ RedisHealthCheckerImpl::HealthCheckRequest::HealthCheckRequest() { request_.asArray().swap(values); } +GrpcHealthCheckerImpl::GrpcHealthCheckerImpl(const Cluster& cluster, + const envoy::api::v2::HealthCheck& config, + Event::Dispatcher& dispatcher, + Runtime::Loader& runtime, + Runtime::RandomGenerator& random) + : HealthCheckerImplBase(cluster, config, dispatcher, runtime, random), + service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "grpc.health.v1.Health.Check")) { + if (!config.grpc_health_check().service_name().empty()) { + service_name_.value(config.grpc_health_check().service_name()); + } +} + +GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::GrpcActiveHealthCheckSession( + GrpcHealthCheckerImpl& parent, const HostSharedPtr& host) + : 
ActiveHealthCheckSession(parent, host), parent_(parent) {} + +GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::~GrpcActiveHealthCheckSession() { + if (client_) { + // If there is an active request it will get reset, so make sure we ignore the reset. + expect_reset_ = true; + client_->close(); + } +} + +void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeHeaders( + Http::HeaderMapPtr&& headers, bool end_stream) { + const auto http_response_status = Http::Utility::getResponseStatus(*headers); + if (http_response_status != enumToInt(Http::Code::OK)) { + // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md requires that + // grpc-status be used if available. + if (end_stream) { + const auto grpc_status = Grpc::Common::getGrpcStatus(*headers); + if (grpc_status.valid()) { + onRpcComplete(grpc_status.value(), Grpc::Common::getGrpcMessage(*headers), true); + return; + } + } + onRpcComplete(Grpc::Common::httpToGrpcStatus(http_response_status), "non-200 HTTP response", + end_stream); + return; + } + if (!Grpc::Common::hasGrpcContentType(*headers)) { + onRpcComplete(Grpc::Status::GrpcStatus::Internal, "invalid gRPC content-type", false); + return; + } + if (end_stream) { + // This is how, for instance, grpc-go signals about missing service - HTTP/2 200 OK with + // 'unimplemented' gRPC status. + const auto grpc_status = Grpc::Common::getGrpcStatus(*headers); + if (grpc_status.valid()) { + onRpcComplete(grpc_status.value(), Grpc::Common::getGrpcMessage(*headers), true); + return; + } + onRpcComplete(Grpc::Status::GrpcStatus::Internal, + "gRPC protocol violation: unexpected stream end", true); + } +} + +void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeData(Buffer::Instance& data, + bool end_stream) { + if (end_stream) { + onRpcComplete(Grpc::Status::GrpcStatus::Internal, + "gRPC protocol violation: unexpected stream end", true); + return; + } + + // We should end up with only one frame here. 
+ std::vector decoded_frames; + if (!decoder_.decode(data, decoded_frames)) { + onRpcComplete(Grpc::Status::GrpcStatus::Internal, "gRPC wire protocol decode error", false); + } + for (auto& frame : decoded_frames) { + if (frame.length_ > 0) { + if (health_check_response_) { + // grpc.health.v1.Health.Check is unary RPC, so only one message is allowed. + onRpcComplete(Grpc::Status::GrpcStatus::Internal, "unexpected streaming", false); + return; + } + health_check_response_ = std::make_unique(); + Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_)); + + if (frame.flags_ != Grpc::GRPC_FH_DEFAULT || + !health_check_response_->ParseFromZeroCopyStream(&stream)) { + onRpcComplete(Grpc::Status::GrpcStatus::Internal, "invalid grpc.health.v1 RPC payload", + false); + return; + } + } + } +} + +void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeTrailers( + Http::HeaderMapPtr&& trailers) { + auto maybe_grpc_status = Grpc::Common::getGrpcStatus(*trailers); + auto grpc_status = + maybe_grpc_status.valid() ? maybe_grpc_status.value() : Grpc::Status::GrpcStatus::Internal; + const std::string grpc_message = + maybe_grpc_status.valid() ? Grpc::Common::getGrpcMessage(*trailers) : "invalid gRPC status"; + onRpcComplete(grpc_status, grpc_message, true); +} + +void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onEvent(Network::ConnectionEvent event) { + if (event == Network::ConnectionEvent::RemoteClose || + event == Network::ConnectionEvent::LocalClose) { + // For the raw disconnect event, we are either between intervals in which case we already have + // a timer setup, or we did the close or got a reset, in which case we already setup a new + // timer. There is nothing to do here other than blow away the client. 
+ parent_.dispatcher_.deferredDelete(std::move(client_)); + } +} + +void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onInterval() { + if (!client_) { + Upstream::Host::CreateConnectionData conn = host_->createConnection(parent_.dispatcher_); + client_ = parent_.createCodecClient(conn); + client_->addConnectionCallbacks(connection_callback_impl_); + client_->setCodecConnectionCallbacks(http_connection_callback_impl_); + } + + request_encoder_ = &client_->newStream(*this); + request_encoder_->getStream().addCallbacks(*this); + + auto headers_message = Grpc::Common::prepareHeaders( + parent_.cluster_.info()->name(), parent_.service_method_.service()->full_name(), + parent_.service_method_.name()); + headers_message->headers().insertUserAgent().value().setReference( + Http::Headers::get().UserAgentValues.EnvoyHealthChecker); + + request_encoder_->encodeHeaders(headers_message->headers(), false); + + grpc::health::v1::HealthCheckRequest request; + if (parent_.service_name_.valid()) { + request.set_service(parent_.service_name_.value()); + } + + request_encoder_->encodeData(*Grpc::Common::serializeBody(request), true); +} + +void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onResetStream(Http::StreamResetReason) { + const bool expected_reset = expect_reset_; + resetState(); + + if (expected_reset) { + // Stream reset was initiated by us (bogus gRPC response, timeout or cluster host is going + // away). In these cases health check failure has already been reported, so just return. 
+ return; + } + + ENVOY_CONN_LOG(debug, "connection/stream error health_flags={}", *client_, + HostUtility::healthFlagsToString(*host_)); + + // TODO(baranov1ch): according to all HTTP standards, we should check if reason is one of + // Http::StreamResetReason::RemoteRefusedStreamReset (which may mean GOAWAY), + // Http::StreamResetReason::RemoteReset or Http::StreamResetReason::ConnectionTermination (both + // mean connection close), check if connection is not fresh (was used for at least 1 request) + // and silently retry request on the fresh connection. This is also true for HTTP/1.1 healthcheck. + handleFailure(FailureType::Network); +} + +void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onGoAway() { + ENVOY_CONN_LOG(debug, "connection going away health_flags={}", *client_, + HostUtility::healthFlagsToString(*host_)); + // Even if we have active health check probe, fail it on GOAWAY and schedule new one. + if (request_encoder_) { + handleFailure(FailureType::Network); + expect_reset_ = true; + request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset); + } + client_->close(); +} + +bool GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::isHealthCheckSucceeded( + Grpc::Status::GrpcStatus grpc_status) const { + if (grpc_status != Grpc::Status::GrpcStatus::Ok) { + return false; + } + + if (!health_check_response_ || + health_check_response_->status() != grpc::health::v1::HealthCheckResponse::SERVING) { + return false; + } + + return true; +} + +void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onRpcComplete( + Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message, bool end_stream) { + logHealthCheckStatus(grpc_status, grpc_message); + if (isHealthCheckSucceeded(grpc_status)) { + handleSuccess(); + } else { + handleFailure(FailureType::Active); + } + + // |end_stream| will be false if we decided to stop healthcheck before HTTP stream has ended - + // invalid gRPC payload, unexpected message stream or wrong 
content-type. + if (end_stream) { + resetState(); + } else { + // resetState() will be called by onResetStream(). + expect_reset_ = true; + request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset); + } + + if (!parent_.reuse_connection_) { + client_->close(); + } +} + +void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::resetState() { + expect_reset_ = false; + request_encoder_ = nullptr; + decoder_ = Grpc::Decoder(); + health_check_response_.reset(); +} + +void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onTimeout() { + ENVOY_CONN_LOG(debug, "connection/stream timeout health_flags={}", *client_, + HostUtility::healthFlagsToString(*host_)); + expect_reset_ = true; + request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset); +} + +void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::logHealthCheckStatus( + Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message) { + const char* service_status; + if (!health_check_response_) { + service_status = "rpc_error"; + } else { + switch (health_check_response_->status()) { + case grpc::health::v1::HealthCheckResponse::SERVING: + service_status = "serving"; + break; + case grpc::health::v1::HealthCheckResponse::NOT_SERVING: + service_status = "not_serving"; + break; + case grpc::health::v1::HealthCheckResponse::UNKNOWN: + service_status = "unknown"; + break; + default: + // Should not happen really, Protobuf should not parse undefined enum values. 
+ NOT_REACHED; + break; + } + } + std::string grpc_status_message; + if (grpc_status != Grpc::Status::GrpcStatus::Ok && !grpc_message.empty()) { + grpc_status_message = fmt::format("{} ({})", grpc_status, grpc_message); + } else { + grpc_status_message = fmt::format("{}", grpc_status); + } + ENVOY_CONN_LOG(debug, "hc grpc_status={} service_status={} health_flags={}", *client_, + grpc_status_message, service_status, HostUtility::healthFlagsToString(*host_)); +} + +Http::CodecClientPtr +ProdGrpcHealthCheckerImpl::createCodecClient(Upstream::Host::CreateConnectionData& data) { + return std::make_unique( + Http::CodecClient::Type::HTTP2, std::move(data.connection_), data.host_description_); +} + } // namespace Upstream } // namespace Envoy diff --git a/source/common/upstream/health_checker_impl.h b/source/common/upstream/health_checker_impl.h index fa58ba6033da2..1f7d5edc5d21c 100644 --- a/source/common/upstream/health_checker_impl.h +++ b/source/common/upstream/health_checker_impl.h @@ -9,6 +9,7 @@ #include #include "envoy/event/timer.h" +#include "envoy/grpc/status.h" #include "envoy/http/codec.h" #include "envoy/network/connection.h" #include "envoy/network/filter.h" @@ -17,11 +18,13 @@ #include "envoy/upstream/health_checker.h" #include "common/common/logger.h" +#include "common/grpc/codec.h" #include "common/http/codec_client.h" #include "common/network/filter_impl.h" #include "common/protobuf/protobuf.h" #include "api/health_check.pb.h" +#include "src/proto/grpc/health/v1/health.pb.h" namespace Envoy { namespace Upstream { @@ -68,7 +71,7 @@ struct HealthCheckerStats { }; /** - * Base implementation for both the HTTP and TCP health checker. + * Base implementation for all health checkers. 
*/ class HealthCheckerImplBase : public HealthChecker, protected Logger::Loggable, @@ -173,7 +176,7 @@ class HttpHealthCheckerImpl : public HealthCheckerImplBase { struct HttpActiveHealthCheckSession : public ActiveHealthCheckSession, public Http::StreamDecoder, public Http::StreamCallbacks { - HttpActiveHealthCheckSession(HttpHealthCheckerImpl& parent, HostSharedPtr host); + HttpActiveHealthCheckSession(HttpHealthCheckerImpl& parent, const HostSharedPtr& host); ~HttpActiveHealthCheckSession(); void onResponseComplete(); @@ -225,7 +228,7 @@ class HttpHealthCheckerImpl : public HealthCheckerImplBase { // HealthCheckerImplBase ActiveHealthCheckSessionPtr makeSession(HostSharedPtr host) override { - return ActiveHealthCheckSessionPtr{new HttpActiveHealthCheckSession(*this, host)}; + return std::make_unique(*this, host); } const std::string path_; @@ -326,7 +329,7 @@ class TcpHealthCheckerImpl : public HealthCheckerImplBase { }; struct TcpActiveHealthCheckSession : public ActiveHealthCheckSession { - TcpActiveHealthCheckSession(TcpHealthCheckerImpl& parent, HostSharedPtr host) + TcpActiveHealthCheckSession(TcpHealthCheckerImpl& parent, const HostSharedPtr& host) : ActiveHealthCheckSession(parent, host), parent_(parent) {} ~TcpActiveHealthCheckSession(); @@ -346,7 +349,7 @@ class TcpHealthCheckerImpl : public HealthCheckerImplBase { // HealthCheckerImplBase ActiveHealthCheckSessionPtr makeSession(HostSharedPtr host) override { - return ActiveHealthCheckSessionPtr{new TcpActiveHealthCheckSession(*this, host)}; + return std::make_unique(*this, host); } const TcpHealthCheckMatcher::MatchSegments send_bytes_; @@ -373,7 +376,7 @@ class RedisHealthCheckerImpl : public HealthCheckerImplBase { public Redis::ConnPool::Config, public Redis::ConnPool::PoolCallbacks, public Network::ConnectionCallbacks { - RedisActiveHealthCheckSession(RedisHealthCheckerImpl& parent, HostSharedPtr host); + RedisActiveHealthCheckSession(RedisHealthCheckerImpl& parent, const HostSharedPtr& host); 
~RedisActiveHealthCheckSession(); // ActiveHealthCheckSession @@ -411,11 +414,108 @@ class RedisHealthCheckerImpl : public HealthCheckerImplBase { // HealthCheckerImplBase ActiveHealthCheckSessionPtr makeSession(HostSharedPtr host) override { - return ActiveHealthCheckSessionPtr{new RedisActiveHealthCheckSession(*this, host)}; + return std::make_unique(*this, host); } Redis::ConnPool::ClientFactory& client_factory_; }; +/** + * gRPC health checker implementation. + */ +class GrpcHealthCheckerImpl : public HealthCheckerImplBase { +public: + GrpcHealthCheckerImpl(const Cluster& cluster, const envoy::api::v2::HealthCheck& config, + Event::Dispatcher& dispatcher, Runtime::Loader& runtime, + Runtime::RandomGenerator& random); + +private: + struct GrpcActiveHealthCheckSession : public ActiveHealthCheckSession, + public Http::StreamDecoder, + public Http::StreamCallbacks { + GrpcActiveHealthCheckSession(GrpcHealthCheckerImpl& parent, const HostSharedPtr& host); + ~GrpcActiveHealthCheckSession(); + + void onRpcComplete(Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message, + bool end_stream); + bool isHealthCheckSucceeded(Grpc::Status::GrpcStatus grpc_status) const; + void resetState(); + void logHealthCheckStatus(Grpc::Status::GrpcStatus grpc_status, + const std::string& grpc_message); + + // ActiveHealthCheckSession + void onInterval() override; + void onTimeout() override; + + // Http::StreamDecoder + void decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override; + void decodeData(Buffer::Instance&, bool end_stream) override; + void decodeTrailers(Http::HeaderMapPtr&&) override; + + // Http::StreamCallbacks + void onResetStream(Http::StreamResetReason reason) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} + + void onEvent(Network::ConnectionEvent event); + void onGoAway(); + + class ConnectionCallbackImpl : public Network::ConnectionCallbacks { + public: + 
ConnectionCallbackImpl(GrpcActiveHealthCheckSession& parent) : parent_(parent) {} + // Network::ConnectionCallbacks + void onEvent(Network::ConnectionEvent event) override { parent_.onEvent(event); } + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} + + private: + GrpcActiveHealthCheckSession& parent_; + }; + + class HttpConnectionCallbackImpl : public Http::ConnectionCallbacks { + public: + HttpConnectionCallbackImpl(GrpcActiveHealthCheckSession& parent) : parent_(parent) {} + // Http::ConnectionCallbacks + void onGoAway() override { parent_.onGoAway(); } + + private: + GrpcActiveHealthCheckSession& parent_; + }; + + ConnectionCallbackImpl connection_callback_impl_{*this}; + HttpConnectionCallbackImpl http_connection_callback_impl_{*this}; + GrpcHealthCheckerImpl& parent_; + Http::CodecClientPtr client_; + Http::StreamEncoder* request_encoder_; + Grpc::Decoder decoder_; + std::unique_ptr health_check_response_; + // If true, stream reset was initiated by us (GrpcActiveHealthCheckSession), not by HTTP stack, + // e.g. remote reset. In this case healthcheck status has already been reported, only state + // cleanup is required. + bool expect_reset_ = false; + }; + + virtual Http::CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) PURE; + + // HealthCheckerImplBase + ActiveHealthCheckSessionPtr makeSession(HostSharedPtr host) override { + return std::make_unique(*this, host); + } + + const Protobuf::MethodDescriptor& service_method_; + Optional service_name_; +}; + +/** + * Production implementation of the gRPC health checker that allocates a real codec client. 
+ */ +class ProdGrpcHealthCheckerImpl : public GrpcHealthCheckerImpl { +public: + using GrpcHealthCheckerImpl::GrpcHealthCheckerImpl; + + // GrpcHealthCheckerImpl + Http::CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) override; +}; + } // namespace Upstream } // namespace Envoy diff --git a/test/common/upstream/health_checker_impl_test.cc b/test/common/upstream/health_checker_impl_test.cc index 840f6bcc84b4b..255511804ea49 100644 --- a/test/common/upstream/health_checker_impl_test.cc +++ b/test/common/upstream/health_checker_impl_test.cc @@ -4,7 +4,9 @@ #include #include "common/buffer/buffer_impl.h" +#include "common/buffer/zero_copy_input_stream_impl.h" #include "common/config/cds_json.h" +#include "common/grpc/common.h" #include "common/http/headers.h" #include "common/json/json_loader.h" #include "common/network/utility.h" @@ -47,6 +49,16 @@ envoy::api::v2::HealthCheck parseHealthCheckFromJson(const std::string& json_str return health_check; } +envoy::api::v2::HealthCheck createGrpcHealthCheckConfig() { + envoy::api::v2::HealthCheck health_check; + health_check.mutable_timeout()->set_seconds(1); + health_check.mutable_interval()->set_seconds(1); + health_check.mutable_unhealthy_threshold()->set_value(2); + health_check.mutable_healthy_threshold()->set_value(2); + health_check.mutable_grpc_health_check(); + return health_check; +} + TEST(HealthCheckerFactoryTest, createRedis) { std::string json = R"EOF( { @@ -86,6 +98,36 @@ TEST(HealthCheckerFactoryTest, MissingFieldiException) { MissingFieldException); } +TEST(HealthCheckerFactoryTest, GrpcHealthCheckHTTP2NotConfiguredException) { + NiceMock cluster; + EXPECT_CALL(*cluster.info_, features()).WillRepeatedly(Return(0)); + + Runtime::MockLoader runtime; + Runtime::MockRandomGenerator random; + Event::MockDispatcher dispatcher; + + EXPECT_THROW_WITH_MESSAGE(HealthCheckerFactory::create(createGrpcHealthCheckConfig(), cluster, + runtime, random, dispatcher), + EnvoyException, + 
"fake_cluster cluster must support HTTP/2 for gRPC healthchecking"); +} + +TEST(HealthCheckerFactoryTest, createGrpc) { + + NiceMock cluster; + EXPECT_CALL(*cluster.info_, features()) + .WillRepeatedly(Return(Upstream::ClusterInfo::Features::HTTP2)); + + Runtime::MockLoader runtime; + Runtime::MockRandomGenerator random; + Event::MockDispatcher dispatcher; + + EXPECT_NE(nullptr, dynamic_cast( + HealthCheckerFactory::create(createGrpcHealthCheckConfig(), cluster, + runtime, random, dispatcher) + .get())); +} + class TestHttpHealthCheckerImpl : public HttpHealthCheckerImpl { public: using HttpHealthCheckerImpl::HttpHealthCheckerImpl; @@ -1452,6 +1494,818 @@ TEST_F(RedisHealthCheckerImplTest, AllDontReuseConnection) { EXPECT_EQ(2UL, cluster_->info_->stats_store_.counter("health_check.network_failure").value()); } +class TestGrpcHealthCheckerImpl : public GrpcHealthCheckerImpl { +public: + using GrpcHealthCheckerImpl::GrpcHealthCheckerImpl; + + Http::CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& conn_data) override { + auto codec_client = createCodecClient_(conn_data); + return Http::CodecClientPtr(codec_client); + }; + + // GrpcHealthCheckerImpl + MOCK_METHOD1(createCodecClient_, Http::CodecClient*(Upstream::Host::CreateConnectionData&)); +}; + +class GrpcHealthCheckerImplTestBase { +public: + struct TestSession { + TestSession() {} + + Event::MockTimer* interval_timer_{}; + Event::MockTimer* timeout_timer_{}; + Http::MockClientConnection* codec_{}; + Stats::IsolatedStoreImpl stats_store_; + Network::MockClientConnection* client_connection_{}; + NiceMock request_encoder_; + Http::StreamDecoder* stream_response_callbacks_{}; + CodecClientForTest* codec_client_{}; + }; + + typedef std::unique_ptr TestSessionPtr; + + struct ResponseSpec { + struct ChunkSpec { + bool valid; + std::vector data; + }; + static ChunkSpec invalidChunk() { + ChunkSpec spec; + spec.valid = false; + return spec; + } + static ChunkSpec invalidPayload(uint8_t flags, bool 
valid_message) { + ChunkSpec spec; + spec.valid = true; + spec.data = serializeResponse(grpc::health::v1::HealthCheckResponse::SERVING); + spec.data[0] = flags; + if (!valid_message) { + const size_t kGrpcHeaderSize = 5; + for (size_t i = kGrpcHeaderSize; i < spec.data.size(); i++) { + // Fill payload with some random data. + spec.data[i] = i % 256; + } + } + return spec; + } + static ChunkSpec validChunk(grpc::health::v1::HealthCheckResponse::ServingStatus status) { + ChunkSpec spec; + spec.valid = true; + spec.data = serializeResponse(status); + return spec; + } + + static ChunkSpec servingResponse() { + return validChunk(grpc::health::v1::HealthCheckResponse::SERVING); + } + + static ChunkSpec notServingResponse() { + return validChunk(grpc::health::v1::HealthCheckResponse::NOT_SERVING); + } + + static std::vector + serializeResponse(grpc::health::v1::HealthCheckResponse::ServingStatus status) { + grpc::health::v1::HealthCheckResponse response; + response.set_status(status); + const auto data = Grpc::Common::serializeBody(response); + auto ret = std::vector(data->length(), 0); + data->copyOut(0, data->length(), &ret[0]); + return ret; + } + + std::vector> response_headers; + std::vector body_chunks; + std::vector> trailers; + }; + + GrpcHealthCheckerImplTestBase() : cluster_(new NiceMock()) { + EXPECT_CALL(*cluster_->info_, features()) + .WillRepeatedly(Return(Upstream::ClusterInfo::Features::HTTP2)); + } + + void setupHC() { + const auto config = createGrpcHealthCheckConfig(); + health_checker_.reset( + new TestGrpcHealthCheckerImpl(*cluster_, config, dispatcher_, runtime_, random_)); + health_checker_->addHostCheckCompleteCb([this](HostSharedPtr host, bool changed_state) -> void { + onHostStatus(host, changed_state); + }); + } + + void setupHCWithUnhealthyThreshold(int value) { + auto config = createGrpcHealthCheckConfig(); + config.mutable_unhealthy_threshold()->set_value(value); + health_checker_.reset( + new TestGrpcHealthCheckerImpl(*cluster_, config, 
dispatcher_, runtime_, random_)); + health_checker_->addHostCheckCompleteCb([this](HostSharedPtr host, bool changed_state) -> void { + onHostStatus(host, changed_state); + }); + } + + void setupServiceNameHC() { + auto config = createGrpcHealthCheckConfig(); + config.mutable_grpc_health_check()->set_service_name("service"); + health_checker_.reset( + new TestGrpcHealthCheckerImpl(*cluster_, config, dispatcher_, runtime_, random_)); + health_checker_->addHostCheckCompleteCb([this](HostSharedPtr host, bool changed_state) -> void { + onHostStatus(host, changed_state); + }); + } + + void setupNoReuseConnectionHC() { + auto config = createGrpcHealthCheckConfig(); + config.mutable_reuse_connection()->set_value(false); + health_checker_.reset( + new TestGrpcHealthCheckerImpl(*cluster_, config, dispatcher_, runtime_, random_)); + health_checker_->addHostCheckCompleteCb([this](HostSharedPtr host, bool changed_state) -> void { + onHostStatus(host, changed_state); + }); + } + + void expectSessionCreate() { + // Expectations are in LIFO order. 
+ TestSessionPtr new_test_session(new TestSession()); + test_sessions_.emplace_back(std::move(new_test_session)); + TestSession& test_session = *test_sessions_.back(); + test_session.timeout_timer_ = new Event::MockTimer(&dispatcher_); + test_session.interval_timer_ = new Event::MockTimer(&dispatcher_); + expectClientCreate(test_sessions_.size() - 1); + } + + void expectClientCreate(size_t index) { + TestSession& test_session = *test_sessions_[index]; + test_session.codec_ = new NiceMock(); + test_session.client_connection_ = new NiceMock(); + connection_index_.push_back(index); + codec_index_.push_back(index); + + EXPECT_CALL(dispatcher_, createClientConnection_(_, _, _)) + .Times(testing::AnyNumber()) + .WillRepeatedly(InvokeWithoutArgs([&]() -> Network::ClientConnection* { + uint32_t index = connection_index_.front(); + connection_index_.pop_front(); + return test_sessions_[index]->client_connection_; + })); + EXPECT_CALL(*health_checker_, createCodecClient_(_)) + .WillRepeatedly( + Invoke([&](Upstream::Host::CreateConnectionData& conn_data) -> Http::CodecClient* { + uint32_t index = codec_index_.front(); + codec_index_.pop_front(); + TestSession& test_session = *test_sessions_[index]; + test_session.codec_client_ = new CodecClientForTest( + std::move(conn_data.connection_), test_session.codec_, nullptr, nullptr); + return test_session.codec_client_; + })); + } + + void expectStreamCreate(size_t index) { + test_sessions_[index]->request_encoder_.stream_.callbacks_.clear(); + EXPECT_CALL(*test_sessions_[index]->codec_, newStream(_)) + .WillOnce(DoAll(SaveArgAddress(&test_sessions_[index]->stream_response_callbacks_), + ReturnRef(test_sessions_[index]->request_encoder_))); + } + + // Starts healthchecker and sets up timer expectations, leaving up future specification of + // healthcheck response for the caller. Useful when there is only one healthcheck attempt + // performed during test case (but possibly on many hosts). 
+ void expectHealthchecks(bool host_state_changed, size_t num_healthchecks) { + for (size_t i = 0; i < num_healthchecks; i++) { + cluster_->info_->stats().upstream_cx_total_.inc(); + expectSessionCreate(); + expectHealthcheckStart(i); + } + health_checker_->start(); + + EXPECT_CALL(runtime_.snapshot_, getInteger("health_check.max_interval", _)) + .Times(num_healthchecks); + EXPECT_CALL(runtime_.snapshot_, getInteger("health_check.min_interval", _)) + .Times(num_healthchecks) + .WillRepeatedly(Return(45000)); + for (size_t i = 0; i < num_healthchecks; i++) { + expectHealthcheckStop(i, 45000); + } + EXPECT_CALL(*this, onHostStatus(_, host_state_changed)).Times(num_healthchecks); + } + + void expectSingleHealthcheck(bool host_state_changed) { + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + expectHealthchecks(host_state_changed, 1); + } + + // Hides timer/stream-related boilerplate of healthcheck start. + void expectHealthcheckStart(size_t index) { + expectStreamCreate(index); + EXPECT_CALL(*test_sessions_[index]->timeout_timer_, enableTimer(_)); + } + + // Hides timer-related boilerplate of healthcheck stop. + void expectHealthcheckStop(size_t index, int interval_ms = 0) { + if (interval_ms > 0) { + EXPECT_CALL(*test_sessions_[index]->interval_timer_, + enableTimer(std::chrono::milliseconds(interval_ms))); + } else { + EXPECT_CALL(*test_sessions_[index]->interval_timer_, enableTimer(_)); + } + EXPECT_CALL(*test_sessions_[index]->timeout_timer_, disableTimer()); + } + + // Hides host status checking boilerplate when only single host is used in test. 
+ void expectHostHealthy(bool healthy) { + const auto host = cluster_->prioritySet().getMockHostSet(0)->hosts_[0]; + if (!healthy) { + EXPECT_TRUE(host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(host->healthy()); + } else { + EXPECT_TRUE(host->healthy()); + } + } + + void respondServiceStatus(size_t index, + grpc::health::v1::HealthCheckResponse::ServingStatus status) { + respondResponseSpec(index, + ResponseSpec{{{":status", "200"}, {"content-type", "application/grpc"}}, + {ResponseSpec::validChunk(status)}, + {{"grpc-status", "0"}}}); + } + + void respondResponseSpec(size_t index, ResponseSpec&& spec) { + const bool trailers_empty = spec.trailers.size() == 0U; + const bool end_stream_on_headers = spec.body_chunks.empty() && trailers_empty; + auto response_headers = std::make_unique(); + for (const auto& header : spec.response_headers) { + response_headers->addCopy(header.first, header.second); + } + test_sessions_[index]->stream_response_callbacks_->decodeHeaders(std::move(response_headers), + end_stream_on_headers); + for (size_t i = 0; i < spec.body_chunks.size(); i++) { + const bool end_stream = i == spec.body_chunks.size() - 1 && trailers_empty; + const auto& chunk = spec.body_chunks[i]; + if (chunk.valid) { + const auto data = std::make_unique(chunk.data.data(), chunk.data.size()); + test_sessions_[index]->stream_response_callbacks_->decodeData(*data, end_stream); + } else { + Buffer::OwnedImpl incorrect_data("incorrect"); + test_sessions_[index]->stream_response_callbacks_->decodeData(incorrect_data, end_stream); + } + } + if (!trailers_empty) { + auto trailers = std::make_unique(); + for (const auto& header : spec.trailers) { + trailers->addCopy(header.first, header.second); + } + test_sessions_[index]->stream_response_callbacks_->decodeTrailers(std::move(trailers)); + } + } + + MOCK_METHOD2(onHostStatus, void(HostSharedPtr host, bool changed_state)); + + std::shared_ptr cluster_; + NiceMock dispatcher_; + std::vector 
test_sessions_; + std::shared_ptr health_checker_; + NiceMock runtime_; + NiceMock random_; + std::list connection_index_{}; + std::list codec_index_{}; +}; + +class GrpcHealthCheckerImplTest : public GrpcHealthCheckerImplTestBase, public testing::Test {}; + +// Test single host check success. +TEST_F(GrpcHealthCheckerImplTest, Success) { + setupServiceNameHC(); + + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + cluster_->info_->stats().upstream_cx_total_.inc(); + + expectSessionCreate(); + expectHealthcheckStart(0); + + EXPECT_CALL(test_sessions_[0]->request_encoder_, encodeHeaders(_, false)) + .WillOnce(Invoke([&](const Http::HeaderMap& headers, bool) { + EXPECT_EQ(Http::Headers::get().ContentTypeValues.Grpc, + headers.ContentType()->value().c_str()); + EXPECT_EQ(std::string("/grpc.health.v1.Health/Check"), headers.Path()->value().c_str()); + })); + EXPECT_CALL(test_sessions_[0]->request_encoder_, encodeData(_, true)) + .WillOnce(Invoke([&](Buffer::Instance& data, bool) { + std::vector decoded_frames; + Grpc::Decoder decoder; + ASSERT_TRUE(decoder.decode(data, decoded_frames)); + ASSERT_EQ(1U, decoded_frames.size()); + auto& frame = decoded_frames[0]; + Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_)); + grpc::health::v1::HealthCheckRequest request; + ASSERT_TRUE(request.ParseFromZeroCopyStream(&stream)); + EXPECT_EQ("service", request.service()); + })); + health_checker_->start(); + + EXPECT_CALL(runtime_.snapshot_, getInteger("health_check.max_interval", _)); + EXPECT_CALL(runtime_.snapshot_, getInteger("health_check.min_interval", _)) + .WillOnce(Return(45000)); + expectHealthcheckStop(0, 45000); + + // Host state should not be changed (remains healty). 
+ EXPECT_CALL(*this, onHostStatus(cluster_->prioritySet().getMockHostSet(0)->hosts_[0], false)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(true); +} + +// Test host check success when gRPC response payload is split between several incoming data chunks. +TEST_F(GrpcHealthCheckerImplTest, SuccessResponseSplitBetweenChunks) { + setupServiceNameHC(); + expectSingleHealthcheck(false); + + auto response_headers = std::make_unique( + std::initializer_list>{ + {":status", "200"}, + {"content-type", "application/grpc"}, + }); + test_sessions_[0]->stream_response_callbacks_->decodeHeaders(std::move(response_headers), false); + + grpc::health::v1::HealthCheckResponse response; + response.set_status(grpc::health::v1::HealthCheckResponse::SERVING); + auto data = Grpc::Common::serializeBody(response); + + const char* raw_data = static_cast(data->linearize(data->length())); + const uint64_t chunk_size = data->length() / 5; + for (uint64_t offset = 0; offset < data->length(); offset += chunk_size) { + const uint64_t effective_size = std::min(chunk_size, data->length() - offset); + const auto chunk = std::make_unique(raw_data + offset, effective_size); + test_sessions_[0]->stream_response_callbacks_->decodeData(*chunk, false); + } + + auto trailers = std::make_unique( + std::initializer_list>{{"grpc-status", "0"}}); + test_sessions_[0]->stream_response_callbacks_->decodeTrailers(std::move(trailers)); + + expectHostHealthy(true); +} + +// Test host check success with multiple hosts. 
+TEST_F(GrpcHealthCheckerImplTest, SuccessWithMultipleHosts) { + setupHC(); + + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80"), + makeTestHost(cluster_->info_, "tcp://127.0.0.1:81")}; + + expectHealthchecks(false, 2); + + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + respondServiceStatus(1, grpc::health::v1::HealthCheckResponse::SERVING); + EXPECT_TRUE(cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->healthy()); + EXPECT_TRUE(cluster_->prioritySet().getMockHostSet(0)->hosts_[1]->healthy()); +} + +// Test host check success with multiple hosts across multiple priorities. +TEST_F(GrpcHealthCheckerImplTest, SuccessWithMultipleHostSets) { + setupHC(); + + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + cluster_->prioritySet().getMockHostSet(1)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:81")}; + + expectHealthchecks(false, 2); + + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + respondServiceStatus(1, grpc::health::v1::HealthCheckResponse::SERVING); + EXPECT_TRUE(cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->healthy()); + EXPECT_TRUE(cluster_->prioritySet().getMockHostSet(1)->hosts_[0]->healthy()); +} + +// Test stream-level watermarks does not interfere with health check. +TEST_F(GrpcHealthCheckerImplTest, StreamReachesWatermarkDuringCheck) { + setupHC(); + expectSingleHealthcheck(false); + + test_sessions_[0]->request_encoder_.stream_.runHighWatermarkCallbacks(); + test_sessions_[0]->request_encoder_.stream_.runLowWatermarkCallbacks(); + + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(true); +} + +// Test connection-level watermarks does not interfere with health check. 
+TEST_F(GrpcHealthCheckerImplTest, ConnectionReachesWatermarkDuringCheck) { + setupHC(); + expectSingleHealthcheck(false); + + test_sessions_[0]->client_connection_->runHighWatermarkCallbacks(); + test_sessions_[0]->client_connection_->runLowWatermarkCallbacks(); + + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(true); +} + +// Test health check on host without traffic sets larger unconfigurable interval for the next check. +TEST_F(GrpcHealthCheckerImplTest, SuccessNoTraffic) { + setupHC(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + + expectSessionCreate(); + expectHealthcheckStart(0); + health_checker_->start(); + + // Default healthcheck interval for hosts without traffic is 60 seconds. + expectHealthcheckStop(0, 60000); + // Host state should not be changed (remains healty). + EXPECT_CALL(*this, onHostStatus(_, false)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(true); +} + +// Test first successful check immediately makes failed host available (without 2nd probe). +TEST_F(GrpcHealthCheckerImplTest, SuccessStartFailedSuccessFirst) { + setupHC(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->healthFlagSet( + Host::HealthFlag::FAILED_ACTIVE_HC); + + expectSessionCreate(); + expectHealthcheckStart(0); + health_checker_->start(); + + EXPECT_CALL(runtime_.snapshot_, getInteger("health_check.max_interval", _)).WillOnce(Return(500)); + EXPECT_CALL(runtime_.snapshot_, getInteger("health_check.min_interval", _)); + expectHealthcheckStop(0, 500); + // Fast success immediately moves us to healthy. 
+ EXPECT_CALL(*this, onHostStatus(_, true)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(true); +} + +// Test host recovery after first failed check requires several successul checks. +TEST_F(GrpcHealthCheckerImplTest, SuccessStartFailedFailFirst) { + setupHC(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->healthFlagSet( + Host::HealthFlag::FAILED_ACTIVE_HC); + + expectSessionCreate(); + expectHealthcheckStart(0); + health_checker_->start(); + + // Failing first disables fast success. + expectHealthcheckStop(0); + // Host was unhealthy from the start, no state change. + EXPECT_CALL(*this, onHostStatus(_, false)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::NOT_SERVING); + expectHostHealthy(false); + + // Next successful healthcheck does not move host int healthy state (because we configured + // healthchecker this way). + expectHealthcheckStart(0); + test_sessions_[0]->interval_timer_->callback_(); + + expectHealthcheckStop(0); + // Host still unhealthy, need yet another healthcheck. + EXPECT_CALL(*this, onHostStatus(_, false)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(false); + + // 2nd successful healthcheck renders host healthy. + expectHealthcheckStart(0); + test_sessions_[0]->interval_timer_->callback_(); + + expectHealthcheckStop(0); + EXPECT_CALL(*this, onHostStatus(_, true)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(true); +} + +// Test host recovery after explicit check failure requires several successul checks. 
+TEST_F(GrpcHealthCheckerImplTest, GrpcHealthFail) { + setupHC(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + + expectSessionCreate(); + expectHealthcheckStart(0); + health_checker_->start(); + + // Explicit healthcheck failure immediately renders host unhealthy. + expectHealthcheckStop(0); + EXPECT_CALL(*this, onHostStatus(_, true)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::NOT_SERVING); + expectHostHealthy(false); + + // Next, we need 2 successful checks for host to become available again. + expectHealthcheckStart(0); + test_sessions_[0]->interval_timer_->callback_(); + + expectHealthcheckStop(0); + // Host still considered unhealthy. + EXPECT_CALL(*this, onHostStatus(_, false)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(false); + + expectHealthcheckStart(0); + test_sessions_[0]->interval_timer_->callback_(); + + expectHealthcheckStop(0); + // Host should has become healthy. + EXPECT_CALL(*this, onHostStatus(_, true)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(true); +} + +// Test disconnects produce network-type failures which does not lead to immediate unhealthy state. +TEST_F(GrpcHealthCheckerImplTest, Disconnect) { + setupHC(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + + expectSessionCreate(); + expectHealthcheckStart(0); + health_checker_->start(); + + expectHealthcheckStop(0); + // Network-type healthcheck failure should make host unhealthy only after 2nd event in a row. 
+ EXPECT_CALL(*this, onHostStatus(_, false)); + test_sessions_[0]->client_connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); + expectHostHealthy(true); + + expectClientCreate(0); + expectHealthcheckStart(0); + test_sessions_[0]->interval_timer_->callback_(); + + expectHealthcheckStop(0); + // Now, host should be unhealthy. + EXPECT_CALL(*this, onHostStatus(_, true)); + test_sessions_[0]->client_connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); + expectHostHealthy(false); +} + +// Test timeouts produce network-type failures which does not lead to immediate unhealthy state. +TEST_F(GrpcHealthCheckerImplTest, Timeout) { + setupHC(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + expectSessionCreate(); + + expectHealthcheckStart(0); + health_checker_->start(); + + expectHealthcheckStop(0); + // Timeouts are considered network failures and make host unhealthy also after 2nd event. + EXPECT_CALL(*this, onHostStatus(_, false)); + test_sessions_[0]->timeout_timer_->callback_(); + expectHostHealthy(true); + + expectHealthcheckStart(0); + test_sessions_[0]->interval_timer_->callback_(); + + expectHealthcheckStop(0); + EXPECT_CALL(*this, onHostStatus(_, true)); + // Close connection. Timeouts and connection closes counts together. + test_sessions_[0]->client_connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); + expectHostHealthy(false); +} + +// Test adding and removal of hosts starts and closes healthcheck sessions. 
+TEST_F(GrpcHealthCheckerImplTest, DynamicAddAndRemove) { + setupHC(); + health_checker_->start(); + + expectSessionCreate(); + expectStreamCreate(0); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + EXPECT_CALL(*test_sessions_[0]->timeout_timer_, enableTimer(_)); + cluster_->prioritySet().getMockHostSet(0)->runCallbacks( + {cluster_->prioritySet().getMockHostSet(0)->hosts_.back()}, {}); + + std::vector removed{cluster_->prioritySet().getMockHostSet(0)->hosts_.back()}; + cluster_->prioritySet().getMockHostSet(0)->hosts_.clear(); + EXPECT_CALL(*test_sessions_[0]->client_connection_, close(_)); + cluster_->prioritySet().getMockHostSet(0)->runCallbacks({}, removed); +} + +// Test connection close between checks affects nothing. +TEST_F(GrpcHealthCheckerImplTest, RemoteCloseBetweenChecks) { + setupHC(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + + expectSessionCreate(); + expectHealthcheckStart(0); + health_checker_->start(); + + expectHealthcheckStop(0); + EXPECT_CALL(*this, onHostStatus(_, false)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(true); + + // Connection closed between checks - nothing happens, just re-create client. + test_sessions_[0]->client_connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); + + expectClientCreate(0); + expectHealthcheckStart(0); + test_sessions_[0]->interval_timer_->callback_(); + + expectHealthcheckStop(0); + // Test host state haven't changed. + EXPECT_CALL(*this, onHostStatus(_, false)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(true); +} + +// Test that we close connections on a healthy check when reuse_connection is false. 
+TEST_F(GrpcHealthCheckerImplTest, DontReuseConnectionBetweenChecks) { + setupNoReuseConnectionHC(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + + expectSessionCreate(); + expectHealthcheckStart(0); + health_checker_->start(); + + expectHealthcheckStop(0); + EXPECT_CALL(*this, onHostStatus(_, false)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(true); + + // A new client is created because we close the connection ourselves. + // See GrpcHealthCheckerImplTest.RemoteCloseBetweenChecks for how this works when the remote end + // closes the connection. + expectClientCreate(0); + expectHealthcheckStart(0); + test_sessions_[0]->interval_timer_->callback_(); + + expectHealthcheckStop(0); + // Test host state haven't changed. + EXPECT_CALL(*this, onHostStatus(_, false)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(true); +} + +// Test UNKNOWN health status is considered unhealthy. +TEST_F(GrpcHealthCheckerImplTest, GrpcFailUnknown) { + setupHC(); + expectSingleHealthcheck(true); + + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::UNKNOWN); + EXPECT_TRUE(cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->healthFlagGet( + Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->healthy()); +} + +// Test receiving GOAWAY is interpreted as connection close event. +TEST_F(GrpcHealthCheckerImplTest, GoAwayProbeInProgress) { + // FailureType::Network will be issued, it will render host unhealthy only if unhealthy_threshold + // is reached. 
+ setupHCWithUnhealthyThreshold(1); + expectSingleHealthcheck(true); + + test_sessions_[0]->codec_client_->raiseGoAway(); + + EXPECT_TRUE(cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->healthFlagGet( + Host::HealthFlag::FAILED_ACTIVE_HC)); + EXPECT_FALSE(cluster_->prioritySet().getMockHostSet(0)->hosts_[0]->healthy()); +} + +// Test receing GOAWAY between checks affects nothing. +TEST_F(GrpcHealthCheckerImplTest, GoAwayBetweenChecks) { + setupHC(); + cluster_->prioritySet().getMockHostSet(0)->hosts_ = { + makeTestHost(cluster_->info_, "tcp://127.0.0.1:80")}; + + expectSessionCreate(); + expectHealthcheckStart(0); + health_checker_->start(); + + expectHealthcheckStop(0); + EXPECT_CALL(*this, onHostStatus(_, false)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(true); + + // GOAWAY between checks should go unnoticed. + test_sessions_[0]->codec_client_->raiseGoAway(); + + expectClientCreate(0); + expectHealthcheckStart(0); + test_sessions_[0]->interval_timer_->callback_(); + + expectHealthcheckStop(0); + // Test host state haven't changed. + EXPECT_CALL(*this, onHostStatus(_, false)); + respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::SERVING); + expectHostHealthy(true); +} + +class BadResponseGrpcHealthCheckerImplTest + : public GrpcHealthCheckerImplTestBase, + public testing::TestWithParam {}; + +INSTANTIATE_TEST_CASE_P( + BadResponse, BadResponseGrpcHealthCheckerImplTest, + testing::ValuesIn(std::vector{ + // Non-200 response. + { + {{":status", "500"}}, + {}, + {}, + }, + // Non-200 response with gRPC status. + { + {{":status", "500"}, {"grpc-status", "2"}}, + {}, + {}, + }, + // Missing content-type. + { + {{":status", "200"}}, + {}, + {}, + }, + // End stream on response headers. + { + {{":status", "200"}, {"content-type", "application/grpc"}}, + {}, + {}, + }, + // Non-OK gRPC status in headers. 
+ { + {{":status", "200"}, {"content-type", "application/grpc"}, {"grpc-status", "2"}}, + {}, + {}, + }, + // Non-OK gRPC status + { + {{":status", "200"}, {"content-type", "application/grpc"}}, + {GrpcHealthCheckerImplTest::ResponseSpec::servingResponse()}, + {{"grpc-status", "2"}}, + }, + // Missing body. + { + {{":status", "200"}, {"content-type", "application/grpc"}, {"grpc-status", "0"}}, + {}, + {}, + }, + // Compressed body. + { + {{":status", "200"}, {"content-type", "application/grpc"}}, + {GrpcHealthCheckerImplTest::ResponseSpec::invalidPayload(Grpc::GRPC_FH_COMPRESSED, + true)}, + {}, + }, + // Invalid proto message. + { + {{":status", "200"}, {"content-type", "application/grpc"}}, + {GrpcHealthCheckerImplTest::ResponseSpec::invalidPayload(Grpc::GRPC_FH_DEFAULT, false)}, + {}, + }, + // Duplicate response. + { + {{":status", "200"}, {"content-type", "application/grpc"}}, + {GrpcHealthCheckerImplTest::ResponseSpec::servingResponse(), + GrpcHealthCheckerImplTest::ResponseSpec::servingResponse()}, + {}, + }, + // Invalid response. + { + {{":status", "200"}, {"content-type", "application/grpc"}}, + {GrpcHealthCheckerImplTest::ResponseSpec::invalidChunk()}, + {}, + }, + // No trailers. + { + {{":status", "200"}, {"content-type", "application/grpc"}}, + {GrpcHealthCheckerImplTest::ResponseSpec::servingResponse()}, + {}, + }, + // No gRPC status in trailer. + { + {{":status", "200"}, {"content-type", "application/grpc"}}, + {GrpcHealthCheckerImplTest::ResponseSpec::servingResponse()}, + {{"some-header", "1"}}, + }, + // Invalid gRPC status. + { + {{":status", "200"}, {"content-type", "application/grpc"}}, + {GrpcHealthCheckerImplTest::ResponseSpec::servingResponse()}, + {{"grpc-status", "invalid"}}, + }, + })); + +// Test different cases of invalid gRPC response makes host unhealthy. 
+TEST_P(BadResponseGrpcHealthCheckerImplTest, GrpcBadResponse) { + setupHC(); + expectSingleHealthcheck(true); + + ResponseSpec spec = GetParam(); + respondResponseSpec(0, std::move(spec)); + expectHostHealthy(false); +} + } // namespace } // namespace Upstream } // namespace Envoy