Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions source/common/config/xds_mux/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_package",
)

licenses(["notice"]) # Apache 2

envoy_package()

envoy_cc_library(
name = "delta_subscription_state_lib",
srcs = ["delta_subscription_state.cc"],
hdrs = ["delta_subscription_state.h"],
deps = [
":subscription_state_lib",
"//source/common/config:api_version_lib",
"//source/common/config:utility_lib",
"//source/common/grpc:common_lib",
"//source/common/protobuf",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "sotw_subscription_state_lib",
srcs = ["sotw_subscription_state.cc"],
hdrs = ["sotw_subscription_state.h"],
deps = [
":subscription_state_lib",
"//source/common/config:api_version_lib",
"//source/common/config:decoded_resource_lib",
"//source/common/config:utility_lib",
"//source/common/grpc:common_lib",
"//source/common/protobuf",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "subscription_state_lib",
hdrs = ["subscription_state.h"],
deps = [
"//envoy/config:subscription_interface",
"//envoy/event:dispatcher_interface",
"//envoy/local_info:local_info_interface",
"//source/common/common:minimal_logger_lib",
"//source/common/config:ttl_lib",
"//source/common/config:update_ack_lib",
"//source/common/config:utility_lib",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)
196 changes: 196 additions & 0 deletions source/common/config/xds_mux/delta_subscription_state.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#include "source/common/config/xds_mux/delta_subscription_state.h"

#include "envoy/event/dispatcher.h"
#include "envoy/service/discovery/v3/discovery.pb.h"

#include "source/common/common/hash.h"
#include "source/common/config/utility.h"
#include "source/common/runtime/runtime_features.h"

namespace Envoy {
namespace Config {
namespace XdsMux {

DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
UntypedConfigUpdateCallbacks& watch_map,
Event::Dispatcher& dispatcher, const bool wildcard)
: BaseSubscriptionState(std::move(type_url), watch_map, dispatcher),
// TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on
// empty resources as updates.
supports_heartbeats_(type_url_ != "envoy.config.route.v3.VirtualHost"), wildcard_(wildcard) {}

DeltaSubscriptionState::~DeltaSubscriptionState() = default;

void DeltaSubscriptionState::updateSubscriptionInterest(
const absl::flat_hash_set<std::string>& cur_added,
const absl::flat_hash_set<std::string>& cur_removed) {
for (const auto& a : cur_added) {
resource_state_[a] = ResourceState::waitingForServer();
// If interest in a resource is removed-then-added (all before a discovery request
// can be sent), we must treat it as a "new" addition: our user may have forgotten its
// copy of the resource after instructing us to remove it, and need to be reminded of it.
names_removed_.erase(a);
names_added_.insert(a);
}
for (const auto& r : cur_removed) {
resource_state_.erase(r);
// Ideally, when interest in a resource is added-then-removed in between requests,
// we would avoid putting a superfluous "unsubscribe [resource that was never subscribed]"
// in the request. However, the removed-then-added case *does* need to go in the request,
// and due to how we accomplish that, it's difficult to distinguish remove-add-remove from
// add-remove (because "remove-add" has to be treated as equivalent to just "add").
names_added_.erase(r);
names_removed_.insert(r);
}
}

// Not having sent any requests yet counts as an "update pending" since you're supposed to resend
// the entirety of your interest at the start of a stream, even if nothing has changed.
bool DeltaSubscriptionState::subscriptionUpdatePending() const {
return !names_added_.empty() || !names_removed_.empty() ||
!any_request_sent_yet_in_current_stream_ || dynamicContextChanged();
}

bool DeltaSubscriptionState::isHeartbeatResource(
const envoy::service::discovery::v3::Resource& resource) const {
if (!supports_heartbeats_ &&
!Runtime::runtimeFeatureEnabled("envoy.reloadable_features.vhds_heartbeats")) {
return false;
}
const auto itr = resource_state_.find(resource.name());
if (itr == resource_state_.end()) {
return false;
}

return !resource.has_resource() && !itr->second.isWaitingForServer() &&
resource.version() == itr->second.version();
}

void DeltaSubscriptionState::handleGoodResponse(
const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
absl::flat_hash_set<std::string> names_added_removed;
Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> non_heartbeat_resources;
for (const auto& resource : message.resources()) {
if (!names_added_removed.insert(resource.name()).second) {
throw EnvoyException(
fmt::format("duplicate name {} found among added/updated resources", resource.name()));
}
if (isHeartbeatResource(resource)) {
continue;
}
// TODO (dmitri-d) consider changing onConfigUpdate callback interface to avoid copying of
// resources
non_heartbeat_resources.Add()->CopyFrom(resource);
// DeltaDiscoveryResponses for unresolved aliases don't contain an actual resource
if (!resource.has_resource() && resource.aliases_size() > 0) {
continue;
}
if (message.type_url() != resource.resource().type_url()) {
throw EnvoyException(fmt::format("type URL {} embedded in an individual Any does not match "
"the message-wide type URL {} in DeltaDiscoveryResponse {}",
resource.resource().type_url(), message.type_url(),
message.DebugString()));
}
}
for (const auto& name : message.removed_resources()) {
if (!names_added_removed.insert(name).second) {
throw EnvoyException(
fmt::format("duplicate name {} found in the union of added+removed resources", name));
}
}

{
const auto scoped_update = ttl_.scopedTtlUpdate();
for (const auto& resource : message.resources()) {
addResourceState(resource);
}
}

callbacks().onConfigUpdate(non_heartbeat_resources, message.removed_resources(),
message.system_version_info());

// If a resource is gone, there is no longer a meaningful version for it that makes sense to
// provide to the server upon stream reconnect: either it will continue to not exist, in which
// case saying nothing is fine, or the server will bring back something new, which we should
// receive regardless (which is the logic that not specifying a version will get you).
//
// So, leave the version map entry present but blank. It will be left out of
// initial_resource_versions messages, but will remind us to explicitly tell the server "I'm
// cancelling my subscription" when we lose interest.
for (const auto& resource_name : message.removed_resources()) {
if (resource_state_.find(resource_name) != resource_state_.end()) {
resource_state_[resource_name] = ResourceState::waitingForServer();
}
}
ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", typeUrl(),
message.resources().size(), message.removed_resources().size());
}

std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryRequest>
DeltaSubscriptionState::getNextRequestInternal() {
auto request = std::make_unique<envoy::service::discovery::v3::DeltaDiscoveryRequest>();
request->set_type_url(typeUrl());
if (!any_request_sent_yet_in_current_stream_) {
any_request_sent_yet_in_current_stream_ = true;
// initial_resource_versions "must be populated for first request in a stream".
// Also, since this might be a new server, we must explicitly state *all* of our subscription
// interest.
for (auto const& [resource_name, resource_state] : resource_state_) {
// Populate initial_resource_versions with the resource versions we currently have.
// Resources we are interested in, but are still waiting to get any version of from the
// server, do not belong in initial_resource_versions. (But do belong in new subscriptions!)
if (!resource_state.isWaitingForServer()) {
(*request->mutable_initial_resource_versions())[resource_name] = resource_state.version();
}
// As mentioned above, fill resource_names_subscribe with everything, including names we
// have yet to receive any resource for unless this is a wildcard subscription, for which
// the first request on a stream must be without any resource names.
if (!wildcard_) {
names_added_.insert(resource_name);
}
}
// Wildcard subscription initial requests must have no resource_names_subscribe.
if (wildcard_) {
names_added_.clear();
}
names_removed_.clear();
}

std::copy(names_added_.begin(), names_added_.end(),
Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names_subscribe()));
std::copy(names_removed_.begin(), names_removed_.end(),
Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names_unsubscribe()));
names_added_.clear();
names_removed_.clear();

return request;
}

void DeltaSubscriptionState::addResourceState(
const envoy::service::discovery::v3::Resource& resource) {
setResourceTtl(resource);
resource_state_[resource.name()] = ResourceState(resource.version());
}

void DeltaSubscriptionState::setResourceTtl(
const envoy::service::discovery::v3::Resource& resource) {
if (resource.has_ttl()) {
ttl_.add(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(resource.ttl())),
resource.name());
} else {
ttl_.clear(resource.name());
}
}

void DeltaSubscriptionState::ttlExpiryCallback(const std::vector<std::string>& expired) {
Protobuf::RepeatedPtrField<std::string> removed_resources;
for (const auto& resource : expired) {
resource_state_[resource] = ResourceState::waitingForServer();
removed_resources.Add(std::string(resource));
}
callbacks().onConfigUpdate({}, removed_resources, "");
}

} // namespace XdsMux
} // namespace Config
} // namespace Envoy
98 changes: 98 additions & 0 deletions source/common/config/xds_mux/delta_subscription_state.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#pragma once

#include "envoy/grpc/status.h"

#include "source/common/common/assert.h"
#include "source/common/common/logger.h"
#include "source/common/config/api_version.h"
#include "source/common/config/xds_mux/subscription_state.h"

#include "absl/container/node_hash_map.h"
#include "absl/types/optional.h"

namespace Envoy {
namespace Config {
namespace XdsMux {

// Tracks the state of a delta xDS-over-gRPC protocol session.
class DeltaSubscriptionState
: public BaseSubscriptionState<envoy::service::discovery::v3::DeltaDiscoveryResponse,
envoy::service::discovery::v3::DeltaDiscoveryRequest> {
public:
DeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map,
Event::Dispatcher& dispatcher, const bool wildcard);

~DeltaSubscriptionState() override;

// Update which resources we're interested in subscribing to.
void updateSubscriptionInterest(const absl::flat_hash_set<std::string>& cur_added,
const absl::flat_hash_set<std::string>& cur_removed) override;

// Whether there was a change in our subscription interest we have yet to inform the server of.
bool subscriptionUpdatePending() const override;

void markStreamFresh() override { any_request_sent_yet_in_current_stream_ = false; }

void ttlExpiryCallback(const std::vector<std::string>& expired) override;

DeltaSubscriptionState(const DeltaSubscriptionState&) = delete;
DeltaSubscriptionState& operator=(const DeltaSubscriptionState&) = delete;

private:
std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryRequest>
getNextRequestInternal() override;

void setResourceTtl(const envoy::service::discovery::v3::Resource& resource);
bool isHeartbeatResource(const envoy::service::discovery::v3::Resource& resource) const;
void
handleGoodResponse(const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) override;
void addResourceState(const envoy::service::discovery::v3::Resource& resource);

class ResourceState {
public:
explicit ResourceState(absl::string_view version) : version_(version) {}
// Builds a ResourceVersion in the waitingForServer state.
ResourceState() = default;
// Self-documenting alias of default constructor.
static ResourceState waitingForServer() { return ResourceState(); }

// If true, we currently have no version of this resource - we are waiting for the server to
// provide us with one.
bool isWaitingForServer() const { return version_ == absl::nullopt; }

// Must not be called if waitingForServer() == true.
std::string version() const {
ASSERT(version_.has_value());
return version_.value_or("");
}

private:
absl::optional<std::string> version_;
};

// Not all xDS resources support heartbeats due to there being specific information encoded in
// an empty response, which is indistinguishable from a heartbeat in some cases. For now we just
// disable heartbeats for these resources (currently only VHDS).
const bool supports_heartbeats_;

// Is the subscription is for a wildcard request.
const bool wildcard_;

// A map from resource name to per-resource version. The keys of this map are exactly the resource
// names we are currently interested in. Those in the waitingForServer state currently don't have
// any version for that resource: we need to inform the server if we lose interest in them, but we
// also need to *not* include them in the initial_resource_versions map upon a reconnect.
absl::node_hash_map<std::string, ResourceState> resource_state_;

bool any_request_sent_yet_in_current_stream_{};

// Tracks changes in our subscription interest since the previous DeltaDiscoveryRequest we sent.
// TODO: Can't use absl::flat_hash_set due to ordering issues in gTest expectation matching.
// Feel free to change to an unordered container once we figure out how to make it work.
std::set<std::string> names_added_;
std::set<std::string> names_removed_;
};

} // namespace XdsMux
} // namespace Config
} // namespace Envoy
Loading