Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions include/envoy/thread/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

#include "common/common/thread_annotations.h"

#include "absl/strings/string_view.h"
#include "absl/types/optional.h"

namespace Envoy {
namespace Thread {

Expand Down Expand Up @@ -37,13 +40,25 @@ class Thread {
virtual ~Thread() = default;

/**
* Join on thread exit.
* @return the name of the thread.
*/
virtual std::string name() const PURE;

/**
* Blocks until the thread exits.
*/
virtual void join() PURE;
};

using ThreadPtr = std::unique_ptr<Thread>;

// Options specified during thread creation.
struct Options {
std::string name_; // A name supplied for the thread. On Linux this is limited to 15 chars.
};

using OptionsOptConstRef = const absl::optional<Options>&;

/**
* Interface providing a mechanism for creating threads.
*/
Expand All @@ -52,10 +67,13 @@ class ThreadFactory {
virtual ~ThreadFactory() = default;

/**
* Create a thread.
* Creates a thread, immediately starting the thread_routine.
*
* @param thread_routine supplies the function to invoke in the thread.
* @param options supplies options specified on thread creation.
*/
virtual ThreadPtr createThread(std::function<void()> thread_routine) PURE;
virtual ThreadPtr createThread(std::function<void()> thread_routine,
OptionsOptConstRef options = absl::nullopt) PURE;

/**
* Return the current system thread ID
Expand Down
3 changes: 2 additions & 1 deletion source/common/access_log/access_log_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ void AccessLogFileImpl::write(absl::string_view data) {
}

void AccessLogFileImpl::createFlushStructures() {
flush_thread_ = thread_factory_.createThread([this]() -> void { flushThreadFunc(); });
flush_thread_ = thread_factory_.createThread([this]() -> void { flushThreadFunc(); },
Thread::Options{"AccessLogFlush"});
flush_timer_->enableTimer(flush_interval_msec_);
}

Expand Down
111 changes: 93 additions & 18 deletions source/common/common/posix/thread_impl.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "common/common/assert.h"
#include "common/common/thread_impl.h"

#include "absl/strings/str_cat.h"

#if defined(__linux__)
#include <sys/syscall.h>
#endif
Expand All @@ -24,26 +26,99 @@ int64_t getCurrentThreadId() {

} // namespace

ThreadImplPosix::ThreadImplPosix(std::function<void()> thread_routine)
: thread_routine_(std::move(thread_routine)) {
RELEASE_ASSERT(Logger::Registry::initialized(), "");
const int rc = pthread_create(
&thread_handle_, nullptr,
[](void* arg) -> void* {
static_cast<ThreadImplPosix*>(arg)->thread_routine_();
return nullptr;
},
this);
RELEASE_ASSERT(rc == 0, "");
}
// See https://www.man7.org/linux/man-pages/man3/pthread_setname_np.3.html.
// The maximum thread name is 16 bytes including the terminating nul byte,
// so we need to truncate the string_view to 15 bytes.
#define PTHREAD_MAX_THREADNAME_LEN_INCLUDING_NULL_BYTE 16

void ThreadImplPosix::join() {
const int rc = pthread_join(thread_handle_, nullptr);
RELEASE_ASSERT(rc == 0, "");
}
/**
* Wrapper for a pthread thread. We don't use std::thread because it eats exceptions and leads to
* unusable stack traces.
*/
class ThreadImplPosix : public Thread {
public:
ThreadImplPosix(std::function<void()> thread_routine, OptionsOptConstRef options)
: thread_routine_(std::move(thread_routine)) {
if (options) {
name_ = options->name_.substr(0, PTHREAD_MAX_THREADNAME_LEN_INCLUDING_NULL_BYTE - 1);
}
RELEASE_ASSERT(Logger::Registry::initialized(), "");
const int rc = pthread_create(
&thread_handle_, nullptr,
[](void* arg) -> void* {
static_cast<ThreadImplPosix*>(arg)->thread_routine_();
return nullptr;
},
this);
RELEASE_ASSERT(rc == 0, "");

#ifdef __linux__
// If the name was not specified, get it from the OS. If the name was
// specified, write it into the thread, and assert that the OS sees it the
// same way.
if (name_.empty()) {
getNameFromOS(name_);
} else {
const int set_name_rc = pthread_setname_np(thread_handle_, name_.c_str());
if (set_name_rc != 0) {
ENVOY_LOG_MISC(trace, "Error {} setting name `{}'", set_name_rc, name_);
} else {
// When compiling in debug mode, read back the thread-name from the OS,
// and verify it's what we asked for. This ensures the truncation is as
// expected, and that the OS will actually retain all the bytes of the
// name we expect.
//
// Note that the system-call to read the thread name may fail in case
// the thread exits after the call to set the name above, and before the
// call to get the name, so we can only do the assert if that call
// succeeded.
std::string check_name;
ASSERT(!getNameFromOS(check_name) || check_name == name_,
absl::StrCat("configured name=", name_, " os name=", check_name));
}
}
#endif
}

~ThreadImplPosix() override { ASSERT(joined_); }

std::string name() const override { return name_; }

// Thread::Thread
void join() override {
ASSERT(!joined_);
joined_ = true;
const int rc = pthread_join(thread_handle_, nullptr);
RELEASE_ASSERT(rc == 0, "");
}

private:
#ifdef __linux__
// Attempts to get the name from the operating system, returning true and
// updating 'name' if successful. Note that during normal operation this
// may fail, if the thread exits prior to the system call.
bool getNameFromOS(std::string& name) {
// Verify that the name got written into the thread as expected.
char buf[PTHREAD_MAX_THREADNAME_LEN_INCLUDING_NULL_BYTE];
const int get_name_rc = pthread_getname_np(thread_handle_, buf, sizeof(buf));
if (get_name_rc != 0) {
ENVOY_LOG_MISC(trace, "Error {} getting name", get_name_rc);
return false;
}
name = buf;
return true;
}
#endif

std::function<void()> thread_routine_;
pthread_t thread_handle_;
std::string name_;
bool joined_{false};
};

ThreadPtr ThreadFactoryImplPosix::createThread(std::function<void()> thread_routine) {
return std::make_unique<ThreadImplPosix>(thread_routine);
ThreadPtr ThreadFactoryImplPosix::createThread(std::function<void()> thread_routine,
OptionsOptConstRef options) {
return std::make_unique<ThreadImplPosix>(thread_routine, options);
}

ThreadId ThreadFactoryImplPosix::currentThreadId() { return ThreadId(getCurrentThreadId()); }
Expand Down
18 changes: 1 addition & 17 deletions source/common/common/posix/thread_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,13 @@
namespace Envoy {
namespace Thread {

/**
* Wrapper for a pthread thread. We don't use std::thread because it eats exceptions and leads to
* unusable stack traces.
*/
class ThreadImplPosix : public Thread {
public:
ThreadImplPosix(std::function<void()> thread_routine);

// Thread::Thread
void join() override;

private:
std::function<void()> thread_routine_;
pthread_t thread_handle_;
};

/**
* Implementation of ThreadFactory
*/
class ThreadFactoryImplPosix : public ThreadFactory {
public:
// Thread::ThreadFactory
ThreadPtr createThread(std::function<void()> thread_routine) override;
ThreadPtr createThread(std::function<void()> thread_routine, OptionsOptConstRef options) override;
ThreadId currentThreadId() override;
};

Expand Down
13 changes: 10 additions & 3 deletions source/common/common/win32/thread_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@
namespace Envoy {
namespace Thread {

ThreadImplWin32::ThreadImplWin32(std::function<void()> thread_routine)
ThreadImplWin32::ThreadImplWin32(std::function<void()> thread_routine, OptionsOptConstRef options)
: thread_routine_(thread_routine) {
if (options) {
name_ = options->name_;
// TODO(jmarantz): set the thread name for task manager, etc, or pull the
// auto-generated name from the OS if options is not present.
}

RELEASE_ASSERT(Logger::Registry::initialized(), "");
thread_handle_ = reinterpret_cast<HANDLE>(::_beginthreadex(
nullptr, 0,
Expand All @@ -26,8 +32,9 @@ void ThreadImplWin32::join() {
RELEASE_ASSERT(rc == WAIT_OBJECT_0, "");
}

ThreadPtr ThreadFactoryImplWin32::createThread(std::function<void()> thread_routine) {
return std::make_unique<ThreadImplWin32>(thread_routine);
ThreadPtr ThreadFactoryImplWin32::createThread(std::function<void()> thread_routine,
OptionsOptConstRef options) {
return std::make_unique<ThreadImplWin32>(thread_routine, options);
}

ThreadId ThreadFactoryImplWin32::currentThreadId() {
Expand Down
6 changes: 4 additions & 2 deletions source/common/common/win32/thread_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@ namespace Thread {
*/
class ThreadImplWin32 : public Thread {
public:
ThreadImplWin32(std::function<void()> thread_routine);
ThreadImplWin32(std::function<void()> thread_routine, OptionsOptConstRef options);
~ThreadImplWin32();

// Thread::Thread
void join() override;
std::string name() const override { return name_; }

// Needed for WatcherImpl for the QueueUserAPC callback context
HANDLE handle() const { return thread_handle_; }

private:
std::function<void()> thread_routine_;
HANDLE thread_handle_;
std::string name_;
};

/**
Expand All @@ -34,7 +36,7 @@ class ThreadImplWin32 : public Thread {
class ThreadFactoryImplWin32 : public ThreadFactory {
public:
// Thread::ThreadFactory
ThreadPtr createThread(std::function<void()> thread_routine) override;
ThreadPtr createThread(std::function<void()> thread_routine, OptionsOptConstRef options) override;
ThreadId currentThreadId() override;
};

Expand Down
5 changes: 4 additions & 1 deletion source/common/filesystem/win32/watcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ WatcherImpl::WatcherImpl(Event::Dispatcher& dispatcher, Api::Api& api)
thread_exit_event_ = ::CreateEvent(nullptr, false, false, nullptr);
ASSERT(thread_exit_event_ != NULL);
keep_watching_ = true;
watch_thread_ = thread_factory_.createThread([this]() -> void { watchLoop(); });

// See comments in WorkerImpl::start for the naming convention.
Thread::Options options{absl::StrCat("wat:", dispatcher.name())};
watch_thread_ = thread_factory_.createThread([this]() -> void { watchLoop(); }, options);
}

WatcherImpl::~WatcherImpl() {
Expand Down
3 changes: 2 additions & 1 deletion source/common/grpc/google_async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ static constexpr int DefaultBufferLimitBytes = 1024 * 1024;
}

GoogleAsyncClientThreadLocal::GoogleAsyncClientThreadLocal(Api::Api& api)
: completion_thread_(api.threadFactory().createThread([this] { completionThread(); })) {}
: completion_thread_(api.threadFactory().createThread([this] { completionThread(); },
Thread::Options{"GrpcGoogClient"})) {}

GoogleAsyncClientThreadLocal::~GoogleAsyncClientThreadLocal() {
// Force streams to shutdown and invoke TryCancel() to start the drain of
Expand Down
4 changes: 3 additions & 1 deletion source/server/guarddog_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ void GuardDogImpl::stopWatching(WatchDogSharedPtr wd) {

void GuardDogImpl::start(Api::Api& api) {
Thread::LockGuard guard(mutex_);
// See comments in WorkerImpl::start for the naming convention.
Thread::Options options{absl::StrCat("dog:", dispatcher_->name())};
thread_ = api.threadFactory().createThread(
[this]() -> void { dispatcher_->run(Event::Dispatcher::RunType::RunUntilExit); });
[this]() -> void { dispatcher_->run(Event::Dispatcher::RunType::RunUntilExit); }, options);
loop_timer_->enableTimer(std::chrono::milliseconds(0));
}

Expand Down
16 changes: 14 additions & 2 deletions source/server/worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,20 @@ void WorkerImpl::removeFilterChains(uint64_t listener_tag,

void WorkerImpl::start(GuardDog& guard_dog) {
ASSERT(!thread_);
thread_ =
api_.threadFactory().createThread([this, &guard_dog]() -> void { threadRoutine(guard_dog); });

// In posix, thread names are limited to 15 characters, so contrive to make
// sure all interesting data fits there. The naming occurs in
// ListenerManagerImpl's constructor: absl::StrCat("worker_", i). Let's say we
// have 9999 threads. We'd need, so we need 7 bytes for "worker_", 4 bytes
// for the thread index, leaving us 4 bytes left to distinguish between the
// two threads used per dispatcher. We'll call this one "dsp:" and the
// one allocated in guarddog_impl.cc "dog:".
//
// TODO(jmarantz): consider refactoring how this naming works so this naming
// architecture is centralized, resulting in clearer names.
Thread::Options options{absl::StrCat("wrk:", dispatcher_->name())};
thread_ = api_.threadFactory().createThread(
[this, &guard_dog]() -> void { threadRoutine(guard_dog); }, options);
}

void WorkerImpl::initializeStats(Stats::Scope& scope) { dispatcher_->initializeStats(scope); }
Expand Down
Loading