Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/envoy/api/v2/core/config_source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ message ApiConfigSource {
// For GRPC APIs, the rate limit settings. If present, discovery requests made by Envoy will be
// rate limited.
RateLimitSettings rate_limit_settings = 6;

// Skip the node identifier in subsequent discovery requests for streaming gRPC config types.
// This will be enabled by default in the future.
Comment thread
kyessenov marked this conversation as resolved.
Outdated
bool skip_subsequent_node = 7;
Comment thread
kyessenov marked this conversation as resolved.
Outdated
}

// Aggregated Discovery Service (ADS) options. This is currently empty, but when
Expand Down
5 changes: 5 additions & 0 deletions api/xds_protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ versioning across resource types. When ADS is not used, even each
resource of a given resource type may have a distinct version, since the
Envoy API allows distinct EDS/RDS resources to point at different :ref:`ConfigSources <envoy_api_msg_core.ConfigSource>`.

Only the first request on a stream is guaranteed to carry the node identifier.
The subsequent discovery requests on the same stream carry an empty node
Comment thread
kyessenov marked this conversation as resolved.
Outdated
identifier. This holds true regardless of the acceptance of the discovery
responses on the same stream.
Comment thread
kyessenov marked this conversation as resolved.
Outdated

.. _xds_protocol_resource_update:

Resource Update
Expand Down
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Version history
* tls: added verification of IP address SAN fields in certificates against configured SANs in the
certificate validation context.
* upstream: added network filter chains to upstream connections, see :ref:`filters<envoy_api_field_Cluster.filters>`.
* api: added ::ref:`skip_subsequent_node <envoy_api_field_core.ApiConfigSource.skip_subsequent_node>` option to omit the node identifier from the subsequent discovery requests on the same stream.

1.11.0 (July 11, 2019)
======================
Expand Down
1 change: 1 addition & 0 deletions source/common/config/delta_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace Config {
* Manages the logic of a (non-aggregated) delta xDS subscription.
* TODO(fredlas) add aggregation support. The plan is for that to happen in XdsGrpcContext,
* which this class will then "have a" rather than "be a".
* TODO(kyessenov) implement skip_subsequent_node for delta xDS subscription.
*/
class DeltaSubscriptionImpl : public Subscription,
public GrpcStreamCallbacks<envoy::api::v2::DeltaDiscoveryResponse>,
Expand Down
12 changes: 9 additions & 3 deletions source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info,
Grpc::RawAsyncClientPtr async_client, Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method,
Runtime::RandomGenerator& random, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings)
const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node)
: grpc_stream_(this, std::move(async_client), service_method, random, dispatcher, scope,
rate_limit_settings),

local_info_(local_info) {
local_info_(local_info), skip_subsequent_node_(skip_subsequent_node),
first_stream_request_(true) {
Config::Utility::checkLocalInfo("ads", local_info);
}

Expand Down Expand Up @@ -57,8 +57,12 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) {
}
}

if (skip_subsequent_node_ && !first_stream_request_) {
request.clear_node();
}
ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url, request.DebugString());
grpc_stream_.sendMessage(request);
first_stream_request_ = false;

// clear error_detail after the request is sent if it exists.
if (api_state_[type_url].request_.has_error_detail()) {
Expand Down Expand Up @@ -206,12 +210,14 @@ void GrpcMuxImpl::onDiscoveryResponse(
void GrpcMuxImpl::onWriteable() { drainRequests(); }

void GrpcMuxImpl::onStreamEstablished() {
first_stream_request_ = true;
for (const auto& type_url : subscriptions_) {
queueDiscoveryRequest(type_url);
}
}

void GrpcMuxImpl::onEstablishmentFailure() {
first_stream_request_ = true;
Comment thread
kyessenov marked this conversation as resolved.
Outdated
for (const auto& api_state : api_state_) {
for (auto watch : api_state.second.watches_) {
watch->callbacks_.onConfigUpdateFailed(
Expand Down
4 changes: 3 additions & 1 deletion source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class GrpcMuxImpl : public GrpcMux,
GrpcMuxImpl(const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client,
Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method,
Runtime::RandomGenerator& random, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings);
const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node);
~GrpcMuxImpl() override;

void start() override;
Expand Down Expand Up @@ -104,6 +104,8 @@ class GrpcMuxImpl : public GrpcMux,

GrpcStream<envoy::api::v2::DiscoveryRequest, envoy::api::v2::DiscoveryResponse> grpc_stream_;
const LocalInfo::LocalInfo& local_info_;
bool skip_subsequent_node_;
Comment thread
kyessenov marked this conversation as resolved.
Outdated
bool first_stream_request_;
std::unordered_map<std::string, ApiState> api_state_;
// Envoy's dependency ordering.
std::list<std::string> subscriptions_;
Expand Down
7 changes: 4 additions & 3 deletions source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ class GrpcSubscriptionImpl : public Config::Subscription {
const Protobuf::MethodDescriptor& service_method, absl::string_view type_url,
SubscriptionCallbacks& callbacks, SubscriptionStats stats,
Stats::Scope& scope, const RateLimitSettings& rate_limit_settings,
std::chrono::milliseconds init_fetch_timeout)
: callbacks_(callbacks), grpc_mux_(local_info, std::move(async_client), dispatcher,
service_method, random, scope, rate_limit_settings),
std::chrono::milliseconds init_fetch_timeout, bool skip_subsequent_node)
: callbacks_(callbacks),
grpc_mux_(local_info, std::move(async_client), dispatcher, service_method, random, scope,
rate_limit_settings, skip_subsequent_node),
grpc_mux_subscription_(grpc_mux_, callbacks_, stats, type_url, dispatcher,
init_fetch_timeout) {}

Expand Down
3 changes: 2 additions & 1 deletion source/common/config/subscription_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
->create(),
dispatcher_, random_, sotwGrpcMethod(type_url), type_url, callbacks, stats, scope,
Utility::parseRateLimitSettings(api_config_source),
Utility::configSourceInitialFetchTimeout(config));
Utility::configSourceInitialFetchTimeout(config),
api_config_source.skip_subsequent_node());
break;
case envoy::api::v2::core::ApiConfigSource::DELTA_GRPC: {
Utility::checkApiConfigSourceSubscriptionBackingCluster(cm_.clusters(), api_config_source);
Expand Down
3 changes: 2 additions & 1 deletion source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ ClusterManagerImpl::ClusterManagerImpl(
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
random_, stats_,
Envoy::Config::Utility::parseRateLimitSettings(bootstrap.dynamic_resources().ads_config()));
Envoy::Config::Utility::parseRateLimitSettings(bootstrap.dynamic_resources().ads_config()),
bootstrap.dynamic_resources().ads_config().skip_subsequent_node());
} else {
ads_mux_ = std::make_unique<Config::NullGrpcMuxImpl>();
}
Expand Down
5 changes: 3 additions & 2 deletions test/common/config/delta_subscription_test_harness.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness {
subscription_->start(cluster_names);
}

void expectSendMessage(const std::set<std::string>& cluster_names,
const std::string& version) override {
void expectSendMessage(const std::set<std::string>& cluster_names, const std::string& version,
bool first_on_stream = false) override {
UNREFERENCED_PARAMETER(version);
UNREFERENCED_PARAMETER(first_on_stream);
expectSendMessage(cluster_names, {}, Grpc::Status::GrpcStatus::Ok, "", {});
}

Expand Down
5 changes: 3 additions & 2 deletions test/common/config/filesystem_subscription_test_harness.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ class FilesystemSubscriptionTestHarness : public SubscriptionTestHarness {
}
}

void expectSendMessage(const std::set<std::string>& cluster_names,
const std::string& version) override {
void expectSendMessage(const std::set<std::string>& cluster_names, const std::string& version,
bool first_on_stream) override {
UNREFERENCED_PARAMETER(cluster_names);
UNREFERENCED_PARAMETER(version);
UNREFERENCED_PARAMETER(first_on_stream);
}

void deliverConfigUpdate(const std::vector<std::string>& cluster_names,
Expand Down
51 changes: 26 additions & 25 deletions test/common/config/grpc_mux_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,26 @@ class GrpcMuxImplTestBase : public testing::Test {
local_info_, std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
random_, stats_, rate_limit_settings_);
random_, stats_, rate_limit_settings_, true);
}

void setup(const RateLimitSettings& custom_rate_limit_settings) {
grpc_mux_ = std::make_unique<GrpcMuxImpl>(
local_info_, std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
random_, stats_, custom_rate_limit_settings);
random_, stats_, custom_rate_limit_settings, true);
}

void expectSendMessage(const std::string& type_url,
const std::vector<std::string>& resource_names, const std::string& version,
const std::string& nonce = "",
bool first = false, const std::string& nonce = "",
const Protobuf::int32 error_code = Grpc::Status::GrpcStatus::Ok,
const std::string& error_message = "") {
envoy::api::v2::DiscoveryRequest expected_request;
expected_request.mutable_node()->CopyFrom(local_info_.node());
if (first) {
expected_request.mutable_node()->CopyFrom(local_info_.node());
}
for (const auto& resource : resource_names) {
expected_request.add_resource_names(resource);
}
Expand Down Expand Up @@ -111,7 +113,7 @@ TEST_F(GrpcMuxImplTest, MultipleTypeUrlStreams) {
auto foo_sub = grpc_mux_->subscribe("foo", {"x", "y"}, callbacks_);
auto bar_sub = grpc_mux_->subscribe("bar", {}, callbacks_);
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage("foo", {"x", "y"}, "");
expectSendMessage("foo", {"x", "y"}, "", true);
expectSendMessage("bar", {}, "");
grpc_mux_->start();
EXPECT_EQ(1, control_plane_connected_state_.value());
Expand Down Expand Up @@ -142,7 +144,7 @@ TEST_F(GrpcMuxImplTest, ResetStream) {
auto bar_sub = grpc_mux_->subscribe("bar", {}, callbacks_);
auto baz_sub = grpc_mux_->subscribe("baz", {"z"}, callbacks_);
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage("foo", {"x", "y"}, "");
expectSendMessage("foo", {"x", "y"}, "", true);
expectSendMessage("bar", {}, "");
expectSendMessage("baz", {"z"}, "");
grpc_mux_->start();
Expand All @@ -156,7 +158,7 @@ TEST_F(GrpcMuxImplTest, ResetStream) {
grpc_mux_->grpcStreamForTest().onRemoteClose(Grpc::Status::GrpcStatus::Canceled, "");
EXPECT_EQ(0, control_plane_connected_state_.value());
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage("foo", {"x", "y"}, "");
expectSendMessage("foo", {"x", "y"}, "", true);
expectSendMessage("bar", {}, "");
expectSendMessage("baz", {"z"}, "");
timer_cb();
Expand All @@ -173,7 +175,7 @@ TEST_F(GrpcMuxImplTest, PauseResume) {
grpc_mux_->pause("foo");
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
grpc_mux_->start();
expectSendMessage("foo", {"x", "y"}, "");
expectSendMessage("foo", {"x", "y"}, "", true);
grpc_mux_->resume("foo");
grpc_mux_->pause("bar");
expectSendMessage("foo", {"z", "x", "y"}, "");
Expand All @@ -196,7 +198,7 @@ TEST_F(GrpcMuxImplTest, TypeUrlMismatch) {
auto foo_sub = grpc_mux_->subscribe("foo", {"x", "y"}, callbacks_);

EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage("foo", {"x", "y"}, "");
expectSendMessage("foo", {"x", "y"}, "", true);
grpc_mux_->start();

{
Expand All @@ -215,7 +217,7 @@ TEST_F(GrpcMuxImplTest, TypeUrlMismatch) {
e->what()));
}));

expectSendMessage("foo", {"x", "y"}, "", "", Grpc::Status::GrpcStatus::Internal,
expectSendMessage("foo", {"x", "y"}, "", false, "", Grpc::Status::GrpcStatus::Internal,
fmt::format("bar does not match foo type URL in DiscoveryResponse {}",
invalid_response->DebugString()));
grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(invalid_response));
Expand All @@ -231,7 +233,7 @@ TEST_F(GrpcMuxImplTest, WildcardWatch) {
const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment;
auto foo_sub = grpc_mux_->subscribe(type_url, {}, callbacks_);
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage(type_url, {}, "");
expectSendMessage(type_url, {}, "", true);
grpc_mux_->start();

{
Expand Down Expand Up @@ -267,7 +269,7 @@ TEST_F(GrpcMuxImplTest, WatchDemux) {
auto bar_sub = grpc_mux_->subscribe(type_url, {"y", "z"}, bar_callbacks);
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
// Should dedupe the "x" resource.
expectSendMessage(type_url, {"y", "z", "x"}, "");
expectSendMessage(type_url, {"y", "z", "x"}, "", true);
grpc_mux_->start();

{
Expand Down Expand Up @@ -345,7 +347,7 @@ TEST_F(GrpcMuxImplTest, MultipleWatcherWithEmptyUpdates) {
auto foo_sub = grpc_mux_->subscribe(type_url, {"x", "y"}, foo_callbacks);

EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage(type_url, {"x", "y"}, "");
expectSendMessage(type_url, {"x", "y"}, "", true);
grpc_mux_->start();

std::unique_ptr<envoy::api::v2::DiscoveryResponse> response(
Expand All @@ -368,7 +370,7 @@ TEST_F(GrpcMuxImplTest, SingleWatcherWithEmptyUpdates) {
auto foo_sub = grpc_mux_->subscribe(type_url, {}, foo_callbacks);

EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage(type_url, {}, "");
expectSendMessage(type_url, {}, "", true);
grpc_mux_->start();

std::unique_ptr<envoy::api::v2::DiscoveryResponse> response(
Expand Down Expand Up @@ -422,7 +424,7 @@ TEST_F(GrpcMuxImplTestWithMockTimeSystem, TooManyRequestsWithDefaultSettings) {
};

auto foo_sub = grpc_mux_->subscribe("foo", {"x"}, callbacks_);
expectSendMessage("foo", {"x"}, "");
expectSendMessage("foo", {"x"}, "", true);
grpc_mux_->start();

// Exhausts the limit.
Expand Down Expand Up @@ -475,7 +477,7 @@ TEST_F(GrpcMuxImplTestWithMockTimeSystem, TooManyRequestsWithEmptyRateLimitSetti
};

auto foo_sub = grpc_mux_->subscribe("foo", {"x"}, callbacks_);
expectSendMessage("foo", {"x"}, "");
expectSendMessage("foo", {"x"}, "", true);
grpc_mux_->start();

// Validate that drain_request_timer is enabled when there are no tokens.
Expand Down Expand Up @@ -531,7 +533,7 @@ TEST_F(GrpcMuxImplTest, TooManyRequestsWithCustomRateLimitSettings) {
};

auto foo_sub = grpc_mux_->subscribe("foo", {"x"}, callbacks_);
expectSendMessage("foo", {"x"}, "");
expectSendMessage("foo", {"x"}, "", true);
grpc_mux_->start();

// Validate that rate limit is not enforced for 100 requests.
Expand Down Expand Up @@ -565,7 +567,7 @@ TEST_F(GrpcMuxImplTest, UnwatchedTypeAcceptsEmptyResources) {
grpc_mux_->start();
{
// subscribe and unsubscribe to simulate a cluster added and removed
expectSendMessage(type_url, {"y"}, "");
expectSendMessage(type_url, {"y"}, "", true);
auto temp_sub = grpc_mux_->subscribe(type_url, {"y"}, callbacks_);
expectSendMessage(type_url, {}, "");
}
Expand All @@ -581,11 +583,11 @@ TEST_F(GrpcMuxImplTest, UnwatchedTypeAcceptsEmptyResources) {
grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response));

// when we add the new subscription version should be 1 and nonce should be bar
expectSendMessage(type_url, {"x"}, "1", "bar");
expectSendMessage(type_url, {"x"}, "1", false, "bar");

// simulate a new cluster x is added. add CLA subscription for it.
auto sub = grpc_mux_->subscribe(type_url, {"x"}, callbacks_);
expectSendMessage(type_url, {}, "1", "bar");
expectSendMessage(type_url, {}, "1", false, "bar");
}

// Verifies that a messsage with some resources is rejected when there are no watches.
Expand All @@ -598,7 +600,7 @@ TEST_F(GrpcMuxImplTest, UnwatchedTypeRejectsResources) {

grpc_mux_->start();
// subscribe and unsubscribe (by not keeping the return watch) so that the type is known to envoy
expectSendMessage(type_url, {"y"}, "");
expectSendMessage(type_url, {"y"}, "", true);
expectSendMessage(type_url, {}, "");
grpc_mux_->subscribe(type_url, {"y"}, callbacks_);

Expand All @@ -615,7 +617,7 @@ TEST_F(GrpcMuxImplTest, UnwatchedTypeRejectsResources) {
response->add_resources()->PackFrom(load_assignment);

// The message should be rejected.
expectSendMessage(type_url, {}, "", "bar");
expectSendMessage(type_url, {}, "", false, "bar");
EXPECT_LOG_CONTAINS("warning", "Ignoring unwatched type URL " + type_url,
grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)));
}
Expand All @@ -627,7 +629,7 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyClusterName) {
local_info_, std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
random_, stats_, rate_limit_settings_),
random_, stats_, rate_limit_settings_, true),
EnvoyException,
"ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via "
"--service-node and --service-cluster options.");
Expand All @@ -640,12 +642,11 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyNodeName) {
local_info_, std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
random_, stats_, rate_limit_settings_),
random_, stats_, rate_limit_settings_, true),
EnvoyException,
"ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via "
"--service-node and --service-cluster options.");
}

} // namespace
} // namespace Config
} // namespace Envoy
4 changes: 2 additions & 2 deletions test/common/config/grpc_subscription_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ TEST_F(GrpcSubscriptionImplTest, StreamCreationFailure) {
// Retry and succeed.
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));

expectSendMessage({"cluster2"}, "");
expectSendMessage({"cluster2"}, "", true);
timer_cb_();
EXPECT_TRUE(statsAre(3, 0, 0, 1, 0));
verifyControlPlaneStats(1);
Expand All @@ -49,7 +49,7 @@ TEST_F(GrpcSubscriptionImplTest, RemoteStreamClose) {

// Retry and succeed.
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage({"cluster0", "cluster1"}, "");
expectSendMessage({"cluster0", "cluster1"}, "", true);
timer_cb_();
EXPECT_TRUE(statsAre(2, 0, 0, 1, 0));
}
Expand Down
Loading