diff --git a/include/envoy/server/api_listener.h b/include/envoy/server/api_listener.h index da5937f5d680a..d07dd2ec8fd2c 100644 --- a/include/envoy/server/api_listener.h +++ b/include/envoy/server/api_listener.h @@ -21,6 +21,13 @@ class ApiListener { */ virtual absl::string_view name() const PURE; + /** + * Shutdown the ApiListener. This is an interrupt, not a drain. In other words, calling this + * function results in termination of all active streams vs. draining where no new streams are + * allowed, but already existing streams are allowed to finish. + */ + virtual void shutdown() PURE; + /** * @return the Type of the ApiListener. */ diff --git a/source/server/api_listener_impl.cc b/source/server/api_listener_impl.cc index f990254683815..107afabcadb2c 100644 --- a/source/server/api_listener_impl.cc +++ b/source/server/api_listener_impl.cc @@ -25,6 +25,13 @@ ApiListenerImplBase::ApiListenerImplBase(const envoy::config::listener::v3::List factory_context_(parent_.server_, config_, *this, *global_scope_, *listener_scope_), read_callbacks_(SyntheticReadCallbacks(*this)) {} +void ApiListenerImplBase::SyntheticReadCallbacks::SyntheticConnection::raiseConnectionEvent( + Network::ConnectionEvent event) { + for (Network::ConnectionCallbacks* callback : callbacks_) { + callback->onEvent(event); + } +} + HttpApiListener::HttpApiListener(const envoy::config::listener::v3::Listener& config, ListenerManagerImpl& parent, const std::string& name) : ApiListenerImplBase(config, parent, name) { @@ -44,5 +51,12 @@ Http::ApiListenerOptRef HttpApiListener::http() { return Http::ApiListenerOptRef(std::ref(*http_connection_manager_)); } +void HttpApiListener::shutdown() { + // The Http::ConnectionManagerImpl is a callback target for the read_callback_.connection_. By + // raising connection closure, Http::ConnectionManagerImpl::onEvent is fired. In that case the + // Http::ConnectionManagerImpl will reset any ActiveStreams it has. + read_callbacks_.connection_.raiseConnectionEvent(Network::ConnectionEvent::RemoteClose); +} + } // namespace Server } // namespace Envoy diff --git a/source/server/api_listener_impl.h b/source/server/api_listener_impl.h index 3b5dfdcded010..9227f21810860 100644 --- a/source/server/api_listener_impl.h +++ b/source/server/api_listener_impl.h @@ -75,6 +75,8 @@ class ApiListenerImplBase : public ApiListener, : parent_(parent), stream_info_(parent_.parent_.factory_context_.timeSource()), options_(std::make_shared>()) {} + void raiseConnectionEvent(Network::ConnectionEvent event); + // Network::FilterManager void addWriteFilter(Network::WriteFilterSharedPtr) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; @@ -84,7 +86,9 @@ class ApiListenerImplBase : public ApiListener, bool initializeReadFilters() override { return true; } // Network::Connection - void addConnectionCallbacks(Network::ConnectionCallbacks&) override {} + void addConnectionCallbacks(Network::ConnectionCallbacks& cb) override { + callbacks_.push_back(&cb); + } void addBytesSentCallback(Network::Connection::BytesSentCb) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } @@ -129,6 +133,7 @@ class ApiListenerImplBase : public ApiListener, SyntheticReadCallbacks& parent_; StreamInfo::StreamInfoImpl stream_info_; Network::ConnectionSocket::OptionsSharedPtr options_; + std::list callbacks_; }; ApiListenerImplBase& parent_; @@ -157,6 +162,9 @@ class HttpApiListener : public ApiListenerImplBase { // ApiListener ApiListener::Type type() const override { return ApiListener::Type::HttpApiListener; } Http::ApiListenerOptRef http() override; + void shutdown() override; + + Network::ReadFilterCallbacks& readCallbacksForTest() { return read_callbacks_; } private: // Need to store the factory due to the shared_ptrs that need to be kept alive: date provider, diff --git a/source/server/server.cc b/source/server/server.cc index 32d38f7916f6f..024e7c5d23710 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -578,6 +578,13 @@ void InstanceImpl::terminate() { // Shutdown all the workers now that the main dispatch loop is done. if (listener_manager_ != nullptr) { + // Also shutdown the listener manager's ApiListener, if there is one, which runs on the main + // thread. This needs to happen ahead of calling thread_local_.shutdown() below to prevent any + // objects in the ApiListener destructor to reference any objects in thread local storage. + if (listener_manager_->apiListener().has_value()) { + listener_manager_->apiListener()->get().shutdown(); + } + listener_manager_->stopWorkers(); } diff --git a/test/integration/api_listener_integration_test.cc b/test/integration/api_listener_integration_test.cc index ee1e2d672dab1..1a9e04ef072b0 100644 --- a/test/integration/api_listener_integration_test.cc +++ b/test/integration/api_listener_integration_test.cc @@ -30,7 +30,6 @@ class ApiListenerIntegrationTest : public BaseIntegrationTest, bootstrap.mutable_static_resources()->add_listeners()->MergeFrom( Server::parseListenerFromV2Yaml(api_listener_config())); }); - BaseIntegrationTest::initialize(); } void TearDown() override { @@ -84,6 +83,7 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, ApiListenerIntegrationTest, TestUtility::ipTestParamsToString); TEST_P(ApiListenerIntegrationTest, Basic) { + BaseIntegrationTest::initialize(); absl::Notification done; test_server_->server().dispatcher().post([this, &done]() -> void { ASSERT_TRUE(test_server_->server().listenerManager().apiListener().has_value()); @@ -111,5 +111,33 @@ TEST_P(ApiListenerIntegrationTest, Basic) { ASSERT_TRUE(done.WaitForNotificationWithTimeout(absl::Seconds(1))); } +TEST_P(ApiListenerIntegrationTest, DestroyWithActiveStreams) { + autonomous_allow_incomplete_streams_ = true; + BaseIntegrationTest::initialize(); + absl::Notification done; + + test_server_->server().dispatcher().post([this, &done]() -> void { + ASSERT_TRUE(test_server_->server().listenerManager().apiListener().has_value()); + ASSERT_EQ("api_listener", test_server_->server().listenerManager().apiListener()->get().name()); + ASSERT_TRUE(test_server_->server().listenerManager().apiListener()->get().http().has_value()); + auto& http_api_listener = + test_server_->server().listenerManager().apiListener()->get().http()->get(); + + ON_CALL(stream_encoder_, getStream()).WillByDefault(ReturnRef(stream_encoder_.stream_)); + auto& stream_decoder = http_api_listener.newStream(stream_encoder_); + + // Send a headers-only request + stream_decoder.decodeHeaders( + Http::HeaderMapPtr(new Http::TestHeaderMapImpl{ + {":method", "GET"}, {":path", "/api"}, {":scheme", "http"}, {":authority", "host"}}), + false); + + done.Notify(); + }); + ASSERT_TRUE(done.WaitForNotificationWithTimeout(absl::Seconds(1))); + // The server should shutdown the ApiListener at the right time during server termination such + // that no crashes occur if termination happens when the ApiListener still has ongoing streams. +} + } // namespace -} // namespace Envoy \ No newline at end of file +} // namespace Envoy diff --git a/test/server/BUILD b/test/server/BUILD index 400c4b80fab88..358ec824569c0 100644 --- a/test/server/BUILD +++ b/test/server/BUILD @@ -23,6 +23,7 @@ envoy_cc_test( ":utility_lib", "//source/server:api_listener_lib", "//source/server:listener_lib", + "//test/mocks/network:network_mocks", "//test/mocks/server:server_mocks", "//test/test_common:utility_lib", "@envoy_api//envoy/config/listener/v3:pkg_cc_proto", diff --git a/test/server/api_listener_test.cc b/test/server/api_listener_test.cc index ccf9c643ee551..f229823c59c32 100644 --- a/test/server/api_listener_test.cc +++ b/test/server/api_listener_test.cc @@ -5,6 +5,7 @@ #include "server/api_listener_impl.h" #include "server/listener_manager_impl.h" +#include "test/mocks/network/mocks.h" #include "test/mocks/server/mocks.h" #include "test/server/utility.h" #include "test/test_common/utility.h" @@ -87,5 +88,48 @@ name: test_api_listener "eds_cluster_config {\n eds_config {\n path: \"eds path\"\n }\n }\n}\n"); } +TEST_F(ApiListenerTest, HttpApiListenerShutdown) { + const std::string yaml = R"EOF( +name: test_api_listener +address: + socket_address: + address: 127.0.0.1 + port_value: 1234 +api_listener: + api_listener: + "@type": type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager + stat_prefix: hcm + route_config: + name: api_router + virtual_hosts: + - name: api + domains: + - "*" + routes: + - match: + prefix: "/" + route: + cluster: dynamic_forward_proxy_cluster + )EOF"; + + const envoy::config::listener::v3::Listener config = parseListenerFromV2Yaml(yaml); + + auto http_api_listener = HttpApiListener(config, *listener_manager_, config.name()); + + ASSERT_EQ("test_api_listener", http_api_listener.name()); + ASSERT_EQ(ApiListener::Type::HttpApiListener, http_api_listener.type()); + ASSERT_TRUE(http_api_listener.http().has_value()); + + Network::MockConnectionCallbacks network_connection_callbacks; + // TODO(junr03): potentially figure out a way of unit testing this behavior without exposing a + // ForTest function. + http_api_listener.readCallbacksForTest().connection().addConnectionCallbacks( + network_connection_callbacks); + + EXPECT_CALL(network_connection_callbacks, onEvent(Network::ConnectionEvent::RemoteClose)); + // Shutting down the ApiListener should raise an event on all connection callback targets. + http_api_listener.shutdown(); +} + } // namespace Server -} // namespace Envoy \ No newline at end of file +} // namespace Envoy