Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions include/envoy/thread/thread.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <functional>
#include <memory>

#include "envoy/common/pure.h"
Expand All @@ -21,6 +22,20 @@ class Thread {

typedef std::unique_ptr<Thread> ThreadPtr;

/**
* Interface providing a mechanism for creating threads.
*/
class ThreadFactory {
public:
virtual ~ThreadFactory() {}

/**
* Create a thread.
* @param thread_routine supplies the function to invoke in the thread.
*/
virtual ThreadPtr createThread(std::function<void()> thread_routine) PURE;
};

/**
* Like the C++11 "basic lockable concept" but a pure virtual interface vs. a template, and
* with thread annotations.
Expand Down
9 changes: 5 additions & 4 deletions source/common/api/api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
namespace Envoy {
namespace Api {

Impl::Impl(std::chrono::milliseconds file_flush_interval_msec,
Thread::ThreadFactory& thread_factory)
: file_flush_interval_msec_(file_flush_interval_msec), thread_factory_(thread_factory) {}

Event::DispatcherPtr Impl::allocateDispatcher(Event::TimeSystem& time_system) {
return Event::DispatcherPtr{new Event::DispatcherImpl(time_system)};
}

Impl::Impl(std::chrono::milliseconds file_flush_interval_msec)
: file_flush_interval_msec_(file_flush_interval_msec) {}

Filesystem::FileSharedPtr Impl::createFile(const std::string& path, Event::Dispatcher& dispatcher,
Thread::BasicLockable& lock, Stats::Store& stats_store) {
return std::make_shared<Filesystem::FileImpl>(path, dispatcher, lock, stats_store, *this,
Expand All @@ -28,7 +29,7 @@ bool Impl::fileExists(const std::string& path) { return Filesystem::fileExists(p
std::string Impl::fileReadToEnd(const std::string& path) { return Filesystem::fileReadToEnd(path); }

Thread::ThreadPtr Impl::createThread(std::function<void()> thread_routine) {
return std::make_unique<Thread::ThreadImpl>(thread_routine);
return thread_factory_.createThread(thread_routine);
}

} // namespace Api
Expand Down
4 changes: 3 additions & 1 deletion source/common/api/api_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "envoy/api/api.h"
#include "envoy/event/timer.h"
#include "envoy/filesystem/filesystem.h"
#include "envoy/thread/thread.h"

namespace Envoy {
namespace Api {
Expand All @@ -15,7 +16,7 @@ namespace Api {
*/
class Impl : public Api::Api {
public:
Impl(std::chrono::milliseconds file_flush_interval_msec = std::chrono::milliseconds(1000));
Impl(std::chrono::milliseconds file_flush_interval_msec, Thread::ThreadFactory& thread_factory);

// Api::Api
Event::DispatcherPtr allocateDispatcher(Event::TimeSystem& time_system) override;
Expand All @@ -28,6 +29,7 @@ class Impl : public Api::Api {

private:
std::chrono::milliseconds file_flush_interval_msec_;
Thread::ThreadFactory& thread_factory_;
};

} // namespace Api
Expand Down
43 changes: 29 additions & 14 deletions source/common/common/thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,35 @@
namespace Envoy {
namespace Thread {

ThreadImpl::ThreadImpl(std::function<void()> thread_routine) : thread_routine_(thread_routine) {
RELEASE_ASSERT(Logger::Registry::initialized(), "");
int rc = pthread_create(&thread_id_, nullptr,
[](void* arg) -> void* {
static_cast<ThreadImpl*>(arg)->thread_routine_();
return nullptr;
},
this);
RELEASE_ASSERT(rc == 0, "");
}

void ThreadImpl::join() {
int rc = pthread_join(thread_id_, nullptr);
RELEASE_ASSERT(rc == 0, "");
/**
* Wrapper for a pthread thread. We don't use std::thread because it eats exceptions and leads to
* unusable stack traces.
*/
class ThreadImpl : public Thread {
public:
ThreadImpl(std::function<void()> thread_routine) : thread_routine_(thread_routine) {
RELEASE_ASSERT(Logger::Registry::initialized(), "");
int rc = pthread_create(&thread_id_, nullptr,
[](void* arg) -> void* {
static_cast<ThreadImpl*>(arg)->thread_routine_();
return nullptr;
},
this);
RELEASE_ASSERT(rc == 0, "");
}

void join() override {
int rc = pthread_join(thread_id_, nullptr);
RELEASE_ASSERT(rc == 0, "");
}

private:
std::function<void()> thread_routine_;
pthread_t thread_id_;
};

ThreadPtr ThreadFactoryImpl::createThread(std::function<void()> thread_routine) {
return std::make_unique<ThreadImpl>(thread_routine);
}

int32_t currentThreadId() {
Expand Down
16 changes: 4 additions & 12 deletions source/common/common/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,13 @@ typedef int32_t ThreadId;
ThreadId currentThreadId();

/**
* Wrapper for a pthread thread. We don't use std::thread because it eats exceptions and leads to
* unusable stack traces.
* Implementation of ThreadFactory
*/
class ThreadImpl : public Thread {
class ThreadFactoryImpl : public ThreadFactory {
public:
ThreadImpl(std::function<void()> thread_routine);
ThreadFactoryImpl() {}

/**
* Join on thread exit.
*/
void join() override;

private:
std::function<void()> thread_routine_;
pthread_t thread_id_;
ThreadPtr createThread(std::function<void()> thread_routine) override;
};

/**
Expand Down
14 changes: 8 additions & 6 deletions source/exe/main_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ Runtime::LoaderPtr ProdComponentFactory::createRuntime(Server::Instance& server,

MainCommonBase::MainCommonBase(OptionsImpl& options, Event::TimeSystem& time_system,
TestHooks& test_hooks, Server::ComponentFactory& component_factory,
std::unique_ptr<Runtime::RandomGenerator>&& random_generator)
: options_(options), component_factory_(component_factory) {
std::unique_ptr<Runtime::RandomGenerator>&& random_generator,
Thread::ThreadFactory& thread_factory)
: options_(options), component_factory_(component_factory), thread_factory_(thread_factory) {
ares_library_init(ARES_LIB_INIT_ALL);
Event::Libevent::Global::initialize();
RELEASE_ASSERT(Envoy::Server::validateProtoDescriptors(), "");
Expand Down Expand Up @@ -74,7 +75,7 @@ MainCommonBase::MainCommonBase(OptionsImpl& options, Event::TimeSystem& time_sys

server_ = std::make_unique<Server::InstanceImpl>(
options_, time_system, local_address, test_hooks, *restarter_, *stats_store_,
access_log_lock, component_factory, std::move(random_generator), *tls_);
access_log_lock, component_factory, std::move(random_generator), *tls_, thread_factory);

break;
}
Expand Down Expand Up @@ -103,7 +104,7 @@ bool MainCommonBase::run() {
return true;
case Server::Mode::Validate: {
auto local_address = Network::Utility::getLocalAddress(options_.localAddressIpVersion());
return Server::validateConfig(options_, local_address, component_factory_);
return Server::validateConfig(options_, local_address, component_factory_, thread_factory_);
}
case Server::Mode::InitOnly:
PERF_DUMP();
Expand All @@ -127,7 +128,7 @@ void MainCommonBase::adminRequest(absl::string_view path_and_query, absl::string
MainCommon::MainCommon(int argc, const char* const* argv)
: options_(argc, argv, &MainCommon::hotRestartVersion, spdlog::level::info),
base_(options_, real_time_system_, default_test_hooks_, prod_component_factory_,
std::make_unique<Runtime::RandomGeneratorImpl>()) {}
std::make_unique<Runtime::RandomGeneratorImpl>(), thread_factory_) {}

std::string MainCommon::hotRestartVersion(uint64_t max_num_stats, uint64_t max_stat_name_len,
bool hot_restart_enabled) {
Expand All @@ -152,9 +153,10 @@ int main_common(OptionsImpl& options) {
Event::RealTimeSystem real_time_system_;
DefaultTestHooks default_test_hooks_;
ProdComponentFactory prod_component_factory_;
Thread::ThreadFactoryImpl thread_factory_;
MainCommonBase main_common(options, real_time_system_, default_test_hooks_,
prod_component_factory_,
std::make_unique<Runtime::RandomGeneratorImpl>());
std::make_unique<Runtime::RandomGeneratorImpl>(), thread_factory_);
return main_common.run() ? EXIT_SUCCESS : EXIT_FAILURE;
} catch (EnvoyException& e) {
return EXIT_FAILURE;
Expand Down
6 changes: 5 additions & 1 deletion source/exe/main_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "envoy/event/timer.h"
#include "envoy/runtime/runtime.h"

#include "common/common/thread.h"
#include "common/event/real_time_system.h"
#include "common/stats/thread_local_store.h"
#include "common/thread_local/thread_local_impl.h"
Expand Down Expand Up @@ -32,7 +33,8 @@ class MainCommonBase {
// destructed.
MainCommonBase(OptionsImpl& options, Event::TimeSystem& time_system, TestHooks& test_hooks,
Server::ComponentFactory& component_factory,
std::unique_ptr<Runtime::RandomGenerator>&& random_generator);
std::unique_ptr<Runtime::RandomGenerator>&& random_generator,
Thread::ThreadFactory& thread_factory);
~MainCommonBase();

bool run();
Expand Down Expand Up @@ -62,6 +64,7 @@ class MainCommonBase {
Envoy::OptionsImpl& options_;

Server::ComponentFactory& component_factory_;
Thread::ThreadFactory& thread_factory_;

std::unique_ptr<ThreadLocal::InstanceImpl> tls_;
std::unique_ptr<Server::HotRestart> restarter_;
Expand Down Expand Up @@ -107,6 +110,7 @@ class MainCommon {
Event::RealTimeSystem real_time_system_;
DefaultTestHooks default_test_hooks_;
ProdComponentFactory prod_component_factory_;
Thread::ThreadFactoryImpl thread_factory_;
MainCommonBase base_;
};

Expand Down
5 changes: 3 additions & 2 deletions source/server/config_validation/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
namespace Envoy {
namespace Api {

ValidationImpl::ValidationImpl(std::chrono::milliseconds file_flush_interval_msec)
: Impl(file_flush_interval_msec) {}
ValidationImpl::ValidationImpl(std::chrono::milliseconds file_flush_interval_msec,
Thread::ThreadFactory& thread_factory)
: Impl(file_flush_interval_msec, thread_factory) {}

Event::DispatcherPtr ValidationImpl::allocateDispatcher(Event::TimeSystem& time_system) {
return Event::DispatcherPtr{new Event::ValidationDispatcher(time_system)};
Expand Down
3 changes: 2 additions & 1 deletion source/server/config_validation/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ namespace Api {
*/
class ValidationImpl : public Impl {
public:
ValidationImpl(std::chrono::milliseconds file_flush_interval_msec);
ValidationImpl(std::chrono::milliseconds file_flush_interval_msec,
Thread::ThreadFactory& thread_factory);

Event::DispatcherPtr allocateDispatcher(Event::TimeSystem&) override;
};
Expand Down
9 changes: 5 additions & 4 deletions source/server/config_validation/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ namespace Envoy {
namespace Server {

bool validateConfig(Options& options, Network::Address::InstanceConstSharedPtr local_address,
ComponentFactory& component_factory) {
ComponentFactory& component_factory, Thread::ThreadFactory& thread_factory) {
Thread::MutexBasicLockable access_log_lock;
Stats::IsolatedStoreImpl stats_store;

try {
Event::RealTimeSystem time_system;
ValidationInstance server(options, time_system, local_address, stats_store, access_log_lock,
component_factory);
component_factory, thread_factory);
std::cout << "configuration '" << options.configPath() << "' OK" << std::endl;
server.shutdown();
return true;
Expand All @@ -40,9 +40,10 @@ ValidationInstance::ValidationInstance(Options& options, Event::TimeSystem& time
Network::Address::InstanceConstSharedPtr local_address,
Stats::IsolatedStoreImpl& store,
Thread::BasicLockable& access_log_lock,
ComponentFactory& component_factory)
ComponentFactory& component_factory,
Thread::ThreadFactory& thread_factory)
: options_(options), time_system_(time_system), stats_store_(store),
api_(new Api::ValidationImpl(options.fileFlushIntervalMsec())),
api_(new Api::ValidationImpl(options.fileFlushIntervalMsec(), thread_factory)),
dispatcher_(api_->allocateDispatcher(time_system)),
singleton_manager_(new Singleton::ManagerImpl()),
access_log_manager_(*api_, *dispatcher_, access_log_lock, store), mutex_tracer_(nullptr) {
Expand Down
4 changes: 2 additions & 2 deletions source/server/config_validation/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace Server {
* the config is valid, false if invalid.
*/
bool validateConfig(Options& options, Network::Address::InstanceConstSharedPtr local_address,
ComponentFactory& component_factory);
ComponentFactory& component_factory, Thread::ThreadFactory& thread_factory);

/**
* ValidationInstance does the bulk of the work for config-validation runs of Envoy. It implements
Expand All @@ -56,7 +56,7 @@ class ValidationInstance : Logger::Loggable<Logger::Id::main>,
ValidationInstance(Options& options, Event::TimeSystem& time_system,
Network::Address::InstanceConstSharedPtr local_address,
Stats::IsolatedStoreImpl& store, Thread::BasicLockable& access_log_lock,
ComponentFactory& component_factory);
ComponentFactory& component_factory, Thread::ThreadFactory& thread_factory);

// Server::Instance
Admin& admin() override { return admin_; }
Expand Down
4 changes: 2 additions & 2 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ InstanceImpl::InstanceImpl(Options& options, Event::TimeSystem& time_system,
Thread::BasicLockable& access_log_lock,
ComponentFactory& component_factory,
Runtime::RandomGeneratorPtr&& random_generator,
ThreadLocal::Instance& tls)
ThreadLocal::Instance& tls, Thread::ThreadFactory& thread_factory)
: shutdown_(false), options_(options), time_system_(time_system), restarter_(restarter),
start_time_(time(nullptr)), original_start_time_(start_time_), stats_store_(store),
thread_local_(tls), api_(new Api::Impl(options.fileFlushIntervalMsec())),
thread_local_(tls), api_(new Api::Impl(options.fileFlushIntervalMsec(), thread_factory)),
secret_manager_(std::make_unique<Secret::SecretManagerImpl>()),
dispatcher_(api_->allocateDispatcher(time_system)),
singleton_manager_(new Singleton::ManagerImpl()),
Expand Down
3 changes: 2 additions & 1 deletion source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
Network::Address::InstanceConstSharedPtr local_address, TestHooks& hooks,
HotRestart& restarter, Stats::StoreRoot& store,
Thread::BasicLockable& access_log_lock, ComponentFactory& component_factory,
Runtime::RandomGeneratorPtr&& random_generator, ThreadLocal::Instance& tls);
Runtime::RandomGeneratorPtr&& random_generator, ThreadLocal::Instance& tls,
Thread::ThreadFactory& thread_factory);

~InstanceImpl() override;

Expand Down
1 change: 1 addition & 0 deletions test/common/api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ envoy_cc_test(
deps = [
"//source/common/api:api_lib",
"//test/test_common:environment_lib",
"//test/test_common:utility_lib",
],
)
5 changes: 3 additions & 2 deletions test/common/api/api_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
#include "common/api/api_impl.h"

#include "test/test_common/environment.h"
#include "test/test_common/utility.h"

#include "gtest/gtest.h"

namespace Envoy {
namespace Api {

TEST(ApiImplTest, readFileToEnd) {
Impl api(std::chrono::milliseconds(10000));
Impl api(std::chrono::milliseconds(1000), Thread::threadFactoryForTest());

const std::string data = "test read To End\nWith new lines.";
const std::string file_path = TestEnvironment::writeStringToFileForTest("test_api_envoy", data);
Expand All @@ -20,7 +21,7 @@ TEST(ApiImplTest, readFileToEnd) {
}

TEST(ApiImplTest, fileExists) {
Impl api(std::chrono::milliseconds(10000));
Impl api(std::chrono::milliseconds(1000), Thread::threadFactoryForTest());

EXPECT_TRUE(api.fileExists("/dev/null"));
EXPECT_FALSE(api.fileExists("/dev/blahblahblah"));
Expand Down
2 changes: 2 additions & 0 deletions test/common/event/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ envoy_cc_test(
"//source/common/event:dispatcher_lib",
"//test/mocks:common_lib",
"//test/test_common:test_time_lib",
"//test/test_common:utility_lib",
],
)

Expand Down Expand Up @@ -46,5 +47,6 @@ envoy_cc_test(
"//test/mocks/server:server_mocks",
"//test/mocks/stats:stats_mocks",
"//test/test_common:test_time_lib",
"//test/test_common:utility_lib",
],
)
Loading