Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
156ad10
add multi thread test
Aug 16, 2021
cf697ed
use absl notification
Aug 16, 2021
1599227
clean up
Aug 17, 2021
4fc988e
Merge branch 'main' of https://github.com/envoyproxy/envoy into threa…
Aug 17, 2021
82aba23
make mock client change state on send to cache race condition
Aug 17, 2021
3658754
fix deadlock
Aug 29, 2021
58d3b7e
fix deadlock
Aug 29, 2021
afba040
fix deadlock
Aug 29, 2021
5931931
fix deadlock
Aug 29, 2021
85069f8
fix deadlock
Aug 29, 2021
210e656
fix dead lock
Aug 29, 2021
6064910
use real main thread instead of test thread
Sep 3, 2021
02a27c2
Merge branch 'main' of https://github.com/envoyproxy/envoy into threa…
Sep 3, 2021
2d99028
fix build failure
Sep 3, 2021
f4f4057
reset manager before clean up threading
Sep 3, 2021
dafbf24
remove cerr
Sep 4, 2021
098a446
fix clang tidy
Sep 4, 2021
72b4e8f
use start counter when posting to worker
Sep 4, 2021
1c63555
clean up
Sep 7, 2021
98bad3c
clean up
Sep 7, 2021
3323148
fix clang tidy
Sep 7, 2021
f03b4e3
add comments
Sep 9, 2021
4e44018
use absl barrier
Sep 16, 2021
4fa2321
refactor thread local store test
Sep 17, 2021
3a4d438
add threading test utility
Sep 18, 2021
e4193b2
fix format
Sep 18, 2021
405efc5
rename and cleanup
Sep 18, 2021
7297eff
move implementation to cc file
Sep 18, 2021
f7f5142
add TODO
Sep 20, 2021
9a2b405
add TODO
Sep 20, 2021
a3931c5
add TODO
Sep 20, 2021
48b6a62
add function comment
Sep 21, 2021
6b5356b
fix spelling
Sep 21, 2021
6267882
add comment
Sep 24, 2021
1ee0ebd
fix nits
Sep 24, 2021
674e48e
Merge branch 'main' of https://github.com/envoyproxy/envoy into threa…
Sep 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions test/extensions/filters/http/ext_authz/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ envoy_extension_cc_test(
srcs = ["config_test.cc"],
extension_names = ["envoy.filters.http.ext_authz"],
deps = [
"//source/common/grpc:async_client_manager_lib",
"//source/common/network:address_lib",
"//source/common/thread_local:thread_local_lib",
"//source/extensions/filters/http/ext_authz:config",
"//test/mocks/server:factory_context_mocks",
"//test/test_common:test_runtime_lib",
Expand Down
155 changes: 155 additions & 0 deletions test/extensions/filters/http/ext_authz/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,177 @@
#include "envoy/extensions/filters/http/ext_authz/v3/ext_authz.pb.validate.h"
#include "envoy/stats/scope.h"

#include "source/common/grpc/async_client_manager_impl.h"
#include "source/common/network/address_impl.h"
#include "source/common/thread_local/thread_local_impl.h"
#include "source/extensions/filters/http/ext_authz/config.h"

#include "test/mocks/server/factory_context.h"
#include "test/test_common/test_runtime.h"
#include "test/test_common/utility.h"

#include "absl/synchronization/notification.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"

using testing::_;
using testing::Invoke;
using testing::NiceMock;

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExtAuthz {

class MultiThreadTest {
public:
MultiThreadTest(size_t num_threads) : num_threads_(num_threads), api_(Api::createApiForTest()) {}
virtual ~MultiThreadTest() { tls_.shutdownGlobalThreading(); }
virtual void workerFunc() {}
void run() { spawnAllWorkers(); }

private:
void spawnAllWorkers() {
// Create dispatcher and register.
for (size_t i = 0; i < num_threads_; i++) {
dispatchers_.emplace_back(api_->allocateDispatcher(std::to_string(i)));
tls_.registerThread(*dispatchers_[i], false);
}
// Create running threads.
for (size_t i = 0; i < num_threads_; i++) {
// i must be explicitly captured by value.
workers_.emplace_back(api_->threadFactory().createThread(
[&, i]() { dispatchers_[i]->run(Event::Dispatcher::RunType::RunUntilExit); }));
}
// Post callback to fire all threads at the same time.
for (Event::DispatcherPtr& dispatcher : dispatchers_) {
Comment thread
chaoqin-li1123 marked this conversation as resolved.
Outdated
dispatcher->post([&]() {
waitForAllWorkersToFire();
workerFunc();
});
}
// Post exit signals and wait for thread to end.
for (Event::DispatcherPtr& dispatcher : dispatchers_) {
dispatcher->post([&dispatcher]() { dispatcher->exit(); });
}
for (Thread::ThreadPtr& worker : workers_) {
worker->join();
}
}

void waitForAllWorkersToFire() {
if (shouldFire()) {
// The last thread spawned should notify all to start.
workers_should_fire_.Notify();
} else {
// Wait for notification if the current thread is not the last one spawned.
workers_should_fire_.WaitForNotification();
}
}

bool shouldFire() {
Thread::LockGuard lock(mutex_);
active_threads_cnt_++;
Comment thread
chaoqin-li1123 marked this conversation as resolved.
Outdated
return active_threads_cnt_ == num_threads_;
}

size_t num_threads_;
absl::Notification workers_should_fire_;
Thread::MutexBasicLockable mutex_;
size_t active_threads_cnt_{};
Comment thread
chaoqin-li1123 marked this conversation as resolved.
Outdated
ThreadLocal::InstanceImpl tls_;
Api::ApiPtr api_;

std::vector<Event::DispatcherPtr> dispatchers_;
std::vector<Thread::ThreadPtr> workers_;
};

class ExtAuthzFilterTest : public MultiThreadTest, public testing::Test {
public:
ExtAuthzFilterTest() : MultiThreadTest(10), stat_names_(symbol_table_) {}

void initialize(const std::string& ext_authz_config_yaml) {
initFilterFactory(ext_authz_config_yaml);
initAddress();
}

void initFilterFactory(const std::string& ext_authz_config_yaml) {
EXPECT_CALL(context_, getServerFactoryContext())
.WillRepeatedly(testing::ReturnRef(server_context_));
ON_CALL(context_.cluster_manager_.async_client_manager_, getOrCreateRawAsyncClient(_, _, _, _))
Comment thread
chaoqin-li1123 marked this conversation as resolved.
Outdated
.WillByDefault(Invoke([&](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool,
Grpc::CacheOption) {
return std::make_unique<NiceMock<Grpc::MockAsyncClient>>();
}));
ExtAuthzFilterConfig factory;
ProtobufTypes::MessagePtr proto_config = factory.createEmptyConfigProto();
TestUtility::loadFromYaml(ext_authz_config_yaml, *proto_config);
filter_factory_ = factory.createFilterFactoryFromProto(*proto_config, "stats", context_);
}

void initAddress() {
addr_ = std::make_shared<Network::Address::Ipv4Instance>("1.2.3.4", 1111);
connection_.stream_info_.downstream_address_provider_->setRemoteAddress(addr_);
connection_.stream_info_.downstream_address_provider_->setLocalAddress(addr_);
}

void testFilter() {
Http::MockFilterChainFactoryCallbacks filter_callbacks;
Comment thread
chaoqin-li1123 marked this conversation as resolved.
Outdated

Http::StreamFilterSharedPtr filter;
EXPECT_CALL(filter_callbacks, addStreamFilter(_)).WillOnce(::testing::SaveArg<0>(&filter));
filter_factory_(filter_callbacks);
NiceMock<Http::MockStreamDecoderFilterCallbacks> decoder_callbacks;
ON_CALL(decoder_callbacks, connection()).WillByDefault(Return(&connection_));
filter->setDecoderFilterCallbacks(decoder_callbacks);
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter->decodeHeaders(request_headers_, false));
std::shared_ptr<Http::StreamDecoderFilter> decoder_filter = filter;
decoder_filter->onDestroy();
}

void workerFunc() override { testFilter(); }

private:
NiceMock<Upstream::MockClusterManager> cm_;
Stats::SymbolTableImpl symbol_table_;
Grpc::StatNames stat_names_;

std::unique_ptr<Grpc::AsyncClientManagerImpl> async_client_manager_;
NiceMock<Server::Configuration::MockFactoryContext> context_;
NiceMock<Server::Configuration::MockServerFactoryContext> server_context_;
Http::FilterFactoryCb filter_factory_;

Network::Address::InstanceConstSharedPtr addr_;
NiceMock<Envoy::Network::MockConnection> connection_;
Http::TestRequestHeaderMapImpl request_headers_;
};

TEST_F(ExtAuthzFilterTest, ExtAuthzFilterThreadingTestEnvoyGrpc) {
std::string ext_authz_config_yaml = R"EOF(
Comment thread
chaoqin-li1123 marked this conversation as resolved.
Outdated
transport_api_version: V3
grpc_service:
envoy_grpc:
cluster_name: test_cluster
failure_mode_allow: false
)EOF";
initialize(ext_authz_config_yaml);
run();
}

TEST_F(ExtAuthzFilterTest, ExtAuthzFilterThreadingTestGoogleGrpc) {
std::string ext_authz_config_yaml = R"EOF(
transport_api_version: V3
grpc_service:
google_grpc:
target_uri: ext_authz_server
stat_prefix: google
failure_mode_allow: false
)EOF";
initialize(ext_authz_config_yaml);
run();
}

namespace {

void expectCorrectProtoGrpc(envoy::config::core::v3::ApiVersion api_version,
Expand Down
8 changes: 7 additions & 1 deletion test/mocks/grpc/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ namespace Grpc {

MockAsyncClient::MockAsyncClient() {
async_request_ = std::make_unique<testing::NiceMock<Grpc::MockAsyncRequest>>();
ON_CALL(*this, sendRaw(_, _, _, _, _, _)).WillByDefault(Return(async_request_.get()));
ON_CALL(*this, sendRaw(_, _, _, _, _, _))
.WillByDefault(Invoke([this](absl::string_view, absl::string_view, Buffer::InstancePtr&&,
RawAsyncRequestCallbacks&, Tracing::Span&,
const Http::AsyncClient::RequestOptions&) {
send_cnt_++;
return async_request_.get();
}));
}
MockAsyncClient::~MockAsyncClient() = default;

Expand Down
1 change: 1 addition & 0 deletions test/mocks/grpc/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class MockAsyncClient : public RawAsyncClient {
const Http::AsyncClient::StreamOptions& options));

std::unique_ptr<testing::NiceMock<Grpc::MockAsyncRequest>> async_request_;
int send_cnt_{};
Comment thread
chaoqin-li1123 marked this conversation as resolved.
Outdated
Comment thread
chaoqin-li1123 marked this conversation as resolved.
Outdated
};

class MockAsyncClientFactory : public AsyncClientFactory {
Expand Down