diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 50e113af45..8336c46601 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -18,7 +18,7 @@ jobs: - name: 'Install dependencies' run: ./ci/mac_ci_setup.sh - name: 'Run tests' - run: bazel test //test/common/... + run: bazel test --test_output=all //test/common/... tsan: name: tsan runs-on: ubuntu-18.04 @@ -34,7 +34,7 @@ jobs: export PATH=/usr/lib/llvm-8/bin:$PATH export CC=clang export CXX=clang++ - bazel test --config=clang-tsan //test/common/... + bazel test --config=clang-tsan --test_output=all //test/common/... asan: name: asan runs-on: ubuntu-18.04 @@ -50,4 +50,4 @@ jobs: export PATH=/usr/lib/llvm-8/bin:$PATH export CC=clang export CXX=clang++ - bazel test --config=clang-asan //test/common/... + bazel test --config=clang-asan --test_output=all //test/common/... diff --git a/.github/workflows/perf.yml b/.github/workflows/perf.yml index 686662132f..d1ee0af308 100644 --- a/.github/workflows/perf.yml +++ b/.github/workflows/perf.yml @@ -10,7 +10,7 @@ jobs: sizecurrent: name: size_current runs-on: ubuntu-18.04 - timeout-minutes: 45 + timeout-minutes: 60 steps: - uses: actions/checkout@v1 with: @@ -26,7 +26,7 @@ jobs: sizemaster: name: size_master runs-on: ubuntu-18.04 - timeout-minutes: 45 + timeout-minutes: 60 steps: - uses: actions/checkout@v1 with: @@ -45,7 +45,7 @@ jobs: name: size_compare needs: [sizecurrent, sizemaster] runs-on: ubuntu-18.04 - timeout-minutes: 45 + timeout-minutes: 30 steps: - uses: actions/checkout@v1 - name: 'Install dependencies' diff --git a/library/common/config_template.cc b/library/common/config_template.cc index 7ea531a2bd..6c8bfa8fa1 100644 --- a/library/common/config_template.cc +++ b/library/common/config_template.cc @@ -3,19 +3,51 @@ */ const char* config_template = R"( static_resources: + listeners: + - name: base_api_listener + address: + socket_address: + protocol: TCP + address: 0.0.0.0 + port_value: 10000 + api_listener: + 
api_listener: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: hcm + route_config: + name: api_router + virtual_hosts: + - name: api + domains: + - "*" + routes: + - match: + prefix: "/" + route: + cluster_header: x-envoy-mobile-cluster + http_filters: + - name: envoy.filters.http.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_forward_proxy.v3.FilterConfig + dns_cache_config: + name: dynamic_forward_proxy_cache_config + dns_lookup_family: AUTO + dns_refresh_rate: {{ dns_refresh_rate_seconds }}s + - name: envoy.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router clusters: - - name: base # Note: the direct API depends on the existence of a cluster with this name. + - name: base connect_timeout: {{ connect_timeout_seconds }}s - dns_refresh_rate: {{ dns_refresh_rate_seconds }}s - http2_protocol_options: {} - lb_policy: ROUND_ROBIN - load_assignment: - cluster_name: base - endpoints: &base_endpoints - - lb_endpoints: - - endpoint: - address: - socket_address: {address: {{ domain }}, port_value: 443} + lb_policy: CLUSTER_PROVIDED + cluster_type: + name: envoy.clusters.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.clusters.dynamic_forward_proxy.v3.ClusterConfig + dns_cache_config: + name: dynamic_forward_proxy_cache_config + dns_lookup_family: AUTO + dns_refresh_rate: {{ dns_refresh_rate_seconds }}s transport_socket: &base_transport_socket name: envoy.transport_sockets.tls typed_config: @@ -28,33 +60,38 @@ const char* config_template = R"( #include "certificates.inc" R"( sni: {{ domain }} - type: LOGICAL_DNS upstream_connection_options: &upstream_opts tcp_keepalive: keepalive_interval: 10 keepalive_probes: 1 keepalive_time: 5 - - name: base_wlan # Note: the direct API depends on the existence of a cluster with this name. 
+ transport_socket: *base_transport_socket + upstream_connection_options: *upstream_opts + - name: base_wlan connect_timeout: {{ connect_timeout_seconds }}s - dns_refresh_rate: {{ dns_refresh_rate_seconds }}s - http2_protocol_options: {} - lb_policy: ROUND_ROBIN - load_assignment: - cluster_name: base_wlan - endpoints: *base_endpoints + lb_policy: CLUSTER_PROVIDED + cluster_type: + name: envoy.clusters.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.clusters.dynamic_forward_proxy.v3.ClusterConfig + dns_cache_config: + name: dynamic_forward_proxy_cache_config + dns_lookup_family: AUTO + dns_refresh_rate: {{ dns_refresh_rate_seconds }}s transport_socket: *base_transport_socket - type: LOGICAL_DNS upstream_connection_options: *upstream_opts - - name: base_wwan # Note: the direct API depends on the existence of a cluster with this name. + - name: base_wwan connect_timeout: {{ connect_timeout_seconds }}s - dns_refresh_rate: {{ dns_refresh_rate_seconds }}s - http2_protocol_options: {} - lb_policy: ROUND_ROBIN - load_assignment: - cluster_name: base_wwan - endpoints: *base_endpoints + lb_policy: CLUSTER_PROVIDED + cluster_type: + name: envoy.clusters.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.clusters.dynamic_forward_proxy.v3.ClusterConfig + dns_cache_config: + name: dynamic_forward_proxy_cache_config + dns_lookup_family: AUTO + dns_refresh_rate: {{ dns_refresh_rate_seconds }}s transport_socket: *base_transport_socket - type: LOGICAL_DNS upstream_connection_options: *upstream_opts - name: stats connect_timeout: {{ connect_timeout_seconds }}s diff --git a/library/common/engine.cc b/library/common/engine.cc index aaa7e05e72..cff652e00e 100644 --- a/library/common/engine.cc +++ b/library/common/engine.cc @@ -64,7 +64,9 @@ envoy_status_t Engine::run(std::string config, std::string log_level) { postinit_callback_handler_ = main_common_->server()->lifecycleNotifier().registerCallback( 
Envoy::Server::ServerLifecycleNotifier::Stage::PostInit, [this]() -> void { Server::Instance* server = TS_UNCHECKED_READ(main_common_)->server(); - http_dispatcher_->ready(server->dispatcher(), server->clusterManager()); + auto api_listener = server->listenerManager().apiListener()->get().http(); + ASSERT(api_listener.has_value()); + http_dispatcher_->ready(server->dispatcher(), api_listener.value()); }); } // mutex_ diff --git a/library/common/http/BUILD b/library/common/http/BUILD index 06e3ed78f1..5df54e9782 100644 --- a/library/common/http/BUILD +++ b/library/common/http/BUILD @@ -14,16 +14,20 @@ envoy_cc_library( "//library/common/buffer:bridge_fragment_lib", "//library/common/buffer:utility_lib", "//library/common/http:header_utility_lib", + "//library/common/network:synthetic_address_lib", + "//library/common/thread:lock_guard_lib", "//library/common/types:c_types_lib", "@envoy//include/envoy/buffer:buffer_interface", "@envoy//include/envoy/event:dispatcher_interface", + "@envoy//include/envoy/http:api_listener_interface", "@envoy//include/envoy/http:async_client_interface", "@envoy//include/envoy/http:header_map_interface", - "@envoy//include/envoy/upstream:cluster_manager_interface", "@envoy//source/common/buffer:buffer_lib", "@envoy//source/common/common:lock_guard_lib", "@envoy//source/common/common:minimal_logger_lib", "@envoy//source/common/common:thread_lib", + "@envoy//source/common/common:thread_synchronizer_lib", + "@envoy//source/common/http:codec_helper_lib", "@envoy//source/common/http:headers_lib", "@envoy//source/common/http:utility_lib", ], diff --git a/library/common/http/dispatcher.cc b/library/common/http/dispatcher.cc index 2d4faef6d6..12be3b7590 100644 --- a/library/common/http/dispatcher.cc +++ b/library/common/http/dispatcher.cc @@ -8,39 +8,70 @@ #include "library/common/buffer/bridge_fragment.h" #include "library/common/buffer/utility.h" #include "library/common/http/header_utility.h" +#include 
"library/common/network/synthetic_address_impl.h" +#include "library/common/thread/lock_guard.h" namespace Envoy { namespace Http { -Dispatcher::DirectStreamCallbacks::DirectStreamCallbacks(envoy_stream_t stream, +/** + * IMPORTANT: stream closure semantics in envoy mobile depends on the fact that the HCM fires a + * stream reset when the remote side of the stream is closed but the local side remains open. + * In other words the HCM (like the rest of Envoy) disallows locally half-open streams. + * If this changes in Envoy, this file will need to change as well. + * For implementation details @see Dispatcher::DirectStreamCallbacks::closeRemote. + */ + +Dispatcher::DirectStreamCallbacks::DirectStreamCallbacks(DirectStream& direct_stream, envoy_http_callbacks bridge_callbacks, Dispatcher& http_dispatcher) - : stream_handle_(stream), bridge_callbacks_(bridge_callbacks), + : direct_stream_(direct_stream), bridge_callbacks_(bridge_callbacks), http_dispatcher_(http_dispatcher) {} -void Dispatcher::DirectStreamCallbacks::onHeaders(HeaderMapPtr&& headers, bool end_stream) { - ENVOY_LOG(debug, "[S{}] response headers for stream (end_stream={}):\n{}", stream_handle_, - end_stream, *headers); +void Dispatcher::DirectStreamCallbacks::encodeHeaders(const HeaderMap& headers, bool end_stream) { + ENVOY_LOG(debug, "[S{}] response headers for stream (end_stream={}):\n{}", + direct_stream_.stream_handle_, end_stream, headers); // TODO: ***HACK*** currently Envoy sends local replies in cases where an error ought to be // surfaced via the error path. There are ways we can clean up Envoy's local reply path to // make this possible, but nothing expedient. For the immediate term this is our only real // option. See https://github.com/lyft/envoy-mobile/issues/460 // The presence of EnvoyUpstreamServiceTime implies these headers are not due to a local reply.
- if (headers->get(Headers::get().EnvoyUpstreamServiceTime) != nullptr) { - envoy_headers bridge_headers = Utility::toBridgeHeaders(*headers); - bridge_callbacks_.on_headers(bridge_headers, end_stream, bridge_callbacks_.context); + if (headers.get(Headers::get().EnvoyUpstreamServiceTime) != nullptr) { + // Testing hook. + http_dispatcher_.synchronizer_.syncPoint("dispatch_encode_headers"); + + // @see Dispatcher::DirectStream::dispatch_lock_ for why this lock is necessary. + Thread::BasicLockable* mutex = end_stream ? nullptr : &direct_stream_.dispatch_lock_; + Thread::OptionalReleasableLockGuard lock(mutex); + if (direct_stream_.dispatchable(end_stream)) { + ENVOY_LOG(debug, + "[S{}] dispatching to platform response headers for stream (end_stream={}):\n{}", + direct_stream_.stream_handle_, end_stream, headers); + bridge_callbacks_.on_headers(Utility::toBridgeHeaders(headers), end_stream, + bridge_callbacks_.context); + lock.release(); + closeRemote(end_stream); + } return; } - // We assume that local replies represent error conditions, having audited occurrences in - // Envoy today. This is not a good long-term solution. - uint64_t response_status = Http::Utility::getResponseStatus(*headers); + // Deal with a local response based on the HTTP status code received. Envoy Mobile treats + // successful local responses as actual success. Envoy Mobile surfaces non-200 local responses as + // errors via callbacks rather than an HTTP response. This is inline with behaviour of other + // mobile networking libraries. + uint64_t response_status = Http::Utility::getResponseStatus(headers); switch (response_status) { case 200: { - // We still treat successful local responses as actual success. - envoy_headers bridge_headers = Utility::toBridgeHeaders(*headers); - bridge_callbacks_.on_headers(bridge_headers, end_stream, bridge_callbacks_.context); + // @see Dispatcher::DirectStream::dispatch_lock_ for why this lock is necessary. + Thread::BasicLockable* mutex = end_stream ? 
nullptr : &direct_stream_.dispatch_lock_; + Thread::OptionalReleasableLockGuard lock(mutex); + if (direct_stream_.dispatchable(end_stream)) { + bridge_callbacks_.on_headers(Utility::toBridgeHeaders(headers), end_stream, + bridge_callbacks_.context); + lock.release(); + closeRemote(end_stream); + } return; } case 503: @@ -49,76 +80,142 @@ void Dispatcher::DirectStreamCallbacks::onHeaders(HeaderMapPtr&& headers, bool e default: error_code_ = ENVOY_UNDEFINED_ERROR; } - ENVOY_LOG(debug, "[S{}] intercepted local response", stream_handle_); + ENVOY_LOG(debug, "[S{}] intercepted local response", direct_stream_.stream_handle_); if (end_stream) { - // The local stream may or may not have completed. We don't want to be tracking/synchronized - // on that state, so we just reset everything now to ensure teardown. - auto stream = http_dispatcher_.getStream(stream_handle_); - ASSERT(stream); - stream->underlying_stream_.reset(); + // The local stream may or may not have completed. + // If the local is not closed envoy will fire the reset for us. + // @see Dispatcher::DirectStreamCallbacks::closeRemote. + // Otherwise fire the reset from here. + if (direct_stream_.local_closed_) { + onReset(); + } } } -void Dispatcher::DirectStreamCallbacks::onData(Buffer::Instance& data, bool end_stream) { - ENVOY_LOG(debug, "[S{}] response data for stream (length={} end_stream={})", stream_handle_, - data.length(), end_stream); +void Dispatcher::DirectStreamCallbacks::encodeData(Buffer::Instance& data, bool end_stream) { + ENVOY_LOG(debug, "[S{}] response data for stream (length={} end_stream={})", + direct_stream_.stream_handle_, data.length(), end_stream); if (!error_code_.has_value()) { - bridge_callbacks_.on_data(Buffer::Utility::toBridgeData(data), end_stream, - bridge_callbacks_.context); + // @see Dispatcher::DirectStream::dispatch_lock_ for why this lock is necessary. + Thread::BasicLockable* mutex = end_stream ? 
nullptr : &direct_stream_.dispatch_lock_; + Thread::OptionalReleasableLockGuard lock(mutex); + if (direct_stream_.dispatchable(end_stream)) { + ENVOY_LOG(debug, + "[S{}] dispatching to platform response data for stream (length={} end_stream={})", + direct_stream_.stream_handle_, data.length(), end_stream); + bridge_callbacks_.on_data(Buffer::Utility::toBridgeData(data), end_stream, + bridge_callbacks_.context); + lock.release(); + closeRemote(end_stream); + } } else { - ASSERT(end_stream); + ASSERT(end_stream, "local response has to end the stream with a data frame. If Envoy changes " + "this expectation, this code needs to be updated."); error_message_ = Buffer::Utility::toBridgeData(data); - // The local stream may or may not have completed. We don't want to be tracking/synchronized on - // that state, so we just reset everything now to ensure teardown. - auto stream = http_dispatcher_.getStream(stream_handle_); - ASSERT(stream); - stream->underlying_stream_.reset(); + // The local stream may or may not have completed. + // If the local is not closed envoy will fire the reset for us. + // @see Dispatcher::DirectStreamCallbacks::closeRemote. + // Otherwise fire the reset from here. 
+ if (direct_stream_.local_closed_) { + onReset(); + } } } -void Dispatcher::DirectStreamCallbacks::onTrailers(HeaderMapPtr&& trailers) { - ENVOY_LOG(debug, "[S{}] response trailers for stream:\n{}", stream_handle_, *trailers); - bridge_callbacks_.on_trailers(Utility::toBridgeHeaders(*trailers), bridge_callbacks_.context); +void Dispatcher::DirectStreamCallbacks::encodeTrailers(const HeaderMap& trailers) { + ENVOY_LOG(debug, "[S{}] response trailers for stream:\n{}", direct_stream_.stream_handle_, + trailers); + if (direct_stream_.dispatchable(true)) { + ENVOY_LOG(debug, "[S{}] dispatching to platform response trailers for stream:\n{}", + direct_stream_.stream_handle_, trailers); + bridge_callbacks_.on_trailers(Utility::toBridgeHeaders(trailers), bridge_callbacks_.context); + closeRemote(true); + } } -void Dispatcher::DirectStreamCallbacks::onComplete() { - ENVOY_LOG(debug, "[S{}] complete stream", stream_handle_); - bridge_callbacks_.on_complete(bridge_callbacks_.context); - // Very important: onComplete and onReset both clean up stream state in the http dispatcher - // because the underlying async client implementation **guarantees** that only onComplete **or** - // onReset will be fired for a stream. This means it is safe to clean up the stream when either of - // the terminal callbacks fire without keeping additional state in this layer. - http_dispatcher_.cleanup(stream_handle_); +// n.b: all calls to closeRemote are guarded by a call to dispatchable. Hence the on_complete call +// here does not, and should not call dispatchable. +void Dispatcher::DirectStreamCallbacks::closeRemote(bool end_stream) { + if (end_stream) { + // Envoy itself does not currently allow half-open streams where the local half is open + // but the remote half is closed. Therefore, we fire the on_complete callback + // to the platform layer whenever remote closes. 
+ ENVOY_LOG(debug, "[S{}] complete stream", direct_stream_.stream_handle_); + bridge_callbacks_.on_complete(bridge_callbacks_.context); + // Likewise cleanup happens whenever remote closes even though + // local might be open. Note that if local is open Envoy will reset the stream. Calling cleanup + // here is fine because the stream reset will come through synchronously in the same thread as + // this closeRemote code. Because DirectStream deletion is deferred, the deletion will happen + // necessarily after the reset occurs. Thus Dispatcher::DirectStreamCallbacks::onReset will + // **not** have a dangling reference. + ENVOY_LOG(debug, "[S{}] scheduling cleanup", direct_stream_.stream_handle_); + http_dispatcher_.cleanup(direct_stream_.stream_handle_); + } } +Stream& Dispatcher::DirectStreamCallbacks::getStream() { return direct_stream_; } + void Dispatcher::DirectStreamCallbacks::onReset() { - ENVOY_LOG(debug, "[S{}] remote reset stream", stream_handle_); + ENVOY_LOG(debug, "[S{}] remote reset stream", direct_stream_.stream_handle_); envoy_error_code_t code = error_code_.value_or(ENVOY_STREAM_RESET); envoy_data message = error_message_.value_or(envoy_nodata); - bridge_callbacks_.on_error({code, message}, bridge_callbacks_.context); - // Very important: onComplete and onReset both clean up stream state in the http dispatcher - // because the underlying async client implementation **guarantees** that only onComplete **or** - // onReset will be fired for a stream. This means it is safe to clean up the stream when either of - // the terminal callbacks fire without keeping additional state in this layer. - http_dispatcher_.cleanup(stream_handle_); + + // Testing hook. 
+ http_dispatcher_.synchronizer_.syncPoint("dispatch_on_error"); + + // direct_stream_ will not be a dangling reference even in the case that closeRemote cleaned up + // because in that case this reset is happening synchronously, with the encoding call that called + // closeRemote, in the Envoy Main thread. Hence DirectStream destruction which is posted on the + // Envoy Main thread's event loop will strictly happen after this direct_stream_ reference is + // used. @see Dispatcher::DirectStreamCallbacks::closeRemote() for more details. + if (direct_stream_.dispatchable(true)) { + ENVOY_LOG(debug, "[S{}] dispatching to platform remote reset stream", + direct_stream_.stream_handle_); + bridge_callbacks_.on_error({code, message}, bridge_callbacks_.context); + + // All the terminal callbacks only cleanup if they are dispatchable. + // This ensures that cleanup will happen exactly one time. + http_dispatcher_.cleanup(direct_stream_.stream_handle_); + } } -Dispatcher::DirectStream::DirectStream(envoy_stream_t stream_handle, - AsyncClient::Stream& underlying_stream, - DirectStreamCallbacksPtr&& callbacks) - : stream_handle_(stream_handle), underlying_stream_(underlying_stream), - callbacks_(std::move(callbacks)) {} - -AsyncClient::StreamOptions -Dispatcher::DirectStream::toNativeStreamOptions(envoy_stream_options stream_options) { - AsyncClient::StreamOptions native_stream_options; - native_stream_options.setBufferBodyForRetry(stream_options.buffer_body_for_retry); - return native_stream_options; +void Dispatcher::DirectStreamCallbacks::onCancel() { + // This call is guarded at the call-site @see Dispatcher::DirectStream::resetStream(). + // Therefore, it is dispatched here without protection. 
+ ENVOY_LOG(debug, "[S{}] dispatching to platform cancel stream", direct_stream_.stream_handle_); + bridge_callbacks_.on_cancel(bridge_callbacks_.context); } -void Dispatcher::ready(Event::Dispatcher& event_dispatcher, - Upstream::ClusterManager& cluster_manager) { - Thread::LockGuard lock(dispatch_lock_); +Dispatcher::DirectStream::DirectStream(envoy_stream_t stream_handle, Dispatcher& http_dispatcher) + : stream_handle_(stream_handle), parent_(http_dispatcher) {} + +Dispatcher::DirectStream::~DirectStream() { + ENVOY_LOG(debug, "[S{}] destroy stream", stream_handle_); +} + +// TODO(junr03): map from StreamResetReason to Envoy Mobile's error types. Right now all resets +// will be ENVOY_STREAM_RESET. +void Dispatcher::DirectStream::resetStream(StreamResetReason) { callbacks_->onReset(); } + +void Dispatcher::DirectStream::closeLocal(bool end_stream) { + // TODO: potentially guard against double local closure. + local_closed_ = end_stream; + + // No cleanup happens here because cleanup always happens on remote closure or local reset. + // @see Dispatcher::DirectStreamCallbacks::closeRemote, and @see Dispatcher::resetStream, + // respectively. +} + +bool Dispatcher::DirectStream::dispatchable(bool close) { + if (close) { + // Set closed to true and return true if not previously closed. + return !closed_.exchange(close); + } + return !closed_.load(); +} + +void Dispatcher::ready(Event::Dispatcher& event_dispatcher, ApiListener& api_listener) { + Thread::LockGuard lock(ready_lock_); // Drain the init_queue_ into the event_dispatcher_. for (const Event::PostCb& cb : init_queue_) { @@ -128,11 +225,11 @@ void Dispatcher::ready(Event::Dispatcher& event_dispatcher, // Ordering somewhat matters here if concurrency guarantees are loosened (e.g. if // we rely on atomics instead of locks). 
event_dispatcher_ = &event_dispatcher; - cluster_manager_ = &cluster_manager; + api_listener_ = &api_listener; } void Dispatcher::post(Event::PostCb callback) { - Thread::LockGuard lock(dispatch_lock_); + Thread::LockGuard lock(ready_lock_); // If the event_dispatcher_ is set, then post the functor directly to it. if (event_dispatcher_ != nullptr) { @@ -146,33 +243,25 @@ void Dispatcher::post(Event::PostCb callback) { } Dispatcher::Dispatcher(std::atomic& preferred_network) - : preferred_network_(preferred_network) {} + : preferred_network_(preferred_network), + address_(std::make_shared()) {} envoy_status_t Dispatcher::startStream(envoy_stream_t new_stream_handle, envoy_http_callbacks bridge_callbacks, - envoy_stream_options stream_options) { - post([this, new_stream_handle, bridge_callbacks, stream_options]() -> void { - DirectStreamCallbacksPtr callbacks = - std::make_unique(new_stream_handle, bridge_callbacks, *this); - - AsyncClient& async_client = getClient(); - // While this struct is passed by reference to AsyncClient::start, it does not need to be - // preserved outside of this stack frame because its values are not used beyond the return of - // AsyncClient::start. If this changes, we need to store this struct in the DirectStream. - AsyncClient::StreamOptions native_stream_options = - Dispatcher::DirectStream::toNativeStreamOptions(stream_options); - AsyncClient::Stream* underlying_stream = async_client.start(*callbacks, native_stream_options); - - if (!underlying_stream) { - // TODO: this callback might fire before the startStream function returns. - // Take this into account when thinking about stream cancellation. 
- callbacks->onReset(); - } else { - DirectStreamPtr direct_stream = std::make_unique( - new_stream_handle, *underlying_stream, std::move(callbacks)); - streams_.emplace(new_stream_handle, std::move(direct_stream)); - ENVOY_LOG(debug, "[S{}] start stream", new_stream_handle); - } + envoy_stream_options) { + post([this, new_stream_handle, bridge_callbacks]() -> void { + Dispatcher::DirectStreamSharedPtr direct_stream{new DirectStream(new_stream_handle, *this)}; + direct_stream->callbacks_ = + std::make_unique(*direct_stream, bridge_callbacks, *this); + + // Only the initial setting of the api_listener_ is guarded. + direct_stream->stream_decoder_ = + &TS_UNCHECKED_READ(api_listener_)->newStream(*direct_stream->callbacks_); + + Thread::ReleasableLockGuard lock(streams_lock_); + streams_.emplace(new_stream_handle, std::move(direct_stream)); + lock.release(); + ENVOY_LOG(debug, "[S{}] start stream", new_stream_handle); }); return ENVOY_SUCCESS; @@ -181,7 +270,7 @@ envoy_status_t Dispatcher::startStream(envoy_stream_t new_stream_handle, envoy_status_t Dispatcher::sendHeaders(envoy_stream_t stream, envoy_headers headers, bool end_stream) { post([this, stream, headers, end_stream]() -> void { - DirectStream* direct_stream = getStream(stream); + Dispatcher::DirectStreamSharedPtr direct_stream = getStream(stream); // If direct_stream is not found, it means the stream has already closed or been reset // and the appropriate callback has been issued to the caller. There's nothing to do here // except silently swallow this. @@ -189,11 +278,13 @@ envoy_status_t Dispatcher::sendHeaders(envoy_stream_t stream, envoy_headers head // first place. Additionally it is possible to get a nullptr due to bogus envoy_stream_t // from the caller. 
// https://github.com/lyft/envoy-mobile/issues/301 - if (direct_stream != nullptr) { - direct_stream->headers_ = Utility::toInternalHeaders(headers); + if (direct_stream) { + HeaderMapPtr internal_headers = Utility::toInternalHeaders(headers); ENVOY_LOG(debug, "[S{}] request headers for stream (end_stream={}):\n{}", stream, end_stream, - *direct_stream->headers_); - direct_stream->underlying_stream_.sendHeaders(*direct_stream->headers_, end_stream); + *internal_headers); + attachPreferredNetwork(*internal_headers); + direct_stream->stream_decoder_->decodeHeaders(std::move(internal_headers), end_stream); + direct_stream->closeLocal(end_stream); } }); @@ -202,7 +293,7 @@ envoy_status_t Dispatcher::sendHeaders(envoy_stream_t stream, envoy_headers head envoy_status_t Dispatcher::sendData(envoy_stream_t stream, envoy_data data, bool end_stream) { post([this, stream, data, end_stream]() -> void { - DirectStream* direct_stream = getStream(stream); + Dispatcher::DirectStreamSharedPtr direct_stream = getStream(stream); // If direct_stream is not found, it means the stream has already closed or been reset // and the appropriate callback has been issued to the caller. There's nothing to do here // except silently swallow this. @@ -210,14 +301,15 @@ envoy_status_t Dispatcher::sendData(envoy_stream_t stream, envoy_data data, bool // first place. Additionally it is possible to get a nullptr due to bogus envoy_stream_t // from the caller. // https://github.com/lyft/envoy-mobile/issues/301 - if (direct_stream != nullptr) { - // The buffer is moved internally, in a synchronous fashion, so we don't need the lifetime of - // the InstancePtr to outlive this function call. + if (direct_stream) { + // The buffer is moved internally, in a synchronous fashion, so we don't need the lifetime + // of the InstancePtr to outlive this function call. 
Buffer::InstancePtr buf = Buffer::Utility::toInternalData(data); ENVOY_LOG(debug, "[S{}] request data for stream (length={} end_stream={})\n", stream, data.length, end_stream); - direct_stream->underlying_stream_.sendData(*buf, end_stream); + direct_stream->stream_decoder_->decodeData(*buf, end_stream); + direct_stream->closeLocal(end_stream); } }); @@ -229,7 +321,7 @@ envoy_status_t Dispatcher::sendMetadata(envoy_stream_t, envoy_headers) { return envoy_status_t Dispatcher::sendTrailers(envoy_stream_t stream, envoy_headers trailers) { post([this, stream, trailers]() -> void { - DirectStream* direct_stream = getStream(stream); + Dispatcher::DirectStreamSharedPtr direct_stream = getStream(stream); // If direct_stream is not found, it means the stream has already closed or been reset // and the appropriate callback has been issued to the caller. There's nothing to do here // except silently swallow this. @@ -237,10 +329,11 @@ envoy_status_t Dispatcher::sendTrailers(envoy_stream_t stream, envoy_headers tra // first place. Additionally it is possible to get a nullptr due to bogus envoy_stream_t // from the caller. 
// https://github.com/lyft/envoy-mobile/issues/301 - if (direct_stream != nullptr) { - direct_stream->trailers_ = Utility::toInternalHeaders(trailers); - ENVOY_LOG(debug, "[S{}] request trailers for stream:\n{}", stream, *direct_stream->trailers_); - direct_stream->underlying_stream_.sendTrailers(*direct_stream->trailers_); + if (direct_stream) { + HeaderMapPtr internal_trailers = Utility::toInternalHeaders(trailers); + ENVOY_LOG(debug, "[S{}] request trailers for stream:\n{}", stream, *internal_trailers); + direct_stream->stream_decoder_->decodeTrailers(std::move(internal_trailers)); + direct_stream->closeLocal(true); } }); @@ -248,52 +341,102 @@ envoy_status_t Dispatcher::sendTrailers(envoy_stream_t stream, envoy_headers tra } envoy_status_t Dispatcher::resetStream(envoy_stream_t stream) { - post([this, stream]() -> void { - DirectStream* direct_stream = getStream(stream); - if (direct_stream) { - direct_stream->underlying_stream_.reset(); + // Testing hook. + synchronizer_.syncPoint("getStream_on_cancel"); + + Dispatcher::DirectStreamSharedPtr direct_stream = getStream(stream); + if (direct_stream) { + + // Testing hook. + synchronizer_.syncPoint("dispatch_on_cancel"); + + // @see Dispatcher::DirectStream::dispatch_lock_ for why this lock is necessary. + Thread::ReleasableLockGuard lock(direct_stream->dispatch_lock_); + if (direct_stream->dispatchable(true)) { + direct_stream->callbacks_->onCancel(); + lock.release(); + // n.b: this is guarded by the call above. If the onCancel is not dispatchable then that means + // that another terminal callback has already happened. All terminal callbacks clean up stream + // state, so there is no need to dispatch here. + post([this, stream]() -> void { + Dispatcher::DirectStreamSharedPtr direct_stream = getStream(stream); + if (direct_stream) { + // This interaction is important. The runResetCallbacks call synchronously causes Envoy to + // defer delete the HCM's ActiveStream. 
That means that the lifetime of the DirectStream + // only needs to be as long as that deferred delete. Therefore, we synchronously call + // cleanup here which will defer delete the DirectStream, which by definition will be + // scheduled **after** the HCM's defer delete as they are scheduled on the same dispatcher + // context. + // + // StreamResetReason::RemoteReset is used as the platform code that issues the + // cancellation is considered the remote. + direct_stream->runResetCallbacks(StreamResetReason::RemoteReset); + cleanup(direct_stream->stream_handle_); + } + }); } - }); - return ENVOY_SUCCESS; -} - -// Select the client based on the current preferred network. This helps to ensure that -// the engine uses connections opened on the current favored interface. -AsyncClient& Dispatcher::getClient() { - // This function must be called from the dispatcher's own thread and so this state - // is safe to access without holding the dispatch_lock_. - ASSERT(TS_UNCHECKED_READ(event_dispatcher_)->isThreadSafe(), - "cluster interaction must be performed on the event_dispatcher_'s thread."); - switch (preferred_network_.load()) { - case ENVOY_NET_WLAN: - // The ASSERT above ensures the cluster_manager_ is safe to access. - return TS_UNCHECKED_READ(cluster_manager_)->httpAsyncClientForCluster("base_wlan"); - case ENVOY_NET_WWAN: - return TS_UNCHECKED_READ(cluster_manager_)->httpAsyncClientForCluster("base_wwan"); - case ENVOY_NET_GENERIC: - default: - return TS_UNCHECKED_READ(cluster_manager_)->httpAsyncClientForCluster("base"); + return ENVOY_SUCCESS; } + return ENVOY_FAILURE; } -Dispatcher::DirectStream* Dispatcher::getStream(envoy_stream_t stream) { - // The dispatch_lock_ does not need to guard the event_dispatcher_ pointer here because this - // function should only be called from the context of Envoy's event dispatcher. 
- ASSERT(TS_UNCHECKED_READ(event_dispatcher_)->isThreadSafe(), - "stream interaction must be performed on the event_dispatcher_'s thread."); +Dispatcher::DirectStreamSharedPtr Dispatcher::getStream(envoy_stream_t stream) { + Thread::LockGuard lock(streams_lock_); + auto direct_stream_pair_it = streams_.find(stream); - return (direct_stream_pair_it != streams_.end()) ? direct_stream_pair_it->second.get() : nullptr; + // Returning will copy the shared_ptr and increase the ref count. Moreover, this is safe because + // creation of the return value happens before destruction of local variables: + // https://stackoverflow.com/questions/33150508/in-c-which-happens-first-the-copy-of-a-return-object-or-local-objects-destru + return (direct_stream_pair_it != streams_.end()) ? direct_stream_pair_it->second : nullptr; } void Dispatcher::cleanup(envoy_stream_t stream_handle) { - DirectStream* direct_stream = getStream(stream_handle); - + ASSERT(TS_UNCHECKED_READ(event_dispatcher_)->isThreadSafe(), + "stream cleanup must be performed on the event_dispatcher_'s thread."); + Dispatcher::DirectStreamSharedPtr direct_stream = getStream(stream_handle); RELEASE_ASSERT(direct_stream, "cleanup is a private method that is only called with stream ids that exist"); + // The DirectStream should live through synchronous code that already has a reference to it. + // Hence why it is scheduled for deferred deletion. If this was all that was needed then it + // would be sufficient to return a shared_ptr in getStream. However, deferred deletion is still + // required because in Dispatcher::resetStream the DirectStream needs to live for as long and + // the HCM's ActiveStream lives. Hence its deletion needs to live beyond the synchronous code in + // Dispatcher::resetStream. + // TODO: currently upstream Envoy does not have a deferred delete version for shared_ptr. 
This + // means that instead of efficiently queuing the deletes for one event in the event loop, all + // deletes here get queued up as individual posts. + TS_UNCHECKED_READ(event_dispatcher_)->post([direct_stream]() -> void { + ENVOY_LOG(debug, "[S{}] deferred deletion of stream", direct_stream->stream_handle_); + }); + // However, the entry in the map should not exist after cleanup. + // Hence why it is synchronously erased from the streams map. + Thread::ReleasableLockGuard lock(streams_lock_); size_t erased = streams_.erase(stream_handle); + lock.release(); ASSERT(erased == 1, "cleanup should always remove one entry from the streams map"); - ENVOY_LOG(debug, "[S{}] remove stream", stream_handle); + ENVOY_LOG(debug, "[S{}] erased stream from streams container", stream_handle); +} + +namespace { +const LowerCaseString ClusterHeader{"x-envoy-mobile-cluster"}; +const std::string BaseCluster = "base"; +const std::string BaseWlanCluster = "base_wlan"; +const std::string BaseWwanCluster = "base_wwan"; +} // namespace + +void Dispatcher::attachPreferredNetwork(HeaderMap& headers) { + switch (preferred_network_.load()) { + case ENVOY_NET_WLAN: + headers.addReference(ClusterHeader, BaseWlanCluster); + break; + case ENVOY_NET_WWAN: + headers.addReference(ClusterHeader, BaseWwanCluster); + break; + case ENVOY_NET_GENERIC: + default: + headers.addReference(ClusterHeader, BaseCluster); + } } } // namespace Http diff --git a/library/common/http/dispatcher.h b/library/common/http/dispatcher.h index f08fb6d993..636783d593 100644 --- a/library/common/http/dispatcher.h +++ b/library/common/http/dispatcher.h @@ -3,13 +3,17 @@ #include #include "envoy/buffer/buffer.h" +#include "envoy/event/deferred_deletable.h" #include "envoy/event/dispatcher.h" +#include "envoy/http/api_listener.h" #include "envoy/http/async_client.h" +#include "envoy/http/codec.h" #include "envoy/http/header_map.h" -#include "envoy/upstream/cluster_manager.h" #include "common/common/logger.h" #include 
"common/common/thread.h" +#include "common/common/thread_synchronizer.h" +#include "common/http/codec_helper.h" #include "absl/types/optional.h" #include "library/common/types/c_types.h" @@ -25,7 +29,7 @@ class Dispatcher : public Logger::Loggable { public: Dispatcher(std::atomic& preferred_network); - void ready(Event::Dispatcher& event_dispatcher, Upstream::ClusterManager& cluster_manager); + void ready(Event::Dispatcher& event_dispatcher, ApiListener& api_listener); /** * Attempts to open a new stream to the remote. Note that this function is asynchronous and @@ -83,7 +87,11 @@ class Dispatcher : public Logger::Loggable { */ envoy_status_t resetStream(envoy_stream_t stream); + Thread::ThreadSynchronizer& synchronizer() { return synchronizer_; } + private: + class DirectStream; + /** * Notifies caller of async HTTP stream status. * Note the HTTP stream is full-duplex, even if the local to remote stream has been ended @@ -91,21 +99,26 @@ class Dispatcher : public Logger::Loggable { * DirectStreamCallbacks can continue to receive events until the remote to local stream is * closed, or resetStream is called. 
*/ - class DirectStreamCallbacks : public AsyncClient::StreamCallbacks, - public Logger::Loggable { + class DirectStreamCallbacks : public StreamEncoder, public Logger::Loggable { public: - DirectStreamCallbacks(envoy_stream_t stream_handle, envoy_http_callbacks bridge_callbacks, + DirectStreamCallbacks(DirectStream& direct_stream, envoy_http_callbacks bridge_callbacks, Dispatcher& http_dispatcher); - // AsyncClient::StreamCallbacks - void onHeaders(HeaderMapPtr&& headers, bool end_stream) override; - void onData(Buffer::Instance& data, bool end_stream) override; - void onTrailers(HeaderMapPtr&& trailers) override; - void onComplete() override; - void onReset() override; + void onReset(); + void onCancel(); + void closeRemote(bool end_stream); + + // StreamEncoder + void encodeHeaders(const HeaderMap& headers, bool end_stream) override; + void encodeData(Buffer::Instance& data, bool end_stream) override; + void encodeTrailers(const HeaderMap& trailers) override; + Stream& getStream() override; + // TODO: implement + void encode100ContinueHeaders(const HeaderMap&) override {} + void encodeMetadata(const MetadataMapVector&) override {} private: - const envoy_stream_t stream_handle_; + DirectStream& direct_stream_; const envoy_http_callbacks bridge_callbacks_; absl::optional error_code_; absl::optional error_message_; @@ -118,48 +131,97 @@ class Dispatcher : public Logger::Loggable { * Contains state about an HTTP stream; both in the outgoing direction via an underlying * AsyncClient::Stream and in the incoming direction via DirectStreamCallbacks. 
*/ - class DirectStream { + class DirectStream : public Stream, + public StreamCallbackHelper, + public Logger::Loggable { public: - DirectStream(envoy_stream_t stream_handle, AsyncClient::Stream& underlying_stream, - DirectStreamCallbacksPtr&& callbacks); - - static AsyncClient::StreamOptions toNativeStreamOptions(envoy_stream_options stream_options); + DirectStream(envoy_stream_t stream_handle, Dispatcher& http_dispatcher); + ~DirectStream(); + + // Stream + void addCallbacks(StreamCallbacks& callbacks) override { addCallbacks_(callbacks); } + void removeCallbacks(StreamCallbacks& callbacks) override { removeCallbacks_(callbacks); } + void resetStream(StreamResetReason) override; + const Network::Address::InstanceConstSharedPtr& connectionLocalAddress() override { + return parent_.address_; + } + // TODO: stream watermark control. + void readDisable(bool) override {} + uint32_t bufferLimit() override { return 65000; } + + void closeLocal(bool end_stream); + + /** + * Return whether a callback should be allowed to continue with execution. + * This ensures at most one 'terminal' callback is issued for any given stream. + * + * @param close, whether the DirectStream should close if it has not closed before. + * @return bool, whether callbacks on this stream are dispatchable or not. + */ + bool dispatchable(bool close); const envoy_stream_t stream_handle_; + // https://github.com/lyft/envoy-mobile/pull/616 moved stream cancellation (and its atomic + // state) from the platform layer to the core layer, here. This change was made to solidify two + // platform-level implementations into one implementation in the core layer. Moreover, it + // allowed Envoy Mobile to have test coverage where it didn't before. + + // However, it introduced a subtle race between Dispatcher::resetStream's onCancel and any of + // encodeHeaders/Data's callbacks that are _not_ terminal. 
The race happens because the two + // callbacks are being enqueued onto the same dispatch queue/ran on the same executor by two + // different threading contexts _after_ the atomic check of the closed_ state happens. This + // means that they could be serialized in either order; whereas we want to guarantee that _no_ + // callback will be executed after onCancel fires in the application. The lock protects the + // critical region between the call to dispatchable, and after the call that dispatches the + // appropriate callback. There should not be much lock contention because most calls will happen + // from the single-threaded context of the Envoy Main thread (encodeHeaders/Data). Alternative + // solutions will be considered in: https://github.com/lyft/envoy-mobile/issues/647 + Thread::MutexBasicLockable dispatch_lock_; + std::atomic closed_{}; + bool local_closed_{}; + // Used to issue outgoing HTTP stream operations. - AsyncClient::Stream& underlying_stream_; + StreamDecoder* stream_decoder_; // Used to receive incoming HTTP stream operations. - const DirectStreamCallbacksPtr callbacks_; + DirectStreamCallbacksPtr callbacks_; + Dispatcher& parent_; - HeaderMapPtr headers_; // TODO: because the client may send infinite metadata frames we need some ongoing way to // free metadata ahead of object destruction. // An implementation option would be to have drainable header maps, or done callbacks. std::vector metadata_; - HeaderMapPtr trailers_; }; - using DirectStreamPtr = std::unique_ptr; + using DirectStreamSharedPtr = std::shared_ptr; /** * Post a functor to the dispatcher. This is safe cross thread. * @param callback, the functor to post. */ void post(Event::PostCb callback); - // Everything in the below interface must only be accessed from the event_dispatcher's thread. - // This allows us to generally avoid synchronization. 
- AsyncClient& getClient(); - DirectStream* getStream(envoy_stream_t stream_handle); + DirectStreamSharedPtr getStream(envoy_stream_t stream_handle); void cleanup(envoy_stream_t stream_handle); - - // The dispatch_lock_ and init_queue_, and event_dispatcher_ are the only member state that may - // be accessed from a thread other than the event_dispatcher's own thread. - Thread::MutexBasicLockable dispatch_lock_; - std::list init_queue_ GUARDED_BY(dispatch_lock_); - Event::Dispatcher* event_dispatcher_ GUARDED_BY(dispatch_lock_){}; - Upstream::ClusterManager* cluster_manager_ GUARDED_BY(dispatch_lock_){}; - std::unordered_map streams_; + void attachPreferredNetwork(HeaderMap& headers); + + Thread::MutexBasicLockable ready_lock_; + std::list init_queue_ GUARDED_BY(ready_lock_); + Event::Dispatcher* event_dispatcher_ GUARDED_BY(ready_lock_){}; + ApiListener* api_listener_ GUARDED_BY(ready_lock_){}; + // std::unordered_map does is not safe for concurrent access. Thus a cross-thread, concurrent find + // in cancellation (which happens in a platform thread) with an erase (which always happens in the + // Envoy Main thread) is not safe. + // TODO: implement a lock-free access scheme here. + Thread::MutexBasicLockable streams_lock_; + // streams_ holds shared_ptr in order to allow cancellation to happen synchronously even though + // DirectStream cleanup happens asynchronously. This is also done to keep the scope of the + // streams_lock_ small to make it easier to remove; one could easily use the lock in the + // Dispatcher::resetStream to avoid using shared_ptrs. + // @see Dispatcher::resetStream. + std::unordered_map streams_ GUARDED_BY(streams_lock_); std::atomic& preferred_network_; + // Shared synthetic address across DirectStreams. 
+ Network::Address::InstanceConstSharedPtr address_; + Thread::ThreadSynchronizer synchronizer_; }; } // namespace Http diff --git a/library/common/jni_interface.cc b/library/common/jni_interface.cc index 9d7c88fb3e..cecde25c56 100644 --- a/library/common/jni_interface.cc +++ b/library/common/jni_interface.cc @@ -214,8 +214,13 @@ static void jvm_on_error(envoy_error error, void* context) { } static void jvm_on_complete(void* context) { - JNIEnv* env = nullptr; - static_jvm->GetEnv((void**)&env, JNI_VERSION); + JNIEnv* env = get_env(); + jobject j_context = static_cast(context); + env->DeleteGlobalRef(j_context); +} + +static void jvm_on_cancel(void* context) { + JNIEnv* env = get_env(); jobject j_context = static_cast(context); env->DeleteGlobalRef(j_context); } @@ -294,9 +299,9 @@ extern "C" JNIEXPORT jint JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibra // TODO: To be truly safe we may need stronger guarantees of operation ordering on this ref jobject retained_context = env->NewGlobalRef(j_context); - envoy_http_callbacks native_callbacks = {jvm_on_headers, jvm_on_data, jvm_on_metadata, - jvm_on_trailers, jvm_on_error, jvm_on_complete, - retained_context}; + envoy_http_callbacks native_callbacks = {jvm_on_headers, jvm_on_data, jvm_on_metadata, + jvm_on_trailers, jvm_on_error, jvm_on_complete, + jvm_on_cancel, retained_context}; envoy_stream_options stream_options = {buffer_for_retry == JNI_TRUE ? true : false}; envoy_status_t result = start_stream(static_cast(stream_handle), native_callbacks, stream_options); diff --git a/library/common/main_interface.h b/library/common/main_interface.h index a1aad07d5c..fc24350587 100644 --- a/library/common/main_interface.h +++ b/library/common/main_interface.h @@ -30,6 +30,7 @@ envoy_stream_t init_stream(envoy_engine_t); * can occur. * @param stream, handle to the stream to be started. * @param callbacks, the callbacks that will run the stream callbacks. + * @param options, DEPRECATED. 
* @return envoy_stream, with a stream handle and a success status, or a failure status. */ envoy_status_t start_stream(envoy_stream_t, envoy_http_callbacks callbacks, diff --git a/library/common/network/BUILD b/library/common/network/BUILD new file mode 100644 index 0000000000..fd40032b9b --- /dev/null +++ b/library/common/network/BUILD @@ -0,0 +1,12 @@ +licenses(["notice"]) # Apache 2 + +load("@envoy//bazel:envoy_build_system.bzl", "envoy_cc_library", "envoy_package") + +envoy_package() + +envoy_cc_library( + name = "synthetic_address_lib", + hdrs = ["synthetic_address_impl.h"], + repository = "@envoy", + deps = ["@envoy//include/envoy/network:address_interface"], +) diff --git a/library/common/network/synthetic_address_impl.h b/library/common/network/synthetic_address_impl.h new file mode 100644 index 0000000000..988d1444d9 --- /dev/null +++ b/library/common/network/synthetic_address_impl.h @@ -0,0 +1,58 @@ +#pragma once + +#include + +#include "envoy/network/address.h" + +namespace Envoy { +namespace Network { +namespace Address { + +// TODO(junr03): https://github.com/envoyproxy/envoy/pull/9362/ introduced API surface to the +// codec's Stream interface that made it necessary for Stream to be aware of its underlying +// connection. This class is created in order to stub out Address for Stream implementations +// that have no backing connection, e.g Envoy Mobile's DirectStream. It might be possible to +// eliminate this dependency. +// TODO(junr03): consider moving this code to Envoy's codebase. +class SyntheticAddressImpl : public Instance { +public: + SyntheticAddressImpl() {} + + bool operator==(const Instance&) const { + // Every synthetic address is different from one another and other address types. In reality, + // whatever object owns a synthetic address can't rely on address equality for any logic as the + // address is just a stub. 
+ return false; + } + + const std::string& asString() const { return address_; } + + absl::string_view asStringView() const { return address_; } + + const std::string& logicalName() const { return address_; } + + Api::SysCallIntResult bind(int) const { + // a socket should never be bound to a synthetic address. + return {-1, EADDRNOTAVAIL}; + } + + Api::SysCallIntResult connect(int) const { + // a socket should never connect to a synthetic address. + return {-1, EPROTOTYPE}; + } + + const Ip* ip() const { return nullptr; } + + IoHandlePtr socket(SocketType) const { return nullptr; } + + Type type() const { + // TODO(junr03): consider adding another type of address. + return Address::Type::Ip; + } + +private: + const std::string address_{"synthetic"}; +}; +} // namespace Address +} // namespace Network +} // namespace Envoy diff --git a/library/common/thread/BUILD b/library/common/thread/BUILD new file mode 100644 index 0000000000..030af50c60 --- /dev/null +++ b/library/common/thread/BUILD @@ -0,0 +1,15 @@ +licenses(["notice"]) # Apache 2 + +load("@envoy//bazel:envoy_build_system.bzl", "envoy_cc_library", "envoy_package") + +envoy_package() + +envoy_cc_library( + name = "lock_guard_lib", + hdrs = ["lock_guard.h"], + repository = "@envoy", + deps = [ + "@envoy//include/envoy/thread:thread_interface", + "@envoy//source/common/common:thread_annotations", + ], +) diff --git a/library/common/thread/lock_guard.h b/library/common/thread/lock_guard.h new file mode 100644 index 0000000000..f8d12ee4af --- /dev/null +++ b/library/common/thread/lock_guard.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include + +#include "envoy/thread/thread.h" + +namespace Envoy { +namespace Thread { + +/** + * A lock guard that deals with an optional lock and allows call-sites to release the lock prior to + * the lock guard going out of scope. + */ +// TODO(junr03): this could be moved to Envoy's codebase. 
+class ABSL_SCOPED_LOCKABLE OptionalReleasableLockGuard { +public: + /** + * Establishes a scoped mutex-lock. If non-null, the mutex is locked upon construction. + * + * @param lock the mutex. + */ + OptionalReleasableLockGuard(BasicLockable* lock) ABSL_EXCLUSIVE_LOCK_FUNCTION(lock) + : lock_(lock) { + if (lock_ != nullptr) { + lock_->lock(); + } + } + + /** + * Destruction of the OptionalReleasableLockGuard unlocks the lock, if it is non-null and has not + * already been explicitly released. + */ + ~OptionalReleasableLockGuard() ABSL_UNLOCK_FUNCTION() { release(); } + + /** + * Unlocks the mutex. This enables call-sites to release the mutex prior to the Lock going out of + * scope. + */ + void release() ABSL_UNLOCK_FUNCTION() { + if (lock_ != nullptr) { + lock_->unlock(); + lock_ = nullptr; + } + } + +private: + BasicLockable* lock_; // Set to nullptr on unlock, to prevent double-unlocking. +}; + +} // namespace Thread +} // namespace Envoy diff --git a/library/common/types/c_types.h b/library/common/types/c_types.h index 85c2f2d1ca..282d129239 100644 --- a/library/common/types/c_types.h +++ b/library/common/types/c_types.h @@ -195,6 +195,13 @@ typedef void (*envoy_on_error_f)(envoy_error error, void* context); */ typedef void (*envoy_on_complete_f)(void* context); +/** + * Called when the async HTTP stream has been cancelled by the client. + * @param context, contains the necessary state to carry out platform-specific dispatch and + * execution. + */ +typedef void (*envoy_on_cancel_f)(void* context); + /** * Called when the envoy engine is exiting. */ @@ -214,6 +221,7 @@ typedef struct { envoy_on_trailers_f on_trailers; envoy_on_error_f on_error; envoy_on_complete_f on_complete; + envoy_on_cancel_f on_cancel; void* context; // Will be passed through to callbacks to provide dispatch and execution state. 
} envoy_http_callbacks; diff --git a/library/java/src/io/envoyproxy/envoymobile/engine/EnvoyHTTPStream.java b/library/java/src/io/envoyproxy/envoymobile/engine/EnvoyHTTPStream.java index fb5773bc1b..4743b24cec 100644 --- a/library/java/src/io/envoyproxy/envoymobile/engine/EnvoyHTTPStream.java +++ b/library/java/src/io/envoyproxy/envoymobile/engine/EnvoyHTTPStream.java @@ -77,14 +77,7 @@ public void sendTrailers(Map> trailers) { * * @return int, success unless the stream has already been canceled. */ - public int cancel() { - // Determine if this cancellation results in bidirectional signaling (i.e. if onCancel is being - // fired). - boolean fullCancel = callbacksContext.cancel(); - // Propagate the reset into native code regardless. - JniLibrary.resetStream(streamHandle); - return fullCancel ? 0 : 1; - } + public int cancel() { return JniLibrary.resetStream(streamHandle); } private static byte[][] toJniLibraryHeaders(Map> headers) { // Create array with some room for potential headers that have more than one diff --git a/library/java/src/io/envoyproxy/envoymobile/engine/JvmCallbackContext.java b/library/java/src/io/envoyproxy/envoymobile/engine/JvmCallbackContext.java index a32fe12a55..0f7a92f64e 100644 --- a/library/java/src/io/envoyproxy/envoymobile/engine/JvmCallbackContext.java +++ b/library/java/src/io/envoyproxy/envoymobile/engine/JvmCallbackContext.java @@ -17,7 +17,6 @@ private enum FrameType { TRAILERS, } - private final AtomicBoolean closed = new AtomicBoolean(false); private final EnvoyHTTPCallbacks callbacks; // State-tracking for header accumulation @@ -29,20 +28,6 @@ private enum FrameType { public JvmCallbackContext(EnvoyHTTPCallbacks callbacks) { this.callbacks = callbacks; } - /** - * Return whether a callback should be allowed to continue with execution. This ensures at most - * one 'terminal' callback is issued for any given stream. - * - * @param close, whether the stream should be closed as part of this determination. 
- */ - private boolean dispatchable(boolean close) { - if (close) { - // Set closed to true and return true if not previously closed. - return !closed.getAndSet(true); - } - return !closed.get(); - } - /** * Initializes state for accumulating header pairs via passHeaders, ultimately * to be dispatched via the callback. @@ -103,10 +88,6 @@ public void passHeader(byte[] key, byte[] value, boolean endHeaders) { Runnable runnable = new Runnable() { public void run() { - if (!dispatchable(endStream)) { - return; - } - switch (frameType) { case HEADERS: callbacks.onHeaders(headers, endStream); @@ -138,9 +119,6 @@ public void run() { public void onData(byte[] data, boolean endStream) { callbacks.getExecutor().execute(new Runnable() { public void run() { - if (!dispatchable(endStream)) { - return; - } ByteBuffer dataBuffer = ByteBuffer.wrap(data); callbacks.onData(dataBuffer, endStream); } @@ -156,9 +134,6 @@ public void run() { public void onError(byte[] message, int errorCode) { callbacks.getExecutor().execute(new Runnable() { public void run() { - if (!dispatchable(true)) { - return; - } String errorMessage = new String(message); callbacks.onError(errorCode, errorMessage); } @@ -177,22 +152,6 @@ public void run() { }); } - /** - * Cancel the callback context atomically so that no further callbacks occur - * other than onCancel. - * - * @return boolean, whether the callback context was closed or not. - */ - public boolean cancel() { - // Atomically close the stream if not already closed. - boolean closed = dispatchable(true); - if (closed) { - // Directly fire callback if closure occurred. 
- onCancel(); - } - return closed; - } - private void startAccumulation(FrameType type, long length, boolean endStream) { assert headerAccumulator == null; assert pendingFrameType == FrameType.NONE; diff --git a/library/objective-c/EnvoyHTTPStreamImpl.m b/library/objective-c/EnvoyHTTPStreamImpl.m index fc06869edb..ddbc13deb7 100644 --- a/library/objective-c/EnvoyHTTPStreamImpl.m +++ b/library/objective-c/EnvoyHTTPStreamImpl.m @@ -84,26 +84,13 @@ static envoy_headers toNativeHeaders(EnvoyHeaders *headers) { #pragma mark - C callbacks -// Return whether a callback should be allowed to continue with execution. This ensures at most one -// 'terminal' callback is issued for any given stream. -static bool dispatchable(atomic_bool *closed, bool close) { - if (close) { - // Set closed to true and return true if not previously closed. - return !atomic_exchange(closed, YES); - } - // Return true if not yet closed. - return !atomic_load(closed); -} - static void ios_on_headers(envoy_headers headers, bool end_stream, void *context) { ios_context *c = (ios_context *)context; EnvoyHTTPCallbacks *callbacks = c->callbacks; dispatch_async(callbacks.dispatchQueue, ^{ - if (!dispatchable(c->closed, end_stream) || !callbacks.onHeaders) { - return; + if (callbacks.onHeaders) { + callbacks.onHeaders(to_ios_headers(headers), end_stream); } - - callbacks.onHeaders(to_ios_headers(headers), end_stream); }); } @@ -111,11 +98,9 @@ static void ios_on_data(envoy_data data, bool end_stream, void *context) { ios_context *c = (ios_context *)context; EnvoyHTTPCallbacks *callbacks = c->callbacks; dispatch_async(callbacks.dispatchQueue, ^{ - if (!dispatchable(c->closed, end_stream) || !callbacks.onData) { - return; + if (callbacks.onData) { + callbacks.onData(to_ios_data(data), end_stream); } - - callbacks.onData(to_ios_data(data), end_stream); }); } @@ -123,11 +108,9 @@ static void ios_on_metadata(envoy_headers metadata, void *context) { ios_context *c = (ios_context *)context; EnvoyHTTPCallbacks 
*callbacks = c->callbacks; dispatch_async(callbacks.dispatchQueue, ^{ - if (atomic_load(c->closed) || !callbacks.onMetadata) { - return; + if (callbacks.onMetadata) { + callbacks.onMetadata(to_ios_headers(metadata)); } - - callbacks.onMetadata(to_ios_headers(metadata)); }); } @@ -135,11 +118,9 @@ static void ios_on_trailers(envoy_headers trailers, void *context) { ios_context *c = (ios_context *)context; EnvoyHTTPCallbacks *callbacks = c->callbacks; dispatch_async(callbacks.dispatchQueue, ^{ - if (!dispatchable(c->closed, YES) || !callbacks.onTrailers) { - return; + if (callbacks.onTrailers) { + callbacks.onTrailers(to_ios_headers(trailers)); } - - callbacks.onTrailers(to_ios_headers(trailers)); }); } @@ -158,16 +139,17 @@ static void ios_on_cancel(void *context) { // This call is atomically gated at the call-site and will only happen once. It may still fire // after a complete response or error callback, but no other callbacks for the stream will ever // fire AFTER the cancellation callback. - - // The cancellation callback does not clean up the stream, since that will race with work Envoy's - // main thread may already be doing. Instead we rely on the reset that's dispatched to Envoy to - // effect cleanup, with appropriate timing. ios_context *c = (ios_context *)context; EnvoyHTTPCallbacks *callbacks = c->callbacks; + EnvoyHTTPStreamImpl *stream = c->stream; dispatch_async(callbacks.dispatchQueue, ^{ if (callbacks.onCancel) { callbacks.onCancel(); } + + // TODO: If the callback queue is not serial, clean up is not currently thread-safe. 
+ assert(stream); + [stream cleanUp]; }); } @@ -176,7 +158,7 @@ static void ios_on_error(envoy_error error, void *context) { EnvoyHTTPCallbacks *callbacks = c->callbacks; EnvoyHTTPStreamImpl *stream = c->stream; dispatch_async(callbacks.dispatchQueue, ^{ - if (!dispatchable(c->closed, YES) && callbacks.onError) { + if (callbacks.onError) { NSString *errorMessage = [[NSString alloc] initWithBytes:error.message.bytes length:error.message.length encoding:NSUTF8StringEncoding]; @@ -221,7 +203,7 @@ - (instancetype)initWithHandle:(envoy_stream_t)handle // Create native callbacks envoy_http_callbacks native_callbacks = {ios_on_headers, ios_on_data, ios_on_trailers, ios_on_metadata, ios_on_error, ios_on_complete, - context}; + ios_on_cancel, context}; _nativeCallbacks = native_callbacks; // We need create the native-held strong ref on this stream before we call start_stream because @@ -260,21 +242,7 @@ - (void)sendTrailers:(EnvoyHeaders *)trailers { } - (int)cancel { - ios_context *context = _nativeCallbacks.context; - // Step 1: atomically and synchronously prevent the execution of further callbacks other than - // on_cancel. - if (!dispatchable(context->closed, YES)) { - // Step 2: directly fire the cancel callback. - ios_on_cancel(context); - // Step 3: propagate the reset into native code. - reset_stream(_streamHandle); - return 0; - } else { - // Propagate reset, but don't worry about callbacks (another terminal callback has already - // fired). 
- reset_stream(_streamHandle); - return 1; - } + return reset_stream(_streamHandle); } - (void)cleanUp { diff --git a/test/common/http/BUILD b/test/common/http/BUILD index 02e3d0df69..acd4159f63 100644 --- a/test/common/http/BUILD +++ b/test/common/http/BUILD @@ -12,11 +12,11 @@ envoy_cc_test( "//library/common/http:dispatcher_lib", "//library/common/http:header_utility_lib", "//library/common/types:c_types_lib", - "@envoy//source/common/http:async_client_lib", "@envoy//source/common/http:context_lib", "@envoy//test/common/http:common_lib", "@envoy//test/mocks/buffer:buffer_mocks", "@envoy//test/mocks/event:event_mocks", + "@envoy//test/mocks/http:api_listener_mocks", "@envoy//test/mocks/local_info:local_info_mocks", "@envoy//test/mocks/upstream:upstream_mocks", ], diff --git a/test/common/http/dispatcher_test.cc b/test/common/http/dispatcher_test.cc index 7f222bb393..7430932ad0 100644 --- a/test/common/http/dispatcher_test.cc +++ b/test/common/http/dispatcher_test.cc @@ -1,12 +1,12 @@ #include #include "common/buffer/buffer_impl.h" -#include "common/http/async_client_impl.h" #include "common/http/context_impl.h" #include "test/common/http/common.h" #include "test/mocks/buffer/mocks.h" #include "test/mocks/event/mocks.h" +#include "test/mocks/http/api_listener.h" #include "test/mocks/http/mocks.h" #include "test/mocks/local_info/mocks.h" #include "test/mocks/upstream/mocks.h" @@ -30,47 +30,30 @@ namespace Http { class DispatcherTest : public testing::Test { public: - DispatcherTest() - : http_context_(stats_store_.symbolTable()), - client_(cm_.thread_local_cluster_.cluster_.info_, stats_store_, event_dispatcher_, - local_info_, cm_, runtime_, random_, - Router::ShadowWriterPtr{new NiceMock()}, http_context_) { - http_dispatcher_.ready(event_dispatcher_, cm_); - ON_CALL(*cm_.conn_pool_.host_, locality()) - .WillByDefault(ReturnRef(envoy::config::core::v3::Locality().default_instance())); - } + DispatcherTest() { http_dispatcher_.ready(event_dispatcher_, 
api_listener_); } typedef struct { uint32_t on_headers_calls; uint32_t on_data_calls; uint32_t on_complete_calls; uint32_t on_error_calls; + uint32_t on_cancel_calls; } callbacks_called; - Stats::MockIsolatedStatsStore stats_store_; - MockAsyncClientCallbacks callbacks_; - MockAsyncClientStreamCallbacks stream_callbacks_; - NiceMock cm_; - NiceMock stream_encoder_; - StreamDecoder* response_decoder_{}; - NiceMock* timer_; + MockApiListener api_listener_; + MockStreamDecoder request_decoder_; + StreamEncoder* response_encoder_{}; NiceMock event_dispatcher_; - NiceMock runtime_; - NiceMock random_; - NiceMock local_info_; - Http::ContextImpl http_context_; - AsyncClientImpl client_; envoy_http_callbacks bridge_callbacks_; - NiceMock stream_info_; std::atomic preferred_network_{ENVOY_NET_GENERIC}; Dispatcher http_dispatcher_{preferred_network_}; }; -TEST_F(DispatcherTest, BasicStreamHeadersOnly) { +TEST_F(DispatcherTest, PreferredNetwork) { envoy_stream_t stream = 1; // Setup bridge_callbacks to handle the response headers. envoy_http_callbacks bridge_callbacks; - callbacks_called cc = {0, 0, 0, 0}; + callbacks_called cc = {0, 0, 0, 0, 0}; bridge_callbacks.context = &cc; bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, void* context) -> void { @@ -85,14 +68,113 @@ TEST_F(DispatcherTest, BasicStreamHeadersOnly) { cc->on_complete_calls++; }; - // Grab the response decoder in order to dispatch responses on the stream. - EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) - .WillOnce(Invoke([&](StreamDecoder& decoder, - ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { - callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); - response_decoder_ = &decoder; - return nullptr; + // Create a stream. 
+ Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); + EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); + + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; })); + start_stream_post_cb(); + + // Send request headers. Sending multiple headers is illegal and the upstream codec would not + // accept it. However, given we are just trying to test preferred network headers and using mocks + // this is fine. + + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + envoy_headers c_headers = Utility::toBridgeHeaders(headers); + + preferred_network_.store(ENVOY_NET_WLAN); + Event::PostCb send_headers_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); + http_dispatcher_.sendHeaders(stream, c_headers, false); + + TestHeaderMapImpl expected_headers{ + {":scheme", "http"}, + {":method", "GET"}, + {":authority", "host"}, + {":path", "/"}, + {"x-envoy-mobile-cluster", "base_wlan"}, + }; + EXPECT_CALL(request_decoder_, decodeHeaders_(HeaderMapEqual(&expected_headers), false)); + send_headers_post_cb(); + + TestHeaderMapImpl headers2; + HttpTestUtility::addDefaultHeaders(headers2); + envoy_headers c_headers2 = Utility::toBridgeHeaders(headers2); + + preferred_network_.store(ENVOY_NET_WLAN); + Event::PostCb send_headers_post_cb2; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb2)); + http_dispatcher_.sendHeaders(stream, c_headers2, false); + + TestHeaderMapImpl expected_headers2{ + {":scheme", "http"}, + {":method", "GET"}, + {":authority", "host"}, + {":path", "/"}, 
+ {"x-envoy-mobile-cluster", "base_wlan"}, + }; + EXPECT_CALL(request_decoder_, decodeHeaders_(HeaderMapEqual(&expected_headers2), false)); + send_headers_post_cb2(); + + TestHeaderMapImpl headers3; + HttpTestUtility::addDefaultHeaders(headers3); + envoy_headers c_headers3 = Utility::toBridgeHeaders(headers3); + + preferred_network_.store(ENVOY_NET_WWAN); + Event::PostCb send_headers_post_cb3; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb3)); + http_dispatcher_.sendHeaders(stream, c_headers3, true); + + TestHeaderMapImpl expected_headers3{ + {":scheme", "http"}, + {":method", "GET"}, + {":authority", "host"}, + {":path", "/"}, + {"x-envoy-mobile-cluster", "base_wwan"}, + }; + EXPECT_CALL(request_decoder_, decodeHeaders_(HeaderMapEqual(&expected_headers3), true)); + send_headers_post_cb3(); + + // Encode response headers. + Event::PostCb stream_deletion_post_cb; + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); + TestHeaderMapImpl response_headers{{":status", "200"}}; + response_encoder_->encodeHeaders(response_headers, true); + ASSERT_EQ(cc.on_headers_calls, 1); + stream_deletion_post_cb(); + + // Ensure that the callbacks on the bridge_callbacks were called. + ASSERT_EQ(cc.on_complete_calls, 1); +} + +TEST_F(DispatcherTest, BasicStreamHeadersOnly) { + envoy_stream_t stream = 1; + // Setup bridge_callbacks to handle the response headers. 
+ envoy_http_callbacks bridge_callbacks; + callbacks_called cc = {0, 0, 0, 0, 0}; + bridge_callbacks.context = &cc; + bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, + void* context) -> void { + ASSERT_TRUE(end_stream); + HeaderMapPtr response_headers = Utility::toInternalHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); + callbacks_called* cc = static_cast(context); + cc->on_headers_calls++; + }; + bridge_callbacks.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete_calls++; + }; // Build a set of request headers. TestHeaderMapImpl headers; @@ -100,50 +182,46 @@ TEST_F(DispatcherTest, BasicStreamHeadersOnly) { envoy_headers c_headers = Utility::toBridgeHeaders(headers); // Create a stream. - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, start(_, _)) - .WillOnce( - WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { - return client_.start(callbacks, AsyncClient::StreamOptions()); - }))); + Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; + })); + start_stream_post_cb(); + // Send request headers. 
- Event::PostCb post_cb; - EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb)); + Event::PostCb send_headers_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); http_dispatcher_.sendHeaders(stream, c_headers, true); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(stream_encoder_, encodeHeaders(_, true)); - post_cb(); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); + send_headers_post_cb(); - // Decode response headers. decodeHeaders with true will bubble up to onHeaders, which will in - // turn cause closeRemote. Because closeLocal has already been called, cleanup will happen; hence - // the second call to isThreadSafe. + // Encode response headers. + Event::PostCb stream_deletion_post_cb; EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - response_decoder_->decode100ContinueHeaders( - HeaderMapPtr(new TestHeaderMapImpl{{":status", "100"}})); - response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), true); - - EXPECT_EQ( - 1UL, - cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_200").value()); - EXPECT_EQ(1UL, cm_.thread_local_cluster_.cluster_.info_->stats_store_ - .counter("internal.upstream_rq_200") - .value()); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); + TestHeaderMapImpl response_headers{{":status", "200"}}; + response_encoder_->encodeHeaders(response_headers, true); + ASSERT_EQ(cc.on_headers_calls, 1); + stream_deletion_post_cb(); // Ensure that the callbacks on the bridge_callbacks were called. - ASSERT_EQ(cc.on_headers_calls, 1); ASSERT_EQ(cc.on_complete_calls, 1); - ASSERT_EQ(cc.on_data_calls, 0); } TEST_F(DispatcherTest, BasicStream) { envoy_stream_t stream = 1; // Setup bridge_callbacks to handle the response. 
envoy_http_callbacks bridge_callbacks; - callbacks_called cc = {0, 0, 0, 0}; + callbacks_called cc = {0, 0, 0, 0, 0}; bridge_callbacks.context = &cc; bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, void* context) -> void { @@ -165,15 +243,6 @@ TEST_F(DispatcherTest, BasicStream) { cc->on_complete_calls++; }; - // Grab the response decoder in order to dispatch responses on the stream. - EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) - .WillOnce(Invoke([&](StreamDecoder& decoder, - ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { - callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); - response_decoder_ = &decoder; - return nullptr; - })); - // Build a set of request headers. TestHeaderMapImpl headers; HttpTestUtility::addDefaultHeaders(headers); @@ -184,22 +253,26 @@ TEST_F(DispatcherTest, BasicStream) { envoy_data c_data = Buffer::Utility::toBridgeData(request_data); // Create a stream. - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, start(_, _)) - .WillOnce( - WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { - return client_.start(callbacks, AsyncClient::StreamOptions()); - }))); + Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the + // dispatcher API. 
+ EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; + })); + start_stream_post_cb(); + // Send request headers. Event::PostCb headers_post_cb; EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&headers_post_cb)); http_dispatcher_.sendHeaders(stream, c_headers, false); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(stream_encoder_, encodeHeaders(_, false)); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, false)); headers_post_cb(); // Send request data. @@ -207,66 +280,121 @@ TEST_F(DispatcherTest, BasicStream) { EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&data_post_cb)); http_dispatcher_.sendData(stream, c_data, true); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(stream_encoder_, encodeData(BufferStringEqual("request body"), true)); + EXPECT_CALL(request_decoder_, decodeData(BufferStringEqual("request body"), true)); data_post_cb(); - // Decode response headers and data. + // Encode response headers and data. 
+ TestHeaderMapImpl response_headers{{":status", "200"}}; + response_encoder_->encodeHeaders(response_headers, false); + ASSERT_EQ(cc.on_headers_calls, 1); + + Event::PostCb stream_deletion_post_cb; EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - response_decoder_->decode100ContinueHeaders( - HeaderMapPtr(new TestHeaderMapImpl{{":status", "100"}})); - response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); Buffer::InstancePtr response_data{new Buffer::OwnedImpl("response body")}; - response_decoder_->decodeData(*response_data, true); - - EXPECT_EQ( - 1UL, - cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_200").value()); - EXPECT_EQ(1UL, cm_.thread_local_cluster_.cluster_.info_->stats_store_ - .counter("internal.upstream_rq_200") - .value()); + response_encoder_->encodeData(*response_data, true); + ASSERT_EQ(cc.on_data_calls, 1); + stream_deletion_post_cb(); // Ensure that the callbacks on the bridge_callbacks were called. - ASSERT_EQ(cc.on_headers_calls, 1); - ASSERT_EQ(cc.on_data_calls, 1); ASSERT_EQ(cc.on_complete_calls, 1); } -TEST_F(DispatcherTest, ResetStream) { +TEST_F(DispatcherTest, MultipleDataStream) { envoy_stream_t stream = 1; + // Setup bridge_callbacks to handle the response. 
envoy_http_callbacks bridge_callbacks; - callbacks_called cc = {0, 0, 0, 0}; + callbacks_called cc = {0, 0, 0, 0, 0}; bridge_callbacks.context = &cc; - bridge_callbacks.on_error = [](envoy_error actual_error, void* context) -> void { - envoy_error expected_error = {ENVOY_STREAM_RESET, envoy_nodata}; - ASSERT_EQ(actual_error.error_code, expected_error.error_code); + bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, + void* context) -> void { + ASSERT_FALSE(end_stream); + HeaderMapPtr response_headers = Utility::toInternalHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); callbacks_called* cc = static_cast(context); - cc->on_error_calls++; + cc->on_headers_calls++; + }; + bridge_callbacks.on_data = [](envoy_data data, bool, void* context) -> void { + // TODO: assert end_stream and contents of c_data for multiple calls of on_data. + callbacks_called* cc = static_cast(context); + cc->on_data_calls++; + data.release(data.context); }; bridge_callbacks.on_complete = [](void* context) -> void { callbacks_called* cc = static_cast(context); cc->on_complete_calls++; }; - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, start(_, _)) - .WillOnce( - WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { - return client_.start(callbacks, AsyncClient::StreamOptions()); - }))); + // Build a set of request headers. 
+ TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + envoy_headers c_headers = Utility::toBridgeHeaders(headers); + + // Build first body data + Buffer::OwnedImpl request_data = Buffer::OwnedImpl("request body"); + envoy_data c_data = Buffer::Utility::toBridgeData(request_data); + + // Build second body data + Buffer::OwnedImpl request_data2 = Buffer::OwnedImpl("request body2"); + envoy_data c_data2 = Buffer::Utility::toBridgeData(request_data2); + + // Create a stream. + Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); - Event::PostCb post_cb; - EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb)); - http_dispatcher_.resetStream(stream); + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; + })); + start_stream_post_cb(); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(2).WillRepeatedly(Return(true)); - post_cb(); + // Send request headers. + Event::PostCb headers_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&headers_post_cb)); + http_dispatcher_.sendHeaders(stream, c_headers, false); - // Ensure that the on_error on the bridge_callbacks was called. - ASSERT_EQ(cc.on_error_calls, 1); - ASSERT_EQ(cc.on_complete_calls, 0); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, false)); + headers_post_cb(); + + // Send request data. 
+ Event::PostCb data_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&data_post_cb)); + http_dispatcher_.sendData(stream, c_data, false); + + EXPECT_CALL(request_decoder_, decodeData(BufferStringEqual("request body"), false)); + data_post_cb(); + + // Send second request data. + Event::PostCb data_post_cb2; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&data_post_cb2)); + http_dispatcher_.sendData(stream, c_data2, true); + + EXPECT_CALL(request_decoder_, decodeData(BufferStringEqual("request body2"), true)); + data_post_cb2(); + + // Encode response headers and data. + TestHeaderMapImpl response_headers{{":status", "200"}}; + response_encoder_->encodeHeaders(response_headers, false); + ASSERT_EQ(cc.on_headers_calls, 1); + Buffer::InstancePtr response_data{new Buffer::OwnedImpl("response body")}; + response_encoder_->encodeData(*response_data, false); + ASSERT_EQ(cc.on_data_calls, 1); + + Event::PostCb stream_deletion_post_cb; + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); + Buffer::InstancePtr response_data2{new Buffer::OwnedImpl("response body2")}; + response_encoder_->encodeData(*response_data2, true); + ASSERT_EQ(cc.on_data_calls, 2); + stream_deletion_post_cb(); + + // Ensure that the callbacks on the bridge_callbacks were called. + ASSERT_EQ(cc.on_complete_calls, 1); } TEST_F(DispatcherTest, MultipleStreams) { @@ -275,7 +403,7 @@ TEST_F(DispatcherTest, MultipleStreams) { // Start stream1. // Setup bridge_callbacks to handle the response headers. 
envoy_http_callbacks bridge_callbacks; - callbacks_called cc = {0, 0, 0, 0}; + callbacks_called cc = {0, 0, 0, 0, 0}; bridge_callbacks.context = &cc; bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, void* context) -> void { @@ -290,51 +418,46 @@ TEST_F(DispatcherTest, MultipleStreams) { cc->on_complete_calls++; }; - // Grab the response decoder in order to dispatch responses on the stream. - EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) - .WillOnce(Invoke([&](StreamDecoder& decoder, - ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { - callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); - response_decoder_ = &decoder; - return nullptr; - })); - // Build a set of request headers. TestHeaderMapImpl headers; HttpTestUtility::addDefaultHeaders(headers); envoy_headers c_headers = Utility::toBridgeHeaders(headers); // Create a stream. - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, start(_, _)) - .WillOnce( - WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { - return client_.start(callbacks, AsyncClient::StreamOptions()); - }))); + Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); EXPECT_EQ(http_dispatcher_.startStream(stream1, bridge_callbacks, {}), ENVOY_SUCCESS); + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; + })); + start_stream_post_cb(); + // Send request headers. 
- Event::PostCb post_cb; - EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb)); + Event::PostCb send_headers_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); http_dispatcher_.sendHeaders(stream1, c_headers, true); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(stream_encoder_, encodeHeaders(_, true)); - post_cb(); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); + send_headers_post_cb(); // Start stream2. // Setup bridge_callbacks to handle the response headers. - NiceMock stream_encoder2; - StreamDecoder* response_decoder2{}; + NiceMock request_decoder2; + StreamEncoder* response_encoder2{}; envoy_http_callbacks bridge_callbacks2; - callbacks_called cc2 = {false, 0, false, false}; + callbacks_called cc2 = {0, 0, 0, 0, 0}; bridge_callbacks2.context = &cc2; bridge_callbacks2.on_headers = [](envoy_headers c_headers, bool end_stream, void* context) -> void { ASSERT_TRUE(end_stream); HeaderMapPtr response_headers = Utility::toInternalHeaders(c_headers); - EXPECT_EQ(response_headers->Status()->value().getStringView(), "503"); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); bool* on_headers_called2 = static_cast(context); *on_headers_called2 = true; }; @@ -343,68 +466,111 @@ TEST_F(DispatcherTest, MultipleStreams) { cc->on_complete_calls++; }; - // Grab the response decoder in order to dispatch responses on the stream. - EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) - .WillOnce(Invoke([&](StreamDecoder& decoder, - ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { - callbacks.onPoolReady(stream_encoder2, cm_.conn_pool_.host_, stream_info_); - response_decoder2 = &decoder; - return nullptr; - })); - // Build a set of request headers. TestHeaderMapImpl headers2; HttpTestUtility::addDefaultHeaders(headers2); envoy_headers c_headers2 = Utility::toBridgeHeaders(headers2); // Create a stream. 
- EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, start(_, _)) - .WillOnce( - WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { - return client_.start(callbacks, AsyncClient::StreamOptions()); - }))); - EXPECT_CALL(event_dispatcher_, post(_)); + Event::PostCb start_stream_post_cb2; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb2)); EXPECT_EQ(http_dispatcher_.startStream(stream2, bridge_callbacks2, {}), ENVOY_SUCCESS); + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder2 = &encoder; + return request_decoder2; + })); + start_stream_post_cb2(); + // Send request headers. - Event::PostCb post_cb2; - EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb2)); + Event::PostCb send_headers_post_cb2; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb2)); http_dispatcher_.sendHeaders(stream2, c_headers2, true); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(stream_encoder2, encodeHeaders(_, true)); - post_cb2(); + EXPECT_CALL(request_decoder2, decodeHeaders_(_, true)); + send_headers_post_cb2(); // Finish stream 2. + Event::PostCb stream_deletion_post_cb2; EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - HeaderMapPtr response_headers2(new TestHeaderMapImpl{{":status", "503"}}); - response_decoder2->decodeHeaders(std::move(response_headers2), true); - // Ensure that the on_headers on the bridge_callbacks was called. 
+ EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb2)); + TestHeaderMapImpl response_headers2{{":status", "200"}}; + response_encoder2->encodeHeaders(response_headers2, true); ASSERT_EQ(cc2.on_headers_calls, 1); + stream_deletion_post_cb2(); + // Ensure that the on_headers on the bridge_callbacks was called. ASSERT_EQ(cc2.on_complete_calls, 1); // Finish stream 1. + Event::PostCb stream_deletion_post_cb; EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); - response_decoder_->decodeHeaders(std::move(response_headers), true); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); + TestHeaderMapImpl response_headers{{":status", "200"}}; + response_encoder_->encodeHeaders(response_headers, true); ASSERT_EQ(cc.on_headers_calls, 1); + stream_deletion_post_cb(); ASSERT_EQ(cc.on_complete_calls, 1); } -TEST_F(DispatcherTest, LocalResetAfterStreamStart) { +TEST_F(DispatcherTest, ResetStreamLocal) { envoy_stream_t stream = 1; envoy_http_callbacks bridge_callbacks; - callbacks_called cc = {0, 0, 0, 0}; + callbacks_called cc = {0, 0, 0, 0, 0}; bridge_callbacks.context = &cc; - - bridge_callbacks.on_error = [](envoy_error actual_error, void* context) -> void { - envoy_error expected_error = {ENVOY_STREAM_RESET, envoy_nodata}; - ASSERT_EQ(actual_error.error_code, expected_error.error_code); + bridge_callbacks.on_error = [](envoy_error, void* context) -> void { callbacks_called* cc = static_cast(context); cc->on_error_calls++; }; + bridge_callbacks.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete_calls++; + }; + bridge_callbacks.on_cancel = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_cancel_calls++; + }; + + // Create a stream. 
+ Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); + EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); + + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; + })); + start_stream_post_cb(); + + Event::PostCb reset_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&reset_stream_post_cb)); + ASSERT_EQ(http_dispatcher_.resetStream(stream), ENVOY_SUCCESS); + // The callback happens synchronously outside of the reset_stream_post_cb(). + ASSERT_EQ(cc.on_cancel_calls, 1); + + Event::PostCb stream_deletion_post_cb; + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); + reset_stream_post_cb(); + stream_deletion_post_cb(); + + ASSERT_EQ(cc.on_error_calls, 0); + ASSERT_EQ(cc.on_complete_calls, 0); +} + +TEST_F(DispatcherTest, RemoteResetAfterStreamStart) { + envoy_stream_t stream = 1; + // Setup bridge_callbacks to handle the response headers. + envoy_http_callbacks bridge_callbacks; + callbacks_called cc = {0, 0, 0, 0, 0}; + bridge_callbacks.context = &cc; bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, void* context) -> void { ASSERT_FALSE(end_stream); @@ -417,15 +583,14 @@ TEST_F(DispatcherTest, LocalResetAfterStreamStart) { callbacks_called* cc = static_cast(context); cc->on_complete_calls++; }; - - // Grab the response decoder in order to dispatch responses on the stream. 
- EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) - .WillOnce(Invoke([&](StreamDecoder& decoder, - ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { - callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); - response_decoder_ = &decoder; - return nullptr; - })); + bridge_callbacks.on_error = [](envoy_error, void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_error_calls++; + }; + bridge_callbacks.on_cancel = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_cancel_calls++; + }; // Build a set of request headers. TestHeaderMapImpl headers; @@ -433,55 +598,52 @@ TEST_F(DispatcherTest, LocalResetAfterStreamStart) { envoy_headers c_headers = Utility::toBridgeHeaders(headers); // Create a stream. - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, start(_, _)) - .WillOnce( - WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { - return client_.start(callbacks, AsyncClient::StreamOptions()); - }))); + Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; + })); + start_stream_post_cb(); + // Send request headers. 
Event::PostCb send_headers_post_cb; EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); - http_dispatcher_.sendHeaders(stream, c_headers, false); + http_dispatcher_.sendHeaders(stream, c_headers, true); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(stream_encoder_, encodeHeaders(_, false)); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); send_headers_post_cb(); - response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); - // Ensure that the on_headers on the bridge_callbacks was called. + // Encode response headers. + TestHeaderMapImpl response_headers{{":status", "200"}}; + response_encoder_->encodeHeaders(response_headers, false); ASSERT_EQ(cc.on_headers_calls, 1); - Event::PostCb reset_post_cb; - EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&reset_post_cb)); - http_dispatcher_.resetStream(stream); - - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(2).WillRepeatedly(Return(true)); - reset_post_cb(); - + Event::PostCb stream_deletion_post_cb; + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); + response_encoder_->getStream().resetStream(StreamResetReason::RemoteReset); + stream_deletion_post_cb(); // Ensure that the on_error on the bridge_callbacks was called. ASSERT_EQ(cc.on_error_calls, 1); ASSERT_EQ(cc.on_complete_calls, 0); } -TEST_F(DispatcherTest, RemoteResetAfterStreamStart) { +TEST_F(DispatcherTest, StreamResetAfterOnComplete) { envoy_stream_t stream = 1; + // Setup bridge_callbacks to handle the response headers. 
envoy_http_callbacks bridge_callbacks; - callbacks_called cc = {0, 0, 0, 0}; + callbacks_called cc = {0, 0, 0, 0, 0}; bridge_callbacks.context = &cc; - - bridge_callbacks.on_error = [](envoy_error actual_error, void* context) -> void { - envoy_error expected_error = {ENVOY_STREAM_RESET, envoy_nodata}; - ASSERT_EQ(actual_error.error_code, expected_error.error_code); - callbacks_called* cc = static_cast(context); - cc->on_error_calls++; - }; bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, void* context) -> void { - ASSERT_FALSE(end_stream); + ASSERT_TRUE(end_stream); HeaderMapPtr response_headers = Utility::toInternalHeaders(c_headers); EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); callbacks_called* cc = static_cast(context); @@ -492,59 +654,77 @@ TEST_F(DispatcherTest, RemoteResetAfterStreamStart) { cc->on_complete_calls++; }; - // Grab the response decoder in order to dispatch responses on the stream. - EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) - .WillOnce(Invoke([&](StreamDecoder& decoder, - ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { - callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); - response_decoder_ = &decoder; - return nullptr; - })); - // Build a set of request headers. TestHeaderMapImpl headers; HttpTestUtility::addDefaultHeaders(headers); envoy_headers c_headers = Utility::toBridgeHeaders(headers); // Create a stream. 
- EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, start(_, _)) - .WillOnce( - WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { - return client_.start(callbacks, AsyncClient::StreamOptions()); - }))); + Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; + })); + start_stream_post_cb(); + // Send request headers. Event::PostCb send_headers_post_cb; EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); - http_dispatcher_.sendHeaders(stream, c_headers, false); + http_dispatcher_.sendHeaders(stream, c_headers, true); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(2).WillRepeatedly(Return(true)); - EXPECT_CALL(stream_encoder_, encodeHeaders(_, false)); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); send_headers_post_cb(); - response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); - // Ensure that the on_headers on the bridge_callbacks was called. + // Encode response headers. 
+ Event::PostCb stream_deletion_post_cb; + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); + TestHeaderMapImpl response_headers{{":status", "200"}}; + response_encoder_->encodeHeaders(response_headers, true); ASSERT_EQ(cc.on_headers_calls, 1); + stream_deletion_post_cb(); - stream_encoder_.getStream().resetStream(StreamResetReason::RemoteReset); - // Ensure that the on_error on the bridge_callbacks was called. - ASSERT_EQ(cc.on_error_calls, 1); - ASSERT_EQ(cc.on_complete_calls, 0); + // Ensure that the callbacks on the bridge_callbacks were called. + ASSERT_EQ(cc.on_complete_calls, 1); + + // Cancellation should have no effect as the stream should have already been cleaned up. + ASSERT_EQ(http_dispatcher_.resetStream(stream), ENVOY_FAILURE); } -TEST_F(DispatcherTest, DestroyWithActiveStream) { +TEST_F(DispatcherTest, ResetStreamLocalHeadersRemoteRaceLocalWins) { envoy_stream_t stream = 1; - // Grab the response decoder in order to dispatch responses on the stream. 
- EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) - .WillOnce(Invoke([&](StreamDecoder& decoder, - ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { - callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); - response_decoder_ = &decoder; - return nullptr; - })); + envoy_http_callbacks bridge_callbacks; + callbacks_called cc = {0, 0, 0, 0, 0}; + bridge_callbacks.context = &cc; + bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, + void* context) -> void { + ASSERT_TRUE(end_stream); + HeaderMapPtr response_headers = Utility::toInternalHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); + callbacks_called* cc = static_cast(context); + cc->on_headers_calls++; + }; + bridge_callbacks.on_error = [](envoy_error, void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_error_calls++; + }; + bridge_callbacks.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete_calls++; + }; + bridge_callbacks.on_cancel = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_cancel_calls++; + }; + + http_dispatcher_.synchronizer().enable(); // Build a set of request headers. TestHeaderMapImpl headers; @@ -552,30 +732,88 @@ TEST_F(DispatcherTest, DestroyWithActiveStream) { envoy_headers c_headers = Utility::toBridgeHeaders(headers); // Create a stream. 
- EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, start(_, _)) - .WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions()))); - EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks_, {}), ENVOY_SUCCESS); + Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); + EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); + + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; + })); + start_stream_post_cb(); // Send request headers. - EXPECT_CALL(stream_encoder_, encodeHeaders(_, false)); - EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); - EXPECT_CALL(stream_callbacks_, onReset()); + Event::PostCb send_headers_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); + http_dispatcher_.sendHeaders(stream, c_headers, true); + + EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); + send_headers_post_cb(); + + // Start a thread to encode response headers. This will wait pre-dispatchable call. + http_dispatcher_.synchronizer().waitOn("dispatch_encode_headers"); + std::thread t1([&] { + TestHeaderMapImpl response_headers{{":status", "200"}}; + response_headers.setEnvoyUpstreamServiceTime(20); + response_encoder_->encodeHeaders(response_headers, true); + }); + // Wait until the thread is actually waiting. + http_dispatcher_.synchronizer().barrierOn("dispatch_encode_headers"); + + // reset the stream from the client side. 
This should succeed. + Event::PostCb reset_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&reset_stream_post_cb)); + ASSERT_EQ(http_dispatcher_.resetStream(stream), ENVOY_SUCCESS); + // The callback happens synchronously outside of the reset_stream_post_cb(). + ASSERT_EQ(cc.on_cancel_calls, 1); + + Event::PostCb stream_deletion_post_cb; EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - http_dispatcher_.sendHeaders(stream, c_headers, false); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); + reset_stream_post_cb(); + + // Now signal the thread to continue. Dispatchable should return false and prevent on_headers and + // on_complete from being called. + http_dispatcher_.synchronizer().signal("dispatch_encode_headers"); + t1.join(); + + stream_deletion_post_cb(); + + ASSERT_EQ(cc.on_headers_calls, 0); + ASSERT_EQ(cc.on_complete_calls, 0); } -TEST_F(DispatcherTest, ResetInOnHeaders) { +TEST_F(DispatcherTest, ResetStreamLocalHeadersRemoteRemoteWinsDeletesStream) { envoy_stream_t stream = 1; - // Grab the response decoder in order to dispatch responses on the stream. 
- EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) - .WillOnce(Invoke([&](StreamDecoder& decoder, - ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { - callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); - response_decoder_ = &decoder; - return nullptr; - })); + envoy_http_callbacks bridge_callbacks; + callbacks_called cc = {0, 0, 0, 0, 0}; + bridge_callbacks.context = &cc; + bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, + void* context) -> void { + ASSERT_TRUE(end_stream); + HeaderMapPtr response_headers = Utility::toInternalHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); + callbacks_called* cc = static_cast(context); + cc->on_headers_calls++; + }; + bridge_callbacks.on_error = [](envoy_error, void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_error_calls++; + }; + bridge_callbacks.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete_calls++; + }; + bridge_callbacks.on_cancel = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_cancel_calls++; + }; + + http_dispatcher_.synchronizer().enable(); // Build a set of request headers. TestHeaderMapImpl headers; @@ -583,274 +821,325 @@ TEST_F(DispatcherTest, ResetInOnHeaders) { envoy_headers c_headers = Utility::toBridgeHeaders(headers); // Create a stream. 
- EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, start(_, _)) - .WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions()))); - EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks_, {}), ENVOY_SUCCESS); + Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); + EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); + + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; + })); + start_stream_post_cb(); // Send request headers. Event::PostCb send_headers_post_cb; EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); - http_dispatcher_.sendHeaders(stream, c_headers, false); + http_dispatcher_.sendHeaders(stream, c_headers, true); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(stream_encoder_, encodeHeaders(_, false)); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); send_headers_post_cb(); - TestHeaderMapImpl expected_headers{{":status", "200"}}; + // Start a thread to reset stream. This will wait pre-dispatchable call. But after getting the + // stream stream. + http_dispatcher_.synchronizer().waitOn("getStream_on_cancel"); + std::thread t1([&] { + // This should fail synchronously because remote cleaned up the stream before the local reset + // ran getStream. 
+ ASSERT_EQ(http_dispatcher_.resetStream(stream), ENVOY_FAILURE); + }); + // Wait until the thread is actually waiting. + http_dispatcher_.synchronizer().barrierOn("getStream_on_cancel"); + + // Now encode headers. This will go through. + Event::PostCb stream_deletion_post_cb; EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(event_dispatcher_, post(_)); - EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_headers), false)) - .WillOnce(Invoke([this, stream](HeaderMap&, bool) { http_dispatcher_.resetStream(stream); })); - EXPECT_CALL(stream_callbacks_, onData(_, _)).Times(0); - EXPECT_CALL(stream_callbacks_, onReset()); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); + TestHeaderMapImpl response_headers{{":status", "200"}}; + response_headers.setEnvoyUpstreamServiceTime(20); + response_encoder_->encodeHeaders(response_headers, true); + ASSERT_EQ(cc.on_headers_calls, 1); + stream_deletion_post_cb(); + + // Now signal the thread to continue. The stream was already deleted, so the local reset fails + // and on_cancel is never called. + http_dispatcher_.synchronizer().signal("getStream_on_cancel"); + t1.join(); - response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); + // The cancellation callback was not dispatchable. 
+ ASSERT_EQ(cc.on_cancel_calls, 0); + ASSERT_EQ(cc.on_headers_calls, 1); + ASSERT_EQ(cc.on_complete_calls, 1); } -TEST_F(DispatcherTest, StreamTimeout) { +TEST_F(DispatcherTest, ResetStreamLocalHeadersRemoteRemoteWins) { envoy_stream_t stream = 1; - EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) - .WillOnce(Invoke([&](StreamDecoder&, - ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { - callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); - return nullptr; - })); + envoy_http_callbacks bridge_callbacks; + callbacks_called cc = {0, 0, 0, 0, 0}; + bridge_callbacks.context = &cc; + bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, + void* context) -> void { + ASSERT_TRUE(end_stream); + HeaderMapPtr response_headers = Utility::toInternalHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); + callbacks_called* cc = static_cast(context); + cc->on_headers_calls++; + }; + bridge_callbacks.on_error = [](envoy_error, void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_error_calls++; + }; + bridge_callbacks.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete_calls++; + }; + bridge_callbacks.on_cancel = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_cancel_calls++; + }; + + http_dispatcher_.synchronizer().enable(); // Build a set of request headers. 
TestHeaderMapImpl headers; HttpTestUtility::addDefaultHeaders(headers); envoy_headers c_headers = Utility::toBridgeHeaders(headers); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, start(_, _)) - .WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions().setTimeout( - std::chrono::milliseconds(40))))); - EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks_, {}), ENVOY_SUCCESS); + // Create a stream. + Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); + EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); + + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; + })); + start_stream_post_cb(); // Send request headers. 
Event::PostCb send_headers_post_cb; EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); http_dispatcher_.sendHeaders(stream, c_headers, true); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(stream_encoder_, encodeHeaders(_, true)); - timer_ = new NiceMock(&event_dispatcher_); - EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40), _)); - EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); - - TestHeaderMapImpl expected_timeout{ - {":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}}; - EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), false)); - EXPECT_CALL(stream_callbacks_, onData(_, true)); - EXPECT_CALL(stream_callbacks_, onComplete()); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); send_headers_post_cb(); - timer_->invokeCallback(); - - EXPECT_EQ(1UL, - cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_timeout") - .value()); - EXPECT_EQ(1UL, cm_.conn_pool_.host_->stats().rq_timeout_.value()); - EXPECT_EQ( - 1UL, - cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_504").value()); + + // Start a thread to reset stream. This will wait pre-dispatchable call. But after getting the + // stream stream. + http_dispatcher_.synchronizer().waitOn("dispatch_on_cancel"); + std::thread t1([&] { + // This should succeed because the stream was still present. However, the assertion at the end + // of the test shows that the callback was not fired. + ASSERT_EQ(http_dispatcher_.resetStream(stream), ENVOY_SUCCESS); + }); + // Wait until the thread is actually waiting. + http_dispatcher_.synchronizer().barrierOn("dispatch_on_cancel"); + + // Now encode headers. This will go through. 
+ Event::PostCb stream_deletion_post_cb; + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); + TestHeaderMapImpl response_headers{{":status", "200"}}; + response_headers.setEnvoyUpstreamServiceTime(20); + response_encoder_->encodeHeaders(response_headers, true); + ASSERT_EQ(cc.on_headers_calls, 1); + stream_deletion_post_cb(); + + // Now signal the thread to continue. Dispatchable should return false and prevent on_cancel + // from being called. + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(0); + EXPECT_CALL(event_dispatcher_, post(_)).Times(0); + http_dispatcher_.synchronizer().signal("dispatch_on_cancel"); + t1.join(); + + // The cancellation callback was not dispatchable. + ASSERT_EQ(cc.on_cancel_calls, 0); + ASSERT_EQ(cc.on_headers_calls, 1); + ASSERT_EQ(cc.on_complete_calls, 1); } -TEST_F(DispatcherTest, StreamTimeoutHeadReply) { +TEST_F(DispatcherTest, ResetStreamLocalResetRemoteRaceLocalWins) { envoy_stream_t stream = 1; - EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) - .WillOnce(Invoke([&](StreamDecoder&, - ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { - callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); - return nullptr; - })); + envoy_http_callbacks bridge_callbacks; + callbacks_called cc = {0, 0, 0, 0, 0}; + bridge_callbacks.context = &cc; + bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, + void* context) -> void { + ASSERT_TRUE(end_stream); + HeaderMapPtr response_headers = Utility::toInternalHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); + callbacks_called* cc = static_cast(context); + cc->on_headers_calls++; + }; + bridge_callbacks.on_error = [](envoy_error, void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_error_calls++; + }; + 
bridge_callbacks.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete_calls++; + }; + bridge_callbacks.on_cancel = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_cancel_calls++; + }; + + http_dispatcher_.synchronizer().enable(); // Build a set of request headers. TestHeaderMapImpl headers; - HttpTestUtility::addDefaultHeaders(headers, "HEAD"); + HttpTestUtility::addDefaultHeaders(headers); envoy_headers c_headers = Utility::toBridgeHeaders(headers); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, start(_, _)) - .WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions().setTimeout( - std::chrono::milliseconds(40))))); - EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks_, {}), ENVOY_SUCCESS); + // Create a stream. + Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); + EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); + + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; + })); + start_stream_post_cb(); // Send request headers. 
Event::PostCb send_headers_post_cb; EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); http_dispatcher_.sendHeaders(stream, c_headers, true); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(stream_encoder_, encodeHeaders(_, true)); - timer_ = new NiceMock(&event_dispatcher_); - EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40), _)); - EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); - - TestHeaderMapImpl expected_timeout{ - {":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}}; - EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), true)); - EXPECT_CALL(stream_callbacks_, onComplete()); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); send_headers_post_cb(); - timer_->invokeCallback(); -} - -TEST_F(DispatcherTest, DisableTimerWithStream) { - envoy_stream_t stream = 1; - EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) - .WillOnce(Invoke([&](StreamDecoder&, - ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { - callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); - return nullptr; - })); - TestHeaderMapImpl headers; - HttpTestUtility::addDefaultHeaders(headers, "HEAD"); - envoy_headers c_headers = Utility::toBridgeHeaders(headers); + // Start a thread to remote reset stream. This will wait pre-dispatchable call. But after getting + // the stream stream. + http_dispatcher_.synchronizer().waitOn("dispatch_on_error"); + std::thread t1( + [&] { response_encoder_->getStream().resetStream(StreamResetReason::RemoteReset); }); + // Wait until the thread is actually waiting. 
+ http_dispatcher_.synchronizer().barrierOn("dispatch_on_error"); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, start(_, _)) - .WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions().setTimeout( - std::chrono::milliseconds(40))))); - EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks_, {}), ENVOY_SUCCESS); - - // Send request headers and reset stream. - Event::PostCb send_headers_post_cb; - EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); - http_dispatcher_.sendHeaders(stream, c_headers, true); + // Now local reset the stream. This will go through. Event::PostCb reset_stream_post_cb; EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&reset_stream_post_cb)); - http_dispatcher_.resetStream(stream); - - EXPECT_CALL(stream_encoder_, encodeHeaders(_, true)); - timer_ = new NiceMock(&event_dispatcher_); - EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40), _)); - EXPECT_CALL(*timer_, disableTimer()); - EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); - EXPECT_CALL(stream_callbacks_, onReset()); + ASSERT_EQ(http_dispatcher_.resetStream(stream), ENVOY_SUCCESS); + // The callback happens synchronously outside of the reset_stream_post_cb(). + ASSERT_EQ(cc.on_cancel_calls, 1); + Event::PostCb stream_deletion_post_cb; EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - send_headers_post_cb(); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); reset_stream_post_cb(); + + // Now signal the thread to continue. The remote reset will not run. 
+ http_dispatcher_.synchronizer().signal("dispatch_on_error"); + t1.join(); + + stream_deletion_post_cb(); + + ASSERT_EQ(cc.on_error_calls, 0); + ASSERT_EQ(cc.on_cancel_calls, 1); } -TEST_F(DispatcherTest, MultipleDataStream) { +TEST_F(DispatcherTest, ResetStreamLocalResetRemoteRemoteWinsDeletesStream) { envoy_stream_t stream = 1; - // Setup bridge_callbacks to handle the response. envoy_http_callbacks bridge_callbacks; - callbacks_called cc = {0, 0, 0, 0}; + callbacks_called cc = {0, 0, 0, 0, 0}; bridge_callbacks.context = &cc; bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, void* context) -> void { - ASSERT_FALSE(end_stream); + ASSERT_TRUE(end_stream); HeaderMapPtr response_headers = Utility::toInternalHeaders(c_headers); EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); callbacks_called* cc = static_cast(context); cc->on_headers_calls++; }; - bridge_callbacks.on_data = [](envoy_data data, bool, void* context) -> void { - // TODO: assert end_stream and contents of c_data for multiple calls of on_data. + bridge_callbacks.on_error = [](envoy_error, void* context) -> void { callbacks_called* cc = static_cast(context); - cc->on_data_calls++; - data.release(data.context); + cc->on_error_calls++; }; bridge_callbacks.on_complete = [](void* context) -> void { callbacks_called* cc = static_cast(context); cc->on_complete_calls++; }; + bridge_callbacks.on_cancel = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_cancel_calls++; + }; - // Grab the response decoder in order to dispatch responses on the stream. - EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) - .WillOnce(Invoke([&](StreamDecoder& decoder, - ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { - callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); - response_decoder_ = &decoder; - return nullptr; - })); + http_dispatcher_.synchronizer().enable(); // Build a set of request headers. 
TestHeaderMapImpl headers; HttpTestUtility::addDefaultHeaders(headers); envoy_headers c_headers = Utility::toBridgeHeaders(headers); - // Build first body data - Buffer::OwnedImpl request_data = Buffer::OwnedImpl("request body"); - envoy_data c_data = Buffer::Utility::toBridgeData(request_data); - - // Build second body data - Buffer::OwnedImpl request_data2 = Buffer::OwnedImpl("request body2"); - envoy_data c_data2 = Buffer::Utility::toBridgeData(request_data2); - // Create a stream. - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_)); - EXPECT_CALL(cm_.async_client_, start(_, _)) - .WillOnce( - WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* { - return client_.start(callbacks, AsyncClient::StreamOptions()); - }))); + Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); - // Send request headers. - Event::PostCb headers_post_cb; - EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&headers_post_cb)); - http_dispatcher_.sendHeaders(stream, c_headers, false); - - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(stream_encoder_, encodeHeaders(_, false)); - headers_post_cb(); - - // Send request data. - Event::PostCb data_post_cb; - EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&data_post_cb)); - http_dispatcher_.sendData(stream, c_data, false); - - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(stream_encoder_, encodeData(BufferStringEqual("request body"), false)); - data_post_cb(); + // Grab the response encoder in order to dispatch responses on the stream. 
+ // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; + })); + start_stream_post_cb(); - // Send second request data. - Event::PostCb data_post_cb2; - EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&data_post_cb2)); - http_dispatcher_.sendData(stream, c_data2, true); + // Send request headers. + Event::PostCb send_headers_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb)); + http_dispatcher_.sendHeaders(stream, c_headers, true); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - EXPECT_CALL(stream_encoder_, encodeData(BufferStringEqual("request body2"), true)); - data_post_cb2(); + EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); + send_headers_post_cb(); - // Decode response headers and data. + // Start a thread to locally reset the stream. This will wait before the dispatchable call, but + // after getting the stream. + http_dispatcher_.synchronizer().waitOn("getStream_on_cancel"); + std::thread t1([&] { + // This should fail synchronously because remote cleaned up the stream before the local reset + // ran getStream. + ASSERT_EQ(http_dispatcher_.resetStream(stream), ENVOY_FAILURE); + }); + // Wait until the thread is actually waiting. + http_dispatcher_.synchronizer().barrierOn("getStream_on_cancel"); + + // Now remote reset the stream. This will go through. 
+ Event::PostCb stream_deletion_post_cb; EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - response_decoder_->decode100ContinueHeaders( - HeaderMapPtr(new TestHeaderMapImpl{{":status", "100"}})); - response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); - Buffer::InstancePtr response_data{new Buffer::OwnedImpl("response body")}; - response_decoder_->decodeData(*response_data, false); - Buffer::InstancePtr response_data2{new Buffer::OwnedImpl("response body2")}; - response_decoder_->decodeData(*response_data2, true); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); + response_encoder_->getStream().resetStream(StreamResetReason::RemoteReset); + ASSERT_EQ(cc.on_error_calls, 1); + stream_deletion_post_cb(); - EXPECT_EQ( - 1UL, - cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_200").value()); - EXPECT_EQ(1UL, cm_.thread_local_cluster_.cluster_.info_->stats_store_ - .counter("internal.upstream_rq_200") - .value()); + // Now signal the thread to continue. The local reset will not run. + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(0); + EXPECT_CALL(event_dispatcher_, post(_)).Times(0); + http_dispatcher_.synchronizer().signal("getStream_on_cancel"); + t1.join(); - // Ensure that the callbacks on the bridge_callbacks were called. - ASSERT_EQ(cc.on_headers_calls, 1); - ASSERT_EQ(cc.on_data_calls, 2); - ASSERT_EQ(cc.on_complete_calls, 1); + // The callback was not dispatchable. + ASSERT_EQ(cc.on_cancel_calls, 0); } -TEST_F(DispatcherTest, StreamResetAfterOnComplete) { +TEST_F(DispatcherTest, ResetStreamLocalResetRemoteRemoteWins) { envoy_stream_t stream = 1; - // Setup bridge_callbacks to handle the response headers. 
envoy_http_callbacks bridge_callbacks; - callbacks_called cc = {0, 0, 0, 0}; + callbacks_called cc = {0, 0, 0, 0, 0}; bridge_callbacks.context = &cc; bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, void* context) -> void { @@ -860,25 +1149,20 @@ TEST_F(DispatcherTest, StreamResetAfterOnComplete) { callbacks_called* cc = static_cast(context); cc->on_headers_calls++; }; + bridge_callbacks.on_error = [](envoy_error, void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_error_calls++; + }; bridge_callbacks.on_complete = [](void* context) -> void { callbacks_called* cc = static_cast(context); cc->on_complete_calls++; }; - bridge_callbacks.on_error = [](envoy_error actual_error, void* context) -> void { - envoy_error expected_error = {ENVOY_STREAM_RESET, envoy_nodata}; - ASSERT_EQ(actual_error.error_code, expected_error.error_code); + bridge_callbacks.on_cancel = [](void* context) -> void { callbacks_called* cc = static_cast(context); - cc->on_error_calls++; + cc->on_cancel_calls++; }; - // Grab the response decoder in order to dispatch responses on the stream. - EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) - .WillOnce(Invoke([&](StreamDecoder& decoder, - ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { - callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_, stream_info_); - response_decoder_ = &decoder; - return nullptr; - })); + http_dispatcher_.synchronizer().enable(); // Build a set of request headers. TestHeaderMapImpl headers; @@ -886,52 +1170,103 @@ TEST_F(DispatcherTest, StreamResetAfterOnComplete) { envoy_headers c_headers = Utility::toBridgeHeaders(headers); // Create a stream. 
- EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
- EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
- EXPECT_CALL(cm_.async_client_, start(_, _))
- .WillOnce(
- WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* {
- return client_.start(callbacks, AsyncClient::StreamOptions());
- })));
+ Event::PostCb start_stream_post_cb;
+ EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb));
EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS);
+ // Grab the response encoder in order to dispatch responses on the stream.
+ // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher
+ // API.
+ EXPECT_CALL(api_listener_, newStream(_, _))
+ .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& {
+ response_encoder_ = &encoder;
+ return request_decoder_;
+ }));
+ start_stream_post_cb();
+
// Send request headers.
- Event::PostCb post_cb;
- EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb));
+ Event::PostCb send_headers_post_cb;
+ EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb));
http_dispatcher_.sendHeaders(stream, c_headers, true);
- EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
- EXPECT_CALL(stream_encoder_, encodeHeaders(_, true));
- post_cb();
+ EXPECT_CALL(request_decoder_, decodeHeaders_(_, true));
+ send_headers_post_cb();
- // Decode response headers. decodeHeaders with true will bubble up to onHeaders, which will in
- // turn cause closeRemote. Because closeLocal has already been called, cleanup will happen; hence
- // the second call to isThreadSafe.
+ // Start a thread to locally reset the stream. This will wait on the pre-dispatchable call, but
+ // only after getting the stream.
+ http_dispatcher_.synchronizer().waitOn("dispatch_on_cancel"); + std::thread t1([&] { + // This should succeed because the stream was still present. However, the assertion at the end + // of the test shows that the callback was not fired. + ASSERT_EQ(http_dispatcher_.resetStream(stream), ENVOY_SUCCESS); + }); + // Wait until the thread is actually waiting. + http_dispatcher_.synchronizer().barrierOn("dispatch_on_cancel"); + + // Now remote reset the stream. This will go through. + Event::PostCb stream_deletion_post_cb; EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - response_decoder_->decode100ContinueHeaders( - HeaderMapPtr(new TestHeaderMapImpl{{":status", "100"}})); - response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), true); - - EXPECT_EQ( - 1UL, - cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_200").value()); - EXPECT_EQ(1UL, cm_.thread_local_cluster_.cluster_.info_->stats_store_ - .counter("internal.upstream_rq_200") - .value()); - - // resetStream after onComplete has fired is a no-op, as the stream is cleaned from the - // dispatcher. - Event::PostCb reset_post_cb; - EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&reset_post_cb)); - http_dispatcher_.resetStream(stream); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); + response_encoder_->getStream().resetStream(StreamResetReason::RemoteReset); + ASSERT_EQ(cc.on_error_calls, 1); + stream_deletion_post_cb(); - EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); - reset_post_cb(); + // Now signal the thread to continue. The local reset will not run. + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(0); + EXPECT_CALL(event_dispatcher_, post(_)).Times(0); + http_dispatcher_.synchronizer().signal("dispatch_on_cancel"); + t1.join(); - // Ensure that the callbacks on the bridge_callbacks were called. 
+ // The callback was not dispatchable. + ASSERT_EQ(cc.on_cancel_calls, 0); +} + +TEST_F(DispatcherTest, ResetWhenRemoteClosesBeforeLocal) { + envoy_stream_t stream = 1; + // Setup bridge_callbacks to handle the response headers. + envoy_http_callbacks bridge_callbacks; + callbacks_called cc = {0, 0, 0, 0, 0}; + bridge_callbacks.context = &cc; + bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, + void* context) -> void { + ASSERT_TRUE(end_stream); + HeaderMapPtr response_headers = Utility::toInternalHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); + callbacks_called* cc = static_cast(context); + cc->on_headers_calls++; + }; + bridge_callbacks.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete_calls++; + }; + + // Create a stream. + Event::PostCb start_stream_post_cb; + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&start_stream_post_cb)); + EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); + + // Grab the response encoder in order to dispatch responses on the stream. + // Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher + // API. + EXPECT_CALL(api_listener_, newStream(_, _)) + .WillOnce(Invoke([&](StreamEncoder& encoder, bool) -> StreamDecoder& { + response_encoder_ = &encoder; + return request_decoder_; + })); + start_stream_post_cb(); + + // Encode response headers. 
+ Event::PostCb stream_deletion_post_cb; + EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true)); + EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&stream_deletion_post_cb)); + TestHeaderMapImpl response_headers{{":status", "200"}}; + response_encoder_->encodeHeaders(response_headers, true); ASSERT_EQ(cc.on_headers_calls, 1); ASSERT_EQ(cc.on_complete_calls, 1); - ASSERT_EQ(cc.on_data_calls, 0); + + // Fire stream reset because Envoy does not allow half-open streams on the local side. + response_encoder_->getStream().resetStream(StreamResetReason::RemoteReset); + stream_deletion_post_cb(); ASSERT_EQ(cc.on_error_calls, 0); } diff --git a/test/common/thread/BUILD b/test/common/thread/BUILD new file mode 100644 index 0000000000..0d0caaffc2 --- /dev/null +++ b/test/common/thread/BUILD @@ -0,0 +1,15 @@ +licenses(["notice"]) # Apache 2 + +load("@envoy//bazel:envoy_build_system.bzl", "envoy_cc_test", "envoy_package") + +envoy_package() + +envoy_cc_test( + name = "lock_guard_test", + srcs = ["lock_guard_test.cc"], + repository = "@envoy", + deps = [ + "//library/common/thread:lock_guard_lib", + "@envoy//source/common/common:thread_lib", + ], +) diff --git a/test/common/thread/lock_guard_test.cc b/test/common/thread/lock_guard_test.cc new file mode 100644 index 0000000000..c6f872e77f --- /dev/null +++ b/test/common/thread/lock_guard_test.cc @@ -0,0 +1,27 @@ +#include "common/common/thread.h" + +#include "gtest/gtest.h" +#include "library/common/thread/lock_guard.h" + +namespace Envoy { +namespace Thread { + +class ThreadTest : public testing::Test { +protected: + ThreadTest() = default; + int a_ ABSL_GUARDED_BY(a_mutex_){0}; + MutexBasicLockable a_mutex_; + int b_{0}; +}; + +TEST_F(ThreadTest, TestOptionalReleasableLockGuard) { + OptionalReleasableLockGuard lock(nullptr); + EXPECT_EQ(1, ++b_); + + OptionalReleasableLockGuard lock2(&a_mutex_); + EXPECT_EQ(1, ++a_); + lock2.release(); +} + +} // namespace Thread +} // namespace Envoy 
diff --git a/test/integration/BUILD b/test/integration/BUILD new file mode 100644 index 0000000000..6d1d89ad48 --- /dev/null +++ b/test/integration/BUILD @@ -0,0 +1,19 @@ +licenses(["notice"]) # Apache 2 + +load("@envoy//bazel:envoy_build_system.bzl", "envoy_cc_test", "envoy_package") + +envoy_package() + +envoy_cc_test( + name = "dispatcher_integration_test", + srcs = ["dispatcher_integration_test.cc"], + repository = "@envoy", + deps = [ + "//library/common/http:dispatcher_lib", + "//library/common/http:header_utility_lib", + "//library/common/types:c_types_lib", + "@envoy//test/common/http:common_lib", + "@envoy//test/integration:http_integration_lib", + "@envoy//test/server:utility_lib", + ], +) diff --git a/test/integration/dispatcher_integration_test.cc b/test/integration/dispatcher_integration_test.cc new file mode 100644 index 0000000000..ff79f61aff --- /dev/null +++ b/test/integration/dispatcher_integration_test.cc @@ -0,0 +1,157 @@ +#include "test/common/http/common.h" +#include "test/integration/integration.h" +#include "test/server/utility.h" +#include "test/test_common/environment.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "library/common/buffer/utility.h" +#include "library/common/http/dispatcher.h" +#include "library/common/http/header_utility.h" +#include "library/common/types/c_types.h" + +using testing::ReturnRef; + +namespace Envoy { +namespace { + +typedef struct { + uint32_t on_headers_calls; + uint32_t on_data_calls; + uint32_t on_complete_calls; + uint32_t on_error_calls; + uint32_t on_cancel_calls; + ConditionalInitializer* terminal_callback; +} callbacks_called; + +// TODO(junr03): move this to derive from the ApiListenerIntegrationTest after moving that class +// into a test lib. 
+class DispatcherIntegrationTest : public BaseIntegrationTest, + public testing::TestWithParam { +public: + DispatcherIntegrationTest() : BaseIntegrationTest(GetParam(), bootstrap_config()) { + use_lds_ = false; + autonomous_upstream_ = true; + } + + void SetUp() override { + config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + // currently ApiListener does not trigger this wait + // https://github.com/envoyproxy/envoy/blob/0b92c58d08d28ba7ef0ed5aaf44f90f0fccc5dce/test/integration/integration.cc#L454 + // Thus, the ApiListener has to be added in addition to the already existing listener in the + // config. + bootstrap.mutable_static_resources()->add_listeners()->MergeFrom( + Server::parseListenerFromV2Yaml(api_listener_config())); + }); + BaseIntegrationTest::initialize(); + } + + void TearDown() override { + test_server_.reset(); + fake_upstreams_.clear(); + } + + static std::string bootstrap_config() { + // At least one empty filter chain needs to be specified. 
+ return ConfigHelper::BASE_CONFIG + R"EOF( + filter_chains: + filters: + )EOF"; + } + + static std::string api_listener_config() { + return R"EOF( +name: api_listener +address: + socket_address: + address: 127.0.0.1 + port_value: 1 +api_listener: + api_listener: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: hcm + route_config: + virtual_hosts: + name: integration + routes: + route: + cluster: cluster_0 + match: + prefix: "/" + domains: "*" + name: route_config_0 + http_filters: + - name: envoy.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + )EOF"; + } + + std::atomic preferred_network_{ENVOY_NET_GENERIC}; + Http::Dispatcher http_dispatcher_{preferred_network_}; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, DispatcherIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +TEST_P(DispatcherIntegrationTest, Basic) { + ConditionalInitializer ready_ran; + test_server_->server().dispatcher().post([this, &ready_ran]() -> void { + http_dispatcher_.ready( + test_server_->server().dispatcher(), + test_server_->server().listenerManager().apiListener()->get().http()->get()); + ready_ran.setReady(); + }); + ready_ran.waitReady(); + + envoy_stream_t stream = 1; + ConditionalInitializer terminal_callback; + // Setup bridge_callbacks to handle the response. 
+ envoy_http_callbacks bridge_callbacks; + callbacks_called cc = {0, 0, 0, 0, 0, &terminal_callback}; + bridge_callbacks.context = &cc; + bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, + void* context) -> void { + ASSERT_FALSE(end_stream); + Http::HeaderMapPtr response_headers = Http::Utility::toInternalHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); + callbacks_called* cc = static_cast(context); + cc->on_headers_calls++; + }; + bridge_callbacks.on_data = [](envoy_data c_data, bool end_stream, void* context) -> void { + if (end_stream) { + ASSERT_EQ(Http::Utility::convertToString(c_data), ""); + } else { + ASSERT_EQ(c_data.length, 10); + } + callbacks_called* cc = static_cast(context); + cc->on_data_calls++; + c_data.release(c_data.context); + }; + bridge_callbacks.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete_calls++; + cc->terminal_callback->setReady(); + }; + + // Build a set of request headers. + Http::TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + envoy_headers c_headers = Http::Utility::toBridgeHeaders(headers); + + // Create a stream. + Event::PostCb start_stream_post_cb; + EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); + http_dispatcher_.sendHeaders(stream, c_headers, true); + + terminal_callback.waitReady(); + + ASSERT_EQ(cc.on_headers_calls, 1); + ASSERT_EQ(cc.on_data_calls, 2); + ASSERT_EQ(cc.on_complete_calls, 1); +} + +} // namespace +} // namespace Envoy