Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions source/common/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ envoy_cc_library(
":debug_config_lib",
":header_parser_lib",
":retry_state_lib",
":upstream_codec_filter_lib",
"//envoy/event:dispatcher_interface",
"//envoy/event:timer_interface",
"//envoy/grpc:status",
Expand Down Expand Up @@ -331,6 +332,38 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "upstream_codec_filter_lib",
srcs = [
"upstream_codec_filter.cc",
],
hdrs = [
"upstream_codec_filter.h",
],
deps = [
":config_lib",
"//envoy/event:dispatcher_interface",
"//envoy/http:codec_interface",
"//envoy/http:filter_interface",
"//envoy/runtime:runtime_interface",
"//envoy/stats:stats_interface",
"//envoy/stats:stats_macros",
"//envoy/upstream:upstream_interface",
"//source/common/common:assert_lib",
"//source/common/common:cleanup_lib",
"//source/common/common:empty_string",
"//source/common/common:utility_lib",
"//source/common/config:utility_lib",
"//source/common/http:codes_lib",
"//source/common/http:header_map_lib",
"//source/common/http:headers_lib",
"//source/common/http:message_lib",
"//source/common/http:utility_lib",
"//source/extensions/filters/http/common:factory_base_lib",
"@envoy_api//envoy/extensions/filters/http/upstream_codec/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "router_ratelimit_lib",
srcs = ["router_ratelimit.cc"],
Expand Down
191 changes: 191 additions & 0 deletions source/common/router/upstream_codec_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
#include "source/common/router/upstream_codec_filter.h"

#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>

#include "envoy/event/dispatcher.h"
#include "envoy/event/timer.h"
#include "envoy/grpc/status.h"
#include "envoy/http/header_map.h"

#include "source/common/common/assert.h"
#include "source/common/common/dump_state_utils.h"
#include "source/common/common/empty_string.h"
#include "source/common/common/enum_to_int.h"
#include "source/common/common/scope_tracker.h"
#include "source/common/common/utility.h"
#include "source/common/grpc/common.h"
#include "source/common/http/codes.h"
#include "source/common/http/header_map_impl.h"
#include "source/common/http/headers.h"
#include "source/common/http/message_impl.h"
#include "source/common/http/utility.h"

namespace Envoy {
namespace Router {

void UpstreamCodecFilter::onBelowWriteBufferLowWatermark() {
callbacks_->clusterInfo()->stats().upstream_flow_control_resumed_reading_total_.inc();
callbacks_->upstreamCallbacks()->upstream()->readDisable(false);
}

void UpstreamCodecFilter::onAboveWriteBufferHighWatermark() {
callbacks_->clusterInfo()->stats().upstream_flow_control_paused_reading_total_.inc();
callbacks_->upstreamCallbacks()->upstream()->readDisable(true);
}

void UpstreamCodecFilter::onUpstreamConnectionEstablished() {
if (latched_end_stream_.has_value()) {
const bool end_stream = *latched_end_stream_;
latched_end_stream_.reset();
Http::FilterHeadersStatus status = decodeHeaders(*latched_headers_, end_stream);
if (status == Http::FilterHeadersStatus::Continue) {
callbacks_->continueDecoding();
}
}
}

// This is the last stop in the filter chain: take the headers and ship them to the codec.
Http::FilterHeadersStatus UpstreamCodecFilter::decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) {
ASSERT(callbacks_->upstreamCallbacks());
if (!callbacks_->upstreamCallbacks()->upstream()) {
latched_headers_ = headers;
latched_end_stream_ = end_stream;
return Http::FilterHeadersStatus::StopAllIterationAndWatermark;
}

ENVOY_STREAM_LOG(trace, "proxying headers", *callbacks_);
calling_encode_headers_ = true;
const Http::Status status =
callbacks_->upstreamCallbacks()->upstream()->encodeHeaders(headers, end_stream);

calling_encode_headers_ = false;
if (!status.ok() || deferred_reset_) {
deferred_reset_ = false;
// It is possible that encodeHeaders() fails. This can happen if filters or other extensions
// erroneously remove required headers.
callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::DownstreamProtocolError);
const std::string details =
absl::StrCat(StreamInfo::ResponseCodeDetails::get().FilterRemovedRequiredRequestHeaders,
"{", StringUtil::replaceAllEmptySpace(status.message()), "}");
callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, status.message(), nullptr,
absl::nullopt, details);
return Http::FilterHeadersStatus::StopIteration;
}
upstreamTiming().onFirstUpstreamTxByteSent(callbacks_->dispatcher().timeSource());

if (end_stream) {
upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource());
}
if (callbacks_->upstreamCallbacks()->pausedForConnect()) {
return Http::FilterHeadersStatus::StopAllIterationAndWatermark;
}
return Http::FilterHeadersStatus::Continue;
}

// This is the last stop in the filter chain: take the data and ship it to the codec.
Http::FilterDataStatus UpstreamCodecFilter::decodeData(Buffer::Instance& data, bool end_stream) {
ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect());
ENVOY_STREAM_LOG(trace, "proxying {} bytes", *callbacks_, data.length());
callbacks_->upstreamCallbacks()->upstreamStreamInfo().addBytesSent(data.length());
// TODO(alyssawilk) test intermediate filters calling continue.
callbacks_->upstreamCallbacks()->upstream()->encodeData(data, end_stream);
if (end_stream) {
upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource());
}
return Http::FilterDataStatus::Continue;
}

// This is the last stop in the filter chain: take the trailers and ship them to the codec.
Http::FilterTrailersStatus UpstreamCodecFilter::decodeTrailers(Http::RequestTrailerMap& trailers) {
ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect());
ENVOY_STREAM_LOG(trace, "proxying trailers", *callbacks_);
callbacks_->upstreamCallbacks()->upstream()->encodeTrailers(trailers);
upstreamTiming().onLastUpstreamTxByteSent(callbacks_->dispatcher().timeSource());
return Http::FilterTrailersStatus::Continue;
}

// This is the last stop in the filter chain: take the metadata and ship them to the codec.
Http::FilterMetadataStatus UpstreamCodecFilter::decodeMetadata(Http::MetadataMap& metadata_map) {
ASSERT(!callbacks_->upstreamCallbacks()->pausedForConnect());
ENVOY_STREAM_LOG(trace, "proxying metadata", *callbacks_);
Http::MetadataMapVector metadata_map_vector;
metadata_map_vector.emplace_back(std::make_unique<Http::MetadataMap>(metadata_map));
callbacks_->upstreamCallbacks()->upstream()->encodeMetadata(metadata_map_vector);
return Http::FilterMetadataStatus::Continue;
}

// Store the callbacks from the UpstreamFilterManager, for sending the response to.
void UpstreamCodecFilter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) {
callbacks_ = &callbacks;
callbacks_->addDownstreamWatermarkCallbacks(*this);
callbacks_->upstreamCallbacks()->addUpstreamCallbacks(*this);
callbacks_->upstreamCallbacks()->setUpstreamToDownstream(bridge_);
}

// This is the response 1xx headers arriving from the codec. Send them through the filter manager.
void UpstreamCodecFilter::CodecBridge::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) {
// The filter manager can not handle more than 1 1xx header, so only forward
// the first one.
if (!seen_1xx_headers_) {
seen_1xx_headers_ = true;
filter_.callbacks_->encode1xxHeaders(std::move(headers));
}
}

// This is the response headers arriving from the codec. Send them through the filter manager.
void UpstreamCodecFilter::CodecBridge::decodeHeaders(Http::ResponseHeaderMapPtr&& headers,
bool end_stream) {
// TODO(rodaine): This is actually measuring after the headers are parsed and not the first
// byte.
filter_.upstreamTiming().onFirstUpstreamRxByteReceived(
filter_.callbacks_->dispatcher().timeSource());

if (filter_.callbacks_->upstreamCallbacks()->pausedForConnect() &&
Http::Utility::getResponseStatus(*headers) == 200) {
filter_.callbacks_->upstreamCallbacks()->setPausedForConnect(false);
filter_.callbacks_->continueDecoding();
}

maybeEndDecode(end_stream);
filter_.callbacks_->encodeHeaders(std::move(headers), end_stream,
StreamInfo::ResponseCodeDetails::get().ViaUpstream);
}

// This is response data arriving from the codec. Send it through the filter manager.
void UpstreamCodecFilter::CodecBridge::decodeData(Buffer::Instance& data, bool end_stream) {
maybeEndDecode(end_stream);
filter_.callbacks_->encodeData(data, end_stream);
}

// This is response trailers arriving from the codec. Send them through the filter manager.
void UpstreamCodecFilter::CodecBridge::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) {
maybeEndDecode(true);
filter_.callbacks_->encodeTrailers(std::move(trailers));
}

// This is response metadata arriving from the codec. Send it through the filter manager.
void UpstreamCodecFilter::CodecBridge::decodeMetadata(Http::MetadataMapPtr&& metadata_map) {
filter_.callbacks_->encodeMetadata(std::move(metadata_map));
}

void UpstreamCodecFilter::CodecBridge::dumpState(std::ostream& os, int indent_level) const {
filter_.callbacks_->upstreamCallbacks()->dumpState(os, indent_level);
}

void UpstreamCodecFilter::CodecBridge::maybeEndDecode(bool end_stream) {
if (end_stream) {
filter_.upstreamTiming().onLastUpstreamRxByteReceived(
filter_.callbacks_->dispatcher().timeSource());
}
}

REGISTER_FACTORY(UpstreamCodecFilterFactory,
Server::Configuration::UpstreamHttpFilterConfigFactory);

} // namespace Router
} // namespace Envoy
130 changes: 130 additions & 0 deletions source/common/router/upstream_codec_filter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#pragma once

#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>

#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h"
#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.validate.h"
#include "envoy/http/conn_pool.h"
#include "envoy/http/filter.h"

#include "source/common/common/logger.h"
#include "source/common/config/well_known_names.h"
#include "source/extensions/filters/http/common/factory_base.h"

namespace Envoy {
namespace Router {

// This is the last filter in the upstream filter chain.
// It takes request headers/body/data from the filter manager and encodes them to the upstream
// codec. It also registers the CodecBridge with the upstream stream, and takes response
// headers/body/data from the upstream stream and sends them to the filter manager.
class UpstreamCodecFilter : public Http::StreamDecoderFilter,
public Logger::Loggable<Logger::Id::router>,
public Http::DownstreamWatermarkCallbacks,
public Http::UpstreamCallbacks {
public:
UpstreamCodecFilter() : bridge_(*this), calling_encode_headers_(false), deferred_reset_(false) {}

// Http::DownstreamWatermarkCallbacks
void onBelowWriteBufferLowWatermark() override;
void onAboveWriteBufferHighWatermark() override;

// UpstreamCallbacks
void onUpstreamConnectionEstablished() override;

// Http::StreamFilterBase
void onDestroy() override { callbacks_->removeDownstreamWatermarkCallbacks(*this); }

// Http::StreamDecoderFilter
Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) override;
Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override;
Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override;
Http::FilterMetadataStatus decodeMetadata(Http::MetadataMap& metadata_map) override;
void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override;

// This bridge connects the upstream stream to the filter manager.
class CodecBridge : public UpstreamToDownstream {
public:
CodecBridge(UpstreamCodecFilter& filter) : filter_(filter) {}
void decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) override;
void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override;
void decodeData(Buffer::Instance& data, bool end_stream) override;
void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override;
void decodeMetadata(Http::MetadataMapPtr&&) override;
void dumpState(std::ostream& os, int indent_level) const override;

void onResetStream(Http::StreamResetReason reason,
absl::string_view transport_failure_reason) override {
if (filter_.calling_encode_headers_) {
filter_.deferred_reset_ = true;
return;
}
if (reason == Http::StreamResetReason::LocalReset) {
ASSERT(transport_failure_reason.empty());
// Use this to communicate to the upstream request to not force-terminate.
transport_failure_reason = "codec_error";
}
filter_.callbacks_->resetStream(reason, transport_failure_reason);
}
void onAboveWriteBufferHighWatermark() override {
filter_.callbacks_->onDecoderFilterAboveWriteBufferHighWatermark();
}
void onBelowWriteBufferLowWatermark() override {
filter_.callbacks_->onDecoderFilterBelowWriteBufferLowWatermark();
}
// UpstreamToDownstream
const Route& route() const override { return *filter_.callbacks_->route(); }
OptRef<const Network::Connection> connection() const override {
return filter_.callbacks_->connection();
}
const Http::ConnectionPool::Instance::StreamOptions& upstreamStreamOptions() const override {
return filter_.callbacks_->upstreamCallbacks()->upstreamStreamOptions();
}

private:
void maybeEndDecode(bool end_stream);
bool seen_1xx_headers_{};
UpstreamCodecFilter& filter_;
};
Http::StreamDecoderFilterCallbacks* callbacks_;
CodecBridge bridge_;
bool calling_encode_headers_ : 1;
bool deferred_reset_ : 1;
OptRef<Http::RequestHeaderMap> latched_headers_;
absl::optional<bool> latched_end_stream_;

private:
StreamInfo::UpstreamTiming& upstreamTiming() {
return callbacks_->upstreamCallbacks()->upstreamStreamInfo().upstreamInfo()->upstreamTiming();
}
};

class UpstreamCodecFilterFactory
: public Extensions::HttpFilters::Common::CommonFactoryBase<
envoy::extensions::filters::http::upstream_codec::v3::UpstreamCodec>,
public Server::Configuration::UpstreamHttpFilterConfigFactory {
public:
UpstreamCodecFilterFactory() : CommonFactoryBase("envoy.filters.http.upstream_codec") {}

std::string category() const override { return "envoy.filters.http.upstream"; }
Http::FilterFactoryCb
createFilterFactoryFromProto(const Protobuf::Message&, const std::string&,
Server::Configuration::UpstreamHttpFactoryContext&) override {
return [](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamDecoderFilter(std::make_shared<UpstreamCodecFilter>());
};
}
bool isTerminalFilterByProtoTyped(
const envoy::extensions::filters::http::upstream_codec::v3::UpstreamCodec&,
Server::Configuration::ServerFactoryContext&) override {
return true;
}
};

} // namespace Router
} // namespace Envoy
Loading