diff --git a/library/common/http/dispatcher.cc b/library/common/http/dispatcher.cc index 8d923565b1..c9a3fa39cb 100644 --- a/library/common/http/dispatcher.cc +++ b/library/common/http/dispatcher.cc @@ -9,54 +9,53 @@ namespace Http { Dispatcher::DirectStreamCallbacks::DirectStreamCallbacks(envoy_stream_t stream, envoy_observer observer, Dispatcher& http_dispatcher) - : stream_(stream), observer_(observer), http_dispatcher_(http_dispatcher) {} + : stream_handle_(stream), observer_(observer), http_dispatcher_(http_dispatcher) {} void Dispatcher::DirectStreamCallbacks::onHeaders(HeaderMapPtr&& headers, bool end_stream) { - ENVOY_LOG(debug, "response headers for stream (end_stream={}):\n{}", end_stream, *headers); - if (end_stream) { - http_dispatcher_.removeStream(stream_); - } + ENVOY_LOG(debug, "[S{}] response headers for stream (end_stream={}):\n{}", stream_handle_, + end_stream, *headers); observer_.on_headers(Utility::transformHeaders(*headers), end_stream, observer_.context); } void Dispatcher::DirectStreamCallbacks::onData(Buffer::Instance& data, bool end_stream) { - ENVOY_LOG(debug, "response data for stream (length={} end_stream={})", data.length(), end_stream); - if (end_stream) { - http_dispatcher_.removeStream(stream_); - } + ENVOY_LOG(debug, "[S{}] response data for stream (length={} end_stream={})", stream_handle_, + data.length(), end_stream); observer_.on_data(Envoy::Buffer::Utility::transformData(data), end_stream, observer_.context); } void Dispatcher::DirectStreamCallbacks::onTrailers(HeaderMapPtr&& trailers) { - ENVOY_LOG(debug, "response trailers for stream:\n{}", *trailers); - http_dispatcher_.removeStream(stream_); + ENVOY_LOG(debug, "[S{}] response trailers for stream:\n{}", stream_handle_, *trailers); observer_.on_trailers(Utility::transformHeaders(*trailers), observer_.context); } void Dispatcher::DirectStreamCallbacks::onComplete() { - // TODO: implement me + ENVOY_LOG(debug, "[S{}] complete stream", stream_handle_); + observer_.on_complete(observer_.context); + http_dispatcher_.cleanup(stream_handle_); } void Dispatcher::DirectStreamCallbacks::onReset() { - http_dispatcher_.removeStream(stream_); + ENVOY_LOG(debug, "[S{}] remote reset stream", stream_handle_); observer_.on_error({ENVOY_STREAM_RESET, envoy_nodata}, observer_.context); } -Dispatcher::DirectStream::DirectStream(AsyncClient::Stream& underlying_stream, +Dispatcher::DirectStream::DirectStream(envoy_stream_t stream_handle, + AsyncClient::Stream& underlying_stream, DirectStreamCallbacksPtr&& callbacks) - : underlying_stream_(underlying_stream), callbacks_(std::move(callbacks)) {} + : stream_handle_(stream_handle), underlying_stream_(underlying_stream), + callbacks_(std::move(callbacks)) {} Dispatcher::Dispatcher(Event::Dispatcher& event_dispatcher, Upstream::ClusterManager& cluster_manager) - : current_stream_id_(0), event_dispatcher_(event_dispatcher), + : current_stream_handle_(0), event_dispatcher_(event_dispatcher), cluster_manager_(cluster_manager) {} envoy_stream_t Dispatcher::startStream(envoy_observer observer) { - envoy_stream_t new_stream_id = current_stream_id_++; + envoy_stream_t new_stream_handle = current_stream_handle_++; - event_dispatcher_.post([this, observer, new_stream_id]() -> void { + event_dispatcher_.post([this, observer, new_stream_handle]() -> void { DirectStreamCallbacksPtr callbacks = - std::make_unique(new_stream_id, observer, *this); + std::make_unique(new_stream_handle, observer, *this); AsyncClient& async_client = cluster_manager_.httpAsyncClientForCluster("egress_cluster"); AsyncClient::Stream* underlying_stream = async_client.start(*callbacks, {}); @@ -65,31 +64,31 @@ envoy_stream_t Dispatcher::startStream(envoy_observer observer) { // Take this into account when thinking about stream cancellation. callbacks->onReset(); } else { - DirectStreamPtr direct_stream = - std::make_unique(*underlying_stream, std::move(callbacks)); - streams_.emplace(new_stream_id, std::move(direct_stream)); - ENVOY_LOG(debug, "started stream [{}]", new_stream_id); + DirectStreamPtr direct_stream = std::make_unique( + new_stream_handle, *underlying_stream, std::move(callbacks)); + streams_.emplace(new_stream_handle, std::move(direct_stream)); + ENVOY_LOG(debug, "[S{}] start stream", new_stream_handle); } }); - return new_stream_id; + return new_stream_handle; } -envoy_status_t Dispatcher::sendHeaders(envoy_stream_t stream_id, envoy_headers headers, +envoy_status_t Dispatcher::sendHeaders(envoy_stream_t stream, envoy_headers headers, bool end_stream) { - event_dispatcher_.post([this, stream_id, headers, end_stream]() -> void { - DirectStream* direct_stream = getStream(stream_id); + event_dispatcher_.post([this, stream, headers, end_stream]() -> void { + DirectStream* direct_stream = getStream(stream); // If direct_stream is not found, it means the stream has already closed or been reset // and the appropriate callback has been issued to the caller. There's nothing to do here // except silently swallow this. // TODO: handle potential race condition with cancellation or failure get a stream in the - // first place. Additionally it is possible to get a nullptr due to bogus stream_id + // first place. Additionally it is possible to get a nullptr due to bogus envoy_stream_t // from the caller. // https://github.com/lyft/envoy-mobile/issues/301 if (direct_stream != nullptr) { direct_stream->headers_ = Utility::transformHeaders(headers); - ENVOY_LOG(debug, "request headers for stream [{}] (end_stream={}):\n{}", stream_id, - end_stream, *direct_stream->headers_); + ENVOY_LOG(debug, "[S{}] request headers for stream (end_stream={}):\n{}", stream, end_stream, + *direct_stream->headers_); direct_stream->underlying_stream_.sendHeaders(*direct_stream->headers_, end_stream); } }); @@ -103,19 +102,35 @@ envoy_status_t Dispatcher::sendMetadata(envoy_stream_t, envoy_headers, bool) { return ENVOY_FAILURE; } envoy_status_t Dispatcher::sendTrailers(envoy_stream_t, envoy_headers) { return ENVOY_FAILURE; } -envoy_status_t Dispatcher::locallyCloseStream(envoy_stream_t) { return ENVOY_FAILURE; } -envoy_status_t Dispatcher::resetStream(envoy_stream_t) { return ENVOY_FAILURE; } -Dispatcher::DirectStream* Dispatcher::getStream(envoy_stream_t stream_id) { +envoy_status_t Dispatcher::resetStream(envoy_stream_t stream) { + event_dispatcher_.post([this, stream]() -> void { + DirectStream* direct_stream = getStream(stream); + if (direct_stream) { + direct_stream->underlying_stream_.reset(); + } + }); + return ENVOY_SUCCESS; +} + +Dispatcher::DirectStream* Dispatcher::getStream(envoy_stream_t stream) { ASSERT(event_dispatcher_.isThreadSafe(), "stream interaction must be performed on the event_dispatcher_'s thread."); - auto direct_stream_pair_it = streams_.find(stream_id); + auto direct_stream_pair_it = streams_.find(stream); return (direct_stream_pair_it != streams_.end()) ? direct_stream_pair_it->second.get() : nullptr; } -// TODO: implement. Note: the stream might not be in the map if for example startStream called -// onReset due to its inability to get an underlying stream. -envoy_status_t Dispatcher::removeStream(envoy_stream_t) { return ENVOY_FAILURE; } +void Dispatcher::cleanup(envoy_stream_t stream_handle) { + DirectStream* direct_stream = getStream(stream_handle); + + RELEASE_ASSERT(direct_stream, + "cleanup is a private method that is only called with stream ids that exist"); + + // TODO: think about thread safety of deleting the DirectStream immediately. + size_t erased = streams_.erase(stream_handle); + ASSERT(erased == 1, "cleanup should always remove one entry from the streams map"); + ENVOY_LOG(debug, "[S{}] remove stream", stream_handle); +} } // namespace Http } // namespace Envoy diff --git a/library/common/http/dispatcher.h b/library/common/http/dispatcher.h index 0266997db8..52296abed7 100644 --- a/library/common/http/dispatcher.h +++ b/library/common/http/dispatcher.h @@ -36,13 +36,11 @@ class Dispatcher : public Logger::Loggable { envoy_status_t sendData(envoy_stream_t stream, envoy_headers headers, bool end_stream); envoy_status_t sendMetadata(envoy_stream_t stream, envoy_headers headers, bool end_stream); envoy_status_t sendTrailers(envoy_stream_t stream, envoy_headers headers); - envoy_status_t locallyCloseStream(envoy_stream_t stream); // TODO: when implementing this function we have to make sure to prevent races with already // scheduled and potentially scheduled callbacks. In order to do so the platform callbacks need to // check for atomic state (boolean most likely) that will be updated here to mark the stream as // closed. envoy_status_t resetStream(envoy_stream_t stream); - envoy_status_t removeStream(envoy_stream_t stream); private: /** @@ -55,7 +53,7 @@ class Dispatcher : public Logger::Loggable { class DirectStreamCallbacks : public AsyncClient::StreamCallbacks, public Logger::Loggable { public: - DirectStreamCallbacks(envoy_stream_t stream, envoy_observer observer, + DirectStreamCallbacks(envoy_stream_t stream_handle, envoy_observer observer, Dispatcher& http_dispatcher); // AsyncClient::StreamCallbacks @@ -66,7 +64,7 @@ class Dispatcher : public Logger::Loggable { void onReset() override; private: - const envoy_stream_t stream_; + const envoy_stream_t stream_handle_; const envoy_observer observer_; Dispatcher& http_dispatcher_; }; @@ -78,13 +76,11 @@ class Dispatcher : public Logger::Loggable { * AsyncClient::Stream and in the incoming direction via DirectStreamCallbacks. */ class DirectStream { - // TODO: Bookkeeping for this class is insufficient to fully cover all cases necessary to - // track the lifecycle of the underlying_stream_. One way or another, we must fix this - // to prevent bugs in the future. (Enhanced internal bookkeeping is probably good enough, - // but other options include upstream modifications to AsyncClient and friends. public: - DirectStream(AsyncClient::Stream& underlying_stream, DirectStreamCallbacksPtr&& callbacks); + DirectStream(envoy_stream_t stream_handle, AsyncClient::Stream& underlying_stream, + DirectStreamCallbacksPtr&& callbacks); + const envoy_stream_t stream_handle_; // Used to issue outgoing HTTP stream operations. AsyncClient::Stream& underlying_stream_; // Used to receive incoming HTTP stream operations. @@ -102,10 +98,11 @@ class Dispatcher : public Logger::Loggable { // Everything in the below interface must only be accessed from the event_dispatcher's thread. // This allows us to generally avoid synchronization. - DirectStream* getStream(envoy_stream_t stream_id); + DirectStream* getStream(envoy_stream_t stream_handle); + void cleanup(envoy_stream_t stream_handle); std::unordered_map streams_; - std::atomic current_stream_id_; + std::atomic current_stream_handle_; // The event_dispatcher is the only member state that may be accessed from a thread other than // the event_dispatcher's own thread. Event::Dispatcher& event_dispatcher_; diff --git a/library/common/include/c_types.h b/library/common/include/c_types.h index eeacb5ba99..893c12d2a1 100644 --- a/library/common/include/c_types.h +++ b/library/common/include/c_types.h @@ -152,7 +152,11 @@ typedef void (*envoy_on_trailers_f)(envoy_headers trailers, void* context); */ typedef void (*envoy_on_error_f)(envoy_error error, void* context); -// FIXME comments +/** + * Called when the async HTTP stream has completed without an error bi-directionally. + * @param context, contains the necessary state to carry out platform-specific dispatch and + * execution. + */ typedef void (*envoy_on_complete_f)(void* context); #ifdef __cplusplus @@ -167,7 +171,7 @@ typedef struct { envoy_on_data_f on_data; envoy_on_metadata_f on_metadata; envoy_on_trailers_f on_trailers; - envoy_on_complete_f on_complete; envoy_on_error_f on_error; + envoy_on_complete_f on_complete; void* context; // Will be passed through to callbacks to provide dispatch and execution state. } envoy_observer; diff --git a/library/common/main_interface.cc b/library/common/main_interface.cc index 9c44e51c6d..6a77b72900 100644 --- a/library/common/main_interface.cc +++ b/library/common/main_interface.cc @@ -26,16 +26,16 @@ envoy_stream start_stream(envoy_observer observer) { return {ENVOY_SUCCESS, http_dispatcher_->startStream(observer)}; } -envoy_status_t send_headers(envoy_stream_t stream_id, envoy_headers headers, bool end_stream) { - return http_dispatcher_->sendHeaders(stream_id, headers, end_stream); +envoy_status_t send_headers(envoy_stream_t stream, envoy_headers headers, bool end_stream) { + return http_dispatcher_->sendHeaders(stream, headers, end_stream); } // TODO: implement. envoy_status_t send_data(envoy_stream_t, envoy_data, bool) { return ENVOY_FAILURE; } envoy_status_t send_metadata(envoy_stream_t, envoy_headers) { return ENVOY_FAILURE; } envoy_status_t send_trailers(envoy_stream_t, envoy_headers) { return ENVOY_FAILURE; } -envoy_status_t locally_close_stream(envoy_stream_t) { return ENVOY_FAILURE; } -envoy_status_t reset_stream(envoy_stream_t) { return ENVOY_FAILURE; } + +envoy_status_t reset_stream(envoy_stream_t stream) { return http_dispatcher_->resetStream(stream); } /* * Setup envoy for interaction via the main interface. diff --git a/library/common/main_interface.h b/library/common/main_interface.h index 9c8655e78c..1e0a72a325 100644 --- a/library/common/main_interface.h +++ b/library/common/main_interface.h @@ -54,14 +54,6 @@ envoy_status_t send_metadata(envoy_stream_t stream, envoy_headers metadata); */ envoy_status_t send_trailers(envoy_stream_t stream, envoy_headers trailers); -/** - * Half-close an HTTP stream. The stream will be observable and may return further data - * via the observer callbacks. However, nothing further may be sent. - * @param stream, the stream to close. - * @return envoy_status_t, the resulting status of the operation. - */ -envoy_status_t locally_close_stream(envoy_stream_t stream); - /** * Detach all observers from a stream and send an interrupt upstream if supported by transport. * @param stream, the stream to evict. diff --git a/test/common/http/BUILD b/test/common/http/BUILD index f0732d6c8a..bcc01d4507 100644 --- a/test/common/http/BUILD +++ b/test/common/http/BUILD @@ -4,6 +4,23 @@ load("@envoy//bazel:envoy_build_system.bzl", "envoy_cc_test", "envoy_package") envoy_package() +envoy_cc_test( + name = "dispatcher_test", + srcs = ["dispatcher_test.cc"], + repository = "@envoy", + deps = [ + "//library/common/http:dispatcher_lib", + "//library/common/http:header_utility_lib", + "//library/common/include:c_types_interface", + "@envoy//source/common/http:async_client_lib", + "@envoy//source/common/http:context_lib", + "@envoy//test/common/http:common_lib", + "@envoy//test/mocks/event:event_mocks", + "@envoy//test/mocks/local_info:local_info_mocks", + "@envoy//test/mocks/upstream:upstream_mocks", + ], +) + envoy_cc_test( name = "header_utility_test", srcs = ["header_utility_test.cc"], diff --git a/test/common/http/dispatcher_test.cc b/test/common/http/dispatcher_test.cc new file mode 100644 index 0000000000..1e9daad9ea --- /dev/null +++ b/test/common/http/dispatcher_test.cc @@ -0,0 +1,628 @@ +#include "common/http/async_client_impl.h" +#include "common/http/context_impl.h" + +#include "test/common/http/common.h" +#include "test/mocks/event/mocks.h" +#include "test/mocks/http/mocks.h" +#include "test/mocks/local_info/mocks.h" +#include "test/mocks/upstream/mocks.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "library/common/http/dispatcher.h" +#include "library/common/http/header_utility.h" +#include "library/common/include/c_types.h" + +using testing::_; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; +using testing::SaveArg; +using testing::WithArg; + +namespace Envoy { +namespace Http { + +class DispatcherTest : public testing::Test { +public: + DispatcherTest() + : http_context_(stats_store_.symbolTable()), + client_(cm_.thread_local_cluster_.cluster_.info_, stats_store_, event_dispatcher_, + local_info_, cm_, runtime_, random_, + Router::ShadowWriterPtr{new NiceMock()}, http_context_), + http_dispatcher_(event_dispatcher_, cm_) { + ON_CALL(*cm_.conn_pool_.host_, locality()) + .WillByDefault(ReturnRef(envoy::api::v2::core::Locality().default_instance())); + } + + typedef struct { + bool on_headers; + bool on_complete; + bool on_error; + } callbacks_called; + + Stats::MockIsolatedStatsStore stats_store_; + MockAsyncClientCallbacks callbacks_; + MockAsyncClientStreamCallbacks stream_callbacks_; + NiceMock cm_; + NiceMock stream_encoder_; + StreamDecoder* response_decoder_{}; + NiceMock* timer_; + NiceMock event_dispatcher_; + NiceMock runtime_; + NiceMock random_; + NiceMock local_info_; + Http::ContextImpl http_context_; + AsyncClientImpl client_; + Dispatcher http_dispatcher_; + envoy_observer observer_; +}; + +TEST_F(DispatcherTest, BasicStreamHeadersOnly) { + // Setup observer to handle the response headers. + envoy_observer observer; + callbacks_called cc = {false, false, false}; + observer.context = &cc; + observer.on_headers = [](envoy_headers c_headers, bool end_stream, void* context) -> void { + ASSERT_TRUE(end_stream); + HeaderMapPtr response_headers = Utility::transformHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); + callbacks_called* cc = static_cast(context); + cc->on_headers = true; + }; + observer.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete = true; + }; + + // Grab the response decoder in order to dispatch responses on the stream. + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + // Build a set of request headers. + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + envoy_headers c_headers = Utility::transformHeaders(headers); + + // Create a stream. + EXPECT_CALL(cm_, httpAsyncClientForCluster("egress_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + EXPECT_CALL(cm_.async_client_, start(_, _)) + .WillOnce( + WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { + return client_.start(callbacks, AsyncClient::StreamOptions()); + }))); + envoy_stream_t stream = http_dispatcher_.startStream(observer); + + // Send request headers. + Event::PostCb post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb)); + http_dispatcher_.sendHeaders(stream, c_headers, true); + + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(stream_encoder_, encodeHeaders(_, true)); + post_cb(); + + // Decode response headers. decodeHeaders with true will bubble up to onHeaders, which will in + // turn cause closeRemote. Because closeLocal has already been called, cleanup will happen; hence + // the second call to isThreadSafe. + // TODO: find a way to make the fact that the stream is correctly closed more explicit. + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + response_decoder_->decode100ContinueHeaders( + HeaderMapPtr(new TestHeaderMapImpl{{":status", "100"}})); + response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), true); + + EXPECT_EQ( + 1UL, + cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_200").value()); + EXPECT_EQ(1UL, cm_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("internal.upstream_rq_200") + .value()); + + // Ensure that the on_headers on the observer was called. + ASSERT_TRUE(cc.on_headers); + ASSERT_TRUE(cc.on_complete); +} + +TEST_F(DispatcherTest, ResetStream) { + envoy_observer observer; + callbacks_called cc = {false, false, false}; + observer.context = &cc; + observer.on_error = [](envoy_error actual_error, void* context) -> void { + envoy_error expected_error = {ENVOY_STREAM_RESET, envoy_nodata}; + ASSERT_EQ(actual_error.error_code, expected_error.error_code); + callbacks_called* cc = static_cast(context); + cc->on_error = true; + }; + observer.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete = true; + }; + + EXPECT_CALL(cm_, httpAsyncClientForCluster("egress_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + EXPECT_CALL(cm_.async_client_, start(_, _)) + .WillOnce( + WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { + return client_.start(callbacks, AsyncClient::StreamOptions()); + }))); + envoy_stream_t stream = http_dispatcher_.startStream(observer); + + Event::PostCb post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb)); + http_dispatcher_.resetStream(stream); + + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + post_cb(); + + // Ensure that the on_error on the observer was called. + ASSERT_TRUE(cc.on_error); + ASSERT_FALSE(cc.on_complete); +} + +TEST_F(DispatcherTest, MultipleStreams) { + // Start stream1. + // Setup observer to handle the response headers. + envoy_observer observer; + callbacks_called cc = {false, false, false}; + observer.context = &cc; + observer.on_headers = [](envoy_headers c_headers, bool end_stream, void* context) -> void { + ASSERT_TRUE(end_stream); + HeaderMapPtr response_headers = Utility::transformHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); + callbacks_called* cc = static_cast(context); + cc->on_headers = true; + }; + observer.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete = true; + }; + + // Grab the response decoder in order to dispatch responses on the stream. + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + // Build a set of request headers. + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + envoy_headers c_headers = Utility::transformHeaders(headers); + + // Create a stream. + EXPECT_CALL(cm_, httpAsyncClientForCluster("egress_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + EXPECT_CALL(cm_.async_client_, start(_, _)) + .WillOnce( + WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { + return client_.start(callbacks, AsyncClient::StreamOptions()); + }))); + envoy_stream_t stream = http_dispatcher_.startStream(observer); + + // Send request headers. + Event::PostCb post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb)); + http_dispatcher_.sendHeaders(stream, c_headers, true); + + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(stream_encoder_, encodeHeaders(_, true)); + post_cb(); + + // Start stream2. + // Setup observer to handle the response headers. + NiceMock stream_encoder2; + StreamDecoder* response_decoder2{}; + envoy_observer observer2; + callbacks_called cc2 = {false, false, false}; + observer2.context = &cc2; + observer2.on_headers = [](envoy_headers c_headers, bool end_stream, void* context) -> void { + ASSERT_TRUE(end_stream); + HeaderMapPtr response_headers = Utility::transformHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "503"); + bool* on_headers_called2 = static_cast(context); + *on_headers_called2 = true; + }; + observer2.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete = true; + }; + + // Grab the response decoder in order to dispatch responses on the stream. + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder2, cm_.conn_pool_.host_); + response_decoder2 = &decoder; + return nullptr; + })); + + // Build a set of request headers. + TestHeaderMapImpl headers2; + HttpTestUtility::addDefaultHeaders(headers2); + envoy_headers c_headers2 = Utility::transformHeaders(headers2); + + // Create a stream. + EXPECT_CALL(cm_, httpAsyncClientForCluster("egress_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + EXPECT_CALL(cm_.async_client_, start(_, _)) + .WillOnce( + WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { + return client_.start(callbacks, AsyncClient::StreamOptions()); + }))); + EXPECT_CALL(event_dispatcher_, post(_)); + envoy_stream_t stream2 = http_dispatcher_.startStream(observer2); + + // Send request headers. + Event::PostCb post_cb2; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb2)); + http_dispatcher_.sendHeaders(stream2, c_headers2, true); + + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(stream_encoder2, encodeHeaders(_, true)); + post_cb2(); + + // Finish stream 2. + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + HeaderMapPtr response_headers2(new TestHeaderMapImpl{{":status", "503"}}); + response_decoder2->decodeHeaders(std::move(response_headers2), true); + // Ensure that the on_headers on the observer was called. + ASSERT_TRUE(cc2.on_headers); + ASSERT_TRUE(cc2.on_complete); + + // Finish stream 1. + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); + response_decoder_->decodeHeaders(std::move(response_headers), true); + ASSERT_TRUE(cc.on_headers); + ASSERT_TRUE(cc.on_complete); +} + +TEST_F(DispatcherTest, LocalResetAfterStreamStart) { + envoy_observer observer; + callbacks_called cc = {false, false, false}; + observer.context = &cc; + + observer.on_error = [](envoy_error actual_error, void* context) -> void { + envoy_error expected_error = {ENVOY_STREAM_RESET, envoy_nodata}; + ASSERT_EQ(actual_error.error_code, expected_error.error_code); + callbacks_called* cc = static_cast(context); + cc->on_error = true; + }; + observer.on_headers = [](envoy_headers c_headers, bool end_stream, void* context) -> void { + ASSERT_FALSE(end_stream); + HeaderMapPtr response_headers = Utility::transformHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); + callbacks_called* cc = static_cast(context); + cc->on_headers = true; + }; + observer.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete = true; + }; + + // Grab the response decoder in order to dispatch responses on the stream. + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + // Build a set of request headers. + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + envoy_headers c_headers = Utility::transformHeaders(headers); + + // Create a stream. + EXPECT_CALL(cm_, httpAsyncClientForCluster("egress_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + EXPECT_CALL(cm_.async_client_, start(_, _)) + .WillOnce( + WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { + return client_.start(callbacks, AsyncClient::StreamOptions()); + }))); + envoy_stream_t stream = http_dispatcher_.startStream(observer); + + // Send request headers. + Event::PostCb send_headers_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); + http_dispatcher_.sendHeaders(stream, c_headers, false); + + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(stream_encoder_, encodeHeaders(_, false)); + send_headers_post_cb(); + + response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); + // Ensure that the on_headers on the observer was called. + ASSERT_TRUE(cc.on_headers); + + Event::PostCb reset_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&reset_post_cb)); + http_dispatcher_.resetStream(stream); + + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + reset_post_cb(); + + // Ensure that the on_error on the observer was called. + ASSERT_TRUE(cc.on_error); + ASSERT_FALSE(cc.on_complete); +} + +TEST_F(DispatcherTest, RemoteResetAfterStreamStart) { + envoy_observer observer; + callbacks_called cc = {false, false, false}; + observer.context = &cc; + + observer.on_error = [](envoy_error actual_error, void* context) -> void { + envoy_error expected_error = {ENVOY_STREAM_RESET, envoy_nodata}; + ASSERT_EQ(actual_error.error_code, expected_error.error_code); + callbacks_called* cc = static_cast(context); + cc->on_error = true; + }; + observer.on_headers = [](envoy_headers c_headers, bool end_stream, void* context) -> void { + ASSERT_FALSE(end_stream); + HeaderMapPtr response_headers = Utility::transformHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); + callbacks_called* cc = static_cast(context); + cc->on_headers = true; + }; + observer.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete = true; + }; + + // Grab the response decoder in order to dispatch responses on the stream. + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + // Build a set of request headers. + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + envoy_headers c_headers = Utility::transformHeaders(headers); + + // Create a stream. + EXPECT_CALL(cm_, httpAsyncClientForCluster("egress_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + EXPECT_CALL(cm_.async_client_, start(_, _)) + .WillOnce( + WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { + return client_.start(callbacks, AsyncClient::StreamOptions()); + }))); + envoy_stream_t stream = http_dispatcher_.startStream(observer); + + // Send request headers. + Event::PostCb send_headers_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); + http_dispatcher_.sendHeaders(stream, c_headers, false); + + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(stream_encoder_, encodeHeaders(_, false)); + send_headers_post_cb(); + + response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); + // Ensure that the on_headers on the observer was called. + ASSERT_TRUE(cc.on_headers); + + stream_encoder_.getStream().resetStream(StreamResetReason::RemoteReset); + // Ensure that the on_error on the observer was called. + ASSERT_TRUE(cc.on_error); + ASSERT_FALSE(cc.on_complete); +} + +TEST_F(DispatcherTest, DestroyWithActiveStream) { + // Grab the response decoder in order to dispatch responses on the stream. + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + // Build a set of request headers. + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + envoy_headers c_headers = Utility::transformHeaders(headers); + + // Create a stream. + EXPECT_CALL(cm_, httpAsyncClientForCluster("egress_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + EXPECT_CALL(cm_.async_client_, start(_, _)) + .WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions()))); + envoy_stream_t stream = http_dispatcher_.startStream(observer_); + + // Send request headers. + EXPECT_CALL(stream_encoder_, encodeHeaders(_, false)); + EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); + EXPECT_CALL(stream_callbacks_, onReset()); + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + http_dispatcher_.sendHeaders(stream, c_headers, false); +} + +TEST_F(DispatcherTest, ResetInOnHeaders) { + // Grab the response decoder in order to dispatch responses on the stream. + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + // Build a set of request headers. + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + envoy_headers c_headers = Utility::transformHeaders(headers); + + // Create a stream. + EXPECT_CALL(cm_, httpAsyncClientForCluster("egress_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + EXPECT_CALL(cm_.async_client_, start(_, _)) + .WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions()))); + envoy_stream_t stream = http_dispatcher_.startStream(observer_); + + // Send request headers. + Event::PostCb send_headers_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); + http_dispatcher_.sendHeaders(stream, c_headers, false); + + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(stream_encoder_, encodeHeaders(_, false)); + send_headers_post_cb(); + + TestHeaderMapImpl expected_headers{{":status", "200"}}; + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(event_dispatcher_, post(_)); + EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_headers), false)) + .WillOnce(Invoke([this, stream](HeaderMap&, bool) { http_dispatcher_.resetStream(stream); })); + EXPECT_CALL(stream_callbacks_, onData(_, _)).Times(0); + EXPECT_CALL(stream_callbacks_, onReset()); + + response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); + // TODO: Need to finish the data side (sendData, onData) in order to fully verify that onData was + // never called due to the reset. +} + +TEST_F(DispatcherTest, StreamTimeout) { + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder&, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + return nullptr; + })); + + // Build a set of request headers. + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + envoy_headers c_headers = Utility::transformHeaders(headers); + + EXPECT_CALL(cm_, httpAsyncClientForCluster("egress_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + EXPECT_CALL(cm_.async_client_, start(_, _)) + .WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions().setTimeout( + std::chrono::milliseconds(40))))); + envoy_stream_t stream = http_dispatcher_.startStream(observer_); + + // Send request headers. + Event::PostCb send_headers_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); + http_dispatcher_.sendHeaders(stream, c_headers, true); + + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(stream_encoder_, encodeHeaders(_, true)); + timer_ = new NiceMock(&event_dispatcher_); + EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40))); + EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); + + TestHeaderMapImpl expected_timeout{ + {":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}}; + EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), false)); + EXPECT_CALL(stream_callbacks_, onData(_, true)); + EXPECT_CALL(stream_callbacks_, onComplete()); + send_headers_post_cb(); + timer_->callback_(); + + EXPECT_EQ(1UL, + cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_timeout") + .value()); + EXPECT_EQ(1UL, cm_.conn_pool_.host_->stats().rq_timeout_.value()); + EXPECT_EQ( + 1UL, + cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_504").value()); +} + +TEST_F(DispatcherTest, StreamTimeoutHeadReply) { + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder&, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + return nullptr; + })); + + // Build a set of request headers. + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers, "HEAD"); + envoy_headers c_headers = Utility::transformHeaders(headers); + + EXPECT_CALL(cm_, httpAsyncClientForCluster("egress_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + EXPECT_CALL(cm_.async_client_, start(_, _)) + .WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions().setTimeout( + std::chrono::milliseconds(40))))); + envoy_stream_t stream = http_dispatcher_.startStream(observer_); + + // Send request headers. + Event::PostCb send_headers_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); + http_dispatcher_.sendHeaders(stream, c_headers, true); + + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(stream_encoder_, encodeHeaders(_, true)); + timer_ = new NiceMock(&event_dispatcher_); + EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40))); + EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); + + TestHeaderMapImpl expected_timeout{ + {":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}}; + EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), true)); + EXPECT_CALL(stream_callbacks_, onComplete()); + send_headers_post_cb(); + timer_->callback_(); +} + +TEST_F(DispatcherTest, DisableTimerWithStream) { + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder&, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + return nullptr; + })); + + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers, "HEAD"); + envoy_headers c_headers = Utility::transformHeaders(headers); + + EXPECT_CALL(cm_, httpAsyncClientForCluster("egress_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + EXPECT_CALL(cm_.async_client_, start(_, _)) + .WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions().setTimeout( + std::chrono::milliseconds(40))))); + envoy_stream_t stream = http_dispatcher_.startStream(observer_); + + // Send request headers and reset stream. + Event::PostCb send_headers_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); + http_dispatcher_.sendHeaders(stream, c_headers, true); + Event::PostCb reset_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&reset_stream_post_cb)); + http_dispatcher_.resetStream(stream); + + EXPECT_CALL(stream_encoder_, encodeHeaders(_, true)); + timer_ = new NiceMock(&event_dispatcher_); + EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40))); + EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); + EXPECT_CALL(stream_callbacks_, onReset()); + + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + send_headers_post_cb(); + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + reset_stream_post_cb(); +} + +} // namespace Http +} // namespace Envoy