diff --git a/test/integration/integration.cc b/test/integration/integration.cc index 28e5eb4782..9eba5c7727 100644 --- a/test/integration/integration.cc +++ b/test/integration/integration.cc @@ -294,10 +294,9 @@ void BaseIntegrationTest::createUpstreams() { } void BaseIntegrationTest::createEnvoy() { - std::vector ports; for (auto& upstream : fake_upstreams_) { if (upstream->localAddress()->ip()) { - ports.push_back(upstream->localAddress()->ip()->port()); + ports_.push_back(upstream->localAddress()->ip()->port()); } } @@ -314,7 +313,7 @@ void BaseIntegrationTest::createEnvoy() { // Note that finalize assumes that every fake_upstream_ must correspond to a bootstrap config // static entry. So, if you want to manually create a fake upstream without specifying it in the // config, you will need to do so *after* initialize() (which calls this function) is done. - config_helper_.finalize(ports); + config_helper_.finalize(ports_); envoy::config::bootstrap::v2::Bootstrap bootstrap = config_helper_.bootstrap(); if (use_lds_) { diff --git a/test/integration/integration.h b/test/integration/integration.h index 873dde090a..240c8c5ed9 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -135,7 +135,7 @@ struct ApiFilesystemConfig { /** * Test fixture for all integration tests. */ -class BaseIntegrationTest : Logger::Loggable { +class BaseIntegrationTest : protected Logger::Loggable { public: using TestTimeSystemPtr = std::unique_ptr; using InstanceConstSharedPtrFn = std::function; @@ -363,6 +363,7 @@ class BaseIntegrationTest : Logger::Loggable { bool tls_xds_upstream_{false}; bool use_lds_{true}; // Use the integration framework's LDS set up. 
Grpc::SotwOrDelta sotw_or_delta_{Grpc::SotwOrDelta::Sotw}; + std::vector ports_; private: // The type for the Envoy-to-backend connection diff --git a/test/stress/BUILD b/test/stress/BUILD new file mode 100644 index 0000000000..7b1098f1cc --- /dev/null +++ b/test/stress/BUILD @@ -0,0 +1,40 @@ +licenses(["notice"]) # Apache 2 + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test", + "envoy_cc_test_library", + "envoy_package", +) + +envoy_package() + +envoy_cc_test_library( + name = "stress_test_lib", + srcs = [ + "stress_test.cc", + "stress_test_common.cc", + "stress_test_downstream.cc", + "stress_test_upstream.cc", + ], + hdrs = [ + "stress_test.h", + "stress_test_common.h", + "stress_test_downstream.h", + "stress_test_upstream.h", + ], + deps = [ + "//source/server:server_lib", + "//test/integration:http_protocol_integration_lib", + ], +) + +envoy_cc_test( + name = "stress_test_self_test", + srcs = [ + "stress_test_self_test.cc", + ], + deps = [ + ":stress_test_lib", + ], +) diff --git a/test/stress/stress_test.cc b/test/stress/stress_test.cc new file mode 100644 index 0000000000..89dcb962e1 --- /dev/null +++ b/test/stress/stress_test.cc @@ -0,0 +1,196 @@ +#include "stress_test.h" + +namespace Envoy { +namespace Stress { + +const std::string StressTest::ORIGIN_CLUSTER_NAME{"origin_cluster"}; + +static const std::string BOOTSTRAP_CONFIG = R"EOF( +admin: + access_log_path: /dev/null + address: + socket_address: + address: {} + port_value: 0 +dynamic_resources: + lds_config: + path: /dev/null +static_resources: + listeners: + name: listener_0 + address: + socket_address: + address: {} + port_value: 0 + filter_chains: + filters: + name: envoy.http_connection_manager + config: + stat_prefix: config_test + http_filters: + name: envoy.router + codec_type: auto + access_log: + name: envoy.file_access_log + filter: + not_health_check_filter: {{}} + config: + path: /dev/null + route_config: + virtual_hosts: + name: integration + routes: + route: + cluster: {} + 
match: + prefix: "/" + domains: "*" + name: route_config_0 +)EOF"; + +std::string StressTest::baseBootstrap(Network::Address::IpVersion ip_version) { + return fmt::format(BOOTSTRAP_CONFIG, Network::Test::getLoopbackAddressString(ip_version), + Network::Test::getLoopbackAddressString(ip_version), ORIGIN_CLUSTER_NAME); +} + +ClusterHelper& StressTest::addCluster(ClusterHelperPtr&& cluster_helper) { + const std::string& name = cluster_helper->name(); + auto it = clusters_.emplace(std::make_pair( + name, std::make_unique(std::move(cluster_helper), transport_socket_factory_, + ip_version_, http_type_))); + + if (!it.second) { + throw EnvoyException(fmt::format("Duplicate cluster named '{}'", name)); + } + + return it.first->second->clusterHelper(); +} + +void StressTest::bind() { + for (auto& it : clusters_) { + it.second->bind(); + } +} + +LoadGeneratorPtr StressTest::start() { + { + const auto& it = clusters_.find(ORIGIN_CLUSTER_NAME); + if (it == clusters_.end()) { + throw EnvoyException(fmt::format("One cluster must be named '{}'", ORIGIN_CLUSTER_NAME)); + } + } + + for (auto& it : clusters_) { + it.second->start(); + it.second->addClusterToBootstrap(config_helper_, ports_); + } + + setUpstreamProtocol(Http::CodecClient::Type::HTTP2 == http_type_ + ? FakeHttpConnection::Type::HTTP2 + : FakeHttpConnection::Type::HTTP1); + // Start envoy + HttpIntegrationTest::initialize(); + + ENVOY_LOG(debug, "Bootstrap Config:\n{}", + MessageUtil::getYamlStringFromMessage(config_helper_.bootstrap(), true)); + + Network::Address::InstanceConstSharedPtr address{ + loopbackAddress(ip_version_, lookupPort("http"))}; + return std::make_unique(client_, transport_socket_factory_, http_type_, address); +} + +uint16_t StressTest::firstPortInCluster(const std::string& cluster_name) const { + const auto& it = clusters_.find(cluster_name); + return it == clusters_.end() ? 
0 : it->second->firstPort(); +} + +const ClusterHelper& StressTest::findCluster(const std::string& cluster_name) const { + const auto& it = clusters_.find(cluster_name); + if (it == clusters_.end()) { + throw EnvoyException(fmt::format("Cannot find cluster '{}'", cluster_name)); + } + return it->second->clusterHelper(); +} + +void StressTest::stopServers() { + // Stop envoy by destroying it. + test_server_ = nullptr; + + // Wait until all clusters have no more active connections + for (auto& it : clusters_) { + it.second->wait(); + } +} + +// Must be called before Envoy is stopped +void StressTest::extractCounters(StressTest::CounterMap& counters, const std::string& prefix) { + for (const auto& it : test_server_->stat_store().counters()) { + if (!absl::StartsWith(it->name(), prefix)) { + continue; + } + counters[it->name()] = it->value(); + } +} + +void StressTest::dumpCounters(StressTest::CounterMap& counters) { + for (const auto& it : counters) { + ENVOY_LOG(info, "{} = {}", it.first, it.second); + } +} + +void StressTest::Cluster::bind() { + if (bound_) { + return; + } + for (size_t i = 0; i < cluster_helper_->servers().size(); ++i) { + listeners_.emplace_back(new LocalListenSocket(ip_version_)); + ENVOY_LOG(debug, "{} bound port {}", cluster_helper_->name(), + listeners_.back()->localAddress()->ip()->port()); + } + bound_ = true; +} + +void StressTest::Cluster::addClusterToBootstrap(ConfigHelper& config_helper, + std::vector& ports) const { + config_helper.addConfigModifier([this](envoy::config::bootstrap::v2::Bootstrap& bootstrap) { + auto cluster = bootstrap.mutable_static_resources()->add_clusters(); + + cluster->set_name(cluster_helper_->name()); + cluster->set_type(envoy::api::v2::Cluster_DiscoveryType::Cluster_DiscoveryType_STATIC); + cluster->set_lb_policy(envoy::api::v2::Cluster_LbPolicy::Cluster_LbPolicy_ROUND_ROBIN); + + if (http_type_ == Http::CodecClient::Type::HTTP1) { + auto opts = cluster->mutable_http_protocol_options(); + 
opts->set_accept_http_10(false); + } else { + auto opts = cluster->mutable_http2_protocol_options(); + auto value = opts->mutable_max_concurrent_streams(); + value->set_value(2147483647U); + } + + for (const auto& listener : listeners_) { + auto hosts = cluster->add_hosts(); + auto address = hosts->mutable_socket_address(); + address->set_address(Network::Test::getLoopbackAddressString(ip_version_)); + address->set_port_value(listener->localAddress()->ip()->port()); + } + }); + + // This avoids "assert failure: ports.size() > port_idx" complaints from + // ConfigHelper::finalize() + for (const auto& listener : listeners_) { + ports.push_back(listener->localAddress()->ip()->port()); + } +} + +void StressTest::Cluster::start() { + bind(); + for (size_t i = 0; i < cluster_helper_->servers().size(); ++i) { + servers_.emplace_back(new Server(fmt::format("{}-{}", cluster_helper_->name(), i), + *listeners_[i], transport_socket_factory_, http_type_)); + servers_.back()->start(*cluster_helper_->servers()[i]); + } +} + +} // namespace Stress +} // namespace Envoy diff --git a/test/stress/stress_test.h b/test/stress/stress_test.h new file mode 100644 index 0000000000..acedc131eb --- /dev/null +++ b/test/stress/stress_test.h @@ -0,0 +1,94 @@ +#pragma once + +#include "test/integration/http_integration.h" +#include "test/stress/stress_test_downstream.h" +#include "test/stress/stress_test_upstream.h" + +namespace Envoy { +namespace Stress { + +class StressTest : public HttpIntegrationTest { +public: + static const std::string ORIGIN_CLUSTER_NAME; + + StressTest(Network::Address::IpVersion ip_protocol, Http::CodecClient::Type http_type) + : HttpIntegrationTest(http_type, ip_protocol, baseBootstrap(ip_protocol)), + ip_version_(ip_protocol), http_type_{http_type}, + transport_socket_factory_{}, client_{"client"} { + // Tell the base class that we will create our own upstream origin server + fake_upstreams_count_ = 0; + } + +protected: + Network::Address::IpVersion ipVersion() const 
{ return ip_version_; } + + Http::CodecClient::Type httpType() const { return http_type_; } + + ClusterHelper& addCluster(ClusterHelperPtr&& cluster_helper); + + void bind(); + + LoadGeneratorPtr start(); + + uint16_t envoyPort() { return static_cast(lookupPort("http")); } + + uint16_t firstPortInCluster(const std::string& cluster_name) const; + + const ClusterHelper& findCluster(const std::string& cluster_name) const; + + void stopServers(); + + using CounterMap = std::unordered_map; + + // Must be called before Envoy is stopped + void extractCounters(CounterMap& counters, const std::string& prefix = ""); + + void dumpCounters(CounterMap& counters); + +private: + static std::string baseBootstrap(Network::Address::IpVersion ip_protocol); + + class Cluster { + public: + Cluster(ClusterHelperPtr&& cluster_helper, + Network::TransportSocketFactory& transport_socket_factory, + Network::Address::IpVersion ip_version, Http::CodecClient::Type http_type) + : transport_socket_factory_{transport_socket_factory}, ip_version_{ip_version}, + http_type_{http_type}, cluster_helper_{std::move(cluster_helper)} {} + + void wait() { cluster_helper_->wait(); } + + void bind(); + + uint16_t firstPort() const { + return static_cast(listeners_[0]->localAddress()->ip()->port()); + } + + const ClusterHelper& clusterHelper() const { return *cluster_helper_; } + ClusterHelper& clusterHelper() { return *cluster_helper_; } + + void addClusterToBootstrap(ConfigHelper& config_helper, std::vector& ports) const; + + void start(); + + private: + bool bound_{false}; + Network::TransportSocketFactory& transport_socket_factory_; + Network::Address::IpVersion ip_version_; + Http::CodecClient::Type http_type_; + ClusterHelperPtr cluster_helper_; + std::vector listeners_; + std::vector servers_; + }; + + typedef std::unique_ptr ClusterPtr; + + Network::Address::IpVersion ip_version_; + Http::CodecClient::Type http_type_; + Network::RawBufferSocketFactory transport_socket_factory_; + Client client_; + 
std::unordered_map clusters_; +}; + +} // namespace Stress +} // namespace Envoy diff --git a/test/stress/stress_test_common.cc b/test/stress/stress_test_common.cc new file mode 100644 index 0000000000..553e249233 --- /dev/null +++ b/test/stress/stress_test_common.cc @@ -0,0 +1,35 @@ +#include "stress_test_common.h" + +#include "common/network/address_impl.h" + +namespace Envoy { +namespace Stress { + +Http::CodecClient::Type httpType(const std::string& str) { + return 0 == str.compare("http1") ? Http::CodecClient::Type::HTTP1 + : Http::CodecClient::Type::HTTP2; +} + +Network::Address::IpVersion ipVersion(const std::string& str) { + return 0 == str.compare("IPv4") ? Network::Address::IpVersion::v4 + : Network::Address::IpVersion::v6; +} + +Network::Address::InstanceConstSharedPtr loopbackAddress(Network::Address::IpVersion ip_version, + uint32_t port) { + switch (ip_version) { + case Network::Address::IpVersion::v6: { + Network::Address::InstanceConstSharedPtr addr{new Network::Address::Ipv6Instance("::1", port)}; + return addr; + } + case Network::Address::IpVersion::v4: + default: { + Network::Address::InstanceConstSharedPtr addr{ + new Network::Address::Ipv4Instance("127.0.0.1", port)}; + return addr; + } + } +} + +} // namespace Stress +} // namespace Envoy diff --git a/test/stress/stress_test_common.h b/test/stress/stress_test_common.h new file mode 100644 index 0000000000..fec235bd42 --- /dev/null +++ b/test/stress/stress_test_common.h @@ -0,0 +1,18 @@ +#pragma once + +#include "envoy/network/address.h" + +#include "common/http/codec_client.h" + +namespace Envoy { +namespace Stress { + +extern Network::Address::InstanceConstSharedPtr +loopbackAddress(Network::Address::IpVersion ip_version, uint32_t port); + +extern Http::CodecClient::Type httpType(const std::string& str); + +extern Network::Address::IpVersion ipVersion(const std::string& str); + +} // namespace Stress +} // namespace Envoy diff --git a/test/stress/stress_test_downstream.cc 
b/test/stress/stress_test_downstream.cc new file mode 100644 index 0000000000..98aebb1bbe --- /dev/null +++ b/test/stress/stress_test_downstream.cc @@ -0,0 +1,571 @@ +#include "stress_test_downstream.h" + +#include +#include + +#include "envoy/thread/thread.h" + +#include "common/http/http1/codec_impl.h" +#include "common/http/http2/codec_impl.h" +#include "common/stats/isolated_store_impl.h" + +namespace Envoy { +namespace Stress { + +class ClientStream : public Http::StreamDecoder, + public Http::StreamCallbacks, + Logger::Loggable { +public: + ClientStream(uint32_t id, ClientConnection& connection, ClientResponseCallback& callback) + : id_(id), connection_(connection), callback_(callback) {} + ClientStream(const ClientStream&) = delete; + + void operator=(const ClientStream&) = delete; + + ~ClientStream() override { + ENVOY_LOG(trace, "ClientStream({}:{}:{}) destroyed", connection_.name(), connection_.id(), id_); + } + + // + // Http::StreamDecoder + // + + void decode100ContinueHeaders(Http::HeaderMapPtr&&) override { + ENVOY_LOG(trace, "ClientStream({}:{}:{}) got continue headers", connection_.name(), + connection_.id(), id_); + } + + void decodeHeaders(Http::HeaderMapPtr&& response_headers, bool end_stream) override { + ENVOY_LOG(debug, "ClientStream({}:{}:{}) got response headers", connection_.name(), + connection_.id(), id_); + + response_headers_ = std::move(response_headers); + + if (end_stream) { + onEndStream(); + // stream is now destroyed + } + } + + void decodeData(Buffer::Instance&, bool end_stream) override { + ENVOY_LOG(debug, "ClientStream({}:{}:{}) got response body data", connection_.name(), + connection_.id(), id_); + + if (end_stream) { + onEndStream(); + // stream is now destroyed + } + } + + void decodeTrailers(Http::HeaderMapPtr&&) override { + ENVOY_LOG(trace, "ClientStream({}:{}:{}) got response trailers", connection_.name(), + connection_.id(), id_); + onEndStream(); + // stream is now destroyed + } + + void 
decodeMetadata(Http::MetadataMapPtr&&) override { + ENVOY_LOG(trace, "ClientStream({}:{}:{}) got metadata", connection_.name(), connection_.id(), + id_); + } + + // + // Http::StreamCallbacks + // + + void onResetStream(Http::StreamResetReason reason, absl::string_view) override { + switch (reason) { + case Http::StreamResetReason::LocalReset: + ENVOY_LOG(trace, "ClientStream({}:{}:{}) was locally reset", connection_.name(), + connection_.id(), id_); + break; + case Http::StreamResetReason::LocalRefusedStreamReset: + ENVOY_LOG(trace, "ClientStream({}:{}:{}) refused local stream reset", connection_.name(), + connection_.id(), id_); + break; + case Http::StreamResetReason::RemoteReset: + ENVOY_LOG(trace, "ClientStream({}:{}:{}) was remotely reset", connection_.name(), + connection_.id(), id_); + break; + case Http::StreamResetReason::RemoteRefusedStreamReset: + ENVOY_LOG(trace, "ClientStream({}:{}:{}) refused remote stream reset", connection_.name(), + connection_.id(), id_); + break; + case Http::StreamResetReason::ConnectionFailure: + ENVOY_LOG(trace, "ClientStream({}:{}:{}) reset due to initial connection failure", + connection_.name(), connection_.id(), id_); + break; + case Http::StreamResetReason::ConnectionTermination: + ENVOY_LOG(trace, "ClientStream({}:{}:{}) reset due to underlying connection reset", + connection_.name(), connection_.id(), id_); + break; + case Http::StreamResetReason::Overflow: + ENVOY_LOG(trace, "ClientStream({}:{}:{}) reset due to resource overflow", connection_.name(), + connection_.id(), id_); + break; + default: + ENVOY_LOG(trace, "ClientStream({}:{}:{}) reset due to unknown reason", connection_.name(), + connection_.id(), id_); + break; + } + } + + void onAboveWriteBufferHighWatermark() override { + ENVOY_LOG(trace, "ClientStream({}:{}:{}) above write buffer high watermark", connection_.name(), + connection_.id(), id_); + } + + void onBelowWriteBufferLowWatermark() override { + ENVOY_LOG(trace, "ClientStream({}:{}:{}) below write 
buffer low watermark", connection_.name(), + connection_.id(), id_); + } + + virtual void sendRequest(const Http::HeaderMap& request_headers, + const std::chrono::milliseconds timeout) { + if (connection_.networkConnection().state() != Network::Connection::State::Open) { + ENVOY_LOG(warn, "ClientStream({}:{}:{})'s underlying connection is not open!", + connection_.name(), connection_.id(), id_); + connection_.removeStream(id_); + // This stream is now destroyed + return; + } + + Http::StreamEncoder& encoder = connection_.httpConnection().newStream(*this); + encoder.getStream().addCallbacks(*this); + + ENVOY_LOG(debug, "ClientStream({}:{}:{}) sending request headers", connection_.name(), + connection_.id(), id_); + encoder.encodeHeaders(request_headers, true); + + timeout_timer_ = connection_.dispatcher().createTimer([this, timeout]() { + ENVOY_LOG(debug, "ClientStream({}:{}:{}) timed out after {} msec waiting for response", + connection_.name(), connection_.id(), id_, static_cast(timeout.count())); + callback_(connection_, nullptr); + connection_.removeStream(id_); + // This stream is now destroyed + }); + timeout_timer_->enableTimer(timeout); + } + +private: + void onEndStream() { + ENVOY_LOG(debug, "ClientStream({}:{}:{}) complete", connection_.name(), connection_.id(), id_); + callback_(connection_, std::move(response_headers_)); + connection_.removeStream(id_); + // This stream is now destroyed + } + + uint32_t id_; + ClientConnection& connection_; + Http::HeaderMapPtr response_headers_{nullptr}; + ClientResponseCallback& callback_; + Event::TimerPtr timeout_timer_{nullptr}; +}; + +class HttpClientReadFilter : public Network::ReadFilter, Logger::Loggable { +public: + HttpClientReadFilter(const std::string& name, uint32_t id, Http::ClientConnection& connection) + : name_(name), id_(id), connection_(connection) {} + HttpClientReadFilter(const HttpClientReadFilter&) = delete; + + void operator=(const HttpClientReadFilter&) = delete; + + // + // Network::ReadFilter 
+ // + + Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override { + ENVOY_LOG(trace, "ClientConnection({}:{}) got data", name_, id_); + + connection_.dispatch(data); + + if (end_stream) { + ENVOY_LOG(error, "ClientConnection({}:{}) got end stream", name_, id_); + } + + return Network::FilterStatus::StopIteration; + } + + Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; } + + void initializeReadFilterCallbacks(Network::ReadFilterCallbacks&) override {} + +private: + std::string name_; + uint32_t id_; + Http::ClientConnection& connection_; +}; + +typedef std::unique_ptr HttpClientReadFilterPtr; +typedef std::shared_ptr HttpClientReadFilterSharedPtr; + +class Http1ClientConnection : public ClientConnection { +public: + Http1ClientConnection(Client& client, uint32_t id, ClientConnectCallback& connect_callback, + ClientCloseCallback& close_callback, + std::shared_ptr& dispatcher, + Network::ClientConnectionPtr&& network_connection) + : ClientConnection(client, id, connect_callback, close_callback, dispatcher), + network_connection_(std::move(network_connection)), + http_connection_(*network_connection_, *this), + read_filter_{std::make_shared(client.name(), id, http_connection_)} { + network_connection_->addReadFilter(read_filter_); + network_connection_->addConnectionCallbacks(*this); + } + Http1ClientConnection(const Http1ClientConnection&) = delete; + + Http1ClientConnection& operator=(const Http1ClientConnection&) = delete; + + Network::ClientConnection& networkConnection() override { return *network_connection_; } + + Http::ClientConnection& httpConnection() override { return http_connection_; } + +private: + Network::ClientConnectionPtr network_connection_; + Http::Http1::ClientConnectionImpl http_connection_; + HttpClientReadFilterSharedPtr read_filter_; +}; + +static constexpr uint32_t max_request_headers_kb = 2U; + +class Http2ClientConnection : public ClientConnection { +public: + 
Http2ClientConnection(Client& client, uint32_t id, ClientConnectCallback& connect_callback, + ClientCloseCallback& close_callback, + std::shared_ptr& dispatcher, + Network::ClientConnectionPtr&& network_connection) + : ClientConnection(client, id, connect_callback, close_callback, dispatcher), stats_(), + settings_(), network_connection_(std::move(network_connection)), + http_connection_(*network_connection_, *this, stats_, settings_, max_request_headers_kb), + read_filter_{std::make_shared(client.name(), id, http_connection_)} { + network_connection_->addReadFilter(read_filter_); + network_connection_->addConnectionCallbacks(*this); + } + Http2ClientConnection(const Http2ClientConnection&) = delete; + + Http2ClientConnection& operator=(const Http2ClientConnection&) = delete; + + Network::ClientConnection& networkConnection() override { return *network_connection_; } + + Http::ClientConnection& httpConnection() override { return http_connection_; } + +private: + Stats::IsolatedStoreImpl stats_; + Http::Http2Settings settings_; + Network::ClientConnectionPtr network_connection_; + Http::Http2::ClientConnectionImpl http_connection_; + HttpClientReadFilterSharedPtr read_filter_; +}; + +ClientStream& ClientConnection::newStream(ClientResponseCallback& callback) { + std::lock_guard guard(streams_lock_); + + uint32_t id = stream_counter_++; + ClientStreamPtr stream = std::make_unique(id, *this, callback); + ClientStream* raw = stream.get(); + streams_[id] = std::move(stream); + + return *raw; +} + +ClientConnection::ClientConnection(Client& client, uint32_t id, + ClientConnectCallback& connect_callback, + ClientCloseCallback& close_callback, + std::shared_ptr& dispatcher) + : client_(client), id_(id), connect_callback_(connect_callback), + close_callback_(close_callback), dispatcher_(dispatcher) {} + +ClientConnection::~ClientConnection() { + ENVOY_LOG(trace, "ClientConnection({}:{}) destroyed", client_.name(), id_); +} + +const std::string& ClientConnection::name() 
const { return client_.name(); } + +uint32_t ClientConnection::id() const { return id_; } + +Event::Dispatcher& ClientConnection::dispatcher() { return *dispatcher_; }; + +void ClientConnection::removeStream(uint32_t stream_id) { + unsigned long size = 0UL; + + { + std::lock_guard guard(streams_lock_); + streams_.erase(stream_id); + size = streams_.size(); + } + + if (0 == size) { + ENVOY_LOG(debug, "ClientConnection({}:{}) is idle", client_.name(), id_); + if (ClientCallbackResult::CLOSE == connect_callback_(*this, ClientConnectionState::IDLE)) { + // This will trigger a + // networkConnection().onEvent(Network::ConnectionEvent::LocalClose) + networkConnection().close(Network::ConnectionCloseType::NoFlush); + } + } +} + +void ClientConnection::onEvent(Network::ConnectionEvent event) { + switch (event) { + // properly on connection destruction. + case Network::ConnectionEvent::RemoteClose: + if (established_) { + ENVOY_LOG(debug, "ClientConnection({}:{}) closed by peer or reset", client_.name(), id_); + close_callback_(*this, ClientCloseReason::REMOTE_CLOSE); + } else { + ENVOY_LOG(debug, "ClientConnection({}:{}) cannot connect to peer", client_.name(), id_); + close_callback_(*this, ClientCloseReason::CONNECT_FAILED); + } + client_.releaseConnection(*this); + // ClientConnection has been destroyed + return; + case Network::ConnectionEvent::LocalClose: + ENVOY_LOG(debug, "ClientConnection({}:{}) closed locally", client_.name(), id_); + close_callback_(*this, ClientCloseReason::LOCAL_CLOSE); + client_.releaseConnection(*this); + // ClientConnection has been destroyed + return; + case Network::ConnectionEvent::Connected: + established_ = true; + ENVOY_LOG(debug, "ClientConnection({}:{}) established", client_.name(), id_); + if (ClientCallbackResult::CLOSE == connect_callback_(*this, ClientConnectionState::CONNECTED)) { + // This will trigger a + // networkConnection().onEvent(Network::ConnectionEvent::LocalClose) + 
networkConnection().close(Network::ConnectionCloseType::NoFlush); + } + break; + default: + ENVOY_LOG(error, "ClientConnection({}:{}) got unknown event", client_.name(), id_); + }; +} + +void ClientConnection::onAboveWriteBufferHighWatermark() { + ENVOY_LOG(warn, "ClientConnection({}:{}) above write buffer high watermark", client_.name(), id_); + httpConnection().onUnderlyingConnectionAboveWriteBufferHighWatermark(); +} + +void ClientConnection::onBelowWriteBufferLowWatermark() { + ENVOY_LOG(warn, "ClientConnection({}:{}) below write buffer low watermark", client_.name(), id_); + httpConnection().onUnderlyingConnectionBelowWriteBufferLowWatermark(); +} + +void ClientConnection::onGoAway() { + ENVOY_LOG(warn, "ClientConnection({}:{}) remote closed", client_.name(), id_); +} + +void ClientConnection::sendRequest(const Http::HeaderMap& headers, ClientResponseCallback& callback, + std::chrono::milliseconds timeout) { + newStream(callback).sendRequest(headers, timeout); +} + +Client::Client(const std::string& name) + : name_(name), stats_(), thread_(nullptr), time_system_(), + api_(Thread::threadFactoryForTest(), stats_, time_system_, Filesystem::fileSystemForTest()), + dispatcher_{api_.allocateDispatcher()} {} + +Client::~Client() { + stop(); + ENVOY_LOG(trace, "Client({}) destroyed", name_); +} + +const std::string& Client::name() const { return name_; } + +void Client::connect(Network::TransportSocketFactory& socket_factory, + Http::CodecClient::Type http_version, + Network::Address::InstanceConstSharedPtr& address, + const Network::ConnectionSocket::OptionsSharedPtr& sockopts, + ClientConnectCallback& connect_cb, ClientCloseCallback& close_cb) { + dispatcher_->post([this, &socket_factory, http_version, address, sockopts, &connect_cb, + &close_cb]() { + Network::ClientConnectionPtr connection = dispatcher_->createClientConnection( + address, nullptr, socket_factory.createTransportSocket(nullptr), sockopts); + uint32_t id = connection_counter_++; + + 
ClientConnectionPtr ptr; + if (Http::CodecClient::Type::HTTP1 == http_version) { + ptr = std::make_unique(*this, id, connect_cb, close_cb, dispatcher_, + std::move(connection)); + } else { + ptr = std::make_unique(*this, id, connect_cb, close_cb, dispatcher_, + std::move(connection)); + } + ClientConnection& raw = *ptr.get(); + + { + std::lock_guard guard(connections_lock_); + connections_[id] = std::move(ptr); + } + + ENVOY_LOG(debug, "ClientConnection({}:{}) connecting to {}", name_, id, address->asString()); + raw.networkConnection().connect(); + }); +} + +void Client::start() { + std::promise promise; + + if (is_running_) { + return; + } + + thread_ = api_.threadFactory().createThread([this, &promise]() { + ENVOY_LOG(debug, "Client({}) dispatcher started", name_); + + is_running_ = true; + promise.set_value(true); // do not use promise again after this + while (is_running_) { + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + + ENVOY_LOG(debug, "Client({}) dispatcher stopped", name_); + }); + + promise.get_future().get(); +} + +void Client::stop() { + ENVOY_LOG(debug, "Client({}) stop requested", name_); + + is_running_ = false; + if (thread_) { + thread_->join(); + thread_ = nullptr; + } + + ENVOY_LOG(debug, "Client({}) stopped", name_); +} + +void Client::releaseConnection(uint32_t id) { + size_t erased = 0; + { + std::lock_guard guard(connections_lock_); + dispatcher_->deferredDelete(std::move(connections_[id])); + erased = connections_.erase(id); + } + if (1 > erased) { + ENVOY_LOG(error, "Client({}) cannot remove ClientConnection({}:{})", name_, name_, id); + } +} + +void Client::releaseConnection(ClientConnection& connection) { releaseConnection(connection.id()); } + +LoadGenerator::LoadGenerator(Client& client, Network::TransportSocketFactory& socket_factory, + Http::CodecClient::Type http_version, + Network::Address::InstanceConstSharedPtr& address, + const Network::ConnectionSocket::OptionsSharedPtr& sockopts) + : client_(client), 
socket_factory_(socket_factory), http_version_(http_version), + address_(address), sockopts_(sockopts) { + response_callback_ = [this](ClientConnection& connection, Http::HeaderMapPtr&& response) { + if (!response) { + ENVOY_LOG(debug, "Connection({}:{}) timed out waiting for response", connection.name(), + connection.id()); + ++response_timeouts_; + return; + } + + ++responses_received_; + + uint64_t status = 0; + auto str = std::string(response->Status()->value().getStringView()); + if (!StringUtil::atoull(str.c_str(), status)) { + ENVOY_LOG(error, "Connection({}:{}) received response with bad status", connection.name(), + connection.id()); + } else if (200 <= status && status < 300) { + ++class_2xx_; + } else if (400 <= status && status < 500) { + ++class_4xx_; + } else if (500 <= status && status < 600) { + ++class_5xx_; + } + + if (0 >= requests_remaining_--) { + // Break if we've already sent or scheduled every request we wanted to + return; + } + + connection.sendRequest(*request_, response_callback_, timeout_); + }; + + connect_callback_ = [this](ClientConnection& connection, + ClientConnectionState state) -> ClientCallbackResult { + if (state == ClientConnectionState::IDLE) { + // This will result in a CloseReason::LOCAL_CLOSE passed to the + // close_callback + return ClientCallbackResult::CLOSE; + } + // If ConnectionResult::SUCCESS: + + ++connect_successes_; + + if (0 >= requests_remaining_--) { + // This will result in a ConnectionState::IDLE passed to this callback + // once all active streams have finished. 
+ return ClientCallbackResult::CONTINUE; + } + + connection.sendRequest(*request_, response_callback_, timeout_); + + return ClientCallbackResult::CONTINUE; + }; + + close_callback_ = [this](ClientConnection&, ClientCloseReason reason) { + switch (reason) { + case ClientCloseReason::CONNECT_FAILED: + ++connect_failures_; + break; + case ClientCloseReason::REMOTE_CLOSE: + ++remote_closes_; + break; + case ClientCloseReason::LOCAL_CLOSE: + // We initiated this by responding to ConnectionState::IDLE with a + // CallbackResult::Close + ++local_closes_; + break; + } + + // Unblock run() once we've seen a close for every connection initiated. + if (remote_closes_ + local_closes_ + connect_failures_ >= connections_to_initiate_) { + promise_all_connections_closed_.set_value(true); + } + }; +} + +void LoadGenerator::run(uint32_t connections, uint32_t requests, Http::HeaderMapPtr&& request, + std::chrono::milliseconds timeout) { + connections_to_initiate_ = connections; + requests_to_send_ = requests; + request_ = std::move(request); + promise_all_connections_closed_ = std::promise(); + timeout_ = timeout; + requests_remaining_ = requests_to_send_; + connect_failures_ = 0; + connect_successes_ = 0; + responses_received_ = 0; + response_timeouts_ = 0; + local_closes_ = 0; + remote_closes_ = 0; + class_2xx_ = 0; + class_4xx_ = 0; + class_5xx_ = 0; + + client_.start(); // idempotent + + for (uint32_t i = 0; i < connections_to_initiate_; ++i) { + client_.connect(socket_factory_, http_version_, address_, sockopts_, connect_callback_, + close_callback_); + } + + promise_all_connections_closed_.get_future().get(); +} + +uint32_t LoadGenerator::connectFailures() const { return connect_failures_; } +uint32_t LoadGenerator::connectSuccesses() const { return connect_successes_; } +uint32_t LoadGenerator::responsesReceived() const { return responses_received_; } +uint32_t LoadGenerator::responseTimeouts() const { return response_timeouts_; } +uint32_t LoadGenerator::localCloses() const 
{ return local_closes_; } +uint32_t LoadGenerator::remoteCloses() const { return remote_closes_; } +uint32_t LoadGenerator::class2xxResponses() const { return class_2xx_; } +uint32_t LoadGenerator::class4xxResponses() const { return class_4xx_; } +uint32_t LoadGenerator::class5xxResponses() const { return class_5xx_; } + +} // namespace Stress +} // namespace Envoy diff --git a/test/stress/stress_test_downstream.h b/test/stress/stress_test_downstream.h new file mode 100644 index 0000000000..1a576e5820 --- /dev/null +++ b/test/stress/stress_test_downstream.h @@ -0,0 +1,288 @@ +#pragma once + +#include + +#include "envoy/api/api.h" +#include "envoy/event/dispatcher.h" +#include "envoy/http/codec.h" +#include "envoy/network/address.h" +#include "envoy/thread/thread.h" + +#include "common/api/api_impl.h" +#include "common/common/thread.h" +#include "common/http/codec_client.h" +#include "common/network/raw_buffer_socket.h" +#include "common/stats/isolated_store_impl.h" + +#include "test/test_common/test_time.h" +#include "test/test_common/utility.h" + +#include "fmt/printf.h" +#include "stress_test_common.h" + +namespace Envoy { +namespace Stress { + +class ClientStream; +class ClientConnection; +class Client; +typedef std::unique_ptr ClientStreamPtr; +typedef std::shared_ptr ClientStreamSharedPtr; +typedef std::unique_ptr ClientConnectionPtr; +typedef std::shared_ptr ClientConnectionSharedPtr; +typedef std::unique_ptr ClientPtr; +typedef std::shared_ptr ClientSharedPtr; + +enum class ClientConnectionState { + CONNECTED, // Connection established. Non-Terminal. Will be followed by one + // of the codes below. + IDLE, // Connection has no active streams. Non-Terminal. Close it, use it, + // or put it in a pool. +}; + +enum class ClientCloseReason { + CONNECT_FAILED, // Connection could not be established + REMOTE_CLOSE, // Peer closed or connection was reset after it was + // established. + LOCAL_CLOSE // This process decided to close the connection. 
+}; + +enum class ClientCallbackResult { + CONTINUE, // Leave the connection open + CLOSE // Close the connection. +}; + +/** + * Handle a non-terminal connection event asynchronously. + * + * @param connection The connection with the event + * @param state The state of the connection (connected or idle). + */ +typedef std::function + ClientConnectCallback; + +/** + * Handle a terminal connection close event asynchronously. + * + * @param connection The connection that was closed + * @param reason The reason the connection was closed + */ +typedef std::function + ClientCloseCallback; + +/** + * Handle a response asynchronously. + * + * @param connection The connection that received the response. + * @param response_headers The response headers or null if timed out. + */ +typedef std::function + ClientResponseCallback; + +class ClientConnection : public Network::ConnectionCallbacks, + public Http::ConnectionCallbacks, + public Event::DeferredDeletable, + protected Logger::Loggable { +public: + ClientConnection(Client& client, uint32_t id, ClientConnectCallback& connect_callback, + ClientCloseCallback& close_callback, + std::shared_ptr& dispatcher); + ClientConnection(const ClientConnection&) = delete; + + ClientConnection& operator=(const ClientConnection&) = delete; + ~ClientConnection() override; + + const std::string& name() const; + + uint32_t id() const; + + virtual Network::ClientConnection& networkConnection() PURE; + + virtual Http::ClientConnection& httpConnection() PURE; + + Event::Dispatcher& dispatcher(); + + /** + * Asynchronously send a request. On HTTP1.1 connections at most one request + * can be outstanding on a connection. For HTTP2 multiple requests may + * outstanding. 
+ * + * @param request_headers + * @param callback + */ + virtual void sendRequest(const Http::HeaderMap& request_headers, ClientResponseCallback& callback, + std::chrono::milliseconds timeout = std::chrono::milliseconds(5'000)); + + /** + * For internal use + * + * @param stream_id + */ + void removeStream(uint32_t stream_id); + + // + // Network::ConnectionCallbacks + // + + void onEvent(Network::ConnectionEvent event) override; + + void onAboveWriteBufferHighWatermark() override; + + void onBelowWriteBufferLowWatermark() override; + + // + // Http::ConnectionCallbacks + // + + void onGoAway() override; + +private: + ClientStream& newStream(ClientResponseCallback& callback); + + Client& client_; + uint32_t id_; + ClientConnectCallback& connect_callback_; + ClientCloseCallback& close_callback_; + std::shared_ptr dispatcher_; + bool established_{false}; + + std::mutex streams_lock_; + std::unordered_map streams_; + std::atomic stream_counter_{0U}; +}; + +class Client : Logger::Loggable { +public: + explicit Client(const std::string& name); + Client(const Client&) = delete; + + Client& operator=(const Client&) = delete; + virtual ~Client(); + + const std::string& name() const; + + /** + * Start the client's dispatcher in a background thread. This is a noop if + * the client has already been started. This will block until the dispatcher + * is running on another thread. + */ + void start(); + + /** + * Stop the client's dispatcher and join the background thread. This will + * block until the background thread exits. + */ + void stop(); + + /** + * For internal use + */ + void releaseConnection(uint32_t id); + + /** + * For internal use + */ + void releaseConnection(ClientConnection& connection); + + /** + * Asynchronously connect to a peer. The connect_callback will be called on + * successful connection establishment and also on idle state, giving the + * caller the opportunity to reuse or close connections. 
The close_callback + * will be called after the connection is closed, giving the caller the + * opportunity to cleanup additional resources, etc. + */ + void connect(Network::TransportSocketFactory& socket_factory, + Http::CodecClient::Type http_version, + Network::Address::InstanceConstSharedPtr& address, + const Network::ConnectionSocket::OptionsSharedPtr& sockopts, + ClientConnectCallback& connect_callback, ClientCloseCallback& close_callback); + +private: + std::atomic is_running_{false}; + std::string name_; + Stats::IsolatedStoreImpl stats_; + Thread::ThreadPtr thread_; + Event::TestRealTimeSystem time_system_; + Api::Impl api_; + std::shared_ptr dispatcher_; + + std::mutex connections_lock_; + std::unordered_map connections_; + uint32_t connection_counter_{0U}; +}; + +class LoadGenerator : Logger::Loggable { +public: + /** + * A wrapper around Client and its callbacks that implements a simple load + * generator. + * + * @param socket_factory Socket factory (use for plain TCP vs. TLS) + * @param http_version HTTP version (h1 vs h2) + * @param address Address (ip addr, port, ip protocol version) to connect to + * @param sockopts Socket options for the client sockets. Use default if + * null. + */ + LoadGenerator(Client& client, Network::TransportSocketFactory& socket_factory, + Http::CodecClient::Type http_version, + Network::Address::InstanceConstSharedPtr& address, + const Network::ConnectionSocket::OptionsSharedPtr& sockopts = nullptr); + LoadGenerator(const LoadGenerator&) = delete; + void operator=(const LoadGenerator&) = delete; + virtual ~LoadGenerator() = default; + + /** + * Generate load and block until all connections have finished (successfully + * or otherwise). + * + * @param connections Connections to create + * @param requests Total requests across all connections to send + * @param request The request to send + * @param timeout The time in msec to wait to receive a response after sending + * each request. 
+ */ + void run(uint32_t connections, uint32_t requests, Http::HeaderMapPtr&& request, + std::chrono::milliseconds timeout = std::chrono::milliseconds(5'000)); + + uint32_t connectFailures() const; + uint32_t connectSuccesses() const; + uint32_t responsesReceived() const; + uint32_t responseTimeouts() const; + uint32_t localCloses() const; + uint32_t remoteCloses() const; + uint32_t class2xxResponses() const; + uint32_t class4xxResponses() const; + uint32_t class5xxResponses() const; + +private: + uint32_t connections_to_initiate_{0}; + uint32_t requests_to_send_{0}; + Http::HeaderMapPtr request_{}; + Client& client_; + Network::TransportSocketFactory& socket_factory_; + Http::CodecClient::Type http_version_; + Network::Address::InstanceConstSharedPtr address_; + const Network::ConnectionSocket::OptionsSharedPtr sockopts_; + + ClientConnectCallback connect_callback_; + ClientResponseCallback response_callback_; + ClientCloseCallback close_callback_; + std::chrono::milliseconds timeout_{std::chrono::milliseconds(0)}; + std::atomic requests_remaining_{0}; + std::atomic connect_failures_{0}; + std::atomic connect_successes_{0}; + std::atomic responses_received_{0}; + std::atomic response_timeouts_{0}; + std::atomic local_closes_{0}; + std::atomic remote_closes_{0}; + std::atomic class_2xx_{0}; + std::atomic class_4xx_{0}; + std::atomic class_5xx_{0}; + std::promise promise_all_connections_closed_; +}; + +typedef std::unique_ptr LoadGeneratorPtr; + +} // namespace Stress +} // namespace Envoy \ No newline at end of file diff --git a/test/stress/stress_test_self_test.cc b/test/stress/stress_test_self_test.cc new file mode 100644 index 0000000000..ab8defc947 --- /dev/null +++ b/test/stress/stress_test_self_test.cc @@ -0,0 +1,379 @@ +#include "common/network/utility.h" + +#include "test/test_common/network_utility.h" + +#include "gtest/gtest.h" +#include "stress_test_downstream.h" +#include "stress_test_upstream.h" + +namespace Envoy { +namespace Stress { + +/** + * Test 
of the StressTest::Client against the StressTest::Server without an + * Envoy intermediary. + */ +class StressTestSelfTest + : public testing::TestWithParam>, + protected Logger::Loggable { +public: + StressTestSelfTest() + : transport_socket_factory_(), ip_version_(ipVersion(std::get<1>(GetParam()))), + http_type_(httpType(std::get<0>(GetParam()))), + use_grpc_(0 == std::get<2>(GetParam()).compare("gRPC")), + listening_socket_(Network::Utility::parseInternetAddressAndPort(fmt::format( + "{}:{}", Network::Test::getAnyAddressUrlString(ip_version_), 0)), + nullptr, true), + client_("client"), + server_("server", listening_socket_, transport_socket_factory_, http_type_) {} + +protected: + Network::RawBufferSocketFactory transport_socket_factory_; + Network::Address::IpVersion ip_version_; + Http::CodecClient::Type http_type_; + bool use_grpc_; + + Network::TcpListenSocket listening_socket_; + Client client_; + Server server_; +}; + +INSTANTIATE_TEST_SUITE_P(RuntimesAndLanguages, StressTestSelfTest, + testing::Combine(testing::Values("http1", "http2"), + testing::Values("IPv4", "IPv6"), + testing::Values("HTTP"))); + +TEST_P(StressTestSelfTest, HappyPath) { + // Logger::Registry::setLogLevel(spdlog::level::info); + constexpr uint32_t connections_to_initiate = 30; + constexpr uint32_t requests_to_send = 30 * connections_to_initiate; + + // + // Server Setup + // + + try { + // Take a really long time (500 msec) to send a 200 OK response. 
+ ServerCallbackHelper server_callbacks( + [use_grpc = use_grpc_](ServerConnection&, ServerStream& stream, Http::HeaderMapPtr&&) { + if (use_grpc) { + ProtobufWkt::Value response; + response.set_string_value("response"); + stream.sendGrpcResponse(Grpc::Status::Ok, response); + return; + } + + Http::TestHeaderMapImpl response{{":status", "200"}}; + stream.sendResponseHeaders(response); + }); + server_.start(server_callbacks); + + // + // Client setup + // + + Network::Address::InstanceConstSharedPtr address = listening_socket_.localAddress(); + LoadGenerator load_generator(client_, transport_socket_factory_, http_type_, address); + + // + // Exec test and wait for it to finish + // + + Http::HeaderMapPtr request{new Http::TestHeaderMapImpl{ + {":method", "GET"}, {":path", "/"}, {":scheme", "http"}, {":authority", "host"}}}; + load_generator.run(connections_to_initiate, requests_to_send, std::move(request)); + + // wait until the server has closed all connections created by the client + server_callbacks.wait(load_generator.connectSuccesses()); + + // + // Evaluate test + // + + // All client connections are successfully established. + EXPECT_EQ(load_generator.connectSuccesses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.connectFailures()); + // Client close callback called for every client connection. + EXPECT_EQ(load_generator.localCloses(), connections_to_initiate); + // Client response callback is called for every request sent + EXPECT_EQ(load_generator.responsesReceived(), requests_to_send); + // Every response was a 2xx class + EXPECT_EQ(load_generator.class2xxResponses(), requests_to_send); + EXPECT_EQ(0, load_generator.class4xxResponses()); + EXPECT_EQ(0, load_generator.class5xxResponses()); + // No client sockets are rudely closed by server / no client sockets are + // reset. 
+ EXPECT_EQ(0, load_generator.remoteCloses()); + EXPECT_EQ(0, load_generator.responseTimeouts()); + + // Server accept callback is called for every client connection initiated. + EXPECT_EQ(server_callbacks.connectionsAccepted(), connections_to_initiate); + // Server request callback is called for every client request sent + EXPECT_EQ(server_callbacks.requestsReceived(), requests_to_send); + // Server does not close its own sockets but instead relies on the client to + // initate the close + EXPECT_EQ(0, server_callbacks.localCloses()); + // Server sees a client-initiated close for every socket it accepts + EXPECT_EQ(server_callbacks.remoteCloses(), server_callbacks.connectionsAccepted()); + } catch (Network::SocketBindException& ex) { + if (Network::Address::IpVersion::v6 == ip_version_) { + ENVOY_LOG(info, "Environment does not support IPv6, skipping test"); + GTEST_SKIP(); + } + throw ex; + } +} + +TEST_P(StressTestSelfTest, AcceptAndClose) { + constexpr uint32_t connections_to_initiate = 30; + constexpr uint32_t requests_to_send = 30 * connections_to_initiate; + + // + // Server Setup + // + + try { + // Immediately close any connection accepted. 
+ ServerCallbackHelper server_callbacks( + [](ServerConnection&, ServerStream&, Http::HeaderMapPtr&&) { + GTEST_FATAL_FAILURE_("Connections immediately closed so no response should be received"); + }, + [](ServerConnection&) -> ServerCallbackResult { return ServerCallbackResult::CLOSE; }); + + server_.start(server_callbacks); + + // + // Client setup + // + + Network::Address::InstanceConstSharedPtr address = listening_socket_.localAddress(); + LoadGenerator load_generator(client_, transport_socket_factory_, http_type_, address); + + // + // Exec test and wait for it to finish + // + + Http::HeaderMapPtr request{new Http::TestHeaderMapImpl{ + {":method", "GET"}, {":path", "/"}, {":scheme", "http"}, {":authority", "host"}}}; + load_generator.run(connections_to_initiate, requests_to_send, std::move(request)); + + // wait until the server has closed all connections created by the client + server_callbacks.wait(load_generator.connectSuccesses()); + + // + // Evaluate test + // + + // Assert that all connections succeed but no responses are received and the + // server closes the connections. + EXPECT_EQ(load_generator.connectSuccesses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.connectFailures()); + EXPECT_EQ(load_generator.remoteCloses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.localCloses()); + EXPECT_EQ(0, load_generator.responsesReceived()); + EXPECT_EQ(0, load_generator.class2xxResponses()); + EXPECT_EQ(0, load_generator.class4xxResponses()); + EXPECT_EQ(0, load_generator.class5xxResponses()); + EXPECT_EQ(0, load_generator.responseTimeouts()); + + // Server accept callback is called for every client connection initiated. 
+ EXPECT_EQ(server_callbacks.connectionsAccepted(), connections_to_initiate); + // Server request callback is never called + EXPECT_EQ(0, server_callbacks.requestsReceived()); + // Server closes every connection + EXPECT_EQ(server_callbacks.connectionsAccepted(), server_callbacks.localCloses()); + EXPECT_EQ(0, server_callbacks.remoteCloses()); + } catch (Network::SocketBindException& ex) { + if (Network::Address::IpVersion::v6 == ip_version_) { + ENVOY_LOG(info, "Environment does not support IPv6, skipping test"); + GTEST_SKIP(); + } + throw ex; + } +} + +TEST_P(StressTestSelfTest, SlowResponse) { + constexpr uint32_t connections_to_initiate = 30; + constexpr uint32_t requests_to_send = 30 * connections_to_initiate; + + // + // Server Setup + // + + try { + // Take a really long time (500 msec) to send a 200 OK response. + ServerCallbackHelper server_callbacks( + [](ServerConnection&, ServerStream& stream, Http::HeaderMapPtr&&) { + Http::TestHeaderMapImpl response{{":status", "200"}}; + stream.sendResponseHeaders(response, std::chrono::milliseconds(500)); + }); + + server_.start(server_callbacks); + + // + // Client setup + // + + Network::Address::InstanceConstSharedPtr address = listening_socket_.localAddress(); + LoadGenerator load_generator(client_, transport_socket_factory_, http_type_, address); + + // + // Exec test and wait for it to finish + // + + Http::HeaderMapPtr request{new Http::TestHeaderMapImpl{ + {":method", "GET"}, {":path", "/"}, {":scheme", "http"}, {":authority", "host"}}}; + load_generator.run(connections_to_initiate, requests_to_send, std::move(request), + std::chrono::milliseconds(250)); + + // wait until the server has closed all connections created by the client + server_callbacks.wait(load_generator.connectSuccesses()); + + // + // Evaluate test + // + + // Assert that all connections succeed but all responses timeout leading to + // local closing of all connections. 
+ EXPECT_EQ(load_generator.connectSuccesses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.connectFailures()); + EXPECT_EQ(load_generator.responseTimeouts(), connections_to_initiate); + EXPECT_EQ(load_generator.localCloses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.remoteCloses()); + EXPECT_EQ(0, load_generator.responsesReceived()); + EXPECT_EQ(0, load_generator.class2xxResponses()); + EXPECT_EQ(0, load_generator.class4xxResponses()); + EXPECT_EQ(0, load_generator.class5xxResponses()); + + // Server accept callback is called for every client connection initiated. + EXPECT_EQ(server_callbacks.connectionsAccepted(), connections_to_initiate); + // Server receives a request on each connection + EXPECT_EQ(server_callbacks.requestsReceived(), connections_to_initiate); + // Server sees that the client closes each connection after it gives up + EXPECT_EQ(server_callbacks.connectionsAccepted(), server_callbacks.remoteCloses()); + EXPECT_EQ(0, server_callbacks.localCloses()); + } catch (Network::SocketBindException& ex) { + if (Network::Address::IpVersion::v6 == ip_version_) { + ENVOY_LOG(info, "Environment does not support IPv6, skipping test"); + GTEST_SKIP(); + } + throw ex; + } +} + +TEST_P(StressTestSelfTest, NoServer) { + constexpr uint32_t connections_to_initiate = 30; + constexpr uint32_t requests_to_send = 30 * connections_to_initiate; + + // Create a listening socket bound to an ephemeral port picked by the kernel, + // but don't create a server to call listen() on it. Result will be + // ECONNREFUSEDs and we won't accidentally send connects to another process. 
+ + try { + Network::TcpListenSocket listening_socket(loopbackAddress(ip_version_, 0), nullptr, true); + Network::Address::InstanceConstSharedPtr address{ + loopbackAddress(ip_version_, listening_socket.localAddress()->ip()->port())}; + + // + // Client setup + // + + LoadGenerator load_generator(client_, transport_socket_factory_, http_type_, address); + + // + // Exec test and wait for it to finish + // + + Http::HeaderMapPtr request{new Http::TestHeaderMapImpl{ + {":method", "GET"}, {":path", "/"}, {":scheme", "http"}, {":authority", "host"}}}; + load_generator.run(connections_to_initiate, requests_to_send, std::move(request)); + + // + // Evaluate test + // + + // All client connections fail + EXPECT_EQ(load_generator.connectFailures(), connections_to_initiate); + // Nothing else happened + EXPECT_EQ(0, load_generator.connectSuccesses()); + EXPECT_EQ(0, load_generator.localCloses()); + EXPECT_EQ(0, load_generator.responseTimeouts()); + EXPECT_EQ(0, load_generator.responsesReceived()); + EXPECT_EQ(0, load_generator.class2xxResponses()); + EXPECT_EQ(0, load_generator.class4xxResponses()); + EXPECT_EQ(0, load_generator.class5xxResponses()); + EXPECT_EQ(0, load_generator.remoteCloses()); + } catch (Network::SocketBindException& ex) { + if (Network::Address::IpVersion::v6 == ip_version_) { + ENVOY_LOG(info, "Environment does not support IPv6, skipping test"); + GTEST_SKIP(); + } + throw ex; + } +} + +TEST_P(StressTestSelfTest, NoAccept) { + constexpr uint32_t connections_to_initiate = 30; + constexpr uint32_t requests_to_send = 30 * connections_to_initiate; + + // + // Server Setup + // + + try { + + ServerCallbackHelper server_callbacks; // sends a 200 OK to everything + server_.start(server_callbacks); + + // but don't call accept() on the listening socket + server_.stopAcceptingConnections(); + + // + // Client setup + // + + Network::Address::InstanceConstSharedPtr address = listening_socket_.localAddress(); + LoadGenerator load_generator(client_, 
transport_socket_factory_, http_type_, address); + + // + // Exec test and wait for it to finish + // + + Http::HeaderMapPtr request{new Http::TestHeaderMapImpl{ + {":method", "GET"}, {":path", "/"}, {":scheme", "http"}, {":authority", "host"}}}; + load_generator.run(connections_to_initiate, requests_to_send, std::move(request), + std::chrono::milliseconds(250)); + + // + // Evaluate test + // + + // Assert that all connections succeed but all responses timeout leading to + // local closing of all connections. + EXPECT_EQ(load_generator.connectSuccesses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.connectFailures()); + EXPECT_EQ(load_generator.responseTimeouts(), connections_to_initiate); + EXPECT_EQ(load_generator.localCloses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.remoteCloses()); + EXPECT_EQ(0, load_generator.responsesReceived()); + EXPECT_EQ(0, load_generator.class2xxResponses()); + EXPECT_EQ(0, load_generator.class4xxResponses()); + EXPECT_EQ(0, load_generator.class5xxResponses()); + + // From the server point of view, nothing happened + EXPECT_EQ(0, server_callbacks.connectionsAccepted()); + EXPECT_EQ(0, server_callbacks.requestsReceived()); + EXPECT_EQ(0, server_callbacks.connectionsAccepted()); + EXPECT_EQ(0, server_callbacks.remoteCloses()); + EXPECT_EQ(0, server_callbacks.localCloses()); + } catch (Network::SocketBindException& ex) { + if (Network::Address::IpVersion::v6 == ip_version_) { + ENVOY_LOG(info, "Environment does not support IPv6, skipping test"); + GTEST_SKIP(); + } + throw ex; + } +} + +} // namespace Stress +} // namespace Envoy diff --git a/test/stress/stress_test_upstream.cc b/test/stress/stress_test_upstream.cc new file mode 100644 index 0000000000..cdc7ecac5e --- /dev/null +++ b/test/stress/stress_test_upstream.cc @@ -0,0 +1,671 @@ +#include "stress_test_upstream.h" + +#include + +#include "envoy/http/codec.h" +#include "envoy/network/transport_socket.h" + +#include "common/common/lock_guard.h" +#include 
"common/common/logger.h" +#include "common/grpc/codec.h" +#include "common/http/conn_manager_config.h" +#include "common/http/conn_manager_impl.h" +#include "common/http/exception.h" +#include "common/http/http1/codec_impl.h" +#include "common/http/http2/codec_impl.h" +#include "common/network/listen_socket_impl.h" +#include "common/network/raw_buffer_socket.h" + +#include "server/connection_handler_impl.h" + +#include "test/test_common/network_utility.h" +#include "test/test_common/utility.h" + +#include "fmt/printf.h" + +namespace Envoy { +namespace Stress { + +static Http::LowerCaseString RequestId(std::string("x-request-id")); + +class ServerStreamImpl : public ServerStream, + public Http::StreamDecoder, + public Http::StreamCallbacks, + Logger::Loggable { +public: + ServerStreamImpl(uint32_t id, ServerConnection& connection, + ServerRequestCallback& request_callback, Http::StreamEncoder& stream_encoder) + : id_(id), connection_(connection), request_callback_(request_callback), + stream_encoder_(stream_encoder) {} + + ~ServerStreamImpl() override { + ENVOY_LOG(trace, "ServerStream({}:{}:{}) destroyed", connection_.name(), connection_.id(), id_); + } + + ServerStreamImpl(const ServerStreamImpl&) = delete; + + ServerStreamImpl& operator=(const ServerStreamImpl&) = delete; + + // + // ServerStream + // + + void sendResponseHeaders(const Http::HeaderMap& response_headers, + const std::chrono::milliseconds delay) override { + if (connection_.networkConnection().state() != Network::Connection::State::Open) { + ENVOY_LOG(warn, "ServerStream({}:{}:{})'s underlying connection is not open!", + connection_.name(), connection_.id(), id_); + return; + } + + if (delay <= std::chrono::milliseconds(0)) { + ENVOY_LOG(debug, "ServerStream({}:{}:{}) sending response headers", connection_.name(), + connection_.id(), id_); + stream_encoder_.encodeHeaders(response_headers, true); + return; + } + + // Limitation: at most one response can be sent on a stream at a time. 
+ assert(nullptr == delay_timer_.get()); + if (delay_timer_.get()) { + return; + } + + response_headers_ = std::make_unique(response_headers); + delay_timer_ = connection_.dispatcher().createTimer([this, delay]() { + ENVOY_LOG(debug, "ServerStream({}:{}:{}) sending response headers after {} msec delay", + connection_.name(), connection_.id(), id_, static_cast(delay.count())); + stream_encoder_.encodeHeaders(*response_headers_, true); + delay_timer_->disableTimer(); + delay_timer_ = nullptr; + response_headers_ = nullptr; + }); + delay_timer_->enableTimer(delay); + } + + void sendGrpcResponse(Grpc::Status::GrpcStatus status, const ProtobufWkt::Message& message, + const std::chrono::milliseconds delay) override { + // Limitation: at most one response can be sent on a stream at a time. + assert(nullptr == delay_timer_.get()); + if (delay_timer_.get()) { + return; + } + + response_status_ = status; + response_body_ = Grpc::Common::serializeToGrpcFrame(message); + Event::TimerCb send_grpc_response = [this, delay]() { + ENVOY_LOG(debug, "ServerStream({}:{}:{}) sending gRPC response after {} msec delay", + connection_.name(), connection_.id(), id_, static_cast(delay.count())); + stream_encoder_.encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + stream_encoder_.encodeData(*response_body_, false); + stream_encoder_.encodeTrailers(Http::TestHeaderMapImpl{ + {"grpc-status", std::to_string(static_cast(response_status_))}}); + }; + + if (delay <= std::chrono::milliseconds(0)) { + send_grpc_response(); + return; + } + + delay_timer_ = connection_.dispatcher().createTimer([this, send_grpc_response]() { + send_grpc_response(); + delay_timer_->disableTimer(); + }); + + delay_timer_->enableTimer(delay); + } + + // + // Http::StreamDecoder + // + + void decode100ContinueHeaders(Http::HeaderMapPtr&&) override { + ENVOY_LOG(error, "ServerStream({}:{}:{}) got continue headers?!?!", connection_.name(), + connection_.id(), id_); + } + + /** + * Called with decoded 
headers, optionally indicating end of stream. + * @param headers supplies the decoded headers map that is moved into the + * callee. + * @param end_stream supplies whether this is a header only request/response. + */ + void decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override { + ENVOY_LOG(debug, "ServerStream({}:{}:{}) got request headers", connection_.name(), + connection_.id(), id_); + + request_headers_ = std::move(headers); + + if (end_stream) { + onEndStream(); + // stream is now destroyed + } + } + + void decodeData(Buffer::Instance&, bool end_stream) override { + ENVOY_LOG(debug, "ServerStream({}:{}:{}) got request body data", connection_.name(), + connection_.id(), id_); + + if (end_stream) { + onEndStream(); + // stream is now destroyed + } + } + + void decodeTrailers(Http::HeaderMapPtr&&) override { + ENVOY_LOG(trace, "ServerStream({}:{}:{}) got request trailers", connection_.name(), + connection_.id(), id_); + onEndStream(); + // stream is now destroyed + } + + void decodeMetadata(Http::MetadataMapPtr&&) override { + ENVOY_LOG(trace, "ServerStream({}:{}):{} got metadata", connection_.name(), connection_.id(), + id_); + } + + // + // Http::StreamCallbacks + // + + void onResetStream(Http::StreamResetReason reason, absl::string_view) override { + switch (reason) { + case Http::StreamResetReason::LocalReset: + ENVOY_LOG(trace, "ServerStream({}:{}:{}) was locally reset", connection_.name(), + connection_.id(), id_); + break; + case Http::StreamResetReason::LocalRefusedStreamReset: + ENVOY_LOG(trace, "ServerStream({}:{}:{}) refused local stream reset", connection_.name(), + connection_.id(), id_); + break; + case Http::StreamResetReason::RemoteReset: + ENVOY_LOG(trace, "ServerStream({}:{}:{}) was remotely reset", connection_.name(), + connection_.id(), id_); + break; + case Http::StreamResetReason::RemoteRefusedStreamReset: + ENVOY_LOG(trace, "ServerStream({}:{}:{}) refused remote stream reset", connection_.name(), + connection_.id(), id_); 
+ break; + case Http::StreamResetReason::ConnectionFailure: + ENVOY_LOG(trace, "ServerStream({}:{}:{}) reseet due to initial connection failure", + connection_.name(), connection_.id(), id_); + break; + case Http::StreamResetReason::ConnectionTermination: + ENVOY_LOG(trace, "ServerStream({}:{}:{}) reset due to underlying connection reset", + connection_.name(), connection_.id(), id_); + break; + case Http::StreamResetReason::Overflow: + ENVOY_LOG(trace, "ServerStream({}:{}:{}) reset due to resource overflow", connection_.name(), + connection_.id(), id_); + break; + default: + ENVOY_LOG(trace, "ServerStream({}:{}:{}) reset due to unknown reason", connection_.name(), + connection_.id(), id_); + break; + } + } + + void onAboveWriteBufferHighWatermark() override { + ENVOY_LOG(trace, "ServerStream({}:{}:{}) above write buffer high watermark", connection_.name(), + connection_.id(), id_); + } + + void onBelowWriteBufferLowWatermark() override { + ENVOY_LOG(trace, "ServerStream({}:{}:{}) below write buffer low watermark", connection_.name(), + connection_.id(), id_); + } + +private: + void onEndStream() { + ENVOY_LOG(debug, "ServerStream({}:{}:{}) complete", connection_.name(), connection_.id(), id_); + request_callback_(connection_, *this, std::move(request_headers_)); + + connection_.removeStream(id_); + // This stream is now destroyed + } + + uint32_t id_; + ServerConnection& connection_; + Http::HeaderMapPtr request_headers_{nullptr}; + Http::HeaderMapPtr response_headers_{nullptr}; + Buffer::InstancePtr response_body_{nullptr}; + Grpc::Status::GrpcStatus response_status_{Grpc::Status::Ok}; + ServerRequestCallback& request_callback_; + Http::StreamEncoder& stream_encoder_; + Event::TimerPtr delay_timer_{nullptr}; +}; + +ServerConnection::ServerConnection(const std::string& name, uint32_t id, + ServerRequestCallback& request_callback, + ServerCloseCallback& close_callback, + Network::Connection& network_connection, + Event::Dispatcher& dispatcher, 
Http::CodecClient::Type http_type, + Stats::Scope& scope) + : name_(name), id_(id), network_connection_(network_connection), dispatcher_(dispatcher), + request_callback_(request_callback), close_callback_(close_callback) { + constexpr uint32_t max_request_headers_kb = 2U; + + switch (http_type) { + case Http::CodecClient::Type::HTTP1: + http_connection_ = std::make_unique( + network_connection, *this, Http::Http1Settings(), max_request_headers_kb); + break; + case Http::CodecClient::Type::HTTP2: { + Http::Http2Settings settings; + settings.allow_connect_ = true; + settings.allow_metadata_ = true; + http_connection_ = std::make_unique( + network_connection, *this, scope, settings, max_request_headers_kb); + } break; + default: + ENVOY_LOG(error, + "ServerConnection({}:{}) doesn't support http type %d, " + "defaulting to HTTP1", + name_, id_, static_cast(http_type) + 1); + http_connection_ = std::make_unique( + network_connection, *this, Http::Http1Settings(), max_request_headers_kb); + break; + } +} + +ServerConnection::~ServerConnection() { + ENVOY_LOG(trace, "ServerConnection({}:{}) destroyed", name_, id_); +} + +const std::string& ServerConnection::name() const { return name_; } + +uint32_t ServerConnection::id() const { return id_; } + +Network::Connection& ServerConnection::networkConnection() { return network_connection_; } + +const Network::Connection& ServerConnection::networkConnection() const { + return network_connection_; +} + +Http::ServerConnection& ServerConnection::httpConnection() { return *http_connection_; } + +const Http::ServerConnection& ServerConnection::httpConnection() const { return *http_connection_; } + +Event::Dispatcher& ServerConnection::dispatcher() { return dispatcher_; } + +Network::FilterStatus ServerConnection::onData(Buffer::Instance& data, bool end_stream) { + ENVOY_LOG(trace, "ServerConnection({}:{}) got data", name_, id_); + + try { + http_connection_->dispatch(data); + } catch (const Http::CodecProtocolException& e) { + 
ENVOY_LOG(error, "ServerConnection({}:{}) received the wrong protocol: {}", name_, id_, + e.what()); + network_connection_.close(Network::ConnectionCloseType::NoFlush); + return Network::FilterStatus::StopIteration; + } + + if (end_stream) { + ENVOY_LOG(error, "ServerConnection({}:{}) got end stream", name_, id_); + } + + return Network::FilterStatus::StopIteration; +} + +Network::FilterStatus ServerConnection::onNewConnection() { + ENVOY_LOG(trace, "ServerConnection({}:{}) onNewConnection", name_, id_); + return Network::FilterStatus::Continue; +} + +void ServerConnection::initializeReadFilterCallbacks(Network::ReadFilterCallbacks&) {} + +Http::StreamDecoder& ServerConnection::newStream(Http::StreamEncoder& stream_encoder, bool) { + ServerStreamImpl* raw = nullptr; + uint32_t id = 0U; + + { + std::lock_guard guard(streams_lock_); + + id = stream_counter_++; + auto stream = std::make_unique(id, *this, request_callback_, stream_encoder); + raw = stream.get(); + streams_[id] = std::move(stream); + } + + ENVOY_LOG(debug, "ServerConnection({}:{}) received new Stream({}:{}:{})", name_, id_, name_, id_, + id); + + return *raw; +} + +void ServerConnection::removeStream(uint32_t stream_id) { + unsigned long size = 0UL; + + { + std::lock_guard guard(streams_lock_); + streams_.erase(stream_id); + size = streams_.size(); + } + + if (0 == size) { + ENVOY_LOG(debug, "ServerConnection({}:{}) is idle", name_, id_); + } +} + +void ServerConnection::onEvent(Network::ConnectionEvent event) { + switch (event) { + case Network::ConnectionEvent::RemoteClose: + ENVOY_LOG(debug, "ServerConnection({}:{}) closed by peer or reset", name_, id_); + close_callback_(*this, ServerCloseReason::REMOTE_CLOSE); + return; + case Network::ConnectionEvent::LocalClose: + ENVOY_LOG(debug, "ServerConnection({}:{}) closed locally", name_, id_); + close_callback_(*this, ServerCloseReason::LOCAL_CLOSE); + return; + default: + ENVOY_LOG(error, "ServerConnection({}:{}) got unknown event", name_, id_); + } +} + 
+void ServerConnection::onAboveWriteBufferHighWatermark() { + ENVOY_LOG(debug, "ServerConnection({}:{}) above write buffer high watermark", name_, id_); + http_connection_->onUnderlyingConnectionAboveWriteBufferHighWatermark(); +} + +void ServerConnection::onBelowWriteBufferLowWatermark() { + ENVOY_LOG(debug, "ServerConnection({}:{}) below write buffer low watermark", name_, id_); + http_connection_->onUnderlyingConnectionBelowWriteBufferLowWatermark(); +} + +void ServerConnection::onGoAway() { ENVOY_LOG(warn, "ServerConnection({}) got go away", name_); } + +ServerFilterChain::ServerFilterChain(Network::TransportSocketFactory& transport_socket_factory) + : transport_socket_factory_(transport_socket_factory) {} + +const Network::TransportSocketFactory& ServerFilterChain::transportSocketFactory() const { + return transport_socket_factory_; +} + +const std::vector& ServerFilterChain::networkFilterFactories() const { + return network_filter_factories_; +} + +LocalListenSocket::LocalListenSocket(Network::Address::IpVersion ip_version, uint16_t port, + const Network::Socket::OptionsSharedPtr& options, + bool bind_to_port) + : NetworkListenSocket(loopbackAddress(ip_version, port), options, bind_to_port) {} + +ServerCallbackHelper::ServerCallbackHelper(ServerRequestCallback&& request_callback, + ServerAcceptCallback&& accept_callback, + ServerCloseCallback&& close_callback) { + if (request_callback) { + request_callback_ = [this, request_callback = std::move(request_callback)]( + ServerConnection& connection, ServerStream& stream, + Http::HeaderMapPtr&& request_headers) { + ++requests_received_; + request_callback(connection, stream, std::move(request_headers)); + }; + } else { + request_callback_ = [this](ServerConnection&, ServerStream& stream, Http::HeaderMapPtr&&) { + ++requests_received_; + Http::TestHeaderMapImpl response{{":status", "200"}}; + stream.sendResponseHeaders(response); + }; + } + + if (accept_callback) { + accept_callback_ = [this, accept_callback = 
std::move(accept_callback)]( + ServerConnection& connection) -> ServerCallbackResult { + ++accepts_; + return accept_callback(connection); + }; + } else { + accept_callback_ = [this](ServerConnection&) -> ServerCallbackResult { + ++accepts_; + return ServerCallbackResult::CONTINUE; + }; + } + + if (close_callback) { + close_callback_ = [this, close_callback = std::move(close_callback)]( + ServerConnection& connection, ServerCloseReason reason) { + absl::MutexLock lock(&mutex_); + + switch (reason) { + case ServerCloseReason::REMOTE_CLOSE: + ++remote_closes_; + break; + case ServerCloseReason::LOCAL_CLOSE: + ++local_closes_; + break; + } + + close_callback(connection, reason); + }; + } else { + close_callback_ = [this](ServerConnection&, ServerCloseReason reason) { + absl::MutexLock lock(&mutex_); + + switch (reason) { + case ServerCloseReason::REMOTE_CLOSE: + ++remote_closes_; + break; + case ServerCloseReason::LOCAL_CLOSE: + ++local_closes_; + break; + } + }; + } +} + +uint32_t ServerCallbackHelper::connectionsAccepted() const { return accepts_; } + +uint32_t ServerCallbackHelper::requestsReceived() const { return requests_received_; } + +uint32_t ServerCallbackHelper::localCloses() const { + absl::MutexLock lock(&mutex_); + return local_closes_; +} + +uint32_t ServerCallbackHelper::remoteCloses() const { + absl::MutexLock lock(&mutex_); + return remote_closes_; +} + +ServerAcceptCallback ServerCallbackHelper::acceptCallback() const { return accept_callback_; } + +ServerRequestCallback ServerCallbackHelper::requestCallback() const { return request_callback_; } + +ServerCloseCallback ServerCallbackHelper::closeCallback() const { return close_callback_; } + +void ServerCallbackHelper::wait(uint32_t connections_closed) { + auto constraints = [connections_closed, this]() { + return connections_closed <= local_closes_ + remote_closes_; + }; + + absl::MutexLock lock(&mutex_); + mutex_.Await(absl::Condition(&constraints)); +} + +void ServerCallbackHelper::wait() { + auto 
constraints = [this]() { return accepts_ <= local_closes_ + remote_closes_; }; + + absl::MutexLock lock(&mutex_); + mutex_.Await(absl::Condition(&constraints)); +} + +Server::Server(const std::string& name, Network::Socket& listening_socket, + Network::TransportSocketFactory& transport_socket_factory, + Http::CodecClient::Type http_type) + : name_(name), stats_(), time_system_(), + api_(Thread::threadFactoryForTest(), stats_, time_system_, Filesystem::fileSystemForTest()), + dispatcher_(api_.allocateDispatcher()), + connection_handler_(new Envoy::Server::ConnectionHandlerImpl(ENVOY_LOGGER(), *dispatcher_)), + thread_(nullptr), listening_socket_(listening_socket), + server_filter_chain_(transport_socket_factory), http_type_(http_type) {} + +Server::~Server() { stop(); } + +void Server::start(ServerAcceptCallback&& accept_callback, ServerRequestCallback&& request_callback, + ServerCloseCallback&& close_callback) { + accept_callback_ = std::move(accept_callback); + request_callback_ = std::move(request_callback); + close_callback_ = std::move(close_callback); + std::promise promise; + + thread_ = api_.threadFactory().createThread([this, &promise]() { + is_running = true; + ENVOY_LOG(debug, "Server({}) started", name_.c_str()); + connection_handler_->addListener(*this); + + promise.set_value(true); // do not use promise again after this + while (is_running) { + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + + ENVOY_LOG(debug, "Server({}) stopped", name_.c_str()); + + connection_handler_.reset(); + }); + + promise.get_future().get(); +} + +void Server::start(ServerCallbackHelper& helper) { + start(helper.acceptCallback(), helper.requestCallback(), helper.closeCallback()); +} + +void Server::stop() { + is_running = false; + + if (thread_) { + thread_->join(); + thread_ = nullptr; + } +} + +void Server::stopAcceptingConnections() { + ENVOY_LOG(debug, "Server({}) stopped accepting connections", name_); + connection_handler_->disableListeners(); +} + +void 
Server::startAcceptingConnections() { + ENVOY_LOG(debug, "Server({}) started accepting connections", name_); + connection_handler_->enableListeners(); +} + +const Stats::Store& Server::statsStore() const { return stats_; } + +void Server::setPerConnectionBufferLimitBytes(uint32_t limit) { + connection_buffer_limit_bytes_ = limit; +} + +// +// Network::ListenerConfig +// + +Network::FilterChainManager& Server::filterChainManager() { return *this; } + +Network::FilterChainFactory& Server::filterChainFactory() { return *this; } + +Network::Socket& Server::socket() { return listening_socket_; } + +const Network::Socket& Server::socket() const { return listening_socket_; } + +bool Server::bindToPort() { return true; } + +bool Server::handOffRestoredDestinationConnections() const { return false; } + +uint32_t Server::perConnectionBufferLimitBytes() const { return connection_buffer_limit_bytes_; } + +std::chrono::milliseconds Server::listenerFiltersTimeout() const { + return std::chrono::milliseconds(0); +} + +Stats::Scope& Server::listenerScope() { return stats_; } + +uint64_t Server::listenerTag() const { return 0; } + +const std::string& Server::name() const { return name_; } + +const Network::FilterChain* Server::findFilterChain(const Network::ConnectionSocket&) const { + return &server_filter_chain_; +} + +bool Server::createNetworkFilterChain(Network::Connection& network_connection, + const std::vector&) { + uint32_t id = connection_counter_++; + ENVOY_LOG(debug, "Server({}) accepted new Connection({}:{})", name_, name_, id); + + ServerConnectionSharedPtr connection = + std::make_shared(name_, id, request_callback_, close_callback_, + network_connection, *dispatcher_, http_type_, stats_); + network_connection.addReadFilter(connection); + network_connection.addConnectionCallbacks(*connection); + + return !(ServerCallbackResult::CLOSE == accept_callback_(*connection)); +} + +bool Server::createListenerFilterChain(Network::ListenerFilterManager&) { return true; } + 
+bool Server::createUdpListenerFilterChain(Network::UdpListenerFilterManager&, + Network::UdpReadFilterCallbacks&) { + return true; +} + +ClusterHelper::ClusterHelper(const std::string& name) : name_{name} {} + +ClusterHelper& ClusterHelper::addServer(ServerCallbackHelperPtr&& server_callback) { + server_callback_helpers_.push_back(std::move(server_callback)); + return *this; +} + +const std::vector& ClusterHelper::servers() const { + return server_callback_helpers_; +} + +std::vector& ClusterHelper::servers() { return server_callback_helpers_; } + +uint32_t ClusterHelper::connectionsAccepted() const { + uint32_t total = 0U; + + for (const auto& server_callback_helper : server_callback_helpers_) { + total += server_callback_helper->connectionsAccepted(); + } + + return total; +} + +uint32_t ClusterHelper::requestsReceived() const { + uint32_t total = 0U; + + for (const auto& server_callback_helper : server_callback_helpers_) { + total += server_callback_helper->requestsReceived(); + } + + return total; +} + +uint32_t ClusterHelper::localCloses() const { + uint32_t total = 0U; + + for (const auto& server_callback_helper : server_callback_helpers_) { + total += server_callback_helper->localCloses(); + } + + return total; +} + +uint32_t ClusterHelper::remoteCloses() const { + uint32_t total = 0U; + + for (const auto& server_callback_helper : server_callback_helpers_) { + total += server_callback_helper->remoteCloses(); + } + + return total; +} + +void ClusterHelper::wait() { + for (auto& server_callback_helper : server_callback_helpers_) { + server_callback_helper->wait(); + } +} + +} // namespace Stress +} // namespace Envoy diff --git a/test/stress/stress_test_upstream.h b/test/stress/stress_test_upstream.h new file mode 100644 index 0000000000..5ad63f62ca --- /dev/null +++ b/test/stress/stress_test_upstream.h @@ -0,0 +1,390 @@ +#pragma once + +#include "common/api/api_impl.h" +#include "common/common/thread.h" +#include "common/grpc/common.h" +#include 
"common/http/codec_client.h" +#include "common/network/listen_socket_impl.h" +#include "common/stats/isolated_store_impl.h" + +#include "test/test_common/test_time.h" +#include "test/test_common/utility.h" + +#include "stress_test_common.h" + +namespace Envoy { +namespace Stress { + +enum class ServerCloseReason { + REMOTE_CLOSE, // Peer closed or connection was reset after it was + // established. + LOCAL_CLOSE // This process decided to close the connection. +}; + +enum class ServerCallbackResult { + CONTINUE, // Leave the connection open + CLOSE // Close the connection. +}; + +class ServerStream { +public: + ServerStream() = default; + + virtual ~ServerStream() = default; + + ServerStream(const ServerStream&) = delete; + void operator=(const ServerStream&) = delete; + + /** + * Send a HTTP header-only response and close the stream. + * + * @param response_headers the response headers + * @param delay delay in msec before sending the response. if 0 send + * immediately + */ + virtual void + sendResponseHeaders(const Http::HeaderMap& response_headers, + const std::chrono::milliseconds delay = std::chrono::milliseconds(0)) PURE; + + /** + * Send a gRPC response and close the stream + * + * @param status The gRPC status (carried in the HTTP response trailer) + * @param response The gRPC response (carried in the HTTP response body) + * @param delay delay in msec before sending the response. if 0 send + * immediately + */ + virtual void + sendGrpcResponse(Grpc::Status::GrpcStatus status, const Protobuf::Message& response, + const std::chrono::milliseconds delay = std::chrono::milliseconds(0)) PURE; +}; + +typedef std::unique_ptr ServerStreamPtr; +typedef std::shared_ptr ServerStreamSharedPtr; + +class ServerConnection; + +// NB: references passed to any of these callbacks are owned by the caller and +// must not be used after the callback returns -- except for the request headers +// which may be moved into the caller. 
+typedef std::function + ServerAcceptCallback; +typedef std::function + ServerCloseCallback; +typedef std::function + ServerRequestCallback; + +class ServerConnection : public Network::ReadFilter, + public Network::ConnectionCallbacks, + public Http::ServerConnectionCallbacks, + Logger::Loggable { +public: + ServerConnection(const std::string& name, uint32_t id, ServerRequestCallback& request_callback, + ServerCloseCallback& close_callback, Network::Connection& network_connection, + Event::Dispatcher& dispatcher, Http::CodecClient::Type http_type, + Stats::Scope& scope); + + ~ServerConnection() override; + ServerConnection(const ServerConnection&) = delete; + ServerConnection& operator=(const ServerConnection&) = delete; + + const std::string& name() const; + + uint32_t id() const; + + Network::Connection& networkConnection(); + const Network::Connection& networkConnection() const; + + Http::ServerConnection& httpConnection(); + const Http::ServerConnection& httpConnection() const; + + Event::Dispatcher& dispatcher(); + + /** + * For internal use + */ + void removeStream(uint32_t stream_id); + + // + // Network::ReadFilter + // + + Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override; + + Network::FilterStatus onNewConnection() override; + + void initializeReadFilterCallbacks(Network::ReadFilterCallbacks&) override; + + // + // Http::ConnectionCallbacks + // + + void onGoAway() override; + + // + // Http::ServerConnectionCallbacks + // + + Http::StreamDecoder& newStream(Http::StreamEncoder& stream_encoder, + bool is_internally_created = false) override; + + // + // Network::ConnectionCallbacks + // + + void onEvent(Network::ConnectionEvent event) override; + + void onAboveWriteBufferHighWatermark() override; + + void onBelowWriteBufferLowWatermark() override; + +private: + std::string name_; + uint32_t id_; + Network::Connection& network_connection_; + Http::ServerConnectionPtr http_connection_; + Event::Dispatcher& dispatcher_; + 
ServerRequestCallback& request_callback_; + ServerCloseCallback& close_callback_; + + std::mutex streams_lock_; + std::unordered_map streams_; + uint32_t stream_counter_{0U}; +}; + +typedef std::unique_ptr ServerConnectionPtr; +typedef std::shared_ptr ServerConnectionSharedPtr; + +class ServerFilterChain : public Network::FilterChain { +public: + explicit ServerFilterChain(Network::TransportSocketFactory& transport_socket_factory); + ServerFilterChain(const ServerFilterChain&) = delete; + ServerFilterChain& operator=(const ServerFilterChain&) = delete; + + // + // Network::FilterChain + // + + const Network::TransportSocketFactory& transportSocketFactory() const override; + + const std::vector& networkFilterFactories() const override; + +private: + Network::TransportSocketFactory& transport_socket_factory_; + std::vector network_filter_factories_; +}; + +/** + * A convenience class for creating a listening socket bound to localhost + */ +class LocalListenSocket : public Network::TcpListenSocket { +public: + /** + * Create a listening socket bound to localhost. + * + * @param ip_version v4 or v6. v4 by default. + * @param port the port. If 0, let the kernel allocate an available ephemeral + * port. 0 by default. + * @param options socket options. nullptr by default + * @param bind_to_port if true immediately bind to the port, allocating one if + * necessary. true by default. + */ + explicit LocalListenSocket( + Network::Address::IpVersion ip_version = Network::Address::IpVersion::v4, uint16_t port = 0, + const Network::Socket::OptionsSharedPtr& options = nullptr, bool bind_to_port = true); + + LocalListenSocket(const LocalListenSocket&) = delete; + void operator=(const LocalListenSocket&) = delete; +}; + +/** + * A convenience class for passing callbacks to a Server. If no callbacks are + * provided, default callbacks that track some simple metrics will be used. 
If + * callbacks are provided, they will be wrapped with callbacks that maintain the + * same simple set of metrics. + */ +class ServerCallbackHelper { +public: + explicit ServerCallbackHelper(ServerRequestCallback&& request_callback = nullptr, + ServerAcceptCallback&& accept_callback = nullptr, + ServerCloseCallback&& close_callback = nullptr); + ServerCallbackHelper(const ServerCallbackHelper&) = delete; + ServerCallbackHelper& operator=(const ServerCallbackHelper&) = delete; + virtual ~ServerCallbackHelper() = default; + + uint32_t connectionsAccepted() const; + uint32_t requestsReceived() const; + uint32_t localCloses() const; + uint32_t remoteCloses() const; + ServerAcceptCallback acceptCallback() const; + ServerRequestCallback requestCallback() const; + ServerCloseCallback closeCallback() const; + + /* + * Wait until the server has accepted n connections and seen them closed (due + * to error or client close) + */ + void wait(uint32_t connections); + + /* + * Wait until the server has seen a close for every connection it has + * accepted. 
+ */ + void wait(); + +private: + ServerAcceptCallback accept_callback_; + ServerRequestCallback request_callback_; + ServerCloseCallback close_callback_; + + std::atomic accepts_{0}; + std::atomic requests_received_{0}; + uint32_t local_closes_{0}; + uint32_t remote_closes_{0}; + mutable absl::Mutex mutex_; +}; + +typedef std::unique_ptr ServerCallbackHelperPtr; +typedef std::shared_ptr ServerCallbackHelperSharedPtr; + +class Server : public Network::FilterChainManager, + public Network::FilterChainFactory, + public Network::ListenerConfig, + Logger::Loggable { +public: + Server(const std::string& name, Network::Socket& listening_socket, + Network::TransportSocketFactory& transport_socket_factory, + Http::CodecClient::Type http_type); + Server(const Server&) = delete; + Server& operator=(const Server&) = delete; + ~Server() override; + + void start(ServerAcceptCallback&& accept_callback, ServerRequestCallback&& request_callback, + ServerCloseCallback&& close_callback); + + void start(ServerCallbackHelper& helper); + + void stop(); + + void stopAcceptingConnections(); + + void startAcceptingConnections(); + + const Stats::Store& statsStore() const; + + void setPerConnectionBufferLimitBytes(uint32_t limit); + + // + // Network::ListenerConfig + // + + Network::FilterChainManager& filterChainManager() override; + + Network::FilterChainFactory& filterChainFactory() override; + + Network::Socket& socket() override; + + const Network::Socket& socket() const override; + + bool bindToPort() override; + + bool handOffRestoredDestinationConnections() const override; + + uint32_t perConnectionBufferLimitBytes() const override; + + std::chrono::milliseconds listenerFiltersTimeout() const override; + + Stats::Scope& listenerScope() override; + + uint64_t listenerTag() const override; + + const std::string& name() const override; + + // + // Network::FilterChainManager + // + + const Network::FilterChain* findFilterChain(const Network::ConnectionSocket&) const override; + + // 
+ // Network::FilterChainFactory + // + + bool createNetworkFilterChain(Network::Connection& network_connection, + const std::vector&) override; + + bool createListenerFilterChain(Network::ListenerFilterManager&) override; + + bool createUdpListenerFilterChain(Network::UdpListenerFilterManager&, + Network::UdpReadFilterCallbacks&) override; + +private: + std::string name_; + Stats::IsolatedStoreImpl stats_; + Event::TestRealTimeSystem time_system_; + Api::Impl api_; + Event::DispatcherPtr dispatcher_; + Network::ConnectionHandlerPtr connection_handler_; + Thread::ThreadPtr thread_; + std::atomic is_running{false}; + + ServerAcceptCallback accept_callback_{nullptr}; + ServerRequestCallback request_callback_{nullptr}; + ServerCloseCallback close_callback_{nullptr}; + + // + // Network::ListenerConfig + // + + Network::Socket& listening_socket_; + std::atomic connection_buffer_limit_bytes_{0U}; + + // + // Network::FilterChainManager + // + + ServerFilterChain server_filter_chain_; + + // + // Network::FilterChainFactory + // + + Http::CodecClient::Type http_type_; + std::atomic connection_counter_{0U}; +}; + +typedef std::unique_ptr ServerPtr; +typedef std::shared_ptr ServerSharedPtr; + +class ClusterHelper { +public: + explicit ClusterHelper(const std::string& name); + virtual ~ClusterHelper() = default; + ClusterHelper(const ClusterHelper&) = delete; + ClusterHelper& operator=(const ClusterHelper&) = delete; + + ClusterHelper& addServer(ServerCallbackHelperPtr&& server_callback); + + const std::vector& servers() const; + std::vector& servers(); + + inline const std::string& name() const { return name_; } + + uint32_t connectionsAccepted() const; + uint32_t requestsReceived() const; + uint32_t localCloses() const; + uint32_t remoteCloses() const; + + void wait(); + +private: + std::string name_; + std::vector server_callback_helpers_; +}; + +typedef std::unique_ptr ClusterHelperPtr; +typedef std::shared_ptr ClusterHelperSharedPtr; + +} // namespace Stress +} // 
namespace Envoy