Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions source/common/config/delta_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ class DeltaSubscriptionImpl
}

// Enqueues and attempts to send a discovery request, (un)subscribing to resources missing from /
// added to the passed 'resources' argument, relative to resources_. Updates resources_ to
// 'resources'.
// added to the passed 'resources' argument, relative to resource_versions_.
void buildAndQueueDiscoveryRequest(const std::vector<std::string>& resources) {
ResourceNameDiff diff;
std::set_difference(resources.begin(), resources.end(), resource_names_.begin(),
Expand All @@ -63,12 +62,10 @@ class DeltaSubscriptionImpl
resources.end(), std::inserter(diff.removed_, diff.removed_.begin()));

for (const auto& added : diff.added_) {
resources_[added] = EmptyVersion;
resource_names_.insert(added);
insertOrUpdateResourceVersion(added, EmptyVersion);
}
for (const auto& removed : diff.removed_) {
resources_.erase(removed);
resource_names_.erase(removed);
eraseResourceVersion(removed);
}
queueDiscoveryRequest(diff);
}
Expand Down Expand Up @@ -118,13 +115,22 @@ class DeltaSubscriptionImpl
}
}

envoy::api::v2::DeltaDiscoveryRequest stateOfRequest() const { return request_; }
Comment thread
fredlas marked this conversation as resolved.
Outdated

// Config::SubscriptionCallbacks
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& version_info) {
callbacks_->onConfigUpdate(added_resources, removed_resources, version_info);
for (const auto& resource : added_resources) {
resources_[resource.name()] = resource.version();
insertOrUpdateResourceVersion(resource.name(), resource.version());
}
// If a resource is gone, there is no longer a meaningful version for it that makes sense to
// provide to the server upon stream reconnect: either it will continue to not exist, in which
// case saying nothing is fine, or the server will bring back something new, which we should
// receive regardless (which is the logic that not specifying a version will get you).
for (const auto& resource_name : removed_resources) {
eraseResourceVersion(resource_name);
}
stats_.update_success_.inc();
stats_.update_attempt_.inc();
Expand Down Expand Up @@ -161,7 +167,7 @@ class DeltaSubscriptionImpl
clearRequestQueue();

request_.Clear();
for (auto const& resource : resources_) {
for (auto const& resource : resource_versions_) {
(*request_.mutable_initial_resource_versions())[resource.first] = resource.second;
}
request_.set_type_url(type_url_);
Expand Down Expand Up @@ -210,10 +216,22 @@ class DeltaSubscriptionImpl
init_fetch_timeout_timer_.reset();
}
}

// Use these helpers to avoid forgetting to update both at once.
void insertOrUpdateResourceVersion(const std::string& key, const std::string& val) {
Comment thread
fredlas marked this conversation as resolved.
Outdated
resource_versions_[key] = val;
resource_names_.insert(key);
}

void eraseResourceVersion(const std::string& key) {
Comment thread
fredlas marked this conversation as resolved.
Outdated
resource_versions_.erase(key);
resource_names_.erase(key);
}

// A map from resource name to per-resource version.
std::unordered_map<std::string, std::string> resources_;
// The keys of resources_. Only tracked separately because std::map does not provide an iterator
// into just its keys, e.g. for use in std::set_difference.
std::unordered_map<std::string, std::string> resource_versions_;
// The keys of resource_versions_. Only tracked separately because std::map does not provide an
// iterator into just its keys, e.g. for use in std::set_difference.
std::unordered_set<std::string> resource_names_;
const std::string type_url_;
SubscriptionCallbacks<ResourceType>* callbacks_{};
Expand Down
16 changes: 16 additions & 0 deletions test/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,22 @@ load(

envoy_package()

envoy_cc_test(
name = "delta_subscription_impl_test",
srcs = ["delta_subscription_impl_test.cc"],
deps = [
"//source/common/config:delta_subscription_lib",
"//source/common/stats:isolated_store_lib",
"//test/mocks:common_lib",
"//test/mocks/config:config_mocks",
"//test/mocks/event:event_mocks",
"//test/mocks/grpc:grpc_mocks",
"//test/mocks/local_info:local_info_mocks",
"//test/mocks/runtime:runtime_mocks",
"//test/test_common:logging_lib",
],
)

envoy_cc_test(
name = "filesystem_subscription_impl_test",
srcs = ["filesystem_subscription_impl_test.cc"],
Expand Down
81 changes: 81 additions & 0 deletions test/common/config/delta_subscription_impl_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#include <memory>

#include "common/config/delta_subscription_impl.h"
#include "common/config/utility.h"
#include "common/stats/isolated_store_impl.h"

#include "test/mocks/common.h"
#include "test/mocks/config/mocks.h"
#include "test/mocks/event/mocks.h"
#include "test/mocks/grpc/mocks.h"
#include "test/mocks/local_info/mocks.h"
#include "test/mocks/runtime/mocks.h"
#include "test/test_common/logging.h"
#include "test/test_common/test_time.h"
#include "test/test_common/utility.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"

using testing::NiceMock;

namespace Envoy {
namespace Config {
namespace {

TEST(DeltaSubscriptionImplTest, ResourceGoneLeadsToBlankInitialVersion) {
Comment thread
fredlas marked this conversation as resolved.
Outdated
NiceMock<Event::MockDispatcher> dispatcher;
NiceMock<Runtime::MockRandomGenerator> random;
Envoy::Config::RateLimitSettings rate_limit_settings;
Stats::IsolatedStoreImpl stats_store;
SubscriptionStats stats = Utility::generateStats(stats_store);
NiceMock<Config::MockSubscriptionCallbacks<envoy::api::v2::ClusterLoadAssignment>> callbacks;
envoy::api::v2::core::Node node;
node.set_id("fo0");
NiceMock<LocalInfo::MockLocalInfo> local_info;
EXPECT_CALL(local_info, node()).WillRepeatedly(testing::ReturnRef(node));

DeltaSubscriptionImpl<envoy::api::v2::ClusterLoadAssignment> subscription(
local_info, std::make_unique<NiceMock<Grpc::MockAsyncClient>>(), dispatcher,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.api.v2.EndpointDiscoveryService.StreamEndpoints"),
random, stats_store, rate_limit_settings, stats, std::chrono::milliseconds(123));

subscription.start({"name1", "name2"}, callbacks);

Protobuf::RepeatedPtrField<envoy::api::v2::Resource> added;
auto* resource = added.Add();
resource->set_name("name1");
resource->set_version("version1A");
resource = added.Add();
resource->set_name("name2");
resource->set_version("version2A");
subscription.onConfigUpdate(added, {}, "debugversion1");
subscription.handleStreamEstablished();
envoy::api::v2::DeltaDiscoveryRequest cur_request = subscription.stateOfRequest();
EXPECT_EQ("version1A", cur_request.initial_resource_versions().at("name1"));
EXPECT_EQ("version2A", cur_request.initial_resource_versions().at("name2"));

Protobuf::RepeatedPtrField<std::string> removed1;
*removed1.Add() = "name1";
subscription.onConfigUpdate({}, removed1, "debugversion2");
subscription.handleStreamEstablished();
cur_request = subscription.stateOfRequest();
EXPECT_EQ(cur_request.initial_resource_versions().end(),
cur_request.initial_resource_versions().find("name1"));
EXPECT_EQ("version2A", cur_request.initial_resource_versions().at("name2"));

Protobuf::RepeatedPtrField<std::string> removed2;
*removed2.Add() = "name2";
subscription.onConfigUpdate({}, removed2, "debugversion3");
subscription.handleStreamEstablished();
cur_request = subscription.stateOfRequest();
EXPECT_EQ(cur_request.initial_resource_versions().end(),
cur_request.initial_resource_versions().find("name1"));
EXPECT_EQ(cur_request.initial_resource_versions().end(),
cur_request.initial_resource_versions().find("name2"));
}

} // namespace
} // namespace Config
} // namespace Envoy