diff --git a/.bazelrc b/.bazelrc index 75a93323f7c..a0bfbcb421d 100644 --- a/.bazelrc +++ b/.bazelrc @@ -2,6 +2,11 @@ # Envoy specific Bazel build/test options. build --workspace_status_command=tools/bazel_get_workspace_status +# Bazel doesn't need more than 200MB of memory based on memory profiling: +# https://docs.bazel.build/versions/master/skylark/performance.html#memory-profiling +# Limiting JVM heapsize here to let it do GC more when approaching the limit to +# leave room for compiler/linker. +startup --host_jvm_args=-Xmx512m # Basic ASAN/UBSAN that works for gcc build:asan --define ENVOY_CONFIG_ASAN=1 @@ -18,6 +23,7 @@ build:asan --define signal_trace=disabled # Clang 5.0 ASAN build:clang-asan --define ENVOY_CONFIG_ASAN=1 build:clang-asan --copt -D__SANITIZE_ADDRESS__ +build:clang-asan --copt -DADDRESS_SANITIZER=1 build:clang-asan --copt -fsanitize=address,undefined build:clang-asan --linkopt -fsanitize=address,undefined build:clang-asan --copt -fno-sanitize=vptr @@ -29,12 +35,14 @@ build:clang-asan --build_tag_filters=-no_asan build:clang-asan --test_tag_filters=-no_asan build:clang-asan --define signal_trace=disabled build:clang-asan --test_env=ASAN_SYMBOLIZER_PATH +build:clang-asan --linkopt -fuse-ld=lld # Clang 5.0 TSAN build:clang-tsan --define ENVOY_CONFIG_TSAN=1 build:clang-tsan --copt -fsanitize=thread build:clang-tsan --linkopt -fsanitize=thread build:clang-tsan --define tcmalloc=disabled +build:clang-tsan --linkopt -fuse-ld=lld # Clang 5.0 MSAN - broken today since we need to rebuild lib[std]c++ and external deps with MSAN # support (see https://github.com/envoyproxy/envoy/issues/443). diff --git a/.circleci/config.yml b/.circleci/config.yml index b91adec4b24..5731eb3c377 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -3,9 +3,10 @@ version: 2 jobs: build: docker: - - image: istio/ci:go1.10-bazel0.18-clang6.0 + - image: istio/ci:go1.11-bazel0.22-clang7 environment: - - BAZEL_TEST_ARGS: "--test_env=ENVOY_IP_TEST_VERSIONS=v4only --test_output=all" + - BAZEL_BUILD_ARGS: "--local_resources=12288,5,1" + - BAZEL_TEST_ARGS: "--test_env=ENVOY_IP_TEST_VERSIONS=v4only --test_output=all --local_resources=12288,5,1 --local_test_jobs=8" resource_class: xlarge steps: - checkout @@ -30,9 +31,10 @@ jobs: destination: /proxy/bin linux_asan: docker: - - image: istio/ci:go1.10-bazel0.18-clang6.0 + - image: istio/ci:go1.11-bazel0.22-clang7 environment: - - BAZEL_TEST_ARGS: "--test_env=ENVOY_IP_TEST_VERSIONS=v4only --test_output=all" + - BAZEL_BUILD_ARGS: "--local_resources=12288,5,1" + - BAZEL_TEST_ARGS: "--test_env=ENVOY_IP_TEST_VERSIONS=v4only --test_output=all --local_resources=12288,5,1 --local_test_jobs=8" resource_class: xlarge steps: - checkout @@ -49,9 +51,10 @@ jobs: - /home/circleci/.cache/bazel linux_tsan: docker: - - image: istio/ci:go1.10-bazel0.18-clang6.0 + - image: istio/ci:go1.11-bazel0.22-clang7 environment: - - BAZEL_TEST_ARGS: "--test_env=ENVOY_IP_TEST_VERSIONS=v4only --test_output=all" + - BAZEL_BUILD_ARGS: "--local_resources=12288,5,1" + - BAZEL_TEST_ARGS: "--test_env=ENVOY_IP_TEST_VERSIONS=v4only --test_output=all --local_resources=12288,5,1 --local_test_jobs=8" resource_class: xlarge steps: - checkout @@ -71,7 +74,8 @@ jobs: xcode: "9.3.0" environment: - BAZEL_STARTUP_ARGS: "--output_base /Users/distiller/.cache/bazel" - - BAZEL_TEST_ARGS: "--test_env=ENVOY_IP_TEST_VERSIONS=v4only --test_output=all" + - BAZEL_BUILD_ARGS: "--local_resources=12288,5,1" + - BAZEL_TEST_ARGS: "--test_env=ENVOY_IP_TEST_VERSIONS=v4only --test_output=all --local_resources=12288,5,1 --local_test_jobs=8" - CC: clang - CXX: clang++ steps: diff --git a/Makefile b/Makefile index 6ccc580258e..7c7699b9445 100644 --- a/Makefile +++ b/Makefile @@ -24,19 +24,20 @@ BAZEL_TARGETS ?= //... HUB ?= TAG ?= ifeq "$(origin CC)" "default" -CC := clang-6.0 +CC := clang-7 endif ifeq "$(origin CXX)" "default" -CXX := clang++-6.0 +CXX := clang++-7 endif +PATH := /usr/lib/llvm-7/bin:$(PATH) build: - CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) build $(BAZEL_BUILD_ARGS) $(BAZEL_TARGETS) + PATH=$(PATH) CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) build $(BAZEL_BUILD_ARGS) $(BAZEL_TARGETS) @bazel shutdown # Build only envoy - fast build_envoy: - CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) build $(BAZEL_BUILD_ARGS) //src/envoy:envoy + PATH=$(PATH) CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) build $(BAZEL_BUILD_ARGS) //src/envoy:envoy @bazel shutdown clean: @@ -44,15 +45,15 @@ clean: @bazel shutdown test: - CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) test $(BAZEL_TEST_ARGS) $(BAZEL_TARGETS) + PATH=$(PATH) CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) test $(BAZEL_TEST_ARGS) $(BAZEL_TARGETS) @bazel shutdown test_asan: - CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) test $(BAZEL_TEST_ARGS) --config=clang-asan $(BAZEL_TARGETS) + PATH=$(PATH) CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) test $(BAZEL_TEST_ARGS) --config=clang-asan $(BAZEL_TARGETS) @bazel shutdown test_tsan: - CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) test $(BAZEL_TEST_ARGS) --config=clang-tsan $(BAZEL_TARGETS) + PATH=$(PATH) CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) test $(BAZEL_TEST_ARGS) --config=clang-tsan $(BAZEL_TARGETS) @bazel shutdown check: diff --git a/WORKSPACE b/WORKSPACE index 89342046ac8..1b3aa623ddb 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -35,8 +35,8 @@ bind( # When updating envoy sha manually please update the sha in istio.deps file also # # Determine SHA256 `wget https://github.com/envoyproxy/envoy/archive/COMMIT.tar.gz && sha256sum COMMIT.tar.gz` -ENVOY_SHA = "b3be5713f2100ab5c40316e73ce34581245bd26a" -ENVOY_SHA256 = "79629284ae143d66b873c08883dc6382fac2e8ed45f6f3521f7e7282b6650216" +ENVOY_SHA = "925810d00b0d3095a8e67fd4e04e0f597ed188bb" +ENVOY_SHA256 = "26d1f14e881455546cf0e222ec92a8e1e5f65cb2c5761d63c66598b39cd9c47d" http_archive( name = "envoy", @@ -48,6 +48,9 @@ http_archive( load("@envoy//bazel:repositories.bzl", "envoy_dependencies") envoy_dependencies() +load("@rules_foreign_cc//:workspace_definitions.bzl", "rules_foreign_cc_dependencies") +rules_foreign_cc_dependencies() + load("@envoy//bazel:cc_configure.bzl", "cc_configure") cc_configure() diff --git a/istio.deps b/istio.deps index eb49329ec52..574515b92be 100644 --- a/istio.deps +++ b/istio.deps @@ -11,6 +11,6 @@ "name": "ENVOY_SHA", "repoName": "envoyproxy/envoy", "file": "WORKSPACE", - "lastStableSHA": "b3be5713f2100ab5c40316e73ce34581245bd26a" + "lastStableSHA": "925810d00b0d3095a8e67fd4e04e0f597ed188bb" } ] diff --git a/repositories.bzl b/repositories.bzl index 6b9a1dde189..a8fe0ad77e1 100644 --- a/repositories.bzl +++ b/repositories.bzl @@ -99,8 +99,8 @@ cc_library( actual = "@googletest_git//:googletest_prod", ) -ISTIO_API = "aec9db9d9a57faf688b4d5606fddede85d4d3855" -ISTIO_API_SHA256 = "52a23e3453b0e639879e34365f9b80d0c7888851ed51034aad89268d4100e908" +ISTIO_API = "3094619c84733caef53723bfc96fa63ceb58cd57" +ISTIO_API_SHA256 = "f1fb0b79d4c6af4dda9cba1cbd76f8dd3be8a1c6e4d8341fc62f33d7a8d57e6c" def mixerapi_repositories(bind = True): BUILD = """ diff --git a/script/release-binary b/script/release-binary index ebde3fc43c3..bd1cb424049 100755 --- a/script/release-binary +++ b/script/release-binary @@ -19,8 +19,8 @@ set -ex # Use clang for the release builds. -CC=${CC:-clang-6.0} -CXX=${CXX:-clang++-6.0} +CC=${CC:-clang-7} +CXX=${CXX:-clang++-7} # The bucket name to store proxy binary DST="gs://istio-build/proxy" diff --git a/src/istio/mixerclient/attribute_compressor.cc b/src/istio/mixerclient/attribute_compressor.cc index 34c980ae35e..ec7a6af8353 100644 --- a/src/istio/mixerclient/attribute_compressor.cc +++ b/src/istio/mixerclient/attribute_compressor.cc @@ -140,6 +140,9 @@ class BatchCompressorImpl : public BatchCompressor { report_.add_default_words(word); } report_.set_global_word_count(global_dict_.size()); + report_.set_repeated_attributes_semantics( + mixer::v1:: + ReportRequest_RepeatedAttributesSemantics_INDEPENDENT_ENCODING); return report_; } diff --git a/src/istio/mixerclient/attribute_compressor_test.cc b/src/istio/mixerclient/attribute_compressor_test.cc index 614381bb16c..397905b0099 100644 --- a/src/istio/mixerclient/attribute_compressor_test.cc +++ b/src/istio/mixerclient/attribute_compressor_test.cc @@ -259,6 +259,7 @@ attributes { } default_words: "JWT-Token" global_word_count: 221 +repeated_attributes_semantics: INDEPENDENT_ENCODING )"; class AttributeCompressorTest : public ::testing::Test { diff --git a/test/integration/BUILD b/test/integration/BUILD index 5ea6b708487..7abc7baf610 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -19,11 +19,31 @@ package(default_visibility = ["//visibility:public"]) load( "@envoy//bazel:envoy_build_system.bzl", "envoy_cc_test", + "envoy_cc_test_library", +) + +envoy_cc_test_library( + name = "int_client_server", + srcs = [ + "int_server.cc", + "int_client.cc", + ], + hdrs = [ + "int_server.h", + "int_client.h", + ], + repository = "@envoy", + deps = [ + "@envoy//source/server:server_lib", + "@envoy//test/integration:http_protocol_integration_lib", + ], ) envoy_cc_test( name = "istio_http_integration_test", - srcs = ["istio_http_integration_test.cc"], + srcs = [ + "istio_http_integration_test.cc", + ], repository = "@envoy", deps = [ "@envoy//source/common/common:utility_lib", @@ -32,8 +52,19 @@ envoy_cc_test( "//src/envoy/http/authn:filter_lib", "//src/envoy/http/jwt_auth:http_filter_factory", "//src/envoy/http/jwt_auth:jwt_lib", - "//src/envoy/utils:filter_names_lib", "//src/envoy/http/mixer:filter_lib", + "//src/envoy/utils:filter_names_lib", + ], +) + +envoy_cc_test( + name = "int_client_server_test", + srcs = [ + "int_client_server_test.cc", + ], + repository = "@envoy", + deps = [ + ":int_client_server", ], ) diff --git a/test/integration/int_client.cc b/test/integration/int_client.cc new file mode 100644 index 00000000000..1c344741eca --- /dev/null +++ b/test/integration/int_client.cc @@ -0,0 +1,678 @@ +/* Copyright 2019 Istio Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "int_client.h" + +#include +#include "common/http/http1/codec_impl.h" +#include "common/http/http2/codec_impl.h" +#include "common/stats/isolated_store_impl.h" +#include "envoy/thread/thread.h" + +namespace Mixer { +namespace Integration { + +class ClientStream : public Envoy::Http::StreamDecoder, + public Envoy::Http::StreamCallbacks, + Envoy::Logger::Loggable { + public: + ClientStream(uint32_t id, ClientConnection &connection, + ClientResponseCallback callback) + : id_(id), connection_(connection), callback_(callback) {} + + virtual ~ClientStream() { + ENVOY_LOG(trace, "ClientStream({}:{}:{}) destroyed", connection_.name(), + connection_.id(), id_); + } + + // + // Envoy::Http::StreamDecoder + // + + virtual void decode100ContinueHeaders(Envoy::Http::HeaderMapPtr &&) override { + ENVOY_LOG(trace, "ClientStream({}:{}:{}) got continue headers", + connection_.name(), connection_.id(), id_); + } + + virtual void decodeHeaders(Envoy::Http::HeaderMapPtr &&response_headers, + bool end_stream) override { + ENVOY_LOG(debug, "ClientStream({}:{}:{}) got response headers", + connection_.name(), connection_.id(), id_); + + response_headers_ = std::move(response_headers); + + if (end_stream) { + onEndStream(); + // stream is now destroyed + } + } + + virtual void decodeData(Envoy::Buffer::Instance &, bool end_stream) override { + ENVOY_LOG(debug, "ClientStream({}:{}:{}) got response body data", + connection_.name(), connection_.id(), id_); + + if (end_stream) { + onEndStream(); + // stream is now destroyed + } + } + + virtual void decodeTrailers(Envoy::Http::HeaderMapPtr &&) override { + ENVOY_LOG(trace, "ClientStream({}:{}:{}) got response trailers", + connection_.name(), connection_.id(), id_); + onEndStream(); + // stream is now destroyed + } + + virtual void decodeMetadata(Envoy::Http::MetadataMapPtr &&) override { + ENVOY_LOG(trace, "ClientStream({}:{}):{} got metadata", connection_.name(), + connection_.id(), id_); + } + + // + // Envoy::Http::StreamCallbacks + // + + virtual void onResetStream(Envoy::Http::StreamResetReason reason) override { + // TODO test with h2 to see if we get any of these and whether the + // connection error handling is enough to handle it. + switch (reason) { + case Envoy::Http::StreamResetReason::LocalReset: + ENVOY_LOG(trace, "ClientStream({}:{}:{}) was locally reset", + connection_.name(), connection_.id(), id_); + break; + case Envoy::Http::StreamResetReason::LocalRefusedStreamReset: + ENVOY_LOG(trace, "ClientStream({}:{}:{}) refused local stream reset", + connection_.name(), connection_.id(), id_); + break; + case Envoy::Http::StreamResetReason::RemoteReset: + ENVOY_LOG(trace, "ClientStream({}:{}:{}) was remotely reset", + connection_.name(), connection_.id(), id_); + break; + case Envoy::Http::StreamResetReason::RemoteRefusedStreamReset: + ENVOY_LOG(trace, "ClientStream({}:{}:{}) refused remote stream reset", + connection_.name(), connection_.id(), id_); + break; + case Envoy::Http::StreamResetReason::ConnectionFailure: + ENVOY_LOG( + trace, + "ClientStream({}:{}:{}) reseet due to initial connection failure", + connection_.name(), connection_.id(), id_); + break; + case Envoy::Http::StreamResetReason::ConnectionTermination: + ENVOY_LOG( + trace, + "ClientStream({}:{}:{}) reset due to underlying connection reset", + connection_.name(), connection_.id(), id_); + break; + case Envoy::Http::StreamResetReason::Overflow: + ENVOY_LOG(trace, + "ClientStream({}:{}:{}) reset due to resource overflow", + connection_.name(), connection_.id(), id_); + break; + default: + ENVOY_LOG(trace, "ClientStream({}:{}:{}) reset due to unknown reason", + connection_.name(), connection_.id(), id_); + break; + } + } + + virtual void onAboveWriteBufferHighWatermark() override { + // TODO how should this be handled? + ENVOY_LOG(trace, "ClientStream({}:{}:{}) above write buffer high watermark", + connection_.name(), connection_.id(), id_); + } + + virtual void onBelowWriteBufferLowWatermark() override { + // TODO how should this be handled? + ENVOY_LOG(trace, "ClientStream({}:{}:{}) below write buffer low watermark", + connection_.name(), connection_.id(), id_); + } + + virtual void sendRequest(const Envoy::Http::HeaderMap &request_headers, + const std::chrono::milliseconds timeout) { + if (connection_.networkConnection().state() != + Envoy::Network::Connection::State::Open) { + ENVOY_LOG(warn, + "ClientStream({}:{}:{})'s underlying connection is not open!", + connection_.name(), connection_.id(), id_); + connection_.removeStream(id_); + // This stream is now destroyed + return; + } + + Envoy::Http::StreamEncoder &encoder = + connection_.httpConnection().newStream(*this); + encoder.getStream().addCallbacks(*this); + + ENVOY_LOG(debug, "ClientStream({}:{}:{}) sending request headers", + connection_.name(), connection_.id(), id_); + encoder.encodeHeaders(request_headers, true); + + timeout_timer_ = connection_.dispatcher().createTimer([this, timeout]() { + ENVOY_LOG( + debug, + "ClientStream({}:{}:{}) timed out after {} msec waiting for response", + connection_.name(), connection_.id(), id_, + static_cast(timeout.count())); + callback_(connection_, nullptr); + connection_.removeStream(id_); + // This stream is now destroyed + }); + timeout_timer_->enableTimer(timeout); + } + + private: + virtual void onEndStream() { + ENVOY_LOG(debug, "ClientStream({}:{}:{}) complete", connection_.name(), + connection_.id(), id_); + callback_(connection_, std::move(response_headers_)); + connection_.removeStream(id_); + // This stream is now destroyed + } + + ClientStream(const ClientStream &) = delete; + + void operator=(const ClientStream &) = delete; + + uint32_t id_; + ClientConnection &connection_; + Envoy::Http::HeaderMapPtr response_headers_{nullptr}; + ClientResponseCallback callback_; + Envoy::Event::TimerPtr timeout_timer_{nullptr}; +}; + +class HttpClientReadFilter + : public Envoy::Network::ReadFilter, + Envoy::Logger::Loggable { + public: + HttpClientReadFilter(const std::string name, uint32_t id, + Envoy::Http::ClientConnection &connection) + : name_(name), id_(id), connection_(connection) {} + + virtual ~HttpClientReadFilter() {} + + // + // Envoy::Network::ReadFilter + // + + virtual Envoy::Network::FilterStatus onData(Envoy::Buffer::Instance &data, + bool end_stream) override { + ENVOY_LOG(trace, "ClientConnection({}:{}) got data", name_, id_); + + connection_.dispatch(data); + + if (end_stream) { + // TODO how should this be handled? + ENVOY_LOG(error, "ClientConnection({}:{}) got end stream", name_, id_); + } + + return Envoy::Network::FilterStatus::StopIteration; + } + + virtual Envoy::Network::FilterStatus onNewConnection() override { + return Envoy::Network::FilterStatus::Continue; + } + + virtual void initializeReadFilterCallbacks( + Envoy::Network::ReadFilterCallbacks &) override {} + + private: + HttpClientReadFilter(const HttpClientReadFilter &) = delete; + + void operator=(const HttpClientReadFilter &) = delete; + + std::string name_; + uint32_t id_; + Envoy::Http::ClientConnection &connection_; +}; + +typedef std::unique_ptr HttpClientReadFilterPtr; +typedef std::shared_ptr HttpClientReadFilterSharedPtr; + +class Http1ClientConnection : public ClientConnection { + public: + Http1ClientConnection(Client &client, uint32_t id, + ClientConnectCallback connect_callback, + ClientCloseCallback close_callback, + std::shared_ptr &dispatcher, + Envoy::Network::ClientConnectionPtr network_connection) + : ClientConnection(client, id, connect_callback, close_callback, + dispatcher), + network_connection_(std::move(network_connection)), + http_connection_(*network_connection_, *this), + read_filter_{std::make_shared(client.name(), id, + http_connection_)} { + network_connection_->addReadFilter(read_filter_); + network_connection_->addConnectionCallbacks(*this); + } + + virtual ~Http1ClientConnection() {} + + virtual Envoy::Network::ClientConnection &networkConnection() override { + return *network_connection_; + } + + virtual Envoy::Http::ClientConnection &httpConnection() override { + return http_connection_; + } + + private: + Http1ClientConnection(const Http1ClientConnection &) = delete; + + Http1ClientConnection &operator=(const Http1ClientConnection &) = delete; + + Envoy::Network::ClientConnectionPtr network_connection_; + Envoy::Http::Http1::ClientConnectionImpl http_connection_; + HttpClientReadFilterSharedPtr read_filter_; +}; + +static constexpr uint32_t max_request_headers_kb = 2U; + +class Http2ClientConnection : public ClientConnection { + public: + Http2ClientConnection(Client &client, uint32_t id, + ClientConnectCallback connect_callback, + ClientCloseCallback close_callback, + std::shared_ptr &dispatcher, + Envoy::Network::ClientConnectionPtr network_connection) + : ClientConnection(client, id, connect_callback, close_callback, + dispatcher), + stats_(), + settings_(), + network_connection_(std::move(network_connection)), + http_connection_(*network_connection_, *this, stats_, settings_, + max_request_headers_kb), + read_filter_{std::make_shared(client.name(), id, + http_connection_)} { + network_connection_->addReadFilter(read_filter_); + network_connection_->addConnectionCallbacks(*this); + } + + virtual ~Http2ClientConnection() {} + + virtual Envoy::Network::ClientConnection &networkConnection() override { + return *network_connection_; + } + + virtual Envoy::Http::ClientConnection &httpConnection() override { + return http_connection_; + } + + private: + Http2ClientConnection(const Http2ClientConnection &) = delete; + + Http2ClientConnection &operator=(const Http2ClientConnection &) = delete; + + Envoy::Stats::IsolatedStoreImpl stats_; + Envoy::Http::Http2Settings settings_; + Envoy::Network::ClientConnectionPtr network_connection_; + Envoy::Http::Http2::ClientConnectionImpl http_connection_; + HttpClientReadFilterSharedPtr read_filter_; +}; + +ClientStream &ClientConnection::newStream(ClientResponseCallback callback) { + std::lock_guard guard(streams_lock_); + + uint32_t id = stream_counter_++; + ClientStreamPtr stream = std::make_unique(id, *this, callback); + ClientStream *raw = stream.get(); + streams_[id] = std::move(stream); + + return *raw; +} + +ClientConnection::ClientConnection( + Client &client, uint32_t id, ClientConnectCallback connect_callback, + ClientCloseCallback close_callback, + std::shared_ptr &dispatcher) + : client_(client), + id_(id), + connect_callback_(connect_callback), + close_callback_(close_callback), + dispatcher_(dispatcher) {} + +ClientConnection::~ClientConnection() { + ENVOY_LOG(trace, "ClientConnection({}:{}) destroyed", client_.name(), id_); +} + +const std::string &ClientConnection::name() const { return client_.name(); } + +uint32_t ClientConnection::id() const { return id_; } + +Envoy::Event::Dispatcher &ClientConnection::dispatcher() { + return *dispatcher_; +}; + +void ClientConnection::removeStream(uint32_t stream_id) { + unsigned long size = 0UL; + + { + std::lock_guard guard(streams_lock_); + streams_.erase(stream_id); + size = streams_.size(); + } + + if (0 == size) { + ENVOY_LOG(debug, "ClientConnection({}:{}) is idle", client_.name(), id_); + if (ClientCallbackResult::CLOSE == + connect_callback_(*this, ClientConnectionState::IDLE)) { + // This will trigger a + // networkConnection().onEvent(Envoy::Network::ConnectionEvent::LocalClose) + networkConnection().close(Envoy::Network::ConnectionCloseType::NoFlush); + } + } +} + +void ClientConnection::onEvent(Envoy::Network::ConnectionEvent event) { + switch (event) { + // properly on connection destruction. + case Envoy::Network::ConnectionEvent::RemoteClose: + if (established_) { + ENVOY_LOG(debug, "ClientConnection({}:{}) closed by peer or reset", + client_.name(), id_); + close_callback_(*this, ClientCloseReason::REMOTE_CLOSE); + } else { + ENVOY_LOG(debug, "ClientConnection({}:{}) cannot connect to peer", + client_.name(), id_); + close_callback_(*this, ClientCloseReason::CONNECT_FAILED); + } + client_.releaseConnection(*this); + // ClientConnection has been destroyed + return; + case Envoy::Network::ConnectionEvent::LocalClose: + ENVOY_LOG(debug, "ClientConnection({}:{}) closed locally", client_.name(), + id_); + close_callback_(*this, ClientCloseReason::LOCAL_CLOSE); + client_.releaseConnection(*this); + // ClientConnection has been destroyed + return; + case Envoy::Network::ConnectionEvent::Connected: + established_ = true; + ENVOY_LOG(debug, "ClientConnection({}:{}) established", client_.name(), + id_); + if (ClientCallbackResult::CLOSE == + connect_callback_(*this, ClientConnectionState::CONNECTED)) { + // This will trigger a + // networkConnection().onEvent(Envoy::Network::ConnectionEvent::LocalClose) + networkConnection().close(Envoy::Network::ConnectionCloseType::NoFlush); + } + break; + default: + ENVOY_LOG(error, "ClientConnection({}:{}) got unknown event", + client_.name(), id_); + }; +} + +void ClientConnection::onAboveWriteBufferHighWatermark() { + ENVOY_LOG(warn, "ClientConnection({}:{}) above write buffer high watermark", + client_.name(), id_); + // TODO how should this be handled? + httpConnection().onUnderlyingConnectionAboveWriteBufferHighWatermark(); +} + +void ClientConnection::onBelowWriteBufferLowWatermark() { + ENVOY_LOG(warn, "ClientConnection({}:{}) below write buffer low watermark", + client_.name(), id_); + // TODO how should this be handled? + httpConnection().onUnderlyingConnectionBelowWriteBufferLowWatermark(); +} + +void ClientConnection::onGoAway() { + ENVOY_LOG(warn, "ClientConnection({}:{}) remote closed", client_.name(), id_); + // TODO how should this be handled? +} + +void ClientConnection::sendRequest(const Envoy::Http::HeaderMap &headers, + ClientResponseCallback callback, + const std::chrono::milliseconds timeout) { + newStream(callback).sendRequest(headers, timeout); +} + +Client::Client(const std::string &name) + : name_(name), + stats_(), + thread_(nullptr), + time_system_(), + api_(std::chrono::milliseconds(1), + Envoy::Thread::ThreadFactorySingleton::get(), stats_, time_system_), + dispatcher_{api_.allocateDispatcher()} {} + +Client::~Client() { + stop(); + ENVOY_LOG(trace, "Client({}) destroyed", name_); +} + +const std::string &Client::name() const { return name_; } + +void Client::connect( + Envoy::Network::TransportSocketFactory &socket_factory, + HttpVersion http_version, + Envoy::Network::Address::InstanceConstSharedPtr &address, + const Envoy::Network::ConnectionSocket::OptionsSharedPtr &sockopts, + ClientConnectCallback connect_cb, ClientCloseCallback close_cb) { + dispatcher_->post([this, &socket_factory, http_version, address, sockopts, + connect_cb, close_cb]() { + Envoy::Network::ClientConnectionPtr connection = + dispatcher_->createClientConnection( + address, nullptr, socket_factory.createTransportSocket(nullptr), + sockopts); + uint32_t id = connection_counter_++; + + ClientConnectionPtr ptr; + if (HttpVersion::HTTP1 == http_version) { + ptr = std::make_unique( + *this, id, connect_cb, close_cb, dispatcher_, std::move(connection)); + } else { + ptr = std::make_unique( + *this, id, connect_cb, close_cb, dispatcher_, std::move(connection)); + } + ClientConnection *raw = ptr.get(); + + { + std::lock_guard guard(connections_lock_); + connections_[id] = std::move(ptr); + } + + ENVOY_LOG(debug, "ClientConnection({}:{}) connecting to {}", name_, id, + address->asString()); + raw->networkConnection().connect(); + }); +} + +void Client::start() { + std::promise promise; + + if (is_running_) { + return; + } + + thread_ = api_.threadFactory().createThread([this, &promise]() { + ENVOY_LOG(debug, "Client({}) dispatcher started", name_); + + is_running_ = true; + promise.set_value(true); // do not use promise again after this + while (is_running_) { + dispatcher_->run(Envoy::Event::Dispatcher::RunType::NonBlock); + } + + ENVOY_LOG(debug, "Client({}) dispatcher stopped", name_); + }); + + promise.get_future().get(); +} + +void Client::stop() { + ENVOY_LOG(debug, "Client({}) stop requested", name_); + + is_running_ = false; + if (thread_) { + thread_->join(); + thread_ = nullptr; + } + + ENVOY_LOG(debug, "Client({}) stopped", name_); +} + +void Client::releaseConnection(uint32_t id) { + size_t erased = 0; + { + std::lock_guard guard(connections_lock_); + dispatcher_->deferredDelete(std::move(connections_[id])); + erased = connections_.erase(id); + } + if (1 > erased) { + ENVOY_LOG(error, "Client({}) cannot remove ClientConnection({}:{})", name_, + name_, id); + } +} + +void Client::releaseConnection(ClientConnection &connection) { + releaseConnection(connection.id()); +} + +LoadGenerator::LoadGenerator( + Client &client, Envoy::Network::TransportSocketFactory &socket_factory, + HttpVersion http_version, + Envoy::Network::Address::InstanceConstSharedPtr &address, + const Envoy::Network::ConnectionSocket::OptionsSharedPtr &sockopts) + : client_(client), + socket_factory_(socket_factory), + http_version_(http_version), + address_(address), + sockopts_(sockopts) { + response_callback_ = [this](ClientConnection &connection, + Envoy::Http::HeaderMapPtr response) { + if (!response) { + ENVOY_LOG(debug, "Connection({}:{}) timedout waiting for response", + connection.name(), connection.id()); + ++response_timeouts_; + return; + } + + ++responses_received_; + + uint64_t status = 0; + if (!Envoy::StringUtil::atoul(response->Status()->value().c_str(), + status)) { + ENVOY_LOG(error, "Connection({}:{}) received response with bad status", + connection.name(), connection.id()); + } else if (200 <= status && status < 300) { + ++class_2xx_; + } else if (400 <= status && status < 500) { + ++class_4xx_; + } else if (500 <= status && status < 600) { + ++class_5xx_; + } + + if (0 >= requests_remaining_--) { + // Break if we've already sent or scheduled every request we wanted to + return; + } + + connection.sendRequest(*request_, response_callback_, timeout_); + }; + + connect_callback_ = [this]( + ClientConnection &connection, + ClientConnectionState state) -> ClientCallbackResult { + if (state == ClientConnectionState::IDLE) { + // This will result in a CloseReason::LOCAL_CLOSE passed to the + // close_callback + return ClientCallbackResult::CLOSE; + } + // If ConnectionResult::SUCCESS: + + ++connect_successes_; + + if (0 >= requests_remaining_--) { + // This will result in a ConnectionState::IDLE passed to this callback + // once all active streams have finished. + return ClientCallbackResult::CONTINUE; + } + + connection.sendRequest(*request_, response_callback_, timeout_); + + return ClientCallbackResult::CONTINUE; + }; + + close_callback_ = [this](ClientConnection &, ClientCloseReason reason) { + switch (reason) { + case ClientCloseReason::CONNECT_FAILED: + ++connect_failures_; + break; + case ClientCloseReason::REMOTE_CLOSE: + ++remote_closes_; + break; + case ClientCloseReason::LOCAL_CLOSE: + // We initiated this by responding to ConnectionState::IDLE with a + // CallbackResult::Close + ++local_closes_; + break; + } + + // Unblock run() once we've seen a close for every connection initiated. + if (remote_closes_ + local_closes_ + connect_failures_ >= + connections_to_initiate_) { + promise_all_connections_closed_.set_value(true); + } + }; +} + +LoadGenerator::~LoadGenerator() {} + +void LoadGenerator::run(uint32_t connections, uint32_t requests, + Envoy::Http::HeaderMapPtr request, + const std::chrono::milliseconds timeout) { + connections_to_initiate_ = connections; + requests_to_send_ = requests; + request_ = std::move(request); + promise_all_connections_closed_ = std::promise(); + timeout_ = timeout; + requests_remaining_ = requests_to_send_; + connect_failures_ = 0; + connect_successes_ = 0; + responses_received_ = 0; + response_timeouts_ = 0; + local_closes_ = 0; + remote_closes_ = 0; + class_2xx_ = 0; + class_4xx_ = 0; + class_5xx_ = 0; + + client_.start(); // idempotent + + for (uint32_t i = 0; i < connections_to_initiate_; ++i) { + client_.connect(socket_factory_, http_version_, address_, sockopts_, + connect_callback_, close_callback_); + } + + promise_all_connections_closed_.get_future().get(); +} + +uint32_t LoadGenerator::connectFailures() const { return connect_failures_; } +uint32_t LoadGenerator::connectSuccesses() const { return connect_successes_; } +uint32_t LoadGenerator::responsesReceived() const { + return responses_received_; +} +uint32_t LoadGenerator::responseTimeouts() const { return response_timeouts_; } +uint32_t LoadGenerator::localCloses() const { return local_closes_; } +uint32_t LoadGenerator::remoteCloses() const { return remote_closes_; } +uint32_t LoadGenerator::class2xxResponses() const { return class_2xx_; } +uint32_t LoadGenerator::class4xxResponses() const { return class_4xx_; } +uint32_t LoadGenerator::class5xxResponses() const { return class_5xx_; } + +} // namespace Integration +} // namespace Mixer diff --git a/test/integration/int_client.h b/test/integration/int_client.h new file mode 100644 index 00000000000..2a243c98784 --- /dev/null +++ b/test/integration/int_client.h @@ -0,0 +1,316 @@ +/* Copyright 2019 Istio Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include "common/api/api_impl.h" +#include "common/common/thread.h" +#include "common/network/raw_buffer_socket.h" +#include "common/stats/isolated_store_impl.h" +#include "envoy/api/api.h" +#include "envoy/event/dispatcher.h" +#include "envoy/http/codec.h" +#include "envoy/network/address.h" +#include "envoy/thread/thread.h" +#include "fmt/printf.h" +#include "test/test_common/test_time.h" +#include "test/test_common/utility.h" + +namespace Mixer { +namespace Integration { +enum class HttpVersion { HTTP1, HTTP2 }; + +class ClientStream; +class ClientConnection; +class Client; +typedef std::unique_ptr ClientStreamPtr; +typedef std::shared_ptr ClientStreamSharedPtr; +typedef std::unique_ptr ClientConnectionPtr; +typedef std::shared_ptr ClientConnectionSharedPtr; +typedef std::unique_ptr ClientPtr; +typedef std::shared_ptr ClientSharedPtr; + +enum class ClientConnectionState { + CONNECTED, // Connection established. Non-Terminal. Will be followed by one + // of the codes below. + IDLE, // Connection has no active streams. Non-Terminal. Close it, use it, + // or put it in a pool. +}; + +enum class ClientCloseReason { + CONNECT_FAILED, // Connection could not be established + REMOTE_CLOSE, // Peer closed or connection was reset after it was + // established. + LOCAL_CLOSE // This process decided to close the connection. +}; + +enum class ClientCallbackResult { + CONTINUE, // Leave the connection open + CLOSE // Close the connection. +}; + +/** + * Handle a non-terminal connection event asynchronously. + * + * @param connection The connection with the event + * @param state The state of the connection (connected or idle). + */ +typedef std::function + ClientConnectCallback; + +/** + * Handle a terminal connection close event asynchronously. + * + * @param connection The connection that was closed + * @param reason The reason the connection was closed + */ +typedef std::function + ClientCloseCallback; + +/** + * Handle a response asynchronously. + * + * @param connection The connection that received the response. + * @param response_headers The response headers or null if timed out. + */ +typedef std::function + ClientResponseCallback; + +class ClientConnection + : public Envoy::Network::ConnectionCallbacks, + public Envoy::Http::ConnectionCallbacks, + public Envoy::Event::DeferredDeletable, + protected Envoy::Logger::Loggable { + public: + ClientConnection(Client &client, uint32_t id, + ClientConnectCallback connect_callback, + ClientCloseCallback close_callback, + std::shared_ptr &dispatcher); + + virtual ~ClientConnection(); + + const std::string &name() const; + + uint32_t id() const; + + virtual Envoy::Network::ClientConnection &networkConnection() PURE; + + virtual Envoy::Http::ClientConnection &httpConnection() PURE; + + Envoy::Event::Dispatcher &dispatcher(); + + /** + * Asynchronously send a request. On HTTP1.1 connections at most one request + * can be outstanding on a connection. For HTTP2 multiple requests may + * outstanding. + * + * @param request_headers + * @param callback + */ + virtual void sendRequest(const Envoy::Http::HeaderMap &request_headers, + ClientResponseCallback callback, + const std::chrono::milliseconds timeout = + std::chrono::milliseconds(5'000)); + + /** + * For internal use + * + * @param stream_id + */ + void removeStream(uint32_t stream_id); + + // + // Envoy::Network::ConnectionCallbacks + // + + virtual void onEvent(Envoy::Network::ConnectionEvent event) override; + + virtual void onAboveWriteBufferHighWatermark() override; + + virtual void onBelowWriteBufferLowWatermark() override; + + // + // Envoy::Http::ConnectionCallbacks + // + + virtual void onGoAway() override; + + private: + ClientConnection(const ClientConnection &) = delete; + + ClientConnection &operator=(const ClientConnection &) = delete; + + ClientStream &newStream(ClientResponseCallback callback); + + Client &client_; + uint32_t id_; + ClientConnectCallback connect_callback_; + ClientCloseCallback close_callback_; + std::shared_ptr dispatcher_; + bool established_{false}; + + std::mutex streams_lock_; + std::unordered_map streams_; + std::atomic stream_counter_{0U}; +}; + +class Client : Envoy::Logger::Loggable { + public: + Client(const std::string &name); + + virtual ~Client(); + + const std::string &name() const; + + /** + * Start the client's dispatcher in a background thread. This is a noop if + * the client has already been started. This will block until the dispatcher + * is running on another thread. + */ + void start(); + + /** + * Stop the client's dispatcher and join the background thread. This will + * block until the background thread exits. + */ + void stop(); + + /** + * For internal use + */ + void releaseConnection(uint32_t id); + + /** + * For internal use + */ + void releaseConnection(ClientConnection &connection); + + /** + * Asynchronously connect to a peer. The connect_callback will be called on + * successful connection establishment and also on idle state, giving the + * caller the opportunity to reuse or close connections. The close_callback + * will be called after the connection is closed, giving the caller the + * opportunity to cleanup additional resources, etc. + */ + void connect( + Envoy::Network::TransportSocketFactory &socket_factory, + HttpVersion http_version, + Envoy::Network::Address::InstanceConstSharedPtr &address, + const Envoy::Network::ConnectionSocket::OptionsSharedPtr &sockopts, + ClientConnectCallback connect_callback, + ClientCloseCallback close_callback); + + private: + Client(const Client &) = delete; + + Client &operator=(const Client &) = delete; + + std::atomic is_running_{false}; + std::string name_; + Envoy::Stats::IsolatedStoreImpl stats_; + Envoy::Thread::ThreadPtr thread_; + Envoy::Event::TestRealTimeSystem time_system_; + Envoy::Api::Impl api_; + std::shared_ptr dispatcher_; + + std::mutex connections_lock_; + std::unordered_map connections_; + uint32_t connection_counter_{0U}; +}; + +class LoadGenerator : Envoy::Logger::Loggable { + public: + /** + * A wrapper around Client and its callbacks that implements a simple load + * generator. + * + * @param socket_factory Socket factory (use for plain TCP vs. TLS) + * @param http_version HTTP version (h1 vs h2) + * @param address Address (ip addr, port, ip protocol version) to connect to + * @param sockopts Socket options for the client sockets. Use default if + * null. + */ + LoadGenerator(Client &client, + Envoy::Network::TransportSocketFactory &socket_factory, + HttpVersion http_version, + Envoy::Network::Address::InstanceConstSharedPtr &address, + const Envoy::Network::ConnectionSocket::OptionsSharedPtr + &sockopts = nullptr); + + virtual ~LoadGenerator(); + + /** + * Generate load and block until all connections have finished (successfully + * or otherwise). + * + * @param connections Connections to create + * @param requests Total requests across all connections to send + * @param request The request to send + * @param timeout The time in msec to wait to receive a response after sending + * each request. + */ + void run(uint32_t connections, uint32_t requests, + Envoy::Http::HeaderMapPtr request, + const std::chrono::milliseconds timeout = + std::chrono::milliseconds(5'000)); + + uint32_t connectFailures() const; + uint32_t connectSuccesses() const; + uint32_t responsesReceived() const; + uint32_t responseTimeouts() const; + uint32_t localCloses() const; + uint32_t remoteCloses() const; + uint32_t class2xxResponses() const; + uint32_t class4xxResponses() const; + uint32_t class5xxResponses() const; + + private: + LoadGenerator(const LoadGenerator &) = delete; + void operator=(const LoadGenerator &) = delete; + + uint32_t connections_to_initiate_{0}; + uint32_t requests_to_send_{0}; + Envoy::Http::HeaderMapPtr request_{}; + Client &client_; + Envoy::Network::TransportSocketFactory &socket_factory_; + HttpVersion http_version_; + Envoy::Network::Address::InstanceConstSharedPtr address_; + const Envoy::Network::ConnectionSocket::OptionsSharedPtr sockopts_; + + ClientConnectCallback connect_callback_; + ClientResponseCallback response_callback_; + ClientCloseCallback close_callback_; + std::chrono::milliseconds timeout_{std::chrono::milliseconds(0)}; + std::atomic requests_remaining_{0}; + std::atomic connect_failures_{0}; + std::atomic connect_successes_{0}; + std::atomic responses_received_{0}; + std::atomic response_timeouts_{0}; + std::atomic local_closes_{0}; + std::atomic remote_closes_{0}; + std::atomic class_2xx_{0}; + std::atomic class_4xx_{0}; + std::atomic class_5xx_{0}; + std::promise promise_all_connections_closed_; +}; + +typedef std::unique_ptr LoadGeneratorPtr; + +} // namespace Integration +} // namespace Mixer \ No newline at end of file diff --git a/test/integration/int_client_server_test.cc b/test/integration/int_client_server_test.cc new file mode 100644 index 00000000000..b229d7f5b74 --- /dev/null +++ b/test/integration/int_client_server_test.cc @@ -0,0 +1,379 @@ +/* Copyright 2019 Istio Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "common/network/utility.h" +#include "gtest/gtest.h" +#include "int_client.h" +#include "int_server.h" +#include "test/test_common/network_utility.h" + +namespace Mixer { +namespace Integration { + +class ClientServerTest : public testing::Test, + Envoy::Logger::Loggable { + public: + ClientServerTest() + : transport_socket_factory_(), + ip_version_(Envoy::Network::Address::IpVersion::v4), + listening_socket_( + Envoy::Network::Utility::parseInternetAddressAndPort(fmt::format( + "{}:{}", + Envoy::Network::Test::getAnyAddressUrlString(ip_version_), 0)), + nullptr, true), + client_("client"), + server_("server", listening_socket_, transport_socket_factory_, + Envoy::Http::CodecClient::Type::HTTP1) {} + + protected: + Envoy::Network::RawBufferSocketFactory transport_socket_factory_; + Envoy::Network::Address::IpVersion ip_version_; + Envoy::Network::TcpListenSocket listening_socket_; + Client client_; + Server server_; +}; + +TEST_F(ClientServerTest, HappyPath) { + Envoy::Logger::Registry::setLogLevel(spdlog::level::info); + + constexpr uint32_t connections_to_initiate = 30; + constexpr uint32_t requests_to_send = 30 * connections_to_initiate; + + // + // Server Setup + // + + ServerCallbackHelper server_callbacks; // sends a 200 OK to everything + server_.start(server_callbacks); + + // + // Client setup + // + + Envoy::Network::Address::InstanceConstSharedPtr address = + listening_socket_.localAddress(); + LoadGenerator load_generator(client_, transport_socket_factory_, + HttpVersion::HTTP1, address); + + // + // Exec test and wait for it to finish + // + + Envoy::Http::HeaderMapPtr request{ + new Envoy::Http::TestHeaderMapImpl{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "host"}}}; + load_generator.run(connections_to_initiate, requests_to_send, + std::move(request)); + + // wait until the server has closed all connections created by the client + server_callbacks.wait(load_generator.connectSuccesses()); + + // + // Evaluate test + // + + // All client connections are successfully established. + EXPECT_EQ(load_generator.connectSuccesses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.connectFailures()); + // Client close callback called for every client connection. + EXPECT_EQ(load_generator.localCloses(), connections_to_initiate); + // Client response callback is called for every request sent + EXPECT_EQ(load_generator.responsesReceived(), requests_to_send); + // Every response was a 2xx class + EXPECT_EQ(load_generator.class2xxResponses(), requests_to_send); + EXPECT_EQ(0, load_generator.class4xxResponses()); + EXPECT_EQ(0, load_generator.class5xxResponses()); + // No client sockets are rudely closed by server / no client sockets are + // reset. + EXPECT_EQ(0, load_generator.remoteCloses()); + EXPECT_EQ(0, load_generator.responseTimeouts()); + + // Server accept callback is called for every client connection initiated. + EXPECT_EQ(server_callbacks.connectionsAccepted(), connections_to_initiate); + // Server request callback is called for every client request sent + EXPECT_EQ(server_callbacks.requestsReceived(), requests_to_send); + // Server does not close its own sockets but instead relies on the client to + // initate the close + EXPECT_EQ(0, server_callbacks.localCloses()); + // Server sees a client-initiated close for every socket it accepts + EXPECT_EQ(server_callbacks.remoteCloses(), + server_callbacks.connectionsAccepted()); +} + +TEST_F(ClientServerTest, AcceptAndClose) { + Envoy::Logger::Registry::setLogLevel(spdlog::level::info); + + constexpr uint32_t connections_to_initiate = 30; + constexpr uint32_t requests_to_send = 30 * connections_to_initiate; + + // + // Server Setup + // + + // Immediately close any connection accepted. + ServerCallbackHelper server_callbacks( + [](ServerConnection &, ServerStream &, Envoy::Http::HeaderMapPtr &&) { + GTEST_FATAL_FAILURE_( + "Connections immediately closed so no response should be received"); + }, + [](ServerConnection &) -> ServerCallbackResult { + return ServerCallbackResult::CLOSE; + }); + + server_.start(server_callbacks); + + // + // Client setup + // + + Envoy::Network::Address::InstanceConstSharedPtr address = + listening_socket_.localAddress(); + LoadGenerator load_generator(client_, transport_socket_factory_, + HttpVersion::HTTP1, address); + + // + // Exec test and wait for it to finish + // + + Envoy::Http::HeaderMapPtr request{ + new Envoy::Http::TestHeaderMapImpl{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "host"}}}; + load_generator.run(connections_to_initiate, requests_to_send, + std::move(request)); + + // wait until the server has closed all connections created by the client + server_callbacks.wait(load_generator.connectSuccesses()); + + // + // Evaluate test + // + + // Assert that all connections succeed but no responses are received and the + // server closes the connections. + EXPECT_EQ(load_generator.connectSuccesses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.connectFailures()); + EXPECT_EQ(load_generator.remoteCloses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.localCloses()); + EXPECT_EQ(0, load_generator.responsesReceived()); + EXPECT_EQ(0, load_generator.class2xxResponses()); + EXPECT_EQ(0, load_generator.class4xxResponses()); + EXPECT_EQ(0, load_generator.class5xxResponses()); + EXPECT_EQ(0, load_generator.responseTimeouts()); + + // Server accept callback is called for every client connection initiated. + EXPECT_EQ(server_callbacks.connectionsAccepted(), connections_to_initiate); + // Server request callback is never called + EXPECT_EQ(0, server_callbacks.requestsReceived()); + // Server closes every connection + EXPECT_EQ(server_callbacks.connectionsAccepted(), + server_callbacks.localCloses()); + EXPECT_EQ(0, server_callbacks.remoteCloses()); +} + +TEST_F(ClientServerTest, SlowResponse) { + Envoy::Logger::Registry::setLogLevel(spdlog::level::info); + + constexpr uint32_t connections_to_initiate = 30; + constexpr uint32_t requests_to_send = 30 * connections_to_initiate; + + // + // Server Setup + // + + // Take a really long time (500 msec) to send a 200 OK response. + ServerCallbackHelper server_callbacks([](ServerConnection &, + ServerStream &stream, + Envoy::Http::HeaderMapPtr &&) { + Envoy::Http::TestHeaderMapImpl response{{":status", "200"}}; + stream.sendResponseHeaders(response, std::chrono::milliseconds(500)); + }); + + server_.start(server_callbacks); + + // + // Client setup + // + + Envoy::Network::Address::InstanceConstSharedPtr address = + listening_socket_.localAddress(); + LoadGenerator load_generator(client_, transport_socket_factory_, + HttpVersion::HTTP1, address); + + // + // Exec test and wait for it to finish + // + + Envoy::Http::HeaderMapPtr request{ + new Envoy::Http::TestHeaderMapImpl{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "host"}}}; + load_generator.run(connections_to_initiate, requests_to_send, + std::move(request), std::chrono::milliseconds(250)); + + // wait until the server has closed all connections created by the client + server_callbacks.wait(load_generator.connectSuccesses()); + + // + // Evaluate test + // + + // Assert that all connections succeed but all responses timeout leading to + // local closing of all connections. + EXPECT_EQ(load_generator.connectSuccesses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.connectFailures()); + EXPECT_EQ(load_generator.responseTimeouts(), connections_to_initiate); + EXPECT_EQ(load_generator.localCloses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.remoteCloses()); + EXPECT_EQ(0, load_generator.responsesReceived()); + EXPECT_EQ(0, load_generator.class2xxResponses()); + EXPECT_EQ(0, load_generator.class4xxResponses()); + EXPECT_EQ(0, load_generator.class5xxResponses()); + + // Server accept callback is called for every client connection initiated. + EXPECT_EQ(server_callbacks.connectionsAccepted(), connections_to_initiate); + // Server receives a request on each connection + EXPECT_EQ(server_callbacks.requestsReceived(), connections_to_initiate); + // Server sees that the client closes each connection after it gives up + EXPECT_EQ(server_callbacks.connectionsAccepted(), + server_callbacks.remoteCloses()); + EXPECT_EQ(0, server_callbacks.localCloses()); +} + +TEST_F(ClientServerTest, NoServer) { + Envoy::Logger::Registry::setLogLevel(spdlog::level::info); + + constexpr uint32_t connections_to_initiate = 30; + constexpr uint32_t requests_to_send = 30 * connections_to_initiate; + + // Create a listening socket bound to an ephemeral port picked by the kernel, + // but don't create a server to call listen() on it. Result will be + // ECONNREFUSEDs and we won't accidentally send connects to another process. + + Envoy::Network::TcpListenSocket listening_socket( + Envoy::Network::Utility::parseInternetAddressAndPort(fmt::format( + "{}:{}", Envoy::Network::Test::getAnyAddressUrlString(ip_version_), + 0)), + nullptr, true); + uint16_t port = + static_cast(listening_socket.localAddress()->ip()->port()); + + Envoy::Network::Address::InstanceConstSharedPtr address = + Envoy::Network::Utility::parseInternetAddress("127.0.0.1", port); + + // + // Client setup + // + + LoadGenerator load_generator(client_, transport_socket_factory_, + HttpVersion::HTTP1, address); + + // + // Exec test and wait for it to finish + // + + Envoy::Http::HeaderMapPtr request{ + new Envoy::Http::TestHeaderMapImpl{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "host"}}}; + load_generator.run(connections_to_initiate, requests_to_send, + std::move(request)); + + // + // Evaluate test + // + + // All client connections fail + EXPECT_EQ(load_generator.connectFailures(), connections_to_initiate); + // Nothing else happened + EXPECT_EQ(0, load_generator.connectSuccesses()); + EXPECT_EQ(0, load_generator.localCloses()); + EXPECT_EQ(0, load_generator.responseTimeouts()); + EXPECT_EQ(0, load_generator.responsesReceived()); + EXPECT_EQ(0, load_generator.class2xxResponses()); + EXPECT_EQ(0, load_generator.class4xxResponses()); + EXPECT_EQ(0, load_generator.class5xxResponses()); + EXPECT_EQ(0, load_generator.remoteCloses()); +} + +TEST_F(ClientServerTest, NoAccept) { + Envoy::Logger::Registry::setLogLevel(spdlog::level::info); + + constexpr uint32_t connections_to_initiate = 30; + constexpr uint32_t requests_to_send = 30 * connections_to_initiate; + + // + // Server Setup + // + + ServerCallbackHelper server_callbacks; // sends a 200 OK to everything + server_.start(server_callbacks); + + // but don't call accept() on the listening socket + server_.stopAcceptingConnections(); + + // + // Client setup + // + + Envoy::Network::Address::InstanceConstSharedPtr address = + listening_socket_.localAddress(); + LoadGenerator load_generator(client_, transport_socket_factory_, + HttpVersion::HTTP1, address); + + // + // Exec test and wait for it to finish + // + + Envoy::Http::HeaderMapPtr request{ + new Envoy::Http::TestHeaderMapImpl{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "host"}}}; + load_generator.run(connections_to_initiate, requests_to_send, + std::move(request), std::chrono::milliseconds(250)); + + // + // Evaluate test + // + + // Assert that all connections succeed but all responses timeout leading to + // local closing of all connections. + EXPECT_EQ(load_generator.connectSuccesses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.connectFailures()); + EXPECT_EQ(load_generator.responseTimeouts(), connections_to_initiate); + EXPECT_EQ(load_generator.localCloses(), connections_to_initiate); + EXPECT_EQ(0, load_generator.remoteCloses()); + EXPECT_EQ(0, load_generator.responsesReceived()); + EXPECT_EQ(0, load_generator.class2xxResponses()); + EXPECT_EQ(0, load_generator.class4xxResponses()); + EXPECT_EQ(0, load_generator.class5xxResponses()); + + // From the server point of view, nothing happened + EXPECT_EQ(0, server_callbacks.connectionsAccepted()); + EXPECT_EQ(0, server_callbacks.requestsReceived()); + EXPECT_EQ(0, server_callbacks.connectionsAccepted()); + EXPECT_EQ(0, server_callbacks.remoteCloses()); + EXPECT_EQ(0, server_callbacks.localCloses()); +} + +} // namespace Integration +} // namespace Mixer diff --git a/test/integration/int_server.cc b/test/integration/int_server.cc new file mode 100644 index 00000000000..8526c7fdbac --- /dev/null +++ b/test/integration/int_server.cc @@ -0,0 +1,817 @@ +/* Copyright 2019 Istio Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "int_server.h" +#include +#include "common/common/lock_guard.h" +#include "common/common/logger.h" +#include "common/grpc/codec.h" +#include "common/http/conn_manager_config.h" +#include "common/http/conn_manager_impl.h" +#include "common/http/exception.h" +#include "common/http/http1/codec_impl.h" +#include "common/http/http2/codec_impl.h" +#include "common/network/listen_socket_impl.h" +#include "common/network/raw_buffer_socket.h" +#include "envoy/http/codec.h" +#include "envoy/network/transport_socket.h" +#include "fmt/printf.h" +#include "server/connection_handler_impl.h" +#include "test/test_common/network_utility.h" +#include "test/test_common/utility.h" + +namespace Mixer { +namespace Integration { + +static Envoy::Http::LowerCaseString RequestId(std::string("x-request-id")); + +ServerStream::ServerStream() {} + +ServerStream::~ServerStream() {} + +class ServerStreamImpl : public ServerStream, + public Envoy::Http::StreamDecoder, + public Envoy::Http::StreamCallbacks, + Envoy::Logger::Loggable { + public: + ServerStreamImpl(uint32_t id, ServerConnection &connection, + ServerRequestCallback request_callback, + Envoy::Http::StreamEncoder &stream_encoder) + : id_(id), + connection_(connection), + request_callback_(request_callback), + stream_encoder_(stream_encoder) {} + + virtual ~ServerStreamImpl() { + ENVOY_LOG(trace, "ServerStream({}:{}:{}) destroyed", connection_.name(), + connection_.id(), id_); + } + + ServerStreamImpl(ServerStreamImpl &&) = default; + ServerStreamImpl &operator=(ServerStreamImpl &&) = default; + + // + // ServerStream + // + + virtual void sendResponseHeaders( + const Envoy::Http::HeaderMap &response_headers, + const std::chrono::milliseconds delay) override { + if (connection_.networkConnection().state() != + Envoy::Network::Connection::State::Open) { + ENVOY_LOG(warn, + "ServerStream({}:{}:{})'s underlying connection is not open!", + connection_.name(), connection_.id(), id_); + // TODO return error to caller + return; + } + + if (delay <= std::chrono::milliseconds(0)) { + ENVOY_LOG(debug, "ServerStream({}:{}:{}) sending response headers", + connection_.name(), connection_.id(), id_); + stream_encoder_.encodeHeaders(response_headers, true); + return; + } + + // Limitation: at most one response can be sent on a stream at a time. + assert(nullptr == delay_timer_.get()); + if (delay_timer_.get()) { + return; + } + + response_headers_ = + std::make_unique(response_headers); + delay_timer_ = connection_.dispatcher().createTimer([this, delay]() { + ENVOY_LOG( + debug, + "ServerStream({}:{}:{}) sending response headers after {} msec delay", + connection_.name(), connection_.id(), id_, + static_cast(delay.count())); + stream_encoder_.encodeHeaders(*response_headers_, true); + delay_timer_->disableTimer(); + delay_timer_ = nullptr; + response_headers_ = nullptr; + }); + delay_timer_->enableTimer(delay); + } + + virtual void sendGrpcResponse( + Envoy::Grpc::Status::GrpcStatus status, + const Envoy::Protobuf::Message &message, + const std::chrono::milliseconds delay) override { + // Limitation: at most one response can be sent on a stream at a time. + assert(nullptr == delay_timer_.get()); + if (delay_timer_.get()) { + return; + } + + response_status_ = status; + response_body_ = Envoy::Grpc::Common::serializeBody(message); + Envoy::Event::TimerCb send_grpc_response = [this, delay]() { + ENVOY_LOG( + debug, + "ServerStream({}:{}:{}) sending gRPC response after {} msec delay", + connection_.name(), connection_.id(), id_, + static_cast(delay.count())); + stream_encoder_.encodeHeaders( + Envoy::Http::TestHeaderMapImpl{{":status", "200"}}, false); + stream_encoder_.encodeData(*response_body_, false); + stream_encoder_.encodeTrailers(Envoy::Http::TestHeaderMapImpl{ + {"grpc-status", + std::to_string(static_cast(response_status_))}}); + }; + + if (delay <= std::chrono::milliseconds(0)) { + send_grpc_response(); + return; + } + + delay_timer_ = + connection_.dispatcher().createTimer([this, send_grpc_response]() { + send_grpc_response(); + delay_timer_->disableTimer(); + }); + + delay_timer_->enableTimer(delay); + } + + // + // Envoy::Http::StreamDecoder + // + + virtual void decode100ContinueHeaders(Envoy::Http::HeaderMapPtr &&) override { + ENVOY_LOG(error, "ServerStream({}:{}:{}) got continue headers?!?!", + connection_.name(), connection_.id(), id_); + } + + /** + * Called with decoded headers, optionally indicating end of stream. + * @param headers supplies the decoded headers map that is moved into the + * callee. + * @param end_stream supplies whether this is a header only request/response. + */ + virtual void decodeHeaders(Envoy::Http::HeaderMapPtr &&headers, + bool end_stream) override { + ENVOY_LOG(debug, "ServerStream({}:{}:{}) got request headers", + connection_.name(), connection_.id(), id_); + + request_headers_ = std::move(headers); + + /* TODO use x-request-id for e2e logging + * + const Envoy::Http::HeaderEntry *header = + request_headers_->get(RequestId); + + if (header) { + request_id_ = header->value().c_str(); + } + */ + + if (end_stream) { + onEndStream(); + // stream is now destroyed + } + } + + virtual void decodeData(Envoy::Buffer::Instance &, bool end_stream) override { + ENVOY_LOG(debug, "ServerStream({}:{}:{}) got request body data", + connection_.name(), connection_.id(), id_); + + if (end_stream) { + onEndStream(); + // stream is now destroyed + } + } + + virtual void decodeTrailers(Envoy::Http::HeaderMapPtr &&) override { + ENVOY_LOG(trace, "ServerStream({}:{}:{}) got request trailers", + connection_.name(), connection_.id(), id_); + onEndStream(); + // stream is now destroyed + } + + virtual void decodeMetadata(Envoy::Http::MetadataMapPtr &&) override { + ENVOY_LOG(trace, "ServerStream({}:{}):{} got metadata", connection_.name(), + connection_.id(), id_); + } + + // + // Envoy::Http::StreamCallbacks + // + + virtual void onResetStream(Envoy::Http::StreamResetReason reason) override { + // TODO test with h2 to see if we get these and whether the connection error + // handling is enough to handle it. + switch (reason) { + case Envoy::Http::StreamResetReason::LocalReset: + ENVOY_LOG(trace, "ServerStream({}:{}:{}) was locally reset", + connection_.name(), connection_.id(), id_); + break; + case Envoy::Http::StreamResetReason::LocalRefusedStreamReset: + ENVOY_LOG(trace, "ServerStream({}:{}:{}) refused local stream reset", + connection_.name(), connection_.id(), id_); + break; + case Envoy::Http::StreamResetReason::RemoteReset: + ENVOY_LOG(trace, "ServerStream({}:{}:{}) was remotely reset", + connection_.name(), connection_.id(), id_); + break; + case Envoy::Http::StreamResetReason::RemoteRefusedStreamReset: + ENVOY_LOG(trace, "ServerStream({}:{}:{}) refused remote stream reset", + connection_.name(), connection_.id(), id_); + break; + case Envoy::Http::StreamResetReason::ConnectionFailure: + ENVOY_LOG( + trace, + "ServerStream({}:{}:{}) reseet due to initial connection failure", + connection_.name(), connection_.id(), id_); + break; + case Envoy::Http::StreamResetReason::ConnectionTermination: + ENVOY_LOG( + trace, + "ServerStream({}:{}:{}) reset due to underlying connection reset", + connection_.name(), connection_.id(), id_); + break; + case Envoy::Http::StreamResetReason::Overflow: + ENVOY_LOG(trace, + "ServerStream({}:{}:{}) reset due to resource overflow", + connection_.name(), connection_.id(), id_); + break; + default: + ENVOY_LOG(trace, "ServerStream({}:{}:{}) reset due to unknown reason", + connection_.name(), connection_.id(), id_); + break; + } + } + + virtual void onAboveWriteBufferHighWatermark() override { + // TODO is their anything to be done here? + ENVOY_LOG(trace, "ServerStream({}:{}:{}) above write buffer high watermark", + connection_.name(), connection_.id(), id_); + } + + virtual void onBelowWriteBufferLowWatermark() override { + // TODO is their anything to be done here? + ENVOY_LOG(trace, "ServerStream({}:{}:{}) below write buffer low watermark", + connection_.name(), connection_.id(), id_); + } + + private: + virtual void onEndStream() { + ENVOY_LOG(debug, "ServerStream({}:{}:{}) complete", connection_.name(), + connection_.id(), id_); + request_callback_(connection_, *this, std::move(request_headers_)); + + connection_.removeStream(id_); + // This stream is now destroyed + } + + ServerStreamImpl(const ServerStreamImpl &) = delete; + + ServerStreamImpl &operator=(const ServerStreamImpl &) = delete; + + uint32_t id_; + ServerConnection &connection_; + Envoy::Http::HeaderMapPtr request_headers_{nullptr}; + Envoy::Http::HeaderMapPtr response_headers_{nullptr}; + Envoy::Buffer::InstancePtr response_body_{nullptr}; + Envoy::Grpc::Status::GrpcStatus response_status_{Envoy::Grpc::Status::Ok}; + ServerRequestCallback request_callback_; + Envoy::Http::StreamEncoder &stream_encoder_; + Envoy::Event::TimerPtr delay_timer_{nullptr}; +}; + +ServerConnection::ServerConnection( + const std::string &name, uint32_t id, + ServerRequestCallback request_callback, ServerCloseCallback close_callback, + Envoy::Network::Connection &network_connection, + Envoy::Event::Dispatcher &dispatcher, + Envoy::Http::CodecClient::Type http_type, Envoy::Stats::Scope &scope) + : name_(name), + id_(id), + network_connection_(network_connection), + dispatcher_(dispatcher), + request_callback_(request_callback), + close_callback_(close_callback) { + // TODO make use of network_connection_->socketOptions() and possibly http + // settings; + + switch (http_type) { + case Envoy::Http::CodecClient::Type::HTTP1: + http_connection_ = + std::make_unique( + network_connection, *this, Envoy::Http::Http1Settings()); + break; + case Envoy::Http::CodecClient::Type::HTTP2: { + Envoy::Http::Http2Settings settings; + settings.allow_connect_ = true; + settings.allow_metadata_ = true; + constexpr uint32_t max_request_headers_kb = 2U; + http_connection_ = + std::make_unique( + network_connection, *this, scope, settings, + max_request_headers_kb); + } break; + default: + ENVOY_LOG(error, + "ServerConnection({}:{}) doesn't support http type %d, " + "defaulting to HTTP1", + name_, id_, static_cast(http_type) + 1); + http_connection_ = + std::make_unique( + network_connection, *this, Envoy::Http::Http1Settings()); + break; + } +} + +ServerConnection::~ServerConnection() { + ENVOY_LOG(trace, "ServerConnection({}:{}) destroyed", name_, id_); +} + +const std::string &ServerConnection::name() const { return name_; } + +uint32_t ServerConnection::id() const { return id_; } + +Envoy::Network::Connection &ServerConnection::networkConnection() { + return network_connection_; +} + +const Envoy::Network::Connection &ServerConnection::networkConnection() const { + return network_connection_; +} + +Envoy::Http::ServerConnection &ServerConnection::httpConnection() { + return *http_connection_; +} + +const Envoy::Http::ServerConnection &ServerConnection::httpConnection() const { + return *http_connection_; +} + +Envoy::Event::Dispatcher &ServerConnection::dispatcher() { return dispatcher_; } + +Envoy::Network::FilterStatus ServerConnection::onData( + Envoy::Buffer::Instance &data, bool end_stream) { + ENVOY_LOG(trace, "ServerConnection({}:{}) got data", name_, id_); + + try { + http_connection_->dispatch(data); + } catch (const Envoy::Http::CodecProtocolException &e) { + ENVOY_LOG(error, "ServerConnection({}:{}) received the wrong protocol: {}", + name_, id_, e.what()); + network_connection_.close(Envoy::Network::ConnectionCloseType::NoFlush); + return Envoy::Network::FilterStatus::StopIteration; + } + + if (end_stream) { + ENVOY_LOG(error, + "ServerConnection({}:{}) got end stream - TODO relay to all " + "active streams?!?", + name_, id_); + } + + return Envoy::Network::FilterStatus::StopIteration; +} + +Envoy::Network::FilterStatus ServerConnection::onNewConnection() { + ENVOY_LOG(trace, "ServerConnection({}:{}) onNewConnection", name_, id_); + return Envoy::Network::FilterStatus::Continue; +} + +void ServerConnection::initializeReadFilterCallbacks( + Envoy::Network::ReadFilterCallbacks &) {} + +Envoy::Http::StreamDecoder &ServerConnection::newStream( + Envoy::Http::StreamEncoder &stream_encoder, bool) { + ServerStreamImpl *raw = nullptr; + uint32_t id = 0U; + + { + std::lock_guard guard(streams_lock_); + + id = stream_counter_++; + auto stream = std::make_unique( + id, *this, request_callback_, stream_encoder); + raw = stream.get(); + streams_[id] = std::move(stream); + } + + ENVOY_LOG(debug, "ServerConnection({}:{}) received new Stream({}:{}:{})", + name_, id_, name_, id_, id); + + return *raw; +} + +void ServerConnection::removeStream(uint32_t stream_id) { + unsigned long size = 0UL; + + { + std::lock_guard guard(streams_lock_); + streams_.erase(stream_id); + size = streams_.size(); + } + + if (0 == size) { + // TODO do anything special here? + ENVOY_LOG(debug, "ServerConnection({}:{}) is idle", name_, id_); + } +} + +void ServerConnection::onEvent(Envoy::Network::ConnectionEvent event) { + switch (event) { + case Envoy::Network::ConnectionEvent::RemoteClose: + ENVOY_LOG(debug, "ServerConnection({}:{}) closed by peer or reset", name_, + id_); + close_callback_(*this, ServerCloseReason::REMOTE_CLOSE); + return; + case Envoy::Network::ConnectionEvent::LocalClose: + ENVOY_LOG(debug, "ServerConnection({}:{}) closed locally", name_, id_); + close_callback_(*this, ServerCloseReason::LOCAL_CLOSE); + return; + default: + ENVOY_LOG(error, "ServerConnection({}:{}) got unknown event", name_, id_); + } +} + +void ServerConnection::onAboveWriteBufferHighWatermark() { + ENVOY_LOG(debug, "ServerConnection({}:{}) above write buffer high watermark", + name_, id_); + // TODO - is this the right way to handle? + http_connection_->onUnderlyingConnectionAboveWriteBufferHighWatermark(); +} + +void ServerConnection::onBelowWriteBufferLowWatermark() { + ENVOY_LOG(debug, "ServerConnection({}:{}) below write buffer low watermark", + name_, id_); + // TODO - is this the right way to handle? + http_connection_->onUnderlyingConnectionBelowWriteBufferLowWatermark(); +} + +void ServerConnection::onGoAway() { + ENVOY_LOG(warn, "ServerConnection({}) got go away", name_); + // TODO how should this be handled? I've never seen it fire. +} + +ServerFilterChain::ServerFilterChain( + Envoy::Network::TransportSocketFactory &transport_socket_factory) + : transport_socket_factory_(transport_socket_factory) {} + +ServerFilterChain::~ServerFilterChain() {} + +const Envoy::Network::TransportSocketFactory & +ServerFilterChain::transportSocketFactory() const { + return transport_socket_factory_; +} + +const std::vector + &ServerFilterChain::networkFilterFactories() const { + return network_filter_factories_; +} + +LocalListenSocket::LocalListenSocket( + Envoy::Network::Address::IpVersion ip_version, uint16_t port, + const Envoy::Network::Socket::OptionsSharedPtr &options, bool bind_to_port) + : NetworkListenSocket( + Envoy::Network::Utility::parseInternetAddress( + Envoy::Network::Test::getAnyAddressUrlString(ip_version), port), + options, bind_to_port) {} + +LocalListenSocket::~LocalListenSocket() {} + +ServerCallbackHelper::ServerCallbackHelper( + ServerRequestCallback request_callback, + ServerAcceptCallback accept_callback, ServerCloseCallback close_callback) + : accept_callback_(accept_callback), + request_callback_(request_callback), + close_callback_(close_callback) { + if (request_callback) { + request_callback_ = [this, &request_callback]( + ServerConnection &connection, ServerStream &stream, + Envoy::Http::HeaderMapPtr request_headers) { + ++requests_received_; + request_callback(connection, stream, std::move(request_headers)); + }; + } else { + request_callback_ = [this](ServerConnection &, ServerStream &stream, + Envoy::Http::HeaderMapPtr &&) { + ++requests_received_; + Envoy::Http::TestHeaderMapImpl response{{":status", "200"}}; + stream.sendResponseHeaders(response); + }; + } + + if (accept_callback) { + accept_callback_ = + [this, &accept_callback]( + ServerConnection &connection) -> ServerCallbackResult { + ++accepts_; + return accept_callback(connection); + }; + } else { + accept_callback_ = [this](ServerConnection &) -> ServerCallbackResult { + ++accepts_; + return ServerCallbackResult::CONTINUE; + }; + } + + if (close_callback) { + close_callback_ = [this, &close_callback](ServerConnection &connection, + ServerCloseReason reason) { + absl::MutexLock lock(&mutex_); + + switch (reason) { + case ServerCloseReason::REMOTE_CLOSE: + ++remote_closes_; + break; + case ServerCloseReason::LOCAL_CLOSE: + ++local_closes_; + break; + } + + close_callback(connection, reason); + }; + } else { + close_callback_ = [this](ServerConnection &, ServerCloseReason reason) { + absl::MutexLock lock(&mutex_); + + switch (reason) { + case ServerCloseReason::REMOTE_CLOSE: + ++remote_closes_; + break; + case ServerCloseReason::LOCAL_CLOSE: + ++local_closes_; + break; + } + }; + } +} + +ServerCallbackHelper::~ServerCallbackHelper() {} + +uint32_t ServerCallbackHelper::connectionsAccepted() const { return accepts_; } + +uint32_t ServerCallbackHelper::requestsReceived() const { + return requests_received_; +} + +uint32_t ServerCallbackHelper::localCloses() const { + absl::MutexLock lock(&mutex_); + return local_closes_; +} + +uint32_t ServerCallbackHelper::remoteCloses() const { + absl::MutexLock lock(&mutex_); + return remote_closes_; +} + +ServerAcceptCallback ServerCallbackHelper::acceptCallback() const { + return accept_callback_; +} + +ServerRequestCallback ServerCallbackHelper::requestCallback() const { + return request_callback_; +} + +ServerCloseCallback ServerCallbackHelper::closeCallback() const { + return close_callback_; +} + +void ServerCallbackHelper::wait(uint32_t connections_closed) { + auto constraints = [connections_closed, this]() { + return connections_closed <= local_closes_ + remote_closes_; + }; + + absl::MutexLock lock(&mutex_); + mutex_.Await(absl::Condition(&constraints)); +} + +void ServerCallbackHelper::wait() { + auto constraints = [this]() { + return accepts_ <= local_closes_ + remote_closes_; + }; + + absl::MutexLock lock(&mutex_); + mutex_.Await(absl::Condition(&constraints)); +} + +Server::Server(const std::string &name, + Envoy::Network::Socket &listening_socket, + Envoy::Network::TransportSocketFactory &transport_socket_factory, + Envoy::Http::CodecClient::Type http_type) + : name_(name), + stats_(), + time_system_(), + api_(std::chrono::milliseconds(1), + Envoy::Thread::ThreadFactorySingleton::get(), stats_, time_system_), + dispatcher_(api_.allocateDispatcher()), + connection_handler_(new Envoy::Server::ConnectionHandlerImpl( + ENVOY_LOGGER(), *dispatcher_)), + thread_(nullptr), + listening_socket_(listening_socket), + server_filter_chain_(transport_socket_factory), + http_type_(http_type) {} + +Server::~Server() { stop(); } + +void Server::start(ServerAcceptCallback accept_callback, + ServerRequestCallback request_callback, + ServerCloseCallback close_callback) { + accept_callback_ = accept_callback; + request_callback_ = request_callback; + close_callback_ = close_callback; + std::promise promise; + + thread_ = api_.threadFactory().createThread([this, &promise]() { + is_running = true; + ENVOY_LOG(debug, "Server({}) started", name_.c_str()); + connection_handler_->addListener(*this); + + promise.set_value(true); // do not use promise again after this + while (is_running) { + dispatcher_->run(Envoy::Event::Dispatcher::RunType::NonBlock); + } + + ENVOY_LOG(debug, "Server({}) stopped", name_.c_str()); + + connection_handler_.reset(); + }); + + promise.get_future().get(); +} + +void Server::start(ServerCallbackHelper &helper) { + start(helper.acceptCallback(), helper.requestCallback(), + helper.closeCallback()); +} + +void Server::stop() { + is_running = false; + + if (thread_) { + thread_->join(); + thread_ = nullptr; + } +} + +void Server::stopAcceptingConnections() { + ENVOY_LOG(debug, "Server({}) stopped accepting connections", name_); + connection_handler_->disableListeners(); +} + +void Server::startAcceptingConnections() { + ENVOY_LOG(debug, "Server({}) started accepting connections", name_); + connection_handler_->enableListeners(); +} + +const Envoy::Stats::Store &Server::statsStore() const { return stats_; } + +void Server::setPerConnectionBufferLimitBytes(uint32_t limit) { + connection_buffer_limit_bytes_ = limit; +} + +// +// Envoy::Network::ListenerConfig +// + +Envoy::Network::FilterChainManager &Server::filterChainManager() { + return *this; +} + +Envoy::Network::FilterChainFactory &Server::filterChainFactory() { + return *this; +} + +Envoy::Network::Socket &Server::socket() { return listening_socket_; } + +const Envoy::Network::Socket &Server::socket() const { + return listening_socket_; +} + +bool Server::bindToPort() { return true; } + +bool Server::handOffRestoredDestinationConnections() const { return false; } + +uint32_t Server::perConnectionBufferLimitBytes() const { + return connection_buffer_limit_bytes_; +} + +std::chrono::milliseconds Server::listenerFiltersTimeout() const { + return std::chrono::milliseconds(0); +} + +Envoy::Stats::Scope &Server::listenerScope() { return stats_; } + +uint64_t Server::listenerTag() const { return 0; } + +const std::string &Server::name() const { return name_; } + +bool Server::reverseWriteFilterOrder() const { return true; } + +const Envoy::Network::FilterChain *Server::findFilterChain( + const Envoy::Network::ConnectionSocket &) const { + return &server_filter_chain_; +} + +bool Server::createNetworkFilterChain( + Envoy::Network::Connection &network_connection, + const std::vector &) { + uint32_t id = connection_counter_++; + ENVOY_LOG(debug, "Server({}) accepted new Connection({}:{})", name_, name_, + id); + + ServerConnectionSharedPtr connection = std::make_shared( + name_, id, request_callback_, close_callback_, network_connection, + *dispatcher_, http_type_, stats_); + network_connection.addReadFilter(connection); + network_connection.addConnectionCallbacks(*connection); + + if (ServerCallbackResult::CLOSE == accept_callback_(*connection)) { + // Envoy will close the connection immediately, which will in turn + // trigger the user supplied close callback. + return false; + } + + return true; +} + +bool Server::createListenerFilterChain( + Envoy::Network::ListenerFilterManager &) { + return true; +} + +ClusterHelper::ClusterHelper( + std::initializer_list server_callbacks) { + for (auto it = server_callbacks.begin(); it != server_callbacks.end(); ++it) { + server_callback_helpers_.emplace_back(*it); + } +} + +ClusterHelper::~ClusterHelper() {} + +const std::vector &ClusterHelper::servers() const { + return server_callback_helpers_; +} + +std::vector &ClusterHelper::servers() { + return server_callback_helpers_; +} + +uint32_t ClusterHelper::connectionsAccepted() const { + uint32_t total = 0U; + + for (size_t i = 0; i < server_callback_helpers_.size(); ++i) { + total += server_callback_helpers_[i]->connectionsAccepted(); + } + + return total; +} + +uint32_t ClusterHelper::requestsReceived() const { + uint32_t total = 0U; + + for (size_t i = 0; i < server_callback_helpers_.size(); ++i) { + total += server_callback_helpers_[i]->requestsReceived(); + } + + return total; +} + +uint32_t ClusterHelper::localCloses() const { + uint32_t total = 0U; + + for (size_t i = 0; i < server_callback_helpers_.size(); ++i) { + total += server_callback_helpers_[i]->localCloses(); + } + + return total; +} + +uint32_t ClusterHelper::remoteCloses() const { + uint32_t total = 0U; + + for (size_t i = 0; i < server_callback_helpers_.size(); ++i) { + total += server_callback_helpers_[i]->remoteCloses(); + } + + return total; +} + +void ClusterHelper::wait() { + for (size_t i = 0; i < server_callback_helpers_.size(); ++i) { + server_callback_helpers_[i]->wait(); + } +} + +} // namespace Integration +} // namespace Mixer diff --git a/test/integration/int_server.h b/test/integration/int_server.h new file mode 100644 index 00000000000..b7fe7653378 --- /dev/null +++ b/test/integration/int_server.h @@ -0,0 +1,450 @@ +/* Copyright 2019 Istio Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "common/api/api_impl.h" +#include "common/grpc/common.h" +#include "common/http/codec_client.h" +#include "common/network/listen_socket_impl.h" +#include "common/stats/isolated_store_impl.h" +#include "test/test_common/test_time.h" +#include "test/test_common/utility.h" + +namespace Mixer { +namespace Integration { + +enum class ServerCloseReason { + REMOTE_CLOSE, // Peer closed or connection was reset after it was + // established. + LOCAL_CLOSE // This process decided to close the connection. +}; + +enum class ServerCallbackResult { + CONTINUE, // Leave the connection open + CLOSE // Close the connection. +}; + +class ServerStream { + public: + ServerStream(); + + virtual ~ServerStream(); + + ServerStream(ServerStream &&) = default; + ServerStream &operator=(ServerStream &&) = default; + + /** + * Send a HTTP header-only response and close the stream. + * + * @param response_headers the response headers + * @param delay delay in msec before sending the response. if 0 send + * immediately + */ + virtual void sendResponseHeaders( + const Envoy::Http::HeaderMap &response_headers, + const std::chrono::milliseconds delay = + std::chrono::milliseconds(0)) PURE; + + /** + * Send a gRPC response and close the stream + * + * @param status The gRPC status (carried in the HTTP response trailer) + * @param response The gRPC response (carried in the HTTP response body) + * @param delay delay in msec before sending the response. if 0 send + * immediately + */ + virtual void sendGrpcResponse(Envoy::Grpc::Status::GrpcStatus status, + const Envoy::Protobuf::Message &response, + const std::chrono::milliseconds delay = + std::chrono::milliseconds(0)) PURE; + + private: + ServerStream(const ServerStream &) = delete; + void operator=(const ServerStream &) = delete; +}; + +typedef std::unique_ptr ServerStreamPtr; +typedef std::shared_ptr ServerStreamSharedPtr; + +class ServerConnection; + +// NB: references passed to any of these callbacks are owned by the caller and +// must not be used after the callback returns -- except for the request headers +// which may be moved into the caller. +typedef std::function + ServerAcceptCallback; +typedef std::function + ServerCloseCallback; +// TODO support sending delayed responses +typedef std::function + ServerRequestCallback; + +class ServerConnection : public Envoy::Network::ReadFilter, + public Envoy::Network::ConnectionCallbacks, + public Envoy::Http::ServerConnectionCallbacks, + Envoy::Logger::Loggable { + public: + ServerConnection(const std::string &name, uint32_t id, + ServerRequestCallback request_callback, + ServerCloseCallback close_callback, + Envoy::Network::Connection &network_connection, + Envoy::Event::Dispatcher &dispatcher, + Envoy::Http::CodecClient::Type http_type, + Envoy::Stats::Scope &scope); + + virtual ~ServerConnection(); + + ServerConnection(ServerConnection &&) = default; + ServerConnection &operator=(ServerConnection &&) = default; + + const std::string &name() const; + + uint32_t id() const; + + Envoy::Network::Connection &networkConnection(); + const Envoy::Network::Connection &networkConnection() const; + + Envoy::Http::ServerConnection &httpConnection(); + const Envoy::Http::ServerConnection &httpConnection() const; + + Envoy::Event::Dispatcher &dispatcher(); + + /** + * For internal use + */ + void removeStream(uint32_t stream_id); + + // + // Envoy::Network::ReadFilter + // + + virtual Envoy::Network::FilterStatus onData(Envoy::Buffer::Instance &data, + bool end_stream) override; + + virtual Envoy::Network::FilterStatus onNewConnection() override; + + virtual void initializeReadFilterCallbacks( + Envoy::Network::ReadFilterCallbacks &) override; + + // + // Envoy::Http::ConnectionCallbacks + // + + virtual void onGoAway() override; + + // + // Envoy::Http::ServerConnectionCallbacks + // + + virtual Envoy::Http::StreamDecoder &newStream( + Envoy::Http::StreamEncoder &stream_encoder, + bool is_internally_created = false) override; + + // + // Envoy::Network::ConnectionCallbacks + // + + virtual void onEvent(Envoy::Network::ConnectionEvent event) override; + + virtual void onAboveWriteBufferHighWatermark() override; + + virtual void onBelowWriteBufferLowWatermark() override; + + private: + ServerConnection(const ServerConnection &) = delete; + ServerConnection &operator=(const ServerConnection &) = delete; + + std::string name_; + uint32_t id_; + Envoy::Network::Connection &network_connection_; + Envoy::Http::ServerConnectionPtr http_connection_; + Envoy::Event::Dispatcher &dispatcher_; + ServerRequestCallback request_callback_; + ServerCloseCallback close_callback_; + + std::mutex streams_lock_; + std::unordered_map streams_; + uint32_t stream_counter_{0U}; +}; + +typedef std::unique_ptr ServerConnectionPtr; +typedef std::shared_ptr ServerConnectionSharedPtr; + +class ServerFilterChain : public Envoy::Network::FilterChain { + public: + ServerFilterChain( + Envoy::Network::TransportSocketFactory &transport_socket_factory); + + virtual ~ServerFilterChain(); + + ServerFilterChain(ServerFilterChain &&) = default; + ServerFilterChain &operator=(ServerFilterChain &&) = default; + + // + // Envoy::Network::FilterChain + // + + virtual const Envoy::Network::TransportSocketFactory &transportSocketFactory() + const override; + + virtual const std::vector + &networkFilterFactories() const override; + + private: + ServerFilterChain(const ServerFilterChain &) = delete; + ServerFilterChain &operator=(const ServerFilterChain &) = delete; + + Envoy::Network::TransportSocketFactory &transport_socket_factory_; + std::vector network_filter_factories_; +}; + +/** + * A convenience class for creating a listening socket bound to localhost + */ +class LocalListenSocket : public Envoy::Network::TcpListenSocket { + public: + /** + * Create a listening socket bound to localhost. + * + * @param ip_version v4 or v6. v4 by default. + * @param port the port. If 0, let the kernel allocate an avaiable ephemeral + * port. 0 by default. + * @param options socket options. nullptr by default + * @param bind_to_port if true immediately bind to the port, allocating one if + * necessary. true by default. + */ + LocalListenSocket( + Envoy::Network::Address::IpVersion ip_version = + Envoy::Network::Address::IpVersion::v4, + uint16_t port = 0, + const Envoy::Network::Socket::OptionsSharedPtr &options = nullptr, + bool bind_to_port = true); + + virtual ~LocalListenSocket(); + + LocalListenSocket(LocalListenSocket &&) = default; + LocalListenSocket &operator=(LocalListenSocket &&) = default; + + private: + LocalListenSocket(const LocalListenSocket &) = delete; + void operator=(const LocalListenSocket &) = delete; +}; + +/** + * A convenience class for passing callbacks to a Server. If no callbacks are + * provided, default callbacks that track some simple metrics will be used. If + * callbacks are provided, they will be wrapped with callbacks that maintain the + * same simple set of metrics. + */ +class ServerCallbackHelper { + public: + ServerCallbackHelper(ServerRequestCallback request_callback = nullptr, + ServerAcceptCallback accept_callback = nullptr, + ServerCloseCallback close_callback = nullptr); + + virtual ~ServerCallbackHelper(); + + ServerCallbackHelper(ServerCallbackHelper &&) = default; + ServerCallbackHelper &operator=(ServerCallbackHelper &&) = default; + + uint32_t connectionsAccepted() const; + uint32_t requestsReceived() const; + uint32_t localCloses() const; + uint32_t remoteCloses() const; + ServerAcceptCallback acceptCallback() const; + ServerRequestCallback requestCallback() const; + ServerCloseCallback closeCallback() const; + + /* + * Wait until the server has accepted n connections and seen them closed (due + * to error or client close) + */ + void wait(uint32_t connections); + + /* + * Wait until the server has seen a close for every connection it has + * accepted. + */ + void wait(); + + private: + ServerCallbackHelper(const ServerCallbackHelper &) = delete; + void operator=(const ServerCallbackHelper &) = delete; + + ServerAcceptCallback accept_callback_; + ServerRequestCallback request_callback_; + ServerCloseCallback close_callback_; + + std::atomic accepts_{0}; + std::atomic requests_received_{0}; + uint32_t local_closes_{0}; + uint32_t remote_closes_{0}; + mutable absl::Mutex mutex_; +}; + +typedef std::unique_ptr ServerCallbackHelperPtr; +typedef std::shared_ptr ServerCallbackHelperSharedPtr; + +class Server : public Envoy::Network::FilterChainManager, + public Envoy::Network::FilterChainFactory, + public Envoy::Network::ListenerConfig, + Envoy::Logger::Loggable { + public: + // TODO make use of Network::Socket::OptionsSharedPtr + Server(const std::string &name, Envoy::Network::Socket &listening_socket, + Envoy::Network::TransportSocketFactory &transport_socket_factory, + Envoy::Http::CodecClient::Type http_type); + + virtual ~Server(); + + Server(Server &&) = default; + Server &operator=(Server &&) = default; + + void start(ServerAcceptCallback accept_callback, + ServerRequestCallback request_callback, + ServerCloseCallback close_callback); + + void start(ServerCallbackHelper &helper); + + void stop(); + + void stopAcceptingConnections(); + + void startAcceptingConnections(); + + const Envoy::Stats::Store &statsStore() const; + + // TODO does this affect socket recv buffer size? Only for new connections? + void setPerConnectionBufferLimitBytes(uint32_t limit); + + // + // Envoy::Network::ListenerConfig + // + + virtual Envoy::Network::FilterChainManager &filterChainManager() override; + + virtual Envoy::Network::FilterChainFactory &filterChainFactory() override; + + virtual Envoy::Network::Socket &socket() override; + + virtual const Envoy::Network::Socket &socket() const override; + + virtual bool bindToPort() override; + + virtual bool handOffRestoredDestinationConnections() const override; + + // TODO does this affect socket recv buffer size? Only for new connections? + virtual uint32_t perConnectionBufferLimitBytes() const override; + + virtual std::chrono::milliseconds listenerFiltersTimeout() const override; + + virtual Envoy::Stats::Scope &listenerScope() override; + + virtual uint64_t listenerTag() const override; + + virtual const std::string &name() const override; + + virtual bool reverseWriteFilterOrder() const override; + + // + // Envoy::Network::FilterChainManager + // + + virtual const Envoy::Network::FilterChain *findFilterChain( + const Envoy::Network::ConnectionSocket &) const override; + + // + // Envoy::Network::FilterChainFactory + // + + virtual bool createNetworkFilterChain( + Envoy::Network::Connection &network_connection, + const std::vector &) override; + + virtual bool createListenerFilterChain( + Envoy::Network::ListenerFilterManager &) override; + + private: + Server(const Server &) = delete; + void operator=(const Server &) = delete; + + std::string name_; + Envoy::Stats::IsolatedStoreImpl stats_; + Envoy::Event::TestRealTimeSystem time_system_; + Envoy::Api::Impl api_; + Envoy::Event::DispatcherPtr dispatcher_; + Envoy::Network::ConnectionHandlerPtr connection_handler_; + Envoy::Thread::ThreadPtr thread_; + std::atomic is_running{false}; + + ServerAcceptCallback accept_callback_{nullptr}; + ServerRequestCallback request_callback_{nullptr}; + ServerCloseCallback close_callback_{nullptr}; + + // + // Envoy::Network::ListenerConfig + // + + Envoy::Network::Socket &listening_socket_; + std::atomic connection_buffer_limit_bytes_{0U}; + + // + // Envoy::Network::FilterChainManager + // + + ServerFilterChain server_filter_chain_; + + // + // Envoy::Network::FilterChainFactory + // + + Envoy::Http::CodecClient::Type http_type_; + std::atomic connection_counter_{0U}; +}; + +typedef std::unique_ptr ServerPtr; +typedef std::shared_ptr ServerSharedPtr; + +class ClusterHelper { + public: + /*template + ClusterHelper(Args &&... args) : servers_(std::forward(args)...){};*/ + + ClusterHelper(std::initializer_list server_callbacks); + + virtual ~ClusterHelper(); + + const std::vector &servers() const; + std::vector &servers(); + + uint32_t connectionsAccepted() const; + uint32_t requestsReceived() const; + uint32_t localCloses() const; + uint32_t remoteCloses() const; + + void wait(); + + private: + ClusterHelper(const ClusterHelper &) = delete; + void operator=(const ClusterHelper &) = delete; + + std::vector server_callback_helpers_; +}; + +} // namespace Integration +} // namespace Mixer