Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions test/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ envoy_extension_cc_test_library(
hdrs = ["test_processor.h"],
extension_names = ["envoy.filters.http.ext_proc"],
deps = [
"//envoy/network:address_interface",
"//test/test_common:network_utility_lib",
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/strings:str_format",
"@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_grpc",
"@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto",
],
Expand Down
13 changes: 4 additions & 9 deletions test/extensions/filters/http/ext_proc/ext_proc_grpc_fuzz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,9 @@ class ExtProcIntegrationFuzz : public HttpIntegrationTest,
ConfigHelper::setHttp2(*processor_cluster);

// Make sure both flavors of gRPC client use the right address.
if (ipVersion() == Network::Address::IpVersion::v4) {
const auto addr = std::make_shared<Network::Address::Ipv4Instance>(
Network::Test::getLoopbackAddressString(ipVersion()), test_processor_.port());
setGrpcService(*proto_config_.mutable_grpc_service(), "ext_proc_server", addr);
} else {
const auto addr = std::make_shared<Network::Address::Ipv6Instance>(
Network::Test::getLoopbackAddressString(ipVersion()), test_processor_.port());
setGrpcService(*proto_config_.mutable_grpc_service(), "ext_proc_server", addr);
}
const auto addr = Network::Test::getCanonicalLoopbackAddress(ipVersion());
const auto addr_port = Network::Utility::getAddressWithPort(*addr, test_processor_.port());
setGrpcService(*proto_config_.mutable_grpc_service(), "ext_proc_server", addr_port);

// Merge the filter.
envoy::config::listener::v3::Filter ext_proc_filter;
Expand Down Expand Up @@ -252,6 +246,7 @@ DEFINE_FUZZER(const uint8_t* buf, size_t len) {
// external process to consume messages in a loop without blocking the fuzz
// target from receiving the response.
fuzzer.test_processor_.start(
fuzzer.ip_version_,
[&fuzz_helper](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
while (true) {
ProcessingRequest req;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class StreamingIntegrationTest : public HttpIntegrationTest,
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
address->set_address("127.0.0.1");
address->set_address(Network::Test::getLoopbackAddressString(ipVersion()));
address->set_port_value(test_processor_.port());

// Ensure "HTTP2 with no prior knowledge." Necessary for gRPC.
Expand All @@ -66,9 +66,9 @@ class StreamingIntegrationTest : public HttpIntegrationTest,
ConfigHelper::setHttp2(*processor_cluster);

// Make sure both flavors of gRPC client use the right address.
const auto addr =
std::make_shared<Network::Address::Ipv4Instance>("127.0.0.1", test_processor_.port());
setGrpcService(*proto_config_.mutable_grpc_service(), "ext_proc_server", addr);
const auto addr = Network::Test::getCanonicalLoopbackAddress(ipVersion());
const auto addr_port = Network::Utility::getAddressWithPort(*addr, test_processor_.port());
setGrpcService(*proto_config_.mutable_grpc_service(), "ext_proc_server", addr_port);

// Merge the filter.
envoy::config::listener::v3::Filter ext_proc_filter;
Expand Down Expand Up @@ -141,7 +141,7 @@ TEST_P(StreamingIntegrationTest, PostAndProcessHeadersOnly) {

// This starts the gRPC server in the background. It'll be shut down when we stop the tests.
test_processor_.start(
[](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ipVersion(), [](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
// This is the same gRPC stream processing code that a "user" of ext_proc
// would write. In this case, we expect to receive a request_headers
// message, and then close the stream.
Expand Down Expand Up @@ -183,6 +183,7 @@ TEST_P(StreamingIntegrationTest, PostAndProcessBufferedRequestBody) {
uint32_t total_size = num_chunks * chunk_size;

test_processor_.start(
ipVersion(),
[total_size](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ProcessingRequest header_req;
ASSERT_TRUE(stream->Read(&header_req));
Expand Down Expand Up @@ -223,6 +224,7 @@ TEST_P(StreamingIntegrationTest, PostAndProcessStreamedRequestBody) {
uint32_t total_size = num_chunks * chunk_size;

test_processor_.start(
ipVersion(),
[total_size](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
// Expect a request_headers message as the first message on the stream,
// and send back an empty response.
Expand Down Expand Up @@ -274,7 +276,7 @@ TEST_P(StreamingIntegrationTest, PostAndProcessStreamedRequestBodyPartially) {
uint32_t total_size = num_chunks * chunk_size;

test_processor_.start(
[](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ipVersion(), [](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ProcessingRequest header_req;
ASSERT_TRUE(stream->Read(&header_req));
ASSERT_TRUE(header_req.has_request_headers());
Expand Down Expand Up @@ -332,6 +334,7 @@ TEST_P(StreamingIntegrationTest, PostAndProcessStreamedRequestBodyAndClose) {
uint32_t total_size = num_chunks * chunk_size;

test_processor_.start(
ipVersion(),
[total_size](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ProcessingRequest header_req;
ASSERT_TRUE(stream->Read(&header_req));
Expand Down Expand Up @@ -372,6 +375,7 @@ TEST_P(StreamingIntegrationTest, GetAndProcessBufferedResponseBody) {
uint32_t response_size = 90000;

test_processor_.start(
ipVersion(),
[response_size](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ProcessingRequest header_req;
ASSERT_TRUE(stream->Read(&header_req));
Expand Down Expand Up @@ -409,8 +413,8 @@ TEST_P(StreamingIntegrationTest, GetAndProcessStreamedResponseBody) {
uint32_t response_size = 170000;

test_processor_.start(
[this,
response_size](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ipVersion(), [this, response_size](
grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ProcessingRequest header_req;
ASSERT_TRUE(stream->Read(&header_req));
ASSERT_TRUE(header_req.has_request_headers());
Expand Down Expand Up @@ -467,8 +471,8 @@ TEST_P(StreamingIntegrationTest, PostAndProcessStreamBothBodies) {
uint32_t response_size = 1700000;

test_processor_.start(
[this, request_size,
response_size](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ipVersion(), [this, request_size, response_size](
grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ProcessingRequest header_req;
ASSERT_TRUE(stream->Read(&header_req));
ASSERT_TRUE(header_req.has_request_headers());
Expand Down Expand Up @@ -554,7 +558,7 @@ TEST_P(StreamingIntegrationTest, PostAndStreamAndTransformBothBodies) {
uint32_t response_size = 180000;

test_processor_.start(
[](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ipVersion(), [](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ProcessingRequest header_req;
ASSERT_TRUE(stream->Read(&header_req));
ASSERT_TRUE(header_req.has_request_headers());
Expand Down Expand Up @@ -631,7 +635,7 @@ TEST_P(StreamingIntegrationTest, PostAndProcessBufferedRequestBodyTooBig) {
uint32_t total_size = num_chunks * chunk_size;

test_processor_.start(
[](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ipVersion(), [](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ProcessingRequest header_req;
ASSERT_TRUE(stream->Read(&header_req));
ASSERT_TRUE(header_req.has_request_headers());
Expand Down Expand Up @@ -667,6 +671,7 @@ TEST_P(StreamingIntegrationTest, PostAndProcessBufferedPartialRequestBody) {
uint32_t total_size = num_chunks * chunk_size;

test_processor_.start(
ipVersion(),
[total_size](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ProcessingRequest header_req;
ASSERT_TRUE(stream->Read(&header_req));
Expand Down Expand Up @@ -709,6 +714,7 @@ TEST_P(StreamingIntegrationTest, PostAndProcessBufferedPartialBigRequestBody) {
uint32_t total_size = num_chunks * chunk_size;

test_processor_.start(
ipVersion(),
[total_size](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
ProcessingRequest header_req;
ASSERT_TRUE(stream->Read(&header_req));
Expand Down
9 changes: 7 additions & 2 deletions test/extensions/filters/http/ext_proc/test_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

#include "envoy/service/ext_proc/v3alpha/external_processor.pb.h"

#include "test/test_common/network_utility.h"

#include "absl/strings/str_format.h"
#include "grpc++/server_builder.h"

namespace Envoy {
Expand All @@ -23,11 +26,13 @@ grpc::Status ProcessorWrapper::Process(
return grpc::Status::OK;
}

void TestProcessor::start(ProcessingFunc cb) {
void TestProcessor::start(const Network::Address::IpVersion ip_version, ProcessingFunc cb) {
wrapper_ = std::make_unique<ProcessorWrapper>(cb);
grpc::ServerBuilder builder;
builder.RegisterService(wrapper_.get());
builder.AddListeningPort("127.0.0.1:0", grpc::InsecureServerCredentials(), &listening_port_);
builder.AddListeningPort(
absl::StrFormat("%s:0", Network::Test::getLoopbackAddressUrlString(ip_version)),
grpc::InsecureServerCredentials(), &listening_port_);
server_ = builder.BuildAndStart();
}

Expand Down
5 changes: 3 additions & 2 deletions test/extensions/filters/http/ext_proc/test_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <functional>
#include <memory>

#include "envoy/network/address.h"
#include "envoy/service/ext_proc/v3alpha/external_processor.grpc.pb.h"
#include "envoy/service/ext_proc/v3alpha/external_processor.pb.h"

Expand Down Expand Up @@ -41,10 +42,10 @@ class ProcessorWrapper : public envoy::service::ext_proc::v3alpha::ExternalProce
// use ASSERT_ and EXPECT_ macros to validate test results.
class TestProcessor {
public:
// Start the processor listening on an ephemeral port (port 0) on 127.0.0.1.
// Start the processor listening on an ephemeral port (port 0) on the local host.
// All new streams will be delegated to the specified function. The function
// will be invoked in a background thread controlled by the gRPC server.
void start(ProcessingFunc cb);
void start(const Network::Address::IpVersion ip_version, ProcessingFunc cb);

// Stop the processor from listening once all streams are closed, and exit
// the listening threads.
Expand Down