diff --git a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto index fe92e3548f07f..c9500486a3a52 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto +++ b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto @@ -205,7 +205,6 @@ message ExternalProcessor { // message in an external processor response. In such case, no local reply will be sent. // Instead, the stream to the external processor will be closed. There will be no // more external processing for this stream from now on. - // [#not-implemented-hide:] bool disable_immediate_response = 15; } diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 40f1f2aabbde2..e5d5875014fb6 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -46,6 +46,14 @@ new_features: - area: extension_discovery_service change: | added ECDS support for :ref:` downstream network filters`. +- area: ext_proc + change: | + added + :ref:`disable_immediate_response ` + config API to ignore the + :ref:`immediate_response ` + message from the external processing server. + - area: http change: | added :ref:`Json-To-Metadata filter `. diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 245d88d6160fe..d1105dd6b51bd 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -653,19 +653,26 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { processing_status = encoding_state_.handleTrailersResponse(response->response_trailers()); break; case ProcessingResponse::ResponseCase::kImmediateResponse: - // We won't be sending anything more to the stream after we - // receive this message. - ENVOY_LOG(debug, "Sending immediate response"); - // TODO(tyxia) For immediate response case here and below, logging is needed because - // `onFinishProcessorCalls` is called after `closeStream` below. - // Investigate to see if we can switch the order of those two so that the logging here can be - // avoided. - logGrpcStreamInfo(); - processing_complete_ = true; - closeStream(); - onFinishProcessorCalls(Grpc::Status::Ok); - sendImmediateResponse(response->immediate_response()); - processing_status = absl::OkStatus(); + if (config_->disableImmediateResponse()) { + ENVOY_LOG(debug, "Filter has disable_immediate_response configured. " + "Treat the immediate response message as spurious response."); + processing_status = + absl::FailedPreconditionError("unhandled immediate response due to config disabled it"); + } else { + // We won't be sending anything more to the stream after we + // receive this message. + ENVOY_LOG(debug, "Sending immediate response"); + // TODO(tyxia) For immediate response case here and below, logging is needed because + // `onFinishProcessorCalls` is called after `closeStream` below. + // Investigate to see if we can switch the order of those two so that the logging here can be + // avoided. + logGrpcStreamInfo(); + processing_complete_ = true; + closeStream(); + onFinishProcessorCalls(Grpc::Status::Ok); + sendImmediateResponse(response->immediate_response()); + processing_status = absl::OkStatus(); + } break; default: // Any other message is considered spurious diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 49e04c69e94b9..ac82a9a90c120 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -134,6 +134,7 @@ class FilterConfig { processing_mode_(config.processing_mode()), mutation_checker_(config.mutation_rules()), filter_metadata_(config.filter_metadata()), allow_mode_override_(config.allow_mode_override()), + disable_immediate_response_(config.disable_immediate_response()), allowed_headers_(initHeaderMatchers(config.forward_rules().allowed_headers())), disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())) {} @@ -150,6 +151,7 @@ class FilterConfig { } bool allowModeOverride() const { return allow_mode_override_; } + bool disableImmediateResponse() const { return disable_immediate_response_; } const Filters::Common::MutationRules::Checker& mutationChecker() const { return mutation_checker_; @@ -192,6 +194,9 @@ class FilterConfig { const Envoy::ProtobufWkt::Struct filter_metadata_; // If set to true, allow the processing mode to be modified by the ext_proc response. const bool allow_mode_override_; + // If set to true, disable the immediate response from the ext_proc server, which means + // closing the stream to the ext_proc server, and no more external processing. + const bool disable_immediate_response_; // Empty allowed_header_ means allow all. const std::vector allowed_headers_; // Empty disallowed_header_ means disallow nothing, i.e, allow all. diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 6ce24c000789e..dc6cb56dfcba6 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -701,6 +701,40 @@ TEST_F(HttpFilterTest, PostAndRespondImmediately) { expectFilterState(Envoy::ProtobufWkt::Struct()); } +TEST_F(HttpFilterTest, PostAndRespondImmediatelyWithDisabledConfig) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + disable_immediate_response: true + )EOF"); + + EXPECT_EQ(filter_->decodeHeaders(request_headers_, false), FilterHeadersStatus::StopIteration); + test_time_->advanceTimeWait(std::chrono::microseconds(10)); + std::unique_ptr resp1 = std::make_unique(); + auto* immediate_response = resp1->mutable_immediate_response(); + immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest); + immediate_response->set_body("Bad request"); + immediate_response->set_details("Got a bad request"); + auto* immediate_headers = immediate_response->mutable_headers(); + auto* hdr1 = immediate_headers->add_set_headers(); + hdr1->mutable_header()->set_key("content-type"); + hdr1->mutable_header()->set_value("text/plain"); + stream_callbacks_->onReceiveMessage(std::move(resp1)); + + Buffer::OwnedImpl req_data("foo"); + EXPECT_EQ(filter_->decodeData(req_data, true), FilterDataStatus::Continue); + EXPECT_EQ(filter_->decodeTrailers(request_trailers_), FilterTrailersStatus::Continue); + EXPECT_EQ(filter_->encodeHeaders(response_headers_, true), FilterHeadersStatus::Continue); + filter_->onDestroy(); + + EXPECT_EQ(config_->stats().streams_started_.value(), 1); + EXPECT_EQ(config_->stats().stream_msgs_sent_.value(), 1); + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 1); + EXPECT_EQ(config_->stats().stream_msgs_received_.value(), 0); + EXPECT_EQ(config_->stats().streams_closed_.value(), 1); +} + // Using the default configuration, test the filter with a processor that // replies to the request_headers message with an "immediate response" message // during response headers processing that should result in a response being