Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 75 additions & 6 deletions source/common/config/grpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@
namespace Envoy {
namespace Config {

namespace {

// Format strings for reporting a closed gRPC config stream. Arguments, in order: the service
// method name, (WithMs only) the milliseconds elapsed since the remembered close, the gRPC
// status, and the close message.
constexpr const char* CloseLogMessage = "{} gRPC config stream closed: {}, {}";
constexpr const char* CloseLogMessageWithMs = "{} gRPC config stream closed {}ms ago: {}, {}";

// TODO(htuch): Make this configurable.
// Lower and upper bounds, in milliseconds, passed to the jittered exponential back-off strategy.
constexpr uint32_t RetryInitialDelayMs{500};
constexpr uint32_t RetryMaxDelayMs{30000}; // Never back off for longer than 30s.

// Sentinel stored in close_status_ while no failure is being remembered.
constexpr int64_t UnsetStatus{-9999};
Comment thread
htuch marked this conversation as resolved.
Outdated
} // namespace

// Owning pointer to a response proto message.
template <typename ResponseProto>
using ResponseProtoPtr = std::unique_ptr<ResponseProto>;

// Oversees communication for gRPC xDS implementations (parent to both regular xDS and delta
Expand Down Expand Up @@ -45,9 +57,6 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
});
}

// TODO(htuch): Make this configurable.
static constexpr uint32_t RetryInitialDelayMs = 500;
static constexpr uint32_t RetryMaxDelayMs = 30000; // Do not cross more than 30s
backoff_strategy_ = std::make_unique<JitteredExponentialBackOffStrategy>(
RetryInitialDelayMs, RetryMaxDelayMs, random_);
}
Expand All @@ -60,12 +69,13 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
}
stream_ = async_client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
if (stream_ == nullptr) {
ENVOY_LOG(warn, "Unable to establish new stream");
ENVOY_LOG(debug, "Unable to establish new grpc config stream");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be a warning? There's a real failure occurring.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The real failure, with its error message, is recorded in the callback. I don't think there is a case where something is recorded here but not in the callback. This line is also a source of noise, since it does not give the reason for the failure.

callbacks_->onEstablishmentFailure();
setRetryTimer();
return;
}
control_plane_stats_.connected_state_.set(1);
unsetFailure();
callbacks_->onStreamEstablished();
}

Expand All @@ -85,6 +95,7 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
void onReceiveMessage(ResponseProtoPtr<ResponseProto>&& message) override {
// Reset here so that it starts with fresh backoff interval on next disconnect.
backoff_strategy_->reset();
unsetFailure();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed? Isn't it sufficient to unset on successful connect?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the comment on the next line that led me to believe I need a reset here, too.
But I think it makes sense to remove it. ack.

// Sometimes during hot restarts this stat's value becomes inconsistent and will continue to
// have 0 until it is reconnected. Setting here ensures that it is consistent with the state of
// management server connection.
Expand All @@ -97,8 +108,7 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
}

void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override {
ENVOY_LOG(warn, "{} gRPC config stream closed: {}, {}", service_method_.name(), status,
message);
maybeLogClose(status, message);
stream_ = nullptr;
control_plane_stats_.connected_state_.set(0);
callbacks_->onEstablishmentFailure();
Expand Down Expand Up @@ -136,6 +146,60 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));
}

// Logs the close of the config stream, choosing the severity from the status and from whether a
// failure is already being remembered:
//  - Ok is logged at debug only.
//  - Statuses for which onlyWarnOnRepeatedFailures() is false are logged at warn right away.
//  - The remaining statuses are always logged at debug. The first one is remembered without a
//    warning; a later close warns about the remembered failure when the status has changed
//    (and the new one is remembered instead) or when the remembered failure is older than
//    RetryMaxDelayMs (and the remembered failure is cleared).
void maybeLogClose(Grpc::Status::GrpcStatus status, const std::string& message) {
  // A clean close is not a failure.
  if (status == Grpc::Status::WellKnownGrpcStatus::Ok) {
    ENVOY_LOG(debug, CloseLogMessage, service_method_.name(), status, message);
    return;
  }

  // Statuses outside the "repeated failures" set are surfaced immediately.
  if (!onlyWarnOnRepeatedFailures(status)) {
    ENVOY_LOG(warn, CloseLogMessage, service_method_.name(), status, message);
    return;
  }

  ENVOY_LOG(debug, CloseLogMessage, service_method_.name(), status, message);

  if (isFailureSet()) {
    // Time elapsed since the remembered failure was recorded by setFailure().
    const uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                    time_source_.monotonicTime() - close_time_)
                                    .count();

    if (close_status_ != status) {
      // A different error arrived: warn about the remembered one, then remember the new one.
      ENVOY_LOG(warn, CloseLogMessageWithMs, service_method_.name(), elapsed_ms, close_status_,
                close_message_);
      setFailure(status, message);
    } else if (elapsed_ms > RetryMaxDelayMs) {
      // Same error, remembered for longer than the limit: warn and start over.
      ENVOY_LOG(warn, CloseLogMessageWithMs, service_method_.name(), elapsed_ms, close_status_,
                close_message_);
      unsetFailure();
    }
    return;
  }

  // Nothing remembered yet: record this occurrence without emitting a warning.
  setFailure(status, message);
}

// Returns true for the statuses (Unavailable, DeadlineExceeded, Internal) whose close is not
// warned about on first occurrence by maybeLogClose().
bool onlyWarnOnRepeatedFailures(Grpc::Status::GrpcStatus status) {
  if (status == Grpc::Status::WellKnownGrpcStatus::Unavailable) {
    return true;
  }
  if (status == Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded) {
    return true;
  }
  return status == Grpc::Status::WellKnownGrpcStatus::Internal;
}

// Forgets any remembered failure by restoring the UnsetStatus sentinel.
void unsetFailure() {
  close_status_ = UnsetStatus;
}
// Returns true while a failure is being remembered, i.e. close_status_ is not the sentinel.
bool isFailureSet() {
  return !(close_status_ == UnsetStatus);
}

// Remembers the given close as the current failure: the monotonic time at which it is recorded,
// its message, and its status.
void setFailure(Grpc::Status::GrpcStatus status, const std::string& message) {
  close_time_ = time_source_.monotonicTime();
  close_message_ = message;
  close_status_ = status;
}

GrpcStreamCallbacks<ResponseProto>* const callbacks_;

Grpc::AsyncClient<RequestProto, ResponseProto> async_client_;
Expand All @@ -153,6 +217,11 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
TokenBucketPtr limit_request_;
const bool rate_limiting_enabled_;
Event::TimerPtr drain_request_timer_;

// Remembered failure state, written by setFailure() and cleared by unsetFailure().
// Status of the remembered close, or UnsetStatus when no failure is being remembered.
Grpc::Status::GrpcStatus close_status_ = UnsetStatus;
// Message that accompanied the remembered close.
std::string close_message_;
// Monotonic time at which the remembered close was recorded.
MonotonicTime close_time_;
};

} // namespace Config
Expand Down