diff --git a/api/envoy/service/ext_proc/v3alpha/external_processor.proto b/api/envoy/service/ext_proc/v3alpha/external_processor.proto index e57ea2da9e409..09572331aa42a 100644 --- a/api/envoy/service/ext_proc/v3alpha/external_processor.proto +++ b/api/envoy/service/ext_proc/v3alpha/external_processor.proto @@ -231,15 +231,18 @@ message CommonResponse { // stream as normal. This is the default. CONTINUE = 0; - // [#not-implemented-hide:] - // Replace the request or response with the contents - // of this message. If header_mutation is set, apply it to the - // headers. If body_mutation is set and contains a body, then add that - // body to the request or response, even if one does not already exist -- - // otherwise, clear the body. Any additional body and trailers - // received from downstream or upstream will be ignored. - // This can be used to add a body to a request or response that does not - // have one already. + // Apply the specified header mutation, replace the body with the body + // specified in the body mutation (if present), and do not send any + // further messages for this request or response even if the processing + // mode is configured to do so. + // + // When used in response to a request_headers or response_headers message, + // this status makes it possible to either completely replace the body + // while discarding the original body, or to add a body to a message that + // formerly did not have one. + // + // In other words, this response makes it possible to turn an HTTP GET + // into a POST, PUT, or PATCH. CONTINUE_AND_REPLACE = 1; } diff --git a/generated_api_shadow/envoy/service/ext_proc/v3alpha/external_processor.proto b/generated_api_shadow/envoy/service/ext_proc/v3alpha/external_processor.proto index e57ea2da9e409..09572331aa42a 100644 --- a/generated_api_shadow/envoy/service/ext_proc/v3alpha/external_processor.proto +++ b/generated_api_shadow/envoy/service/ext_proc/v3alpha/external_processor.proto @@ -231,15 +231,18 @@ message CommonResponse { // stream as normal. This is the default. CONTINUE = 0; - // [#not-implemented-hide:] - // Replace the request or response with the contents - // of this message. If header_mutation is set, apply it to the - // headers. If body_mutation is set and contains a body, then add that - // body to the request or response, even if one does not already exist -- - // otherwise, clear the body. Any additional body and trailers - // received from downstream or upstream will be ignored. - // This can be used to add a body to a request or response that does not - // have one already. + // Apply the specified header mutation, replace the body with the body + // specified in the body mutation (if present), and do not send any + // further messages for this request or response even if the processing + // mode is configured to do so. + // + // When used in response to a request_headers or response_headers message, + // this status makes it possible to either completely replace the body + // while discarding the original body, or to add a body to a message that + // formerly did not have one. + // + // In other words, this response makes it possible to turn an HTTP GET + // into a POST, PUT, or PATCH. CONTINUE_AND_REPLACE = 1; } diff --git a/source/extensions/filters/http/ext_proc/BUILD b/source/extensions/filters/http/ext_proc/BUILD index 1468c4d9e0f84..1a0dbe8a05a56 100644 --- a/source/extensions/filters/http/ext_proc/BUILD +++ b/source/extensions/filters/http/ext_proc/BUILD @@ -26,6 +26,7 @@ envoy_cc_library( "//include/envoy/http:filter_interface", "//include/envoy/http:header_map_interface", "//include/envoy/stats:stats_macros", + "//source/common/buffer:buffer_lib", "//source/extensions/filters/http/common:pass_through_filter_lib", "@com_google_absl//absl/strings:str_format", "@envoy_api//envoy/extensions/filters/http/ext_proc/v3alpha:pkg_cc_proto", diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 941ecf4c1b1c1..3467dfd01fd2c 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -60,8 +60,8 @@ void Filter::onDestroy() { } } -FilterHeadersStatus Filter::onHeaders(ProcessorState& state, Http::HeaderMap& headers, - bool end_stream) { +FilterHeadersStatus Filter::onHeaders(ProcessorState& state, + Http::RequestOrResponseHeaderMap& headers, bool end_stream) { switch (openStream()) { case StreamOpenState::Error: return FilterHeadersStatus::StopIteration; @@ -75,7 +75,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, Http::HeaderMap& he state.setHeaders(&headers); ProcessingRequest req; auto* headers_req = state.mutableHeaders(req); - MutationUtils::buildHttpHeaders(headers, *headers_req->mutable_headers()); + MutationUtils::headersToProto(headers, *headers_req->mutable_headers()); headers_req->set_end_of_stream(end_stream); state.setCallbackState(ProcessorState::CallbackState::HeadersCallback); state.startMessageTimer(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout()); @@ -101,12 +101,22 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st return status; } -Http::FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, - bool end_stream) { +FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, bool end_stream) { if (end_stream) { state.setCompleteBodyAvailable(true); } + if (state.bodyReplaced()) { + ENVOY_LOG(trace, "Clearing body chunk because CONTINUE_AND_REPLACE was returned"); + data.drain(data.length()); + return FilterDataStatus::Continue; + } + + if (processing_complete_) { + ENVOY_LOG(trace, "Continuing (processing complete)"); + return FilterDataStatus::Continue; + } + bool just_added_trailers = false; Http::HeaderMap* new_trailers = nullptr; if (end_stream && state.sendTrailers()) { @@ -192,17 +202,17 @@ Http::FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& d FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) { ENVOY_LOG(trace, "decodeData({}): end_stream = {}", data.length(), end_stream); - if (processing_complete_) { - ENVOY_LOG(trace, "decodeData: Continue (complete)"); - return FilterDataStatus::Continue; - } - const auto status = onData(decoding_state_, data, end_stream); ENVOY_LOG(trace, "decodeData returning {}", status); return status; } FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap& trailers) { + if (processing_complete_) { + ENVOY_LOG(trace, "trailers: Continue"); + return FilterTrailersStatus::Continue; + } + bool body_delivered = state.completeBodyAvailable(); state.setCompleteBodyAvailable(true); state.setTrailersAvailable(true); @@ -243,11 +253,6 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap& FilterTrailersStatus Filter::decodeTrailers(RequestTrailerMap& trailers) { ENVOY_LOG(trace, "decodeTrailers"); - if (processing_complete_) { - ENVOY_LOG(trace, "decodeTrailers: Continue"); - return FilterTrailersStatus::Continue; - } - const auto status = onTrailers(decoding_state_, trailers); ENVOY_LOG(trace, "encodeTrailers returning {}", status); return status; @@ -271,11 +276,6 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s FilterDataStatus Filter::encodeData(Buffer::Instance& data, bool end_stream) { ENVOY_LOG(trace, "encodeData({}): end_stream = {}", data.length(), end_stream); - if (processing_complete_) { - ENVOY_LOG(trace, "encodeData: Continue (complete)"); - return FilterDataStatus::Continue; - } - const auto status = onData(encoding_state_, data, end_stream); ENVOY_LOG(trace, "encodeData returning {}", status); return status; @@ -283,11 +283,6 @@ FilterDataStatus Filter::encodeData(Buffer::Instance& data, bool end_stream) { FilterTrailersStatus Filter::encodeTrailers(ResponseTrailerMap& trailers) { ENVOY_LOG(trace, "encodeTrailers"); - if (processing_complete_) { - ENVOY_LOG(trace, "encodeTrailers: Continue"); - return FilterTrailersStatus::Continue; - } - const auto status = onTrailers(encoding_state_, trailers); ENVOY_LOG(trace, "encodeTrailers returning {}", status); return status; @@ -308,7 +303,7 @@ void Filter::sendBodyChunk(ProcessorState& state, const Buffer::Instance& data, void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) { ProcessingRequest req; auto* trailers_req = state.mutableTrailers(req); - MutationUtils::buildHttpHeaders(trailers, *trailers_req->mutable_trailers()); + MutationUtils::headersToProto(trailers, *trailers_req->mutable_trailers()); state.setCallbackState(ProcessorState::CallbackState::TrailersCallback); state.startMessageTimer(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout()); ENVOY_LOG(debug, "Sending trailers message"); @@ -459,7 +454,7 @@ void Filter::sendImmediateResponse(const ImmediateResponse& response) { : absl::nullopt; const auto mutate_headers = [&response](Http::ResponseHeaderMap& headers) { if (response.has_headers()) { - MutationUtils::applyHeaderMutations(response.headers(), headers); + MutationUtils::applyHeaderMutations(response.headers(), headers, false); } }; diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 37dc63d646ae5..0ccb231621131 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -133,8 +133,8 @@ class Filter : public Logger::Loggable, void sendImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response); void sendBodyChunk(ProcessorState& state, const Buffer::Instance& data, bool end_stream); - Http::FilterHeadersStatus onHeaders(ProcessorState& state, Http::HeaderMap& headers, - bool end_stream); + Http::FilterHeadersStatus onHeaders(ProcessorState& state, + Http::RequestOrResponseHeaderMap& headers, bool end_stream); Http::FilterDataStatus onData(ProcessorState& state, Buffer::Instance& data, bool end_stream); Http::FilterTrailersStatus onTrailers(ProcessorState& state, Http::HeaderMap& trailers); diff --git a/source/extensions/filters/http/ext_proc/mutation_utils.cc b/source/extensions/filters/http/ext_proc/mutation_utils.cc index 111e7a537d105..e0a5602030a20 100644 --- a/source/extensions/filters/http/ext_proc/mutation_utils.cc +++ b/source/extensions/filters/http/ext_proc/mutation_utils.cc @@ -16,13 +16,14 @@ using Http::LowerCaseString; using envoy::service::ext_proc::v3alpha::BodyMutation; using envoy::service::ext_proc::v3alpha::BodyResponse; +using envoy::service::ext_proc::v3alpha::CommonResponse; using envoy::service::ext_proc::v3alpha::HeaderMutation; using envoy::service::ext_proc::v3alpha::HeadersResponse; -void MutationUtils::buildHttpHeaders(const Http::HeaderMap& headers_in, - envoy::config::core::v3::HeaderMap& headers_out) { - headers_in.iterate([&headers_out](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate { - auto* new_header = headers_out.add_headers(); +void MutationUtils::headersToProto(const Http::HeaderMap& headers_in, + envoy::config::core::v3::HeaderMap& proto_out) { + headers_in.iterate([&proto_out](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate { + auto* new_header = proto_out.add_headers(); new_header->set_key(std::string(e.key().getStringView())); new_header->set_value(std::string(e.value().getStringView())); return Http::HeaderMap::Iterate::Continue; @@ -34,12 +35,14 @@ void MutationUtils::applyCommonHeaderResponse(const HeadersResponse& response, if (response.has_response()) { const auto& common_response = response.response(); if (common_response.has_header_mutation()) { - applyHeaderMutations(common_response.header_mutation(), headers); + applyHeaderMutations(common_response.header_mutation(), headers, + common_response.status() == CommonResponse::CONTINUE_AND_REPLACE); } } } -void MutationUtils::applyHeaderMutations(const HeaderMutation& mutation, Http::HeaderMap& headers) { +void MutationUtils::applyHeaderMutations(const HeaderMutation& mutation, Http::HeaderMap& headers, + bool replacing_message) { for (const auto& remove_header : mutation.remove_headers()) { if (Http::HeaderUtility::isRemovableHeader(remove_header)) { ENVOY_LOG(trace, "Removing header {}", remove_header); @@ -53,7 +56,7 @@ void MutationUtils::applyHeaderMutations(const HeaderMutation& mutation, Http::H if (!sh.has_header()) { continue; } - if (isSettableHeader(sh.header().key())) { + if (isSettableHeader(sh.header().key(), replacing_message)) { // Make "false" the default. This is logical and matches the ext_authz // filter. However, the router handles this same protobuf and uses "true" // as the default instead. @@ -70,14 +73,20 @@ void MutationUtils::applyHeaderMutations(const HeaderMutation& mutation, Http::H } } -void MutationUtils::applyCommonBodyResponse(const BodyResponse& response, Http::HeaderMap* headers, +void MutationUtils::applyCommonBodyResponse(const BodyResponse& response, + Http::RequestOrResponseHeaderMap* headers, Buffer::Instance& buffer) { if (response.has_response()) { const auto& common_response = response.response(); if (headers != nullptr && common_response.has_header_mutation()) { - applyHeaderMutations(common_response.header_mutation(), *headers); + applyHeaderMutations(common_response.header_mutation(), *headers, + common_response.status() == CommonResponse::CONTINUE_AND_REPLACE); } if (common_response.has_body_mutation()) { + if (headers != nullptr) { + // Always clear content length if we can before modifying body + headers->removeContentLength(); + } applyBodyMutations(common_response.body_mutation(), buffer); } } @@ -87,10 +96,13 @@ void MutationUtils::applyBodyMutations(const BodyMutation& mutation, Buffer::Ins switch (mutation.mutation_case()) { case BodyMutation::MutationCase::kClearBody: if (mutation.clear_body()) { + ENVOY_LOG(trace, "Clearing HTTP body"); buffer.drain(buffer.length()); } break; case BodyMutation::MutationCase::kBody: + ENVOY_LOG(trace, "Replacing body of {} bytes with new body of {} bytes", buffer.length(), + mutation.body().size()); buffer.drain(buffer.length()); buffer.add(mutation.body()); break; @@ -103,11 +115,11 @@ void MutationUtils::applyBodyMutations(const BodyMutation& mutation, Buffer::Ins // Ignore attempts to set certain sensitive headers that can break later processing. // We may re-enable some of these after further testing. This logic is specific // to the ext_proc filter so it is not shared with HeaderUtils. -bool MutationUtils::isSettableHeader(absl::string_view key) { +bool MutationUtils::isSettableHeader(absl::string_view key, bool replacing_message) { const auto& headers = Headers::get(); return !absl::EqualsIgnoreCase(key, headers.HostLegacy.get()) && !absl::EqualsIgnoreCase(key, headers.Host.get()) && - !absl::EqualsIgnoreCase(key, headers.Method.get()) && + (!absl::EqualsIgnoreCase(key, headers.Method.get()) || replacing_message) && !absl::EqualsIgnoreCase(key, headers.Scheme.get()) && !absl::StartsWithIgnoreCase(key, headers.prefix()); } diff --git a/source/extensions/filters/http/ext_proc/mutation_utils.h b/source/extensions/filters/http/ext_proc/mutation_utils.h index 7a71c6ad3388d..cb700019c5a78 100644 --- a/source/extensions/filters/http/ext_proc/mutation_utils.h +++ b/source/extensions/filters/http/ext_proc/mutation_utils.h @@ -14,8 +14,8 @@ namespace ExternalProcessing { class MutationUtils : public Logger::Loggable { public: // Convert a header map until a protobuf - static void buildHttpHeaders(const Http::HeaderMap& headers_in, - envoy::config::core::v3::HeaderMap& headers_out); + static void headersToProto(const Http::HeaderMap& headers_in, + envoy::config::core::v3::HeaderMap& proto_out); // Apply mutations that are common to header responses. static void @@ -25,19 +25,20 @@ class MutationUtils : public Logger::Loggable { // Modify header map based on a set of mutations from a protobuf static void applyHeaderMutations(const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation, - Http::HeaderMap& headers); + Http::HeaderMap& headers, bool replacing_message); // Apply mutations that are common to body responses. // Mutations will be applied to the header map if it is not null. static void applyCommonBodyResponse(const envoy::service::ext_proc::v3alpha::BodyResponse& body, - Http::HeaderMap* headers, Buffer::Instance& buffer); + Http::RequestOrResponseHeaderMap* headers, + Buffer::Instance& buffer); // Modify a buffer based on a set of mutations from a protobuf static void applyBodyMutations(const envoy::service::ext_proc::v3alpha::BodyMutation& mutation, Buffer::Instance& buffer); private: - static bool isSettableHeader(absl::string_view key); + static bool isSettableHeader(absl::string_view key, bool replacing_message); }; } // namespace ExternalProcessing diff --git a/source/extensions/filters/http/ext_proc/processor_state.cc b/source/extensions/filters/http/ext_proc/processor_state.cc index 19b0dc8c4ec16..f02faf3e267a4 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.cc +++ b/source/extensions/filters/http/ext_proc/processor_state.cc @@ -1,5 +1,8 @@ #include "extensions/filters/http/ext_proc/processor_state.h" +#include "common/buffer/buffer_impl.h" +#include "common/protobuf/utility.h" + #include "extensions/filters/http/ext_proc/ext_proc.h" #include "extensions/filters/http/ext_proc/mutation_utils.h" @@ -11,6 +14,7 @@ namespace ExternalProcessing { using envoy::extensions::filters::http::ext_proc::v3alpha::ProcessingMode; using envoy::service::ext_proc::v3alpha::BodyResponse; +using envoy::service::ext_proc::v3alpha::CommonResponse; using envoy::service::ext_proc::v3alpha::HeadersResponse; using envoy::service::ext_proc::v3alpha::TrailersResponse; @@ -24,6 +28,7 @@ void ProcessorState::startMessageTimer(Event::TimerCb cb, std::chrono::milliseco bool ProcessorState::handleHeadersResponse(const HeadersResponse& response) { if (callback_state_ == CallbackState::HeadersCallback) { ENVOY_LOG(debug, "applying headers response"); + const auto& common_response = response.response(); MutationUtils::applyCommonHeaderResponse(response, *headers_); if (response.response().clear_route_cache()) { filter_callbacks_->clearRouteCache(); @@ -32,26 +37,52 @@ bool ProcessorState::handleHeadersResponse(const HeadersResponse& response) { clearWatermark(); message_timer_->disableTimer(); - if (body_mode_ == ProcessingMode::BUFFERED) { - if (complete_body_available_) { - // If we get here, then all the body data came in before the header message - // was complete, and the server wants the body. So, don't continue filter - // processing, but send the buffered request body now. - ENVOY_LOG(debug, "Sending buffered request body message"); - filter_.sendBufferedData(*this, true); + if (common_response.status() == CommonResponse::CONTINUE_AND_REPLACE) { + ENVOY_LOG(debug, "Replacing complete message"); + // Completely replace the body that may already exist. + if (common_response.has_body_mutation()) { + // Always remove the content-length header if changing the body. + // The proxy can restore it later if it needs to. + headers_->removeContentLength(); + body_replaced_ = true; + if (bufferedData() == nullptr) { + Buffer::OwnedImpl new_body; + MutationUtils::applyBodyMutations(common_response.body_mutation(), new_body); + addBufferedData(new_body); + } else { + modifyBufferedData([&common_response](Buffer::Instance& buf) { + MutationUtils::applyBodyMutations(common_response.body_mutation(), buf); + }); + } } - // Otherwise, we're not ready to continue processing because then - // we won't be able to modify the headers any more, so do nothing and - // let the doData callback handle body chunks until the end is reached. - return true; - } + // Once this message is received, we won't send anything more on this request + // or response to the processor. Clear flags to make sure. + body_mode_ = ProcessingMode::NONE; + send_trailers_ = false; + + } else { + if (body_mode_ == ProcessingMode::BUFFERED) { + if (complete_body_available_) { + // If we get here, then all the body data came in before the header message + // was complete, and the server wants the body. So, don't continue filter + // processing, but send the buffered request body now. + ENVOY_LOG(debug, "Sending buffered request body message"); + filter_.sendBufferedData(*this, true); + } + + // Otherwise, we're not ready to continue processing because then + // we won't be able to modify the headers any more, so do nothing and + // let the doData callback handle body chunks until the end is reached. + return true; + } - if (send_trailers_ && trailers_available_) { - // Trailers came in while we were waiting for this response, and the server - // is not interested in the body, so send them now. - filter_.sendTrailers(*this, *trailers_); - return true; + if (send_trailers_ && trailers_available_) { + // Trailers came in while we were waiting for this response, and the server + // is not interested in the body, so send them now. + filter_.sendTrailers(*this, *trailers_); + return true; + } } // If we got here, then the processor doesn't care about the body or is not ready for @@ -93,7 +124,7 @@ bool ProcessorState::handleTrailersResponse(const TrailersResponse& response) { if (callback_state_ == CallbackState::TrailersCallback) { ENVOY_LOG(debug, "Applying response to buffered trailers"); if (response.has_header_mutation()) { - MutationUtils::applyHeaderMutations(response.header_mutation(), *trailers_); + MutationUtils::applyHeaderMutations(response.header_mutation(), *trailers_, false); } trailers_ = nullptr; callback_state_ = CallbackState::Idle; diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index 61473e03c0bcd..fc3914c66c1d8 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -32,7 +32,7 @@ class ProcessorState : public Logger::Loggable { explicit ProcessorState(Filter& filter) : filter_(filter), watermark_requested_(false), complete_body_available_(false), - trailers_available_(false) {} + trailers_available_(false), body_replaced_(false) {} ProcessorState(const ProcessorState&) = delete; virtual ~ProcessorState() = default; ProcessorState& operator=(const ProcessorState&) = delete; @@ -43,6 +43,7 @@ class ProcessorState : public Logger::Loggable { bool completeBodyAvailable() const { return complete_body_available_; } void setCompleteBodyAvailable(bool d) { complete_body_available_ = d; } void setTrailersAvailable(bool d) { trailers_available_ = d; } + bool bodyReplaced() const { return body_replaced_; } virtual void setProcessingMode( const envoy::extensions::filters::http::ext_proc::v3alpha::ProcessingMode& mode) PURE; @@ -53,7 +54,7 @@ class ProcessorState : public Logger::Loggable { return body_mode_; } - void setHeaders(Http::HeaderMap* headers) { headers_ = headers; } + void setHeaders(Http::RequestOrResponseHeaderMap* headers) { headers_ = headers; } void setTrailers(Http::HeaderMap* trailers) { trailers_ = trailers; } void startMessageTimer(Event::TimerCb cb, std::chrono::milliseconds timeout); @@ -95,6 +96,8 @@ class ProcessorState : public Logger::Loggable { bool complete_body_available_ : 1; // If true, then the filter received the trailers bool trailers_available_ : 1; + // If true, then a CONTINUE_AND_REPLACE status was used on a response + bool body_replaced_ : 1; // If true, the server wants to see the headers bool send_headers_ : 1; @@ -104,7 +107,7 @@ class ProcessorState : public Logger::Loggable { // The specific mode for body handling envoy::extensions::filters::http::ext_proc::v3alpha::ProcessingMode_BodySendMode body_mode_; - Http::HeaderMap* headers_ = nullptr; + Http::RequestOrResponseHeaderMap* headers_ = nullptr; Http::HeaderMap* trailers_ = nullptr; Event::TimerPtr message_timer_; }; diff --git a/test/extensions/filters/http/ext_proc/BUILD b/test/extensions/filters/http/ext_proc/BUILD index ae3eeeeb6e90e..898226637fad4 100644 --- a/test/extensions/filters/http/ext_proc/BUILD +++ b/test/extensions/filters/http/ext_proc/BUILD @@ -37,6 +37,7 @@ envoy_extension_cc_test( "//test/mocks/event:event_mocks", "//test/mocks/server:factory_context_mocks", "//test/test_common:test_runtime_lib", + "@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto", ], ) diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index 41c432e1dd70d..0e3786158db12 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -17,6 +17,7 @@ namespace Envoy { using envoy::extensions::filters::http::ext_proc::v3alpha::ProcessingMode; using envoy::service::ext_proc::v3alpha::BodyResponse; +using envoy::service::ext_proc::v3alpha::CommonResponse; using envoy::service::ext_proc::v3alpha::HeadersResponse; using envoy::service::ext_proc::v3alpha::HttpBody; using envoy::service::ext_proc::v3alpha::HttpHeaders; @@ -568,7 +569,6 @@ TEST_P(ExtProcIntegrationTest, GetAndSetBodyAndHeadersOnResponse) { }); verifyDownstreamResponse(*response, 200); - EXPECT_THAT(response->headers(), SingleHeaderValueIs("content-length", "13")); EXPECT_THAT(response->headers(), SingleHeaderValueIs("x-testing-response-header", "Yes")); EXPECT_EQ("Hello, World!", response->body()); } @@ -834,6 +834,91 @@ TEST_P(ExtProcIntegrationTest, GetAndRespondImmediatelyOnResponseBody) { EXPECT_EQ("{\"reason\": \"Not authorized\"}", response->body()); } +// Test the ability of the filter to turn a GET into a POST by adding a body +// and changing the method. +TEST_P(ExtProcIntegrationTest, ConvertGetToPost) { + initializeConfig(); + HttpIntegrationTest::initialize(); + + auto response = sendDownstreamRequest(absl::nullopt); + + processRequestHeadersMessage(true, [](const HttpHeaders&, HeadersResponse& headers_resp) { + auto* header_mut = headers_resp.mutable_response()->mutable_header_mutation(); + auto* method = header_mut->add_set_headers(); + method->mutable_header()->set_key(":method"); + method->mutable_header()->set_value("POST"); + auto* content_type = header_mut->add_set_headers(); + content_type->mutable_header()->set_key("content-type"); + content_type->mutable_header()->set_value("text/plain"); + headers_resp.mutable_response()->mutable_body_mutation()->set_body("Hello, Server!"); + // This special status tells us to replace the whole request + headers_resp.mutable_response()->set_status(CommonResponse::CONTINUE_AND_REPLACE); + return true; + }); + + handleUpstreamRequest(); + + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs(":method", "POST")); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("content-type", "text/plain")); + EXPECT_EQ(upstream_request_->bodyLength(), 14); + EXPECT_EQ(upstream_request_->body().toString(), "Hello, Server!"); + + processResponseHeadersMessage(false, absl::nullopt); + verifyDownstreamResponse(*response, 200); +} + +// Test the ability of the filter to completely replace a request message with a new +// request message. +TEST_P(ExtProcIntegrationTest, ReplaceCompleteRequest) { + initializeConfig(); + HttpIntegrationTest::initialize(); + + auto response = sendDownstreamRequestWithBody("Replace this!", absl::nullopt); + + processRequestHeadersMessage(true, [](const HttpHeaders&, HeadersResponse& headers_resp) { + headers_resp.mutable_response()->mutable_body_mutation()->set_body("Hello, Server!"); + // This special status tells us to replace the whole request + headers_resp.mutable_response()->set_status(CommonResponse::CONTINUE_AND_REPLACE); + return true; + }); + + handleUpstreamRequest(); + + // Ensure that we replaced and did not append to the request. + EXPECT_EQ(upstream_request_->body().toString(), "Hello, Server!"); + + processResponseHeadersMessage(false, absl::nullopt); + verifyDownstreamResponse(*response, 200); +} + +// Test the ability of the filter to completely replace a request message with a new +// request message. +TEST_P(ExtProcIntegrationTest, ReplaceCompleteRequestBuffered) { + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::BUFFERED); + initializeConfig(); + HttpIntegrationTest::initialize(); + + auto response = sendDownstreamRequestWithBody("Replace this!", absl::nullopt); + + processRequestHeadersMessage(true, [](const HttpHeaders&, HeadersResponse& headers_resp) { + headers_resp.mutable_response()->mutable_body_mutation()->set_body("Hello, Server!"); + // This special status tells us to replace the whole request + headers_resp.mutable_response()->set_status(CommonResponse::CONTINUE_AND_REPLACE); + return true; + }); + + // Even though we set the body mode to BUFFERED, we should receive no callback because + // we returned CONTINUE_AND_REPLACE. + + handleUpstreamRequest(); + + // Ensure that we replaced and did not append to the request. + EXPECT_EQ(upstream_request_->body().toString(), "Hello, Server!"); + + processResponseHeadersMessage(false, absl::nullopt); + verifyDownstreamResponse(*response, 200); +} + // Send a request, but wait longer than the "message timeout" before sending a response // from the external processor. This should trigger the timeout and result // in a 500 error. diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 5fb454b97f1ce..804d1cb4ba4cb 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -1,3 +1,5 @@ +#include "envoy/service/ext_proc/v3alpha/external_processor.pb.h" + #include "extensions/filters/http/ext_proc/ext_proc.h" #include "test/common/http/common.h" @@ -24,6 +26,7 @@ namespace { using envoy::extensions::filters::http::ext_proc::v3alpha::ProcessingMode; using envoy::service::ext_proc::v3alpha::BodyResponse; +using envoy::service::ext_proc::v3alpha::CommonResponse; using envoy::service::ext_proc::v3alpha::HeadersResponse; using envoy::service::ext_proc::v3alpha::HttpBody; using envoy::service::ext_proc::v3alpha::HttpHeaders; @@ -1319,6 +1322,107 @@ TEST_F(HttpFilterTest, ClearRouteCache) { EXPECT_EQ(1, config_->stats().streams_closed_.value()); } +// Using the default configuration, turn a GET into a POST. +TEST_F(HttpFilterTest, ReplaceRequest) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + )EOF"); + + HttpTestUtility::addDefaultHeaders(request_headers_); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, true)); + + Buffer::OwnedImpl req_buffer; + setUpDecodingBuffering(req_buffer); + processRequestHeaders( + false, [](const HttpHeaders&, ProcessingResponse&, HeadersResponse& hdrs_resp) { + hdrs_resp.mutable_response()->set_status(CommonResponse::CONTINUE_AND_REPLACE); + auto* hdr = hdrs_resp.mutable_response()->mutable_header_mutation()->add_set_headers(); + hdr->mutable_header()->set_key(":method"); + hdr->mutable_header()->set_value("POST"); + hdrs_resp.mutable_response()->mutable_body_mutation()->set_body("Hello, World!"); + }); + + Http::TestRequestHeaderMapImpl expected_request{ + {":scheme", "http"}, {":authority", "host"}, {":path", "/"}, {":method", "POST"}}; + EXPECT_THAT(&request_headers_, HeaderMapEqualIgnoreOrder(&expected_request)); + EXPECT_EQ(req_buffer.toString(), "Hello, World!"); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + response_headers_.addCopy(LowerCaseString("content-length"), "200"); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + processResponseHeaders(false, absl::nullopt); + + Buffer::OwnedImpl resp_data_1; + TestUtility::feedBufferWithRandomCharacters(resp_data_1, 100); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_data_1, true)); + + filter_->onDestroy(); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); +} + +// Using a configuration with response mode set up for buffering, replace the complete response. +// This should result in none of the actual response coming back and no callbacks being +// fired. +TEST_F(HttpFilterTest, ReplaceCompleteResponseBuffered) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + response_body_mode: "BUFFERED" + )EOF"); + + HttpTestUtility::addDefaultHeaders(request_headers_); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, true)); + processRequestHeaders(false, absl::nullopt); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + response_headers_.addCopy(LowerCaseString("content-length"), "200"); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + Buffer::OwnedImpl resp_data_1; + TestUtility::feedBufferWithRandomCharacters(resp_data_1, 100); + Buffer::OwnedImpl resp_data_2; + TestUtility::feedBufferWithRandomCharacters(resp_data_2, 100); + Buffer::OwnedImpl buffered_resp_data; + setUpEncodingBuffering(buffered_resp_data); + + processResponseHeaders( + false, [](const HttpHeaders&, ProcessingResponse&, HeadersResponse& hdrs_resp) { + hdrs_resp.mutable_response()->set_status(CommonResponse::CONTINUE_AND_REPLACE); + auto* hdr = hdrs_resp.mutable_response()->mutable_header_mutation()->add_set_headers(); + hdr->mutable_header()->set_key("x-test-header"); + hdr->mutable_header()->set_value("true"); + hdrs_resp.mutable_response()->mutable_body_mutation()->set_body("Hello, World!"); + }); + + // Ensure buffered data was updated + EXPECT_EQ(buffered_resp_data.toString(), "Hello, World!"); + + // Since we did CONTINUE_AND_REPLACE, later data is cleared + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_data_1, false)); + EXPECT_EQ(resp_data_1.length(), 0); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_data_2, true)); + EXPECT_EQ(resp_data_2.length(), 0); + + // No additional messages should come in since we replaced, although they + // are configured. + filter_->onDestroy(); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); +} + // Using the default configuration, test the filter with a processor that // replies to the request_headers message incorrectly by sending a // request_body message, which should result in the stream being closed diff --git a/test/extensions/filters/http/ext_proc/mutation_utils_test.cc b/test/extensions/filters/http/ext_proc/mutation_utils_test.cc index 6eb0da487c4b3..4096fcc68a267 100644 --- a/test/extensions/filters/http/ext_proc/mutation_utils_test.cc +++ b/test/extensions/filters/http/ext_proc/mutation_utils_test.cc @@ -28,7 +28,7 @@ TEST(MutationUtils, TestBuildHeaders) { headers.addCopy(LowerCaseString("x-number"), 9999); envoy::config::core::v3::HeaderMap proto_headers; - MutationUtils::buildHttpHeaders(headers, proto_headers); + MutationUtils::headersToProto(headers, proto_headers); Http::TestRequestHeaderMapImpl expected{{":method", "GET"}, {":path", "/foo/the/bar?size=123"}, @@ -101,7 +101,7 @@ TEST(MutationUtils, TestApplyMutations) { s->mutable_header()->set_key("X-Envoy-StrangeThing"); s->mutable_header()->set_value("Yes"); - MutationUtils::applyHeaderMutations(mutation, headers); + MutationUtils::applyHeaderMutations(mutation, headers, false); Http::TestRequestHeaderMapImpl expected_headers{ {":scheme", "https"},