37 changes: 37 additions & 0 deletions docs/root/configuration/upstream/cluster_manager/cds.rst
@@ -18,3 +18,40 @@ Statistics
----------

CDS has a :ref:`statistics <subscription_statistics>` tree rooted at *cluster_manager.cds.*

On-demand CDS
-------------

Member

@markdroth FYI, in case you have some review bandwidth cycles to give this a pass similar to the VHDS one.

Similar to the on-demand VHDS feature for virtual hosts, the on-demand CDS API is an additional API
that Envoy calls to dynamically fetch the upstream clusters it is interested in.

By default in CDS, all cluster configurations are sent to every Envoy instance in the mesh. Delta
CDS lets the xDS management server send incremental CDS updates to an Envoy instance, but the Envoy
instance cannot tell the management server which upstream clusters it is interested in. In other
words, the Envoy instance can only receive CDS updates passively.

To address this, on-demand CDS uses the delta xDS protocol so that an Envoy instance can subscribe
to individual cluster configurations and request them as needed. Instead of receiving every cluster
configuration, including ones it is not interested in, an Envoy instance using on-demand CDS
subscribes to and unsubscribes from a list of cluster configurations that the xDS management server
tracks internally. The management server monitors that list and uses it to filter the configuration
sent to each Envoy instance so that it contains only the subscribed cluster configurations.

Member

Is this true? Let's say we have an Envoy and it has a CDS initially with {X, Y}. An extension notices that Z is missing and asks for Z. Is the management server now only returning {Z} or is it {X, Y, Z} (via delta protocol). I think what we're doing with on-demand is augmenting the base set of clusters, not going pure delta subscription.

Member Author

The management server will only return {Z}. Strictly speaking, this depends on the management server implementation, which could also return {X, Y, Z}, but in the scenario we had in mind the management server returns only {Z}.

Member

I think we need to describe two different things here:

  1. What the management server thinks the Envoy is subscribed to. Presumably {X, Y, Z}.
  2. What the management server puts on the wire as part of the response delta update, this is just {Z}, but it could validly be any superset.
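
To make the distinction between these two points concrete, here is a minimal C++ sketch of the bookkeeping a management server might keep per Envoy instance. `ClientSubscription` and its `subscribe` method are hypothetical names used only for illustration; they are not part of Envoy or of any real management server. The sketch assumes the server answers a subscribe request with only the newly added names, which is one valid choice (any superset would also be valid).

```cpp
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical per-Envoy state kept by a management server (illustration only).
class ClientSubscription {
public:
  explicit ClientSubscription(std::set<std::string> initial)
      : subscribed_(std::move(initial)) {}

  // Records new interest and returns the names to put on the wire in the next
  // delta response: here, only the names that were not subscribed before.
  std::vector<std::string> subscribe(const std::set<std::string>& names) {
    std::vector<std::string> on_wire;
    for (const auto& name : names) {
      if (subscribed_.insert(name).second) {
        on_wire.push_back(name);
      }
    }
    return on_wire;
  }

  // What the server believes this Envoy instance is subscribed to.
  const std::set<std::string>& subscribed() const { return subscribed_; }

private:
  std::set<std::string> subscribed_;
};
```

Starting from {X, Y} and subscribing to {Z}, `subscribed()` becomes {X, Y, Z}, while the payload returned by `subscribe` holds only Z.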


Subscribing to resources
^^^^^^^^^^^^^^^^^^^^^^^^

On-demand CDS allows resources to be :ref:`subscribed <xds_protocol_delta_subscribe>` to using
a :ref:`DeltaDiscoveryRequest <envoy_api_msg_DeltaDiscoveryRequest>`
with the :ref:`type_url <envoy_api_field_DeltaDiscoveryRequest.type_url>` set to
`type.googleapis.com/envoy.api.v2.Cluster` and
:ref:`resource_names_subscribe <envoy_api_field_DeltaDiscoveryRequest.resource_names_subscribe>`
set to a list of cluster resource names for which it would like configuration.
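
As a rough illustration of the request described above, the sketch below fills in a plain C++ struct that mirrors the two fields named in this section. `DeltaRequestSketch` and `makeSubscribeRequest` are hypothetical stand-ins, not the generated protobuf class; a real caller would populate the actual `DeltaDiscoveryRequest` message instead.

```cpp
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for the two DeltaDiscoveryRequest fields discussed above.
struct DeltaRequestSketch {
  std::string type_url;
  std::vector<std::string> resource_names_subscribe;
};

// Builds a subscribe request for the given cluster names.
DeltaRequestSketch makeSubscribeRequest(const std::set<std::string>& cluster_names) {
  DeltaRequestSketch request;
  request.type_url = "type.googleapis.com/envoy.api.v2.Cluster";
  request.resource_names_subscribe.assign(cluster_names.begin(), cluster_names.end());
  return request;
}
```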

Member

Does the cluster name have any special form? How does the extension (RocketMQ in your case) know what cluster to ask for? I.e. how is it translating something in the request to something it can ask for from the management server?

Member Author

No special form here. resource_names_subscribe tells the management server which cluster names the Envoy instance needs, and type_url tells it which type of resource the Envoy instance needs. The management server then translates those clusters into resources in a DeltaDiscoveryResponse that Envoy can accept.

Member

I'm still trying to understand how we go from some RocketMQ concept to a cluster resource name. Can you elaborate on this?

Member Author

@aaron-ai, Jan 21, 2020

OK, there are a few concepts in RocketMQ: topic, broker, and nameserver. The nameserver provides service discovery, so that brokers can be found by topic. A RocketMQ client chooses a topic to send messages to or consume messages from. A topic is just a name, an abstract concept; in practice it is a set of brokers.

You can find more details in the architecture description on the official website, which is more precise.

In the standard procedure, a RocketMQ client fetches the brokers' addresses from the nameserver, which sends the route information to the client. The client then sends messages to, or consumes messages from, the broker directly.

In a service mesh, one topic's route information is abstracted into one cluster in xDS, and the topic name is the cluster name, i.e. the resource name in delta xDS. Sometimes the Envoy instance does not have the cluster (it cannot be found in clusterManager) but the RocketMQ client needs it. In that case the addToClusterInterest method is triggered to fetch the route information from the management server, and ClusterUpdateCallbacks delivers the resulting events for processing.

Member

@htuch, Jan 21, 2020

So, I think the interesting question here is how do we map topic to cluster name? Is this always going to be 1:1? Is the RocketMQ filter config going to allow for some kind of transformation or mapping to happen?

It seems unlikely that in all deployments we would want a 1:1 mapping from topic to cluster, this seems very restrictive.

Member Author

@htuch Yes, it's 1:1 now, I think 1:1 is the most suited way for our architecture.

Member

I'm wondering when other folks pick up RocketMQ whether they are going to feel the same way. Many Envoy operators have stylized ways of naming clusters that reflect things like their provenance, tenant, workload, version, etc. Would something like cluster-foo-v145-acme-tau-beta-lambda be a reasonable abstract topic to route to? Or are topics more like the-weather-today-in-france and you are expecting the clusters to match this logical naming?

@htuch As a developer of Apache RocketMQ, I would say a 1:1 mapping from RocketMQ topic to Envoy cluster is the most natural way. One topic is normally served by multiple broker servers to maintain availability, and each broker serves multiple topics. The topic name itself can be any valid name. Production deployments usually follow some naming scheme to implement multi-tenancy, etc. From the perspective of application developers and operators, it is OK to treat a topic as a cluster of virtual shared servers.

"Would something like cluster-foo-v145-acme-tau-beta-lambda be a reasonable abstract topic to route to? Or are topics more like the-weather-today-in-france and you are expecting the clusters to match this logical naming?"

The name of the topic is up to the application developers/operators. In most cases, topic names follow a TenantId-productName-moduleX-featureY paradigm. Application developers may version their topics.


Typical use case
^^^^^^^^^^^^^^^^

Some deployments run a large number of broker instances for
`Apache RocketMQ <http://rocketmq.apache.org/>`_ to produce and consume messages. In that case the
state-of-the-world (SotW) CDS configuration may exceed 1 GB, so it is not practical for the
management server to deliver the full SotW CDS on every update because of the overhead involved.
This is where on-demand CDS becomes essential.
9 changes: 9 additions & 0 deletions include/envoy/config/grpc_mux.h
@@ -125,6 +125,15 @@ class GrpcMux {
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) PURE;

/**
* Adds additional resources to the watch. Unlike addOrUpdateWatch, it never removes resources.
*/
virtual Watch* addToWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) PURE;

virtual void removeWatch(const std::string& type_url, Watch* watch) PURE;

/**
6 changes: 6 additions & 0 deletions include/envoy/config/subscription.h
@@ -90,6 +90,12 @@ class Subscription {
* be passed to std::set_difference, which must be given sorted collections.
*/
virtual void updateResourceInterest(const std::set<std::string>& update_to_these_names) PURE;

/**
* Adds the given resources to the set of resources to fetch.
* @param resources set of resource names to add.
*/
virtual void addToResourceInterest(const std::set<std::string>&){};
};

using SubscriptionPtr = std::unique_ptr<Subscription>;
7 changes: 7 additions & 0 deletions include/envoy/upstream/cluster_manager.h
@@ -227,6 +227,8 @@ class ClusterManager {
* @return Config::SubscriptionFactory& the subscription factory.
*/
virtual Config::SubscriptionFactory& subscriptionFactory() PURE;

virtual void addToClusterInterest(const std::set<std::string>& add_these_names) PURE;

where is this used?

Member Author

@aaron-ai, Jan 15, 2020

Well, updateClusterInterest here only provides the ability to update the entire cluster interest; maybe I should remove this method. As for addToClusterInterest, the code in rocketmq_proxy (#9503) calls it in practice.

code in RouterImpl#sendRequestToUpstream

  if (!cluster) {
    active_message_->awaitCluster(true);
    active_message_->incPendingStats();
    if (active_message_->connectionManager().hasInFlightCdsRequest(cluster_name)) {
      ENVOY_LOG(trace, "Cluster {} is absent, but there has been an in-flight CDS request",
                cluster_name);
      return;
    }

    std::set<std::string> resources = {cluster_name};
    cluster_manager_.addToClusterInterest(resources);
    active_message_->connectionManager().markInFlightCdsRequest(cluster_name);
    ENVOY_LOG(info, "Cluster {} is not available, trigger to set up through delta CDS. Opaque: {}",
              cluster_name, opaque);
    return;
  }

code in ActiveMessage#onQueryTopicRoute

    await_cluster_ = true;
    incPendingStats();
    std::set<std::string> resources{cluster_name};
    ENVOY_LOG(trace, "Initiate on-demand cluster discovery service for {}", cluster_name);
    clusterManager.addToClusterInterest(resources);
    connection_manager_.markInFlightCdsRequest(cluster_name);
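
Both snippets above follow the same pattern: check whether a request for the cluster is already outstanding, and only issue a new one if it is not. A minimal, self-contained sketch of that pattern follows. `InFlightCdsTracker`, `requestOnce`, and `complete` are hypothetical names for illustration only and do not exist in Envoy or in the rocketmq_proxy code; the callback stands in for a call such as `addToClusterInterest`.

```cpp
#include <functional>
#include <set>
#include <string>

// Hypothetical helper (not Envoy code): remembers which cluster names already
// have an on-demand request outstanding so that each is requested only once.
class InFlightCdsTracker {
public:
  // Invokes request_fn only if no request for cluster_name is outstanding.
  // Returns true if a new request was issued.
  bool requestOnce(const std::string& cluster_name,
                   const std::function<void(const std::set<std::string>&)>& request_fn) {
    if (!in_flight_.insert(cluster_name).second) {
      return false; // Already waiting for this cluster.
    }
    request_fn({cluster_name});
    return true;
  }

  // Call when the cluster arrives, or when the request is abandoned.
  void complete(const std::string& cluster_name) { in_flight_.erase(cluster_name); }

private:
  std::set<std::string> in_flight_;
};
```

Keeping the check and the mark in one call avoids the case where a caller issues the request but forgets to record it as in flight.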

Member Author

@wgallagher the on-demand part in #9503 has been removed temporarily, but you can still view the related part in this commit : aaron-ai@40ed836

};

using ClusterManagerPtr = std::unique_ptr<ClusterManager>;
@@ -253,6 +255,11 @@ class CdsApi {
* @return std::string last accepted version from fetch.
*/
virtual const std::string versionInfo() const PURE;

/**
* Adds the given cluster names to the set of cluster resources this CDS subscription watches.
*/
virtual void addToClusterInterest(const std::set<std::string>& add_these_names) PURE;
};

using CdsApiPtr = std::unique_ptr<CdsApi>;
5 changes: 5 additions & 0 deletions source/common/config/delta_subscription_impl.cc
@@ -41,6 +41,11 @@ void DeltaSubscriptionImpl::updateResourceInterest(
stats_.update_attempt_.inc();
}

void DeltaSubscriptionImpl::addToResourceInterest(const std::set<std::string>& add_these_names) {
watch_ = context_->addToWatch(type_url_, watch_, add_these_names, *this, init_fetch_timeout_);
stats_.update_attempt_.inc();
}

// Config::SubscriptionCallbacks
void DeltaSubscriptionImpl::onConfigUpdate(
const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
1 change: 1 addition & 0 deletions source/common/config/delta_subscription_impl.h
@@ -45,6 +45,7 @@ class DeltaSubscriptionImpl : public Subscription, public SubscriptionCallbacks
void start(const std::set<std::string>& resource_names) override;

void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;
void addToResourceInterest(const std::set<std::string>& add_these_names) override;

// Config::SubscriptionCallbacks (all pass through to callbacks_!)
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
9 changes: 9 additions & 0 deletions source/common/config/grpc_mux_impl.h
@@ -50,6 +50,11 @@ class GrpcMuxImpl : public GrpcMux,
SubscriptionCallbacks&, std::chrono::milliseconds) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

Watch* addToWatch(const std::string&, Watch*, const std::set<std::string>&,
SubscriptionCallbacks&, std::chrono::milliseconds) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
void removeWatch(const std::string&, Watch*) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }

void handleDiscoveryResponse(
@@ -156,6 +161,10 @@ class NullGrpcMuxImpl : public GrpcMux,
SubscriptionCallbacks&, std::chrono::milliseconds) override {
throw EnvoyException("ADS must be configured to support an ADS config source");
}
Watch* addToWatch(const std::string&, Watch*, const std::set<std::string>&,
SubscriptionCallbacks&, std::chrono::milliseconds) override {
throw EnvoyException("ADS must be configured to support an ADS config source");
}
void removeWatch(const std::string&, Watch*) override {
throw EnvoyException("ADS must be configured to support an ADS config source");
}
43 changes: 33 additions & 10 deletions source/common/config/new_grpc_mux_impl.cc
@@ -30,11 +30,24 @@ Watch* NewGrpcMuxImpl::addOrUpdateWatch(const std::string& type_url, Watch* watc
SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) {
if (watch == nullptr) {
return addWatch(type_url, resources, callbacks, init_fetch_timeout);
} else {
updateWatch(type_url, watch, resources);
return watch;
watch = addWatch(type_url, callbacks, init_fetch_timeout);
}
// updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
updateWatch(type_url, watch, resources);
return watch;
}

Watch* NewGrpcMuxImpl::addToWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) {
if (watch == nullptr) {
watch = addWatch(type_url, callbacks, init_fetch_timeout);
}
// addToWatch() queues a discovery request for any of the *extra* 'resources' we are not yet
// subscribed to.
addToWatch(type_url, watch, resources);
return watch;
}

void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) {
@@ -124,22 +137,32 @@ GrpcMuxWatchPtr NewGrpcMuxImpl::subscribe(const std::string&, const std::set<std

void NewGrpcMuxImpl::start() { grpc_stream_.establishNewStream(); }

Watch* NewGrpcMuxImpl::addWatch(const std::string& type_url, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
Watch* NewGrpcMuxImpl::addWatch(const std::string& type_url, SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) {
auto entry = subscriptions_.find(type_url);
if (entry == subscriptions_.end()) {
// We don't yet have a subscription for type_url! Make one!
addSubscription(type_url, init_fetch_timeout);
return addWatch(type_url, resources, callbacks, init_fetch_timeout);
return addWatch(type_url, callbacks, init_fetch_timeout);
}

Watch* watch = entry->second->watch_map_.addWatch(callbacks);
// updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
updateWatch(type_url, watch, resources);
return watch;
}

void NewGrpcMuxImpl::addToWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources) {
ASSERT(watch != nullptr);
auto sub = subscriptions_.find(type_url);
RELEASE_ASSERT(sub != subscriptions_.end(),
fmt::format("Watch of {} has no subscription to update.", type_url));
auto added_removed = sub->second->watch_map_.addToWatchInterest(watch, resources);
sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_, added_removed.removed_);
// Tell the server about our change in interest, if any.
if (sub->second->sub_state_.subscriptionUpdatePending()) {
trySendDiscoveryRequests();
}
}

// Updates the list of resource names watched by the given watch. If an added name is new across
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
12 changes: 10 additions & 2 deletions source/common/config/new_grpc_mux_impl.h
@@ -37,6 +37,11 @@ class NewGrpcMuxImpl
Watch* addOrUpdateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources, SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) override;

Watch* addToWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources, SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) override;

void removeWatch(const std::string& type_url, Watch* watch) override;

// TODO(fredlas) PR #8478 will remove this.
@@ -81,8 +86,11 @@ class NewGrpcMuxImpl
}

private:
Watch* addWatch(const std::string& type_url, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout);
Watch* addWatch(const std::string& type_url, SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout);

void addToWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources);

// Updates the list of resource names watched by the given watch. If an added name is new across
// the whole subscription, or if a removed name has no other watch interested in it, then the
9 changes: 9 additions & 0 deletions source/common/config/watch_map.cc
@@ -42,6 +42,15 @@ AddedRemoved WatchMap::updateWatchInterest(Watch* watch,
findRemovals(newly_removed_from_watch, watch));
}

AddedRemoved WatchMap::addToWatchInterest(Watch* watch,
const std::set<std::string>& add_these_names) {
std::vector<std::string> newly_added_to_watch(add_these_names.begin(), add_these_names.end());
watch->resource_names_.insert(add_these_names.begin(), add_these_names.end());
std::set<std::string> removals;

return AddedRemoved(findAdditions(newly_added_to_watch, watch), std::move(removals));
}

absl::flat_hash_set<Watch*> WatchMap::watchesInterestedIn(const std::string& resource_name) {
absl::flat_hash_set<Watch*> ret = wildcard_watches_;
const auto watches_interested = watch_interest_.find(resource_name);
3 changes: 3 additions & 0 deletions source/common/config/watch_map.h
@@ -73,6 +73,9 @@ class WatchMap : public SubscriptionCallbacks, public Logger::Loggable<Logger::I
AddedRemoved updateWatchInterest(Watch* watch,
const std::set<std::string>& update_to_these_names);

// Adds the extra set of resource names that the given watch should watch.
AddedRemoved addToWatchInterest(Watch* watch, const std::set<std::string>& add_these_names);

// Expects that the watch to be removed has already had all of its resource names removed via
// updateWatchInterest().
void removeWatch(Watch* watch);
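
The difference between `updateWatchInterest` and the new `addToWatchInterest` comes down to replace semantics versus additive semantics on a set of resource names. The sketch below models that for a single watch only. It is a simplified, hypothetical illustration: `InterestChange`, `updateInterest`, and `addToInterest` are invented names, and the real `WatchMap` additionally tracks interest across multiple watches, which this sketch ignores.

```cpp
#include <algorithm>
#include <iterator>
#include <set>
#include <string>

// Simplified, hypothetical model of one watch's interest set (not Envoy's WatchMap).
struct InterestChange {
  std::set<std::string> added;
  std::set<std::string> removed;
};

// Replace semantics: the watch ends up interested in exactly new_names.
InterestChange updateInterest(std::set<std::string>& current,
                              const std::set<std::string>& new_names) {
  InterestChange change;
  std::set_difference(new_names.begin(), new_names.end(), current.begin(), current.end(),
                      std::inserter(change.added, change.added.begin()));
  std::set_difference(current.begin(), current.end(), new_names.begin(), new_names.end(),
                      std::inserter(change.removed, change.removed.begin()));
  current = new_names;
  return change;
}

// Additive semantics: names are only ever added, never removed.
InterestChange addToInterest(std::set<std::string>& current,
                             const std::set<std::string>& extra_names) {
  InterestChange change;
  for (const auto& name : extra_names) {
    if (current.insert(name).second) {
      change.added.insert(name);
    }
  }
  return change;
}
```

With a current interest of {X, Y}, `addToInterest` with {Y, Z} reports only Z as added and removes nothing, whereas `updateInterest` with {Z} reports Z as added and X and Y as removed.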
4 changes: 4 additions & 0 deletions source/common/upstream/cds_api_impl.h
@@ -35,6 +35,10 @@ class CdsApiImpl : public CdsApi,
}
const std::string versionInfo() const override { return system_version_info_; }

void addToClusterInterest(const std::set<std::string>& add_these_names) override {
subscription_->addToResourceInterest(add_these_names);
}

private:
// Config::SubscriptionCallbacks
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
4 changes: 4 additions & 0 deletions source/common/upstream/cluster_manager_impl.h
@@ -243,6 +243,10 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u

Config::SubscriptionFactory& subscriptionFactory() override { return subscription_factory_; }

void addToClusterInterest(const std::set<std::string>& add_these_names) override {
cds_api_->addToClusterInterest(add_these_names);
}

protected:
virtual void postThreadLocalDrainConnections(const Cluster& cluster,
const HostVector& hosts_removed);
6 changes: 6 additions & 0 deletions test/common/config/delta_subscription_impl_test.cc
@@ -56,6 +56,12 @@ TEST_F(DeltaSubscriptionImplTest, PauseHoldsRequest) {
subscription_->resume();
}

TEST_F(DeltaSubscriptionImplTest, AddResourceCauseRequest) {
startSubscription({});
expectSendMessage({"name1", "name2"}, {}, Grpc::Status::WellKnownGrpcStatus::Ok, "", {});
subscription_->addToResourceInterest({"name1", "name2"});
}

TEST_F(DeltaSubscriptionImplTest, ResponseCausesAck) {
startSubscription({"name1"});
deliverConfigUpdate({"name1"}, "someversion", true);
3 changes: 3 additions & 0 deletions test/mocks/config/mocks.h
@@ -99,6 +99,9 @@ class MockGrpcMux : public GrpcMux {
MOCK_METHOD(Watch*, addOrUpdateWatch,
(const std::string& type_url, Watch* watch, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout));
MOCK_METHOD(Watch*, addToWatch,
(const std::string& type_url, Watch* watch, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout));
MOCK_METHOD(void, removeWatch, (const std::string& type_url, Watch* watch));
};

2 changes: 2 additions & 0 deletions test/mocks/upstream/mocks.h
@@ -330,6 +330,7 @@ class MockClusterManager : public ClusterManager {
MOCK_METHOD(ClusterUpdateCallbacksHandle*, addThreadLocalClusterUpdateCallbacks_,
(ClusterUpdateCallbacks & callbacks));
MOCK_METHOD(Config::SubscriptionFactory&, subscriptionFactory, ());
MOCK_METHOD(void, addToClusterInterest, (const std::set<std::string>&));

NiceMock<Http::ConnectionPool::MockInstance> conn_pool_;
NiceMock<Http::MockAsyncClient> async_client_;
@@ -385,6 +386,7 @@ class MockCdsApi : public CdsApi {
MOCK_METHOD(void, initialize, ());
MOCK_METHOD(void, setInitializedCb, (std::function<void()> callback));
MOCK_METHOD(const std::string, versionInfo, (), (const));
MOCK_METHOD(void, addToClusterInterest, (const std::set<std::string>&));

std::function<void()> initialized_callback_;
};