Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions envoy/grpc/async_client_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,40 @@ class AsyncClientFactory {

using AsyncClientFactoryPtr = std::unique_ptr<AsyncClientFactory>;

class GrpcServiceConfigWithHashKey {
public:
explicit GrpcServiceConfigWithHashKey(const envoy::config::core::v3::GrpcService& config)
: config_(config), pre_computed_hash_(Envoy::MessageUtil::hash(config)){};

template <typename H> friend H AbslHashValue(H h, const GrpcServiceConfigWithHashKey& wrapper) {
return H::combine(std::move(h), wrapper.pre_computed_hash_);
}

std::size_t getPreComputedHash() const { return pre_computed_hash_; }

friend bool operator==(const GrpcServiceConfigWithHashKey& lhs,
const GrpcServiceConfigWithHashKey& rhs) {
if (lhs.pre_computed_hash_ == rhs.pre_computed_hash_) {
return Protobuf::util::MessageDifferencer::Equivalent(lhs.config_, rhs.config_);
}
return false;
}

const envoy::config::core::v3::GrpcService& config() const { return config_; }

private:
const envoy::config::core::v3::GrpcService config_;
const std::size_t pre_computed_hash_;
};

// Singleton gRPC client manager. Grpc::AsyncClientManager can be used to create per-service
// Grpc::AsyncClientFactory instances. All manufactured Grpc::AsyncClients must
// be destroyed before the AsyncClientManager can be safely destructed.
class AsyncClientManager {
public:
virtual ~AsyncClientManager() = default;

// TODO(diazalan) deprecate old getOrCreateRawAsyncClient once all filters have been transitioned
/**
* Create a Grpc::RawAsyncClient. The async client is cached thread locally and shared across
* different filter instances.
Expand All @@ -54,6 +81,22 @@ class AsyncClientManager {
getOrCreateRawAsyncClient(const envoy::config::core::v3::GrpcService& grpc_service,
Stats::Scope& scope, bool skip_cluster_check) PURE;

/**
* Create a Grpc::RawAsyncClient. The async client is cached thread locally and shared across
* different filter instances.
* @param grpc_service Envoy::Grpc::GrpcServiceConfigWithHashKey which contains config and
* hashkey.
* @param scope stats scope.
* @param skip_cluster_check if set to true skips checks for cluster presence and being statically
* configured.
* @param cache_option always use cache or use cache when runtime is enabled.
* @return RawAsyncClientPtr a grpc async client.
* @throws EnvoyException when grpc_service validation fails.
*/
virtual RawAsyncClientSharedPtr
getOrCreateRawAsyncClientWithHashKey(const GrpcServiceConfigWithHashKey& grpc_service,
Stats::Scope& scope, bool skip_cluster_check) PURE;

/**
* Create a Grpc::AsyncClients factory for a service. Validation of the service is performed and
* will raise an exception on failure.
Expand Down
37 changes: 27 additions & 10 deletions source/common/grpc/async_client_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "source/common/common/base64.h"
#include "source/common/grpc/async_client_impl.h"
#include "source/common/protobuf/utility.h"

#include "absl/strings/match.h"

Expand Down Expand Up @@ -136,12 +137,27 @@ AsyncClientManagerImpl::factoryForGrpcService(const envoy::config::core::v3::Grp
RawAsyncClientSharedPtr AsyncClientManagerImpl::getOrCreateRawAsyncClient(
const envoy::config::core::v3::GrpcService& config, Stats::Scope& scope,
bool skip_cluster_check) {
RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config);
const GrpcServiceConfigWithHashKey config_with_hash_key = GrpcServiceConfigWithHashKey(config);
RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config_with_hash_key);
if (client != nullptr) {
return client;
}
client = factoryForGrpcService(config, scope, skip_cluster_check)->createUncachedRawAsyncClient();
raw_async_client_cache_->setCache(config, client);
client = factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check)
->createUncachedRawAsyncClient();
raw_async_client_cache_->setCache(config_with_hash_key, client);
return client;
}

RawAsyncClientSharedPtr AsyncClientManagerImpl::getOrCreateRawAsyncClientWithHashKey(
const GrpcServiceConfigWithHashKey& config_with_hash_key, Stats::Scope& scope,
bool skip_cluster_check) {
RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config_with_hash_key);
if (client != nullptr) {
return client;
}
client = factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check)
->createUncachedRawAsyncClient();
raw_async_client_cache_->setCache(config_with_hash_key, client);
return client;
}

Expand All @@ -151,20 +167,21 @@ AsyncClientManagerImpl::RawAsyncClientCache::RawAsyncClientCache(Event::Dispatch
}

void AsyncClientManagerImpl::RawAsyncClientCache::setCache(
const envoy::config::core::v3::GrpcService& config, const RawAsyncClientSharedPtr& client) {
ASSERT(lru_map_.find(config) == lru_map_.end());
const GrpcServiceConfigWithHashKey& config_with_hash_key,
const RawAsyncClientSharedPtr& client) {
ASSERT(lru_map_.find(config_with_hash_key) == lru_map_.end());
// Create a new cache entry at the beginning of the list.
lru_list_.emplace_front(config, client, dispatcher_.timeSource().monotonicTime());
lru_map_[config] = lru_list_.begin();
lru_list_.emplace_front(config_with_hash_key, client, dispatcher_.timeSource().monotonicTime());
lru_map_[config_with_hash_key] = lru_list_.begin();
// If inserting to an empty cache, enable eviction timer.
if (lru_list_.size() == 1) {
evictEntriesAndResetEvictionTimer();
}
}

RawAsyncClientSharedPtr AsyncClientManagerImpl::RawAsyncClientCache::getCache(
const envoy::config::core::v3::GrpcService& config) {
auto it = lru_map_.find(config);
const GrpcServiceConfigWithHashKey& config_with_hash_key) {
auto it = lru_map_.find(config_with_hash_key);
if (it == lru_map_.end()) {
return nullptr;
}
Expand All @@ -189,7 +206,7 @@ void AsyncClientManagerImpl::RawAsyncClientCache::evictEntriesAndResetEvictionTi
MonotonicTime next_expire = lru_list_.back().accessed_time_ + EntryTimeoutInterval;
if (now >= next_expire) {
// Erase the expired entry.
lru_map_.erase(lru_list_.back().config_);
lru_map_.erase(lru_list_.back().config_with_hash_key_);
lru_list_.pop_back();
} else {
cache_eviction_timer_->enableTimer(
Expand Down
20 changes: 12 additions & 8 deletions source/common/grpc/async_client_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "envoy/upstream/cluster_manager.h"

#include "source/common/grpc/stat_names.h"
#include "source/common/protobuf/utility.h"

namespace Envoy {
namespace Grpc {
Expand Down Expand Up @@ -51,32 +52,35 @@ class AsyncClientManagerImpl : public AsyncClientManager {
getOrCreateRawAsyncClient(const envoy::config::core::v3::GrpcService& config, Stats::Scope& scope,
bool skip_cluster_check) override;

RawAsyncClientSharedPtr
getOrCreateRawAsyncClientWithHashKey(const GrpcServiceConfigWithHashKey& config_with_hash_key,
Stats::Scope& scope, bool skip_cluster_check) override;

AsyncClientFactoryPtr factoryForGrpcService(const envoy::config::core::v3::GrpcService& config,
Stats::Scope& scope,
bool skip_cluster_check) override;
class RawAsyncClientCache : public ThreadLocal::ThreadLocalObject {
public:
explicit RawAsyncClientCache(Event::Dispatcher& dispatcher);
void setCache(const envoy::config::core::v3::GrpcService& config,
void setCache(const GrpcServiceConfigWithHashKey& config_with_hash_key,
const RawAsyncClientSharedPtr& client);

RawAsyncClientSharedPtr getCache(const envoy::config::core::v3::GrpcService& config);
RawAsyncClientSharedPtr getCache(const GrpcServiceConfigWithHashKey& config_with_hash_key);

private:
void evictEntriesAndResetEvictionTimer();
struct CacheEntry {
CacheEntry(const envoy::config::core::v3::GrpcService& config,
CacheEntry(const GrpcServiceConfigWithHashKey& config_with_hash_key,
RawAsyncClientSharedPtr const& client, MonotonicTime create_time)
: config_(config), client_(client), accessed_time_(create_time) {}
envoy::config::core::v3::GrpcService config_;
: config_with_hash_key_(config_with_hash_key), client_(client),
accessed_time_(create_time) {}
GrpcServiceConfigWithHashKey config_with_hash_key_;
RawAsyncClientSharedPtr client_;
MonotonicTime accessed_time_;
};
using LruList = std::list<CacheEntry>;
absl::flat_hash_map<envoy::config::core::v3::GrpcService, LruList::iterator, MessageUtil,
MessageUtil>
lru_map_;
LruList lru_list_;
absl::flat_hash_map<GrpcServiceConfigWithHashKey, LruList::iterator> lru_map_;
Event::Dispatcher& dispatcher_;
Envoy::Event::TimerPtr cache_eviction_timer_;
static constexpr std::chrono::seconds EntryTimeoutInterval{50};
Expand Down
27 changes: 8 additions & 19 deletions source/extensions/filters/http/ext_authz/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/extensions/filters/http/ext_authz/v3/ext_authz.pb.h"
#include "envoy/extensions/filters/http/ext_authz/v3/ext_authz.pb.validate.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/registry/registry.h"

#include "source/common/config/utility.h"
Expand Down Expand Up @@ -42,34 +43,22 @@ Http::FilterFactoryCb ExtAuthzFilterConfig::createFilterFactoryFromProtoTyped(
context.clusterManager(), client_config);
callbacks.addStreamFilter(std::make_shared<Filter>(filter_config, std::move(client)));
};
} else if (proto_config.grpc_service().has_google_grpc()) {
// Google gRPC client.
} else {
// gRPC client.
const uint32_t timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout);

Config::Utility::checkTransportVersion(proto_config);
Envoy::Grpc::GrpcServiceConfigWithHashKey config_with_hash_key =
Envoy::Grpc::GrpcServiceConfigWithHashKey(proto_config.grpc_service());
callback = [&context, filter_config, timeout_ms,
proto_config](Http::FilterChainFactoryCallbacks& callbacks) {
config_with_hash_key](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<Filters::Common::ExtAuthz::GrpcClientImpl>(
context.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClient(
proto_config.grpc_service(), context.scope(), true),
context.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClientWithHashKey(
config_with_hash_key, context.scope(), true),
std::chrono::milliseconds(timeout_ms));
callbacks.addStreamFilter(std::make_shared<Filter>(filter_config, std::move(client)));
};
} else {
// Envoy gRPC client.
const uint32_t timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout);
Config::Utility::checkTransportVersion(proto_config);
callback = [grpc_service = proto_config.grpc_service(), &context, filter_config,
timeout_ms](Http::FilterChainFactoryCallbacks& callbacks) {
Grpc::RawAsyncClientSharedPtr raw_client =
context.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClient(
grpc_service, context.scope(), true);
auto client = std::make_unique<Filters::Common::ExtAuthz::GrpcClientImpl>(
raw_client, std::chrono::milliseconds(timeout_ms));
callbacks.addStreamFilter(std::make_shared<Filter>(filter_config, std::move(client)));
};
}

return callback;
Expand Down
27 changes: 27 additions & 0 deletions test/common/grpc/BUILD
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_benchmark_test",
"envoy_cc_benchmark_binary",
"envoy_cc_fuzz_test",
"envoy_cc_test",
"envoy_cc_test_library",
Expand Down Expand Up @@ -213,3 +215,28 @@ envoy_cc_test(
"//test/test_common:utility_lib",
],
)

envoy_cc_benchmark_binary(
name = "async_client_manager_benchmark",
srcs = ["async_client_manager_benchmark.cc"],
external_deps = [
"benchmark",
],
deps = [
"//source/common/api:api_lib",
"//source/common/grpc:async_client_manager_lib",
"//test/mocks/stats:stats_mocks",
"//test/mocks/thread_local:thread_local_mocks",
"//test/mocks/upstream:cluster_manager_mocks",
"//test/mocks/upstream:cluster_priority_set_mocks",
"//test/test_common:test_runtime_lib",
"//test/test_common:utility_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)

envoy_benchmark_test(
name = "async_client_manager_benchmark_test",
timeout = "long",
benchmark_binary = "async_client_manager_benchmark",
)
79 changes: 79 additions & 0 deletions test/common/grpc/async_client_manager_benchmark.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#include <memory>

#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/grpc/async_client.h"

#include "source/common/api/api_impl.h"
#include "source/common/event/dispatcher_impl.h"
#include "source/common/grpc/async_client_manager_impl.h"

#include "test/benchmark/main.h"
#include "test/mocks/stats/mocks.h"
#include "test/mocks/thread_local/mocks.h"
#include "test/mocks/upstream/cluster_manager.h"
#include "test/mocks/upstream/cluster_priority_set.h"
#include "test/test_common/test_runtime.h"
#include "test/test_common/test_time.h"
#include "test/test_common/utility.h"

#include "benchmark/benchmark.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"

namespace Envoy {
namespace Grpc {
namespace {

class AsyncClientManagerImplTest {
public:
AsyncClientManagerImplTest()
: api_(Api::createApiForTest()), stat_names_(scope_.symbolTable()),
async_client_manager_(cm_, tls_, test_time_.timeSystem(), *api_, stat_names_) {}

Upstream::MockClusterManager cm_;
NiceMock<ThreadLocal::MockInstance> tls_;
Stats::MockStore store_;
Stats::MockScope& scope_{store_.mockScope()};
DangerousDeprecatedTestTime test_time_;
Api::ApiPtr api_;
StatNames stat_names_;
AsyncClientManagerImpl async_client_manager_;
};

void testGetOrCreateAsyncClientWithConfig(::benchmark::State& state) {
AsyncClientManagerImplTest async_client_man_test;

envoy::config::core::v3::GrpcService grpc_service;
grpc_service.mutable_envoy_grpc()->set_cluster_name("foo");

for (auto _ : state) {
for (int i = 0; i < 1000; i++) {
RawAsyncClientSharedPtr foo_client0 =
async_client_man_test.async_client_manager_.getOrCreateRawAsyncClient(
grpc_service, async_client_man_test.scope_, true);
}
}
}

void testGetOrCreateAsyncClientWithHashConfig(::benchmark::State& state) {
AsyncClientManagerImplTest async_client_man_test;

envoy::config::core::v3::GrpcService grpc_service;
grpc_service.mutable_envoy_grpc()->set_cluster_name("foo");
GrpcServiceConfigWithHashKey config_with_hash_key_a = GrpcServiceConfigWithHashKey(grpc_service);

for (auto _ : state) {
for (int i = 0; i < 1000; i++) {
RawAsyncClientSharedPtr foo_client0 =
async_client_man_test.async_client_manager_.getOrCreateRawAsyncClientWithHashKey(
config_with_hash_key_a, async_client_man_test.scope_, true);
}
}
}

BENCHMARK(testGetOrCreateAsyncClientWithConfig)->Unit(::benchmark::kMicrosecond);
BENCHMARK(testGetOrCreateAsyncClientWithHashConfig)->Unit(::benchmark::kMicrosecond);

} // namespace
} // namespace Grpc
} // namespace Envoy
Loading