diff --git a/source/common/router/BUILD b/source/common/router/BUILD index 0de2a42c2e500..698535dc2bcd3 100644 --- a/source/common/router/BUILD +++ b/source/common/router/BUILD @@ -280,6 +280,7 @@ envoy_cc_library( ":debug_config_lib", ":header_parser_lib", ":retry_state_lib", + ":upstream_codec_filter_lib", "//envoy/event:dispatcher_interface", "//envoy/event:timer_interface", "//envoy/grpc:status", @@ -331,6 +332,38 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "upstream_codec_filter_lib", + srcs = [ + "upstream_codec_filter.cc", + ], + hdrs = [ + "upstream_codec_filter.h", + ], + deps = [ + ":config_lib", + "//envoy/event:dispatcher_interface", + "//envoy/http:codec_interface", + "//envoy/http:filter_interface", + "//envoy/runtime:runtime_interface", + "//envoy/stats:stats_interface", + "//envoy/stats:stats_macros", + "//envoy/upstream:upstream_interface", + "//source/common/common:assert_lib", + "//source/common/common:cleanup_lib", + "//source/common/common:empty_string", + "//source/common/common:utility_lib", + "//source/common/config:utility_lib", + "//source/common/http:codes_lib", + "//source/common/http:header_map_lib", + "//source/common/http:headers_lib", + "//source/common/http:message_lib", + "//source/common/http:utility_lib", + "//source/extensions/filters/http/common:factory_base_lib", + "@envoy_api//envoy/extensions/filters/http/upstream_codec/v3:pkg_cc_proto", + ], +) + envoy_cc_library( name = "router_ratelimit_lib", srcs = ["router_ratelimit.cc"], diff --git a/source/common/router/upstream_codec_filter.cc b/source/common/router/upstream_codec_filter.cc new file mode 100644 index 0000000000000..07e43f4998512 --- /dev/null +++ b/source/common/router/upstream_codec_filter.cc @@ -0,0 +1,191 @@ +#include "source/common/router/upstream_codec_filter.h" + +#include +#include +#include +#include +#include + +#include "envoy/event/dispatcher.h" +#include "envoy/event/timer.h" +#include "envoy/grpc/status.h" +#include "envoy/http/header_map.h" + +#include "source/common/common/assert.h" +#include "source/common/common/dump_state_utils.h" +#include "source/common/common/empty_string.h" +#include "source/common/common/enum_to_int.h" +#include "source/common/common/scope_tracker.h" +#include "source/common/common/utility.h" +#include "source/common/grpc/common.h" +#include "source/common/http/codes.h" +#include "source/common/http/header_map_impl.h" +#include "source/common/http/headers.h" +#include "source/common/http/message_impl.h" +#include "source/common/http/utility.h" + +namespace Envoy { +namespace Router { + +void UpstreamCodecFilter::onBelowWriteBufferLowWatermark() { + callbacks_->clusterInfo()->stats().upstream_flow_control_resumed_reading_total_.inc(); + callbacks_->upstreamCallbacks()->upstream()->readDisable(false); +} + +void UpstreamCodecFilter::onAboveWriteBufferHighWatermark() { + callbacks_->clusterInfo()->stats().upstream_flow_control_paused_reading_total_.inc(); + callbacks_->upstreamCallbacks()->upstream()->readDisable(true); +} + +void UpstreamCodecFilter::onUpstreamConnectionEstablished() { + if (latched_end_stream_.has_value()) { + const bool end_stream = *latched_end_stream_; + latched_end_stream_.reset(); + Http::FilterHeadersStatus status = decodeHeaders(*latched_headers_, end_stream); + if (status == Http::FilterHeadersStatus::Continue) { + callbacks_->continueDecoding(); + } + } +} + +// This is the last stop in the filter chain: take the headers and ship them to the codec. +Http::FilterHeadersStatus UpstreamCodecFilter::decodeHeaders(Http::RequestHeaderMap& headers, + bool end_stream) { + ASSERT(callbacks_->upstreamCallbacks()); + if (!callbacks_->upstreamCallbacks()->upstream()) { + latched_headers_ = headers; + latched_end_stream_ = end_stream; + return Http::FilterHeadersStatus::StopAllIterationAndWatermark; + } + + ENVOY_STREAM_LOG(trace, "proxying headers", *callbacks_); + calling_encode_headers_ = true; + const Http::Status status = + callbacks_->upstreamCallbacks()->upstream()->encodeHeaders(headers, end_stream); + + calling_encode_headers_ = false; + if (!status.ok() || deferred_reset_) { + deferred_reset_ = false; + // It is possible that encodeHeaders() fails. This can happen if filters or other extensions + // erroneously remove required headers. + callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::DownstreamProtocolError); + const std::string details = + absl::StrCat(StreamInfo::ResponseCodeDetails::get().FilterRemovedRequiredRequestHeaders, + "{", StringUtil::replaceAllEmptySpace(status.message()), "}"); + callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, status.message(), nullptr, + absl::nullopt, details); + return Http::FilterHeadersStatus::StopIteration; + } + upstreamTiming().onFirstUpstreamTxByteSent(callbacks_->dispatcher().timeSource()); + + if (end_stream) { + upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource()); + } + if (callbacks_->upstreamCallbacks()->pausedForConnect()) { + return Http::FilterHeadersStatus::StopAllIterationAndWatermark; + } + return Http::FilterHeadersStatus::Continue; +} + +// This is the last stop in the filter chain: take the data and ship it to the codec. +Http::FilterDataStatus UpstreamCodecFilter::decodeData(Buffer::Instance& data, bool end_stream) { + ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect()); + ENVOY_STREAM_LOG(trace, "proxying {} bytes", *callbacks_, data.length()); + callbacks_->upstreamCallbacks()->upstreamStreamInfo().addBytesSent(data.length()); + // TODO(alyssawilk) test intermediate filters calling continue. + callbacks_->upstreamCallbacks()->upstream()->encodeData(data, end_stream); + if (end_stream) { + upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource()); + } + return Http::FilterDataStatus::Continue; +} + +// This is the last stop in the filter chain: take the trailers and ship them to the codec. +Http::FilterTrailersStatus UpstreamCodecFilter::decodeTrailers(Http::RequestTrailerMap& trailers) { + ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect()); + ENVOY_STREAM_LOG(trace, "proxying trailers", *callbacks_); + callbacks_->upstreamCallbacks()->upstream()->encodeTrailers(trailers); + upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource()); + return Http::FilterTrailersStatus::Continue; +} + +// This is the last stop in the filter chain: take the metadata and ship them to the codec. +Http::FilterMetadataStatus UpstreamCodecFilter::decodeMetadata(Http::MetadataMap& metadata_map) { + ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect()); + ENVOY_STREAM_LOG(trace, "proxying metadata", *callbacks_); + Http::MetadataMapVector metadata_map_vector; + metadata_map_vector.emplace_back(std::make_unique(metadata_map)); + callbacks_->upstreamCallbacks()->upstream()->encodeMetadata(metadata_map_vector); + return Http::FilterMetadataStatus::Continue; +} + +// Store the callbacks from the UpstreamFilterManager, for sending the response to. +void UpstreamCodecFilter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) { + callbacks_ = &callbacks; + callbacks_->addDownstreamWatermarkCallbacks(*this); + callbacks_->upstreamCallbacks()->addUpstreamCallbacks(*this); + callbacks_->upstreamCallbacks()->setUpstreamToDownstream(bridge_); +} + +// This is the response 1xx headers arriving from the codec. Send them through the filter manager. +void UpstreamCodecFilter::CodecBridge::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) { + // The filter manager can not handle more than 1 1xx header, so only forward + // the first one. + if (!seen_1xx_headers_) { + seen_1xx_headers_ = true; + filter_.callbacks_->encode1xxHeaders(std::move(headers)); + } +} + +// This is the response headers arriving from the codec. Send them through the filter manager. +void UpstreamCodecFilter::CodecBridge::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, + bool end_stream) { + // TODO(rodaine): This is actually measuring after the headers are parsed and not the first + // byte. + filter_.upstreamTiming().onFirstUpstreamRxByteReceived( + filter_.callbacks_->dispatcher().timeSource()); + + if (filter_.callbacks_->upstreamCallbacks()->pausedForConnect() && + Http::Utility::getResponseStatus(*headers) == 200) { + filter_.callbacks_->upstreamCallbacks()->setPausedForConnect(false); + filter_.callbacks_->continueDecoding(); + } + + maybeEndDecode(end_stream); + filter_.callbacks_->encodeHeaders(std::move(headers), end_stream, + StreamInfo::ResponseCodeDetails::get().ViaUpstream); +} + +// This is response data arriving from the codec. Send it through the filter manager. +void UpstreamCodecFilter::CodecBridge::decodeData(Buffer::Instance& data, bool end_stream) { + maybeEndDecode(end_stream); + filter_.callbacks_->encodeData(data, end_stream); +} + +// This is response trailers arriving from the codec. Send them through the filter manager. +void UpstreamCodecFilter::CodecBridge::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) { + maybeEndDecode(true); + filter_.callbacks_->encodeTrailers(std::move(trailers)); +} + +// This is response metadata arriving from the codec. Send it through the filter manager. +void UpstreamCodecFilter::CodecBridge::decodeMetadata(Http::MetadataMapPtr&& metadata_map) { + filter_.callbacks_->encodeMetadata(std::move(metadata_map)); +} + +void UpstreamCodecFilter::CodecBridge::dumpState(std::ostream& os, int indent_level) const { + filter_.callbacks_->upstreamCallbacks()->dumpState(os, indent_level); +} + +void UpstreamCodecFilter::CodecBridge::maybeEndDecode(bool end_stream) { + if (end_stream) { + filter_.upstreamTiming().onLastUpstreamRxByteReceived( + filter_.callbacks_->dispatcher().timeSource()); + } +} + +REGISTER_FACTORY(UpstreamCodecFilterFactory, + Server::Configuration::UpstreamHttpFilterConfigFactory); + +} // namespace Router +} // namespace Envoy diff --git a/source/common/router/upstream_codec_filter.h b/source/common/router/upstream_codec_filter.h new file mode 100644 index 0000000000000..c49bba6683f17 --- /dev/null +++ b/source/common/router/upstream_codec_filter.h @@ -0,0 +1,130 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h" +#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.validate.h" +#include "envoy/http/conn_pool.h" +#include "envoy/http/filter.h" + +#include "source/common/common/logger.h" +#include "source/common/config/well_known_names.h" +#include "source/extensions/filters/http/common/factory_base.h" + +namespace Envoy { +namespace Router { + +// This is the last filter in the upstream filter chain. +// It takes request headers/body/data from the filter manager and encodes them to the upstream +// codec. It also registers the CodecBridge with the upstream stream, and takes response +// headers/body/data from the upstream stream and sends them to the filter manager. +class UpstreamCodecFilter : public Http::StreamDecoderFilter, + public Logger::Loggable, + public Http::DownstreamWatermarkCallbacks, + public Http::UpstreamCallbacks { +public: + UpstreamCodecFilter() : bridge_(*this), calling_encode_headers_(false), deferred_reset_(false) {} + + // Http::DownstreamWatermarkCallbacks + void onBelowWriteBufferLowWatermark() override; + void onAboveWriteBufferHighWatermark() override; + + // UpstreamCallbacks + void onUpstreamConnectionEstablished() override; + + // Http::StreamFilterBase + void onDestroy() override { callbacks_->removeDownstreamWatermarkCallbacks(*this); } + + // Http::StreamDecoderFilter + Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers, + bool end_stream) override; + Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override; + Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override; + Http::FilterMetadataStatus decodeMetadata(Http::MetadataMap& metadata_map) override; + void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override; + + // This bridge connects the upstream stream to the filter manager. + class CodecBridge : public UpstreamToDownstream { + public: + CodecBridge(UpstreamCodecFilter& filter) : filter_(filter) {} + void decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) override; + void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override; + void decodeData(Buffer::Instance& data, bool end_stream) override; + void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override; + void decodeMetadata(Http::MetadataMapPtr&&) override; + void dumpState(std::ostream& os, int indent_level) const override; + + void onResetStream(Http::StreamResetReason reason, + absl::string_view transport_failure_reason) override { + if (filter_.calling_encode_headers_) { + filter_.deferred_reset_ = true; + return; + } + if (reason == Http::StreamResetReason::LocalReset) { + ASSERT(transport_failure_reason.empty()); + // Use this to communicate to the upstream request to not force-terminate. + transport_failure_reason = "codec_error"; + } + filter_.callbacks_->resetStream(reason, transport_failure_reason); + } + void onAboveWriteBufferHighWatermark() override { + filter_.callbacks_->onDecoderFilterAboveWriteBufferHighWatermark(); + } + void onBelowWriteBufferLowWatermark() override { + filter_.callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); + } + // UpstreamToDownstream + const Route& route() const override { return *filter_.callbacks_->route(); } + OptRef connection() const override { + return filter_.callbacks_->connection(); + } + const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override { + return filter_.callbacks_->upstreamCallbacks()->upstreamStreamOptions(); + } + + private: + void maybeEndDecode(bool end_stream); + bool seen_1xx_headers_{}; + UpstreamCodecFilter& filter_; + }; + Http::StreamDecoderFilterCallbacks* callbacks_; + CodecBridge bridge_; + bool calling_encode_headers_ : 1; + bool deferred_reset_ : 1; + OptRef latched_headers_; + absl::optional latched_end_stream_; + +private: + StreamInfo::UpstreamTiming& upstreamTiming() { + return callbacks_->upstreamCallbacks()->upstreamStreamInfo().upstreamInfo()->upstreamTiming(); + } +}; + +class UpstreamCodecFilterFactory + : public Extensions::HttpFilters::Common::CommonFactoryBase< + envoy::extensions::filters::http::upstream_codec::v3::UpstreamCodec>, + public Server::Configuration::UpstreamHttpFilterConfigFactory { +public: + UpstreamCodecFilterFactory() : CommonFactoryBase("envoy.filters.http.upstream_codec") {} + + std::string category() const override { return "envoy.filters.http.upstream"; } + Http::FilterFactoryCb + createFilterFactoryFromProto(const Protobuf::Message&, const std::string&, + Server::Configuration::UpstreamHttpFactoryContext&) override { + return [](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamDecoderFilter(std::make_shared()); + }; + } + bool isTerminalFilterByProtoTyped( + const envoy::extensions::filters::http::upstream_codec::v3::UpstreamCodec&, + Server::Configuration::ServerFactoryContext&) override { + return true; + } +}; + +} // namespace Router +} // namespace Envoy diff --git a/source/common/router/upstream_request.cc b/source/common/router/upstream_request.cc index 8d3cc149e150b..e753a72f0bb9c 100644 --- a/source/common/router/upstream_request.cc +++ b/source/common/router/upstream_request.cc @@ -41,163 +41,6 @@ namespace Envoy { namespace Router { -void UpstreamCodecFilter::onBelowWriteBufferLowWatermark() { - callbacks_->clusterInfo()->stats().upstream_flow_control_resumed_reading_total_.inc(); - callbacks_->upstreamCallbacks()->upstream()->readDisable(false); -} - -void UpstreamCodecFilter::onAboveWriteBufferHighWatermark() { - callbacks_->clusterInfo()->stats().upstream_flow_control_paused_reading_total_.inc(); - callbacks_->upstreamCallbacks()->upstream()->readDisable(true); -} - -void UpstreamCodecFilter::onUpstreamConnectionEstablished() { - if (latched_end_stream_.has_value()) { - const bool end_stream = *latched_end_stream_; - latched_end_stream_.reset(); - Http::FilterHeadersStatus status = decodeHeaders(*latched_headers_, end_stream); - if (status == Http::FilterHeadersStatus::Continue) { - callbacks_->continueDecoding(); - } - } -} - -// This is the last stop in the filter chain: take the headers and ship them to the codec. -Http::FilterHeadersStatus UpstreamCodecFilter::decodeHeaders(Http::RequestHeaderMap& headers, - bool end_stream) { - ASSERT(callbacks_->upstreamCallbacks()); - if (!callbacks_->upstreamCallbacks()->upstream()) { - latched_headers_ = headers; - latched_end_stream_ = end_stream; - return Http::FilterHeadersStatus::StopAllIterationAndWatermark; - } - - ENVOY_STREAM_LOG(trace, "proxying headers", *callbacks_); - calling_encode_headers_ = true; - const Http::Status status = - callbacks_->upstreamCallbacks()->upstream()->encodeHeaders(headers, end_stream); - - calling_encode_headers_ = false; - if (!status.ok() || deferred_reset_) { - deferred_reset_ = false; - // It is possible that encodeHeaders() fails. This can happen if filters or other extensions - // erroneously remove required headers. - callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::DownstreamProtocolError); - const std::string details = - absl::StrCat(StreamInfo::ResponseCodeDetails::get().FilterRemovedRequiredRequestHeaders, - "{", StringUtil::replaceAllEmptySpace(status.message()), "}"); - callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, status.message(), nullptr, - absl::nullopt, details); - return Http::FilterHeadersStatus::StopIteration; - } - upstreamTiming().onFirstUpstreamTxByteSent(callbacks_->dispatcher().timeSource()); - - if (end_stream) { - upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource()); - } - if (callbacks_->upstreamCallbacks()->pausedForConnect()) { - return Http::FilterHeadersStatus::StopAllIterationAndWatermark; - } - return Http::FilterHeadersStatus::Continue; -} - -// This is the last stop in the filter chain: take the data and ship it to the codec. -Http::FilterDataStatus UpstreamCodecFilter::decodeData(Buffer::Instance& data, bool end_stream) { - ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect()); - ENVOY_STREAM_LOG(trace, "proxying {} bytes", *callbacks_, data.length()); - callbacks_->upstreamCallbacks()->upstreamStreamInfo().addBytesSent(data.length()); - // TODO(alyssawilk) test intermediate filters calling continue. - callbacks_->upstreamCallbacks()->upstream()->encodeData(data, end_stream); - if (end_stream) { - upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource()); - } - return Http::FilterDataStatus::Continue; -} - -// This is the last stop in the filter chain: take the trailers and ship them to the codec. -Http::FilterTrailersStatus UpstreamCodecFilter::decodeTrailers(Http::RequestTrailerMap& trailers) { - ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect()); - ENVOY_STREAM_LOG(trace, "proxying trailers", *callbacks_); - callbacks_->upstreamCallbacks()->upstream()->encodeTrailers(trailers); - upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource()); - return Http::FilterTrailersStatus::Continue; -} - -// This is the last stop in the filter chain: take the metadata and ship them to the codec. -Http::FilterMetadataStatus UpstreamCodecFilter::decodeMetadata(Http::MetadataMap& metadata_map) { - ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect()); - ENVOY_STREAM_LOG(trace, "proxying metadata", *callbacks_); - Http::MetadataMapVector metadata_map_vector; - metadata_map_vector.emplace_back(std::make_unique(metadata_map)); - callbacks_->upstreamCallbacks()->upstream()->encodeMetadata(metadata_map_vector); - return Http::FilterMetadataStatus::Continue; -} - -// Store the callbacks from the UpstreamFilterManager, for sending the response to. -void UpstreamCodecFilter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) { - callbacks_ = &callbacks; - callbacks_->addDownstreamWatermarkCallbacks(*this); - callbacks_->upstreamCallbacks()->addUpstreamCallbacks(*this); - callbacks_->upstreamCallbacks()->setUpstreamToDownstream(bridge_); -} - -// This is the response 1xx headers arriving from the codec. Send them through the filter manager. -void UpstreamCodecFilter::CodecBridge::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) { - // The filter manager can not handle more than 1 1xx header, so only forward - // the first one. - if (!seen_1xx_headers_) { - seen_1xx_headers_ = true; - filter_.callbacks_->encode1xxHeaders(std::move(headers)); - } -} - -// This is the response headers arriving from the codec. Send them through the filter manager. -void UpstreamCodecFilter::CodecBridge::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, - bool end_stream) { - // TODO(rodaine): This is actually measuring after the headers are parsed and not the first - // byte. - filter_.upstreamTiming().onFirstUpstreamRxByteReceived( - filter_.callbacks_->dispatcher().timeSource()); - - if (filter_.callbacks_->upstreamCallbacks()->pausedForConnect() && - Http::Utility::getResponseStatus(*headers) == 200) { - filter_.callbacks_->upstreamCallbacks()->setPausedForConnect(false); - filter_.callbacks_->continueDecoding(); - } - - maybeEndDecode(end_stream); - filter_.callbacks_->encodeHeaders(std::move(headers), end_stream, - StreamInfo::ResponseCodeDetails::get().ViaUpstream); -} - -// This is response data arriving from the codec. Send it through the filter manager. -void UpstreamCodecFilter::CodecBridge::decodeData(Buffer::Instance& data, bool end_stream) { - maybeEndDecode(end_stream); - filter_.callbacks_->encodeData(data, end_stream); -} - -// This is response trailers arriving from the codec. Send them through the filter manager. -void UpstreamCodecFilter::CodecBridge::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) { - maybeEndDecode(true); - filter_.callbacks_->encodeTrailers(std::move(trailers)); -} - -// This is response metadata arriving from the codec. Send it through the filter manager. -void UpstreamCodecFilter::CodecBridge::decodeMetadata(Http::MetadataMapPtr&& metadata_map) { - filter_.callbacks_->encodeMetadata(std::move(metadata_map)); -} - -void UpstreamCodecFilter::CodecBridge::dumpState(std::ostream& os, int indent_level) const { - filter_.callbacks_->upstreamCallbacks()->dumpState(os, indent_level); -} - -void UpstreamCodecFilter::CodecBridge::maybeEndDecode(bool end_stream) { - if (end_stream) { - filter_.upstreamTiming().onLastUpstreamRxByteReceived( - filter_.callbacks_->dispatcher().timeSource()); - } -} - // The upstream filter manager class. class UpstreamFilterManager : public Http::FilterManager { public: @@ -1025,8 +868,5 @@ UpstreamRequestFilterManagerCallbacks::http1StreamEncoderOptions() { return upstream_request_.parent_.callbacks()->http1StreamEncoderOptions(); } -REGISTER_FACTORY(UpstreamCodecFilterFactory, - Server::Configuration::UpstreamHttpFilterConfigFactory); - } // namespace Router } // namespace Envoy diff --git a/source/common/router/upstream_request.h b/source/common/router/upstream_request.h index 6dd823ec3821f..dacdfe4b00004 100644 --- a/source/common/router/upstream_request.h +++ b/source/common/router/upstream_request.h @@ -376,115 +376,5 @@ class UpstreamRequestFilterManagerCallbacks : public Http::FilterManagerCallback UpstreamRequest& upstream_request_; }; -// This is the last filter in the upstream filter chain. -// It takes request headers/body/data from the filter manager and encodes them to the upstream -// codec. It also registers the CodecBridge with the upstream stream, and takes response -// headers/body/data from the upstream stream and sends them to the filter manager. -// -// TODO(alyssawilk) move this to its own file in a follow-up PR. -class UpstreamCodecFilter : public Http::StreamDecoderFilter, - public Logger::Loggable, - public Http::DownstreamWatermarkCallbacks, - public Http::UpstreamCallbacks { -public: - UpstreamCodecFilter() : bridge_(*this), calling_encode_headers_(false), deferred_reset_(false) {} - - // Http::DownstreamWatermarkCallbacks - void onBelowWriteBufferLowWatermark() override; - void onAboveWriteBufferHighWatermark() override; - - // UpstreamCallbacks - void onUpstreamConnectionEstablished() override; - - // Http::StreamFilterBase - void onDestroy() override { callbacks_->removeDownstreamWatermarkCallbacks(*this); } - - // Http::StreamDecoderFilter - Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers, - bool end_stream) override; - Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override; - Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override; - Http::FilterMetadataStatus decodeMetadata(Http::MetadataMap& metadata_map) override; - void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override; - - // This bridge connects the upstream stream to the filter manager. - class CodecBridge : public UpstreamToDownstream { - public: - CodecBridge(UpstreamCodecFilter& filter) : filter_(filter) {} - void decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) override; - void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override; - void decodeData(Buffer::Instance& data, bool end_stream) override; - void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override; - void decodeMetadata(Http::MetadataMapPtr&&) override; - void dumpState(std::ostream& os, int indent_level) const override; - - void onResetStream(Http::StreamResetReason reason, - absl::string_view transport_failure_reason) override { - if (filter_.calling_encode_headers_) { - filter_.deferred_reset_ = true; - return; - } - if (reason == Http::StreamResetReason::LocalReset) { - ASSERT(transport_failure_reason.empty()); - // Use this to communicate to the upstream request to not force-terminate. - transport_failure_reason = "codec_error"; - } - filter_.callbacks_->resetStream(reason, transport_failure_reason); - } - void onAboveWriteBufferHighWatermark() override { - filter_.callbacks_->onDecoderFilterAboveWriteBufferHighWatermark(); - } - void onBelowWriteBufferLowWatermark() override { - filter_.callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); - } - // UpstreamToDownstream - const Route& route() const override { return *filter_.callbacks_->route(); } - OptRef connection() const override { - return filter_.callbacks_->connection(); - } - const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override { - return filter_.callbacks_->upstreamCallbacks()->upstreamStreamOptions(); - } - - private: - void maybeEndDecode(bool end_stream); - bool seen_1xx_headers_{}; - UpstreamCodecFilter& filter_; - }; - Http::StreamDecoderFilterCallbacks* callbacks_; - CodecBridge bridge_; - bool calling_encode_headers_ : 1; - bool deferred_reset_ : 1; - OptRef latched_headers_; - absl::optional latched_end_stream_; - -private: - StreamInfo::UpstreamTiming& upstreamTiming() { - return callbacks_->upstreamCallbacks()->upstreamStreamInfo().upstreamInfo()->upstreamTiming(); - } -}; - -class UpstreamCodecFilterFactory - : public Extensions::HttpFilters::Common::CommonFactoryBase< - envoy::extensions::filters::http::upstream_codec::v3::UpstreamCodec>, - public Server::Configuration::UpstreamHttpFilterConfigFactory { -public: - UpstreamCodecFilterFactory() : CommonFactoryBase("envoy.filters.http.upstream_codec") {} - - std::string category() const override { return "envoy.filters.http.upstream"; } - Http::FilterFactoryCb - createFilterFactoryFromProto(const Protobuf::Message&, const std::string&, - Server::Configuration::UpstreamHttpFactoryContext&) override { - return [](Http::FilterChainFactoryCallbacks& callbacks) -> void { - callbacks.addStreamDecoderFilter(std::make_shared()); - }; - } - bool isTerminalFilterByProtoTyped( - const envoy::extensions::filters::http::upstream_codec::v3::UpstreamCodec&, - Server::Configuration::ServerFactoryContext&) override { - return true; - } -}; - } // namespace Router } // namespace Envoy diff --git a/test/common/grpc/grpc_client_integration_test_harness.h b/test/common/grpc/grpc_client_integration_test_harness.h index b8caa4f1f2aa3..49947606770fc 100644 --- a/test/common/grpc/grpc_client_integration_test_harness.h +++ b/test/common/grpc/grpc_client_integration_test_harness.h @@ -23,6 +23,7 @@ #include "source/common/network/connection_impl.h" #include "source/common/network/raw_buffer_socket.h" #include "source/common/router/context_impl.h" +#include "source/common/router/upstream_codec_filter.h" #include "source/common/stats/symbol_table.h" #include "source/extensions/transport_sockets/tls/context_config_impl.h" diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index ce96f0d5e79e7..63c0606cfef15 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -12,6 +12,7 @@ #include "source/common/http/headers.h" #include "source/common/http/utility.h" #include "source/common/router/context_impl.h" +#include "source/common/router/upstream_codec_filter.h" #include "test/common/http/common.h" #include "test/mocks/buffer/mocks.h" diff --git a/test/common/router/router_test_base.cc b/test/common/router/router_test_base.cc index be2e8e77287bf..94fb0d5ca1863 100644 --- a/test/common/router/router_test_base.cc +++ b/test/common/router/router_test_base.cc @@ -1,6 +1,7 @@ #include "test/common/router/router_test_base.h" #include "source/common/router/debug_config.h" +#include "source/common/router/upstream_codec_filter.h" namespace Envoy { namespace Router { diff --git a/test/common/router/router_upstream_log_test.cc b/test/common/router/router_upstream_log_test.cc index af6c93adda96f..905bb7274611b 100644 --- a/test/common/router/router_upstream_log_test.cc +++ b/test/common/router/router_upstream_log_test.cc @@ -8,6 +8,7 @@ #include "source/common/network/utility.h" #include "source/common/router/router.h" +#include "source/common/router/upstream_codec_filter.h" #include "source/common/upstream/upstream_impl.h" #include "test/common/http/common.h" diff --git a/test/common/router/upstream_request_test.cc b/test/common/router/upstream_request_test.cc index 557b494b76c26..02380b89ffe2b 100644 --- a/test/common/router/upstream_request_test.cc +++ b/test/common/router/upstream_request_test.cc @@ -1,5 +1,6 @@ #include "source/common/common/utility.h" #include "source/common/network/utility.h" +#include "source/common/router/upstream_codec_filter.h" #include "source/common/router/upstream_request.h" #include "test/common/http/common.h" diff --git a/test/extensions/upstreams/http/tcp/upstream_request_test.cc b/test/extensions/upstreams/http/tcp/upstream_request_test.cc index 6832feb4cdf9f..4ad701b13acdd 100644 --- a/test/extensions/upstreams/http/tcp/upstream_request_test.cc +++ b/test/extensions/upstreams/http/tcp/upstream_request_test.cc @@ -2,6 +2,7 @@ #include "source/common/network/address_impl.h" #include "source/common/router/config_impl.h" #include "source/common/router/router.h" +#include "source/common/router/upstream_codec_filter.h" #include "source/common/router/upstream_request.h" #include "source/extensions/common/proxy_protocol/proxy_protocol_header.h" #include "source/extensions/upstreams/http/tcp/upstream_request.h"