Skip to content
Merged
3 changes: 3 additions & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ Minor Behavior Changes
``use_unsigned_payload`` filter option (default false).
* cluster: added default value of 5 seconds for :ref:`connect_timeout <envoy_v3_api_field_config.cluster.v3.Cluster.connect_timeout>`.
* dns cache: the new :ref:`dns_query_timeout <envoy_v3_api_field_extensions.common.dynamic_forward_proxy.v3.DnsCacheConfig.dns_query_timeout>` option has a default of 5s. See below for more information.
* grpc stream: reduced log level for "Unable to establish new stream" to debug.
Comment thread
tbarrella marked this conversation as resolved.
Outdated
The log level for "gRPC config stream closed" is now reduced to debug when the status is ``Ok``
or has been retriable (``DeadlineExceeded``, ``Internal``, or ``Unavailable``) for less than 30 seconds
* http: disable the integration between :ref:`ExtensionWithMatcher <envoy_v3_api_msg_extensions.common.matching.v3.ExtensionWithMatcher>`
and HTTP filters by default to reflects its experimental status. This feature can be enabled by seting
``envoy.reloadable_features.experimental_matching_api`` to true.
Expand Down
91 changes: 85 additions & 6 deletions source/common/config/grpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@
namespace Envoy {
namespace Config {

namespace {

constexpr auto CloseLogMessage = "{} gRPC config stream closed: {}, {}";
constexpr auto CloseLogMessageWithSince = "{} gRPC config stream closed since {}ms ago: {}, {}";
constexpr auto CloseLogMessageWithPrevious =
"{} gRPC config stream closed: {}, {} (previously {}, {} since {}ms ago)";

// TODO(htuch): Make this configurable.
constexpr uint32_t RetryInitialDelayMs = 500;
constexpr uint32_t RetryMaxDelayMs = 30000; // Do not cross more than 30s

} // namespace

template <class ResponseProto> using ResponseProtoPtr = std::unique_ptr<ResponseProto>;

// Oversees communication for gRPC xDS implementations (parent to both regular xDS and delta
Expand Down Expand Up @@ -45,9 +58,6 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
});
}

// TODO(htuch): Make this configurable.
static constexpr uint32_t RetryInitialDelayMs = 500;
static constexpr uint32_t RetryMaxDelayMs = 30000; // Do not cross more than 30s
backoff_strategy_ = std::make_unique<JitteredExponentialBackOffStrategy>(
RetryInitialDelayMs, RetryMaxDelayMs, random_);
}
Expand All @@ -60,12 +70,13 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
}
stream_ = async_client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
if (stream_ == nullptr) {
ENVOY_LOG(warn, "Unable to establish new stream");
ENVOY_LOG(debug, "Unable to establish new stream");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this one pretty important though if the config server is completely busted? (No healthy hosts, etc.)

IMO we should make this error message easier to understand:

Suggested change
ENVOY_LOG(debug, "Unable to establish new stream");
ENVOY_LOG(debug, "Unable to establish new stream to configuration server");

But also potentially rate limit the output? WDYT?

@tbarrella tbarrella Jul 19, 2021

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#14616 (comment) aserted that this message is always accompanied by the remote close message. Tracing code (see e.g. code around here), this does seem to be the case at least for AyncStreamImpl. If you prefer to be conservative, I may just keep this as a warning for now rather than adding more logic for it

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are sure it always prints together I think it's fine to downgrade, but I would confirm with manual testing. The issue here is I don't think every case that this would get printed in would result in a remote close, for example no healthy host.

callbacks_->onEstablishmentFailure();
setRetryTimer();
return;
}
control_plane_stats_.connected_state_.set(1);
unsetCloseStatus();
callbacks_->onStreamEstablished();
}

Expand Down Expand Up @@ -97,8 +108,7 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
}

void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override {
ENVOY_LOG(warn, "{} gRPC config stream closed: {}, {}", service_method_.name(), status,
message);
logClose(status, message);
stream_ = nullptr;
control_plane_stats_.connected_state_.set(0);
callbacks_->onEstablishmentFailure();
Expand Down Expand Up @@ -131,11 +141,74 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
return false;
}

absl::optional<Grpc::Status::GrpcStatus> getCloseStatus() { return close_status_; }

private:
void setRetryTimer() {
retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));
}

// https://github.com/envoyproxy/envoy/issues/14591
Comment thread
tbarrella marked this conversation as resolved.
Outdated
// Log level should be reduced when the remote close failure is `Ok` or is retriable and has only
// been occurring for a short amount of time.
void logClose(Grpc::Status::GrpcStatus status, const std::string& message) {
if (Grpc::Status::WellKnownGrpcStatus::Ok == status) {
ENVOY_LOG(debug, CloseLogMessage, service_method_.name(), status, message);
return;
}

if (!onlyWarnOnRepeatedFailure(status)) {
// Failure is considered non-retriable. Warn.
Comment thread
tbarrella marked this conversation as resolved.
Outdated
ENVOY_LOG(warn, CloseLogMessage, service_method_.name(), status, message);
return;
}

if (!isCloseStatusSet()) {
// First failure. Debug. Record occurrence.
Comment thread
tbarrella marked this conversation as resolved.
Outdated
ENVOY_LOG(debug, CloseLogMessage, service_method_.name(), status, message);
setCloseStatus(status, message);
return;
}

uint64_t ms_since_first_close = std::chrono::duration_cast<std::chrono::milliseconds>(
Comment thread
tbarrella marked this conversation as resolved.
Outdated
time_source_.monotonicTime() - close_time_)
.count();
Grpc::Status::GrpcStatus close_status = close_status_.value();

if (status != close_status) {
// This is a different failure. Warn on both statuses and remember the new one.
ENVOY_LOG(warn, CloseLogMessageWithPrevious, service_method_.name(), status, message,
Comment thread
tbarrella marked this conversation as resolved.
Outdated
close_status, close_message_, ms_since_first_close);
setCloseStatus(status, message);
return;
}

if (ms_since_first_close > RetryMaxDelayMs) {
// Warn if we are over the time limit.
ENVOY_LOG(warn, CloseLogMessageWithSince, service_method_.name(), ms_since_first_close,
Comment thread
tbarrella marked this conversation as resolved.
Outdated
close_status, close_message_);
return;
}

// Failure is retriable and new enough to only log at the debug level.
ENVOY_LOG(debug, CloseLogMessage, service_method_.name(), status, message);
Comment thread
tbarrella marked this conversation as resolved.
Outdated
}

bool onlyWarnOnRepeatedFailure(Grpc::Status::GrpcStatus status) {
Comment thread
tbarrella marked this conversation as resolved.
Outdated
return Grpc::Status::WellKnownGrpcStatus::Unavailable == status ||
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded == status ||
Grpc::Status::WellKnownGrpcStatus::Internal == status;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is included due to the example in #14591 (error 13), but gRPC docs classify this as a serious error:

Internal errors. This means that some invariants expected by the underlying system have been broken. This error code is reserved for serious errors.

I'm questioning whether the server should return this in the first place if it doesn't seem to be serious. @howardjohn @kyessenov @mandarjog thoughts?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am pretty sure StreamAggregatedResources gRPC config stream closed: 13, comes from keepalive closing the connection in the bug. I am a bit surprised it shows up as 13, and not Unavailable. It may be worth exploring a bit more.

FWIW if you want to test you can set --keepaliveMaxServerConnectionAge=5s on Istiod to get an XDS server that closes connection this way. If you use out-of-the-box Istio we have an XDS proxy the translates the error to OK anyways though

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. That makes sense to me since it's happening every 30 minutes in that example. It looks like in recent Istio (1.10.2) the recurring error is now 14 (Unavailable)

2021-07-12T23:58:48.196332Z warning envoy config StreamAggregatedResources gRPC config stream closed: 14, transport is closing

After building an Envoy with this change and not including this line/treating Internal as retriable, I no longer got warnings for the above error. So I think we should not special case Internal for now and only DeadlineExceeded/Unavailable

}

void unsetCloseStatus() { close_status_ = absl::nullopt; }
Comment thread
tbarrella marked this conversation as resolved.
Outdated
bool isCloseStatusSet() { return close_status_.has_value(); }

void setCloseStatus(Grpc::Status::GrpcStatus status, const std::string& message) {
close_status_ = status;
close_time_ = time_source_.monotonicTime();
close_message_ = message;
}

GrpcStreamCallbacks<ResponseProto>* const callbacks_;

Grpc::AsyncClient<RequestProto, ResponseProto> async_client_;
Expand All @@ -153,6 +226,12 @@ class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
TokenBucketPtr limit_request_;
const bool rate_limiting_enabled_;
Event::TimerPtr drain_request_timer_;

// Records the initial message and timestamp of the most recent remote closes with the same
Comment thread
tbarrella marked this conversation as resolved.
// status.
absl::optional<Grpc::Status::GrpcStatus> close_status_ = absl::nullopt;
Comment thread
tbarrella marked this conversation as resolved.
Outdated
std::string close_message_;
Comment thread
tbarrella marked this conversation as resolved.
Outdated
MonotonicTime close_time_;
Comment thread
tbarrella marked this conversation as resolved.
Outdated
};

} // namespace Config
Expand Down
2 changes: 2 additions & 0 deletions test/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ envoy_cc_test(
"//test/mocks/config:config_mocks",
"//test/mocks/event:event_mocks",
"//test/mocks/grpc:grpc_mocks",
"//test/test_common:logging_lib",
"//test/test_common:simulated_time_system_lib",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)
Expand Down
90 changes: 90 additions & 0 deletions test/common/config/grpc_stream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include "test/mocks/config/mocks.h"
#include "test/mocks/event/mocks.h"
#include "test/mocks/grpc/mocks.h"
#include "test/test_common/logging.h"
#include "test/test_common/simulated_time_system.h"
#include "test/test_common/utility.h"

#include "gmock/gmock.h"
Expand All @@ -30,6 +32,10 @@ class GrpcStreamTest : public testing::Test {
"envoy.api.v2.EndpointDiscoveryService.StreamEndpoints"),
random_, dispatcher_, stats_, rate_limit_settings_) {}

Event::SimulatedTimeSystem& simulatedTimeSystem() {
return dynamic_cast<Event::SimulatedTimeSystem&>(dispatcher_.timeSource());
}

NiceMock<Event::MockDispatcher> dispatcher_;
Grpc::MockAsyncStream async_stream_;
Stats::TestUtil::TestStore stats_;
Expand Down Expand Up @@ -73,6 +79,90 @@ TEST_F(GrpcStreamTest, EstablishStream) {
}
}

// Tests reducing log level depending on remote close status.
TEST_F(GrpcStreamTest, LogClose) {
auto& time_source = simulatedTimeSystem();

// Failures whose statuses are handled simply and not saved.
Comment thread
tbarrella marked this conversation as resolved.
Outdated
{
EXPECT_FALSE(grpc_stream_.getCloseStatus().has_value());

// Benign status: debug.
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("debug", "gRPC config stream closed", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Ok, "Ok");
});
EXPECT_FALSE(grpc_stream_.getCloseStatus().has_value());

// Non-retriable failure: warn.
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("warn", "gRPC config stream closed", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::NotFound, "Not Found");
});
EXPECT_FALSE(grpc_stream_.getCloseStatus().has_value());
}
// Repeated failures that warn after enough time.
{
// Retriable failure: debug.
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("debug", "gRPC config stream closed", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Unavailable, "Unavailable");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::Unavailable);

// Different retriable failure: warn.
time_source.advanceTimeWait(std::chrono::milliseconds(1000));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS(
"warn", "stream closed: 4, Deadline Exceeded (previously 14, Unavailable since 1000ms ago)",
{
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
"Deadline Exceeded");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded);

// Same retriable failure after a short amount of time: debug.
time_source.advanceTimeWait(std::chrono::milliseconds(1000));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("debug", "gRPC config stream closed", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
"Deadline Exceeded");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded);

// Same retriable failure after a long time: warn.
time_source.advanceTimeWait(std::chrono::milliseconds(100000));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("warn", "gRPC config stream closed since 101000ms ago", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
"Deadline Exceeded");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded);

// Warn again.
time_source.advanceTimeWait(std::chrono::milliseconds(1000));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("warn", "gRPC config stream closed since 102000ms ago", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
"Deadline Exceeded");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded);
}

// Successful establishment clears close status.
{
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
EXPECT_CALL(callbacks_, onStreamEstablished());
grpc_stream_.establishNewStream();
EXPECT_FALSE(grpc_stream_.getCloseStatus().has_value());
}
}

// A failure in the underlying gRPC machinery should result in grpcStreamAvailable() false. Calling
// sendMessage would segfault.
TEST_F(GrpcStreamTest, FailToEstablishNewStream) {
Expand Down
1 change: 1 addition & 0 deletions test/mocks/event/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ envoy_cc_mock(
"//envoy/network:listener_interface",
"//envoy/ssl:context_interface",
"//test/mocks/buffer:buffer_mocks",
"//test/test_common:simulated_time_system_lib",
"//test/test_common:test_time_lib",
],
)
Expand Down
3 changes: 2 additions & 1 deletion test/mocks/event/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "source/common/common/scope_tracker.h"

#include "test/mocks/buffer/mocks.h"
#include "test/test_common/simulated_time_system.h"
#include "test/test_common/test_time.h"

#include "gmock/gmock.h"
Expand Down Expand Up @@ -162,7 +163,7 @@ class MockDispatcher : public Dispatcher {
MOCK_METHOD(void, updateApproximateMonotonicTime, ());
MOCK_METHOD(void, shutdown, ());

GlobalTimeSystem time_system_;
SimulatedTimeSystem time_system_;
std::list<DeferredDeletablePtr> to_delete_;
testing::NiceMock<MockBufferFactory> buffer_factory_;
bool allow_null_callback_{};
Expand Down