Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
156ad10
add multi thread test
Aug 16, 2021
cf697ed
use absl notification
Aug 16, 2021
1599227
clean up
Aug 17, 2021
4fc988e
Merge branch 'main' of https://github.com/envoyproxy/envoy into threa…
Aug 17, 2021
82aba23
make mock client change state on send to cache race condition
Aug 17, 2021
3658754
fix deadlock
Aug 29, 2021
58d3b7e
fix deadlock
Aug 29, 2021
afba040
fix deadlock
Aug 29, 2021
5931931
fix deadlock
Aug 29, 2021
85069f8
fix deadlock
Aug 29, 2021
210e656
fix dead lock
Aug 29, 2021
6064910
use real main thread instead of test thread
Sep 3, 2021
02a27c2
Merge branch 'main' of https://github.com/envoyproxy/envoy into threa…
Sep 3, 2021
2d99028
fix build failure
Sep 3, 2021
f4f4057
reset manager before clean up threading
Sep 3, 2021
dafbf24
remove cerr
Sep 4, 2021
098a446
fix clang tidy
Sep 4, 2021
72b4e8f
use start counter when posting to worker
Sep 4, 2021
1c63555
clean up
Sep 7, 2021
98bad3c
clean up
Sep 7, 2021
3323148
fix clang tidy
Sep 7, 2021
f03b4e3
add comments
Sep 9, 2021
4e44018
use absl barrier
Sep 16, 2021
4fa2321
refactor thread local store test
Sep 17, 2021
3a4d438
add threading test utility
Sep 18, 2021
e4193b2
fix format
Sep 18, 2021
405efc5
rename and cleanup
Sep 18, 2021
7297eff
move implementation to cc file
Sep 18, 2021
f7f5142
add TODO
Sep 20, 2021
9a2b405
add TODO
Sep 20, 2021
a3931c5
add TODO
Sep 20, 2021
48b6a62
add function comment
Sep 21, 2021
6b5356b
fix spelling
Sep 21, 2021
6267882
add comment
Sep 24, 2021
1ee0ebd
fix nits
Sep 24, 2021
674e48e
Merge branch 'main' of https://github.com/envoyproxy/envoy into threa…
Sep 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions test/common/stats/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ envoy_cc_test(
"//test/mocks/stats:stats_mocks",
"//test/mocks/thread_local:thread_local_mocks",
"//test/test_common:logging_lib",
"//test/test_common:real_threads_test_helper_lib",
"//test/test_common:test_time_lib",
"//test/test_common:utility_lib",
"@envoy_api//envoy/config/metrics/v3:pkg_cc_proto",
Expand Down
146 changes: 27 additions & 119 deletions test/common/stats/thread_local_store_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
#include "test/mocks/stats/mocks.h"
#include "test/mocks/thread_local/mocks.h"
#include "test/test_common/logging.h"
#include "test/test_common/real_threads_test_helper.h"
#include "test/test_common/utility.h"

#include "absl/strings/str_split.h"
#include "absl/synchronization/blocking_counter.h"
#include "absl/synchronization/notification.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"

Expand Down Expand Up @@ -1565,75 +1564,36 @@ TEST_F(HistogramTest, ParentHistogramBucketSummary) {
"B3.6e+06(1,1)",
parent_histogram->bucketSummary());
}

class ThreadLocalRealThreadsTestBase : public ThreadLocalStoreNoMocksTestBase {
class ThreadLocalRealThreadsTestBase : public Thread::RealThreadsTestHelper,
public ThreadLocalStoreNoMocksTestBase {
protected:
static constexpr uint32_t NumScopes = 1000;
static constexpr uint32_t NumIters = 35;

// Helper class to block on a number of multi-threaded operations occurring.
class BlockingBarrier {
public:
explicit BlockingBarrier(uint32_t count) : blocking_counter_(count) {}
~BlockingBarrier() { blocking_counter_.Wait(); }

/**
* Returns a function that first executes 'f', and then decrements the count
* toward unblocking the scope. This is intended to be used as a post() callback.
*
* @param f the function to run prior to decrementing the count.
*/
std::function<void()> run(std::function<void()> f) {
return [this, f]() {
f();
decrementCount();
};
}

/**
* @return a function that, when run, decrements the count, intended for passing to post().
*/
std::function<void()> decrementCountFn() {
return [this] { decrementCount(); };
}

void decrementCount() { blocking_counter_.DecrementCount(); }

private:
absl::BlockingCounter blocking_counter_;
};

public:
ThreadLocalRealThreadsTestBase(uint32_t num_threads)
: num_threads_(num_threads), api_(Api::createApiForTest()),
thread_factory_(api_->threadFactory()), pool_(store_->symbolTable()) {
// This is the same order as InstanceImpl::initialize in source/server/server.cc.
thread_dispatchers_.resize(num_threads_);
{
BlockingBarrier blocking_barrier(num_threads_ + 1);
main_thread_ = thread_factory_.createThread(
[this, &blocking_barrier]() { mainThreadFn(blocking_barrier); });
for (uint32_t i = 0; i < num_threads_; ++i) {
threads_.emplace_back(thread_factory_.createThread(
[this, i, &blocking_barrier]() { workerThreadFn(i, blocking_barrier); }));
}
}

{
BlockingBarrier blocking_barrier(1);
main_dispatcher_->post(blocking_barrier.run([this]() {
tls_ = std::make_unique<ThreadLocal::InstanceImpl>();
tls_->registerThread(*main_dispatcher_, true);
for (Event::DispatcherPtr& dispatcher : thread_dispatchers_) {
// Worker threads must be registered from the main thread, per assert in registerThread().
tls_->registerThread(*dispatcher, false);
}
store_->initializeThreading(*main_dispatcher_, *tls_);
}));
}
: RealThreadsTestHelper(num_threads), pool_(store_->symbolTable()) {
runOnMainBlocking([this]() { store_->initializeThreading(*main_dispatcher_, *tls_); });
}

~ThreadLocalRealThreadsTestBase() override {
// TODO(chaoqin-li1123): clean this up when we figure out how to free the threading resources in
// RealThreadsTestHelper.
shutdownThreading();
exitThreads();
}

void shutdownThreading() {
runOnMainBlocking([this]() {
if (!tls_->isShutdown()) {
tls_->shutdownGlobalThreading();
}
store_->shutdownThreading();
tls_->shutdownThread();
});
}

void exitThreads() {
for (Event::DispatcherPtr& dispatcher : thread_dispatchers_) {
dispatcher->post([&dispatcher]() { dispatcher->exit(); });
}
Expand All @@ -1650,52 +1610,6 @@ class ThreadLocalRealThreadsTestBase : public ThreadLocalStoreNoMocksTestBase {
main_thread_->join();
}

void shutdownThreading() {
BlockingBarrier blocking_barrier(1);
main_dispatcher_->post(blocking_barrier.run([this]() {
if (!tls_->isShutdown()) {
tls_->shutdownGlobalThreading();
}
store_->shutdownThreading();
tls_->shutdownThread();
}));
}

void workerThreadFn(uint32_t thread_index, BlockingBarrier& blocking_barrier) {
thread_dispatchers_[thread_index] =
api_->allocateDispatcher(absl::StrCat("test_worker_", thread_index));
blocking_barrier.decrementCount();
thread_dispatchers_[thread_index]->run(Event::Dispatcher::RunType::RunUntilExit);
}

void mainThreadFn(BlockingBarrier& blocking_barrier) {
main_dispatcher_ = api_->allocateDispatcher("test_main_thread");
blocking_barrier.decrementCount();
main_dispatcher_->run(Event::Dispatcher::RunType::RunUntilExit);
}

void mainDispatchBlock() {
// To ensure all stats are freed we have to wait for a few posts() to clear.
// First, wait for the main-dispatcher to initiate the cross-thread TLS cleanup.
BlockingBarrier blocking_barrier(1);
main_dispatcher_->post(blocking_barrier.run([]() {}));
}

void tlsBlock() {
BlockingBarrier blocking_barrier(num_threads_);
for (Event::DispatcherPtr& thread_dispatcher : thread_dispatchers_) {
thread_dispatcher->post(blocking_barrier.run([]() {}));
}
}

const uint32_t num_threads_;
Api::ApiPtr api_;
Event::DispatcherPtr main_dispatcher_;
std::vector<Event::DispatcherPtr> thread_dispatchers_;
Thread::ThreadFactory& thread_factory_;
ThreadLocal::InstanceImplPtr tls_;
Thread::ThreadPtr main_thread_;
std::vector<Thread::ThreadPtr> threads_;
StatNamePool pool_;
};

Expand All @@ -1717,11 +1631,8 @@ class ClusterShutdownCleanupStarvationTest : public ThreadLocalRealThreadsTestBa
}

void createScopesIncCountersAndCleanupAllThreads() {
BlockingBarrier blocking_barrier(NumThreads);
for (Event::DispatcherPtr& thread_dispatcher : thread_dispatchers_) {
thread_dispatcher->post(
blocking_barrier.run([this]() { createScopesIncCountersAndCleanup(); }));
}

runOnAllWorkersBlocking([this]() { createScopesIncCountersAndCleanup(); });
}

std::chrono::seconds elapsedTime() {
Expand Down Expand Up @@ -1795,7 +1706,7 @@ class HistogramThreadTest : public ThreadLocalRealThreadsTestBase {

void mergeHistograms() {
BlockingBarrier blocking_barrier(1);
main_dispatcher_->post([this, &blocking_barrier]() {
runOnMainBlocking([this, &blocking_barrier]() {
store_->mergeHistograms(blocking_barrier.decrementCountFn());
});
}
Expand All @@ -1804,7 +1715,7 @@ class HistogramThreadTest : public ThreadLocalRealThreadsTestBase {
uint32_t num;
{
BlockingBarrier blocking_barrier(1);
main_dispatcher_->post([this, &num, &blocking_barrier]() {
runOnMainBlocking([this, &num, &blocking_barrier]() {
ThreadLocalStoreTestingPeer::numTlsHistograms(*store_,
[&num, &blocking_barrier](uint32_t num_hist) {
num = num_hist;
Expand All @@ -1817,10 +1728,7 @@ class HistogramThreadTest : public ThreadLocalRealThreadsTestBase {

// Executes a function on every worker thread dispatcher.
void foreachThread(const std::function<void()>& fn) {
BlockingBarrier blocking_barrier(NumThreads);
for (Event::DispatcherPtr& thread_dispatcher : thread_dispatchers_) {
thread_dispatcher->post(blocking_barrier.run(fn));
}
runOnAllWorkersBlocking([&fn]() { fn(); });
}
};

Expand Down
4 changes: 4 additions & 0 deletions test/extensions/filters/http/ext_authz/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ envoy_extension_cc_test(
srcs = ["config_test.cc"],
extension_names = ["envoy.filters.http.ext_authz"],
deps = [
"//source/common/grpc:async_client_manager_lib",
"//source/common/network:address_lib",
"//source/common/thread_local:thread_local_lib",
"//source/extensions/filters/http/ext_authz:config",
"//test/mocks/server:factory_context_mocks",
"//test/test_common:real_threads_test_helper_lib",
"//test/test_common:test_runtime_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/http/ext_authz/v3:pkg_cc_proto",
Expand Down
Loading