Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,27 @@ envoy_cc_extension(
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3alpha:pkg_cc_proto",
],
)

envoy_cc_library(
name = "client_interface",
hdrs = ["client.h"],
deps = [
"//include/envoy/grpc:status",
"@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto",
],
)

envoy_cc_library(
name = "client_lib",
srcs = ["client_impl.cc"],
hdrs = ["client_impl.h"],
deps = [
":client_interface",
"//include/envoy/grpc:async_client_interface",
"//include/envoy/stats:stats_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/grpc:typed_async_client_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto",
],
)
46 changes: 46 additions & 0 deletions source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#pragma once

#include <chrono>
#include <memory>

#include "envoy/common/pure.h"
#include "envoy/grpc/status.h"
#include "envoy/service/ext_proc/v3alpha/external_processor.pb.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

class ExternalProcessorStream {
public:
virtual ~ExternalProcessorStream() = default;
virtual void send(envoy::service::ext_proc::v3alpha::ProcessingRequest&& request,
bool end_stream) PURE;
virtual void close() PURE;
};

using ExternalProcessorStreamPtr = std::unique_ptr<ExternalProcessorStream>;

class ExternalProcessorCallbacks {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this wrapper layer give you much over the existing stream callbacks? Maybe you are planning on doing more in here later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. In general I wanted to have a clean abstraction to ease unit testing. One difference is that I don't think that we'll be needing to handle the metadata callbacks and this abstraction takes care of them for me.

public:
virtual ~ExternalProcessorCallbacks() = default;
virtual void onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& response) PURE;
virtual void onGrpcError(Grpc::Status::GrpcStatus error) PURE;
virtual void onGrpcClose() PURE;
};

class ExternalProcessorClient {
public:
virtual ~ExternalProcessorClient() = default;
virtual ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks,
const std::chrono::milliseconds& timeout) PURE;
};

using ExternalProcessorClientPtr = std::unique_ptr<ExternalProcessorClient>;

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
63 changes: 63 additions & 0 deletions source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#include "extensions/filters/http/ext_proc/client_impl.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

static constexpr char kExternalMethod[] =
"envoy.service.ext_proc.v3alpha.ExternalProcessor.Process";

ExternalProcessorClientImpl::ExternalProcessorClientImpl(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, we're going to want to have a thread-local cache of aync clients for ext_proc similar to what is done for the gRPC access loggers and ext_authz. This is what I referenced in my comment on your doc a while back. But, I'm hoping that we can land #12598 soon to avoid you needing to have to do this explicitly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that in the ext_authz filter and I was planning to first get the basic thing working and then add that. However I'm watching that PR and if it comes in soon, then even better.

Grpc::AsyncClientManager& client_manager,
const envoy::config::core::v3::GrpcService& grpc_service, Stats::Scope& scope) {
factory_ = client_manager.factoryForGrpcService(grpc_service, scope, true);
}

ExternalProcessorStreamPtr
ExternalProcessorClientImpl::start(ExternalProcessorCallbacks& callbacks,
const std::chrono::milliseconds& timeout) {
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse> grpcClient(factory_->create());
return std::make_unique<ExternalProcessorStreamImpl>(std::move(grpcClient), callbacks, timeout);
}

ExternalProcessorStreamImpl::ExternalProcessorStreamImpl(
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks, const std::chrono::milliseconds& timeout)
: callbacks_(callbacks) {
client_ = std::move(client);
auto descriptor = Protobuf::DescriptorPool::generated_pool()->FindMethodByName(kExternalMethod);
Http::AsyncClient::StreamOptions options;
options.setTimeout(timeout);

stream_ = client_.start(*descriptor, *this, options);
}

void ExternalProcessorStreamImpl::send(
envoy::service::ext_proc::v3alpha::ProcessingRequest&& request, bool end_stream) {
stream_.sendMessage(std::move(request), end_stream);
}

void ExternalProcessorStreamImpl::close() { stream_->closeStream(); }

void ExternalProcessorStreamImpl::onReceiveMessage(ProcessingResponsePtr&& response) {
callbacks_.onReceiveMessage(std::move(response));
}

void ExternalProcessorStreamImpl::onCreateInitialMetadata(Http::RequestHeaderMap&) {}
void ExternalProcessorStreamImpl::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) {}
void ExternalProcessorStreamImpl::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) {}

void ExternalProcessorStreamImpl::onRemoteClose(Grpc::Status::GrpcStatus status,
const std::string&) {
if (status == Grpc::Status::Ok) {
callbacks_.onGrpcClose();
} else {
callbacks_.onGrpcError(status);
}
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
66 changes: 66 additions & 0 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#pragma once

#include <chrono>
#include <memory>
#include <string>

#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/service/ext_proc/v3alpha/external_processor.pb.h"
#include "envoy/stats/scope.h"

#include "common/grpc/typed_async_client.h"

#include "extensions/filters/http/ext_proc/client.h"

using envoy::service::ext_proc::v3alpha::ProcessingRequest;
using envoy::service::ext_proc::v3alpha::ProcessingResponse;

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

using ProcessingResponsePtr = std::unique_ptr<ProcessingResponse>;

class ExternalProcessorClientImpl : public ExternalProcessorClient {
public:
ExternalProcessorClientImpl(Grpc::AsyncClientManager& client_manager,
const envoy::config::core::v3::GrpcService& grpc_service,
Stats::Scope& scope);

ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks,
const std::chrono::milliseconds& timeout) override;

private:
Grpc::AsyncClientFactoryPtr factory_;
};

class ExternalProcessorStreamImpl : public ExternalProcessorStream,
public Grpc::AsyncStreamCallbacks<ProcessingResponse> {
public:
ExternalProcessorStreamImpl(Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks,
const std::chrono::milliseconds& timeout);
void send(ProcessingRequest&& request, bool end_stream) override;
void close() override;

// AsyncStreamCallbacks
void onReceiveMessage(ProcessingResponsePtr&& message) override;

// RawAsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override;
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override;
void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;

private:
ExternalProcessorCallbacks& callbacks_;
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse> client_;
Grpc::AsyncStream<ProcessingRequest> stream_;
};

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
14 changes: 14 additions & 0 deletions test/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,17 @@ envoy_extension_cc_test(
"//test/test_common:test_runtime_lib",
],
)

envoy_extension_cc_test(
name = "client_test",
srcs = ["client_test.cc"],
extension_name = "envoy.filters.http.ext_proc",
deps = [
"//source/common/http:header_map_lib",
"//source/extensions/filters/http/ext_proc:client_lib",
"//test/mocks/grpc:grpc_mocks",
"//test/mocks/stats:stats_mocks",
"//test/test_common:test_runtime_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
172 changes: 172 additions & 0 deletions test/extensions/filters/http/ext_proc/client_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
#include "envoy/config/core/v3/grpc_service.pb.h"

#include "common/grpc/common.h"
#include "common/http/header_map_impl.h"

#include "extensions/filters/http/ext_proc/client_impl.h"

#include "test/mocks/grpc/mocks.h"
#include "test/mocks/stats/mocks.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"

using envoy::service::ext_proc::v3alpha::ProcessingRequest;
using envoy::service::ext_proc::v3alpha::ProcessingResponse;

using testing::Invoke;
using testing::Unused;

using namespace std::chrono_literals;

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {
namespace {

class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallbacks {
public:
~ExtProcStreamTest() override = default;

protected:
void SetUp() override {
envoy::config::core::v3::GrpcService service;
auto grpc = service.mutable_envoy_grpc();
grpc->set_cluster_name("test");

EXPECT_CALL(client_manager_, factoryForGrpcService(_, _, _))
.WillOnce(Invoke(this, &ExtProcStreamTest::doFactory));

client_ = std::make_unique<ExternalProcessorClientImpl>(client_manager_, service, stats_store_);
}

Grpc::AsyncClientFactoryPtr doFactory(Unused, Unused, Unused) {
auto factory = std::make_unique<Grpc::MockAsyncClientFactory>();
EXPECT_CALL(*factory, create()).WillOnce(Invoke(this, &ExtProcStreamTest::doCreate));
return factory;
}

Grpc::RawAsyncClientPtr doCreate() {
auto async_client = std::make_unique<Grpc::MockAsyncClient>();
EXPECT_CALL(*async_client,
startRaw("envoy.service.ext_proc.v3alpha.ExternalProcessor", "Process", _, _))
.WillOnce(Invoke(this, &ExtProcStreamTest::doStartRaw));
return async_client;
}

Grpc::RawAsyncStream* doStartRaw(Unused, Unused, Grpc::RawAsyncStreamCallbacks& callbacks,
const Http::AsyncClient::StreamOptions& options) {
EXPECT_GT(*options.timeout, 0ms);
stream_callbacks_ = &callbacks;
return &stream_;
}

// ExternalProcessorCallbacks
void onReceiveMessage(std::unique_ptr<ProcessingResponse>&& response) override {
last_response_ = std::move(response);
}

void onGrpcError(Grpc::Status::GrpcStatus status) override { grpc_status_ = status; }

void onGrpcClose() override { grpc_closed_ = true; }

std::unique_ptr<ProcessingResponse> last_response_;
Grpc::Status::GrpcStatus grpc_status_ = Grpc::Status::WellKnownGrpcStatus::Ok;
bool grpc_closed_ = false;

ExternalProcessorClientPtr client_;
Grpc::MockAsyncClientManager client_manager_;
Grpc::MockAsyncStream stream_;
Grpc::RawAsyncStreamCallbacks* stream_callbacks_;

testing::NiceMock<Stats::MockStore> stats_store_;
};

TEST_F(ExtProcStreamTest, OpenCloseStream) {
auto stream = client_->start(*this, 200ms);
EXPECT_CALL(stream_, closeStream()).Times(1);
stream->close();
}

TEST_F(ExtProcStreamTest, SendToStream) {
auto stream = client_->start(*this, 200ms);
// Send something and ensure that we get it. Doesn't really matter what.
EXPECT_CALL(stream_, sendMessageRaw_(_, false));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lots more of these in this PR, but I actually think I should just add a format check for this and stop harassing people :)

ProcessingRequest req;
stream->send(std::move(req), false);
EXPECT_CALL(stream_, closeStream()).Times(1);
stream->close();
}

TEST_F(ExtProcStreamTest, SendAndClose) {
auto stream = client_->start(*this, 200ms);
EXPECT_CALL(stream_, sendMessageRaw_(_, true)).Times(1);
ProcessingRequest req;
stream->send(std::move(req), true);
}

TEST_F(ExtProcStreamTest, ReceiveFromStream) {
auto stream = client_->start(*this, 200ms);
ASSERT_NE(stream_callbacks_, nullptr);
// Send something and ensure that we get it. Doesn't really matter what.
ProcessingResponse resp;

// These do nothing at the moment
auto empty_request_headers = Http::RequestHeaderMapImpl::create();
stream_callbacks_->onCreateInitialMetadata(*empty_request_headers);

auto empty_response_headers = Http::ResponseHeaderMapImpl::create();
stream_callbacks_->onReceiveInitialMetadata(std::move(empty_response_headers));

auto response_buf = Grpc::Common::serializeMessage(resp);
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
EXPECT_EQ(grpc_status_, 0);
EXPECT_TRUE(stream_callbacks_->onReceiveMessageRaw(std::move(response_buf)));
EXPECT_TRUE(last_response_);
EXPECT_FALSE(grpc_closed_);
EXPECT_EQ(grpc_status_, 0);

auto empty_response_trailers = Http::ResponseTrailerMapImpl::create();
stream_callbacks_->onReceiveTrailingMetadata(std::move(empty_response_trailers));

EXPECT_CALL(stream_, closeStream()).Times(1);
stream->close();
}

TEST_F(ExtProcStreamTest, StreamClosed) {
auto stream = client_->start(*this, 200ms);
ASSERT_NE(stream_callbacks_, nullptr);
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
EXPECT_EQ(grpc_status_, 0);
stream_callbacks_->onRemoteClose(0, "");
EXPECT_FALSE(last_response_);
EXPECT_TRUE(grpc_closed_);
EXPECT_EQ(grpc_status_, 0);

EXPECT_CALL(stream_, closeStream()).Times(1);
stream->close();
}

TEST_F(ExtProcStreamTest, StreamError) {
auto stream = client_->start(*this, 200ms);
ASSERT_NE(stream_callbacks_, nullptr);
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
EXPECT_EQ(grpc_status_, 0);
stream_callbacks_->onRemoteClose(123, "Some sort of gRPC error");
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
EXPECT_EQ(grpc_status_, 123);

EXPECT_CALL(stream_, closeStream()).Times(1);
stream->close();
}

} // namespace
} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy