From ecbba2fb1134df8403e0944e38a1dcb48f38ecff Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Wed, 24 Feb 2021 10:45:10 -0500 Subject: [PATCH 1/7] draft onLocalError Signed-off-by: Alyssa Wilk --- include/envoy/http/filter.h | 33 +++++++++++++ source/common/http/filter_manager.cc | 20 +++++++- source/common/http/filter_manager.h | 12 +++++ test/integration/BUILD | 1 + test/integration/filters/BUILD | 14 ++++++ .../filters/on_local_reply_filter.cc | 49 +++++++++++++++++++ test/integration/http2_integration_test.cc | 25 ++++++++++ 7 files changed, 153 insertions(+), 1 deletion(-) create mode 100644 test/integration/filters/on_local_reply_filter.cc diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index 605177c2c8dfa..f8e8cac40d4f3 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -163,6 +163,18 @@ enum class FilterMetadataStatus { Continue, }; +/** + * Return codes for onLocalReply filter invocations. + */ +enum class LocalErrorStatus { + // Continue sending the local reply after onLocalError has been sent to all filters. + Continue, + + // Continue sending onLocalReply to all filters, but reset the stream once all filters have been + // informed rather than sending the local reply. + ContinueAndResetStream, +}; + /** * The stream filter callbacks are passed to all filters to use for writing response data and * interacting with the underlying stream in general. @@ -596,6 +608,27 @@ class StreamFilterBase { * @param action the resulting match action */ virtual void onMatchCallback(const Matcher::Action&) {} + + struct LocalReplyData { + Http::Code code_; + absl::string_view details_; + }; + + /** + * Called after sendLocalReply is called, and before any local reply is + * serialized either to filters, or downstream. + * + * Note that in rare circumstances, onLocalReply may be called more than once + * for a given stream, because it is possible that a filter call + * sendLocalReply while processing the original local reply response. + * + * Filters implementing onLocalReply are responsible for never calling sendLocalReply + * from onLocalReply, as that has the potential for looping. + * + * @param data data associated with the sendLocalReply call. + * @param LocalErrorStatus the action to take after onLocalError completes. + */ + virtual LocalErrorStatus onLocalReply(LocalReplyData&) { return LocalErrorStatus::Continue; } }; /** diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index 6caf33f7536fb..63cb80b98d8bf 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -812,6 +812,18 @@ FilterManager::commonDecodePrefix(ActiveStreamDecoderFilter* filter, return std::next(filter->entry()); } +LocalErrorStatus FilterManager::onLocalReply(StreamFilterBase::LocalReplyData& data) { + filter_manager_callbacks_.onLocalReply(data.code_); + + LocalErrorStatus status = LocalErrorStatus::Continue; + for (auto entry : filters_) { + if (entry->onLocalReply(data) == LocalErrorStatus::ContinueAndResetStream) { + status = LocalErrorStatus::ContinueAndResetStream; + } + } + return status; +} + void FilterManager::sendLocalReply( bool old_was_grpc_request, Code code, absl::string_view body, const std::function& modify_headers, @@ -824,7 +836,13 @@ void FilterManager::sendLocalReply( stream_info_.setResponseCodeDetails(details); - filter_manager_callbacks_.onLocalReply(code); + StreamFilterBase::LocalReplyData data{code, details}; + if (FilterManager::onLocalReply(data) == LocalErrorStatus::ContinueAndResetStream) { + ENVOY_STREAM_LOG(debug, "Resetting stream due to {}. onLocalReply requested reset.", *this, + details); + filter_manager_callbacks_.resetStream(); + return; + } if (!filter_manager_callbacks_.responseHeaders().has_value()) { // If the response has not started at all, send the response through the filter chain. diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 8d8a3f097b4a7..0e9ba65e13f69 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -760,6 +760,7 @@ class FilterManager : public ScopeTrackedObject, // Http::FilterChainFactoryCallbacks void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter) override { addStreamDecoderFilterWorker(filter, nullptr, false); + filters_.push_back(filter.get()); } void addStreamDecoderFilter(StreamDecoderFilterSharedPtr filter, Matcher::MatchTreeSharedPtr match_tree) override { @@ -776,6 +777,7 @@ class FilterManager : public ScopeTrackedObject, } void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter) override { addStreamEncoderFilterWorker(filter, nullptr, false); + filters_.push_back(filter.get()); } void addStreamEncoderFilter(StreamEncoderFilterSharedPtr filter, Matcher::MatchTreeSharedPtr match_tree) override { @@ -793,6 +795,8 @@ class FilterManager : public ScopeTrackedObject, void addStreamFilter(StreamFilterSharedPtr filter) override { addStreamDecoderFilterWorker(filter, nullptr, true); addStreamEncoderFilterWorker(filter, nullptr, true); + StreamDecoderFilter* decoder_filter = filter.get(); + filters_.push_back(decoder_filter); } void addStreamFilter(StreamFilterSharedPtr filter, Matcher::MatchTreeSharedPtr match_tree) override { @@ -910,6 +914,13 @@ class FilterManager : public ScopeTrackedObject, */ void maybeEndEncode(bool end_stream); + /** + * Called before local reply is made by the filter manager. + * @param data the data associated with the local reply. + * @param LocalErrorStatus the status from the filter chain. + */ + LocalErrorStatus onLocalReply(StreamFilterBase::LocalReplyData& data); + void sendLocalReply(bool is_grpc_request, Code code, absl::string_view body, const std::function& modify_headers, const absl::optional grpc_status, @@ -1061,6 +1072,7 @@ class FilterManager : public ScopeTrackedObject, std::list decoder_filters_; std::list encoder_filters_; + std::list filters_; std::list access_log_handlers_; // Stores metadata added in the decoding filter that is being processed. Will be cleared before diff --git a/test/integration/BUILD b/test/integration/BUILD index 73ba25026202d..857a0d2c43042 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -357,6 +357,7 @@ envoy_cc_test( "//source/extensions/filters/http/buffer:config", "//source/extensions/filters/http/health_check:config", "//test/integration/filters:metadata_stop_all_filter_config_lib", + "//test/integration/filters:on_local_reply_filter_config_lib", "//test/integration/filters:request_metadata_filter_config_lib", "//test/integration/filters:response_metadata_filter_config_lib", "//test/integration/filters:set_response_code_filter_config_proto_cc_proto", diff --git a/test/integration/filters/BUILD b/test/integration/filters/BUILD index 1d78984323f3f..a1c89ea4996b2 100644 --- a/test/integration/filters/BUILD +++ b/test/integration/filters/BUILD @@ -157,6 +157,20 @@ envoy_cc_test_library( ], ) +envoy_cc_test_library( + name = "on_local_reply_filter_config_lib", + srcs = [ + "on_local_reply_filter.cc", + ], + deps = [ + "//include/envoy/http:filter_interface", + "//include/envoy/registry", + "//include/envoy/server:filter_config_interface", + "//source/extensions/filters/http/common:pass_through_filter_lib", + "//test/extensions/filters/http/common:empty_http_filter_config_lib", + ], +) + envoy_cc_test_library( name = "passthrough_filter_config_lib", srcs = [ diff --git a/test/integration/filters/on_local_reply_filter.cc b/test/integration/filters/on_local_reply_filter.cc new file mode 100644 index 0000000000000..c457311714775 --- /dev/null +++ b/test/integration/filters/on_local_reply_filter.cc @@ -0,0 +1,49 @@ +#include + +#include "envoy/http/filter.h" +#include "envoy/registry/registry.h" +#include "envoy/server/filter_config.h" + +#include "extensions/filters/http/common/pass_through_filter.h" + +#include "test/extensions/filters/http/common/empty_http_filter_config.h" + +namespace Envoy { + +class OnLocalReplyFilter : public Http::PassThroughFilter { +public: + Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& request_headers, bool) override { + if (request_headers.get(Http::LowerCaseString("reset")).size() != 0) { + reset_ = true; + } + decoder_callbacks_->sendLocalReply(Http::Code::BadRequest, "body", nullptr, absl::nullopt, + "details"); + return Http::FilterHeadersStatus::StopIteration; + } + + Http::LocalErrorStatus onLocalReply(LocalReplyData&) override { + if (reset_) { + return Http::LocalErrorStatus::ContinueAndResetStream; + } + return Http::LocalErrorStatus::Continue; + } + + bool reset_{}; +}; + +class OnLocalReplyFilterConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig { +public: + OnLocalReplyFilterConfig() : EmptyHttpFilterConfig("on-local-reply-filter") {} + Http::FilterFactoryCb createFilter(const std::string&, + Server::Configuration::FactoryContext&) override { + return [](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamFilter(std::make_shared<::Envoy::OnLocalReplyFilter>()); + }; + } +}; + +// perform static registration +static Registry::RegisterFactory + register_; +} // namespace Envoy diff --git a/test/integration/http2_integration_test.cc b/test/integration/http2_integration_test.cc index 95ed212e7e2c3..dfa14dacca1f9 100644 --- a/test/integration/http2_integration_test.cc +++ b/test/integration/http2_integration_test.cc @@ -1589,6 +1589,7 @@ TEST_P(Http2MetadataIntegrationTest, UpstreamMetadataAfterEndStream) { upstream_request_->encodeHeaders(response_headers, true); // Upstream sends metadata. + const Http::MetadataMap response_metadata_map = {{"resp_key1", "resp_value1"}}; Http::MetadataMapPtr metadata_map_ptr = std::make_unique(response_metadata_map); @@ -1603,4 +1604,28 @@ TEST_P(Http2MetadataIntegrationTest, UpstreamMetadataAfterEndStream) { EXPECT_EQ("200", response->headers().getStatusValue()); } +static std::string on_local_reply_filter = R"EOF( +name: on-local-reply-filter +typed_config: + "@type": type.googleapis.com/google.protobuf.Empty +)EOF"; + +TEST_P(Http2IntegrationTest, OnLocalReply) { + config_helper_.addFilter(on_local_reply_filter); + initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + { + auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + response->waitForEndStream(); + ASSERT_TRUE(response->complete()); + } + { + default_request_headers_.addCopy("reset", "yes"); + auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + response->waitForReset(); + ASSERT_FALSE(response->complete()); + } +} + } // namespace Envoy From 0b11bfea1ec997202a53d0e4920af5562f46da84 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Mon, 1 Mar 2021 16:26:40 -0500 Subject: [PATCH 2/7] unit tests Signed-off-by: Alyssa Wilk --- .../http/conn_manager_impl_test_base.cc | 5 +- test/common/http/filter_manager_test.cc | 105 +++++++++++++++++- .../filters/on_local_reply_filter.cc | 2 +- test/integration/http2_integration_test.cc | 1 - test/mocks/http/mocks.h | 3 + 5 files changed, 111 insertions(+), 5 deletions(-) diff --git a/test/common/http/conn_manager_impl_test_base.cc b/test/common/http/conn_manager_impl_test_base.cc index 3407d3c6faed5..5c5a24f072280 100644 --- a/test/common/http/conn_manager_impl_test_base.cc +++ b/test/common/http/conn_manager_impl_test_base.cc @@ -1,5 +1,6 @@ #include "test/common/http/conn_manager_impl_test_base.h" +using testing::AnyNumber; using testing::AtLeast; using testing::InSequence; using testing::InvokeWithoutArgs; @@ -90,11 +91,11 @@ void HttpConnectionManagerImplTest::setupFilterChain(int num_decoder_filters, // NOTE: The length/repetition in this routine allows InSequence to work correctly in an outer // scope. for (int i = 0; i < num_decoder_filters * num_requests; i++) { - decoder_filters_.push_back(new MockStreamDecoderFilter()); + decoder_filters_.push_back(new NiceMock()); } for (int i = 0; i < num_encoder_filters * num_requests; i++) { - encoder_filters_.push_back(new MockStreamEncoderFilter()); + encoder_filters_.push_back(new NiceMock()); } InSequence s; diff --git a/test/common/http/filter_manager_test.cc b/test/common/http/filter_manager_test.cc index 2b18706e67333..8080369e2e6dd 100644 --- a/test/common/http/filter_manager_test.cc +++ b/test/common/http/filter_manager_test.cc @@ -17,6 +17,7 @@ #include "gtest/gtest.h" +using testing::InSequence; using testing::Return; namespace Envoy { @@ -36,7 +37,7 @@ class FilterManagerTest : public testing::Test { Event::MockDispatcher dispatcher_; NiceMock connection_; Envoy::Http::MockFilterChainFactory filter_factory_; - LocalReply::MockLocalReply local_reply_; + NiceMock local_reply_; Protocol protocol_{Protocol::Http2}; NiceMock time_source_; StreamInfo::FilterStateSharedPtr filter_state_ = @@ -345,6 +346,108 @@ TEST_F(FilterManagerTest, MatchTreeFilterActionDualFilter) { filter_manager_->decodeHeaders(*grpc_headers, true); filter_manager_->destroyFilters(); } + +TEST_F(FilterManagerTest, OnLocalReply) { + initialize(); + + std::shared_ptr decoder_filter(new NiceMock()); + std::shared_ptr encoder_filter(new NiceMock()); + std::shared_ptr stream_filter(new NiceMock()); + + RequestHeaderMapPtr headers{ + new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; + + ON_CALL(filter_manager_callbacks_, requestHeaders()).WillByDefault(Return(makeOptRef(*headers))); + + EXPECT_CALL(filter_factory_, createFilterChain(_)) + .WillRepeatedly(Invoke([&](FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamDecoderFilter(decoder_filter); + callbacks.addStreamFilter(stream_filter); + callbacks.addStreamEncoderFilter(encoder_filter); + })); + + filter_manager_->createFilterChain(); + filter_manager_->requestHeadersInitialized(); + filter_manager_->decodeHeaders(*headers, true); + + // Make sure all 3 filters get onLocalReply, and that the reset is preserved + // even if not the last return. + EXPECT_CALL(*decoder_filter, onLocalReply(_)); + EXPECT_CALL(*stream_filter, onLocalReply(_)) + .WillOnce(Return(LocalErrorStatus::ContinueAndResetStream)); + EXPECT_CALL(*encoder_filter, onLocalReply(_)); + EXPECT_CALL(filter_manager_callbacks_, resetStream()); + decoder_filter->callbacks_->sendLocalReply(Code::InternalServerError, "body", nullptr, + absl::nullopt, "details"); + + // The reason for the response (in this case the reset) will still be tracked + // but as no response is sent the response code will remain absent. + ASSERT_TRUE(filter_manager_->streamInfo().responseCodeDetails().has_value()); + EXPECT_EQ(filter_manager_->streamInfo().responseCodeDetails().value(), "details"); + EXPECT_FALSE(filter_manager_->streamInfo().responseCode().has_value()); + + filter_manager_->destroyFilters(); +} + +TEST_F(FilterManagerTest, MultipleOnLocalReply) { + initialize(); + + std::shared_ptr decoder_filter(new NiceMock()); + std::shared_ptr encoder_filter(new NiceMock()); + std::shared_ptr stream_filter(new NiceMock()); + + RequestHeaderMapPtr headers{ + new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; + + ON_CALL(filter_manager_callbacks_, requestHeaders()).WillByDefault(Return(makeOptRef(*headers))); + + EXPECT_CALL(filter_factory_, createFilterChain(_)) + .WillRepeatedly(Invoke([&](FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamDecoderFilter(decoder_filter); + callbacks.addStreamFilter(stream_filter); + callbacks.addStreamEncoderFilter(encoder_filter); + })); + + filter_manager_->createFilterChain(); + filter_manager_->requestHeadersInitialized(); + filter_manager_->decodeHeaders(*headers, true); + + { + // Set up expectations to be triggered by sendLocalReply at the bottom of + // thi block. + InSequence s; + + // Make sure all 3 filters get onLocalReply + EXPECT_CALL(*decoder_filter, onLocalReply(_)); + EXPECT_CALL(*stream_filter, onLocalReply(_)); + EXPECT_CALL(*encoder_filter, onLocalReply(_)); + + // Now response encoding begins. Assume a filter co-opts the original reply + // with a new local reply. + EXPECT_CALL(*encoder_filter, encodeHeaders(_, _)) + .WillOnce(Invoke([&](ResponseHeaderMap&, bool) -> FilterHeadersStatus { + decoder_filter->callbacks_->sendLocalReply(Code::InternalServerError, "body2", nullptr, + absl::nullopt, "details2"); + return FilterHeadersStatus::StopIteration; + })); + + // All 3 filters should get the second onLocalReply. + EXPECT_CALL(*decoder_filter, onLocalReply(_)); + EXPECT_CALL(*stream_filter, onLocalReply(_)); + EXPECT_CALL(*encoder_filter, onLocalReply(_)); + + decoder_filter->callbacks_->sendLocalReply(Code::InternalServerError, "body", nullptr, + absl::nullopt, "details"); + } + + // The final details should be details2. + ASSERT_TRUE(filter_manager_->streamInfo().responseCodeDetails().has_value()); + EXPECT_EQ(filter_manager_->streamInfo().responseCodeDetails().value(), "details2"); + EXPECT_FALSE(filter_manager_->streamInfo().responseCode().has_value()); + + filter_manager_->destroyFilters(); +} + } // namespace } // namespace Http } // namespace Envoy diff --git a/test/integration/filters/on_local_reply_filter.cc b/test/integration/filters/on_local_reply_filter.cc index c457311714775..b8e19a6d3dafc 100644 --- a/test/integration/filters/on_local_reply_filter.cc +++ b/test/integration/filters/on_local_reply_filter.cc @@ -13,7 +13,7 @@ namespace Envoy { class OnLocalReplyFilter : public Http::PassThroughFilter { public: Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& request_headers, bool) override { - if (request_headers.get(Http::LowerCaseString("reset")).size() != 0) { + if (!request_headers.get(Http::LowerCaseString("reset")).empty()) { reset_ = true; } decoder_callbacks_->sendLocalReply(Http::Code::BadRequest, "body", nullptr, absl::nullopt, diff --git a/test/integration/http2_integration_test.cc b/test/integration/http2_integration_test.cc index dfa14dacca1f9..ebc6069eae100 100644 --- a/test/integration/http2_integration_test.cc +++ b/test/integration/http2_integration_test.cc @@ -1589,7 +1589,6 @@ TEST_P(Http2MetadataIntegrationTest, UpstreamMetadataAfterEndStream) { upstream_request_->encodeHeaders(response_headers, true); // Upstream sends metadata. - const Http::MetadataMap response_metadata_map = {{"resp_key1", "resp_value1"}}; Http::MetadataMapPtr metadata_map_ptr = std::make_unique(response_metadata_map); diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 378aa1c61a2be..0fa37cb2888d3 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -322,6 +322,7 @@ class MockStreamDecoderFilter : public StreamDecoderFilter { MOCK_METHOD(void, onStreamComplete, ()); MOCK_METHOD(void, onDestroy, ()); MOCK_METHOD(void, onMatchCallback, (const Matcher::Action&)); + MOCK_METHOD(LocalErrorStatus, onLocalReply, (LocalReplyData&)); // Http::StreamDecoderFilter MOCK_METHOD(FilterHeadersStatus, decodeHeaders, (RequestHeaderMap & headers, bool end_stream)); @@ -348,6 +349,7 @@ class MockStreamEncoderFilter : public StreamEncoderFilter { MOCK_METHOD(void, onStreamComplete, ()); MOCK_METHOD(void, onDestroy, ()); MOCK_METHOD(void, onMatchCallback, (const Matcher::Action&)); + MOCK_METHOD(LocalErrorStatus, onLocalReply, (LocalReplyData&)); // Http::MockStreamEncoderFilter MOCK_METHOD(FilterHeadersStatus, encode100ContinueHeaders, (ResponseHeaderMap & headers)); @@ -370,6 +372,7 @@ class MockStreamFilter : public StreamFilter { MOCK_METHOD(void, onStreamComplete, ()); MOCK_METHOD(void, onDestroy, ()); MOCK_METHOD(void, onMatchCallback, (const Matcher::Action&)); + MOCK_METHOD(LocalErrorStatus, onLocalReply, (LocalReplyData&)); // Http::StreamDecoderFilter MOCK_METHOD(FilterHeadersStatus, decodeHeaders, (RequestHeaderMap & headers, bool end_stream)); From 6084a39e50f5ce1b53eecaebac09b4d5ae1fcb1c Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Tue, 2 Mar 2021 09:56:24 -0500 Subject: [PATCH 3/7] typo Signed-off-by: Alyssa Wilk --- test/common/http/filter_manager_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/common/http/filter_manager_test.cc b/test/common/http/filter_manager_test.cc index 8080369e2e6dd..2831ef14f97a5 100644 --- a/test/common/http/filter_manager_test.cc +++ b/test/common/http/filter_manager_test.cc @@ -414,7 +414,7 @@ TEST_F(FilterManagerTest, MultipleOnLocalReply) { { // Set up expectations to be triggered by sendLocalReply at the bottom of - // thi block. + // this block. InSequence s; // Make sure all 3 filters get onLocalReply From b374b19fd729baafabfe0128d7ac7929a1839c6c Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Tue, 2 Mar 2021 15:55:49 -0500 Subject: [PATCH 4/7] tidy Signed-off-by: Alyssa Wilk --- test/common/http/conn_manager_impl_test_base.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/test/common/http/conn_manager_impl_test_base.cc b/test/common/http/conn_manager_impl_test_base.cc index 5c5a24f072280..a951418e02e11 100644 --- a/test/common/http/conn_manager_impl_test_base.cc +++ b/test/common/http/conn_manager_impl_test_base.cc @@ -1,6 +1,5 @@ #include "test/common/http/conn_manager_impl_test_base.h" -using testing::AnyNumber; using testing::AtLeast; using testing::InSequence; using testing::InvokeWithoutArgs; From c077e5000f39fb583349fa955a1cfe194d7dcd7b Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Wed, 3 Mar 2021 14:06:19 -0500 Subject: [PATCH 5/7] comments Signed-off-by: Alyssa Wilk --- include/envoy/http/filter.h | 13 +++++++++++-- source/common/http/filter_manager.cc | 14 ++++++++------ source/common/http/filter_manager.h | 7 ++++--- test/integration/filters/on_local_reply_filter.cc | 2 +- test/mocks/http/mocks.h | 6 +++--- 5 files changed, 27 insertions(+), 15 deletions(-) diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index f8e8cac40d4f3..6436ec58f74dc 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -610,15 +610,22 @@ class StreamFilterBase { virtual void onMatchCallback(const Matcher::Action&) {} struct LocalReplyData { + // The error code which (barring reset) will be sent to the client. Http::Code code_; + // The details of why a local reply is being sent. absl::string_view details_; + // True if a reset will occur rather than the local reply (some prior filter + // has returned ContinueAndResetStream) + bool reset_imminent_; }; /** * Called after sendLocalReply is called, and before any local reply is * serialized either to filters, or downstream. + * This will be called on both encoder and decoder filters starting at the + * router filtre and working towards the first filter configured. * - * Note that in rare circumstances, onLocalReply may be called more than once + * Note that in some circumstances, onLocalReply may be called more than once * for a given stream, because it is possible that a filter call * sendLocalReply while processing the original local reply response. * @@ -628,7 +635,9 @@ class StreamFilterBase { * @param data data associated with the sendLocalReply call. * @param LocalErrorStatus the action to take after onLocalError completes. */ - virtual LocalErrorStatus onLocalReply(LocalReplyData&) { return LocalErrorStatus::Continue; } + virtual LocalErrorStatus onLocalReply(const LocalReplyData&) { + return LocalErrorStatus::Continue; + } }; /** diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index 63cb80b98d8bf..3320e4e2b4b4f 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -812,22 +812,23 @@ FilterManager::commonDecodePrefix(ActiveStreamDecoderFilter* filter, return std::next(filter->entry()); } -LocalErrorStatus FilterManager::onLocalReply(StreamFilterBase::LocalReplyData& data) { +void FilterManager::onLocalReply(StreamFilterBase::LocalReplyData& data) { + state_.under_on_local_reply_ = true; filter_manager_callbacks_.onLocalReply(data.code_); - LocalErrorStatus status = LocalErrorStatus::Continue; for (auto entry : filters_) { if (entry->onLocalReply(data) == LocalErrorStatus::ContinueAndResetStream) { - status = LocalErrorStatus::ContinueAndResetStream; + data.reset_imminent_ = true; } } - return status; + state_.under_on_local_reply_ = false; } void FilterManager::sendLocalReply( bool old_was_grpc_request, Code code, absl::string_view body, const std::function& modify_headers, const absl::optional grpc_status, absl::string_view details) { + ASSERT(!state_.under_on_local_reply_); const bool is_head_request = state_.is_head_request_; bool is_grpc_request = old_was_grpc_request; if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unify_grpc_handling")) { @@ -836,8 +837,9 @@ void FilterManager::sendLocalReply( stream_info_.setResponseCodeDetails(details); - StreamFilterBase::LocalReplyData data{code, details}; - if (FilterManager::onLocalReply(data) == LocalErrorStatus::ContinueAndResetStream) { + StreamFilterBase::LocalReplyData data{code, details, false}; + FilterManager::onLocalReply(data); + if (data.reset_imminent_) { ENVOY_STREAM_LOG(debug, "Resetting stream due to {}. onLocalReply requested reset.", *this, details); filter_manager_callbacks_.resetStream(); diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 0e9ba65e13f69..aadfba8c4f0ca 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -917,9 +917,8 @@ class FilterManager : public ScopeTrackedObject, /** * Called before local reply is made by the filter manager. * @param data the data associated with the local reply. - * @param LocalErrorStatus the status from the filter chain. */ - LocalErrorStatus onLocalReply(StreamFilterBase::LocalReplyData& data); + void onLocalReply(StreamFilterBase::LocalReplyData& data); void sendLocalReply(bool is_grpc_request, Code code, absl::string_view body, const std::function& modify_headers, @@ -1122,7 +1121,7 @@ class FilterManager : public ScopeTrackedObject, State() : remote_complete_(false), local_complete_(false), has_continue_headers_(false), created_filter_chain_(false), is_head_request_(false), is_grpc_request_(false), - non_100_response_headers_encoded_(false) {} + non_100_response_headers_encoded_(false), under_on_local_reply_(false) {} uint32_t filter_call_state_{0}; @@ -1140,6 +1139,8 @@ class FilterManager : public ScopeTrackedObject, bool is_grpc_request_ : 1; // Tracks if headers other than 100-Continue have been encoded to the codec. bool non_100_response_headers_encoded_ : 1; + // True under the stack of onLocalReply, false otherwise. + bool under_on_local_reply_ : 1; // The following 3 members are booleans rather than part of the space-saving bitfield as they // are passed as arguments to functions expecting bools. Extend State using the bitfield diff --git a/test/integration/filters/on_local_reply_filter.cc b/test/integration/filters/on_local_reply_filter.cc index b8e19a6d3dafc..2bb2923d3c731 100644 --- a/test/integration/filters/on_local_reply_filter.cc +++ b/test/integration/filters/on_local_reply_filter.cc @@ -21,7 +21,7 @@ class OnLocalReplyFilter : public Http::PassThroughFilter { return Http::FilterHeadersStatus::StopIteration; } - Http::LocalErrorStatus onLocalReply(LocalReplyData&) override { + Http::LocalErrorStatus onLocalReply(const LocalReplyData&) override { if (reset_) { return Http::LocalErrorStatus::ContinueAndResetStream; } diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 0fa37cb2888d3..841213daacd62 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -322,7 +322,7 @@ class MockStreamDecoderFilter : public StreamDecoderFilter { MOCK_METHOD(void, onStreamComplete, ()); MOCK_METHOD(void, onDestroy, ()); MOCK_METHOD(void, onMatchCallback, (const Matcher::Action&)); - MOCK_METHOD(LocalErrorStatus, onLocalReply, (LocalReplyData&)); + MOCK_METHOD(LocalErrorStatus, onLocalReply, (const LocalReplyData&)); // Http::StreamDecoderFilter MOCK_METHOD(FilterHeadersStatus, decodeHeaders, (RequestHeaderMap & headers, bool end_stream)); @@ -349,7 +349,7 @@ class MockStreamEncoderFilter : public StreamEncoderFilter { MOCK_METHOD(void, onStreamComplete, ()); MOCK_METHOD(void, onDestroy, ()); MOCK_METHOD(void, onMatchCallback, (const Matcher::Action&)); - MOCK_METHOD(LocalErrorStatus, onLocalReply, (LocalReplyData&)); + MOCK_METHOD(LocalErrorStatus, onLocalReply, (const LocalReplyData&)); // Http::MockStreamEncoderFilter MOCK_METHOD(FilterHeadersStatus, encode100ContinueHeaders, (ResponseHeaderMap & headers)); @@ -372,7 +372,7 @@ class MockStreamFilter : public StreamFilter { MOCK_METHOD(void, onStreamComplete, ()); MOCK_METHOD(void, onDestroy, ()); MOCK_METHOD(void, onMatchCallback, (const Matcher::Action&)); - MOCK_METHOD(LocalErrorStatus, onLocalReply, (LocalReplyData&)); + MOCK_METHOD(LocalErrorStatus, onLocalReply, (const LocalReplyData&)); // Http::StreamDecoderFilter MOCK_METHOD(FilterHeadersStatus, decodeHeaders, (RequestHeaderMap & headers, bool end_stream)); From eee158050fa3a6b728a125fe8884f7f0e06ff384 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Wed, 3 Mar 2021 15:16:08 -0500 Subject: [PATCH 6/7] spelling Signed-off-by: Alyssa Wilk --- include/envoy/http/filter.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index 6436ec58f74dc..6ed83c8b4d873 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -623,7 +623,7 @@ class StreamFilterBase { * Called after sendLocalReply is called, and before any local reply is * serialized either to filters, or downstream. * This will be called on both encoder and decoder filters starting at the - * router filtre and working towards the first filter configured. + * router filter and working towards the first filter configured. * * Note that in some circumstances, onLocalReply may be called more than once * for a given stream, because it is possible that a filter call From 3f0e7dd52e209ba6722b30fca71d1f9ae78823b5 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Mon, 8 Mar 2021 09:50:14 -0500 Subject: [PATCH 7/7] comment Signed-off-by: Alyssa Wilk --- include/envoy/http/filter.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index 6ed83c8b4d873..9b6f86d018c68 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -623,7 +623,7 @@ class StreamFilterBase { * Called after sendLocalReply is called, and before any local reply is * serialized either to filters, or downstream. * This will be called on both encoder and decoder filters starting at the - * router filter and working towards the first filter configured. + * terminal filter (generally the router filter) and working towards the first filter configured. * * Note that in some circumstances, onLocalReply may be called more than once * for a given stream, because it is possible that a filter call