Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Filter::StreamOpenState Filter::openStream() {
}

void Filter::onDestroy() {
ENVOY_LOG(trace, "onDestroy");
// Make doubly-sure we no longer use the stream, as
// per the filter contract.
processing_complete_ = true;
Expand Down Expand Up @@ -414,6 +415,7 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
// We won't be sending anything more to the stream after we
// receive this message.
processing_complete_ = true;
cleanUpTimers();
sendImmediateResponse(response->immediate_response());
message_handled = true;
break;
Expand Down Expand Up @@ -465,6 +467,9 @@ void Filter::onGrpcError(Grpc::Status::GrpcStatus status) {

void Filter::onGrpcClose() {
ENVOY_LOG(debug, "Received gRPC stream close");
if (processing_complete_) {
return;
}
Comment thread
gbrail marked this conversation as resolved.
Outdated
processing_complete_ = true;
stats_.streams_closed_.inc();
// Successful close. We can ignore the stream for the rest of our request
Expand Down
26 changes: 25 additions & 1 deletion test/extensions/filters/http/ext_proc/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ using Http::FilterHeadersStatus;
using Http::FilterTrailersStatus;
using Http::LowerCaseString;

using testing::AnyNumber;
using testing::Eq;
using testing::Invoke;
using testing::ReturnRef;
Expand All @@ -57,6 +58,19 @@ class HttpFilterTest : public testing::Test {
EXPECT_CALL(*client_, start(_)).WillOnce(Invoke(this, &HttpFilterTest::doStart));
EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(dispatcher_, createTimer_(_))
.Times(AnyNumber())
.WillRepeatedly(Invoke([this](Unused) {
// Create a mock timer that we can check at destruction time to see if
// all timers were disabled no matter what. MockTimer has default
// actions that we just have to enable properly here.
auto* timer = new Event::MockTimer();
EXPECT_CALL(*timer, enableTimer(_, _)).Times(AnyNumber());
EXPECT_CALL(*timer, disableTimer()).Times(AnyNumber());
EXPECT_CALL(*timer, enabled()).Times(AnyNumber());
timers_.push_back(timer);
return timer;
}));

envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor proto_config{};
if (!yaml.empty()) {
Expand All @@ -72,6 +86,15 @@ class HttpFilterTest : public testing::Test {
request_headers_.setMethod("POST");
}

void TearDown() override {
for (auto* t : timers_) {
// This will fail if any timer is un-disabled at the end of the test run.
Comment thread
gbrail marked this conversation as resolved.
Outdated
// (This particular test suite does not actually let timers expire,
// although other test suites do.)
EXPECT_FALSE(t->enabled_);
}
}

ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks) {
stream_callbacks_ = &callbacks;

Expand Down Expand Up @@ -214,13 +237,14 @@ class HttpFilterTest : public testing::Test {
NiceMock<Stats::MockIsolatedStatsStore> stats_store_;
FilterConfigSharedPtr config_;
std::unique_ptr<Filter> filter_;
NiceMock<Event::MockDispatcher> dispatcher_;
testing::NiceMock<Event::MockDispatcher> dispatcher_;
Http::MockStreamDecoderFilterCallbacks decoder_callbacks_;
Http::MockStreamEncoderFilterCallbacks encoder_callbacks_;
Http::TestRequestHeaderMapImpl request_headers_;
Http::TestResponseHeaderMapImpl response_headers_;
Http::TestRequestTrailerMapImpl request_trailers_;
Http::TestResponseTrailerMapImpl response_trailers_;
std::vector<Event::MockTimer*> timers_;
};

// Using the default configuration, test the filter with a processor that
Expand Down
14 changes: 14 additions & 0 deletions test/extensions/filters/http/ext_proc/ordering_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ using Http::FilterHeadersStatus;
using Http::FilterTrailersStatus;
using Http::LowerCaseString;

using testing::AnyNumber;
using testing::Invoke;
using testing::Return;
using testing::ReturnRef;
Expand Down Expand Up @@ -532,9 +533,14 @@ TEST_F(OrderingTest, AddRequestTrailers) {
TEST_F(OrderingTest, ImmediateResponseOnRequest) {
initialize(absl::nullopt);

// MockTimer constructor sets up expectations in the Dispatcher class to wire it up
MockTimer* request_timer = new MockTimer(&dispatcher_);
EXPECT_CALL(*request_timer, enableTimer(kMessageTimeout, nullptr));
EXPECT_CALL(*request_timer, enabled()).Times(AnyNumber());
EXPECT_CALL(stream_delegate_, send(_, false));
sendRequestHeadersGet(true);
EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _));
EXPECT_CALL(*request_timer, disableTimer());
sendImmediateResponse500();
// The rest of the filter isn't necessarily called after this.
}
Expand All @@ -543,14 +549,22 @@ TEST_F(OrderingTest, ImmediateResponseOnRequest) {
TEST_F(OrderingTest, ImmediateResponseOnResponse) {
initialize(absl::nullopt);

MockTimer* request_timer = new MockTimer(&dispatcher_);
EXPECT_CALL(*request_timer, enabled()).Times(AnyNumber());
EXPECT_CALL(*request_timer, enableTimer(kMessageTimeout, nullptr));
EXPECT_CALL(stream_delegate_, send(_, false));
sendRequestHeadersGet(true);
EXPECT_CALL(decoder_callbacks_, continueDecoding());
EXPECT_CALL(*request_timer, disableTimer());
sendRequestHeadersReply();

MockTimer* response_timer = new MockTimer(&dispatcher_);
EXPECT_CALL(*response_timer, enableTimer(kMessageTimeout, nullptr));
EXPECT_CALL(*response_timer, enabled()).Times(AnyNumber());
EXPECT_CALL(stream_delegate_, send(_, false));
sendResponseHeaders(true);
EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _));
EXPECT_CALL(*response_timer, disableTimer());
sendImmediateResponse500();
Buffer::OwnedImpl resp_body("Hello!");
EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_body, true));
Expand Down