diff --git a/api/BUILD b/api/BUILD index 0f60195fb6dd7..31067886b55c8 100644 --- a/api/BUILD +++ b/api/BUILD @@ -59,6 +59,7 @@ proto_library( deps = [ "//contrib/envoy/extensions/filters/http/squash/v3:pkg", "//contrib/envoy/extensions/filters/http/sxg/v3alpha:pkg", + "//contrib/envoy/extensions/filters/network/generic_proxy/v3alpha:pkg", "//contrib/envoy/extensions/filters/network/kafka_broker/v3:pkg", "//contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha:pkg", "//contrib/envoy/extensions/filters/network/mysql_proxy/v3:pkg", diff --git a/api/contrib/envoy/extensions/filters/network/generic_proxy/v3alpha/BUILD b/api/contrib/envoy/extensions/filters/network/generic_proxy/v3alpha/BUILD new file mode 100644 index 0000000000000..2f90ace882d93 --- /dev/null +++ b/api/contrib/envoy/extensions/filters/network/generic_proxy/v3alpha/BUILD @@ -0,0 +1,14 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "//envoy/config/core/v3:pkg", + "//envoy/config/route/v3:pkg", + "//envoy/type/matcher/v3:pkg", + "@com_github_cncf_udpa//udpa/annotations:pkg", + ], +) diff --git a/api/contrib/envoy/extensions/filters/network/generic_proxy/v3alpha/generic_proxy.proto b/api/contrib/envoy/extensions/filters/network/generic_proxy/v3alpha/generic_proxy.proto new file mode 100644 index 0000000000000..2b80edb54cf23 --- /dev/null +++ b/api/contrib/envoy/extensions/filters/network/generic_proxy/v3alpha/generic_proxy.proto @@ -0,0 +1,172 @@ +syntax = "proto3"; + +package envoy.extensions.filters.network.generic_proxy.v3alpha; + +import "envoy/config/core/v3/base.proto"; +import "envoy/config/core/v3/config_source.proto"; +import "envoy/config/core/v3/extension.proto"; +import "envoy/config/route/v3/route_components.proto"; +import "envoy/type/matcher/v3/string.proto"; + +import "google/protobuf/any.proto"; +import "google/protobuf/duration.proto"; + +import "udpa/annotations/status.proto"; +import "udpa/annotations/versioning.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.filters.network.generic_proxy.v3alpha"; +option java_outer_classname = "GenericProxyProto"; +option java_multiple_files = true; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Generic Proxy] +// [#extension: proxy.filters.network.generic_proxy] +// [#next-free-field: 8] +message GenericProxyConfig { + // The human readable prefix to use when emitting statistics. + string stat_prefix = 1 [(validate.rules).string = {min_len: 1}]; + + // Specific codec implementation for generic proxy. The codec will be used to decode/encode + // downstream request and upstream response. + config.core.v3.TypedExtensionConfig codec_specifier = 2 + [(validate.rules).message = {required: true}]; + + oneof route_specifier { + // Static route table for generic proxy. + RouteConfiguration route_config = 6; + + // the generic proxy's route table will be dynamically loaded via the RDS API. + GRDS grds = 7; + } + + // A list of individual generic filters that make up the filter chain for requests made to the + // generic proxy. Order matters as the filters are processed sequentially. + repeated GenericFilter generic_filters = 5; +} + +message GRDS { + // Configuration source specifier for RDS. + config.core.v3.ConfigSource config_source = 1 [(validate.rules).message = {required: true}]; + + // The name of the route configuration. This name will be passed to the RDS API. This allows + // an Envoy configuration with multiple Generic listeners ( and associated Generic filters) + // to use different route configurations. + string route_config_name = 2 [(validate.rules).string = {min_len: 1}]; +} + +// GenericFilter configures a generic filter. +message GenericFilter { + // The name of the filter to instantiate. The name must match a supported + // filter. + string name = 1 [(validate.rules).string = {min_len: 1}]; + + // Filter specific configuration which depends on the filter being instantiated. See the + // supported filters for further documentation. + google.protobuf.Any config = 2; +} + +// [#next-free-field: 9] +message HeaderMatch { + // The name of header/metadata need to match. + string name = 1 [(validate.rules).string = {min_len: 1}]; + + // Specific match rule of header/metadata. + oneof match_specifier { + // If specified, header match will be performed based on the string match rule. + type.matcher.v3.StringMatcher string_match = 2; + + // If specified, header match will be performed based on whether the header is in the + // request. + bool present_match = 3; + } + + // If specified, the match result will be inverted before checking. Defaults to false. + // + // Examples: + // + // * The regex ``\d{3}`` does not match the value *1234*, so it will match when inverted. + // * The range [-10,0) will match the value -1, so it will not match when inverted. + bool invert_match = 8; +} + +message Authority { + // A list of authority value. Used to match request authority of downstream request. + // If request authority does not match this rule, the entire request does not match the + // current route set. + repeated string authorities = 1 [(validate.rules).repeated = {min_items: 1}]; + + repeated Route routes = 2; +} + +message RouteMatch { + // Used to match request path of downstream request. If request path does not match + // this rule, the entire request does not match the current route. + type.matcher.v3.StringMatcher path = 1; + + // Used to match request method of downstream request. If request method does not match + // this rule, the entire request does not match the current route. + type.matcher.v3.StringMatcher method = 2; + + // A list of header match rules that downstream request need to match. As long as one + // of the rules is not matched successfully, the entire request is not matched successfully. + repeated HeaderMatch headers = 3; +} + +message RetryPolicy { + // When certain conditions are met, the upstream request is retried. Supports retry when a + // network error occurs or when the response status is a specific status. + string retry_on = 1; + + // Maximum number of retries. For a downstream request, envoy may issue `1 + may_retry` + // upstream requests. + uint32 max_retry = 2; + + // If not specified, then Route.timeout will be used. + google.protobuf.Duration per_try_timeout = 3; +} + +message DirectReturn { +} + +// [#next-free-field: 9] +message Route { + // Match rules of current route. Used to determine whether a request should be forwarded + // using the current routing rules. + RouteMatch match = 1; + + oneof route_specifier { + option (validate.required) = true; + + // Indicates the upstream cluster to which the request should be routed. + string cluster = 2 [(validate.rules).string = {min_len: 1}]; + + // Multiple upstream clusters can be specified for a given route. The request is routed + // to one of the upstream clusters based on weights assigned to each cluster. Currently + // ClusterWeight only supports the name and weight fields. + config.route.v3.WeightedCluster weighted_clusters = 3; + + // Return a fixed response downstream. + DirectReturn direct = 4; + } + + // Downstream request timeout. + google.protobuf.Duration timeout = 5; + + // Retry policy used by current route. + RetryPolicy retry = 6; + + // Route level config for L7 generic filters. The key should always be the generic filter name. + map per_filter_config = 7; + + // Route metadata. + config.core.v3.Metadata metadata = 8; +} + +message RouteConfiguration { + // The name of route config. + string name = 1 [(validate.rules).string = {min_len: 1}]; + + // A list of route entry config. + repeated Authority config = 2; +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index c0bbd3bc987f5..9ffe55884be7d 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -11,6 +11,7 @@ proto_library( deps = [ "//contrib/envoy/extensions/filters/http/squash/v3:pkg", "//contrib/envoy/extensions/filters/http/sxg/v3alpha:pkg", + "//contrib/envoy/extensions/filters/network/generic_proxy/v3alpha:pkg", "//contrib/envoy/extensions/filters/network/kafka_broker/v3:pkg", "//contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha:pkg", "//contrib/envoy/extensions/filters/network/mysql_proxy/v3:pkg", diff --git a/contrib/contrib_build_config.bzl b/contrib/contrib_build_config.bzl index 79708cc65db0e..b735ac66c9364 100644 --- a/contrib/contrib_build_config.bzl +++ b/contrib/contrib_build_config.bzl @@ -16,6 +16,7 @@ CONTRIB_EXTENSIONS = { "envoy.filters.network.mysql_proxy": "//contrib/mysql_proxy/filters/network/source:config", "envoy.filters.network.postgres_proxy": "//contrib/postgres_proxy/filters/network/source:config", "envoy.filters.network.rocketmq_proxy": "//contrib/rocketmq_proxy/filters/network/source:config", + "envoy.filters.network.generic_proxy": "//contrib/generic_proxy/filters/network/source:config", # # Sip proxy diff --git a/contrib/extensions_metadata.yaml b/contrib/extensions_metadata.yaml index 392df4fea682c..8f70c56b8f766 100644 --- a/contrib/extensions_metadata.yaml +++ b/contrib/extensions_metadata.yaml @@ -53,3 +53,8 @@ envoy.bootstrap.vcl: - envoy.bootstrap security_posture: requires_trusted_downstream_and_upstream status: alpha +envoy.filters.network.generic_proxy: + categories: + - envoy.filters.network + security_posture: requires_trusted_downstream_and_upstream + status: alpha diff --git a/contrib/generic_proxy/filters/network/source/BUILD b/contrib/generic_proxy/filters/network/source/BUILD new file mode 100644 index 0000000000000..d80bdec290dcf --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/BUILD @@ -0,0 +1,68 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_contrib_extension", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "filter_lib", + srcs = [ + "generic_proxy.cc", + ], + hdrs = [ + "generic_proxy.h", + ], + deps = [ + ":route_lib", + "//contrib/generic_proxy/filters/network/source/interface:generic_codec_interface", + "//envoy/network:filter_interface", + "//envoy/server:factory_context_interface", + "//source/common/common:linked_object", + "//source/common/common:minimal_logger_lib", + "//source/common/stream_info:stream_info_lib", + "//source/extensions/filters/network/common:factory_base_lib", + "@envoy_api//contrib/envoy/extensions/filters/network/generic_proxy/v3alpha:pkg_cc_proto", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + ], +) + +envoy_cc_contrib_extension( + name = "config", + srcs = [ + "config.cc", + ], + hdrs = [ + "config.h", + ], + deps = [ + ":filter_lib", + "//contrib/generic_proxy/filters/network/source/filters/router:config", + "//envoy/server:filter_config_interface", + "@envoy_api//contrib/envoy/extensions/filters/network/generic_proxy/v3alpha:pkg_cc_proto", + ], +) + +envoy_cc_library( + name = "route_lib", + srcs = [ + "route_impl.cc", + ], + hdrs = [ + "route_impl.h", + ], + deps = [ + "//contrib/generic_proxy/filters/network/source/interface:generic_config_interface", + "//contrib/generic_proxy/filters/network/source/interface:generic_route_interface", + "//envoy/server:factory_context_interface", + "//source/common/common:matchers_lib", + "//source/common/config:metadata_lib", + "//source/common/config:utility_lib", + "@envoy_api//contrib/envoy/extensions/filters/network/generic_proxy/v3alpha:pkg_cc_proto", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + ], +) diff --git a/contrib/generic_proxy/filters/network/source/config.cc b/contrib/generic_proxy/filters/network/source/config.cc new file mode 100644 index 0000000000000..629aa8ccc5490 --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/config.cc @@ -0,0 +1,22 @@ +#include "contrib/generic_proxy/filters/network/source/config.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { + +Envoy::Network::FilterFactoryCb +Factory::createFilterFactoryFromProtoTyped(const GenericProxyConfig& proto_config, + Envoy::Server::Configuration::FactoryContext& context) { + auto config = std::make_shared(proto_config, context); + return [config, &context](Envoy::Network::FilterManager& filter_manager) -> void { + filter_manager.addReadFilter(std::make_shared(config, context)); + }; +} + +REGISTER_FACTORY(Factory, Envoy::Server::Configuration::NamedNetworkFilterConfigFactory); + +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/config.h b/contrib/generic_proxy/filters/network/source/config.h new file mode 100644 index 0000000000000..96233e4858ba2 --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/config.h @@ -0,0 +1,29 @@ + +#pragma once + +#include "envoy/registry/registry.h" + +#include "source/extensions/filters/network/common/factory_base.h" + +#include "contrib/envoy/extensions/filters/network/generic_proxy/v3alpha/generic_proxy.pb.h" +#include "contrib/envoy/extensions/filters/network/generic_proxy/v3alpha/generic_proxy.pb.validate.h" +#include "contrib/generic_proxy/filters/network/source/generic_proxy.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { + +class Factory : public Envoy::Extensions::NetworkFilters::Common::FactoryBase { +public: + Factory() : FactoryBase(Filter::name()) {} + + Envoy::Network::FilterFactoryCb + createFilterFactoryFromProtoTyped(const GenericProxyConfig& proto_config, + Envoy::Server::Configuration::FactoryContext& context) override; +}; + +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/filters/BUILD b/contrib/generic_proxy/filters/network/source/filters/BUILD new file mode 100644 index 0000000000000..779d1695d3b7c --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/filters/BUILD @@ -0,0 +1 @@ +licenses(["notice"]) # Apache 2 diff --git a/contrib/generic_proxy/filters/network/source/filters/router/BUILD b/contrib/generic_proxy/filters/network/source/filters/router/BUILD new file mode 100644 index 0000000000000..64578393e507f --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/filters/router/BUILD @@ -0,0 +1,43 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "router_lib", + srcs = [ + "router.cc", + ], + hdrs = [ + "router.h", + ], + deps = [ + "//contrib/generic_proxy/filters/network/source/interface:generic_codec_interface", + "//contrib/generic_proxy/filters/network/source/interface:generic_config_interface", + "//contrib/generic_proxy/filters/network/source/interface:generic_filter_interface", + "//source/common/buffer:buffer_lib", + "//source/common/common:linked_object", + "//source/common/common:minimal_logger_lib", + "//source/common/upstream:load_balancer_lib", + ], +) + +envoy_cc_library( + name = "config", + srcs = [ + "config.cc", + ], + hdrs = [ + "config.h", + ], + deps = [ + ":router_lib", + "//contrib/generic_proxy/filters/network/source/interface:generic_config_interface", + "//envoy/registry", + ], +) diff --git a/contrib/generic_proxy/filters/network/source/filters/router/config.cc b/contrib/generic_proxy/filters/network/source/filters/router/config.cc new file mode 100644 index 0000000000000..d6df2ae6c03c7 --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/filters/router/config.cc @@ -0,0 +1,25 @@ +#include "contrib/generic_proxy/filters/network/source/filters/router/config.h" + +#include "envoy/registry/registry.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { +namespace Router { + +FilterFactoryCb +RouterFactory::createFilterFactoryFromProto(const Protobuf::Message&, const std::string&, + Server::Configuration::FactoryContext& context) { + return [&context](FilterChainFactoryCallbacks& callbacks) { + callbacks.addDecoderFilter(std::make_shared(context)); + }; +} + +REGISTER_FACTORY(RouterFactory, NamedGenericFilterConfigFactory); + +} // namespace Router +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/filters/router/config.h b/contrib/generic_proxy/filters/network/source/filters/router/config.h new file mode 100644 index 0000000000000..95d22d0f59295 --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/filters/router/config.h @@ -0,0 +1,29 @@ +#pragma once + +#include "contrib/generic_proxy/filters/network/source/filters/router/router.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_config.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { +namespace Router { + +class RouterFactory : public NamedGenericFilterConfigFactory { +public: + FilterFactoryCb + createFilterFactoryFromProto(const Protobuf::Message& config, const std::string& stat_prefix, + Server::Configuration::FactoryContext& context) override; + + ProtobufTypes::MessagePtr createEmptyConfigProto() override { return nullptr; } + + bool isTerminalFilter() override { return true; } + + std::string name() const override { return "envoy.filters.generic.router"; } +}; + +} // namespace Router +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/filters/router/router.cc b/contrib/generic_proxy/filters/network/source/filters/router/router.cc new file mode 100644 index 0000000000000..580b483166be0 --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/filters/router/router.cc @@ -0,0 +1,240 @@ +#include "contrib/generic_proxy/filters/network/source/filters/router/router.h" + +#include "envoy/common/conn_pool.h" +#include "envoy/network/connection.h" + +#include "source/common/common/assert.h" + +#include "contrib/generic_proxy/filters/network/source/interface/generic_filter.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { +namespace Router { + +namespace { +absl::string_view resetReasonToStringView(StreamResetReason reason) { + static std::string Reasons[] = {"local_reset", "connection_failure", "connection_termination", + "overflow", "ProtocolError"}; + return Reasons[static_cast(reason)]; +} +} // namespace + +UpstreamRequest::UpstreamRequest(RouterFilter& parent, Upstream::TcpPoolData tcp_data) + : parent_(parent), tcp_data_(std::move(tcp_data)) {} + +void UpstreamRequest::startStream() { + Tcp::ConnectionPool::Cancellable* handle = tcp_data_.newConnection(*this); + conn_pool_handle_ = handle; +} + +// TODO(wbpcode): To support stream reset reason. +void UpstreamRequest::resetStream(StreamResetReason reason) { + ENVOY_LOG(debug, "generic proxy upstream request: reset upstream request"); + stream_reset_ = true; + + if (conn_pool_handle_) { + ASSERT(!conn_data_); + ENVOY_LOG(debug, "generic proxy upstream request: cacel upstream request"); + conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); + conn_pool_handle_ = nullptr; + } + + if (conn_data_) { + ASSERT(!conn_pool_handle_); + ENVOY_LOG(debug, "generic proxy upstream request: close upstream connection"); + conn_data_->connection().close(Network::ConnectionCloseType::NoFlush); + conn_data_.reset(); + } + + parent_.onUpstreamRequestReset(*this, reason); +} + +void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl::string_view, + Upstream::HostDescriptionConstSharedPtr host) { + conn_pool_handle_ = nullptr; + + // Mimic an upstream reset. + onUpstreamHostSelected(host); + + switch (reason) { + case ConnectionPool::PoolFailureReason::Overflow: + resetStream(StreamResetReason::Overflow); + default: + // Treat pool timeout as connection failure. + resetStream(StreamResetReason::ConnectionFailure); + } + + resetStream(StreamResetReason::ConnectionFailure); +} + +void UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn, + Upstream::HostDescriptionConstSharedPtr host) { + ENVOY_LOG(debug, "dubbo upstream request: tcp connection has ready"); + onUpstreamHostSelected(host); + + conn_data_ = std::move(conn); + conn_data_->addUpstreamCallbacks(*this); + conn_pool_handle_ = nullptr; + + encodeBufferToUpstream(parent_.upstream_request_buffer_); +} + +void UpstreamRequest::onUpstreamData(Buffer::Instance& data, bool end_stream) { + if (!response_started_) { + response_started_ = true; + response_decoder_ = parent_.callbacks_->downstreamCodec().responseDecoder(); + response_decoder_->setDecoderCallback(*this); + } + response_decoder_->decode(data); + + if (end_stream && !response_complete_) { + resetStream(StreamResetReason::ProtocolError); + } +} + +void UpstreamRequest::onGenericResponse(GenericResponsePtr response) { + response_complete_ = true; + ASSERT(conn_pool_handle_ == nullptr); + ASSERT(conn_data_ != nullptr); + conn_data_.reset(); + parent_.onUpstreamResponse(std::move(response)); +} + +void UpstreamRequest::onDecodingError() { + response_complete_ = true; + resetStream(StreamResetReason::ProtocolError); +} + +void UpstreamRequest::onEvent(Network::ConnectionEvent event) { + switch (event) { + case Network::ConnectionEvent::LocalClose: + if (!stream_reset_) { + resetStream(StreamResetReason::LocalReset); + } + break; + case Network::ConnectionEvent::RemoteClose: + if (!stream_reset_) { + resetStream(StreamResetReason::ConnectionTermination); + } + break; + default: + break; + } +} + +void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host) { + ENVOY_LOG(debug, "dubbo upstream request: selected upstream {}", host->address()->asString()); + upstream_host_ = host; +} + +void UpstreamRequest::encodeBufferToUpstream(Buffer::Instance& buffer) { + ASSERT(conn_data_); + ASSERT(!conn_pool_handle_); + + ENVOY_LOG(trace, "proxying {} bytes", buffer.length()); + + conn_data_->connection().write(buffer, false); +} + +void RouterFilter::onUpstreamResponse(GenericResponsePtr response) { + // TODO(wbpcode): To support retry policy. + callbacks_->upstreamResponse(std::move(response)); + filter_complete_ = true; +} + +void RouterFilter::onUpstreamRequestReset(UpstreamRequest& upstream_request, + StreamResetReason reason) { + // Remove upstream request from router filter and move it to the defered-delete list. + callbacks_->dispatcher().deferredDelete(upstream_request.removeFromList(upstream_requests_)); + + if (filter_complete_) { + return; + } + + // TODO(wbpcode): To support retry policy. + resetStream(reason); +} + +void RouterFilter::cleanUpstreamRequests(bool filter_complete) { + // If filter_complete is true then the resetStream() of RouterFilter will not be called on the + // onUpstreamRequestReset() of RouterFilter. + filter_complete_ = filter_complete; + + while (!upstream_requests_.empty()) { + (*upstream_requests_.back()).resetStream(StreamResetReason::LocalReset); + } +} + +void RouterFilter::onDestroy() { + if (filter_complete_) { + return; + } + cleanUpstreamRequests(true); +} + +void RouterFilter::resetStream(StreamResetReason reason) { + ASSERT(upstream_requests_.empty()); + switch (reason) { + case StreamResetReason::LocalReset: + callbacks_->sendLocalReply(GenericState::LocalUnknowedError, resetReasonToStringView(reason)); + break; + case StreamResetReason::ProtocolError: + callbacks_->sendLocalReply(GenericState::LocalUnknowedError, resetReasonToStringView(reason)); + break; + case StreamResetReason::ConnectionFailure: + callbacks_->sendLocalReply(GenericState::LocalUnknowedError, resetReasonToStringView(reason)); + break; + case StreamResetReason::ConnectionTermination: + callbacks_->sendLocalReply(GenericState::LocalUnknowedError, resetReasonToStringView(reason)); + case StreamResetReason::Overflow: + callbacks_->sendLocalReply(GenericState::LocalUnknowedError, resetReasonToStringView(reason)); + } + + filter_complete_ = true; +} + +GenericFilterStatus RouterFilter::onStreamDecoded(GenericRequest& request) { + const auto route_entry = callbacks_->routeEntry(); + if (route_entry == nullptr) { + callbacks_->sendLocalReply(GenericState::LocalExpectedError, "route_not_found"); + return GenericFilterStatus::StopIteration; + } + + const auto& cluster_name = route_entry->clusterName(); + + auto thread_local_cluster = context_.clusterManager().getThreadLocalCluster(cluster_name); + if (thread_local_cluster == nullptr) { + callbacks_->sendLocalReply(GenericState::LocalUnknowedError, "cluster_not_found"); + return GenericFilterStatus::StopIteration; + } + + auto cluster_info = thread_local_cluster->info(); + if (cluster_info->maintenanceMode()) { + callbacks_->sendLocalReply(GenericState::LocalUnknowedError, "cluster_maintain_mode"); + return GenericFilterStatus::StopIteration; + } + + auto tcp_data = thread_local_cluster->tcpConnPool(Upstream::ResourcePriority::Default, this); + if (!tcp_data.has_value()) { + callbacks_->sendLocalReply(GenericState::LocalUnknowedError, "no_healthy_upstream"); + return GenericFilterStatus::StopIteration; + } + + request_encoder_ = callbacks_->downstreamCodec().requestEncoder(); + request_encoder_->encode(request, upstream_request_buffer_); + + auto upstream_request = std::make_unique(*this, std::move(tcp_data.value())); + auto raw_upstream_request = upstream_request.get(); + LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_); + + raw_upstream_request->startStream(); + return GenericFilterStatus::StopIteration; +} + +} // namespace Router +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/filters/router/router.h b/contrib/generic_proxy/filters/network/source/filters/router/router.h new file mode 100644 index 0000000000000..6f4e03fb41d8d --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/filters/router/router.h @@ -0,0 +1,123 @@ +#pragma once + +#include "envoy/network/connection.h" +#include "envoy/server/factory_context.h" + +#include "source/common/buffer/buffer_impl.h" +#include "source/common/common/linked_object.h" +#include "source/common/upstream/load_balancer_impl.h" + +#include "contrib/generic_proxy/filters/network/source/interface/generic_codec.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_filter.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_stream.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { +namespace Router { + +/** + * Stream reset reasons. + */ +enum class StreamResetReason : uint32_t { + LocalReset, + // If the stream was locally reset by a connection pool due to an initial connection failure. + ConnectionFailure, + // If the stream was locally reset due to connection termination. + ConnectionTermination, + // The stream was reset because of a resource overflow. + Overflow, + // Protocol error. + ProtocolError, +}; + +class RouterFilter; + +class UpstreamRequest : public Tcp::ConnectionPool::Callbacks, + public Tcp::ConnectionPool::UpstreamCallbacks, + public LinkedObject, + public Event::DeferredDeletable, + public ResponseDecoderCallback, + Logger::Loggable { +public: + UpstreamRequest(RouterFilter& parent, Upstream::TcpPoolData tcp_data); + + void startStream(); + void resetStream(StreamResetReason reason); + + // Tcp::ConnectionPool::Callbacks + void onPoolFailure(ConnectionPool::PoolFailureReason reason, + absl::string_view transport_failure_reason, + Upstream::HostDescriptionConstSharedPtr host) override; + void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn, + Upstream::HostDescriptionConstSharedPtr host) override; + + // Tcp::ConnectionPool::UpstreamCallbacks + void onUpstreamData(Buffer::Instance& data, bool) override; + void onEvent(Network::ConnectionEvent event) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} + + // ResponseDecoderCallback + void onGenericResponse(GenericResponsePtr response) override; + void onDecodingError() override; + + void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host); + void encodeBufferToUpstream(Buffer::Instance& buffer); + + bool stream_reset_{}; + + RouterFilter& parent_; + Upstream::TcpPoolData tcp_data_; + + Tcp::ConnectionPool::Cancellable* conn_pool_handle_{}; + Tcp::ConnectionPool::ConnectionDataPtr conn_data_; + Upstream::HostDescriptionConstSharedPtr upstream_host_; + + bool response_started_{}; + bool response_complete_{}; + GenericResponseDecoderPtr response_decoder_; +}; +using UpstreamRequestPtr = std::unique_ptr; + +class RouterFilter : public DecoderFilter, + Logger::Loggable, + public Upstream::LoadBalancerContextBase { +public: + RouterFilter(Server::Configuration::FactoryContext& context) : context_(context) {} + + // DecoderFilter + void onDestroy() override; + + GenericFilterStatus onStreamDecoded(GenericRequest& request) override; + void setDecoderFilterCallbacks(DecoderFilterCallback& callbacks) override { + callbacks_ = &callbacks; + } + + void onUpstreamResponse(GenericResponsePtr response); + void resetStream(StreamResetReason reason); + + void onUpstreamRequestReset(UpstreamRequest& upstream_request, StreamResetReason reason); + void cleanUpstreamRequests(bool filter_complete); + +private: + friend class UpstreamRequest; + + bool filter_complete_{}; + + Buffer::OwnedImpl upstream_request_buffer_; + GenericRequestEncoderPtr request_encoder_; + + std::list upstream_requests_; + + DecoderFilterCallback* callbacks_{}; + + Server::Configuration::FactoryContext& context_; +}; + +} // namespace Router +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/generic_proxy.cc b/contrib/generic_proxy/filters/network/source/generic_proxy.cc new file mode 100644 index 0000000000000..5ac499590bd9a --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/generic_proxy.cc @@ -0,0 +1,205 @@ +#include "contrib/generic_proxy/filters/network/source/generic_proxy.h" + +#include "envoy/common/exception.h" +#include "envoy/network/connection.h" + +#include "source/common/config/utility.h" +#include "source/common/protobuf/protobuf.h" +#include "source/common/stream_info/stream_info_impl.h" + +#include "contrib/generic_proxy/filters/network/source/interface/generic_config.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_filter.h" +#include "contrib/generic_proxy/filters/network/source/route_impl.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { + +CodecFactoryPtr FilterConfig::codecFactoryFromProto( + const envoy::config::core::v3::TypedExtensionConfig& codec_config, + Envoy::Server::Configuration::FactoryContext& context) { + auto& factory = + Config::Utility::getAndCheckFactoryByName(codec_config.name()); + + ProtobufTypes::MessagePtr message = factory.createEmptyConfigProto(); + Envoy::Config::Utility::translateOpaqueConfig(codec_config.typed_config(), + context.messageValidationVisitor(), *message); + return factory.createFactory(*message, context); +} + +RouteMatcherPtr FilterConfig::routeMatcherFromProto( + const envoy::extensions::filters::network::generic_proxy::v3alpha::RouteConfiguration& + route_config, + Envoy::Server::Configuration::FactoryContext& context) { + return std::make_unique(route_config, context); +} + +std::vector FilterConfig::filtersFactoryFromProto( + const ProtobufWkt::RepeatedPtrField< + envoy::extensions::filters::network::generic_proxy::v3alpha::GenericFilter>& filters, + const std::string stats_prefix, Envoy::Server::Configuration::FactoryContext& context) { + + std::vector factories; + bool has_terminal_filter = false; + std::string terminal_filter_name; + for (const auto& filter : filters) { + if (has_terminal_filter) { + throw EnvoyException(fmt::format("Termial filter: {} must be the last generic L7 filter", + terminal_filter_name)); + } + + auto& factory = + Config::Utility::getAndCheckFactoryByName(filter.name()); + + ProtobufTypes::MessagePtr message = factory.createEmptyConfigProto(); + Envoy::Config::Utility::translateOpaqueConfig(filter.config(), + context.messageValidationVisitor(), *message); + + factories.push_back(factory.createFilterFactoryFromProto(*message, stats_prefix, context)); + + if (factory.isTerminalFilter()) { + terminal_filter_name = filter.name(); + has_terminal_filter = true; + } + } + + if (!has_terminal_filter) { + throw EnvoyException("A termial L7 filter is necessary for generic proxy"); + } + return factories; +} + +ActiveStream::ActiveStream(Filter& parent, GenericRequestPtr request) + : parent_(parent), request_(std::move(request)) {} + +ActiveStream::~ActiveStream() { + for (auto& filter : decoder_filters_) { + filter->onDestroy(); + } + for (auto& filter : encoder_filters_) { + if (filter->isDualFilter()) { + continue; + } + filter->onDestroy(); + } +} + +const Network::Connection* ActiveStream::connection() { return &parent_.connection(); } +Event::Dispatcher& ActiveStream::dispatcher() { return parent_.connection().dispatcher(); } +const CodecFactory& ActiveStream::downstreamCodec() { return *parent_.config_->codec_factory_; } +void ActiveStream::resetStream() { + if (active_stream_reset_) { + return; + } + active_stream_reset_ = true; + parent_.deferredStream(*this); +} + +void ActiveStream::sendLocalReply(GenericState status, absl::string_view status_detail, + MetadataUpdateFunction&& func) { + parent_.sendLocalReply(status, status_detail, std::move(func)); +} + +void ActiveStream::continueDecoding() { + if (active_stream_reset_ || request_ == nullptr) { + return; + } + + ASSERT(request_ != nullptr); + for (; next_decoder_filter_index_ < decoder_filters_.size();) { + auto status = decoder_filters_[next_decoder_filter_index_]->onStreamDecoded(*request_); + next_decoder_filter_index_++; + if (status == GenericFilterStatus::StopIteration) { + break; + } + } + if (next_decoder_filter_index_ == decoder_filters_.size()) { + ENVOY_LOG(debug, "Complete decoder filters"); + } +} + +void ActiveStream::upstreamResponse(GenericResponsePtr response) { + response_ = std::move(response); + continueEncoding(); +} + +void ActiveStream::continueEncoding() { + if (active_stream_reset_ || response_ == nullptr) { + return; + } + + ASSERT(response_ != nullptr); + for (; next_encoder_filter_index_ < encoder_filters_.size();) { + auto status = encoder_filters_[next_encoder_filter_index_]->onStreamEncoded(*response_); + next_encoder_filter_index_++; + if (status == GenericFilterStatus::StopIteration) { + break; + } + } + + if (next_encoder_filter_index_ == encoder_filters_.size()) { + ENVOY_LOG(debug, "Complete decoder filters"); + parent_.sendReplyDownstream(*response_); + } +} + +Envoy::Network::FilterStatus Filter::onData(Envoy::Buffer::Instance& data, bool) { + if (downstream_connection_closed_) { + return Envoy::Network::FilterStatus::StopIteration; + } + + decoder_->decode(data); + return Envoy::Network::FilterStatus::StopIteration; +} + +void Filter::onGenericRequest(GenericRequestPtr request) { + // New normal request and create active stream for this request. + newDownstreamRequest(std::move(request)); +} + +void Filter::onDirectResponse(GenericResponsePtr direct) { sendReplyDownstream(*direct); } + +void Filter::onDecodingError() { + resetStreamsForUnexpectedError(); + connection().close(Network::ConnectionCloseType::FlushWrite); +} + +void Filter::sendReplyDownstream(GenericResponse& response) { + ASSERT(callbacks_->connection().state() == Network::Connection::State::Open); + response_encoder_->encode(response, response_buffer_); + callbacks_->connection().write(response_buffer_, false); +} + +void Filter::sendLocalReply(GenericState status, absl::string_view status_detail, + MetadataUpdateFunction&& func) { + auto response = creator_->response(status, status_detail); + func(*response); + sendReplyDownstream(*response); +} + +void Filter::newDownstreamRequest(GenericRequestPtr request) { + auto stream = std::make_unique(*this, std::move(request)); + config_->createFilterChain(*stream); + LinkedList::moveIntoList(std::move(stream), active_streams_); + // Start request. + (*active_streams_.begin())->continueDecoding(); +} + +void Filter::deferredStream(ActiveStream& stream) { + if (!stream.inserted()) { + return; + } + callbacks_->connection().dispatcher().deferredDelete(stream.removeFromList(active_streams_)); +} + +void Filter::resetStreamsForUnexpectedError() { + while (!active_streams_.empty()) { + active_streams_.front()->resetStream(); + } +} + +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/generic_proxy.h b/contrib/generic_proxy/filters/network/source/generic_proxy.h new file mode 100644 index 0000000000000..9ef4468fc9795 --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/generic_proxy.h @@ -0,0 +1,233 @@ +#pragma once + +#include + +#include +#include +#include + +#include "envoy/config/core/v3/extension.pb.h" +#include "envoy/network/connection.h" +#include "envoy/network/filter.h" +#include "envoy/server/factory_context.h" + +#include "source/common/buffer/buffer_impl.h" +#include "source/common/common/linked_object.h" +#include "source/common/common/logger.h" +#include "source/common/stream_info/stream_info_impl.h" + +#include "contrib/envoy/extensions/filters/network/generic_proxy/v3alpha/generic_proxy.pb.h" +#include "contrib/envoy/extensions/filters/network/generic_proxy/v3alpha/generic_proxy.pb.validate.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_codec.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_filter.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_route.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_stream.h" +#include "contrib/generic_proxy/filters/network/source/route_impl.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { + +using GenericProxyConfig = + envoy::extensions::filters::network::generic_proxy::v3alpha::GenericProxyConfig; + +class Filter; +class ActiveStream; + +class FilterConfig : public FilterChainFactory { +public: + FilterConfig(const std::string& stat_prefix, CodecFactoryPtr codec, RouteMatcherPtr route_matcher, + std::vector factories, Server::Configuration::FactoryContext&) + : stat_prefix_(stat_prefix), codec_factory_(std::move(codec)), + route_matcher_(std::move(route_matcher)), factories_(std::move(factories)) {} + + FilterConfig(const GenericProxyConfig& config, Server::Configuration::FactoryContext& context) + : FilterConfig( + config.stat_prefix(), codecFactoryFromProto(config.codec_specifier(), context), + routeMatcherFromProto(config.route_config(), context), + filtersFactoryFromProto(config.generic_filters(), config.stat_prefix(), context), + context) {} + + RouteEntryConstSharedPtr routeEntry(const GenericRequest& request) const { + return route_matcher_->routeEntry(request); + } + + // FilterChainFactory + void createFilterChain(FilterChainFactoryCallbacks& callbacks) override { + for (const auto& factory : factories_) { + factory(callbacks); + } + } + + const CodecFactory& codecFactory() { return *codec_factory_; } + +private: + friend class ActiveStream; + friend class Filter; + + static CodecFactoryPtr + codecFactoryFromProto(const envoy::config::core::v3::TypedExtensionConfig& codec_config, + Server::Configuration::FactoryContext& context); + + static RouteMatcherPtr routeMatcherFromProto( + const envoy::extensions::filters::network::generic_proxy::v3alpha::RouteConfiguration& + route_config, + Server::Configuration::FactoryContext& context); + + static std::vector filtersFactoryFromProto( + const ProtobufWkt::RepeatedPtrField< + envoy::extensions::filters::network::generic_proxy::v3alpha::GenericFilter>& filters, + const std::string stats_prefix, Server::Configuration::FactoryContext& context); + + const std::string stat_prefix_; + + CodecFactoryPtr codec_factory_; + + RouteMatcherPtr route_matcher_; + + std::vector factories_; +}; +using FilterConfigSharedPtr = std::shared_ptr; + +class ActiveStream : public FilterChainFactoryCallbacks, + public DecoderFilterCallback, + public EncoderFilterCallback, + public LinkedObject, + public Event::DeferredDeletable, + Logger::Loggable { +public: + ActiveStream(Filter& parent, GenericRequestPtr request); + ~ActiveStream(); + + // FilterChainFactoryCallbacks + void addDecoderFilter(DecoderFilterSharedPtr filter) override { + decoder_filters_.emplace_back(std::move(filter)); + } + void addEncoderFilter(EncoderFilterSharedPtr filter) override { + encoder_filters_.emplace_back(std::move(filter)); + } + void addFilter(StreamFilterSharedPtr filter) override { + decoder_filters_.push_back(filter); + encoder_filters_.emplace_back(std::move(filter)); + } + + // StreamFilterCallbacks + const Network::Connection* connection() override; + Event::Dispatcher& dispatcher() override; + const CodecFactory& downstreamCodec() override; + void resetStream() override; + const RouteEntry* routeEntry() const override { return cached_route_entry_.get(); } + Upstream::ClusterInfoConstSharedPtr clusterInfo() override { return cached_cluster_info_; } + + // DecoderFilterCallback + void sendLocalReply(GenericState status, absl::string_view status_detail, + MetadataUpdateFunction&&) override; + void continueDecoding() override; + void upstreamResponse(GenericResponsePtr response) override; + + // EncoderFilterCallback + void continueEncoding() override; + +private: + Filter& parent_; + + bool active_stream_reset_{false}; + + GenericRequestPtr request_; + GenericResponsePtr response_; + + RouteEntryConstSharedPtr cached_route_entry_; + Upstream::ClusterInfoConstSharedPtr cached_cluster_info_; + + std::vector decoder_filters_; + size_t next_decoder_filter_index_{0}; + + std::vector encoder_filters_; + size_t next_encoder_filter_index_{0}; +}; +using ActiveStreamPtr = std::unique_ptr; + +class Filter : public Envoy::Network::ReadFilter, + public Network::ConnectionCallbacks, + public Envoy::Logger::Loggable, + public RequestDecoderCallback { +public: + Filter(FilterConfigSharedPtr config, Server::Configuration::FactoryContext& context) + : config_(std::move(config)), context_(context) { + decoder_ = config_->codecFactory().requestDecoder(); + decoder_->setDecoderCallback(*this); + response_encoder_ = config_->codecFactory().responseEncoder(); + creator_ = config_->codecFactory().messageCreator(); + } + + // Envoy::Network::ReadFilter + Envoy::Network::FilterStatus onData(Envoy::Buffer::Instance& data, bool end_stream) override; + Envoy::Network::FilterStatus onNewConnection() override { + return Envoy::Network::FilterStatus::Continue; + } + void initializeReadFilterCallbacks(Envoy::Network::ReadFilterCallbacks& callbacks) override { + callbacks_ = &callbacks; + callbacks_->connection().addConnectionCallbacks(*this); + } + + // RequestDecoderCallback + void onGenericRequest(GenericRequestPtr request) override; + void onDirectResponse(GenericResponsePtr direct) override; + void onDecodingError() override; + + // Network::ConnectionCallbacks + void onEvent(Network::ConnectionEvent event) override { + if (event == Network::ConnectionEvent::Connected) { + return; + } + downstream_connection_closed_ = true; + resetStreamsForUnexpectedError(); + } + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} + + void sendReplyDownstream(GenericResponse& response); + void sendLocalReply(GenericState status, absl::string_view status_detail, + MetadataUpdateFunction&&); + + Network::Connection& connection() { + ASSERT(callbacks_ != nullptr); + return callbacks_->connection(); + } + + Server::Configuration::FactoryContext& factoryContext() { return context_; } + + void newDownstreamRequest(GenericRequestPtr request); + void deferredStream(ActiveStream& stream); + + static const std::string& name() { + CONSTRUCT_ON_FIRST_USE(std::string, "envoy.filters.network.generic_proxy"); + } + +private: + friend class ActiveStream; + + void resetStreamsForUnexpectedError(); + + bool downstream_connection_closed_{}; + + Envoy::Network::ReadFilterCallbacks* callbacks_{nullptr}; + + FilterConfigSharedPtr config_{}; + + GenericRequestDecoderPtr decoder_; + GenericResponseEncoderPtr response_encoder_; + GenericMessageCreatorPtr creator_; + + Buffer::OwnedImpl response_buffer_; + + Server::Configuration::FactoryContext& context_; + + std::list active_streams_; +}; + +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/interface/BUILD b/contrib/generic_proxy/filters/network/source/interface/BUILD new file mode 100644 index 0000000000000..57e22b04db82e --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/interface/BUILD @@ -0,0 +1,68 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "generic_stream_interface", + hdrs = [ + "generic_stream.h", + ], +) + +envoy_cc_library( + name = "generic_codec_interface", + hdrs = [ + "generic_codec.h", + ], + deps = [ + ":generic_stream_interface", + "//envoy/buffer:buffer_interface", + "//envoy/config:typed_config_interface", + "//envoy/server:factory_context_interface", + ], +) + +envoy_cc_library( + name = "generic_filter_interface", + hdrs = [ + "generic_filter.h", + ], + deps = [ + ":generic_codec_interface", + ":generic_route_interface", + ":generic_stream_interface", + ], +) + +envoy_cc_library( + name = "generic_route_interface", + hdrs = [ + "generic_route.h", + ], + deps = [ + ":generic_stream_interface", + "//envoy/config:typed_metadata_interface", + "//envoy/event:dispatcher_interface", + "//envoy/network:connection_interface", + "//envoy/stream_info:stream_info_interface", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + ], +) + +envoy_cc_library( + name = "generic_config_interface", + hdrs = [ + "generic_config.h", + ], + deps = [ + ":generic_filter_interface", + "//envoy/config:typed_config_interface", + "//envoy/server:factory_context_interface", + ], +) diff --git a/contrib/generic_proxy/filters/network/source/interface/generic_codec.h b/contrib/generic_proxy/filters/network/source/interface/generic_codec.h new file mode 100644 index 0000000000000..e637c9bfad6fd --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/interface/generic_codec.h @@ -0,0 +1,149 @@ +#pragma once + +#include "envoy/buffer/buffer.h" +#include "envoy/config/typed_config.h" +#include "envoy/server/factory_context.h" + +#include "contrib/generic_proxy/filters/network/source/interface/generic_stream.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { + +/** + * Decoder callback of generic request. + */ +class RequestDecoderCallback { +public: + virtual ~RequestDecoderCallback() = default; + + virtual void onGenericRequest(GenericRequestPtr request) PURE; + virtual void onDirectResponse(GenericResponsePtr direct) PURE; + virtual void onDecodingError() PURE; +}; + +/** + * Decoder callback of generic Response. + */ +class ResponseDecoderCallback { +public: + virtual ~ResponseDecoderCallback() = default; + + virtual void onGenericResponse(GenericResponsePtr response) PURE; + virtual void onDecodingError() PURE; +}; + +/** + * Decoder of generic request. + */ +class GenericRequestDecoder { +public: + virtual ~GenericRequestDecoder() = default; + + virtual void setDecoderCallback(RequestDecoderCallback& callback) PURE; + virtual void decode(Buffer::Instance& buffer) PURE; +}; + +/** + * Decoder of generic respnose. + */ +class GenericResponseDecoder { +public: + virtual ~GenericResponseDecoder() = default; + + virtual void setDecoderCallback(ResponseDecoderCallback& callback) PURE; + virtual void decode(Buffer::Instance& buffer) PURE; +}; + +/* + * Encoder of generic request. + */ +class GenericRequestEncoder { +public: + virtual ~GenericRequestEncoder() = default; + + // TODO(wbpcode): update this method to support async encoding. + virtual void encode(GenericRequest&, Buffer::Instance& buffer) PURE; +}; + +/* + * Encoder of generic response. + */ +class GenericResponseEncoder { +public: + virtual ~GenericResponseEncoder() = default; + + // TODO(wbpcode): update this method to support async encoding. + virtual void encode(GenericResponse&, Buffer::Instance& buffer) PURE; +}; + +class GenericMessageCreator { +public: + virtual ~GenericMessageCreator() = default; + + /** + * Create local reponse message for local reply. + */ + virtual GenericResponsePtr response(GenericState status, absl::string_view status_detail, + GenericRequest* origin_request = nullptr) PURE; +}; + +using GenericRequestDecoderPtr = std::unique_ptr; +using GenericResponseDecoderPtr = std::unique_ptr; +using GenericRequestEncoderPtr = std::unique_ptr; +using GenericResponseEncoderPtr = std::unique_ptr; +using GenericMessageCreatorPtr = std::unique_ptr; + +/** + * Factory used to create generic stream encoder and decoder. If the developer wants to add new + * protocol support to this proxy, they need to implement the corresponding codec factory for the + * corresponding protocol. + */ +class CodecFactory { +public: + virtual ~CodecFactory() = default; + + /* + * Create generic request decoder. + */ + virtual GenericRequestDecoderPtr requestDecoder() const PURE; + + /* + * Create generic response decoder. + */ + virtual GenericResponseDecoderPtr responseDecoder() const PURE; + + /* + * Create generic request encoder. + */ + virtual GenericRequestEncoderPtr requestEncoder() const PURE; + + /* + * Create generic response encoder. + */ + virtual GenericResponseEncoderPtr responseEncoder() const PURE; + + /** + * Create generic message creator. + */ + virtual GenericMessageCreatorPtr messageCreator() const PURE; +}; + +using CodecFactoryPtr = std::unique_ptr; + +/** + * Factory config for codec factory. This class is used to register and create codec factories. + */ +class CodecFactoryConfig : public Envoy::Config::TypedFactory { +public: + virtual CodecFactoryPtr createFactory(const Protobuf::Message& config, + Envoy::Server::Configuration::FactoryContext& context) PURE; + + std::string category() const override { return "envoy.generic_proxy.codec"; } +}; + +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/interface/generic_config.h b/contrib/generic_proxy/filters/network/source/interface/generic_config.h new file mode 100644 index 0000000000000..ad0716a8c45e1 --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/interface/generic_config.h @@ -0,0 +1,50 @@ +#pragma once + +#include "envoy/config/typed_config.h" +#include "envoy/server/factory_context.h" + +#include "contrib/generic_proxy/filters/network/source/interface/generic_filter.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { + +/** + * Implemented by each generic filter and registered via Registry::registerFactory or the + * convenience class RegisterFactory. + */ +class NamedGenericFilterConfigFactory : public Config::TypedFactory { +public: + virtual FilterFactoryCb + createFilterFactoryFromProto(const Protobuf::Message& config, const std::string& stat_prefix, + Server::Configuration::FactoryContext& context) PURE; + + /** + * @return ProtobufTypes::MessagePtr create empty route config proto message route specfic + * config. + */ + virtual ProtobufTypes::MessagePtr createEmptyRouteConfigProto() { return nullptr; } + + /** + * @return RouteSpecificFilterConfigConstSharedPtr allow the filter to pre-process per route + * config. Returned object will be stored in the loaded route configuration. + */ + virtual RouteSpecificFilterConfigConstSharedPtr + createRouteSpecificFilterConfig(const Protobuf::Message&, Server::Configuration::FactoryContext&, + ProtobufMessage::ValidationVisitor&) { + return nullptr; + } + + std::string category() const override { return "envoy.generic_proxy.filters"; } + + /** + * @return bool true if this filter must be the last filter in a filter chain, false otherwise. + */ + virtual bool isTerminalFilter() { return false; } +}; + +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/interface/generic_filter.h b/contrib/generic_proxy/filters/network/source/interface/generic_filter.h new file mode 100644 index 0000000000000..198ac4412830e --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/interface/generic_filter.h @@ -0,0 +1,154 @@ +#pragma once + +#include "envoy/event/dispatcher.h" +#include "envoy/network/connection.h" +#include "envoy/stream_info/stream_info.h" + +#include "contrib/generic_proxy/filters/network/source/interface/generic_codec.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_route.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_stream.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { + +using MetadataUpdateFunction = std::function; + +/** + * The stream filter callbacks are passed to all filters to use for writing response data and + * interacting with the underlying stream in general. + */ +class StreamFilterCallbacks { +public: + virtual ~StreamFilterCallbacks() = default; + + /** + * @return const Network::Connection* the originating connection, or nullptr if there is none. + */ + virtual const Network::Connection* connection() PURE; + + /** + * @return Event::Dispatcher& the thread local dispatcher for allocating timers, etc. + */ + virtual Event::Dispatcher& dispatcher() PURE; + + /** + * @return const CodecFactory& the downstream codec factory used to create request/response + * decoder/encoder. + */ + virtual const CodecFactory& downstreamCodec() PURE; + + /** + * Reset the underlying stream. + */ + virtual void resetStream() PURE; + + /** + * @return const RouteEntry* cached route entry for current request. + */ + virtual const RouteEntry* routeEntry() const PURE; + + /** + * Returns the clusterInfo for the cached route. + */ + virtual Upstream::ClusterInfoConstSharedPtr clusterInfo() PURE; +}; + +class DecoderFilterCallback : public StreamFilterCallbacks { +public: + virtual void sendLocalReply(GenericState status, absl::string_view status_detail, + MetadataUpdateFunction&& cb = nullptr) PURE; + + virtual void continueDecoding() PURE; + + virtual void upstreamResponse(GenericResponsePtr response) PURE; +}; + +class EncoderFilterCallback : public StreamFilterCallbacks { +public: + virtual void continueEncoding() PURE; +}; + +enum class GenericFilterStatus { Continue, StopIteration }; + +class DecoderFilter { +public: + virtual ~DecoderFilter() = default; + + virtual bool isDualFilter() const { return false; } + + virtual void onDestroy() PURE; + + virtual void setDecoderFilterCallbacks(DecoderFilterCallback& callbacks) PURE; + virtual GenericFilterStatus onStreamDecoded(GenericRequest& request) PURE; +}; + +class EncoderFilter { +public: + virtual ~EncoderFilter() = default; + + virtual bool isDualFilter() const { return false; } + + virtual void onDestroy() PURE; + + virtual void setEncoderFilterCallbacks(EncoderFilterCallback& callbacks) PURE; + virtual GenericFilterStatus onStreamEncoded(GenericResponse& response) PURE; +}; + +class StreamFilter : public DecoderFilter, public EncoderFilter { +public: + bool isDualFilter() const override final { return true; } +}; + +using DecoderFilterSharedPtr = std::shared_ptr; +using EncoderFilterSharedPtr = std::shared_ptr; +using StreamFilterSharedPtr = std::shared_ptr; + +class FilterChainFactoryCallbacks { +public: + virtual ~FilterChainFactoryCallbacks() = default; + + /** + * Add a decoder filter that is used when reading connection data. + * @param filter supplies the filter to add. + */ + virtual void addDecoderFilter(DecoderFilterSharedPtr filter) PURE; + + /** + * Add a encoder filter that is used when writing connection data. + * @param filter supplies the filter to add. + */ + virtual void addEncoderFilter(EncoderFilterSharedPtr filter) PURE; + + /** + * Add a decoder/encoder filter that is used both when reading and writing connection data. + * @param filter supplies the filter to add. + */ + virtual void addFilter(StreamFilterSharedPtr filter) PURE; +}; + +using FilterFactoryCb = std::function; + +/** + * A FilterChainFactory is used by a connection manager to create a Kafka level filter chain when + * a new connection is created. Typically it would be implemented by a configuration engine that + * would install a set of filters that are able to process an application scenario on top of a + * stream of Dubbo requests. + */ +class FilterChainFactory { +public: + virtual ~FilterChainFactory() = default; + + /** + * Called when a new Kafka stream is created on the connection. + * @param callbacks supplies the "sink" that is used for actually creating the filter chain. @see + * FilterChainFactoryCallbacks. + */ + virtual void createFilterChain(FilterChainFactoryCallbacks& callbacks) PURE; +}; + +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/interface/generic_route.h b/contrib/generic_proxy/filters/network/source/interface/generic_route.h new file mode 100644 index 0000000000000..ca31eae1dc824 --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/interface/generic_route.h @@ -0,0 +1,106 @@ +#pragma once + +#include + +#include + +#include "envoy/config/core/v3/base.pb.h" +#include "envoy/config/typed_metadata.h" + +#include "contrib/generic_proxy/filters/network/source/interface/generic_stream.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { + +class RouteSpecificFilterConfig { +public: + virtual ~RouteSpecificFilterConfig() = default; +}; +using RouteSpecificFilterConfigConstSharedPtr = std::shared_ptr; + +class GenericRouteTypedMetadataFactory : public Envoy::Config::TypedMetadataFactory {}; + +class RetryPolicy { +public: + virtual ~RetryPolicy() = default; + + /** + * When upstream returns a response or when a specific event occurs, whether it should retry. + * + * @param count The number of requests that have been made upstream. + * @param response The optional upstream response. + * @param event The optional upstream request or connection event. + * @return bool should kick off a new retry request or not. + */ + virtual bool shouldRetry(uint32_t count, const GenericResponse* response, + absl::optional event = absl::nullopt) const PURE; + + /** + * @return std::chrono::milliseconds per upstream request timeout. + */ + virtual std::chrono::milliseconds timeout() const PURE; +}; + +class RouteEntry { +public: + virtual ~RouteEntry() = default; + + virtual const std::string& clusterName() const PURE; + + /** + * Get route level per filter config by the filter name. + */ + virtual const RouteSpecificFilterConfig* perFilterConfig(absl::string_view) const PURE; + template const T* typedPerFilterConfig(absl::string_view name) const { + return dynamic_cast(perFilterConfig(name)); + } + + /** + * Update generic request before encode and send request to upstream. + */ + virtual void finalizeGenericRequest(GenericRequest& request) const PURE; + + /** + * Update generic respnose before encode and send response to downstream. + */ + virtual void finalizeGenericResponse(GenericResponse& response) const PURE; + + /** + * @return const Envoy::Config::TypedMetadata& return the typed metadata provided in the config + * for this route. + */ + virtual const Envoy::Config::TypedMetadata& typedMetadata() const PURE; + + /** + * @return const envoy::config::core::v3::Metadata& return the metadata provided in the config for + * this route. + */ + virtual const envoy::config::core::v3::Metadata& metadata() const PURE; + + /** + * @return std::chrono::milliseconds the route's timeout. + */ + virtual std::chrono::milliseconds timeout() const PURE; + + /** + * const RetryPolicy& the retry policy for the route. All routes have a retry policy even if it is + * empty and does not allow retries. + */ + virtual const RetryPolicy& retryPolicy() const PURE; +}; +using RouteEntryConstSharedPtr = std::shared_ptr; + +class RouteMatcher { +public: + virtual ~RouteMatcher() = default; + + virtual RouteEntryConstSharedPtr routeEntry(const GenericRequest& request) const PURE; +}; +using RouteMatcherPtr = std::unique_ptr; + +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/interface/generic_stream.h b/contrib/generic_proxy/filters/network/source/interface/generic_stream.h new file mode 100644 index 0000000000000..418445c150724 --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/interface/generic_stream.h @@ -0,0 +1,148 @@ +#pragma once + +#include +#include +#include + +#include "envoy/common/pure.h" + +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { + +class GenericStreamBase { +public: + virtual ~GenericStreamBase(); + + using IterateCallback = std::function; + + /** + * Iterate over all generic stream metadata entry. + * + * @param callback supplies the iteration callback. + */ + virtual void forEach(IterateCallback callback) const PURE; + + /** + * Get generic stream metadata value by key. + * + * @param key The metadata key of string view type. + * @return The optional metadata value of string_view type. + */ + virtual absl::optional getByKey(absl::string_view key) const PURE; + + /** + * Set new generic stream metadata key/value pair. + * + * @param key The metadata key of string view type. + * @param val The metadata value of string view type. + */ + virtual void setByKey(absl::string_view key, absl::string_view val) PURE; + + /** + * Set new generic stream metadata key/value pair. The key MUST point to data that will live + * beyond the lifetime of any generic stream that using the string. + * + * @param key The metadata key of string view type. + * @param val The metadata value of string view type. + */ + virtual void setByReferenceKey(absl::string_view key, absl::string_view val) PURE; + + /** + * Set new generic stream metadata key/value pair. Both key and val MUST point to data that will + * live beyond the lifetime of any generic stream that using the string. + * + * @param key The metadata key of string view type. + * @param val The metadata value of string view type. + */ + virtual void setByReference(absl::string_view key, absl::string_view val) PURE; +}; + +class GenericRequest : public GenericStreamBase { +public: + /** + * Get request protocol. + * + * @return A string view representing the protocol of the generic request behind the context. + */ + virtual absl::string_view protocol() const PURE; + + /** + * Get request authority. + * + * @return The authority of generic request. It generally consists of the host and an optional + * user information and an optional port. + */ + virtual absl::string_view authority() const PURE; + + /** + * Get request path. + * + * @return The path of generic request. The content and meaning of path are determined by + * specific protocol itself. + */ + virtual absl::string_view path() const PURE; + + /** + * Get request method. + * + * @return The method of generic request. The content and meaning of method are determined by + * specific protocol itself. + */ + virtual absl::string_view method() const PURE; +}; +using GenericRequestPtr = std::unique_ptr; +using GenericRequestSharedPtr = std::shared_ptr; + +enum class GenericEvent { + Timeout, + ConnectionTimeout, + ConnectionClosed, + LocalConnectionClosed, + ConnectionFailure, +}; + +enum class GenericState { + NONE, + OK, + ExpectedError, + UnknowedError, + LocalOK, + LocalExpectedError, + LocalUnknowedError, +}; + +class GenericResponse : public GenericStreamBase { +public: + /** + * Get generic response protocol. + * + * @return A string view representing the protocol of the generic stream behind the context. + */ + virtual absl::string_view protocol() const PURE; + + /** + * Get generic response status. + * + * @return Generic response status. + */ + virtual GenericState status() const PURE; + + /** + * Get generic response status detail of string view type. + * + * @return Generic response status detail. Status detail is a specific supplement to status. + */ + virtual absl::string_view statusDetail() const PURE; +}; +using GenericResponsePtr = std::unique_ptr; +using GenericResponseSharedPtr = std::shared_ptr; + +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/route_impl.cc b/contrib/generic_proxy/filters/network/source/route_impl.cc new file mode 100644 index 0000000000000..7a262c91339bf --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/route_impl.cc @@ -0,0 +1,131 @@ +#include "contrib/generic_proxy/filters/network/source/route_impl.h" + +#include "source/common/common/assert.h" +#include "source/common/common/matchers.h" +#include "source/common/config/utility.h" + +#include "contrib/envoy/extensions/filters/network/generic_proxy/v3alpha/generic_proxy.pb.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_config.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_route.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { + +SingleHeaderMatch::SingleHeaderMatch( + const envoy::extensions::filters::network::generic_proxy::v3alpha::HeaderMatch& header) + : name_(header.name()), invert_match_(header.invert_match()) { + if (header.has_string_match()) { + string_.emplace(Envoy::Matchers::StringMatcherImpl(header.string_match())); + } +} + +bool SingleHeaderMatch::match(const GenericRequest& request) const { + auto header = request.getByKey(name_); + bool result = false; + + if (string_.has_value()) { + result = header.has_value() ? string_->match(header.value()) : false; + } else { + result = header.has_value() == present_match_; + } + + return result != invert_match_; +} + +SingleRouteMatch ::SingleRouteMatch( + const envoy::extensions::filters::network::generic_proxy::v3alpha::RouteMatch& matcher) { + if (matcher.has_path()) { + path_.emplace(Envoy::Matchers::StringMatcherImpl(matcher.path())); + } + if (matcher.has_method()) { + method_.emplace(Envoy::Matchers::StringMatcherImpl(matcher.method())); + } + + for (const auto& header : matcher.headers()) { + headers_.push_back({header}); + } +} + +bool SingleRouteMatch::match(const GenericRequest& request) const { + if (method_.has_value() && !method_->match(request.method())) { + return false; + } + if (path_.has_value() && !path_->match(request.path())) { + return false; + } + + for (const auto& header_match : headers_) { + if (!header_match.match(request)) { + return false; + } + } + return true; +} + +RouteEntryImpl::RouteEntryImpl( + const envoy::extensions::filters::network::generic_proxy::v3alpha::Route& route, + Envoy::Server::Configuration::FactoryContext& context) + : route_match_(route.match()), + timeout_(PROTOBUF_GET_MS_OR_DEFAULT(route, timeout, DEFAULT_ROUTE_TIMEOUT_MS)), + retry_policy_(route.retry()), metadata_(route.metadata()), typed_metadata_(metadata_) { + + for (const auto& proto_filter_config : route.per_filter_config()) { + auto& factory = Config::Utility::getAndCheckFactoryByName( + proto_filter_config.first); + + ProtobufTypes::MessagePtr message = factory.createEmptyRouteConfigProto(); + Envoy::Config::Utility::translateOpaqueConfig(proto_filter_config.second, + context.messageValidationVisitor(), *message); + + auto route_config = factory.createRouteSpecificFilterConfig(*message, context, + context.messageValidationVisitor()); + per_filter_configs_.emplace(proto_filter_config.first, std::move(route_config)); + } +} + +RouteMatcherImpl::RouteMatcherImpl( + const envoy::extensions::filters::network::generic_proxy::v3alpha::RouteConfiguration& + route_config, + Envoy::Server::Configuration::FactoryContext& context) + : name_(route_config.name()) { + for (const auto& authority : route_config.config()) { + auto routes = std::make_shared>(); + for (const auto& route : authority.routes()) { + routes->push_back(std::make_shared(route, context)); + } + if (routes->empty()) { + continue; + } + for (const auto& authority_name : authority.authorities()) { + if (authority_name.empty()) { + throw EnvoyException("Empty authority name for generic proxy is not allowed"); + } + if (routes_.count(authority_name) > 0) { + throw EnvoyException(fmt::format( + "Repeated authority name: {} for generic proxy is not allowed", authority_name)); + } + routes_.emplace(authority_name, routes); + } + } +} + +RouteEntryConstSharedPtr RouteMatcherImpl::routeEntry(const GenericRequest& request) const { + auto iter = routes_.find(request.authority()); + if (iter == routes_.end()) { + return nullptr; + } + + for (auto& route : *iter->second) { + if (route->match(request)) { + return route; + } + } + return nullptr; +} + +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy diff --git a/contrib/generic_proxy/filters/network/source/route_impl.h b/contrib/generic_proxy/filters/network/source/route_impl.h new file mode 100644 index 0000000000000..da6525180b54f --- /dev/null +++ b/contrib/generic_proxy/filters/network/source/route_impl.h @@ -0,0 +1,116 @@ +#pragma once + +#include "envoy/config/core/v3/base.pb.h" +#include "envoy/config/typed_metadata.h" +#include "envoy/server/factory_context.h" + +#include "source/common/common/matchers.h" +#include "source/common/config/metadata.h" + +#include "contrib/envoy/extensions/filters/network/generic_proxy/v3alpha/generic_proxy.pb.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_route.h" +#include "contrib/generic_proxy/filters/network/source/interface/generic_stream.h" + +namespace Envoy { +namespace Proxy { +namespace NetworkFilters { +namespace GenericProxy { + +class SingleHeaderMatch { +public: + SingleHeaderMatch( + const envoy::extensions::filters::network::generic_proxy::v3alpha::HeaderMatch& header); + + bool match(const GenericRequest& request) const; + +private: + const std::string name_; + + const bool invert_match_{}; + + absl::optional> string_; + bool present_match_{}; +}; + +class SingleRouteMatch { +public: + SingleRouteMatch( + const envoy::extensions::filters::network::generic_proxy::v3alpha::RouteMatch& matcher); + + bool match(const GenericRequest& request) const; + +private: + absl::optional> path_; + absl::optional> method_; + + std::vector headers_; +}; + +class RetryPolicyImpl : public RetryPolicy { +public: + RetryPolicyImpl(const envoy::extensions::filters::network::generic_proxy::v3alpha::RetryPolicy&) { + } + + bool shouldRetry(uint32_t, const GenericResponse*, absl::optional) const override { + return false; + } + std::chrono::milliseconds timeout() const override { return {}; } +}; + +class RouteEntryImpl : public RouteEntry { +public: + RouteEntryImpl(const envoy::extensions::filters::network::generic_proxy::v3alpha::Route& route, + Envoy::Server::Configuration::FactoryContext& context); + + bool match(const GenericRequest& request) const { return route_match_.match(request); } + + // RouteEntry + const std::string& clusterName() const override { return cluster_name_; } + const RouteSpecificFilterConfig* perFilterConfig(absl::string_view name) const override { + auto iter = per_filter_configs_.find(name); + return iter != per_filter_configs_.end() ? iter->second.get() : nullptr; + } + void finalizeGenericRequest(GenericRequest&) const override {} + void finalizeGenericResponse(GenericResponse&) const override {} + const Envoy::Config::TypedMetadata& typedMetadata() const override { return typed_metadata_; } + const envoy::config::core::v3::Metadata& metadata() const override { return metadata_; } + std::chrono::milliseconds timeout() const override { return timeout_; }; + const RetryPolicy& retryPolicy() const override { return retry_policy_; } + +private: + static const uint64_t DEFAULT_ROUTE_TIMEOUT_MS = 15000; + + const SingleRouteMatch route_match_; + + std::string cluster_name_; + + const std::chrono::milliseconds timeout_; + const RetryPolicyImpl retry_policy_; + + envoy::config::core::v3::Metadata metadata_; + Envoy::Config::TypedMetadataImpl typed_metadata_; + + absl::flat_hash_map per_filter_configs_; +}; +using RouteEntryImplConstSharedPtr = std::shared_ptr; + +class RouteMatcherImpl : public RouteMatcher { +public: + RouteMatcherImpl( + const envoy::extensions::filters::network::generic_proxy::v3alpha::RouteConfiguration& + route_config, + Envoy::Server::Configuration::FactoryContext& context); + + RouteEntryConstSharedPtr routeEntry(const GenericRequest& request) const override; + +private: + std::string name_; + + using RouteEntryVectorSharedPtr = std::shared_ptr>; + absl::flat_hash_map routes_; +}; + +} // namespace GenericProxy +} // namespace NetworkFilters +} // namespace Proxy +} // namespace Envoy