diff --git a/include/envoy/buffer/buffer.h b/include/envoy/buffer/buffer.h index 42b414c665f64..3775c84b3af66 100644 --- a/include/envoy/buffer/buffer.h +++ b/include/envoy/buffer/buffer.h @@ -29,6 +29,7 @@ struct RawSlice { size_t len_ = 0; bool operator==(const RawSlice& rhs) const { return mem_ == rhs.mem_ && len_ == rhs.len_; } + bool operator!=(const RawSlice& rhs) const { return !(*this == rhs); } }; using RawSliceVector = absl::InlinedVector; @@ -187,6 +188,16 @@ class Instance { */ virtual void* linearize(uint32_t size) PURE; + /** + * Get a pointer to a linear chunk of this buffer. The chunk may be smaller than max_size, even if + * the length of the buffer is larger. The function will heuristically determine how much data to + * copy based on `desired_min_size`, in order to avoid patterns in which all the data is copied + * when it doesn't need to be. For example, if the buffer contains a slice containing 1 byte, + * followed by 100 slices containing ``max_size``, repeatedly calling this function would avoid + * repeatedly copying ``max_size - 1`` bytes to make chunks of ``max_size``. + */ + virtual RawSlice maybeLinearize(uint32_t max_size, uint32_t desired_min_size) PURE; + /** * Move a buffer into this buffer. As little copying is done as possible. * @param rhs supplies the buffer to move. diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index f43507ac9530e..d64eb9be13f5e 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -277,6 +277,30 @@ void* OwnedImpl::linearize(uint32_t size) { return slices_.front()->data(); } +RawSlice OwnedImpl::maybeLinearize(uint32_t max_size, uint32_t desired_min_size) { + while (!slices_.empty() && slices_[0]->dataSize() == 0) { + slices_.pop_front(); + } + + if (slices_.empty()) { + return {nullptr, 0}; + } + + const uint64_t slice_size = std::min(slices_[0]->dataSize(), max_size); + if (slice_size >= desired_min_size) { + return {slices_[0]->data(), slice_size}; + } + + // The next slice will already be of the desired size, so don't copy and + // return the front slice. + if (slices_.size() >= 2 && slices_[1]->dataSize() >= max_size) { + return {slices_[0]->data(), slice_size}; + } + + auto size = std::min(max_size, length_); + return {linearize(size), size}; +} + void OwnedImpl::coalesceOrAddSlice(SlicePtr&& other_slice) { const uint64_t slice_size = other_slice->dataSize(); // The `other_slice` content can be coalesced into the existing slice IFF: diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index 1801068867323..b4544f1219896 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -568,6 +568,7 @@ class OwnedImpl : public LibEventInstance { SliceDataPtr extractMutableFrontSlice() override; uint64_t length() const override; void* linearize(uint32_t size) override; + RawSlice maybeLinearize(uint32_t max_size, uint32_t desired_min_size) override; void move(Instance& rhs) override; void move(Instance& rhs, uint64_t length) override; uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override; diff --git a/source/extensions/transport_sockets/tls/context_impl.cc b/source/extensions/transport_sockets/tls/context_impl.cc index e79a1aeadb6dc..4798a20a41493 100644 --- a/source/extensions/transport_sockets/tls/context_impl.cc +++ b/source/extensions/transport_sockets/tls/context_impl.cc @@ -84,6 +84,10 @@ ContextImpl::ContextImpl(Stats::Scope& scope, const Envoy::Ssl::ContextConfig& c int rc = SSL_CTX_set_app_data(ctx.ssl_ctx_.get(), this); RELEASE_ASSERT(rc == 1, Utility::getLastCryptoError().value_or("")); + constexpr uint32_t mode = SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER; + rc = SSL_CTX_set_mode(ctx.ssl_ctx_.get(), mode); + RELEASE_ASSERT((rc & mode) == mode, Utility::getLastCryptoError().value_or("")); + rc = SSL_CTX_set_min_proto_version(ctx.ssl_ctx_.get(), config.minProtocolVersion()); RELEASE_ASSERT(rc == 1, Utility::getLastCryptoError().value_or("")); diff --git a/source/extensions/transport_sockets/tls/context_impl.h b/source/extensions/transport_sockets/tls/context_impl.h index c533e0cc9c4fe..effb6536e2977 100644 --- a/source/extensions/transport_sockets/tls/context_impl.h +++ b/source/extensions/transport_sockets/tls/context_impl.h @@ -45,7 +45,8 @@ namespace Tls { COUNTER(ocsp_staple_failed) \ COUNTER(ocsp_staple_omitted) \ COUNTER(ocsp_staple_responses) \ - COUNTER(ocsp_staple_requests) + COUNTER(ocsp_staple_requests) \ + HISTOGRAM(write_size, Bytes) /** * Wrapper struct for SSL stats. @see stats_macros.h diff --git a/source/extensions/transport_sockets/tls/ssl_socket.cc b/source/extensions/transport_sockets/tls/ssl_socket.cc index 37a1f8547952b..48e56c674d98a 100644 --- a/source/extensions/transport_sockets/tls/ssl_socket.cc +++ b/source/extensions/transport_sockets/tls/ssl_socket.cc @@ -236,35 +236,27 @@ Network::IoResult SslSocket::doWrite(Buffer::Instance& write_buffer, bool end_st } } - uint64_t bytes_to_write; - if (bytes_to_retry_) { - bytes_to_write = bytes_to_retry_; - bytes_to_retry_ = 0; - } else { - bytes_to_write = std::min(write_buffer.length(), static_cast(16384)); - } - uint64_t total_bytes_written = 0; - while (bytes_to_write > 0) { + while (true) { // TODO(mattklein123): As it relates to our fairness efforts, we might want to limit the number // of iterations of this loop, either by pure iterations, bytes written, etc. + const auto slice = write_buffer.maybeLinearize(16384, 4096); + if (slice.len_ == 0) { + break; + } - // SSL_write() requires that if a previous call returns SSL_ERROR_WANT_WRITE, we need to call - // it again with the same parameters. This is done by tracking last write size, but not write - // data, since linearize() will return the same undrained data anyway. - ASSERT(bytes_to_write <= write_buffer.length()); - int rc = SSL_write(rawSsl(), write_buffer.linearize(bytes_to_write), bytes_to_write); + ASSERT(slice.mem_ != nullptr); + int rc = SSL_write(rawSsl(), slice.mem_, slice.len_); ENVOY_CONN_LOG(trace, "ssl write returns: {}", callbacks_->connection(), rc); if (rc > 0) { - ASSERT(rc == static_cast(bytes_to_write)); + ASSERT(rc == static_cast(slice.len_)); + ctx_->stats().write_size_.recordValue(rc); total_bytes_written += rc; write_buffer.drain(rc); - bytes_to_write = std::min(write_buffer.length(), static_cast(16384)); } else { int err = SSL_get_error(rawSsl(), rc); switch (err) { case SSL_ERROR_WANT_WRITE: - bytes_to_retry_ = bytes_to_write; break; case SSL_ERROR_WANT_READ: // Renegotiation has started. We don't handle renegotiation so just fall through. diff --git a/source/extensions/transport_sockets/tls/ssl_socket.h b/source/extensions/transport_sockets/tls/ssl_socket.h index 81b4712979fd7..33394a233ccae 100644 --- a/source/extensions/transport_sockets/tls/ssl_socket.h +++ b/source/extensions/transport_sockets/tls/ssl_socket.h @@ -91,7 +91,6 @@ class SslSocket : public Network::TransportSocket, const Network::TransportSocketOptionsSharedPtr transport_socket_options_; Network::TransportSocketCallbacks* callbacks_{}; ContextImplSharedPtr ctx_; - uint64_t bytes_to_retry_{}; std::string failure_reason_; SslHandshakerImplSharedPtr info_; diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index 7dc34ca3e58fa..d9890fae65f12 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -135,6 +135,10 @@ class StringBuffer : public Buffer::Instance { return mutableStart(); } + Buffer::RawSlice maybeLinearize(uint32_t max_size, uint32_t /*desired_min_size*/) override { + return {mutableStart(), std::min(size_, max_size)}; + } + Buffer::SliceDataPtr extractMutableFrontSlice() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } void move(Buffer::Instance& rhs) override { move(rhs, rhs.length()); } diff --git a/test/common/buffer/owned_impl_test.cc b/test/common/buffer/owned_impl_test.cc index e7249e2b65220..d605996cac14b 100644 --- a/test/common/buffer/owned_impl_test.cc +++ b/test/common/buffer/owned_impl_test.cc @@ -786,6 +786,60 @@ TEST_F(OwnedImplTest, LinearizeDrainTracking) { expectSlices({}, buffer); } +TEST_F(OwnedImplTest, MaybeLinearizeEmpty) { + Buffer::OwnedImpl empty; + EXPECT_EQ(0, empty.maybeLinearize(1024, 1024).len_); +} + +// Test that the correct value is returned in both the case where +// the slice has a larger and a smaller length than `desired_min_size`. +TEST_F(OwnedImplTest, MaybeLinearizeSingleSlice) { + Buffer::OwnedImpl buffer; + buffer.add(std::string(100, 'a')); + EXPECT_EQ(100, buffer.maybeLinearize(1024, 512).len_); + EXPECT_EQ(100, buffer.maybeLinearize(1024, 1).len_); +} + +TEST_F(OwnedImplTest, MaybeLinearizeDesiredMinSize) { + Buffer::OwnedImpl buffer; + buffer.add(std::string(10000, 'a')); + Buffer::OwnedImpl other; + other.add(std::string(10000, 'b')); + buffer.move(other); + + // Verify test slices are as expected + const auto slices = buffer.getRawSlices(); + ASSERT_EQ(2, slices.size()); + ASSERT_EQ(10000, slices[0].len_); + ASSERT_EQ(10000, slices[1].len_); + + // Ask for the entire buffer size. This should return only the first slice because + // `desired_min_size` is less than the size of that slice. + EXPECT_EQ(slices[0], buffer.maybeLinearize(20000, 9999)); + + // Ask for the entire buffer size, but with a desired_min_size greater than the first + // slice. This should get fully linearized into a single slice. + EXPECT_EQ(20000, buffer.maybeLinearize(20000, 10001).len_); +} + +// Test that a smaller slice than `desired_min_size` is returned if the next slice +// after it is full-sized. +TEST_F(OwnedImplTest, MaybeLinearizePreferNextSlice) { + Buffer::OwnedImpl buffer; + buffer.add("a"); + Buffer::OwnedImpl other; + other.add(std::string(10000, 'b')); + buffer.move(other); + + // Verify test slices are as expected + const auto slices = buffer.getRawSlices(); + ASSERT_EQ(2, slices.size()); + ASSERT_EQ(1, slices[0].len_); + ASSERT_EQ(10000, slices[1].len_); + + EXPECT_EQ(1, buffer.maybeLinearize(10000, 1024).len_); +} + TEST_F(OwnedImplTest, ReserveCommit) { // This fragment will later be added to the buffer. It is declared in an enclosing scope to // ensure it is not destructed until after the buffer is. diff --git a/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc b/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc index 1643cc83d2fa9..5188ca37e9d69 100644 --- a/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc +++ b/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc @@ -527,6 +527,8 @@ class FakeBuffer : public Buffer::Instance { MOCK_METHOD(Buffer::SliceDataPtr, extractMutableFrontSlice, (), (override)); MOCK_METHOD(uint64_t, length, (), (const, override)); MOCK_METHOD(void*, linearize, (uint32_t), (override)); + MOCK_METHOD(Buffer::RawSlice, maybeLinearize, (uint32_t max_size, uint32_t desired_min_size), + (override)); MOCK_METHOD(void, move, (Instance&), (override)); MOCK_METHOD(void, move, (Instance&, uint64_t), (override)); MOCK_METHOD(uint64_t, reserve, (uint64_t, Buffer::RawSlice*, uint64_t), (override)); diff --git a/test/extensions/transport_sockets/tls/BUILD b/test/extensions/transport_sockets/tls/BUILD index cfcb065655bbb..d0eff7767d31c 100644 --- a/test/extensions/transport_sockets/tls/BUILD +++ b/test/extensions/transport_sockets/tls/BUILD @@ -1,5 +1,7 @@ load( "//bazel:envoy_build_system.bzl", + "envoy_benchmark_test", + "envoy_cc_benchmark_binary", "envoy_cc_test", "envoy_cc_test_library", "envoy_package", @@ -179,3 +181,23 @@ envoy_cc_test( "//test/mocks/stats:stats_mocks", ], ) + +envoy_cc_benchmark_binary( + name = "tls_throughput_benchmark", + srcs = ["tls_throughput_test.cc"], + data = [ + "//test/extensions/transport_sockets/tls/test_data:certs", + ], + external_deps = [ + "benchmark", + "ssl", + ], + deps = [ + "//source/common/buffer:buffer_lib", + ], +) + +envoy_benchmark_test( + name = "tls_throughput_benchmark_test", + benchmark_binary = "tls_throughput_benchmark", +) diff --git a/test/extensions/transport_sockets/tls/tls_throughput_test.cc b/test/extensions/transport_sockets/tls/tls_throughput_test.cc new file mode 100644 index 0000000000000..2709bff41421a --- /dev/null +++ b/test/extensions/transport_sockets/tls/tls_throughput_test.cc @@ -0,0 +1,179 @@ +#include "common/buffer/buffer_impl.h" + +#include "test/test_common/environment.h" + +#include "benchmark/benchmark.h" +#include "openssl/ssl.h" +#include "tools/cpp/runfiles/runfiles.h" + +namespace Envoy { +namespace Extensions::TransportSockets::Tls { + +static void drainErrorQueue() { + while (uint64_t err = ERR_get_error()) { + std::string failure_reason = + absl::StrCat(err, ":", ERR_lib_error_string(err), ":", ERR_func_error_string(err), ":", + ERR_reason_error_string(err)); + ENVOY_LOG_MISC(error, "{}", failure_reason); + } +} + +static void appendSlice(Buffer::Instance& buffer, uint32_t size) { + Buffer::RawSlice slice; + std::string data(size, 'a'); + RELEASE_ASSERT(data.size() <= 16384, "short_slice_size can't be larger than full slice"); + + // A 16kb request currently has inline metadata, which makes it 16384+8. This gets rounded up + // to the next page size. Request enough that there is no extra space, to ensure that this results + // in a new slice. + buffer.reserve(20416, &slice, 1); + + memcpy(slice.mem_, data.data(), data.size()); + slice.len_ = data.size(); + buffer.commit(&slice, 1); +} + +static void testThroughput(benchmark::State& state) { + std::string error; + std::unique_ptr runfiles( + bazel::tools::cpp::runfiles::Runfiles::Create("tls_throughput_test", &error)); + Envoy::TestEnvironment::setRunfiles(runfiles.get()); + + int sockets[2]; + socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sockets); + + auto* server_ctx = SSL_CTX_new(TLS_method()); + auto* client_ctx = SSL_CTX_new(TLS_method()); + std::string cert_path = TestEnvironment::substitute( + "{{ test_rundir }}/test/extensions/transport_sockets/tls/test_data/san_dns_cert.pem"); + std::string key_path = TestEnvironment::substitute( + "{{ test_rundir }}/test/extensions/transport_sockets/tls/test_data/san_dns_key.pem"); + auto err = SSL_CTX_use_certificate_file(server_ctx, cert_path.c_str(), SSL_FILETYPE_PEM); + drainErrorQueue(); + RELEASE_ASSERT(err > 0, "SSL_CTX_use_certificate_file"); + err = SSL_CTX_use_PrivateKey_file(server_ctx, key_path.c_str(), SSL_FILETYPE_PEM); + RELEASE_ASSERT(err > 0, "SSL_CTX_use_PrivateKey_file"); + + SSL* server_ssl = SSL_new(server_ctx); + SSL_set_fd(server_ssl, sockets[0]); + SSL_set_accept_state(server_ssl); + + SSL* client_ssl = SSL_new(client_ctx); + SSL_set_fd(client_ssl, sockets[1]); + SSL_set_connect_state(client_ssl); + + bool handshake_success = false; + for (int i = 0; i < 50; i++) { + int client_err = SSL_do_handshake(client_ssl); + int server_err = SSL_do_handshake(server_ssl); + if (client_err == 1 && server_err == 1) { + handshake_success = true; + break; + } + int err = SSL_get_error(server_ssl, server_err); + switch (err) { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + continue; + default: + drainErrorQueue(); + PANIC("Unexpected error during handshake"); + } + + ENVOY_LOG_MISC(error, "client_err {} server_err {}", client_err, server_err); + } + + RELEASE_ASSERT(handshake_success, "handshake completed successfully"); + + static uint8_t read_buf[1024 * 1024]; + + auto full_linearize = state.range(0); + unsigned short_slice_size = state.range(1); + unsigned num_short_slices = state.range(2); + + uint64_t bytes_written = 0; + for (auto _ : state) { + state.PauseTiming(); + + // Empty out the read side to make space for the writes. + while (SSL_read(server_ssl, read_buf, sizeof(read_buf)) > 0) + ; + + std::string send(short_slice_size, 'a'); + Buffer::OwnedImpl write_buf; + for (unsigned i = 0; i < num_short_slices; i++) { + if (short_slice_size > 0) { + appendSlice(write_buf, short_slice_size); + } + } + appendSlice(write_buf, 16384 - (num_short_slices * short_slice_size)); + RELEASE_ASSERT(write_buf.length() == 16384, + fmt::format("expected length 16384, got {}", write_buf.length())); + RELEASE_ASSERT(write_buf.getRawSlices().size() == (num_short_slices + 1), + fmt::format("buffer number of slices expected {}, got {}", num_short_slices + 1, + write_buf.getRawSlices().size())); + + // Append many full-sized slices, the same manner that an envoy socket read would. + for (unsigned i = 0; i < 10; i++) { + auto start_size = write_buf.length(); + Buffer::RawSlice slices[2]; + auto num_slices = write_buf.reserve(16384, slices, 2); + for (unsigned i = 0; i < num_slices; i++) { + memset(slices[i].mem_, 'a', slices[i].len_); + } + write_buf.commit(slices, 2); + RELEASE_ASSERT(write_buf.length() - start_size == 16384, "correct reserve/commit"); + } + + bytes_written += write_buf.length(); + + state.ResumeTiming(); + uint32_t num_writes = 0; + uint32_t num_times_linearize_did_something = 0; + while (write_buf.length() > 0) { + const Buffer::RawSlice initial = write_buf.frontSlice(); + void* mem; + size_t len = std::min(write_buf.length(), 16384); + if (full_linearize) { + mem = write_buf.linearize(len); + } else { + auto slice = write_buf.maybeLinearize(len, 4096); + mem = slice.mem_; + len = slice.len_; + } + if (write_buf.frontSlice() != initial) { + num_times_linearize_did_something++; + } + + err = SSL_write(client_ssl, mem, len); + RELEASE_ASSERT(err == static_cast(len), "SSL_write"); + write_buf.drain(len); + num_writes++; + } + + state.counters["writes_per_iteration"] = num_writes; + state.counters["num_times_linearize_did_something"] = num_times_linearize_did_something; + } + state.counters["throughput"] = benchmark::Counter(bytes_written, benchmark::Counter::kIsRate); + ::close(sockets[0]); + ::close(sockets[1]); +} + +static void TestParams(benchmark::internal::Benchmark* b) { + // Add a single case of no short slices; don't iterate over the sizes + // which duplicates test cases when count is zero. + b->Args({false, 0, 0}); + b->Args({true, 0, 0}); + + for (auto num_short_slices : {1, 2, 3}) { + for (auto short_slice_size : {1, 128, 4096}) { + b->Args({false, short_slice_size, num_short_slices}); + b->Args({true, short_slice_size, num_short_slices}); + } + } +} + +BENCHMARK(testThroughput)->Unit(::benchmark::kMicrosecond)->Apply(TestParams); + +} // namespace Extensions::TransportSockets::Tls +} // namespace Envoy diff --git a/test/test_common/environment.cc b/test/test_common/environment.cc index b70a9d9c8cf7b..670984f8a08da 100644 --- a/test/test_common/environment.cc +++ b/test/test_common/environment.cc @@ -286,7 +286,7 @@ std::string TestEnvironment::substitute(const std::string& str, const absl::node_hash_map path_map = { {"test_tmpdir", TestEnvironment::temporaryDirectory()}, {"test_udsdir", TestEnvironment::unixDomainSocketDirectory()}, - {"test_rundir", runfiles_ != nullptr ? TestEnvironment::runfilesDirectory() : "invalid"}, + {"test_rundir", runfiles_ != nullptr ? TestEnvironment::runfilesDirectory() : "."}, }; std::string out_json_string = str;