Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions test/common/grpc/grpc_client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,10 @@ TEST_P(GrpcClientIntegrationTest, ResetAfterCloseLocal) {
initialize();
auto stream = createStream(empty_metadata_);
stream->grpc_stream_->closeStream();
stream->fake_stream_->waitForEndStream(dispatcher_helper_.dispatcher_);
ASSERT_TRUE(stream->fake_stream_->waitForEndStream(dispatcher_helper_.dispatcher_));
stream->grpc_stream_->resetStream();
dispatcher_helper_.dispatcher_.run(Event::Dispatcher::RunType::NonBlock);
stream->fake_stream_->waitForReset();
ASSERT_TRUE(stream->fake_stream_->waitForReset());
}

// Validate that request cancel() works.
Expand All @@ -323,7 +323,7 @@ TEST_P(GrpcClientIntegrationTest, CancelRequest) {
EXPECT_CALL(*request->child_span_, finishSpan());
request->grpc_request_->cancel();
dispatcher_helper_.dispatcher_.run(Event::Dispatcher::RunType::NonBlock);
request->fake_stream_->waitForReset();
ASSERT_TRUE(request->fake_stream_->waitForReset());
}

// Parameterize the loopback test server socket address and gRPC client type.
Expand Down Expand Up @@ -352,7 +352,8 @@ TEST_P(GrpcSslClientIntegrationTest, BasicSslRequestWithClientCert) {
class GrpcAccessTokenClientIntegrationTest : public GrpcSslClientIntegrationTest {
public:
void expectExtraHeaders(FakeStream& fake_stream) override {
fake_stream.waitForHeadersComplete();
AssertionResult result = fake_stream.waitForHeadersComplete();
RELEASE_ASSERT(result, result.message());
Http::TestHeaderMapImpl stream_headers(fake_stream.headers());
if (access_token_value_ != "") {
if (access_token_value_2_ == "") {
Expand Down
33 changes: 23 additions & 10 deletions test/common/grpc/grpc_client_integration_test_harness.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ class HelloworldStream : public MockAsyncStreamCallbacks<helloworld::HelloReply>
grpc_stream_->sendMessage(request_msg, end_stream);

helloworld::HelloRequest received_msg;
fake_stream_->waitForGrpcMessage(dispatcher_helper_.dispatcher_, received_msg);
AssertionResult result =
fake_stream_->waitForGrpcMessage(dispatcher_helper_.dispatcher_, received_msg);
RELEASE_ASSERT(result, result.message());
EXPECT_THAT(request_msg, ProtoEq(received_msg));
}

Expand Down Expand Up @@ -162,7 +164,8 @@ class HelloworldStream : public MockAsyncStreamCallbacks<helloworld::HelloReply>

void closeStream() {
grpc_stream_->closeStream();
fake_stream_->waitForEndStream(dispatcher_helper_.dispatcher_);
AssertionResult result = fake_stream_->waitForEndStream(dispatcher_helper_.dispatcher_);
RELEASE_ASSERT(result, result.message());
}

DispatcherHelper& dispatcher_helper_;
Expand Down Expand Up @@ -224,8 +227,10 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest {

void TearDown() override {
if (fake_connection_) {
fake_connection_->close();
fake_connection_->waitForDisconnect();
AssertionResult result = fake_connection_->close();
RELEASE_ASSERT(result, result.message());
result = fake_connection_->waitForDisconnect();
RELEASE_ASSERT(result, result.message());
fake_connection_.reset();
}
}
Expand Down Expand Up @@ -291,7 +296,8 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest {
}

void expectInitialHeaders(FakeStream& fake_stream, const TestMetadata& initial_metadata) {
fake_stream.waitForHeadersComplete();
AssertionResult result = fake_stream.waitForHeadersComplete();
RELEASE_ASSERT(result, result.message());
Http::TestHeaderMapImpl stream_headers(fake_stream.headers());
EXPECT_EQ("POST", stream_headers.get_(":method"));
EXPECT_EQ("/helloworld.Greeter/SayHello", stream_headers.get_(":path"));
Expand Down Expand Up @@ -333,17 +339,21 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest {
EXPECT_NE(request->grpc_request_, nullptr);

if (!fake_connection_) {
fake_connection_ = fake_upstream_->waitForHttpConnection(dispatcher_);
AssertionResult result = fake_upstream_->waitForHttpConnection(dispatcher_, fake_connection_);
RELEASE_ASSERT(result, result.message());
}
fake_streams_.push_back(fake_connection_->waitForNewStream(dispatcher_));
fake_streams_.emplace_back();
AssertionResult result = fake_connection_->waitForNewStream(dispatcher_, fake_streams_.back());
RELEASE_ASSERT(result, result.message());
auto& fake_stream = *fake_streams_.back();
request->fake_stream_ = &fake_stream;

expectInitialHeaders(fake_stream, initial_metadata);
expectExtraHeaders(fake_stream);

helloworld::HelloRequest received_msg;
fake_stream.waitForGrpcMessage(dispatcher_, received_msg);
result = fake_stream.waitForGrpcMessage(dispatcher_, received_msg);
RELEASE_ASSERT(result, result.message());
EXPECT_THAT(request_msg, ProtoEq(received_msg));

return request;
Expand All @@ -362,9 +372,12 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest {
EXPECT_NE(stream->grpc_stream_, nullptr);

if (!fake_connection_) {
fake_connection_ = fake_upstream_->waitForHttpConnection(dispatcher_);
AssertionResult result = fake_upstream_->waitForHttpConnection(dispatcher_, fake_connection_);
RELEASE_ASSERT(result, result.message());
}
fake_streams_.push_back(fake_connection_->waitForNewStream(dispatcher_));
fake_streams_.emplace_back();
AssertionResult result = fake_connection_->waitForNewStream(dispatcher_, fake_streams_.back());
RELEASE_ASSERT(result, result.message());
auto& fake_stream = *fake_streams_.back();
stream->fake_stream_ = &fake_stream;

Expand Down
1 change: 1 addition & 0 deletions test/extensions/access_loggers/http_grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,6 @@ envoy_extension_cc_test(
"//source/extensions/access_loggers/http_grpc:config",
"//test/common/grpc:grpc_client_integration_lib",
"//test/integration:http_integration_lib",
"//test/test_common:utility_lib",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@

#include "test/common/grpc/grpc_client_integration.h"
#include "test/integration/http_integration.h"
#include "test/test_common/utility.h"

#include "gtest/gtest.h"

using testing::AssertionResult;

namespace Envoy {
namespace {

Expand Down Expand Up @@ -49,17 +52,20 @@ class AccessLogIntegrationTest : public HttpIntegrationTest,
HttpIntegrationTest::initialize();
}

void waitForAccessLogConnection() {
fake_access_log_connection_ = fake_upstreams_[1]->waitForHttpConnection(*dispatcher_);
ABSL_MUST_USE_RESULT
AssertionResult waitForAccessLogConnection() {
return fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, fake_access_log_connection_);
}

void waitForAccessLogStream() {
access_log_request_ = fake_access_log_connection_->waitForNewStream(*dispatcher_);
ABSL_MUST_USE_RESULT
AssertionResult waitForAccessLogStream() {
return fake_access_log_connection_->waitForNewStream(*dispatcher_, access_log_request_);
}

void waitForAccessLogRequest(const std::string& expected_request_msg_yaml) {
ABSL_MUST_USE_RESULT
AssertionResult waitForAccessLogRequest(const std::string& expected_request_msg_yaml) {
envoy::service::accesslog::v2::StreamAccessLogsMessage request_msg;
access_log_request_->waitForGrpcMessage(*dispatcher_, request_msg);
VERIFY_ASSERTION(access_log_request_->waitForGrpcMessage(*dispatcher_, request_msg));
EXPECT_STREQ("POST", access_log_request_->headers().Method()->value().c_str());
EXPECT_STREQ("/envoy.service.accesslog.v2.AccessLogService/StreamAccessLogs",
access_log_request_->headers().Path()->value().c_str());
Expand All @@ -78,12 +84,16 @@ class AccessLogIntegrationTest : public HttpIntegrationTest,
log_entry->mutable_common_properties()->clear_time_to_last_downstream_tx_byte();
log_entry->mutable_request()->clear_request_id();
EXPECT_EQ(request_msg.DebugString(), expected_request_msg.DebugString());

return AssertionSuccess();
}

void cleanup() {
if (fake_access_log_connection_ != nullptr) {
fake_access_log_connection_->close();
fake_access_log_connection_->waitForDisconnect();
AssertionResult result = fake_access_log_connection_->close();
RELEASE_ASSERT(result, result.message());
result = fake_access_log_connection_->waitForDisconnect();
RELEASE_ASSERT(result, result.message());
}
}

Expand All @@ -97,9 +107,9 @@ INSTANTIATE_TEST_CASE_P(IpVersionsCientType, AccessLogIntegrationTest,
// Test a basic full access logging flow.
TEST_P(AccessLogIntegrationTest, BasicAccessLogFlow) {
testRouterNotFound();
waitForAccessLogConnection();
waitForAccessLogStream();
waitForAccessLogRequest(fmt::format(R"EOF(
ASSERT_TRUE(waitForAccessLogConnection());
ASSERT_TRUE(waitForAccessLogStream());
ASSERT_TRUE(waitForAccessLogRequest(fmt::format(R"EOF(
identifier:
node:
id: node_name
Expand All @@ -124,13 +134,13 @@ TEST_P(AccessLogIntegrationTest, BasicAccessLogFlow) {
value: 404
response_headers_bytes: 54
)EOF",
VersionInfo::version()));
VersionInfo::version())));

BufferingStreamDecoderPtr response = IntegrationUtil::makeSingleRequest(
lookupPort("http"), "GET", "/notfound", "", downstream_protocol_, version_);
EXPECT_TRUE(response->complete());
EXPECT_STREQ("404", response->headers().Status()->value().c_str());
waitForAccessLogRequest(R"EOF(
ASSERT_TRUE(waitForAccessLogRequest(R"EOF(
http_logs:
log_entry:
common_properties:
Expand All @@ -146,7 +156,7 @@ TEST_P(AccessLogIntegrationTest, BasicAccessLogFlow) {
response_code:
value: 404
response_headers_bytes: 54
)EOF");
)EOF"));

// Send an empty response and end the stream. This should never happen but make sure nothing
// breaks and we make a new stream on a follow up request.
Expand All @@ -168,8 +178,8 @@ TEST_P(AccessLogIntegrationTest, BasicAccessLogFlow) {
downstream_protocol_, version_);
EXPECT_TRUE(response->complete());
EXPECT_STREQ("404", response->headers().Status()->value().c_str());
waitForAccessLogStream();
waitForAccessLogRequest(fmt::format(R"EOF(
ASSERT_TRUE(waitForAccessLogStream());
ASSERT_TRUE(waitForAccessLogRequest(fmt::format(R"EOF(
identifier:
node:
id: node_name
Expand All @@ -194,7 +204,7 @@ TEST_P(AccessLogIntegrationTest, BasicAccessLogFlow) {
value: 404
response_headers_bytes: 54
)EOF",
VersionInfo::version()));
VersionInfo::version())));

cleanup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ class GrpcJsonTranscoderIntegrationTest
response = codec_client_->makeHeaderOnlyRequest(request_headers);
}

fake_upstream_connection_ = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);
upstream_request_ = fake_upstream_connection_->waitForNewStream(*dispatcher_);
ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
if (!grpc_request_messages.empty()) {
upstream_request_->waitForEndStream(*dispatcher_);
ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));

Grpc::Decoder grpc_decoder;
std::vector<Grpc::Frame> frames;
Expand Down Expand Up @@ -114,7 +114,7 @@ class GrpcJsonTranscoderIntegrationTest
}
EXPECT_TRUE(upstream_request_->complete());
} else {
upstream_request_->waitForReset();
ASSERT_TRUE(upstream_request_->waitForReset());
}

response->waitForEndStream();
Expand All @@ -136,8 +136,8 @@ class GrpcJsonTranscoderIntegrationTest
}

codec_client_->close();
fake_upstream_connection_->close();
fake_upstream_connection_->waitForDisconnect();
ASSERT_TRUE(fake_upstream_connection_->close());
ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect());
}
};

Expand Down
22 changes: 15 additions & 7 deletions test/extensions/filters/http/jwt_authn/filter_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,13 @@ class RemoteJwksIntegrationTest : public HttpProtocolIntegrationTest {
}

void waitForJwksResponse(const std::string& status, const std::string& jwks_body) {
fake_jwks_connection_ = fake_upstreams_[1]->waitForHttpConnection(*dispatcher_);
jwks_request_ = fake_jwks_connection_->waitForNewStream(*dispatcher_);
jwks_request_->waitForEndStream(*dispatcher_);
AssertionResult result =
fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, fake_jwks_connection_);
RELEASE_ASSERT(result, result.message());
result = fake_jwks_connection_->waitForNewStream(*dispatcher_, jwks_request_);
RELEASE_ASSERT(result, result.message());
result = jwks_request_->waitForEndStream(*dispatcher_);
RELEASE_ASSERT(result, result.message());

Http::TestHeaderMapImpl response_headers{{":status", status}};
jwks_request_->encodeHeaders(response_headers, false);
Expand All @@ -122,12 +126,16 @@ class RemoteJwksIntegrationTest : public HttpProtocolIntegrationTest {
void cleanup() {
codec_client_->close();
if (fake_jwks_connection_ != nullptr) {
fake_jwks_connection_->close();
fake_jwks_connection_->waitForDisconnect();
AssertionResult result = fake_jwks_connection_->close();
RELEASE_ASSERT(result, result.message());
result = fake_jwks_connection_->waitForDisconnect();
RELEASE_ASSERT(result, result.message());
}
if (fake_upstream_connection_ != nullptr) {
fake_upstream_connection_->close();
fake_upstream_connection_->waitForDisconnect();
AssertionResult result = fake_upstream_connection_->close();
RELEASE_ASSERT(result, result.message());
result = fake_upstream_connection_->waitForDisconnect();
RELEASE_ASSERT(result, result.message());
}
}

Expand Down
24 changes: 14 additions & 10 deletions test/extensions/filters/http/lua/lua_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,16 @@ class LuaIntegrationTest : public HttpIntegrationTest,
void cleanup() {
codec_client_->close();
if (fake_lua_connection_ != nullptr) {
fake_lua_connection_->close();
fake_lua_connection_->waitForDisconnect();
AssertionResult result = fake_lua_connection_->close();
RELEASE_ASSERT(result, result.message());
result = fake_lua_connection_->waitForDisconnect();
RELEASE_ASSERT(result, result.message());
}
if (fake_upstream_connection_ != nullptr) {
fake_upstream_connection_->close();
fake_upstream_connection_->waitForDisconnect();
AssertionResult result = fake_upstream_connection_->close();
RELEASE_ASSERT(result, result.message());
result = fake_upstream_connection_->waitForDisconnect();
RELEASE_ASSERT(result, result.message());
}
}

Expand Down Expand Up @@ -234,9 +238,9 @@ name: envoy.lua
{"x-forwarded-for", "10.0.0.1"}};
auto response = codec_client_->makeHeaderOnlyRequest(request_headers);

fake_lua_connection_ = fake_upstreams_[1]->waitForHttpConnection(*dispatcher_);
lua_request_ = fake_lua_connection_->waitForNewStream(*dispatcher_);
lua_request_->waitForEndStream(*dispatcher_);
ASSERT_TRUE(fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, fake_lua_connection_));
ASSERT_TRUE(fake_lua_connection_->waitForNewStream(*dispatcher_, lua_request_));
ASSERT_TRUE(lua_request_->waitForEndStream(*dispatcher_));
Http::TestHeaderMapImpl response_headers{{":status", "200"}, {"foo", "bar"}};
lua_request_->encodeHeaders(response_headers, false);
Buffer::OwnedImpl response_data1("good");
Expand Down Expand Up @@ -292,9 +296,9 @@ name: envoy.lua
{"x-forwarded-for", "10.0.0.1"}};
auto response = codec_client_->makeHeaderOnlyRequest(request_headers);

fake_lua_connection_ = fake_upstreams_[1]->waitForHttpConnection(*dispatcher_);
lua_request_ = fake_lua_connection_->waitForNewStream(*dispatcher_);
lua_request_->waitForEndStream(*dispatcher_);
ASSERT_TRUE(fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, fake_lua_connection_));
ASSERT_TRUE(fake_lua_connection_->waitForNewStream(*dispatcher_, lua_request_));
ASSERT_TRUE(lua_request_->waitForEndStream(*dispatcher_));
Http::TestHeaderMapImpl response_headers{{":status", "200"}, {"foo", "bar"}};
lua_request_->encodeHeaders(response_headers, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,27 @@ class SquashFilterIntegrationTest : public HttpIntegrationTest,

~SquashFilterIntegrationTest() {
if (fake_squash_connection_) {
fake_squash_connection_->close();
fake_squash_connection_->waitForDisconnect();
AssertionResult result = fake_squash_connection_->close();
RELEASE_ASSERT(result, result.message());
result = fake_squash_connection_->waitForDisconnect();
RELEASE_ASSERT(result, result.message());
}
}

FakeStreamPtr sendSquash(const std::string& status, const std::string& body) {

if (!fake_squash_connection_) {
fake_squash_connection_ = fake_upstreams_[1]->waitForHttpConnection(*dispatcher_);
AssertionResult result =
fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, fake_squash_connection_);
RELEASE_ASSERT(result, result.message());
}

FakeStreamPtr request_stream = fake_squash_connection_->waitForNewStream(*dispatcher_);
request_stream->waitForEndStream(*dispatcher_);
FakeStreamPtr request_stream;
AssertionResult result =
fake_squash_connection_->waitForNewStream(*dispatcher_, request_stream);
RELEASE_ASSERT(result, result.message());
result = request_stream->waitForEndStream(*dispatcher_);
RELEASE_ASSERT(result, result.message());
if (body.empty()) {
request_stream->encodeHeaders(Http::TestHeaderMapImpl{{":status", status}}, true);
} else {
Expand Down
Loading