Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ message ExtProcPerRoute {
}

// Overrides that may be set on a per-route basis
// [#next-free-field: 6]
message ExtProcOverrides {
// Set a different processing mode for this route than the default.
ProcessingMode processing_mode = 1;
Expand All @@ -195,4 +196,7 @@ message ExtProcOverrides {
// Set different optional properties than the default setting of the
// ``response_attributes`` field.
repeated string response_attributes = 4;

// Set a different gRPC service for this route than the default.
config.core.v3.GrpcService grpc_service = 5;
Comment thread
mpwarres marked this conversation as resolved.
}
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ envoy_cc_library(
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings:str_format",
"@envoy_api//envoy/config/common/mutation_rules/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
],
Expand All @@ -56,6 +57,7 @@ envoy_cc_library(
deps = [
"//envoy/grpc:status",
"//envoy/stream_info:stream_info_interface",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
],
)
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <memory>

#include "envoy/common/pure.h"
#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/grpc/status.h"
#include "envoy/service/ext_proc/v3/external_processor.pb.h"
#include "envoy/stream_info/stream_info.h"
Expand Down Expand Up @@ -36,6 +37,7 @@ class ExternalProcessorClient {
public:
virtual ~ExternalProcessorClient() = default;
virtual ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService& grpc_service,
const StreamInfo::StreamInfo& stream_info) PURE;
};

Expand Down
10 changes: 5 additions & 5 deletions source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ namespace ExternalProcessing {

static constexpr char kExternalMethod[] = "envoy.service.ext_proc.v3.ExternalProcessor.Process";

ExternalProcessorClientImpl::ExternalProcessorClientImpl(
Grpc::AsyncClientManager& client_manager,
const envoy::config::core::v3::GrpcService& grpc_service, Stats::Scope& scope)
: client_manager_(client_manager), grpc_service_(grpc_service), scope_(scope) {}
ExternalProcessorClientImpl::ExternalProcessorClientImpl(Grpc::AsyncClientManager& client_manager,
Stats::Scope& scope)
: client_manager_(client_manager), scope_(scope) {}

ExternalProcessorStreamPtr
ExternalProcessorClientImpl::start(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService& grpc_service,
const StreamInfo::StreamInfo& stream_info) {
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse> grpcClient(
client_manager_.getOrCreateRawAsyncClient(grpc_service_, scope_, true,
client_manager_.getOrCreateRawAsyncClient(grpc_service, scope_, true,
Grpc::CacheOption::AlwaysCache));
return std::make_unique<ExternalProcessorStreamImpl>(std::move(grpcClient), callbacks,
stream_info);
Expand Down
6 changes: 2 additions & 4 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,14 @@ using ProcessingResponsePtr = std::unique_ptr<ProcessingResponse>;

class ExternalProcessorClientImpl : public ExternalProcessorClient {
public:
ExternalProcessorClientImpl(Grpc::AsyncClientManager& client_manager,
const envoy::config::core::v3::GrpcService& grpc_service,
Stats::Scope& scope);
ExternalProcessorClientImpl(Grpc::AsyncClientManager& client_manager, Stats::Scope& scope);

ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService& grpc_service,
const StreamInfo::StreamInfo& stream_info) override;

private:
Grpc::AsyncClientManager& client_manager_;
const envoy::config::core::v3::GrpcService grpc_service_;
Stats::Scope& scope_;
};

Expand Down
6 changes: 3 additions & 3 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromPro
return [filter_config, grpc_service = proto_config.grpc_service(),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
context.clusterManager().grpcAsyncClientManager(), grpc_service, context.scope());
context.clusterManager().grpcAsyncClientManager(), context.scope());

callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
callbacks.addStreamFilter(Http::StreamFilterSharedPtr{
std::make_shared<Filter>(filter_config, std::move(client), grpc_service)});
};
}

Expand Down
41 changes: 27 additions & 14 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,17 @@ FilterConfigPerRoute::FilterConfigPerRoute(const ExtProcPerRoute& config)
if (config.has_overrides()) {
processing_mode_ = config.overrides().processing_mode();
}
if (config.overrides().has_grpc_service()) {
grpc_service_ = config.overrides().grpc_service();
}
}

void FilterConfigPerRoute::merge(const FilterConfigPerRoute& src) {
disabled_ = src.disabled_;
processing_mode_ = src.processing_mode_;
if (src.grpcService()) {
Comment thread
mpwarres marked this conversation as resolved.
Outdated
grpc_service_ = src.grpcService();
}
}

void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) {
Expand All @@ -82,7 +88,7 @@ Filter::StreamOpenState Filter::openStream() {
ENVOY_BUG(!processing_complete_, "openStream should not have been called");
if (!stream_) {
ENVOY_LOG(debug, "Opening gRPC stream to external processor");
stream_ = client_->start(*this, decoder_callbacks_->streamInfo());
stream_ = client_->start(*this, grpc_service_, decoder_callbacks_->streamInfo());
stats_.streams_started_.inc();
if (processing_complete_) {
// Stream failed while starting and either onGrpcError or onGrpcClose was already called
Expand Down Expand Up @@ -705,19 +711,26 @@ void Filter::mergePerRouteConfig() {
auto&& merged_config = Http::Utility::getMergedPerFilterConfig<FilterConfigPerRoute>(
FilterName, decoder_callbacks_->route(),
[](FilterConfigPerRoute& dst, const FilterConfigPerRoute& src) { dst.merge(src); });
if (merged_config) {
if (merged_config->disabled()) {
// Rather than introduce yet another flag, use the processing mode
// structure to disable all the callbacks.
ENVOY_LOG(trace, "Disabling filter due to per-route configuration");
const auto all_disabled = allDisabledMode();
decoding_state_.setProcessingMode(all_disabled);
encoding_state_.setProcessingMode(all_disabled);
} else if (merged_config->processingMode()) {
ENVOY_LOG(trace, "Setting new processing mode from per-route configuration");
decoding_state_.setProcessingMode(*(merged_config->processingMode()));
encoding_state_.setProcessingMode(*(merged_config->processingMode()));
}
if (!merged_config) {
return;
}
if (merged_config->disabled()) {
// Rather than introduce yet another flag, use the processing mode
// structure to disable all the callbacks.
ENVOY_LOG(trace, "Disabling filter due to per-route configuration");
const auto all_disabled = allDisabledMode();
decoding_state_.setProcessingMode(all_disabled);
encoding_state_.setProcessingMode(all_disabled);
return;
}
if (merged_config->processingMode()) {
ENVOY_LOG(trace, "Setting new processing mode from per-route configuration");
decoding_state_.setProcessingMode(*(merged_config->processingMode()));
encoding_state_.setProcessingMode(*(merged_config->processingMode()));
}
if (merged_config->grpcService()) {
ENVOY_LOG(trace, "Setting new GrpcService from per-route configuration");
grpc_service_ = *merged_config->grpcService();
Comment thread
htuch marked this conversation as resolved.
}
}

Expand Down
11 changes: 9 additions & 2 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <memory>
#include <string>

#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/event/timer.h"
#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h"
#include "envoy/grpc/async_client.h"
Expand Down Expand Up @@ -90,10 +91,14 @@ class FilterConfigPerRoute : public Router::RouteSpecificFilterConfig {
processingMode() const {
return processing_mode_;
}
const absl::optional<envoy::config::core::v3::GrpcService>& grpcService() const {
return grpc_service_;
}

private:
bool disabled_;
absl::optional<envoy::extensions::filters::http::ext_proc::v3::ProcessingMode> processing_mode_;
absl::optional<envoy::config::core::v3::GrpcService> grpc_service_;
};

class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
Expand All @@ -112,9 +117,10 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
};

public:
Filter(const FilterConfigSharedPtr& config, ExternalProcessorClientPtr&& client)
Filter(const FilterConfigSharedPtr& config, ExternalProcessorClientPtr&& client,
const envoy::config::core::v3::GrpcService& grpc_service)
: config_(config), client_(std::move(client)), stats_(config->stats()),
decoding_state_(*this, config->processingMode()),
grpc_service_(grpc_service), decoding_state_(*this, config->processingMode()),
encoding_state_(*this, config->processingMode()) {}

const FilterConfig& config() const { return *config_; }
Expand Down Expand Up @@ -175,6 +181,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
const FilterConfigSharedPtr config_;
const ExternalProcessorClientPtr client_;
ExtProcFilterStats stats_;
envoy::config::core::v3::GrpcService grpc_service_;

// The state of the filter on both the encoding and decoding side.
DecodingProcessorState decoding_state_;
Expand Down
2 changes: 2 additions & 0 deletions test/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ envoy_extension_cc_test(
"//test/mocks/event:event_mocks",
"//test/mocks/server:factory_context_mocks",
"//test/test_common:test_runtime_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
],
)
Expand All @@ -65,6 +66,7 @@ envoy_extension_cc_test(
"//test/mocks/event:event_mocks",
"//test/mocks/server:factory_context_mocks",
"//test/test_common:test_runtime_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)

Expand Down
19 changes: 9 additions & 10 deletions test/extensions/filters/http/ext_proc/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback

protected:
void SetUp() override {
envoy::config::core::v3::GrpcService service;
auto grpc = service.mutable_envoy_grpc();
grpc->set_cluster_name("test");
grpc_service_.mutable_envoy_grpc()->set_cluster_name("test");

EXPECT_CALL(client_manager_, getOrCreateRawAsyncClient(_, _, _, _))
.WillOnce(Invoke(this, &ExtProcStreamTest::doFactory));

client_ = std::make_unique<ExternalProcessorClientImpl>(client_manager_, service, stats_store_);
client_ = std::make_unique<ExternalProcessorClientImpl>(client_manager_, stats_store_);
}

Grpc::RawAsyncClientSharedPtr doFactory(Unused, Unused, Unused, Unused) {
Expand Down Expand Up @@ -66,6 +64,7 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback
Grpc::Status::GrpcStatus grpc_status_ = Grpc::Status::WellKnownGrpcStatus::Ok;
bool grpc_closed_ = false;

envoy::config::core::v3::GrpcService grpc_service_;
ExternalProcessorClientPtr client_;
Grpc::MockAsyncClientManager client_manager_;
Grpc::MockAsyncStream stream_;
Expand All @@ -76,14 +75,14 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback
};

TEST_F(ExtProcStreamTest, OpenCloseStream) {
auto stream = client_->start(*this, stream_info_);
auto stream = client_->start(*this, grpc_service_, stream_info_);
EXPECT_CALL(stream_, closeStream());
EXPECT_CALL(stream_, resetStream());
stream->close();
}

TEST_F(ExtProcStreamTest, SendToStream) {
auto stream = client_->start(*this, stream_info_);
auto stream = client_->start(*this, grpc_service_, stream_info_);
// Send something and ensure that we get it. Doesn't really matter what.
EXPECT_CALL(stream_, sendMessageRaw_(_, false));
ProcessingRequest req;
Expand All @@ -94,14 +93,14 @@ TEST_F(ExtProcStreamTest, SendToStream) {
}

TEST_F(ExtProcStreamTest, SendAndClose) {
auto stream = client_->start(*this, stream_info_);
auto stream = client_->start(*this, grpc_service_, stream_info_);
EXPECT_CALL(stream_, sendMessageRaw_(_, true));
ProcessingRequest req;
stream->send(std::move(req), true);
}

TEST_F(ExtProcStreamTest, ReceiveFromStream) {
auto stream = client_->start(*this, stream_info_);
auto stream = client_->start(*this, grpc_service_, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
// Send something and ensure that we get it. Doesn't really matter what.
ProcessingResponse resp;
Expand Down Expand Up @@ -131,7 +130,7 @@ TEST_F(ExtProcStreamTest, ReceiveFromStream) {
}

TEST_F(ExtProcStreamTest, StreamClosed) {
auto stream = client_->start(*this, stream_info_);
auto stream = client_->start(*this, grpc_service_, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
Expand All @@ -144,7 +143,7 @@ TEST_F(ExtProcStreamTest, StreamClosed) {
}

TEST_F(ExtProcStreamTest, StreamError) {
auto stream = client_->start(*this, stream_info_);
auto stream = client_->start(*this, grpc_service_, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
Expand Down
Loading