Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions DEPRECATED.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ The following features have been DEPRECATED and will be removed in the specified
* DOWNSTREAM_ADDRESS log formatter is deprecated. Use DOWNSTREAM_REMOTE_ADDRESS_WITHOUT_PORT
instead.
* CLIENT_IP header formatter is deprecated. Use DOWNSTREAM_REMOTE_ADDRESS_WITHOUT_PORT instead.
* Rate limit service configuration via the `cluster_name` field is deprecated. Use `grpc_service`
instead.
* gRPC service configuration via the `cluster_names` field in `ApiConfigSource` is deprecated. Use
`grpc_services` instead.


## Version 1.5.0

Expand Down
1 change: 1 addition & 0 deletions bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def _envoy_api_deps():
"cds",
"discovery",
"eds",
"grpc_service",
"health_check",
"lds",
"metrics",
Expand Down
10 changes: 10 additions & 0 deletions include/envoy/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "async_client_manager_interface",
hdrs = ["async_client_manager.h"],
external_deps = ["envoy_grpc_service"],
deps = [
":async_client_interface",
"//include/envoy/stats:stats_interface",
],
)

envoy_cc_library(
name = "status",
hdrs = ["status.h"],
Expand Down
47 changes: 47 additions & 0 deletions include/envoy/grpc/async_client_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once

#include "envoy/grpc/async_client.h"
#include "envoy/stats/stats.h"

#include "api/grpc_service.pb.h"

namespace Envoy {
namespace Grpc {

// Per-service factory for Grpc::AsyncClients. This factory is thread aware and will instantiate
// with thread local state. Clients will use ThreadLocal::Instance::dispatcher() for event handling.
class AsyncClientFactory {
public:
virtual ~AsyncClientFactory() {}

/**
* Create a gRPC::AsyncClient.
* @return AsyncClientPtr async client.
*/
virtual AsyncClientPtr create() PURE;
};

typedef std::unique_ptr<AsyncClientFactory> AsyncClientFactoryPtr;

// Singleton gRPC client manager. Grpc::AsyncClientManager can be used to create per-service
// Grpc::AsyncClientFactory instances.
class AsyncClientManager {
public:
virtual ~AsyncClientManager() {}

/**
* Create a Grpc::AsyncClients factory for a service. Validation of the service is performed and
* will raise an exception on failure.
* @param grpc_service envoy::api::v2::GrpcService configuration.
* @param scope stats scope.
* @return AsyncClientFactoryPtr factory for grpc_service.
* @throws EnvoyException when grpc_service validation fails.
*/
virtual AsyncClientFactoryPtr
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really suggesting we do this in this PR, but I think there is an opportunity here that would further massively cleanup the amount of code that is needed at many of the call sites. Basically, like HTTP async client, if a gRPC client was internally TLS aware, I think many callers would not need any TLS at all. They could just acquire a handle, and then create a TLS local request. I'm pretty sure that if it were implemented this way, TLS in rate limit, access log, and metrics (and soon tracing) would not be needed.

At the very least, I might add some TODOs in this area, or if there any interface changes that would make this easier later we might want to consider it. WDYT?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, as documented, when you create() on the factory, the client you get back (like with CM HTTP async client) is the one for your thread. It may be a cheap operation, if all we are returning is a thin handle (not the current implementation, but maybe in the future).

For the Envoy gRPC client today, Grpc::AsyncClientImpl uses the HTTP async client from CM, so it already is somewhat TLS aware. In the Google gRPC world, we might have things like TLS completion queues that are managed by the the TLS for the Google gRPC client.

The question is then, at these various sites that use the gRPC async client, do they have any specific TLS state that is external to the client. Looking at access logs gRPC, there is the stream_map_; would this remain TLS or become some state that is shared across threads? There's a tradeoff here; introducing more complexity (or contention) on shared structures vs. more streams (one per silo) for the same resource.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is then, at these various sites that use the gRPC async client, do they have any specific TLS state that is external to the client. Looking at access logs gRPC, there is the stream_map_; would this remain TLS or become some state that is shared across threads? There's a tradeoff here; introducing more complexity (or contention) on shared structures vs. more streams (one per silo) for the same resource.

Yeah I'm not sure. The main pattern that I think would be possible if the client itself was TLS aware is you could grab a client and do error checking, and then safely use that client across all threads to start streams. In this case, the client itself is TLS aware, but it still effectively needs to be stored in a local TLS slot for use.

It's not a big deal, this is fine as is. We can look at further cleanups later.

factoryForGrpcService(const envoy::api::v2::GrpcService& grpc_service, Stats::Scope& scope) PURE;
};

typedef std::unique_ptr<AsyncClientManager> AsyncClientManagerPtr;

} // namespace Grpc
} // namespace Envoy
1 change: 1 addition & 0 deletions include/envoy/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ envoy_cc_library(
":upstream_interface",
"//include/envoy/access_log:access_log_interface",
"//include/envoy/config:grpc_mux_interface",
"//include/envoy/grpc:async_client_manager_interface",
"//include/envoy/http:async_client_interface",
"//include/envoy/http:conn_pool_interface",
"//include/envoy/local_info:local_info_interface",
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "envoy/access_log/access_log.h"
#include "envoy/config/grpc_mux.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/http/async_client.h"
#include "envoy/http/conn_pool.h"
#include "envoy/local_info/local_info.h"
Expand Down Expand Up @@ -128,6 +129,11 @@ class ClusterManager {
*/
virtual Config::GrpcMux& adsMux() PURE;

/**
* @return Grpc::AsyncClientManager& the cluster manager's gRPC client manager.
*/
virtual Grpc::AsyncClientManager& grpcAsyncClientManager() PURE;

/**
* Return the current version info string for dynamic clusters, if CDS is setup.
*
Expand Down
1 change: 1 addition & 0 deletions source/common/access_log/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ envoy_cc_library(
deps = [
"//include/envoy/access_log:access_log_interface",
"//include/envoy/grpc:async_client_interface",
"//include/envoy/grpc:async_client_manager_interface",
"//include/envoy/singleton:instance_interface",
"//include/envoy/thread_local:thread_local_interface",
"//include/envoy/upstream:cluster_manager_interface",
Expand Down
3 changes: 1 addition & 2 deletions source/common/access_log/grpc_access_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
namespace Envoy {
namespace AccessLog {

GrpcAccessLogStreamerImpl::GrpcAccessLogStreamerImpl(GrpcAccessLogClientFactoryPtr&& factory,
GrpcAccessLogStreamerImpl::GrpcAccessLogStreamerImpl(Grpc::AsyncClientFactoryPtr&& factory,
ThreadLocal::SlotAllocator& tls,
const LocalInfo::LocalInfo& local_info)
: tls_slot_(tls.allocateSlot()) {

SharedStateSharedPtr shared_state = std::make_shared<SharedState>(std::move(factory), local_info);
tls_slot_->set([shared_state](Event::Dispatcher&) {
return ThreadLocal::ThreadLocalObjectSharedPtr{new ThreadLocalStreamer(shared_state)};
Expand Down
23 changes: 4 additions & 19 deletions source/common/access_log/grpc_access_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "envoy/access_log/access_log.h"
#include "envoy/grpc/async_client.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/local_info/local_info.h"
#include "envoy/singleton/instance.h"
#include "envoy/thread_local/thread_local.h"
Expand All @@ -15,21 +16,6 @@ namespace AccessLog {

// TODO(mattklein123): Stats

/**
* Factory for creating a gRPC access log streaming client.
*/
class GrpcAccessLogClientFactory {
public:
virtual ~GrpcAccessLogClientFactory() {}

/**
* @return GrpcAccessLogClientPtr a new client.
*/
virtual Grpc::AsyncClientPtr create() PURE;
};

typedef std::unique_ptr<GrpcAccessLogClientFactory> GrpcAccessLogClientFactoryPtr;

/**
* Interface for an access log streamer. The streamer deals with threading and sends access logs
* on the correct stream.
Expand All @@ -55,8 +41,7 @@ typedef std::shared_ptr<GrpcAccessLogStreamer> GrpcAccessLogStreamerSharedPtr;
*/
class GrpcAccessLogStreamerImpl : public Singleton::Instance, public GrpcAccessLogStreamer {
public:
GrpcAccessLogStreamerImpl(GrpcAccessLogClientFactoryPtr&& factory,
ThreadLocal::SlotAllocator& tls,
GrpcAccessLogStreamerImpl(Grpc::AsyncClientFactoryPtr&& factory, ThreadLocal::SlotAllocator& tls,
const LocalInfo::LocalInfo& local_info);

// GrpcAccessLogStreamer
Expand All @@ -71,10 +56,10 @@ class GrpcAccessLogStreamerImpl : public Singleton::Instance, public GrpcAccessL
* slot to be destroyed while the streamers hold onto the shared state.
*/
struct SharedState {
SharedState(GrpcAccessLogClientFactoryPtr&& factory, const LocalInfo::LocalInfo& local_info)
SharedState(Grpc::AsyncClientFactoryPtr&& factory, const LocalInfo::LocalInfo& local_info)
: factory_(std::move(factory)), local_info_(local_info) {}

GrpcAccessLogClientFactoryPtr factory_;
Grpc::AsyncClientFactoryPtr factory_;
const LocalInfo::LocalInfo& local_info_;
};

Expand Down
3 changes: 2 additions & 1 deletion source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ envoy_cc_library(
":utility_lib",
"//include/envoy/config:grpc_mux_interface",
"//include/envoy/config:subscription_interface",
"//include/envoy/grpc:async_client_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/common:logger_lib",
"//source/common/grpc:async_client_lib",
"//source/common/protobuf",
],
)
Expand Down Expand Up @@ -158,6 +158,7 @@ envoy_cc_library(
":grpc_mux_subscription_lib",
"//include/envoy/config:subscription_interface",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/grpc:async_client_interface",
],
)

Expand Down
8 changes: 0 additions & 8 deletions source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@ GrpcMuxImpl::GrpcMuxImpl(const envoy::api::v2::Node& node, Grpc::AsyncClientPtr
retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
}

GrpcMuxImpl::GrpcMuxImpl(const envoy::api::v2::Node& node,
Upstream::ClusterManager& cluster_manager,
const std::string& remote_cluster_name, Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method)
: GrpcMuxImpl(node,
std::make_unique<Grpc::AsyncClientImpl>(cluster_manager, remote_cluster_name),
dispatcher, service_method) {}

GrpcMuxImpl::~GrpcMuxImpl() {
for (const auto& api_state : api_state_) {
for (auto watch : api_state.second.watches_) {
Expand Down
5 changes: 1 addition & 4 deletions source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
#include "envoy/config/grpc_mux.h"
#include "envoy/config/subscription.h"
#include "envoy/event/dispatcher.h"
#include "envoy/grpc/async_client.h"
#include "envoy/upstream/cluster_manager.h"

#include "common/common/logger.h"
#include "common/grpc/async_client_impl.h"

#include "api/discovery.pb.h"

Expand All @@ -22,9 +22,6 @@ class GrpcMuxImpl : public GrpcMux,
Grpc::TypedAsyncStreamCallbacks<envoy::api::v2::DiscoveryResponse>,
Logger::Loggable<Logger::Id::upstream> {
public:
GrpcMuxImpl(const envoy::api::v2::Node& node, Upstream::ClusterManager& cluster_manager,
const std::string& remote_cluster_name, Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method);
GrpcMuxImpl(const envoy::api::v2::Node& node, Grpc::AsyncClientPtr async_client,
Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method);
~GrpcMuxImpl();
Expand Down
7 changes: 1 addition & 6 deletions source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "envoy/config/subscription.h"
#include "envoy/event/dispatcher.h"
#include "envoy/grpc/async_client.h"

#include "common/config/grpc_mux_impl.h"
#include "common/config/grpc_mux_subscription_impl.h"
Expand All @@ -14,12 +15,6 @@ namespace Config {
template <class ResourceType>
class GrpcSubscriptionImpl : public Config::Subscription<ResourceType> {
public:
GrpcSubscriptionImpl(const envoy::api::v2::Node& node, Upstream::ClusterManager& cm,
const std::string& remote_cluster_name, Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats)
: GrpcSubscriptionImpl(node, std::make_unique<Grpc::AsyncClientImpl>(cm, remote_cluster_name),
dispatcher, service_method, stats) {}

GrpcSubscriptionImpl(const envoy::api::v2::Node& node, Grpc::AsyncClientPtr async_client,
Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats)
Expand Down
11 changes: 8 additions & 3 deletions source/common/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,16 @@ class SubscriptionFactory {
Utility::apiConfigSourceRefreshDelay(api_config_source),
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(rest_method), stats));
break;
case envoy::api::v2::ApiConfigSource::GRPC:
case envoy::api::v2::ApiConfigSource::GRPC: {
result.reset(new GrpcSubscriptionImpl<ResourceType>(
node, cm, cluster_name, dispatcher,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method), stats));
node,
Config::Utility::factoryForApiConfigSource(cm.grpcAsyncClientManager(),
config.api_config_source(), scope)
->create(),
dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method),
stats));
break;
}
default:
NOT_REACHED;
}
Expand Down
32 changes: 32 additions & 0 deletions source/common/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,5 +198,37 @@ void Utility::checkObjNameLength(const std::string& error_prefix, const std::str
}
}

Grpc::AsyncClientFactoryPtr
Utility::factoryForApiConfigSource(Grpc::AsyncClientManager& async_client_manager,
const envoy::api::v2::ApiConfigSource& api_config_source,
Stats::Scope& scope) {
ASSERT(api_config_source.api_type() == envoy::api::v2::ApiConfigSource::GRPC);
envoy::api::v2::GrpcService grpc_service;
if (api_config_source.cluster_names().empty()) {
if (api_config_source.grpc_services().empty()) {
throw EnvoyException(
fmt::format("Missing gRPC services in envoy::api::v2::ApiConfigSource: {}",
api_config_source.DebugString()));
}
// TODO(htuch): Implement multiple gRPC services.
if (api_config_source.grpc_services().size() != 1) {
throw EnvoyException(fmt::format(
"Only singleton gRPC service lists supported in envoy::api::v2::ApiConfigSource: {}",
api_config_source.DebugString()));
}
grpc_service.MergeFrom(api_config_source.grpc_services(0));
} else {
// TODO(htuch): cluster_names is deprecated, remove after 1.6.0.
if (api_config_source.cluster_names().size() != 1) {
throw EnvoyException(fmt::format(
"Only singleton cluster name lists supported in envoy::api::v2::ApiConfigSource: {}",
api_config_source.DebugString()));
}
grpc_service.mutable_envoy_grpc()->set_cluster_name(api_config_source.cluster_names(0));
}

return async_client_manager.factoryForGrpcService(grpc_service, scope);
}

} // namespace Config
} // namespace Envoy
11 changes: 11 additions & 0 deletions source/common/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,17 @@ class Utility {
* @param name supplies the name to check for length limits.
*/
static void checkObjNameLength(const std::string& error_prefix, const std::string& name);

/**
* Obtain gRPC async client factory from a envoy::api::v2::ApiConfigSource.
* @param async_client_manager gRPC async client manager.
* @param api_config_source envoy::api::v2::ApiConfigSource. Must have config type GRPC.
* @return Grpc::AsyncClientFactoryPtr gRPC async client factory.
*/
static Grpc::AsyncClientFactoryPtr
factoryForApiConfigSource(Grpc::AsyncClientManager& async_client_manager,
const envoy::api::v2::ApiConfigSource& api_config_source,
Stats::Scope& scope);
};

} // namespace Config
Expand Down
13 changes: 13 additions & 0 deletions source/common/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "async_client_manager_lib",
srcs = ["async_client_manager_impl.cc"],
hdrs = ["async_client_manager_impl.h"],
deps = [
":async_client_lib",
"//include/envoy/grpc:async_client_manager_interface",
"//include/envoy/singleton:manager_interface",
"//include/envoy/thread_local:thread_local_interface",
"//include/envoy/upstream:cluster_manager_interface",
],
)

envoy_cc_library(
name = "codec_lib",
srcs = ["codec.cc"],
Expand Down
Loading