Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ message ExternalProcessor {
// message in an external processor response. In such case, no local reply will be sent.
// Instead, the stream to the external processor will be closed. There will be no
// more external processing for this stream from now on.
// [#not-implemented-hide:]
bool disable_immediate_response = 15;
}

Expand Down
4 changes: 4 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ new_features:
- area: extension_discovery_service
change: |
added ECDS support for :ref:` downstream network filters<envoy_v3_api_field_config.listener.v3.Filter.config_discovery>`.
- area: ext_proc
change: |
added disable_immediate_response config API to ignore immediate response from the external processing server.
Comment thread
yanjunxiang-google marked this conversation as resolved.
Outdated

- area: http
change: |
added :ref:`Json-To-Metadata filter <envoy_v3_api_msg_extensions.filters.http.json_to_metadata.v3.JsonToMetadata>`.
Expand Down
33 changes: 20 additions & 13 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -653,19 +653,26 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
processing_status = encoding_state_.handleTrailersResponse(response->response_trailers());
break;
case ProcessingResponse::ResponseCase::kImmediateResponse:
// We won't be sending anything more to the stream after we
// receive this message.
ENVOY_LOG(debug, "Sending immediate response");
// TODO(tyxia) For immediate response case here and below, logging is needed because
// `onFinishProcessorCalls` is called after `closeStream` below.
// Investigate to see if we can switch the order of those two so that the logging here can be
// avoided.
logGrpcStreamInfo();
processing_complete_ = true;
closeStream();
onFinishProcessorCalls(Grpc::Status::Ok);
sendImmediateResponse(response->immediate_response());
processing_status = absl::OkStatus();
if (config_->disableImmediateResponse()) {
ENVOY_LOG(debug,
"Filter has diable immediate response configured. Treat it as spurious response");
Comment thread
yanjunxiang-google marked this conversation as resolved.
Outdated
processing_status =
absl::FailedPreconditionError("unhandled immediate response due to config disabled it");
} else {
// We won't be sending anything more to the stream after we
// receive this message.
ENVOY_LOG(debug, "Sending immediate response");
// TODO(tyxia) For immediate response case here and below, logging is needed because
// `onFinishProcessorCalls` is called after `closeStream` below.
// Investigate to see if we can switch the order of those two so that the logging here can be
// avoided.
logGrpcStreamInfo();
processing_complete_ = true;
closeStream();
onFinishProcessorCalls(Grpc::Status::Ok);
sendImmediateResponse(response->immediate_response());
processing_status = absl::OkStatus();
}
break;
default:
// Any other message is considered spurious
Expand Down
5 changes: 5 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class FilterConfig {
processing_mode_(config.processing_mode()), mutation_checker_(config.mutation_rules()),
filter_metadata_(config.filter_metadata()),
allow_mode_override_(config.allow_mode_override()),
disable_immediate_response_(config.disable_immediate_response()),
allowed_headers_(initHeaderMatchers(config.forward_rules().allowed_headers())),
disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())) {}

Expand All @@ -150,6 +151,7 @@ class FilterConfig {
}

bool allowModeOverride() const { return allow_mode_override_; }
bool disableImmediateResponse() const { return disable_immediate_response_; }

const Filters::Common::MutationRules::Checker& mutationChecker() const {
return mutation_checker_;
Expand Down Expand Up @@ -192,6 +194,9 @@ class FilterConfig {
const Envoy::ProtobufWkt::Struct filter_metadata_;
// If set to true, allow the processing mode to be modified by the ext_proc response.
const bool allow_mode_override_;
// If set to true, disable the immediate response from the ext_proc server, which means
// closing the stream to the ext_proc server, and no more external processing.
const bool disable_immediate_response_;
// Empty allowed_header_ means allow all.
const std::vector<Matchers::StringMatcherPtr> allowed_headers_;
// Empty disallowed_header_ means disallow nothing, i.e, allow all.
Expand Down
34 changes: 34 additions & 0 deletions test/extensions/filters/http/ext_proc/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,40 @@ TEST_F(HttpFilterTest, PostAndRespondImmediately) {
expectFilterState(Envoy::ProtobufWkt::Struct());
}

TEST_F(HttpFilterTest, PostAndRespondImmediatelyWithDisabledConfig) {
initialize(R"EOF(
grpc_service:
envoy_grpc:
cluster_name: "ext_proc_server"
disable_immediate_response: true
)EOF");

EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false));
test_time_->advanceTimeWait(std::chrono::microseconds(10));
std::unique_ptr<ProcessingResponse> resp1 = std::make_unique<ProcessingResponse>();
auto* immediate_response = resp1->mutable_immediate_response();
immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest);
immediate_response->set_body("Bad request");
immediate_response->set_details("Got a bad request");
auto* immediate_headers = immediate_response->mutable_headers();
auto* hdr1 = immediate_headers->add_set_headers();
hdr1->mutable_header()->set_key("content-type");
hdr1->mutable_header()->set_value("text/plain");
stream_callbacks_->onReceiveMessage(std::move(resp1));

Buffer::OwnedImpl req_data("foo");
EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, true));
Comment thread
yanjunxiang-google marked this conversation as resolved.
Outdated
EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_));
EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true));
filter_->onDestroy();

EXPECT_EQ(1, config_->stats().streams_started_.value());
EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value());
EXPECT_EQ(1, config_->stats().spurious_msgs_received_.value());
EXPECT_EQ(0, config_->stats().stream_msgs_received_.value());
EXPECT_EQ(1, config_->stats().streams_closed_.value());
}

// Using the default configuration, test the filter with a processor that
// replies to the request_headers message with an "immediate response" message
// during response headers processing that should result in a response being
Expand Down