Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// All options and processing modes are implemented except for the following:
//
// * Request and response attributes are not sent and not processed.
// * Dynamic metadata in responses from the external processor is ignored.
// * "async mode" is not implemented.

// The filter communicates with an external gRPC service called an "external processor"
Expand Down Expand Up @@ -99,7 +98,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// <arch_overview_advanced_filter_state_sharing>` object in a namespace matching the filter
// name.
//
// [#next-free-field: 15]
// [#next-free-field: 17]
message ExternalProcessor {
// Configuration for the gRPC service that the filter will communicate with.
// The filter supports both the "Envoy" and "Google" gRPC clients.
Expand Down Expand Up @@ -198,6 +197,42 @@ message ExternalProcessor {
// :ref:`mode_override <envoy_v3_api_field_service.ext_proc.v3.ProcessingResponse.mode_override>`.
// If not set, ``mode_override`` API in the response message will be ignored.
bool allow_mode_override = 14;

// If set to true, ignore the
// :ref:`immediate_response <envoy_v3_api_field_service.ext_proc.v3.ProcessingResponse.immediate_response>`
// message in an external processor response. In such case, no local reply will be sent.
// Instead, the stream to the external processor will be closed. There will be no
// more external processing for this stream from now on.
bool disable_immediate_response = 15;

// Options related to the sending and receiving of dynamic metadata
MetadataOptions metadata_options = 16;
}

// The MetadataOptions structure defines options for the sending and receiving of
// dynamic metadata. Specifically, which namespaces to send to the server, whether
// metadata returned by the server may be written, and how that metadata may be written.
message MetadataOptions {
message MetadataNamespaces {
// Specifies a list of metadata namespaces whose values, if present,
// will be passed to the ext_proc service as an opaque *protobuf::Struct*.
repeated string untyped = 1;

// Specifies a list of metadata namespaces whose values, if present,
// will be passed to the ext_proc service as a *protobuf::Any*. This allows
// envoy and the external processing server to share the protobuf message
// definition for safe parsing.
repeated string typed = 2;
}

// Describes which typed or untyped dynamic metadata namespaces to forward to
// the external processing server.
MetadataNamespaces forwarding_namespaces = 1;

// Describes which typed or untyped dynamic metadata namespaces to accept from
// the external processing server. Set to empty or leave unset to disallow writing
// any received dynamic metadata. Receiving of typed metadata is not supported.
MetadataNamespaces receiving_namespaces = 2;
}

// The HeaderForwardingRules structure specifies what headers are
Expand Down Expand Up @@ -240,7 +275,7 @@ message ExtProcPerRoute {
}

// Overrides that may be set on a per-route basis
// [#next-free-field: 6]
// [#next-free-field: 7]
message ExtProcOverrides {
// Set a different processing mode for this route than the default.
ProcessingMode processing_mode = 1;
Expand All @@ -259,4 +294,7 @@ message ExtProcOverrides {

// Set a different gRPC service for this route than the default.
config.core.v3.GrpcService grpc_service = 5;

// Options related to the sending and receiving of dynamic metadata
MetadataOptions metadata_options = 6;
}
11 changes: 7 additions & 4 deletions api/envoy/service/ext_proc/v3/external_processor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ service ExternalProcessor {

// This represents the different types of messages that Envoy can send
// to an external processing server.
// [#next-free-field: 8]
// [#next-free-field: 9]
message ProcessingRequest {
// Specify whether the filter that sent this request is running in synchronous
// or asynchronous mode. The choice of synchronous or asynchronous mode
Expand Down Expand Up @@ -115,6 +115,9 @@ message ProcessingRequest {
// in the filter configuration.
HttpTrailers response_trailers = 7;
}

// Dynamic metadata associated with the request.
config.core.v3.Metadata metadata_context = 8;
}

// For every ProcessingRequest received by the server with the ``async_mode`` field
Expand Down Expand Up @@ -158,9 +161,9 @@ message ProcessingResponse {
ImmediateResponse immediate_response = 7;
}

// [#not-implemented-hide:]
// Optional metadata that will be emitted as dynamic metadata to be consumed by the next
// filter. This metadata will be placed in the namespace ``envoy.filters.http.ext_proc``.
// Optional metadata that will be emitted as dynamic metadata to be consumed by
// following filters. This metadata will be placed in the namespace(s) specified by the top-level
// field name(s) of the struct.
google.protobuf.Struct dynamic_metadata = 8;

// Override how parts of the HTTP request and response are processed
Expand Down
33 changes: 33 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,39 @@ new_features:
change: |
added :ref:`custom_sink <envoy_v3_api_field_config.tap.v3.OutputSink.custom_sink>` type to enable writing tap data
out to a custom sink extension.
- area: access_log
change: |
added %RESPONSE_FLAGS_LONG% substitution string, that will output a pascal case string representing the resonse flags.
The output response flags will correspond with %RESPONSE_FLAGS%, only with a long textual string representation.
- area: config
change: |
Added the capability to defer broadcasting of certain cluster (CDS, EDS) to
worker threads from the main thread. This optimization can save significant
amount of memory in cases where there are (1) a large number of workers and
(2) a large amount of config, most of which is unused. This capability is
guarded by :ref:`enable_deferred_cluster_creation
<envoy_v3_api_field_config.bootstrap.v3.ClusterManager.enable_deferred_cluster_creation>`.
- area: extension_discovery_service
change: |
added ECDS support for :ref:` downstream network filters<envoy_v3_api_field_config.listener.v3.Filter.config_discovery>`.
- area: ext_proc
change: |
added
:ref:`disable_immediate_response <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.disable_immediate_response>`
config API to ignore the
:ref:`immediate_response <envoy_v3_api_field_service.ext_proc.v3.ProcessingResponse.immediate_response>`
message from the external processing server.
- area: http
change: |
added :ref:`Json-To-Metadata filter <envoy_v3_api_msg_extensions.filters.http.json_to_metadata.v3.JsonToMetadata>`.
- area: extension_discovery_service
change: |
added metric listener.listener_stat.network_extension_config_missing to track closed connections due to missing config.
- area: redis
change: |
added support for time command (returns a local response).
- area: redis
change: |
Provide initial span attributes to a sampler used in the OpenTelemetry tracer.
- area: ext_proc
change: |
Expand Down
50 changes: 50 additions & 0 deletions envoy/grpc/async_client_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,47 @@ class AsyncClientFactory {

using AsyncClientFactoryPtr = std::unique_ptr<AsyncClientFactory>;

class GrpcServiceConfigWithHashKey {
public:
GrpcServiceConfigWithHashKey() = default;

explicit GrpcServiceConfigWithHashKey(const envoy::config::core::v3::GrpcService& config)
: config_(config), pre_computed_hash_(Envoy::MessageUtil::hash(config)){};

template <typename H> friend H AbslHashValue(H h, const GrpcServiceConfigWithHashKey& wrapper) {
return H::combine(std::move(h), wrapper.pre_computed_hash_);
}

std::size_t getPreComputedHash() const { return pre_computed_hash_; }

friend bool operator==(const GrpcServiceConfigWithHashKey& lhs,
const GrpcServiceConfigWithHashKey& rhs) {
if (lhs.pre_computed_hash_ == rhs.pre_computed_hash_) {
return Protobuf::util::MessageDifferencer::Equivalent(lhs.config_, rhs.config_);
}
return false;
}

const envoy::config::core::v3::GrpcService& config() const { return config_; }

void setConfig(const envoy::config::core::v3::GrpcService g) {
config_ = g;
pre_computed_hash_ = Envoy::MessageUtil::hash(g);
}

private:
envoy::config::core::v3::GrpcService config_;
std::size_t pre_computed_hash_;
};

// Singleton gRPC client manager. Grpc::AsyncClientManager can be used to create per-service
// Grpc::AsyncClientFactory instances. All manufactured Grpc::AsyncClients must
// be destroyed before the AsyncClientManager can be safely destructed.
class AsyncClientManager {
public:
virtual ~AsyncClientManager() = default;

// TODO(diazalan) deprecate old getOrCreateRawAsyncClient once all filters have been transitioned
/**
* Create a Grpc::RawAsyncClient. The async client is cached thread locally and shared across
* different filter instances.
Expand All @@ -54,6 +88,22 @@ class AsyncClientManager {
getOrCreateRawAsyncClient(const envoy::config::core::v3::GrpcService& grpc_service,
Stats::Scope& scope, bool skip_cluster_check) PURE;

/**
* Create a Grpc::RawAsyncClient. The async client is cached thread locally and shared across
* different filter instances.
* @param grpc_service Envoy::Grpc::GrpcServiceConfigWithHashKey which contains config and
* hashkey.
* @param scope stats scope.
* @param skip_cluster_check if set to true skips checks for cluster presence and being statically
* configured.
* @param cache_option always use cache or use cache when runtime is enabled.
* @return RawAsyncClientPtr a grpc async client.
* @throws EnvoyException when grpc_service validation fails.
*/
virtual RawAsyncClientSharedPtr
getOrCreateRawAsyncClientWithHashKey(const GrpcServiceConfigWithHashKey& grpc_service,
Stats::Scope& scope, bool skip_cluster_check) PURE;

/**
* Create a Grpc::AsyncClients factory for a service. Validation of the service is performed and
* will raise an exception on failure.
Expand Down
37 changes: 27 additions & 10 deletions source/common/grpc/async_client_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "source/common/common/base64.h"
#include "source/common/grpc/async_client_impl.h"
#include "source/common/protobuf/utility.h"

#include "absl/strings/match.h"

Expand Down Expand Up @@ -138,12 +139,27 @@ AsyncClientManagerImpl::factoryForGrpcService(const envoy::config::core::v3::Grp
RawAsyncClientSharedPtr AsyncClientManagerImpl::getOrCreateRawAsyncClient(
const envoy::config::core::v3::GrpcService& config, Stats::Scope& scope,
bool skip_cluster_check) {
RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config);
const GrpcServiceConfigWithHashKey config_with_hash_key = GrpcServiceConfigWithHashKey(config);
RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config_with_hash_key);
if (client != nullptr) {
return client;
}
client = factoryForGrpcService(config, scope, skip_cluster_check)->createUncachedRawAsyncClient();
raw_async_client_cache_->setCache(config, client);
client = factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check)
->createUncachedRawAsyncClient();
raw_async_client_cache_->setCache(config_with_hash_key, client);
return client;
}

RawAsyncClientSharedPtr AsyncClientManagerImpl::getOrCreateRawAsyncClientWithHashKey(
const GrpcServiceConfigWithHashKey& config_with_hash_key, Stats::Scope& scope,
bool skip_cluster_check) {
RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config_with_hash_key);
if (client != nullptr) {
return client;
}
client = factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check)
->createUncachedRawAsyncClient();
raw_async_client_cache_->setCache(config_with_hash_key, client);
return client;
}

Expand All @@ -153,20 +169,21 @@ AsyncClientManagerImpl::RawAsyncClientCache::RawAsyncClientCache(Event::Dispatch
}

void AsyncClientManagerImpl::RawAsyncClientCache::setCache(
const envoy::config::core::v3::GrpcService& config, const RawAsyncClientSharedPtr& client) {
ASSERT(lru_map_.find(config) == lru_map_.end());
const GrpcServiceConfigWithHashKey& config_with_hash_key,
const RawAsyncClientSharedPtr& client) {
ASSERT(lru_map_.find(config_with_hash_key) == lru_map_.end());
// Create a new cache entry at the beginning of the list.
lru_list_.emplace_front(config, client, dispatcher_.timeSource().monotonicTime());
lru_map_[config] = lru_list_.begin();
lru_list_.emplace_front(config_with_hash_key, client, dispatcher_.timeSource().monotonicTime());
lru_map_[config_with_hash_key] = lru_list_.begin();
// If inserting to an empty cache, enable eviction timer.
if (lru_list_.size() == 1) {
evictEntriesAndResetEvictionTimer();
}
}

RawAsyncClientSharedPtr AsyncClientManagerImpl::RawAsyncClientCache::getCache(
const envoy::config::core::v3::GrpcService& config) {
auto it = lru_map_.find(config);
const GrpcServiceConfigWithHashKey& config_with_hash_key) {
auto it = lru_map_.find(config_with_hash_key);
if (it == lru_map_.end()) {
return nullptr;
}
Expand Down Expand Up @@ -197,7 +214,7 @@ void AsyncClientManagerImpl::RawAsyncClientCache::evictEntriesAndResetEvictionTi
// This will cause cpu spike.
if (time_to_next_expire_sec.count() <= 0) {
// Erase the expired entry.
lru_map_.erase(lru_list_.back().config_);
lru_map_.erase(lru_list_.back().config_with_hash_key_);
lru_list_.pop_back();
} else {
cache_eviction_timer_->enableTimer(time_to_next_expire_sec);
Expand Down
20 changes: 12 additions & 8 deletions source/common/grpc/async_client_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "envoy/upstream/cluster_manager.h"

#include "source/common/grpc/stat_names.h"
#include "source/common/protobuf/utility.h"

namespace Envoy {
namespace Grpc {
Expand Down Expand Up @@ -51,32 +52,35 @@ class AsyncClientManagerImpl : public AsyncClientManager {
getOrCreateRawAsyncClient(const envoy::config::core::v3::GrpcService& config, Stats::Scope& scope,
bool skip_cluster_check) override;

RawAsyncClientSharedPtr
getOrCreateRawAsyncClientWithHashKey(const GrpcServiceConfigWithHashKey& config_with_hash_key,
Stats::Scope& scope, bool skip_cluster_check) override;

AsyncClientFactoryPtr factoryForGrpcService(const envoy::config::core::v3::GrpcService& config,
Stats::Scope& scope,
bool skip_cluster_check) override;
class RawAsyncClientCache : public ThreadLocal::ThreadLocalObject {
public:
explicit RawAsyncClientCache(Event::Dispatcher& dispatcher);
void setCache(const envoy::config::core::v3::GrpcService& config,
void setCache(const GrpcServiceConfigWithHashKey& config_with_hash_key,
const RawAsyncClientSharedPtr& client);

RawAsyncClientSharedPtr getCache(const envoy::config::core::v3::GrpcService& config);
RawAsyncClientSharedPtr getCache(const GrpcServiceConfigWithHashKey& config_with_hash_key);

private:
void evictEntriesAndResetEvictionTimer();
struct CacheEntry {
CacheEntry(const envoy::config::core::v3::GrpcService& config,
CacheEntry(const GrpcServiceConfigWithHashKey& config_with_hash_key,
RawAsyncClientSharedPtr const& client, MonotonicTime create_time)
: config_(config), client_(client), accessed_time_(create_time) {}
envoy::config::core::v3::GrpcService config_;
: config_with_hash_key_(config_with_hash_key), client_(client),
accessed_time_(create_time) {}
GrpcServiceConfigWithHashKey config_with_hash_key_;
RawAsyncClientSharedPtr client_;
MonotonicTime accessed_time_;
};
using LruList = std::list<CacheEntry>;
absl::flat_hash_map<envoy::config::core::v3::GrpcService, LruList::iterator, MessageUtil,
MessageUtil>
lru_map_;
LruList lru_list_;
absl::flat_hash_map<GrpcServiceConfigWithHashKey, LruList::iterator> lru_map_;
Event::Dispatcher& dispatcher_;
Envoy::Event::TimerPtr cache_eviction_timer_;
static constexpr std::chrono::seconds EntryTimeoutInterval{50};
Expand Down
27 changes: 8 additions & 19 deletions source/extensions/filters/http/ext_authz/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/extensions/filters/http/ext_authz/v3/ext_authz.pb.h"
#include "envoy/extensions/filters/http/ext_authz/v3/ext_authz.pb.validate.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/registry/registry.h"

#include "source/common/config/utility.h"
Expand Down Expand Up @@ -42,34 +43,22 @@ Http::FilterFactoryCb ExtAuthzFilterConfig::createFilterFactoryFromProtoTyped(
context.clusterManager(), client_config);
callbacks.addStreamFilter(std::make_shared<Filter>(filter_config, std::move(client)));
};
} else if (proto_config.grpc_service().has_google_grpc()) {
// Google gRPC client.
} else {
// gRPC client.
const uint32_t timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout);

Config::Utility::checkTransportVersion(proto_config);
Envoy::Grpc::GrpcServiceConfigWithHashKey config_with_hash_key =
Envoy::Grpc::GrpcServiceConfigWithHashKey(proto_config.grpc_service());
callback = [&context, filter_config, timeout_ms,
proto_config](Http::FilterChainFactoryCallbacks& callbacks) {
config_with_hash_key](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<Filters::Common::ExtAuthz::GrpcClientImpl>(
context.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClient(
proto_config.grpc_service(), context.scope(), true),
context.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClientWithHashKey(
config_with_hash_key, context.scope(), true),
std::chrono::milliseconds(timeout_ms));
callbacks.addStreamFilter(std::make_shared<Filter>(filter_config, std::move(client)));
};
} else {
// Envoy gRPC client.
const uint32_t timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout);
Config::Utility::checkTransportVersion(proto_config);
callback = [grpc_service = proto_config.grpc_service(), &context, filter_config,
timeout_ms](Http::FilterChainFactoryCallbacks& callbacks) {
Grpc::RawAsyncClientSharedPtr raw_client =
context.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClient(
grpc_service, context.scope(), true);
auto client = std::make_unique<Filters::Common::ExtAuthz::GrpcClientImpl>(
raw_client, std::chrono::milliseconds(timeout_ms));
callbacks.addStreamFilter(std::make_shared<Filter>(filter_config, std::move(client)));
};
}

return callback;
Expand Down
Loading