Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions envoy/grpc/async_client_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ using AsyncClientFactoryPtr = std::unique_ptr<AsyncClientFactory>;

class GrpcServiceConfigWithHashKey {
public:
GrpcServiceConfigWithHashKey() = default;

explicit GrpcServiceConfigWithHashKey(const envoy::config::core::v3::GrpcService& config)
: config_(config), pre_computed_hash_(Envoy::MessageUtil::hash(config)){};

Expand All @@ -53,9 +55,14 @@ class GrpcServiceConfigWithHashKey {

const envoy::config::core::v3::GrpcService& config() const { return config_; }

void setConfig(const envoy::config::core::v3::GrpcService g) {
config_ = g;
pre_computed_hash_ = Envoy::MessageUtil::hash(g);
}

private:
const envoy::config::core::v3::GrpcService config_;
const std::size_t pre_computed_hash_;
envoy::config::core::v3::GrpcService config_;
std::size_t pre_computed_hash_;
};

// Singleton gRPC client manager. Grpc::AsyncClientManager can be used to create per-service
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ envoy_cc_library(
name = "client_interface",
hdrs = ["client.h"],
deps = [
"//envoy/grpc:async_client_manager_interface",
"//envoy/grpc:status",
"//envoy/stream_info:stream_info_interface",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
Expand Down
8 changes: 5 additions & 3 deletions source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "envoy/common/pure.h"
#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/grpc/status.h"
#include "envoy/service/ext_proc/v3/external_processor.pb.h"
#include "envoy/stream_info/stream_info.h"
Expand Down Expand Up @@ -38,9 +39,10 @@ class ExternalProcessorCallbacks {
class ExternalProcessorClient {
public:
virtual ~ExternalProcessorClient() = default;
virtual ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService& grpc_service,
const StreamInfo::StreamInfo& stream_info) PURE;
virtual ExternalProcessorStreamPtr
start(ExternalProcessorCallbacks& callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const StreamInfo::StreamInfo& stream_info) PURE;
};

using ExternalProcessorClientPtr = std::unique_ptr<ExternalProcessorClient>;
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ ExternalProcessorClientImpl::ExternalProcessorClientImpl(Grpc::AsyncClientManage

ExternalProcessorStreamPtr
ExternalProcessorClientImpl::start(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService& grpc_service,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const StreamInfo::StreamInfo& stream_info) {
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse> grpcClient(
client_manager_.getOrCreateRawAsyncClient(grpc_service, scope_, true));
client_manager_.getOrCreateRawAsyncClientWithHashKey(config_with_hash_key, scope_, true));
return ExternalProcessorStreamImpl::create(std::move(grpcClient), callbacks, stream_info);
}

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient {
ExternalProcessorClientImpl(Grpc::AsyncClientManager& client_manager, Stats::Scope& scope);

ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService& grpc_service,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const StreamInfo::StreamInfo& stream_info) override;

private:
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Filter::StreamOpenState Filter::openStream() {
}
if (!stream_) {
ENVOY_LOG(debug, "Opening gRPC stream to external processor");
stream_ = client_->start(*this, grpc_service_, decoder_callbacks_->streamInfo());
stream_ = client_->start(*this, config_with_hash_key_, decoder_callbacks_->streamInfo());
if (processing_complete_) {
// Stream failed while starting and either onGrpcError or onGrpcClose was already called
// Asserts that `stream_` is nullptr since it is not valid to be used any further
Expand Down Expand Up @@ -874,6 +874,7 @@ void Filter::mergePerRouteConfig() {
if (merged_config->grpcService()) {
ENVOY_LOG(trace, "Setting new GrpcService from per-route configuration");
grpc_service_ = *merged_config->grpcService();
config_with_hash_key_.setConfig(*merged_config->grpcService());
}
}

Expand Down
4 changes: 3 additions & 1 deletion source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
Filter(const FilterConfigSharedPtr& config, ExternalProcessorClientPtr&& client,
const envoy::config::core::v3::GrpcService& grpc_service)
: config_(config), client_(std::move(client)), stats_(config->stats()),
grpc_service_(grpc_service), decoding_state_(*this, config->processingMode()),
grpc_service_(grpc_service), config_with_hash_key_(grpc_service),

@tyxia tyxia Nov 2, 2023

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DiazAlan grpc_service_ seems to be replaced by config_with_hash_key_, more specifically config_with_hash_key_.config().

Then it can be removed?

(Just noticed this when I am currently looking at ext_proc client)

decoding_state_(*this, config->processingMode()),
encoding_state_(*this, config->processingMode()) {}

const FilterConfig& config() const { return *config_; }
Expand Down Expand Up @@ -304,6 +305,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
ExtProcFilterStats stats_;
ExtProcLoggingInfo* logging_info_;
envoy::config::core::v3::GrpcService grpc_service_;
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_;

// The state of the filter on both the encoding and decoding side.
DecodingProcessorState decoding_state_;
Expand Down
16 changes: 9 additions & 7 deletions test/extensions/filters/http/ext_proc/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback
protected:
void SetUp() override {
grpc_service_.mutable_envoy_grpc()->set_cluster_name("test");
config_with_hash_key_.setConfig(grpc_service_);

EXPECT_CALL(client_manager_, getOrCreateRawAsyncClient(_, _, _))
EXPECT_CALL(client_manager_, getOrCreateRawAsyncClientWithHashKey(_, _, _))
.WillOnce(Invoke(this, &ExtProcStreamTest::doFactory));

client_ =
Expand Down Expand Up @@ -67,6 +68,7 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback
bool grpc_closed_ = false;

envoy::config::core::v3::GrpcService grpc_service_;
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_;
ExternalProcessorClientPtr client_;
Grpc::MockAsyncClientManager client_manager_;
Grpc::MockAsyncStream stream_;
Expand All @@ -77,14 +79,14 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback
};

TEST_F(ExtProcStreamTest, OpenCloseStream) {
auto stream = client_->start(*this, grpc_service_, stream_info_);
auto stream = client_->start(*this, config_with_hash_key_, stream_info_);
EXPECT_CALL(stream_, closeStream());
EXPECT_CALL(stream_, resetStream());
stream->close();
}

TEST_F(ExtProcStreamTest, SendToStream) {
auto stream = client_->start(*this, grpc_service_, stream_info_);
auto stream = client_->start(*this, config_with_hash_key_, stream_info_);
// Send something and ensure that we get it. Doesn't really matter what.
EXPECT_CALL(stream_, sendMessageRaw_(_, false));
ProcessingRequest req;
Expand All @@ -95,14 +97,14 @@ TEST_F(ExtProcStreamTest, SendToStream) {
}

TEST_F(ExtProcStreamTest, SendAndClose) {
auto stream = client_->start(*this, grpc_service_, stream_info_);
auto stream = client_->start(*this, config_with_hash_key_, stream_info_);
EXPECT_CALL(stream_, sendMessageRaw_(_, true));
ProcessingRequest req;
stream->send(std::move(req), true);
}

TEST_F(ExtProcStreamTest, ReceiveFromStream) {
auto stream = client_->start(*this, grpc_service_, stream_info_);
auto stream = client_->start(*this, config_with_hash_key_, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
// Send something and ensure that we get it. Doesn't really matter what.
ProcessingResponse resp;
Expand Down Expand Up @@ -132,7 +134,7 @@ TEST_F(ExtProcStreamTest, ReceiveFromStream) {
}

TEST_F(ExtProcStreamTest, StreamClosed) {
auto stream = client_->start(*this, grpc_service_, stream_info_);
auto stream = client_->start(*this, config_with_hash_key_, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
Expand All @@ -145,7 +147,7 @@ TEST_F(ExtProcStreamTest, StreamClosed) {
}

TEST_F(ExtProcStreamTest, StreamError) {
auto stream = client_->start(*this, grpc_service_, stream_info_);
auto stream = client_->start(*this, config_with_hash_key_, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
Expand Down
9 changes: 7 additions & 2 deletions test/extensions/filters/http/ext_proc/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,13 @@ class HttpFilterTest : public testing::Test {
}

ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService& grpc_service,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
testing::Unused) {
if (final_expected_grpc_service_.has_value()) {
EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(), grpc_service));
EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(),
config_with_hash_key.config()));
std::cout << final_expected_grpc_service_.value().DebugString();
std::cout << config_with_hash_key.config().DebugString();
}

stream_callbacks_ = &callbacks;
Expand Down Expand Up @@ -463,6 +466,7 @@ class HttpFilterTest : public testing::Test {
}

absl::optional<envoy::config::core::v3::GrpcService> final_expected_grpc_service_;
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_;
std::unique_ptr<MockClient> client_;
ExternalProcessorCallbacks* stream_callbacks_ = nullptr;
ProcessingRequest last_request_;
Expand Down Expand Up @@ -2462,6 +2466,7 @@ TEST_F(HttpFilterTest, ProcessingModeResponseHeadersOnlyWithoutCallingDecodeHead
cb(route_config);
}));
final_expected_grpc_service_.emplace(route_proto.overrides().grpc_service());
config_with_hash_key_.setConfig(route_proto.overrides().grpc_service());

response_headers_.addCopy(LowerCaseString(":status"), "200");
response_headers_.addCopy(LowerCaseString("content-type"), "text/plain");
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/filters/http/ext_proc/mock_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class MockClient : public ExternalProcessorClient {
MockClient();
~MockClient() override;
MOCK_METHOD(ExternalProcessorStreamPtr, start,
(ExternalProcessorCallbacks&, const envoy::config::core::v3::GrpcService&,
(ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&,
const StreamInfo::StreamInfo&));
};

Expand Down
4 changes: 2 additions & 2 deletions test/extensions/filters/http/ext_proc/ordering_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class OrderingTest : public testing::Test {

// Called by the "start" method on the stream by the filter
virtual ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService&,
const Grpc::GrpcServiceConfigWithHashKey&,
const StreamInfo::StreamInfo&) {
stream_callbacks_ = &callbacks;
auto stream = std::make_unique<MockStream>();
Expand Down Expand Up @@ -218,7 +218,7 @@ class OrderingTest : public testing::Test {
class FastFailOrderingTest : public OrderingTest {
// All tests using this class have gRPC streams that will fail while being opened.
ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService&,
const Grpc::GrpcServiceConfigWithHashKey&,
const StreamInfo::StreamInfo&) override {
callbacks.onGrpcError(Grpc::Status::Internal);
// Returns nullptr on start stream failure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ DEFINE_PROTO_FUZZER(
EXPECT_CALL(*client, start(_, _, _))
.WillRepeatedly(Invoke(
[&](ExternalProcessing::ExternalProcessorCallbacks&,
const envoy::config::core::v3::GrpcService&,
const Grpc::GrpcServiceConfigWithHashKey&,
const StreamInfo::StreamInfo&) -> ExternalProcessing::ExternalProcessorStreamPtr {
auto stream = std::make_unique<MockStream>();
EXPECT_CALL(*stream, send(_, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class MockClient : public ExternalProcessing::ExternalProcessorClient {

MOCK_METHOD(ExternalProcessing::ExternalProcessorStreamPtr, start,
(ExternalProcessing::ExternalProcessorCallbacks & callbacks,
const envoy::config::core::v3::GrpcService& grpc_service,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const StreamInfo::StreamInfo& stream_info));
};

Expand Down