From a40041066ecd48d14a97db9f43a6555dc64f5c52 Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Wed, 15 Sep 2021 07:13:19 +0000 Subject: [PATCH 01/13] grpc: implement BufferedAsyncClient for bidi gRPC stream Signed-off-by: Shikugawa --- source/common/grpc/BUILD | 9 ++ source/common/grpc/buffered_async_client.h | 114 ++++++++++++++++++ test/common/grpc/BUILD | 15 +++ .../common/grpc/buffered_async_client_test.cc | 113 +++++++++++++++++ 4 files changed, 251 insertions(+) create mode 100644 source/common/grpc/buffered_async_client.h create mode 100644 test/common/grpc/buffered_async_client_test.cc diff --git a/source/common/grpc/BUILD b/source/common/grpc/BUILD index 8e3cc89fd7e77..6729a434b2466 100644 --- a/source/common/grpc/BUILD +++ b/source/common/grpc/BUILD @@ -206,3 +206,12 @@ envoy_cc_library( "@envoy_api//envoy/config/core/v3:pkg_cc_proto", ], ) + +envoy_cc_library( + name = "buffered_async_client_lib", + hdrs = ["buffered_async_client.h"], + deps = [ + ":typed_async_client_lib", + "//source/common/protobuf:utility_lib", + ], +) diff --git a/source/common/grpc/buffered_async_client.h b/source/common/grpc/buffered_async_client.h new file mode 100644 index 0000000000000..e2dca0a90c3e2 --- /dev/null +++ b/source/common/grpc/buffered_async_client.h @@ -0,0 +1,114 @@ +#pragma once + +#include "source/common/grpc/typed_async_client.h" +#include "source/common/protobuf/utility.h" + +namespace Envoy { +namespace Grpc { + +enum class BufferState { Buffered, PendingFlush }; + +template class BufferedAsyncClient { +public: + BufferedAsyncClient(uint32_t max_buffer_bytes, const Protobuf::MethodDescriptor& service_method, + Grpc::AsyncStreamCallbacks& callbacks, + const Grpc::AsyncClient& client) + : max_buffer_bytes_(max_buffer_bytes), service_method_(service_method), callbacks_(callbacks), + client_(client) {} + + virtual ~BufferedAsyncClient() { cleanup(); } + + uint32_t publishId(RequestType& message) { return MessageUtil::hash(message); } + + void bufferMessage(uint32_t id, RequestType& message) { + const auto buffer_size = message.ByteSizeLong(); + if (current_buffer_bytes_ + buffer_size > max_buffer_bytes_) { + return; + } + + message_buffer_[id] = std::make_pair(BufferState::Buffered, message); + current_buffer_bytes_ += buffer_size; + } + + absl::flat_hash_set sendBufferedMessages() { + if (active_stream_ == nullptr) { + active_stream_ = + client_.start(service_method_, callbacks_, Http::AsyncClient::StreamOptions()); + } + + if (active_stream_->isAboveWriteBufferHighWatermark()) { + return {}; + } + + absl::flat_hash_set inflight_message_ids; + + for (auto&& it : message_buffer_) { + const auto id = it.first; + auto& state = it.second.first; + auto& message = it.second.second; + + if (state == BufferState::PendingFlush) { + continue; + } + + state = BufferState::PendingFlush; + inflight_message_ids.emplace(id); + active_stream_->sendMessage(message, false); + } + + return inflight_message_ids; + } + + void onSuccess(uint32_t message_id) { erasePendingMessage(message_id); } + + void onError(uint32_t message_id) { + if (message_buffer_.find(message_id) == message_buffer_.end()) { + return; + } + message_buffer_.at(message_id).first = BufferState::Buffered; + } + + void cleanup() { + if (active_stream_ != nullptr) { + active_stream_ = nullptr; + } + } + + bool hasActiveStream() { return active_stream_ != nullptr; } + + const absl::flat_hash_map>& messageBuffer() { + return message_buffer_; + } + +private: + void erasePendingMessage(uint32_t message_id) { + if (message_buffer_.find(message_id) == message_buffer_.end()) { + return; + } + auto& buffer = message_buffer_.at(message_id); + + // There may be cases where the buffer status is not PendingFlush when + // this function is called. For example, a message_buffer that was + // PendingFlush may become Buffered due to an external state change + // (e.g. re-buffering due to timeout). + if (buffer.first == BufferState::PendingFlush) { + const auto buffer_size = buffer.second.ByteSizeLong(); + current_buffer_bytes_ -= buffer_size; + message_buffer_.erase(message_id); + } + } + + uint32_t max_buffer_bytes_ = 0; + const Protobuf::MethodDescriptor& service_method_; + Grpc::AsyncStreamCallbacks& callbacks_; + Grpc::AsyncClient client_; + Grpc::AsyncStream active_stream_; + absl::flat_hash_map> message_buffer_; + uint32_t current_buffer_bytes_ = 0; +}; + +template +using BufferedAsyncClientPtr = std::unique_ptr>; + +} // namespace Grpc +} // namespace Envoy diff --git a/test/common/grpc/BUILD b/test/common/grpc/BUILD index c4e37301fa708..ca9a7a068b666 100644 --- a/test/common/grpc/BUILD +++ b/test/common/grpc/BUILD @@ -186,3 +186,18 @@ envoy_cc_test_library( "@envoy_api//envoy/config/core/v3:pkg_cc_proto", ], ) + +envoy_cc_test( + name = "buffered_async_client_test", + srcs = ["buffered_async_client_test.cc"], + deps = [ + "//source/common/grpc:async_client_lib", + "//source/common/grpc:buffered_async_client_lib", + "//test/mocks/http:http_mocks", + "//test/mocks/tracing:tracing_mocks", + "//test/mocks/upstream:cluster_manager_mocks", + "//test/proto:helloworld_proto_cc_proto", + "//test/test_common:test_time_lib", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + ], +) diff --git a/test/common/grpc/buffered_async_client_test.cc b/test/common/grpc/buffered_async_client_test.cc new file mode 100644 index 0000000000000..4caec0900132f --- /dev/null +++ b/test/common/grpc/buffered_async_client_test.cc @@ -0,0 +1,113 @@ +#include "envoy/config/core/v3/grpc_service.pb.h" + +#include "source/common/grpc/async_client_impl.h" +#include "source/common/grpc/buffered_async_client.h" +#include "source/common/network/address_impl.h" +#include "source/common/network/socket_impl.h" + +#include "test/mocks/http/mocks.h" +#include "test/mocks/tracing/mocks.h" +#include "test/mocks/upstream/cluster_manager.h" +#include "test/proto/helloworld.pb.h" +#include "test/test_common/test_time.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Grpc { +namespace { + +class BufferedAsyncClientTest : public testing::Test { +public: + BufferedAsyncClientTest() + : method_descriptor_(helloworld::Greeter::descriptor()->FindMethodByName("SayHello")) { + config_.mutable_envoy_grpc()->set_cluster_name("test_cluster"); + + cm_.initializeThreadLocalClusters({"test_cluster"}); + ON_CALL(cm_.thread_local_cluster_, httpAsyncClient()).WillByDefault(ReturnRef(http_client_)); + } + + const Protobuf::MethodDescriptor* method_descriptor_; + envoy::config::core::v3::GrpcService config_; + NiceMock cm_; + NiceMock http_client_; +}; + +TEST_F(BufferedAsyncClientTest, BasicSendFlow) { + Http::MockAsyncClientStream http_stream; + EXPECT_CALL(http_client_, start(_, _)).WillOnce(Return(&http_stream)); + EXPECT_CALL(http_stream, sendHeaders(_, _)); + EXPECT_CALL(http_stream, isAboveWriteBufferHighWatermark()).WillOnce(Return(false)); + EXPECT_CALL(http_stream, sendData(_, _)); + EXPECT_CALL(http_stream, reset()); + + DangerousDeprecatedTestTime test_time_; + auto raw_client = std::make_shared(cm_, config_, test_time_.timeSystem()); + AsyncClient client(raw_client); + + NiceMock> callback; + BufferedAsyncClient buffered_client( + 100000, *method_descriptor_, callback, client); + + helloworld::HelloRequest request; + request.set_name("Alice"); + auto id = buffered_client.publishId(request); + buffered_client.bufferMessage(id, request); + EXPECT_EQ(1, buffered_client.sendBufferedMessages().size()); + + // Re-buffer, and transport. + buffered_client.onError(id); + + EXPECT_CALL(http_stream, sendData(_, _)).Times(2); + EXPECT_CALL(http_stream, isAboveWriteBufferHighWatermark()).WillOnce(Return(false)); + + helloworld::HelloRequest request2; + request2.set_name("Bob"); + auto id2 = buffered_client.publishId(request2); + buffered_client.bufferMessage(id2, request2); + auto ids2 = buffered_client.sendBufferedMessages(); + EXPECT_EQ(2, ids2.size()); + + // Clear existing messages. + for (auto&& id : ids2) { + buffered_client.onSuccess(id); + } + + // Successfully cleared pending messages. + EXPECT_CALL(http_stream, isAboveWriteBufferHighWatermark()).WillOnce(Return(false)); + auto ids3 = buffered_client.sendBufferedMessages(); + EXPECT_EQ(0, ids3.size()); +} + +TEST_F(BufferedAsyncClientTest, BufferLimitExceeded) { + Http::MockAsyncClientStream http_stream; + EXPECT_CALL(http_client_, start(_, _)).WillOnce(Return(&http_stream)); + EXPECT_CALL(http_stream, sendHeaders(_, _)); + EXPECT_CALL(http_stream, isAboveWriteBufferHighWatermark()).WillOnce(Return(false)); + EXPECT_CALL(http_stream, reset()); + + DangerousDeprecatedTestTime test_time_; + auto raw_client = std::make_shared(cm_, config_, test_time_.timeSystem()); + AsyncClient client(raw_client); + + NiceMock> callback; + BufferedAsyncClient buffered_client( + 0, *method_descriptor_, callback, client); + + helloworld::HelloRequest request; + request.set_name("Alice"); + auto id = buffered_client.publishId(request); + buffered_client.bufferMessage(id, request); + + EXPECT_EQ(0, buffered_client.sendBufferedMessages().size()); +} + +} // namespace +} // namespace Grpc +} // namespace Envoy From 4885e81065183de21a02dfd1a617eb1e43e7325d Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Thu, 16 Sep 2021 03:01:15 +0000 Subject: [PATCH 02/13] add write buf high watermark test Signed-off-by: Shikugawa --- .../common/grpc/buffered_async_client_test.cc | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/test/common/grpc/buffered_async_client_test.cc b/test/common/grpc/buffered_async_client_test.cc index 4caec0900132f..3231e3a50f781 100644 --- a/test/common/grpc/buffered_async_client_test.cc +++ b/test/common/grpc/buffered_async_client_test.cc @@ -108,6 +108,29 @@ TEST_F(BufferedAsyncClientTest, BufferLimitExceeded) { EXPECT_EQ(0, buffered_client.sendBufferedMessages().size()); } +TEST_F(BufferedAsyncClientTest, BufferHighWatermarkTest) { + Http::MockAsyncClientStream http_stream; + EXPECT_CALL(http_client_, start(_, _)).WillOnce(Return(&http_stream)); + EXPECT_CALL(http_stream, sendHeaders(_, _)); + EXPECT_CALL(http_stream, isAboveWriteBufferHighWatermark()).WillOnce(Return(true)); + EXPECT_CALL(http_stream, reset()); + + DangerousDeprecatedTestTime test_time_; + auto raw_client = std::make_shared(cm_, config_, test_time_.timeSystem()); + AsyncClient client(raw_client); + + NiceMock> callback; + BufferedAsyncClient buffered_client( + 100000, *method_descriptor_, callback, client); + + helloworld::HelloRequest request; + request.set_name("Alice"); + auto id = buffered_client.publishId(request); + buffered_client.bufferMessage(id, request); + + EXPECT_EQ(0, buffered_client.sendBufferedMessages().size()); +} + } // namespace } // namespace Grpc } // namespace Envoy From 82bf7c32d1316244513c1ead9ec3e2f11328a18f Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Mon, 20 Sep 2021 10:26:24 +0000 Subject: [PATCH 03/13] fix Signed-off-by: Shikugawa --- source/common/grpc/buffered_async_client.h | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/source/common/grpc/buffered_async_client.h b/source/common/grpc/buffered_async_client.h index e2dca0a90c3e2..136c0d92bb454 100644 --- a/source/common/grpc/buffered_async_client.h +++ b/source/common/grpc/buffered_async_client.h @@ -3,6 +3,8 @@ #include "source/common/grpc/typed_async_client.h" #include "source/common/protobuf/utility.h" +#include "absl/container/btree_map.h" + namespace Envoy { namespace Grpc { @@ -18,7 +20,11 @@ template class BufferedAsyncClient { virtual ~BufferedAsyncClient() { cleanup(); } - uint32_t publishId(RequestType& message) { return MessageUtil::hash(message); } + virtual uint32_t publishId(RequestType&) { + auto tmp = next_message_id_; + ++next_message_id_; + return tmp; + } void bufferMessage(uint32_t id, RequestType& message) { const auto buffer_size = message.ByteSizeLong(); @@ -65,6 +71,7 @@ template class BufferedAsyncClient { if (message_buffer_.find(message_id) == message_buffer_.end()) { return; } + message_buffer_.at(message_id).first = BufferState::Buffered; } @@ -103,8 +110,9 @@ template class BufferedAsyncClient { Grpc::AsyncStreamCallbacks& callbacks_; Grpc::AsyncClient client_; Grpc::AsyncStream active_stream_; - absl::flat_hash_map> message_buffer_; + absl::btree_map> message_buffer_; uint32_t current_buffer_bytes_ = 0; + uint64_t next_message_id_ = 0; }; template From 84073b0561bb8d5b8637d63a618a505fb37efc3e Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Thu, 23 Sep 2021 08:14:19 +0000 Subject: [PATCH 04/13] fix Signed-off-by: Shikugawa --- source/common/grpc/buffered_async_client.h | 28 +++++++++++----------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/source/common/grpc/buffered_async_client.h b/source/common/grpc/buffered_async_client.h index 136c0d92bb454..ec7d137ccedfa 100644 --- a/source/common/grpc/buffered_async_client.h +++ b/source/common/grpc/buffered_async_client.h @@ -10,6 +10,10 @@ namespace Grpc { enum class BufferState { Buffered, PendingFlush }; +// This class wraps bidirectional gRPC and provides message arrival guarantee. +// It stores messages to be sent or in the process of being sent in a buffer, +// and can track the status of the message based on the ID assigned to each message. +// If a message fails to be sent, it can be re-buffered to guarantee its arrival. template class BufferedAsyncClient { public: BufferedAsyncClient(uint32_t max_buffer_bytes, const Protobuf::MethodDescriptor& service_method, @@ -20,13 +24,9 @@ template class BufferedAsyncClient { virtual ~BufferedAsyncClient() { cleanup(); } - virtual uint32_t publishId(RequestType&) { - auto tmp = next_message_id_; - ++next_message_id_; - return tmp; - } + virtual uint64_t publishId(RequestType&) { return next_message_id_++; } - void bufferMessage(uint32_t id, RequestType& message) { + void bufferMessage(uint64_t id, RequestType& message) { const auto buffer_size = message.ByteSizeLong(); if (current_buffer_bytes_ + buffer_size > max_buffer_bytes_) { return; @@ -36,7 +36,7 @@ template class BufferedAsyncClient { current_buffer_bytes_ += buffer_size; } - absl::flat_hash_set sendBufferedMessages() { + absl::flat_hash_set sendBufferedMessages() { if (active_stream_ == nullptr) { active_stream_ = client_.start(service_method_, callbacks_, Http::AsyncClient::StreamOptions()); @@ -46,7 +46,7 @@ template class BufferedAsyncClient { return {}; } - absl::flat_hash_set inflight_message_ids; + absl::flat_hash_set inflight_message_ids; for (auto&& it : message_buffer_) { const auto id = it.first; @@ -65,9 +65,9 @@ template class BufferedAsyncClient { return inflight_message_ids; } - void onSuccess(uint32_t message_id) { erasePendingMessage(message_id); } + void onSuccess(uint64_t message_id) { erasePendingMessage(message_id); } - void onError(uint32_t message_id) { + void onError(uint64_t message_id) { if (message_buffer_.find(message_id) == message_buffer_.end()) { return; } @@ -83,12 +83,12 @@ template class BufferedAsyncClient { bool hasActiveStream() { return active_stream_ != nullptr; } - const absl::flat_hash_map>& messageBuffer() { + const absl::flat_hash_map>& messageBuffer() { return message_buffer_; } private: - void erasePendingMessage(uint32_t message_id) { + void erasePendingMessage(uint64_t message_id) { if (message_buffer_.find(message_id) == message_buffer_.end()) { return; } @@ -105,12 +105,12 @@ template class BufferedAsyncClient { } } - uint32_t max_buffer_bytes_ = 0; + const uint32_t max_buffer_bytes_ = 0; const Protobuf::MethodDescriptor& service_method_; Grpc::AsyncStreamCallbacks& callbacks_; Grpc::AsyncClient client_; Grpc::AsyncStream active_stream_; - absl::btree_map> message_buffer_; + absl::btree_map> message_buffer_; uint32_t current_buffer_bytes_ = 0; uint64_t next_message_id_ = 0; }; From 8bff3c46f7ad4b81dd73ceb3b06c6888621fd148 Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Thu, 30 Sep 2021 03:41:44 +0000 Subject: [PATCH 05/13] not to expose puslishId() Signed-off-by: Shikugawa --- source/common/grpc/buffered_async_client.h | 7 ++++--- test/common/grpc/buffered_async_client_test.cc | 17 +++++++---------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/source/common/grpc/buffered_async_client.h b/source/common/grpc/buffered_async_client.h index ec7d137ccedfa..8da230701e096 100644 --- a/source/common/grpc/buffered_async_client.h +++ b/source/common/grpc/buffered_async_client.h @@ -24,14 +24,13 @@ template class BufferedAsyncClient { virtual ~BufferedAsyncClient() { cleanup(); } - virtual uint64_t publishId(RequestType&) { return next_message_id_++; } - - void bufferMessage(uint64_t id, RequestType& message) { + void bufferMessage(RequestType& message) { const auto buffer_size = message.ByteSizeLong(); if (current_buffer_bytes_ + buffer_size > max_buffer_bytes_) { return; } + auto id = publishId(); message_buffer_[id] = std::make_pair(BufferState::Buffered, message); current_buffer_bytes_ += buffer_size; } @@ -105,6 +104,8 @@ template class BufferedAsyncClient { } } + uint64_t publishId() { return next_message_id_++; } + const uint32_t max_buffer_bytes_ = 0; const Protobuf::MethodDescriptor& service_method_; Grpc::AsyncStreamCallbacks& callbacks_; diff --git a/test/common/grpc/buffered_async_client_test.cc b/test/common/grpc/buffered_async_client_test.cc index 3231e3a50f781..cb6dd554096e2 100644 --- a/test/common/grpc/buffered_async_client_test.cc +++ b/test/common/grpc/buffered_async_client_test.cc @@ -57,20 +57,19 @@ TEST_F(BufferedAsyncClientTest, BasicSendFlow) { helloworld::HelloRequest request; request.set_name("Alice"); - auto id = buffered_client.publishId(request); - buffered_client.bufferMessage(id, request); - EXPECT_EQ(1, buffered_client.sendBufferedMessages().size()); + buffered_client.bufferMessage(request); + const auto inflight_message_ids = buffered_client.sendBufferedMessages(); + EXPECT_EQ(1, inflight_message_ids.size()); // Re-buffer, and transport. - buffered_client.onError(id); + buffered_client.onError(*inflight_message_ids.begin()); EXPECT_CALL(http_stream, sendData(_, _)).Times(2); EXPECT_CALL(http_stream, isAboveWriteBufferHighWatermark()).WillOnce(Return(false)); helloworld::HelloRequest request2; request2.set_name("Bob"); - auto id2 = buffered_client.publishId(request2); - buffered_client.bufferMessage(id2, request2); + buffered_client.bufferMessage(request2); auto ids2 = buffered_client.sendBufferedMessages(); EXPECT_EQ(2, ids2.size()); @@ -102,8 +101,7 @@ TEST_F(BufferedAsyncClientTest, BufferLimitExceeded) { helloworld::HelloRequest request; request.set_name("Alice"); - auto id = buffered_client.publishId(request); - buffered_client.bufferMessage(id, request); + buffered_client.bufferMessage(request); EXPECT_EQ(0, buffered_client.sendBufferedMessages().size()); } @@ -125,8 +123,7 @@ TEST_F(BufferedAsyncClientTest, BufferHighWatermarkTest) { helloworld::HelloRequest request; request.set_name("Alice"); - auto id = buffered_client.publishId(request); - buffered_client.bufferMessage(id, request); + buffered_client.bufferMessage(request); EXPECT_EQ(0, buffered_client.sendBufferedMessages().size()); } From c43a12892d948b3250c0f4ec6e0b996123cd9786 Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Thu, 30 Sep 2021 03:48:45 +0000 Subject: [PATCH 06/13] more comment Signed-off-by: Shikugawa --- source/common/grpc/buffered_async_client.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/common/grpc/buffered_async_client.h b/source/common/grpc/buffered_async_client.h index 8da230701e096..13b938e34be2f 100644 --- a/source/common/grpc/buffered_async_client.h +++ b/source/common/grpc/buffered_async_client.h @@ -88,6 +88,8 @@ template class BufferedAsyncClient { private: void erasePendingMessage(uint64_t message_id) { + // This case will be considered if `onSuccess` had called with message id that is not received + // by envoy as response. if (message_buffer_.find(message_id) == message_buffer_.end()) { return; } From 680eff82c8c776e55052836e4fd04cab257586f0 Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Fri, 8 Oct 2021 09:42:11 +0000 Subject: [PATCH 07/13] fix Signed-off-by: Shikugawa --- source/common/grpc/buffered_async_client.h | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/source/common/grpc/buffered_async_client.h b/source/common/grpc/buffered_async_client.h index 13b938e34be2f..f3f8277f11a3b 100644 --- a/source/common/grpc/buffered_async_client.h +++ b/source/common/grpc/buffered_async_client.h @@ -22,7 +22,11 @@ template class BufferedAsyncClient { : max_buffer_bytes_(max_buffer_bytes), service_method_(service_method), callbacks_(callbacks), client_(client) {} - virtual ~BufferedAsyncClient() { cleanup(); } + ~BufferedAsyncClient() { + if (active_stream_ != nullptr) { + active_stream_ = nullptr; + } + } void bufferMessage(RequestType& message) { const auto buffer_size = message.ByteSizeLong(); @@ -74,15 +78,9 @@ template class BufferedAsyncClient { message_buffer_.at(message_id).first = BufferState::Buffered; } - void cleanup() { - if (active_stream_ != nullptr) { - active_stream_ = nullptr; - } - } - bool hasActiveStream() { return active_stream_ != nullptr; } - const absl::flat_hash_map>& messageBuffer() { + const absl::btree_map>& messageBuffer() { return message_buffer_; } From 952122ee5dec1e40bcc8da3a0da20fdfb95c7423 Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Fri, 8 Oct 2021 11:25:35 +0000 Subject: [PATCH 08/13] fix Signed-off-by: Shikugawa --- test/common/grpc/buffered_async_client_test.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/common/grpc/buffered_async_client_test.cc b/test/common/grpc/buffered_async_client_test.cc index cb6dd554096e2..b5ea21a908803 100644 --- a/test/common/grpc/buffered_async_client_test.cc +++ b/test/common/grpc/buffered_async_client_test.cc @@ -59,6 +59,7 @@ TEST_F(BufferedAsyncClientTest, BasicSendFlow) { request.set_name("Alice"); buffered_client.bufferMessage(request); const auto inflight_message_ids = buffered_client.sendBufferedMessages(); + EXPECT_TRUE(buffered_client.hasActiveStream()); EXPECT_EQ(1, inflight_message_ids.size()); // Re-buffer, and transport. @@ -104,6 +105,7 @@ TEST_F(BufferedAsyncClientTest, BufferLimitExceeded) { buffered_client.bufferMessage(request); EXPECT_EQ(0, buffered_client.sendBufferedMessages().size()); + EXPECT_TRUE(buffered_client.hasActiveStream()); } TEST_F(BufferedAsyncClientTest, BufferHighWatermarkTest) { @@ -126,6 +128,7 @@ TEST_F(BufferedAsyncClientTest, BufferHighWatermarkTest) { buffered_client.bufferMessage(request); EXPECT_EQ(0, buffered_client.sendBufferedMessages().size()); + EXPECT_TRUE(buffered_client.hasActiveStream()); } } // namespace From f00e8914a0df7796092d8c331e62ed6f330a3063 Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Wed, 13 Oct 2021 17:44:49 +0900 Subject: [PATCH 09/13] Kick CI Signed-off-by: Shikugawa From 10ec8c5c623830354f23e698acd2aa3d35e30c81 Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Tue, 2 Nov 2021 17:45:08 +0900 Subject: [PATCH 10/13] test Signed-off-by: Shikugawa From 4acfe2dd38cc933804f250acd2189e6f98395254 Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Fri, 5 Nov 2021 13:04:51 +0900 Subject: [PATCH 11/13] fix Signed-off-by: Shikugawa --- source/common/grpc/buffered_async_client.h | 7 +++++-- test/common/grpc/buffered_async_client_test.cc | 10 ++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/source/common/grpc/buffered_async_client.h b/source/common/grpc/buffered_async_client.h index f3f8277f11a3b..55319e5069be9 100644 --- a/source/common/grpc/buffered_async_client.h +++ b/source/common/grpc/buffered_async_client.h @@ -1,5 +1,7 @@ #pragma once +#include + #include "source/common/grpc/typed_async_client.h" #include "source/common/protobuf/utility.h" @@ -28,15 +30,16 @@ template class BufferedAsyncClient { } } - void bufferMessage(RequestType& message) { + absl::optional bufferMessage(RequestType& message) { const auto buffer_size = message.ByteSizeLong(); if (current_buffer_bytes_ + buffer_size > max_buffer_bytes_) { - return; + return absl::nullopt; } auto id = publishId(); message_buffer_[id] = std::make_pair(BufferState::Buffered, message); current_buffer_bytes_ += buffer_size; + return id; } absl::flat_hash_set sendBufferedMessages() { diff --git a/test/common/grpc/buffered_async_client_test.cc b/test/common/grpc/buffered_async_client_test.cc index b5ea21a908803..e8e27497c77d7 100644 --- a/test/common/grpc/buffered_async_client_test.cc +++ b/test/common/grpc/buffered_async_client_test.cc @@ -1,3 +1,5 @@ +#include + #include "envoy/config/core/v3/grpc_service.pb.h" #include "source/common/grpc/async_client_impl.h" @@ -57,7 +59,7 @@ TEST_F(BufferedAsyncClientTest, BasicSendFlow) { helloworld::HelloRequest request; request.set_name("Alice"); - buffered_client.bufferMessage(request); + EXPECT_EQ(0, buffered_client.bufferMessage(request).value()); const auto inflight_message_ids = buffered_client.sendBufferedMessages(); EXPECT_TRUE(buffered_client.hasActiveStream()); EXPECT_EQ(1, inflight_message_ids.size()); @@ -70,7 +72,7 @@ TEST_F(BufferedAsyncClientTest, BasicSendFlow) { helloworld::HelloRequest request2; request2.set_name("Bob"); - buffered_client.bufferMessage(request2); + EXPECT_EQ(1, buffered_client.bufferMessage(request2).value()); auto ids2 = buffered_client.sendBufferedMessages(); EXPECT_EQ(2, ids2.size()); @@ -102,7 +104,7 @@ TEST_F(BufferedAsyncClientTest, BufferLimitExceeded) { helloworld::HelloRequest request; request.set_name("Alice"); - buffered_client.bufferMessage(request); + EXPECT_EQ(absl::nullopt, buffered_client.bufferMessage(request)); EXPECT_EQ(0, buffered_client.sendBufferedMessages().size()); EXPECT_TRUE(buffered_client.hasActiveStream()); @@ -125,7 +127,7 @@ TEST_F(BufferedAsyncClientTest, BufferHighWatermarkTest) { helloworld::HelloRequest request; request.set_name("Alice"); - buffered_client.bufferMessage(request); + EXPECT_EQ(0, buffered_client.bufferMessage(request).value()); EXPECT_EQ(0, buffered_client.sendBufferedMessages().size()); EXPECT_TRUE(buffered_client.hasActiveStream()); From a4736665d2a78e72aaf8d294d6204d63e19f3f85 Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Fri, 5 Nov 2021 14:35:29 +0900 Subject: [PATCH 12/13] fix Signed-off-by: Shikugawa --- test/common/grpc/buffered_async_client_test.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/common/grpc/buffered_async_client_test.cc b/test/common/grpc/buffered_async_client_test.cc index e8e27497c77d7..f8d23a8477686 100644 --- a/test/common/grpc/buffered_async_client_test.cc +++ b/test/common/grpc/buffered_async_client_test.cc @@ -1,5 +1,3 @@ -#include - #include "envoy/config/core/v3/grpc_service.pb.h" #include "source/common/grpc/async_client_impl.h" From a7df1b1329fc7078b6206fd90f9eef857e1eb2f5 Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Tue, 9 Nov 2021 16:00:49 +0900 Subject: [PATCH 13/13] fix Signed-off-by: Shikugawa --- source/common/grpc/buffered_async_client.h | 6 ++++-- test/common/grpc/buffered_async_client_test.cc | 5 +++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/source/common/grpc/buffered_async_client.h b/source/common/grpc/buffered_async_client.h index 55319e5069be9..7c04673429429 100644 --- a/source/common/grpc/buffered_async_client.h +++ b/source/common/grpc/buffered_async_client.h @@ -30,6 +30,8 @@ template class BufferedAsyncClient { } } + // It push message into internal message buffer. + // If the buffer is full, it will return absl::nullopt. absl::optional bufferMessage(RequestType& message) { const auto buffer_size = message.ByteSizeLong(); if (current_buffer_bytes_ + buffer_size > max_buffer_bytes_) { @@ -89,8 +91,8 @@ template class BufferedAsyncClient { private: void erasePendingMessage(uint64_t message_id) { - // This case will be considered if `onSuccess` had called with message id that is not received - // by envoy as response. + // This case will be considered if `onSuccess` had called with unknown message id that is not + // received by envoy as response. if (message_buffer_.find(message_id) == message_buffer_.end()) { return; } diff --git a/test/common/grpc/buffered_async_client_test.cc b/test/common/grpc/buffered_async_client_test.cc index f8d23a8477686..fc025f6532a00 100644 --- a/test/common/grpc/buffered_async_client_test.cc +++ b/test/common/grpc/buffered_async_client_test.cc @@ -62,6 +62,11 @@ TEST_F(BufferedAsyncClientTest, BasicSendFlow) { EXPECT_TRUE(buffered_client.hasActiveStream()); EXPECT_EQ(1, inflight_message_ids.size()); + // Pending messages should not be re-sent. + EXPECT_CALL(http_stream, isAboveWriteBufferHighWatermark()).WillOnce(Return(false)); + const auto inflight_message_ids2 = buffered_client.sendBufferedMessages(); + EXPECT_EQ(0, inflight_message_ids2.size()); + // Re-buffer, and transport. buffered_client.onError(*inflight_message_ids.begin());