Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ envoy_cc_library(
hdrs = ["client.h"],
deps = [
"//envoy/grpc:status",
"//envoy/stream_info:stream_info_interface",
"@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto",
],
)
Expand Down
4 changes: 3 additions & 1 deletion source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "envoy/common/pure.h"
#include "envoy/grpc/status.h"
#include "envoy/service/ext_proc/v3alpha/external_processor.pb.h"
#include "envoy/stream_info/stream_info.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -34,7 +35,8 @@ class ExternalProcessorCallbacks {
class ExternalProcessorClient {
public:
virtual ~ExternalProcessorClient() = default;
virtual ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks) PURE;
virtual ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks,
const StreamInfo::StreamInfo& stream_info) PURE;
};

using ExternalProcessorClientPtr = std::unique_ptr<ExternalProcessorClient>;
Expand Down
10 changes: 7 additions & 3 deletions source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,24 @@ ExternalProcessorClientImpl::ExternalProcessorClientImpl(
: client_manager_(client_manager), grpc_service_(grpc_service), scope_(scope) {}

ExternalProcessorStreamPtr
ExternalProcessorClientImpl::start(ExternalProcessorCallbacks& callbacks) {
ExternalProcessorClientImpl::start(ExternalProcessorCallbacks& callbacks,
const StreamInfo::StreamInfo& stream_info) {
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse> grpcClient(
client_manager_.getOrCreateRawAsyncClient(grpc_service_, scope_, true,
Grpc::CacheOption::AlwaysCache));
return std::make_unique<ExternalProcessorStreamImpl>(std::move(grpcClient), callbacks);
return std::make_unique<ExternalProcessorStreamImpl>(std::move(grpcClient), callbacks,
stream_info);
}

ExternalProcessorStreamImpl::ExternalProcessorStreamImpl(
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks)
ExternalProcessorCallbacks& callbacks, const StreamInfo::StreamInfo& stream_info)
: callbacks_(callbacks) {
client_ = std::move(client);
auto descriptor = Protobuf::DescriptorPool::generated_pool()->FindMethodByName(kExternalMethod);
grpc_context_.stream_info = &stream_info;
Http::AsyncClient::StreamOptions options;
options.setParentContext(grpc_context_);
stream_ = client_.start(*descriptor, *this, options);
}

Expand Down
7 changes: 5 additions & 2 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient {
const envoy::config::core::v3::GrpcService& grpc_service,
Stats::Scope& scope);

ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks) override;
ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks,
const StreamInfo::StreamInfo& stream_info) override;

private:
Grpc::AsyncClientManager& client_manager_;
Expand All @@ -40,7 +41,8 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
public Logger::Loggable<Logger::Id::filter> {
public:
ExternalProcessorStreamImpl(Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks);
ExternalProcessorCallbacks& callbacks,
const StreamInfo::StreamInfo& stream_info);
void send(ProcessingRequest&& request, bool end_stream) override;
// Close the stream. This is idempotent and will return true if we
// actually closed it.
Expand All @@ -59,6 +61,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
ExternalProcessorCallbacks& callbacks_;
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse> client_;
Grpc::AsyncStream<ProcessingRequest> stream_;
Http::AsyncClient::ParentContext grpc_context_;
bool stream_closed_ = false;
};

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Filter::StreamOpenState Filter::openStream() {
ENVOY_BUG(!processing_complete_, "openStream should not have been called");
if (!stream_) {
ENVOY_LOG(debug, "Opening gRPC stream to external processor");
stream_ = client_->start(*this);
stream_ = client_->start(*this, decoder_callbacks_->streamInfo());
stats_.streams_started_.inc();
if (processing_complete_) {
// Stream failed while starting and either onGrpcError or onGrpcClose was already called
Expand Down
14 changes: 8 additions & 6 deletions test/extensions/filters/http/ext_proc/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "test/mocks/grpc/mocks.h"
#include "test/mocks/stats/mocks.h"
#include "test/mocks/stream_info/mocks.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"
Expand Down Expand Up @@ -69,19 +70,20 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback
Grpc::MockAsyncClientManager client_manager_;
Grpc::MockAsyncStream stream_;
Grpc::RawAsyncStreamCallbacks* stream_callbacks_;
testing::NiceMock<StreamInfo::MockStreamInfo> stream_info_;

testing::NiceMock<Stats::MockStore> stats_store_;
};

TEST_F(ExtProcStreamTest, OpenCloseStream) {
auto stream = client_->start(*this);
auto stream = client_->start(*this, stream_info_);
EXPECT_CALL(stream_, closeStream());
EXPECT_CALL(stream_, resetStream());
stream->close();
}

TEST_F(ExtProcStreamTest, SendToStream) {
auto stream = client_->start(*this);
auto stream = client_->start(*this, stream_info_);
// Send something and ensure that we get it. Doesn't really matter what.
EXPECT_CALL(stream_, sendMessageRaw_(_, false));
ProcessingRequest req;
Expand All @@ -92,14 +94,14 @@ TEST_F(ExtProcStreamTest, SendToStream) {
}

TEST_F(ExtProcStreamTest, SendAndClose) {
auto stream = client_->start(*this);
auto stream = client_->start(*this, stream_info_);
EXPECT_CALL(stream_, sendMessageRaw_(_, true));
ProcessingRequest req;
stream->send(std::move(req), true);
}

TEST_F(ExtProcStreamTest, ReceiveFromStream) {
auto stream = client_->start(*this);
auto stream = client_->start(*this, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
// Send something and ensure that we get it. Doesn't really matter what.
ProcessingResponse resp;
Expand Down Expand Up @@ -129,7 +131,7 @@ TEST_F(ExtProcStreamTest, ReceiveFromStream) {
}

TEST_F(ExtProcStreamTest, StreamClosed) {
auto stream = client_->start(*this);
auto stream = client_->start(*this, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
Expand All @@ -142,7 +144,7 @@ TEST_F(ExtProcStreamTest, StreamClosed) {
}

TEST_F(ExtProcStreamTest, StreamError) {
auto stream = client_->start(*this);
auto stream = client_->start(*this, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
Expand Down
7 changes: 5 additions & 2 deletions test/extensions/filters/http/ext_proc/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "test/mocks/network/mocks.h"
#include "test/mocks/router/mocks.h"
#include "test/mocks/runtime/mocks.h"
#include "test/mocks/stream_info/mocks.h"
#include "test/mocks/tracing/mocks.h"
#include "test/mocks/upstream/cluster_manager.h"
#include "test/test_common/printers.h"
Expand Down Expand Up @@ -57,10 +58,11 @@ class HttpFilterTest : public testing::Test {
void initialize(std::string&& yaml) {
client_ = std::make_unique<MockClient>();
route_ = std::make_shared<NiceMock<Router::MockRoute>>();
EXPECT_CALL(*client_, start(_)).WillOnce(Invoke(this, &HttpFilterTest::doStart));
EXPECT_CALL(*client_, start(_, _)).WillOnce(Invoke(this, &HttpFilterTest::doStart));
EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_));
EXPECT_CALL(decoder_callbacks_, streamInfo()).WillRepeatedly(ReturnRef(stream_info_));
EXPECT_CALL(dispatcher_, createTimer_(_))
.Times(AnyNumber())
.WillRepeatedly(Invoke([this](Unused) {
Expand Down Expand Up @@ -98,7 +100,7 @@ class HttpFilterTest : public testing::Test {
}
}

ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks) {
ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, testing::Unused) {
stream_callbacks_ = &callbacks;

auto stream = std::make_unique<MockStream>();
Expand Down Expand Up @@ -244,6 +246,7 @@ class HttpFilterTest : public testing::Test {
Http::MockStreamDecoderFilterCallbacks decoder_callbacks_;
Http::MockStreamEncoderFilterCallbacks encoder_callbacks_;
Router::RouteConstSharedPtr route_;
testing::NiceMock<StreamInfo::MockStreamInfo> stream_info_;
Http::TestRequestHeaderMapImpl request_headers_;
Http::TestResponseHeaderMapImpl response_headers_;
Http::TestRequestTrailerMapImpl request_trailers_;
Expand Down
3 changes: 2 additions & 1 deletion test/extensions/filters/http/ext_proc/mock_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class MockClient : public ExternalProcessorClient {
public:
MockClient();
~MockClient() override;
MOCK_METHOD(ExternalProcessorStreamPtr, start, (ExternalProcessorCallbacks&));
MOCK_METHOD(ExternalProcessorStreamPtr, start,
(ExternalProcessorCallbacks&, const StreamInfo::StreamInfo& stream_info));
};

class MockStream : public ExternalProcessorStream {
Expand Down
11 changes: 8 additions & 3 deletions test/extensions/filters/http/ext_proc/ordering_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "test/mocks/network/mocks.h"
#include "test/mocks/router/mocks.h"
#include "test/mocks/runtime/mocks.h"
#include "test/mocks/stream_info/mocks.h"
#include "test/mocks/tracing/mocks.h"
#include "test/mocks/upstream/cluster_manager.h"

Expand Down Expand Up @@ -56,10 +57,11 @@ class OrderingTest : public testing::Test {
void initialize(absl::optional<std::function<void(ExternalProcessor&)>> cb) {
client_ = std::make_unique<MockClient>();
route_ = std::make_shared<NiceMock<Router::MockRoute>>();
EXPECT_CALL(*client_, start(_)).WillOnce(Invoke(this, &OrderingTest::doStart));
EXPECT_CALL(*client_, start(_, _)).WillOnce(Invoke(this, &OrderingTest::doStart));
EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_));
EXPECT_CALL(decoder_callbacks_, streamInfo()).WillRepeatedly(ReturnRef(stream_info_));

ExternalProcessor proto_config;
proto_config.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("ext_proc_server");
Expand All @@ -75,7 +77,8 @@ class OrderingTest : public testing::Test {
void TearDown() override { filter_->onDestroy(); }

// Called by the "start" method on the stream by the filter
virtual ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks) {
virtual ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks,
const StreamInfo::StreamInfo&) {
stream_callbacks_ = &callbacks;
auto stream = std::make_unique<MockStream>();
EXPECT_CALL(*stream, send(_, _)).WillRepeatedly(Invoke(this, &OrderingTest::doSend));
Expand Down Expand Up @@ -205,6 +208,7 @@ class OrderingTest : public testing::Test {
Router::RouteConstSharedPtr route_;
Http::MockStreamDecoderFilterCallbacks decoder_callbacks_;
Http::MockStreamEncoderFilterCallbacks encoder_callbacks_;
testing::NiceMock<StreamInfo::MockStreamInfo> stream_info_;
Http::TestRequestHeaderMapImpl request_headers_;
Http::TestResponseHeaderMapImpl response_headers_;
Http::TestRequestTrailerMapImpl request_trailers_;
Expand All @@ -215,7 +219,8 @@ class OrderingTest : public testing::Test {

class FastFailOrderingTest : public OrderingTest {
// All tests using this class have gRPC streams that will fail while being opened.
ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks) override {
ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks,
const StreamInfo::StreamInfo&) override {
auto stream = std::make_unique<MockStream>();
EXPECT_CALL(*stream, close());
callbacks.onGrpcError(Grpc::Status::Internal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ class StreamingIntegrationTest : public HttpIntegrationTest,
const auto addr = Network::Test::getCanonicalLoopbackAddress(ipVersion());
const auto addr_port = Network::Utility::getAddressWithPort(*addr, test_processor_.port());
setGrpcService(*proto_config_.mutable_grpc_service(), "ext_proc_server", addr_port);
// Insert some extra metadata. This ensures that we are actually passing the
// "stream info" from the original HTTP request all the way down to the
// ext_proc stream.
auto* metadata = proto_config_.mutable_grpc_service()->mutable_initial_metadata()->Add();
Comment on lines +72 to +75
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just so I understand, how is this related to stream info? I would have expected this to be used when the grpc stream is created by reading the config, not sure how this would be plumbed though via StreamInfo?

metadata->set_key("x-request-id");
metadata->set_value("%REQ(x-request-id)%");

// Merge the filter.
envoy::config::listener::v3::Filter ext_proc_filter;
Expand Down Expand Up @@ -134,14 +140,15 @@ INSTANTIATE_TEST_SUITE_P(StreamingProtocols, StreamingIntegrationTest,
GRPC_CLIENT_INTEGRATION_PARAMS);

// Send a body that's larger than the buffer limit, and have the processor return immediately
// after the headers come in.
// after the headers come in. Also check the metadata in this test.
TEST_P(StreamingIntegrationTest, PostAndProcessHeadersOnly) {
uint32_t num_chunks = 150;
uint32_t chunk_size = 1000;

// This starts the gRPC server in the background. It'll be shut down when we stop the tests.
test_processor_.start(
ipVersion(), [](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ipVersion(),
[](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
// This is the same gRPC stream processing code that a "user" of ext_proc
// would write. In this case, we expect to receive a request_headers
// message, and then close the stream.
Expand All @@ -154,12 +161,20 @@ TEST_P(StreamingIntegrationTest, PostAndProcessHeadersOnly) {
stream->Write(header_resp);
// Returning here closes the stream, unless we had an ASSERT failure
// previously.
},
[](grpc::ServerContext* ctx) {
// Verify that the metadata set in the grpc client configuration
// above is actually sent to our RPC.
auto request_id = ctx->client_metadata().find("x-request-id");
ASSERT_NE(request_id, ctx->client_metadata().end());
EXPECT_EQ(request_id->second, "sent some metadata");
});

initializeConfig();
HttpIntegrationTest::initialize();
auto& encoder = sendClientRequestHeaders([num_chunks, chunk_size](Http::HeaderMap& headers) {
headers.addCopy(LowerCaseString("expect_request_size_bytes"), num_chunks * chunk_size);
headers.addCopy(LowerCaseString("x-request-id"), "sent some metadata");
});

for (uint32_t i = 0; i < num_chunks; i++) {
Expand Down
10 changes: 7 additions & 3 deletions test/extensions/filters/http/ext_proc/test_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ namespace HttpFilters {
namespace ExternalProcessing {

grpc::Status ProcessorWrapper::Process(
grpc::ServerContext*,
grpc::ServerContext* ctx,
grpc::ServerReaderWriter<envoy::service::ext_proc::v3alpha::ProcessingResponse,
envoy::service::ext_proc::v3alpha::ProcessingRequest>* stream) {
if (context_callback_) {
(*context_callback_)(ctx);
}
callback_(stream);
if (testing::Test::HasFatalFailure()) {
// This is not strictly necessary, but it may help in troubleshooting to
Expand All @@ -26,8 +29,9 @@ grpc::Status ProcessorWrapper::Process(
return grpc::Status::OK;
}

void TestProcessor::start(const Network::Address::IpVersion ip_version, ProcessingFunc cb) {
wrapper_ = std::make_unique<ProcessorWrapper>(cb);
void TestProcessor::start(const Network::Address::IpVersion ip_version, ProcessingFunc cb,
absl::optional<ContextProcessingFunc> context_cb) {
wrapper_ = std::make_unique<ProcessorWrapper>(cb, context_cb);
grpc::ServerBuilder builder;
builder.RegisterService(wrapper_.get());
builder.AddListeningPort(
Expand Down
11 changes: 9 additions & 2 deletions test/extensions/filters/http/ext_proc/test_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ using ProcessingFunc = std::function<void(
grpc::ServerReaderWriter<envoy::service::ext_proc::v3alpha::ProcessingResponse,
envoy::service::ext_proc::v3alpha::ProcessingRequest>*)>;

// An implementation of this function may be called so that a test may verify
// the gRPC context.
using ContextProcessingFunc = std::function<void(grpc::ServerContext*)>;

// An implementation of the ExternalProcessor service that may be included
// in integration tests.
class ProcessorWrapper : public envoy::service::ext_proc::v3alpha::ExternalProcessor::Service {
public:
ProcessorWrapper(ProcessingFunc& cb) : callback_(cb) {}
ProcessorWrapper(ProcessingFunc& cb, absl::optional<ContextProcessingFunc> context_cb)
: callback_(cb), context_callback_(context_cb) {}

grpc::Status
Process(grpc::ServerContext*,
Expand All @@ -35,6 +40,7 @@ class ProcessorWrapper : public envoy::service::ext_proc::v3alpha::ExternalProce

private:
ProcessingFunc callback_;
absl::optional<ContextProcessingFunc> context_callback_;
};

// This class starts a gRPC server supporting the ExternalProcessor service.
Expand All @@ -45,7 +51,8 @@ class TestProcessor {
// Start the processor listening on an ephemeral port (port 0) on the local host.
// All new streams will be delegated to the specified function. The function
// will be invoked in a background thread controlled by the gRPC server.
void start(const Network::Address::IpVersion ip_version, ProcessingFunc cb);
void start(const Network::Address::IpVersion ip_version, ProcessingFunc cb,
absl::optional<ContextProcessingFunc> context_cb = absl::nullopt);

// Stop the processor from listening once all streams are closed, and exit
// the listening threads.
Expand Down