Closed
2 changes: 1 addition & 1 deletion source/common/http/async_client_impl.h
@@ -384,7 +384,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
}
Utility::sendLocalReply(
remote_closed_,
- Utility::EncodeFunctions{nullptr, nullptr,
+ Utility::EncodeFunctions{nullptr, nullptr, nullptr,
[this, modify_headers, &details](ResponseHeaderMapPtr&& headers,
bool end_stream) -> void {
if (modify_headers != nullptr) {
144 changes: 135 additions & 9 deletions source/common/http/filter_manager.cc
@@ -862,6 +862,12 @@ void FilterManager::sendLocalReplyViaFilterChain(
// a no-op.
createFilterChain();

// A bit awkward, but save the local reply data for later. This will come in handy.
// When this pointer is non-null, it means a local reply has been initiated, which
// is important later on.
local_reply_data_ = std::make_unique<Utility::LocalReplyData>(
Contributor

I feel like there's got to be a way to avoid this. If I'm wrong (and I probably am given you've probably tried =P) let's rewrite to just explain why we do this
// latch the local reply data, as it is used later in blahblah function

Contributor Author

I think the best explanation is that we may need some of this context if we end up doing the rewrite in encodeData (because an upstream response exists and needs to be dropped / replaced by the rewritten body, so this cannot happen in encodeHeaders alone)

Contributor Author

After reviewing again, I agree this is pretty dirty, but it currently serves an important purpose: it lets the local reply code path tell the canonical rewrite path whether the local reply is gRPC, whether it answers a HEAD request, the gRPC status, the local reply body, etc. I think this would seem cleaner, at least on the surface, if there were one member local_reply_context_ that held both the local reply data (if any) and a pointer to the local reply object that should be used for the request. This could also give us an opportunity to read per-route config and override that local reply object with the per-route variant. The alternative would be to create the FM with the per-route local reply rewriter, but I'm not sure yet which makes the most sense from a lifecycle perspective.
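
To make the single-member idea concrete, here is a rough sketch of what a combined context could look like. Everything in it is hypothetical: `LocalReplyContext`, `LocalReplyRewriter`, and the simplified `LocalReplyData` fields are stand-ins for illustration, not the types in this PR, and the per-route override is only one possible shape for that lifecycle.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for LocalReply::LocalReply (illustration only).
struct LocalReplyRewriter {
  std::string name;
};

// Hypothetical, simplified stand-in for Utility::LocalReplyData.
struct LocalReplyData {
  bool is_grpc = false;
  int code = 200;
  std::string body;
  std::optional<int> grpc_status;
  bool is_head_request = false;
};

// One member that holds both the rewriter to use and the latched data (if any).
class LocalReplyContext {
public:
  explicit LocalReplyContext(const LocalReplyRewriter& default_rewriter)
      : rewriter_(&default_rewriter) {}

  // Swap in a per-route rewriter, e.g. once the route has been resolved.
  void overrideRewriter(const LocalReplyRewriter& per_route) { rewriter_ = &per_route; }

  // Latch the data when a local reply is initiated.
  void latch(LocalReplyData data) { data_ = std::move(data); }

  // Replaces the "is the pointer non-null" check on local_reply_data_.
  bool localReplyInitiated() const { return data_.has_value(); }

  const LocalReplyRewriter& rewriter() const { return *rewriter_; }

  const LocalReplyData& data() const {
    assert(data_.has_value());
    return *data_;
  }

private:
  const LocalReplyRewriter* rewriter_;
  std::optional<LocalReplyData> data_;
};
```

With something like this, encodeHeaders would ask the context whether a local reply was initiated instead of checking a raw pointer, and the rewrite path would read the gRPC and HEAD flags from the same object.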

Utility::LocalReplyData{is_grpc_request, code, body, grpc_status, is_head_request});

Utility::sendLocalReply(
state_.destroyed_,
Utility::EncodeFunctions{
@@ -875,13 +881,14 @@ void FilterManager::sendLocalReplyViaFilterChain(
modify_headers(headers);
}
},
- [this](ResponseHeaderMap& response_headers, Code& code, std::string& body,
- absl::string_view& content_type) -> void {
- // TODO(snowp): This &get() business isn't nice, rework LocalReply and others to accept
- // opt refs.
- local_reply_.rewrite(filter_manager_callbacks_.requestHeaders().ptr(), response_headers,
- stream_info_, code, body, content_type);
- },
+ // Local reply rewrites (and upstream rewrites, for that matter) happen on the filter
+ // chain now, so we do not need to do any additional rewriting here.
+ /*rewrite_=*/nullptr,
// Local gRPC replies are encoded as trailers-only responses on the filter chain,
Contributor Author

For consistency with the above comment I should explicitly call out "but not upstream gRPC replies"

Contributor

Hm, would it work to have an ASSERT above this function where we check that if it's a gRPC reply that there's no body or trailers? Then we could move most of this comment out and say // As documented above, no special gRPC logic is necessary here.

Contributor Author

Good call

// so we don't need to do any additional encoding of the gRPC response here. This
// is different from the direct local reply case, where we do encode the gRPC
// reply as a headers-only response using an encoder callback.
/*encode_grpc_=*/nullptr,
[this, modify_headers](ResponseHeaderMapPtr&& headers, bool end_stream) -> void {
filter_manager_callbacks_.setResponseHeaders(std::move(headers));
// TODO: Start encoding from the last decoder filter that saw the
@@ -918,9 +925,18 @@ void FilterManager::sendDirectLocalReply(
},
[&](ResponseHeaderMap& response_headers, Code& code, std::string& body,
absl::string_view& content_type) -> void {
/* Direct local reply rewrites happen here since we are not going through the filter
Contributor Author

Should explicitly clarify "must happen here since..."

Contributor

Just to check my understanding: local reply rewrites that currently exist don't go through the filter chain? Because local replies as a whole do, AFAIK.

Contributor Author

Local replies in the general case do go through the filter chain. However, in this direct reply case (e.g. on downstream timeouts where headers have already been sent), we are not going through the filter chain so we continue to call local_reply_.rewrite here.

* chain. */
local_reply_.rewrite(filter_manager_callbacks_.requestHeaders().ptr(), response_headers,
stream_info_, code, body, content_type);
},
[&](ResponseHeaderMap& headers, Code& code, std::string& body,
const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
bool is_head_request) -> void {
/* Direct local reply gRPC encoding happens here since we are not going through the
Contributor Author

same re: "must"
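
For readers skimming this thread, a rough standalone sketch of what the trailers-only conversion does to the headers in the direct case. The names `HeaderMap`, `percentEncode`, and `toTrailersOnly` are hypothetical; the header map is a plain `std::map`, the gRPC status is passed in directly instead of being derived from the HTTP code, and the percent-encoder is a simplified stand-in for `Http::Utility::PercentEncoding::encode`, so treat the exact encoding rule as an assumption.

```cpp
#include <cstdio>
#include <map>
#include <string>

// Hypothetical stand-in for ResponseHeaderMap (illustration only).
using HeaderMap = std::map<std::string, std::string>;

// Simplified percent-encoder (assumption): encodes control characters,
// '~' and above, and '%' itself.
std::string percentEncode(const std::string& in) {
  std::string out;
  for (unsigned char ch : in) {
    if (ch < ' ' || ch >= '~' || ch == '%') {
      char buf[4];
      std::snprintf(buf, sizeof(buf), "%%%02X", static_cast<unsigned int>(ch));
      out += buf;
    } else {
      out += static_cast<char>(ch);
    }
  }
  return out;
}

// Turn a response into a gRPC trailers-only response: HTTP 200, gRPC content
// type, grpc-status, and the body moved into grpc-message.
void toTrailersOnly(HeaderMap& headers, int grpc_status, std::string body,
                    bool is_head_request) {
  headers[":status"] = "200";
  headers["content-type"] = "application/grpc";
  headers["grpc-status"] = std::to_string(grpc_status);
  if (!body.empty() && !is_head_request) {
    // A formatter may append '\n'; it must not end up in a header value.
    if (body.back() == '\n') {
      body.pop_back();
    }
    headers["grpc-message"] = percentEncode(body);
  }
  // No body follows, so any content-length set earlier no longer applies.
  headers.erase("content-length");
}
```

The point for this thread is that nothing else performs this conversion when the reply bypasses the filter chain, which is why the callback is needed in sendDirectLocalReply.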

* filter chain. */
Utility::toGrpcTrailersOnlyResponse(headers, code, body, grpc_status, is_head_request);
},
[&](ResponseHeaderMapPtr&& response_headers, bool end_stream) -> void {
// Move the response headers into the FilterManager to make sure they're visible to
// access logs.
@@ -1044,9 +1060,55 @@ void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHea
}
}

- const bool modified_end_stream = (end_stream && continue_data_entry == encoder_filters_.end());
+ bool modified_end_stream = (end_stream && continue_data_entry == encoder_filters_.end());

// Capture the original modified_end_stream value. We may modify it again below if we add
// a response body.
const bool original_modified_end_stream = modified_end_stream;

// See if we should do an upstream rewrite. For local replies, always try to do a rewrite.
Contributor Author

"should do a response rewrite"

// For upstream replies, only try to do a rewrite if some filter rule actually applies.
// The rewrite config supports unconditional rewrites with no filter rules. This makes sense
// for local reply rewrites (e.g. one consistent format for all responses generated at Envoy)
// but makes a lot less sense for all responses upstream.
state_.do_response_rewrite_ =
local_reply_data_ != nullptr ||
local_reply_.matchesAnyMapper(filter_manager_callbacks_.requestHeaders().ptr(),
*filter_manager_callbacks_.responseHeaders(),
filter_manager_callbacks_.responseTrailers().ptr(),
stream_info_);
ENVOY_STREAM_LOG(debug,
"FilterManager::encodeHeaders: end_stream={}, modified_end_stream={}, "
"state_.do_response_rewrite_={}",
*this, end_stream, modified_end_stream, state_.do_response_rewrite_);

if (state_.do_response_rewrite_) {
// Try to rewrite the response. This may end up not doing any actual rewrite if this is
// a local reply and there is no matching rule. If it's not a local reply, we know there's
// a match at this point since that's a condition for state_.do_response_rewrite_ above.
rewriteResponse();
Contributor Author

At this point in the code, we've already run through the filter chain.

Question: should we do response rewriting before or after the filter chain? If we do it before, then the filter chain has an opportunity to see the rewritten response and make its own changes. If we do it after, then we give response rewriting the final say and the highest precedence.

Contributor

As discussed offline, if we can tell ahead of time we're doing a rewrite, I think we want to let the filters see the reply that is going to the client. If they do their own stats tracking of say "how many 500s happened" I think they want to see what will be written.
@snowp @lizan can I get a non-googly take on this question?

Contributor Author

One interesting case to consider is when a filter itself returns a status (or sets a header) that would match a rewrite rule. For example, if ext authz returned 403, the user may want the same 403 rewrite rule to apply as it would have if an upstream service had returned it. The same story might apply to a sentinel header like x-my-custom-failure-header that could get set by your upstream services or by a custom filter. Either way the user might want the same rewrite rules to match/apply.

Member

Yeah this is a tricky one. I haven't yet thought about this a huge amount, but I think in a perfect world we would do something like the following:

  1. Unconditional rewriting happens before the encoder filter chain runs.
  2. Conditional rewriting (matching) actually is attempted after each encoder filter runs. This I think is the behavior we want where it allows each filter to change something and then possibly have a transform happen before the next filter runs.

I do wonder if somehow this could be built using the new matching infra that @snowp created but I haven't looked at it in detail yet.

Let me know if it would be easier to chat about this in a short meeting or in a doc.

Contributor

yeah, that matched my thought, where (I think) this one qualifies as unconditional and would happen first.
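
A minimal sketch of the ordering proposed in this thread, assuming a toy model of the encoder chain. `Response`, `EncoderFilter`, `RewriteRule`, and `encodeWithRewrites` are all hypothetical names; a rule with no matcher is treated as unconditional. This is not how the PR currently behaves (it rewrites after the chain has run), only an illustration of the alternative.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Toy response model (illustration only).
struct Response {
  int status = 200;
  std::string body;
};

using EncoderFilter = std::function<void(Response&)>;

// A rewrite rule; an empty `matches` means the rule is unconditional.
struct RewriteRule {
  std::function<bool(const Response&)> matches;
  std::function<void(Response&)> apply;
};

void encodeWithRewrites(Response& response, const std::vector<EncoderFilter>& filters,
                        const std::vector<RewriteRule>& rules) {
  // 1. Unconditional rewriting happens before the encoder filter chain runs,
  //    so every filter sees the reply that will go to the client.
  for (const auto& rule : rules) {
    assert(rule.apply);
    if (!rule.matches) {
      rule.apply(response);
    }
  }
  // 2. Conditional rewriting is attempted after each encoder filter runs, so a
  //    status or header set by one filter can trigger a rewrite before the next.
  for (const auto& filter : filters) {
    filter(response);
    for (const auto& rule : rules) {
      if (rule.matches && rule.matches(response)) {
        rule.apply(response);
      }
    }
  }
}
```

In this model a filter that turns the response into a 403 (the ext authz case above) gets the same 403 rule applied as an upstream 403 would, and later filters observe the rewritten body.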


if (buffered_response_data_) {
// If we have a rewritten response body, then modified_end_stream can no longer be true
// because we have a body now.
modified_end_stream = false;
}
}

state_.non_100_response_headers_encoded_ = true;
filter_manager_callbacks_.encodeHeaders(headers, modified_end_stream);

// Encode the rewritten response right away if the original `modified_end_stream` was true.
// If it wasn't, then we know a body will be encoded later, and we'll let that function
// take care of encoding the rewritten response.
if (state_.do_response_rewrite_ && original_modified_end_stream && buffered_response_data_) {
ENVOY_STREAM_LOG(trace,
"FilterManager::encodeHeaders calling filter_manager_callbacks_ with {} bytes "
"from buffered_response_data and modified_end_stream={}",
*this, buffered_response_data_->length(), modified_end_stream);
filter_manager_callbacks_.encodeData(*buffered_response_data_, modified_end_stream);
}

maybeEndEncode(modified_end_stream);

if (!modified_end_stream) {
@@ -1180,7 +1242,19 @@ void FilterManager::encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instan
}

const bool modified_end_stream = end_stream && trailers_added_entry == encoder_filters_.end();
- filter_manager_callbacks_.encodeData(data, modified_end_stream);
+ if (state_.do_response_rewrite_ && buffered_response_data_) {
+ // If we're doing a rewrite and modified_end_stream=true, encode the buffered_response_data_
+ // that was set by rewriteResponse() earlier in encodeHeaders().
+ if (modified_end_stream) {
+ ENVOY_STREAM_LOG(trace,
+ "FilterManager::encodeData calling filter_manager_callbacks_ with {} bytes "
+ "from buffered_response_data and modified_end_stream={}",
+ *this, buffered_response_data_->length(), modified_end_stream);
+ filter_manager_callbacks_.encodeData(*buffered_response_data_, modified_end_stream);
+ }
+ } else {
+ filter_manager_callbacks_.encodeData(data, modified_end_stream);
+ }
maybeEndEncode(modified_end_stream);

// If trailers were added during encodeData we need to trigger decodeTrailers in order
@@ -1225,6 +1299,58 @@ void FilterManager::maybeEndEncode(bool end_stream) {
}
}

void FilterManager::rewriteResponse() {
auto response_headers = filter_manager_callbacks_.responseHeaders();
ASSERT(response_headers.ptr() != nullptr);

std::string rewritten_body{};
absl::string_view rewritten_content_type{};
Http::Code rewritten_code{static_cast<Http::Code>(Utility::getResponseStatus(*response_headers))};

// Start with the local reply body and text/plain content type, if we have it.
if (local_reply_data_) {
rewritten_body = local_reply_data_->body_text_;
rewritten_content_type = Headers::get().ContentTypeValues.Text;
}

// Get rid of any buffered response data we may have at this point. We're going to use this
// as the output parameter for a rewritten body, if any.
buffered_response_data_ = nullptr;

ENVOY_STREAM_LOG(trace, "rewriteResponse: calling local_reply_.rewrite with body=\"{}\", code={}",
*this, rewritten_body, rewritten_code);
const bool did_body_rewrite =
local_reply_.rewrite(filter_manager_callbacks_.requestHeaders().ptr(), *response_headers,
stream_info_, rewritten_code, rewritten_body, rewritten_content_type);
ENVOY_STREAM_LOG(
trace,
"rewriteResponse: local_reply_.rewrite returned did_body_rewrite={}, body=\"{}\", "
"content_type={}, code={}",
*this, did_body_rewrite, rewritten_body, rewritten_content_type, rewritten_code);

if (local_reply_data_ && local_reply_data_->is_grpc_) {
// Send a trailers-only grpc response
Utility::toGrpcTrailersOnlyResponse(*response_headers, rewritten_code, rewritten_body,
local_reply_data_->grpc_status_,
local_reply_data_->is_head_request_);
return;
}

if (did_body_rewrite) {
buffered_response_data_ = std::make_unique<Buffer::OwnedImpl>(rewritten_body);

// If we rewrote with a non-empty body, set the content length and type.
// Otherwise, remove them.
if (buffered_response_data_->length() > 0) {
response_headers->setContentLength(buffered_response_data_->length());
response_headers->setContentType(rewritten_content_type);
} else {
response_headers->removeContentLength();
response_headers->removeContentType();
}
}
}

bool FilterManager::processNewlyAddedMetadata() {
if (request_metadata_map_vector_ == nullptr) {
return false;
15 changes: 14 additions & 1 deletion source/common/http/filter_manager.h
@@ -20,6 +20,7 @@
#include "common/grpc/common.h"
#include "common/http/header_utility.h"
#include "common/http/headers.h"
#include "common/http/utility.h"
#include "common/local_reply/local_reply.h"
#include "common/matcher/matcher.h"
#include "common/protobuf/utility.h"
@@ -909,6 +910,11 @@ class FilterManager : public ScopeTrackedObject,
*/
void maybeEndEncode(bool end_stream);

/**
* Rewrite the response headers and body using local_reply_.
*/
void rewriteResponse();

void sendLocalReply(bool is_grpc_request, Code code, absl::string_view body,
const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
@@ -1075,7 +1081,12 @@ class FilterManager : public ScopeTrackedObject,
std::make_shared<Network::Socket::Options>();

FilterChainFactory& filter_chain_factory_;
// Used to track local reply state
// TODO(esmet): We could combine the local reply reference and data into one pointer
// member that stays nullptr when no local reply config is present to save space in
// the 'off' case. But, for now, we spend two pointers of space in all cases.
const LocalReply::LocalReply& local_reply_;
Utility::LocalReplyDataPtr local_reply_data_;
Contributor Author

This is still a TODO since I think it will be a requirement to not regress memory usage in the case where Envoy is not configured to use any local reply rewrites.

OverridableRemoteSocketAddressSetterStreamInfo stream_info_;
// TODO(snowp): Once FM has been moved to its own file we'll make these private classes of FM,
// at which point they no longer need to be friends.
@@ -1109,7 +1120,7 @@ class FilterManager : public ScopeTrackedObject,
State()
: remote_complete_(false), local_complete_(false), has_continue_headers_(false),
created_filter_chain_(false), is_head_request_(false), is_grpc_request_(false),
- non_100_response_headers_encoded_(false) {}
+ non_100_response_headers_encoded_(false), do_response_rewrite_(false) {}

uint32_t filter_call_state_{0};

@@ -1127,6 +1138,8 @@ class FilterManager : public ScopeTrackedObject,
bool is_grpc_request_ : 1;
// Tracks if headers other than 100-Continue have been encoded to the codec.
bool non_100_response_headers_encoded_ : 1;
// True if we should rewrite the upstream response using local_reply_
bool do_response_rewrite_ : 1;
Contributor Author

We could probably move this state bit into local_reply_data_ (perhaps renamed to local_reply_context_/state_ or whatever), but this approach worked quickly and, IIUC, there should already be a spare bit in this struct, so it does not add an extra byte to the struct.
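
A quick way to check the spare-bit claim is a compile-time size comparison. The sketch below uses hypothetical reductions of `State` (`StateBefore`, `StateAfter`) and assumes the seven flags visible in the constructor are the whole bitfield. Bit-field layout is implementation-defined, so this holds on common compilers rather than by guarantee.

```cpp
#include <cstdint>

// Hypothetical reduction of FilterManager::State before the change: a 32-bit
// counter followed by seven one-bit flags.
struct StateBefore {
  std::uint32_t filter_call_state_{0};
  bool remote_complete_ : 1;
  bool local_complete_ : 1;
  bool has_continue_headers_ : 1;
  bool created_filter_chain_ : 1;
  bool is_head_request_ : 1;
  bool is_grpc_request_ : 1;
  bool non_100_response_headers_encoded_ : 1;
};

// The same struct with the new flag taking the eighth bit.
struct StateAfter {
  std::uint32_t filter_call_state_{0};
  bool remote_complete_ : 1;
  bool local_complete_ : 1;
  bool has_continue_headers_ : 1;
  bool created_filter_chain_ : 1;
  bool is_head_request_ : 1;
  bool is_grpc_request_ : 1;
  bool non_100_response_headers_encoded_ : 1;
  bool do_response_rewrite_ : 1;
};

// The eighth one-bit flag should fit in the byte the first seven already use.
static_assert(sizeof(StateBefore) == sizeof(StateAfter),
              "adding do_response_rewrite_ grew the struct on this compiler");
```

A similar static_assert on the real `State` would catch a later flag that spills into a new byte.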


// The following 3 members are booleans rather than part of the space-saving bitfield as they
// are passed as arguments to functions expecting bools. Extend State using the bitfield
48 changes: 29 additions & 19 deletions source/common/http/utility.cc
@@ -512,7 +512,7 @@ void Utility::sendLocalReply(const bool& is_reset, StreamDecoderFilterCallbacks&

sendLocalReply(
is_reset,
- Utility::EncodeFunctions{nullptr, nullptr,
+ Utility::EncodeFunctions{nullptr, nullptr, nullptr,
[&](ResponseHeaderMapPtr&& headers, bool end_stream) -> void {
callbacks.encodeHeaders(std::move(headers), end_stream, details);
},
@@ -522,6 +522,27 @@ void Utility::sendLocalReply(const bool& is_reset, StreamDecoderFilterCallbacks&
local_reply_data);
}

void Utility::toGrpcTrailersOnlyResponse(Http::ResponseHeaderMap& response_headers,
const Http::Code& code, std::string& response_body,
const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
bool is_head_request) {
response_headers.setStatus(std::to_string(enumToInt(Http::Code::OK)));
response_headers.setReferenceContentType(Headers::get().ContentTypeValues.Grpc);
response_headers.setGrpcStatus(std::to_string(enumToInt(
grpc_status ? grpc_status.value() : Grpc::Utility::httpToGrpcStatus(enumToInt(code)))));
if (!response_body.empty() && !is_head_request) {
// TODO(dio): Probably it is worth to consider caching the encoded message based on gRPC
// status.
// JsonFormatter adds a '\n' at the end. For header value, it should be removed.
// https://github.com/envoyproxy/envoy/blob/main/source/common/formatter/substitution_formatter.cc#L129
if (response_body[response_body.length() - 1] == '\n') {
response_body = response_body.substr(0, response_body.length() - 1);
}
response_headers.setGrpcMessage(Http::Utility::PercentEncoding::encode(response_body));
}
response_headers.removeContentLength();
}

void Utility::sendLocalReply(const bool& is_reset, const EncodeFunctions& encode_functions,
const LocalReplyData& local_reply_data) {
// encode_headers() may reset the stream, so the stream must not be reset before calling it.
@@ -542,26 +563,15 @@ void Utility::sendLocalReply(const bool& is_reset, const EncodeFunctions& encode
encode_functions.rewrite_(*response_headers, response_code, body_text, content_type);
}

- // Respond with a gRPC trailers-only response if the request is gRPC
+ // Respond with a gRPC trailers-only response if the request is gRPC. The actual encoding of the
+ // trailers-only response happens in encode_grpc_, if set. Otherwise it is assumed that the
+ // response is already encoded and ready to be finalized by encodeHeaders with end_stream=true.
if (local_reply_data.is_grpc_) {
- response_headers->setStatus(std::to_string(enumToInt(Code::OK)));
- response_headers->setReferenceContentType(Headers::get().ContentTypeValues.Grpc);
- response_headers->setGrpcStatus(
- std::to_string(enumToInt(local_reply_data.grpc_status_
- ? local_reply_data.grpc_status_.value()
- : Grpc::Utility::httpToGrpcStatus(enumToInt(response_code)))));
- if (!body_text.empty() && !local_reply_data.is_head_request_) {
- // TODO(dio): Probably it is worth to consider caching the encoded message based on gRPC
- // status.
- // JsonFormatter adds a '\n' at the end. For header value, it should be removed.
- // https://github.com/envoyproxy/envoy/blob/main/source/common/formatter/substitution_formatter.cc#L129
- if (body_text[body_text.length() - 1] == '\n') {
- body_text = body_text.substr(0, body_text.length() - 1);
- }
- response_headers->setGrpcMessage(PercentEncoding::encode(body_text));
+ if (encode_functions.encode_grpc_) {
+ encode_functions.encode_grpc_(*response_headers, response_code, body_text,
+ local_reply_data.grpc_status_,
+ local_reply_data.is_head_request_);
}
- // The `modify_headers` function may have added content-length, remove it.
- response_headers->removeContentLength();
encode_functions.encode_headers_(std::move(response_headers), true); // Trailers only response
return;
}
18 changes: 18 additions & 0 deletions source/common/http/utility.h
@@ -295,6 +295,11 @@ struct EncodeFunctions {
std::function<void(ResponseHeaderMap& response_headers, Code& code, std::string& body,
absl::string_view& content_type)>
rewrite_;
// Function to encode a gRPC response.
std::function<void(ResponseHeaderMap& headers, Code& code, std::string& body,
const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
bool is_head_request)>
encode_grpc_;
// Function to encode response headers.
std::function<void(ResponseHeaderMapPtr&& headers, bool end_stream)> encode_headers_;
// Function to encode the response body.
@@ -313,6 +318,7 @@ struct LocalReplyData {
// Tells if this is a response to a HEAD request.
bool is_head_request_ = false;
};
using LocalReplyDataPtr = std::unique_ptr<LocalReplyData>;

/**
* Create a locally generated response using filter callbacks.
@@ -337,6 +343,18 @@ void sendLocalReply(const bool& is_reset, StreamDecoderFilterCallbacks& callback
void sendLocalReply(const bool& is_reset, const EncodeFunctions& encode_functions,
const LocalReplyData& local_reply_data);

/**
* Convert a response into a gRPC trailers-only response.
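* @param response_headers the response headers. Will be modified.
* @param code the upstream response code. Converted to grpc-status when grpc_status is not set.
* @param response_body the response body. Used as the grpc-message; a trailing newline is removed.
* @param grpc_status the original gRPC status, if any.
* @param is_head_request whether this is a HEAD request.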
*/
void toGrpcTrailersOnlyResponse(Http::ResponseHeaderMap& response_headers, const Http::Code& code,
std::string& response_body,
const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
bool is_head_request);

struct GetLastAddressFromXffInfo {
// Last valid address pulled from the XFF header.
Network::Address::InstanceConstSharedPtr address_;