Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RAW_RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ will make it substantially easier for the releaser to "linkify" all of the relea
final version.

## 1.6.0
* Added a gRPC health check based on the [grpc.health.v1.Health](https://github.com/grpc/grpc/blob/master/src/proto/grpc/health/v1/health.proto) service.
* Added Metrics Service implementation.
* Added gRPC access logging.
* Added DOWNSTREAM_REMOTE_ADDRESS, DOWNSTREAM_REMOTE_ADDRESS_WITHOUT_PORT, and
Expand Down
5 changes: 5 additions & 0 deletions bazel/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,8 @@ config_setting(
name = "disable_google_grpc",
values = {"define": "google_grpc=disabled"},
)

cc_proto_library(
name = "grpc_health_proto",
deps = ["@com_github_grpc_grpc//src/proto/grpc/health/v1:_health_proto_only"],
)
5 changes: 5 additions & 0 deletions bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,8 @@ def _com_github_grpc_grpc():
name = "grpc",
actual = "@com_github_grpc_grpc//:grpc++"
)

native.bind(
name = "grpc_health_proto",
actual = "//bazel:grpc_health_proto",
)
10 changes: 9 additions & 1 deletion source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_package",
"envoy_proto_library",
)

envoy_package()
Expand Down Expand Up @@ -88,11 +89,15 @@ envoy_cc_library(
name = "health_checker_lib",
srcs = ["health_checker_impl.cc"],
hdrs = ["health_checker_impl.h"],
external_deps = ["envoy_health_check"],
external_deps = [
"envoy_health_check",
"grpc_health_proto",
],
deps = [
":host_utility_lib",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/event:timer_interface",
"//include/envoy/grpc:status",
"//include/envoy/http:codec_interface",
"//include/envoy/http:codes_interface",
"//include/envoy/network:connection_interface",
Expand All @@ -101,11 +106,14 @@ envoy_cc_library(
"//include/envoy/stats:stats_interface",
"//include/envoy/upstream:health_checker_interface",
"//source/common/buffer:buffer_lib",
"//source/common/buffer:zero_copy_input_stream_lib",
"//source/common/common:empty_string",
"//source/common/common:enum_to_int",
"//source/common/common:hex_lib",
"//source/common/common:logger_lib",
"//source/common/common:utility_lib",
"//source/common/grpc:codec_lib",
"//source/common/grpc:common_lib",
"//source/common/http:codec_client_lib",
"//source/common/http:header_map_lib",
"//source/common/http:headers_lib",
Expand Down
275 changes: 273 additions & 2 deletions source/common/upstream/health_checker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
#include "envoy/stats/stats.h"

#include "common/buffer/buffer_impl.h"
#include "common/buffer/zero_copy_input_stream_impl.h"
#include "common/common/empty_string.h"
#include "common/common/enum_to_int.h"
#include "common/common/hex.h"
#include "common/common/utility.h"
#include "common/grpc/common.h"
#include "common/http/codec_client.h"
#include "common/http/header_map_impl.h"
#include "common/http/headers.h"
Expand All @@ -41,6 +43,13 @@ HealthCheckerSharedPtr HealthCheckerFactory::create(const envoy::api::v2::Health
case envoy::api::v2::HealthCheck::HealthCheckerCase::kRedisHealthCheck:
return std::make_shared<RedisHealthCheckerImpl>(cluster, hc_config, dispatcher, runtime, random,
Redis::ConnPool::ClientFactoryImpl::instance_);
case envoy::api::v2::HealthCheck::HealthCheckerCase::kGrpcHealthCheck:
if (!(cluster.info()->features() & Upstream::ClusterInfo::Features::HTTP2)) {
throw EnvoyException(fmt::format("{} cluster must support HTTP/2 for gRPC healthchecking",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please have a test that does EXPECT_THROW_WITH_MESSAGE to catch this case.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

cluster.info()->name()));
}
return std::make_shared<ProdGrpcHealthCheckerImpl>(cluster, hc_config, dispatcher, runtime,
random);
default:
// TODO(htuch): This should be subsumed eventually by the constraint checking in #1308.
throw EnvoyException("Health checker type not set");
Expand Down Expand Up @@ -281,7 +290,7 @@ HttpHealthCheckerImpl::HttpHealthCheckerImpl(const Cluster& cluster,
}

HttpHealthCheckerImpl::HttpActiveHealthCheckSession::HttpActiveHealthCheckSession(
HttpHealthCheckerImpl& parent, HostSharedPtr host)
HttpHealthCheckerImpl& parent, const HostSharedPtr& host)
: ActiveHealthCheckSession(parent, host), parent_(parent) {}

HttpHealthCheckerImpl::HttpActiveHealthCheckSession::~HttpActiveHealthCheckSession() {
Expand Down Expand Up @@ -520,7 +529,7 @@ RedisHealthCheckerImpl::RedisHealthCheckerImpl(const Cluster& cluster,
client_factory_(client_factory) {}

RedisHealthCheckerImpl::RedisActiveHealthCheckSession::RedisActiveHealthCheckSession(
RedisHealthCheckerImpl& parent, HostSharedPtr host)
RedisHealthCheckerImpl& parent, const HostSharedPtr& host)
: ActiveHealthCheckSession(parent, host), parent_(parent) {}

RedisHealthCheckerImpl::RedisActiveHealthCheckSession::~RedisActiveHealthCheckSession() {
Expand Down Expand Up @@ -586,5 +595,267 @@ RedisHealthCheckerImpl::HealthCheckRequest::HealthCheckRequest() {
request_.asArray().swap(values);
}

// Constructs a gRPC health checker: resolves the grpc.health.v1.Health.Check
// method descriptor from the generated descriptor pool and captures the
// optional service name from the health check config.
GrpcHealthCheckerImpl::GrpcHealthCheckerImpl(const Cluster& cluster,
                                             const envoy::api::v2::HealthCheck& config,
                                             Event::Dispatcher& dispatcher,
                                             Runtime::Loader& runtime,
                                             Runtime::RandomGenerator& random)
    : HealthCheckerImplBase(cluster, config, dispatcher, runtime, random),
      // NOTE(review): FindMethodByName() is dereferenced unchecked; this assumes the
      // health proto descriptors are always linked into the binary — TODO confirm.
      service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "grpc.health.v1.Health.Check")) {
  // An empty service name is a valid Check request, so only populate the
  // optional when one was actually configured.
  if (!config.grpc_health_check().service_name().empty()) {
    service_name_.value(config.grpc_health_check().service_name());
  }
}

// Per-host active gRPC health check session. Connection and stream state are
// created lazily on the first onInterval() probe.
GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::GrpcActiveHealthCheckSession(
    GrpcHealthCheckerImpl& parent, const HostSharedPtr& host)
    : ActiveHealthCheckSession(parent, host), parent_(parent) {}

// Tears down the session. Closing the codec client resets any in-flight
// request stream, so mark the reset as expected to avoid reporting it as a
// network health check failure in onResetStream().
GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::~GrpcActiveHealthCheckSession() {
  if (client_) {
    // If there is an active request it will get reset, so make sure we ignore the reset.
    expect_reset_ = true;
    client_->close();
  }
}

void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeHeaders(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked at the coverage results yet in CI, but please check and make sure you have coverage on all error branches in the code you added.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checked, added more coverage

Http::HeaderMapPtr&& headers, bool end_stream) {
const auto http_response_status = Http::Utility::getResponseStatus(*headers);
if (http_response_status != enumToInt(Http::Code::OK)) {
// https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md requires that
// grpc-status be used if available.
if (end_stream) {
const auto grpc_status = Grpc::Common::getGrpcStatus(*headers);
if (grpc_status.valid()) {
onRpcComplete(grpc_status.value(), Grpc::Common::getGrpcMessage(*headers), true);
return;
}
}
onRpcComplete(Grpc::Common::httpToGrpcStatus(http_response_status), "non-200 HTTP response",
end_stream);
return;
}
if (!Grpc::Common::hasGrpcContentType(*headers)) {
onRpcComplete(Grpc::Status::GrpcStatus::Internal, "invalid gRPC content-type", false);
return;
}
if (end_stream) {
// This is how, for instance, grpc-go signals about missing service - HTTP/2 200 OK with
// 'unimplemented' gRPC status.
const auto grpc_status = Grpc::Common::getGrpcStatus(*headers);
if (grpc_status.valid()) {
onRpcComplete(grpc_status.value(), Grpc::Common::getGrpcMessage(*headers), true);
return;
}
onRpcComplete(Grpc::Status::GrpcStatus::Internal,
"gRPC protocol violation: unexpected stream end", true);
}
}

// Response-body callback for the health check stream. Decodes gRPC frames from
// the HTTP/2 DATA payload and parses the single expected HealthCheckResponse
// message. Any framing or protocol violation fails the probe.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeData(Buffer::Instance& data,
                                                                     bool end_stream) {
  if (end_stream) {
    // The gRPC protocol requires the final status to arrive in trailers, so
    // ending the stream on a DATA frame is a protocol violation.
    onRpcComplete(Grpc::Status::GrpcStatus::Internal,
                  "gRPC protocol violation: unexpected stream end", true);
    return;
  }

  // We should end up with only one frame here.
  std::vector<Grpc::Frame> decoded_frames;
  if (!decoder_.decode(data, decoded_frames)) {
    // Bail out immediately on a framing error: onRpcComplete() has already
    // failed the probe and reset the stream, so the partially decoded frames
    // must not be processed (doing so could complete the RPC a second time).
    onRpcComplete(Grpc::Status::GrpcStatus::Internal, "gRPC wire protocol decode error", false);
    return;
  }
  for (auto& frame : decoded_frames) {
    if (frame.length_ > 0) {
      if (health_check_response_) {
        // grpc.health.v1.Health.Check is unary RPC, so only one message is allowed.
        onRpcComplete(Grpc::Status::GrpcStatus::Internal, "unexpected streaming", false);
        return;
      }
      health_check_response_ = std::make_unique<grpc::health::v1::HealthCheckResponse>();
      Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_));

      // Only uncompressed frames (default flags) are accepted.
      if (frame.flags_ != Grpc::GRPC_FH_DEFAULT ||
          !health_check_response_->ParseFromZeroCopyStream(&stream)) {
        onRpcComplete(Grpc::Status::GrpcStatus::Internal, "invalid grpc.health.v1 RPC payload",
                      false);
        return;
      }
    }
  }
}

// Trailers callback: the trailers carry the final gRPC status for the RPC.
// A missing or unparseable grpc-status header is treated as an internal error.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeTrailers(
    Http::HeaderMapPtr&& trailers) {
  const auto status_header = Grpc::Common::getGrpcStatus(*trailers);
  if (status_header.valid()) {
    onRpcComplete(status_header.value(), Grpc::Common::getGrpcMessage(*trailers), true);
  } else {
    onRpcComplete(Grpc::Status::GrpcStatus::Internal, "invalid gRPC status", true);
  }
}

// Network-level connection event callback; only close events require action.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onEvent(Network::ConnectionEvent event) {
  if (event != Network::ConnectionEvent::RemoteClose &&
      event != Network::ConnectionEvent::LocalClose) {
    return;
  }
  // For the raw disconnect event, we are either between intervals in which case we already have
  // a timer setup, or we did the close or got a reset, in which case we already setup a new
  // timer. There is nothing to do here other than blow away the client.
  parent_.dispatcher_.deferredDelete(std::move(client_));
}

// Issues one health check probe: (re)creates the codec client if needed, opens
// a new stream, and sends the complete unary grpc.health.v1.Health.Check
// request (headers plus a single serialized message with end-of-stream).
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onInterval() {
  if (!client_) {
    // Lazily (re)establish the connection so a connection lost between
    // intervals is transparently recreated here.
    Upstream::Host::CreateConnectionData conn = host_->createConnection(parent_.dispatcher_);
    client_ = parent_.createCodecClient(conn);
    client_->addConnectionCallbacks(connection_callback_impl_);
    client_->setCodecConnectionCallbacks(http_connection_callback_impl_);
  }

  request_encoder_ = &client_->newStream(*this);
  request_encoder_->getStream().addCallbacks(*this);

  auto headers_message = Grpc::Common::prepareHeaders(
      parent_.cluster_.info()->name(), parent_.service_method_.service()->full_name(),
      parent_.service_method_.name());
  // Identify the health checker to the upstream via User-Agent.
  headers_message->headers().insertUserAgent().value().setReference(
      Http::Headers::get().UserAgentValues.EnvoyHealthChecker);

  // end_stream=false: the request message still follows below.
  request_encoder_->encodeHeaders(headers_message->headers(), false);

  grpc::health::v1::HealthCheckRequest request;
  // An unset service name is valid and checks overall server health.
  if (parent_.service_name_.valid()) {
    request.set_service(parent_.service_name_.value());
  }

  // end_stream=true: the RPC is unary, nothing else will be sent locally.
  request_encoder_->encodeData(*Grpc::Common::serializeBody(request), true);
}

// Stream reset callback. Distinguishes resets we initiated ourselves
// (expect_reset_) from unexpected resets, which count as network failures.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onResetStream(Http::StreamResetReason) {
  // resetState() clears expect_reset_, so latch its value first.
  const bool expected_reset = expect_reset_;
  resetState();

  if (expected_reset) {
    // Stream reset was initiated by us (bogus gRPC response, timeout or cluster host is going
    // away). In these cases health check failure has already been reported, so just return.
    return;
  }

  ENVOY_CONN_LOG(debug, "connection/stream error health_flags={}", *client_,
                 HostUtility::healthFlagsToString(*host_));

  // TODO(baranov1ch): according to all HTTP standards, we should check if reason is one of
  // Http::StreamResetReason::RemoteRefusedStreamReset (which may mean GOAWAY),
  // Http::StreamResetReason::RemoteReset or Http::StreamResetReason::ConnectionTermination (both
  // mean connection close), check if connection is not fresh (was used for at least 1 request)
  // and silently retry request on the fresh connection. This is also true for HTTP/1.1 healthcheck.
  handleFailure(FailureType::Network);
}

// HTTP/2 GOAWAY callback: fail any in-flight probe as a network error and
// close the connection so the next interval reconnects.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onGoAway() {
  ENVOY_CONN_LOG(debug, "connection going away health_flags={}", *client_,
                 HostUtility::healthFlagsToString(*host_));
  // Even if we have active health check probe, fail it on GOAWAY and schedule new one.
  if (request_encoder_) {
    handleFailure(FailureType::Network);
    // The local reset below is ours; don't report it again in onResetStream().
    expect_reset_ = true;
    request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
  }
  client_->close();
}

// A probe succeeds only when the RPC itself completed with OK AND a response
// message was decoded that explicitly reports SERVING.
bool GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::isHealthCheckSucceeded(
    Grpc::Status::GrpcStatus grpc_status) const {
  return grpc_status == Grpc::Status::GrpcStatus::Ok && health_check_response_ != nullptr &&
         health_check_response_->status() == grpc::health::v1::HealthCheckResponse::SERVING;
}

// Completes one health check RPC: records success/failure and recycles the
// HTTP stream. The RPC outcome and the HTTP stream lifetime are decoupled —
// the caller reports via |end_stream| whether the stream already finished.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onRpcComplete(
    Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message, bool end_stream) {
  logHealthCheckStatus(grpc_status, grpc_message);
  if (!isHealthCheckSucceeded(grpc_status)) {
    handleFailure(FailureType::Active);
  } else {
    handleSuccess();
  }

  // |end_stream| will be false if we decided to stop healthcheck before HTTP stream has ended -
  // invalid gRPC payload, unexpected message stream or wrong content-type. The stream must then
  // be torn down locally; the resulting reset is ours, so flag it via expect_reset_ and let
  // onResetStream() perform the state cleanup.
  if (!end_stream) {
    expect_reset_ = true;
    request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
  } else {
    resetState();
  }

  if (!parent_.reuse_connection_) {
    client_->close();
  }
}

// Clears all per-probe state so the session is ready for the next interval.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::resetState() {
  expect_reset_ = false;
  request_encoder_ = nullptr;
  // Fresh decoder: a partially consumed gRPC frame must not leak into the
  // next probe.
  decoder_ = Grpc::Decoder();
  health_check_response_.reset();
}

// Probe deadline fired: abort the in-flight request with a local reset.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onTimeout() {
  ENVOY_CONN_LOG(debug, "connection/stream timeout health_flags={}", *client_,
                 HostUtility::healthFlagsToString(*host_));
  // The reset below is ours; onResetStream() must not count it as an
  // additional network failure.
  expect_reset_ = true;
  request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
}

// Emits a debug log line describing the outcome of one health check RPC:
// the gRPC status (with optional message) and the decoded service status.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::logHealthCheckStatus(
    Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message) {
  // "rpc_error" means no HealthCheckResponse message was decoded at all.
  const char* service_status = "rpc_error";
  if (health_check_response_) {
    switch (health_check_response_->status()) {
    case grpc::health::v1::HealthCheckResponse::SERVING:
      service_status = "serving";
      break;
    case grpc::health::v1::HealthCheckResponse::NOT_SERVING:
      service_status = "not_serving";
      break;
    case grpc::health::v1::HealthCheckResponse::UNKNOWN:
      service_status = "unknown";
      break;
    default:
      // Protobuf parsing should reject enum values outside the defined range.
      NOT_REACHED;
      break;
    }
  }
  // Append the gRPC message only when it adds information beyond OK.
  const std::string grpc_status_message =
      (grpc_status != Grpc::Status::GrpcStatus::Ok && !grpc_message.empty())
          ? fmt::format("{} ({})", grpc_status, grpc_message)
          : fmt::format("{}", grpc_status);
  ENVOY_CONN_LOG(debug, "hc grpc_status={} service_status={} health_flags={}", *client_,
                 grpc_status_message, service_status, HostUtility::healthFlagsToString(*host_));
}

// Production codec client factory: gRPC requires HTTP/2, so the client is
// always created with the HTTP/2 codec.
Http::CodecClientPtr
ProdGrpcHealthCheckerImpl::createCodecClient(Upstream::Host::CreateConnectionData& data) {
  return std::make_unique<Http::CodecClientProd>(
      Http::CodecClient::Type::HTTP2, std::move(data.connection_), data.host_description_);
}

} // namespace Upstream
} // namespace Envoy
Loading