Skip to content
Merged
3 changes: 3 additions & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ Minor Behavior Changes
----------------------
*Changes that may cause incompatibilities for some users, but should not for most*

* grpc stream: reduced log level for "Unable to establish new stream" to debug. The log level for
Comment thread
tbarrella marked this conversation as resolved.
Outdated
"gRPC config stream closed" is now reduced to debug when the status is ``Ok`` or has been
retriable (``DeadlineExceeded`` or ``Unavailable``) for less than 30 seconds.
* http: set the default :ref:`lazy headermap threshold <arch_overview_http_header_map_settings>` to 3,
which defines the minimal number of headers in a request/response/trailers required for using a
dictionary in addition to the list. Setting the `envoy.http.headermap.lazy_map_min_size` runtime
Expand Down
90 changes: 84 additions & 6 deletions source/common/config/grpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
namespace Envoy {
namespace Config {

// Delay bounds, in milliseconds, handed to the jittered exponential backoff strategy that paces
// attempts to re-establish the gRPC config stream.
//
// These are declared `inline constexpr` instead of living in an unnamed namespace: this file is a
// header, and an unnamed namespace would give every including translation unit its own distinct
// copy of each constant even though they are referenced from inline template code.
//
// TODO(htuch): Make this configurable.
inline constexpr uint32_t RetryInitialDelayMs = 500;
inline constexpr uint32_t RetryMaxDelayMs = 30000; // Do not cross more than 30s

template <class ResponseProto> using ResponseProtoPtr = std::unique_ptr<ResponseProto>;

// Oversees communication for gRPC xDS implementations (parent to both regular xDS and delta
Expand Down Expand Up @@ -45,9 +53,6 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
});
}

// TODO(htuch): Make this configurable.
static constexpr uint32_t RetryInitialDelayMs = 500;
static constexpr uint32_t RetryMaxDelayMs = 30000; // Do not cross more than 30s
backoff_strategy_ = std::make_unique<JitteredExponentialBackOffStrategy>(
RetryInitialDelayMs, RetryMaxDelayMs, random_);
}
Expand All @@ -60,12 +65,13 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
}
stream_ = async_client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
if (stream_ == nullptr) {
ENVOY_LOG(warn, "Unable to establish new stream");
ENVOY_LOG(debug, "Unable to establish new stream");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this one pretty important though if the config server is completely busted? (No healthy hosts, etc.)

IMO we should make this error message easier to understand:

Suggested change
ENVOY_LOG(debug, "Unable to establish new stream");
ENVOY_LOG(debug, "Unable to establish new stream to configuration server");

But also potentially rate limit the output? WDYT?

@tbarrella tbarrella Jul 19, 2021

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#14616 (comment) asserted that this message is always accompanied by the remote close message. Tracing code (see e.g. code around here), this does seem to be the case at least for AsyncStreamImpl. If you prefer to be conservative, I may just keep this as a warning for now rather than adding more logic for it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are sure it always prints together I think it's fine to downgrade, but I would confirm with manual testing. The issue here is I don't think every case that this would get printed in would result in a remote close, for example no healthy host.

callbacks_->onEstablishmentFailure();
setRetryTimer();
return;
}
control_plane_stats_.connected_state_.set(1);
clearCloseStatus();
callbacks_->onStreamEstablished();
}

Expand Down Expand Up @@ -97,8 +103,7 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
}

void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override {
ENVOY_LOG(warn, "{} gRPC config stream closed: {}, {}", service_method_.name(), status,
message);
logClose(status, message);
stream_ = nullptr;
control_plane_stats_.connected_state_.set(0);
callbacks_->onEstablishmentFailure();
Expand Down Expand Up @@ -131,11 +136,78 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
return false;
}

// Returns a copy of the currently recorded remote close status; the optional is empty when no
// status is recorded.
absl::optional<Grpc::Status::GrpcStatus> getCloseStatus() {
  return close_status_;
}

private:
// Arms the retry timer with the next delay produced by the backoff strategy.
void setRetryTimer() {
  const std::chrono::milliseconds next_delay(backoff_strategy_->nextBackOffMs());
  retry_timer_->enableTimer(next_delay);
}

// https://github.com/envoyproxy/envoy/issues/14591
Comment thread
tbarrella marked this conversation as resolved.
Outdated
// Log level should be reduced when the remote close failure is `Ok` or is retriable and has only
// been occurring for a short amount of time.
// Logs a remote close of the config stream, choosing the level from the status and from the
// recorded history of earlier closes:
//   - `Ok`: debug.
//   - A status for which onlyWarnOnRepeatedFailure() is false: warn. Nothing is recorded.
//   - First close with a warn-on-repeat status: debug, and the status/message/time are recorded.
//   - A warn-on-repeat status that differs from the recorded one: warn (mentioning both), and the
//     new status replaces the recorded one.
//   - Same status as recorded: warn once more than RetryMaxDelayMs milliseconds have elapsed
//     since it was recorded, debug otherwise.
void logClose(Grpc::Status::GrpcStatus status, const std::string& message) {
  if (status == Grpc::Status::WellKnownGrpcStatus::Ok) {
    // A clean close is not worth a warning.
    ENVOY_LOG(debug, "{} gRPC config stream closed: {}, {}", service_method_.name(), status,
              message);
  } else if (!onlyWarnOnRepeatedFailure(status)) {
    // When the failure is considered non-retriable, warn.
    ENVOY_LOG(warn, "{} gRPC config stream closed: {}, {}", service_method_.name(), status,
              message);
  } else if (!isCloseStatusSet()) {
    // For the first failure, record its occurrence and log at the debug level.
    ENVOY_LOG(debug, "{} gRPC config stream closed: {}, {}", service_method_.name(), status,
              message);
    setCloseStatus(status, message);
  } else {
    // A close has already been recorded: compare against it.
    const auto elapsed = time_source_.monotonicTime() - close_time_;
    const uint64_t elapsed_ms =
        std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
    const Grpc::Status::GrpcStatus recorded_status = close_status_.value();

    if (recorded_status != status) {
      // This is a different failure. Warn on both statuses and remember the new one.
      ENVOY_LOG(warn, "{} gRPC config stream closed: {}, {} (previously {}, {} since {}ms ago)",
                service_method_.name(), status, message, recorded_status, close_message_,
                elapsed_ms);
      setCloseStatus(status, message);
    } else if (elapsed_ms > RetryMaxDelayMs) {
      // Warn if we are over the time limit. The recorded status and message are reported.
      ENVOY_LOG(warn, "{} gRPC config stream closed since {}ms ago: {}, {}",
                service_method_.name(), elapsed_ms, recorded_status, close_message_);
    } else {
      // Failure is retriable and new enough to only log at the debug level.
      ENVOY_LOG(debug, "{} gRPC config stream closed: {}, {}", service_method_.name(), status,
                message);
    }
  }
}

bool onlyWarnOnRepeatedFailure(Grpc::Status::GrpcStatus status) {
Comment thread
tbarrella marked this conversation as resolved.
Outdated
return Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded == status ||
Grpc::Status::WellKnownGrpcStatus::Unavailable == status;
Comment on lines +198 to +208

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comments on how you decided these?

Looking at https://grpc.github.io/grpc/core/md_doc_statuscodes.html I would naively assume RESOURCE_EXHAUSTED should also be included. Maybe others?

@tbarrella tbarrella Jul 19, 2021

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will add a comment. I could see why RESOURCE_EXHAUSTED would be included, so I'll add that. At first I was thinking the resource is ambiguous and wasn't sure how likely it would be that retrying would help, but either way it seems suppressing the log for the first 30s makes sense. I have a hard time seeing how any of the others would be included though. The only other ones that seemed potentially worth retrying immediately to me were ABORTED and FAILED_PRECONDITION, for which it says

(a) Use UNAVAILABLE if the client can retry just the failing call. (b) Use ABORTED if the client should retry at a higher level (e.g., when a client-specified test-and-set fails, indicating the client should restart a read-modify-write sequence). (c) Use FAILED_PRECONDITION if the client should not retry until the system state has been explicitly fixed.

}

// Forgets the recorded close status. The recorded message and timestamp are left untouched.
void clearCloseStatus() {
  close_status_.reset();
}
bool isCloseStatusSet() { return close_status_.has_value(); }

// Records a remote close: the current monotonic time, the close message, and the close status.
// Any previously recorded values are overwritten.
void setCloseStatus(Grpc::Status::GrpcStatus status, const std::string& message) {
  close_time_ = time_source_.monotonicTime();
  close_message_ = message;
  close_status_ = status;
}

GrpcStreamCallbacks<ResponseProto>* const callbacks_;

Grpc::AsyncClient<RequestProto, ResponseProto> async_client_;
Expand All @@ -153,6 +225,12 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
TokenBucketPtr limit_request_;
const bool rate_limiting_enabled_;
Event::TimerPtr drain_request_timer_;

// Records the initial message and timestamp of the most recent remote closes with the same
Comment thread
tbarrella marked this conversation as resolved.
// status.
absl::optional<Grpc::Status::GrpcStatus> close_status_ = absl::nullopt;
Comment thread
tbarrella marked this conversation as resolved.
Outdated
std::string close_message_;
Comment thread
tbarrella marked this conversation as resolved.
Outdated
MonotonicTime close_time_;
Comment thread
tbarrella marked this conversation as resolved.
Outdated
};

} // namespace Config
Expand Down
2 changes: 2 additions & 0 deletions test/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ envoy_cc_test(
"//test/mocks/config:config_mocks",
"//test/mocks/event:event_mocks",
"//test/mocks/grpc:grpc_mocks",
"//test/test_common:logging_lib",
"//test/test_common:simulated_time_system_lib",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)
Expand Down
86 changes: 86 additions & 0 deletions test/common/config/grpc_stream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include "test/mocks/config/mocks.h"
#include "test/mocks/event/mocks.h"
#include "test/mocks/grpc/mocks.h"
#include "test/test_common/logging.h"
#include "test/test_common/simulated_time_system.h"
#include "test/test_common/utility.h"

#include "gmock/gmock.h"
Expand Down Expand Up @@ -38,6 +40,7 @@ class GrpcStreamTest : public testing::Test {
NiceMock<MockGrpcStreamCallbacks> callbacks_;
std::unique_ptr<Grpc::MockAsyncClient> async_client_owner_;
Grpc::MockAsyncClient* async_client_;
Event::SimulatedTimeSystem time_system_;

GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>
Expand Down Expand Up @@ -73,6 +76,89 @@ TEST_F(GrpcStreamTest, EstablishStream) {
}
}

// Tests reducing log level depending on remote close status: which statuses are logged at debug
// vs. warn, when a close status is recorded, and that establishing a stream clears it.
TEST_F(GrpcStreamTest, LogClose) {
// Failures with statuses that do not need special handling. They are always logged in the same
// way and so never saved.
{
// Nothing has been recorded before the first remote close.
EXPECT_FALSE(grpc_stream_.getCloseStatus().has_value());

// Benign status: debug.
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("debug", "gRPC config stream closed", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Ok, "Ok");
});
EXPECT_FALSE(grpc_stream_.getCloseStatus().has_value());

// Non-retriable failure: warn.
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("warn", "gRPC config stream closed", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::NotFound, "Not Found");
});
EXPECT_FALSE(grpc_stream_.getCloseStatus().has_value());
}
// Repeated failures that warn after enough time.
{
// Retriable failure: debug. This is the first one, so it gets recorded.
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("debug", "gRPC config stream closed", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Unavailable, "Unavailable");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::Unavailable);

// Different retriable failure: warn. The message names both the new status and the previously
// recorded one, and the new status replaces the recorded one (restarting the elapsed time).
time_system_.advanceTimeWait(std::chrono::milliseconds(1000));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS(
"warn", "stream closed: 4, Deadline Exceeded (previously 14, Unavailable since 1000ms ago)",
{
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
"Deadline Exceeded");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded);

// Same retriable failure after a short amount of time: debug.
time_system_.advanceTimeWait(std::chrono::milliseconds(1000));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("debug", "gRPC config stream closed", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
"Deadline Exceeded");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded);

// Same retriable failure after a long time: warn.
// Elapsed time is measured from when DeadlineExceeded was recorded: 1000ms + 100000ms.
time_system_.advanceTimeWait(std::chrono::milliseconds(100000));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("warn", "gRPC config stream closed since 101000ms ago", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
"Deadline Exceeded");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded);

// Warn again. The recorded time is not reset by a same-status close, so the elapsed time keeps
// growing.
time_system_.advanceTimeWait(std::chrono::milliseconds(1000));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("warn", "gRPC config stream closed since 102000ms ago", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
"Deadline Exceeded");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded);
}

// Successful establishment clears close status.
{
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
EXPECT_CALL(callbacks_, onStreamEstablished());
grpc_stream_.establishNewStream();
EXPECT_FALSE(grpc_stream_.getCloseStatus().has_value());
}
}

// A failure in the underlying gRPC machinery should result in grpcStreamAvailable() false. Calling
// sendMessage would segfault.
TEST_F(GrpcStreamTest, FailToEstablishNewStream) {
Expand Down