diff --git a/include/envoy/thread/thread.h b/include/envoy/thread/thread.h index 70452ca5d29ab..8633c03e1ebed 100644 --- a/include/envoy/thread/thread.h +++ b/include/envoy/thread/thread.h @@ -9,6 +9,9 @@ #include "common/common/thread_annotations.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" + namespace Envoy { namespace Thread { @@ -37,13 +40,25 @@ class Thread { virtual ~Thread() = default; /** - * Join on thread exit. + * @return the name of the thread. + */ + virtual std::string name() const PURE; + + /** + * Blocks until the thread exits. */ virtual void join() PURE; }; using ThreadPtr = std::unique_ptr; +// Options specified during thread creation. +struct Options { + std::string name_; // A name supplied for the thread. On Linux this is limited to 15 chars. +}; + +using OptionsOptConstRef = const absl::optional&; + /** * Interface providing a mechanism for creating threads. */ @@ -52,10 +67,13 @@ class ThreadFactory { virtual ~ThreadFactory() = default; /** - * Create a thread. + * Creates a thread, immediately starting the thread_routine. + * * @param thread_routine supplies the function to invoke in the thread. + * @param options supplies options specified on thread creation. 
*/ - virtual ThreadPtr createThread(std::function thread_routine) PURE; + virtual ThreadPtr createThread(std::function thread_routine, + OptionsOptConstRef options = absl::nullopt) PURE; /** * Return the current system thread ID diff --git a/source/common/access_log/access_log_manager_impl.cc b/source/common/access_log/access_log_manager_impl.cc index f173904d2dffb..055e602bdcfb5 100644 --- a/source/common/access_log/access_log_manager_impl.cc +++ b/source/common/access_log/access_log_manager_impl.cc @@ -203,7 +203,8 @@ void AccessLogFileImpl::write(absl::string_view data) { } void AccessLogFileImpl::createFlushStructures() { - flush_thread_ = thread_factory_.createThread([this]() -> void { flushThreadFunc(); }); + flush_thread_ = thread_factory_.createThread([this]() -> void { flushThreadFunc(); }, + Thread::Options{"AccessLogFlush"}); flush_timer_->enableTimer(flush_interval_msec_); } diff --git a/source/common/common/posix/thread_impl.cc b/source/common/common/posix/thread_impl.cc index 324230ade176b..359af8245ed97 100644 --- a/source/common/common/posix/thread_impl.cc +++ b/source/common/common/posix/thread_impl.cc @@ -1,6 +1,8 @@ #include "common/common/assert.h" #include "common/common/thread_impl.h" +#include "absl/strings/str_cat.h" + #if defined(__linux__) #include #endif @@ -24,26 +26,99 @@ int64_t getCurrentThreadId() { } // namespace -ThreadImplPosix::ThreadImplPosix(std::function thread_routine) - : thread_routine_(std::move(thread_routine)) { - RELEASE_ASSERT(Logger::Registry::initialized(), ""); - const int rc = pthread_create( - &thread_handle_, nullptr, - [](void* arg) -> void* { - static_cast(arg)->thread_routine_(); - return nullptr; - }, - this); - RELEASE_ASSERT(rc == 0, ""); -} +// See https://www.man7.org/linux/man-pages/man3/pthread_setname_np.3.html. +// The maximum thread name is 16 bytes including the terminating nul byte, +// so we need to truncate the string_view to 15 bytes. 
+#define PTHREAD_MAX_THREADNAME_LEN_INCLUDING_NULL_BYTE 16 -void ThreadImplPosix::join() { - const int rc = pthread_join(thread_handle_, nullptr); - RELEASE_ASSERT(rc == 0, ""); -} +/** + * Wrapper for a pthread thread. We don't use std::thread because it eats exceptions and leads to + * unusable stack traces. + */ +class ThreadImplPosix : public Thread { +public: + ThreadImplPosix(std::function thread_routine, OptionsOptConstRef options) + : thread_routine_(std::move(thread_routine)) { + if (options) { + name_ = options->name_.substr(0, PTHREAD_MAX_THREADNAME_LEN_INCLUDING_NULL_BYTE - 1); + } + RELEASE_ASSERT(Logger::Registry::initialized(), ""); + const int rc = pthread_create( + &thread_handle_, nullptr, + [](void* arg) -> void* { + static_cast(arg)->thread_routine_(); + return nullptr; + }, + this); + RELEASE_ASSERT(rc == 0, ""); + +#ifdef __linux__ + // If the name was not specified, get it from the OS. If the name was + // specified, write it into the thread, and assert that the OS sees it the + // same way. + if (name_.empty()) { + getNameFromOS(name_); + } else { + const int set_name_rc = pthread_setname_np(thread_handle_, name_.c_str()); + if (set_name_rc != 0) { + ENVOY_LOG_MISC(trace, "Error {} setting name `{}'", set_name_rc, name_); + } else { + // When compiling in debug mode, read back the thread-name from the OS, + // and verify it's what we asked for. This ensures the truncation is as + // expected, and that the OS will actually retain all the bytes of the + // name we expect. + // + // Note that the system-call to read the thread name may fail in case + // the thread exits after the call to set the name above, and before the + // call to get the name, so we can only do the assert if that call + // succeeded. 
+ std::string check_name; + ASSERT(!getNameFromOS(check_name) || check_name == name_, + absl::StrCat("configured name=", name_, " os name=", check_name)); + } + } +#endif + } + + ~ThreadImplPosix() override { ASSERT(joined_); } + + std::string name() const override { return name_; } + + // Thread::Thread + void join() override { + ASSERT(!joined_); + joined_ = true; + const int rc = pthread_join(thread_handle_, nullptr); + RELEASE_ASSERT(rc == 0, ""); + } + +private: +#ifdef __linux__ + // Attempts to get the name from the operating system, returning true and + // updating 'name' if successful. Note that during normal operation this + // may fail, if the thread exits prior to the system call. + bool getNameFromOS(std::string& name) { + // Ask the OS for the thread's current name; callers decide how to use it. + char buf[PTHREAD_MAX_THREADNAME_LEN_INCLUDING_NULL_BYTE]; + const int get_name_rc = pthread_getname_np(thread_handle_, buf, sizeof(buf)); + if (get_name_rc != 0) { + ENVOY_LOG_MISC(trace, "Error {} getting name", get_name_rc); + return false; + } + name = buf; + return true; + } +#endif + + std::function thread_routine_; + pthread_t thread_handle_; + std::string name_; + bool joined_{false}; +}; -ThreadPtr ThreadFactoryImplPosix::createThread(std::function thread_routine) { - return std::make_unique(thread_routine); +ThreadPtr ThreadFactoryImplPosix::createThread(std::function thread_routine, + OptionsOptConstRef options) { + return std::make_unique(thread_routine, options); } ThreadId ThreadFactoryImplPosix::currentThreadId() { return ThreadId(getCurrentThreadId()); } diff --git a/source/common/common/posix/thread_impl.h b/source/common/common/posix/thread_impl.h index 81c81d3be3fc5..9b373ecaceb60 100644 --- a/source/common/common/posix/thread_impl.h +++ b/source/common/common/posix/thread_impl.h @@ -9,29 +9,13 @@ namespace Envoy { namespace Thread { -/** - * Wrapper for a pthread thread. 
We don't use std::thread because it eats exceptions and leads to - * unusable stack traces. - */ -class ThreadImplPosix : public Thread { -public: - ThreadImplPosix(std::function thread_routine); - - // Thread::Thread - void join() override; - -private: - std::function thread_routine_; - pthread_t thread_handle_; -}; - /** * Implementation of ThreadFactory */ class ThreadFactoryImplPosix : public ThreadFactory { public: // Thread::ThreadFactory - ThreadPtr createThread(std::function thread_routine) override; + ThreadPtr createThread(std::function thread_routine, OptionsOptConstRef options) override; ThreadId currentThreadId() override; }; diff --git a/source/common/common/win32/thread_impl.cc b/source/common/common/win32/thread_impl.cc index 1d3eca9689570..8f26d63e0eb39 100644 --- a/source/common/common/win32/thread_impl.cc +++ b/source/common/common/win32/thread_impl.cc @@ -6,8 +6,14 @@ namespace Envoy { namespace Thread { -ThreadImplWin32::ThreadImplWin32(std::function thread_routine) +ThreadImplWin32::ThreadImplWin32(std::function thread_routine, OptionsOptConstRef options) : thread_routine_(thread_routine) { + if (options) { + name_ = options->name_; + // TODO(jmarantz): set the thread name for task manager, etc, or pull the + // auto-generated name from the OS if options is not present. 
+ } + RELEASE_ASSERT(Logger::Registry::initialized(), ""); thread_handle_ = reinterpret_cast(::_beginthreadex( nullptr, 0, @@ -26,8 +32,9 @@ void ThreadImplWin32::join() { RELEASE_ASSERT(rc == WAIT_OBJECT_0, ""); } -ThreadPtr ThreadFactoryImplWin32::createThread(std::function thread_routine) { - return std::make_unique(thread_routine); +ThreadPtr ThreadFactoryImplWin32::createThread(std::function thread_routine, + OptionsOptConstRef options) { + return std::make_unique(thread_routine, options); } ThreadId ThreadFactoryImplWin32::currentThreadId() { diff --git a/source/common/common/win32/thread_impl.h b/source/common/common/win32/thread_impl.h index 8b5d0fe37e15b..87be085291c86 100644 --- a/source/common/common/win32/thread_impl.h +++ b/source/common/common/win32/thread_impl.h @@ -14,11 +14,12 @@ namespace Thread { */ class ThreadImplWin32 : public Thread { public: - ThreadImplWin32(std::function thread_routine); + ThreadImplWin32(std::function thread_routine, OptionsOptConstRef options); ~ThreadImplWin32(); // Thread::Thread void join() override; + std::string name() const override { return name_; } // Needed for WatcherImpl for the QueueUserAPC callback context HANDLE handle() const { return thread_handle_; } @@ -26,6 +27,7 @@ class ThreadImplWin32 : public Thread { private: std::function thread_routine_; HANDLE thread_handle_; + std::string name_; }; /** @@ -34,7 +36,7 @@ class ThreadImplWin32 : public Thread { class ThreadFactoryImplWin32 : public ThreadFactory { public: // Thread::ThreadFactory - ThreadPtr createThread(std::function thread_routine) override; + ThreadPtr createThread(std::function thread_routine, OptionsOptConstRef options) override; ThreadId currentThreadId() override; }; diff --git a/source/common/filesystem/win32/watcher_impl.cc b/source/common/filesystem/win32/watcher_impl.cc index 5bc400639109f..80531f78d54ea 100644 --- a/source/common/filesystem/win32/watcher_impl.cc +++ b/source/common/filesystem/win32/watcher_impl.cc @@ -31,7 +31,10 @@ 
WatcherImpl::WatcherImpl(Event::Dispatcher& dispatcher, Api::Api& api) thread_exit_event_ = ::CreateEvent(nullptr, false, false, nullptr); ASSERT(thread_exit_event_ != NULL); keep_watching_ = true; - watch_thread_ = thread_factory_.createThread([this]() -> void { watchLoop(); }); + + // See comments in WorkerImpl::start for the naming convention. + Thread::Options options{absl::StrCat("wat:", dispatcher.name())}; + watch_thread_ = thread_factory_.createThread([this]() -> void { watchLoop(); }, options); } WatcherImpl::~WatcherImpl() { diff --git a/source/common/grpc/google_async_client_impl.cc b/source/common/grpc/google_async_client_impl.cc index 18c4936e5e25b..ff7774034ac97 100644 --- a/source/common/grpc/google_async_client_impl.cc +++ b/source/common/grpc/google_async_client_impl.cc @@ -21,7 +21,8 @@ static constexpr int DefaultBufferLimitBytes = 1024 * 1024; } GoogleAsyncClientThreadLocal::GoogleAsyncClientThreadLocal(Api::Api& api) - : completion_thread_(api.threadFactory().createThread([this] { completionThread(); })) {} + : completion_thread_(api.threadFactory().createThread([this] { completionThread(); }, + Thread::Options{"GrpcGoogClient"})) {} GoogleAsyncClientThreadLocal::~GoogleAsyncClientThreadLocal() { // Force streams to shutdown and invoke TryCancel() to start the drain of diff --git a/source/server/guarddog_impl.cc b/source/server/guarddog_impl.cc index d05e84f6ff6e8..ad66b59a5299b 100644 --- a/source/server/guarddog_impl.cc +++ b/source/server/guarddog_impl.cc @@ -142,8 +142,10 @@ void GuardDogImpl::stopWatching(WatchDogSharedPtr wd) { void GuardDogImpl::start(Api::Api& api) { Thread::LockGuard guard(mutex_); + // See comments in WorkerImpl::start for the naming convention. 
+ Thread::Options options{absl::StrCat("dog:", dispatcher_->name())}; thread_ = api.threadFactory().createThread( - [this]() -> void { dispatcher_->run(Event::Dispatcher::RunType::RunUntilExit); }); + [this]() -> void { dispatcher_->run(Event::Dispatcher::RunType::RunUntilExit); }, options); loop_timer_->enableTimer(std::chrono::milliseconds(0)); } diff --git a/source/server/worker_impl.cc b/source/server/worker_impl.cc index 17e02486e5f80..54ef058ea6e5e 100644 --- a/source/server/worker_impl.cc +++ b/source/server/worker_impl.cc @@ -81,8 +81,20 @@ void WorkerImpl::removeFilterChains(uint64_t listener_tag, void WorkerImpl::start(GuardDog& guard_dog) { ASSERT(!thread_); - thread_ = - api_.threadFactory().createThread([this, &guard_dog]() -> void { threadRoutine(guard_dog); }); + + // In posix, thread names are limited to 15 characters, so contrive to make + // sure all interesting data fits there. The naming occurs in + // ListenerManagerImpl's constructor: absl::StrCat("worker_", i). Let's say we + // have 9999 threads. We'd then need 7 bytes for "worker_" and 4 bytes + // for the thread index, leaving 4 bytes to distinguish between the + // two threads used per dispatcher. We'll call this one "wrk:" and the + // one allocated in guarddog_impl.cc "dog:". + // + // TODO(jmarantz): consider refactoring how this naming works so this naming + // architecture is centralized, resulting in clearer names. 
+ Thread::Options options{absl::StrCat("wrk:", dispatcher_->name())}; + thread_ = api_.threadFactory().createThread( + [this, &guard_dog]() -> void { threadRoutine(guard_dog); }, options); } void WorkerImpl::initializeStats(Stats::Scope& scope) { dispatcher_->initializeStats(scope); } diff --git a/test/common/common/thread_test.cc b/test/common/common/thread_test.cc index 431d4be38f325..9dac043921f72 100644 --- a/test/common/common/thread_test.cc +++ b/test/common/common/thread_test.cc @@ -5,7 +5,9 @@ #include "test/test_common/thread_factory_for_test.h" +#include "absl/strings/str_cat.h" #include "absl/synchronization/notification.h" +#include "gmock/gmock.h" #include "gtest/gtest.h" namespace Envoy { @@ -27,12 +29,15 @@ TEST_F(ThreadAsyncPtrTest, DeleteOnDestruct) { // On thread1, we will lazily instantiate the string as "thread1". However // in the creation function we will block on a sync-point. - auto thread1 = thread_factory_.createThread([&str, &sync]() { - str.get([&sync]() -> std::string* { - sync.syncPoint("creator"); - return new std::string("thread1"); - }); - }); + auto thread1 = thread_factory_.createThread( + [&str, &sync]() { + str.get([&sync]() -> std::string* { + sync.syncPoint("creator"); + return new std::string("thread1"); + }); + }, + Options{"thread1"}); + EXPECT_EQ("thread1", thread1->name()); sync.barrierOn("creator"); @@ -40,7 +45,9 @@ TEST_F(ThreadAsyncPtrTest, DeleteOnDestruct) { // string as "thread2", but that allocator will never run because // the allocator on thread1 has already locked the AtomicPtr's mutex. auto thread2 = thread_factory_.createThread( - [&str]() { str.get([]() -> std::string* { return new std::string("thread2"); }); }); + [&str]() { str.get([]() -> std::string* { return new std::string("thread2"); }); }, + Options{"thread2"}); + EXPECT_EQ("thread2", thread2->name()); // Now let thread1's initializer finish. 
sync.signal("creator"); @@ -68,21 +75,25 @@ TEST_F(ThreadAsyncPtrTest, DoNotDelete) { // On thread1, we will lazily instantiate the string as "thread1". However // in the creation function we will block on a sync-point. - auto thread1 = thread_factory_.createThread([&str, &sync, &thread1_str]() { - str.get([&sync, &thread1_str]() -> const std::string* { - sync.syncPoint("creator"); - return &thread1_str; - }); - }); + auto thread1 = thread_factory_.createThread( + [&str, &sync, &thread1_str]() { + str.get([&sync, &thread1_str]() -> const std::string* { + sync.syncPoint("creator"); + return &thread1_str; + }); + }, + Options{"thread1"}); sync.barrierOn("creator"); // Now spawn a separate thread that will attempt to lazy-initialize the // string as "thread2", but that allocator will never run because // the allocator on thread1 has already locked the AtomicPtr's mutex. - auto thread2 = thread_factory_.createThread([&str, &thread2_str]() { - str.get([&thread2_str]() -> const std::string* { return &thread2_str; }); - }); + auto thread2 = thread_factory_.createThread( + [&str, &thread2_str]() { + str.get([&thread2_str]() -> const std::string* { return &thread2_str; }); + }, + Options{"thread2"}); // Now let thread1's initializer finish. 
sync.signal("creator"); @@ -113,7 +124,9 @@ TEST_F(ThreadAsyncPtrTest, ThreadSpammer) { }; std::vector threads; for (uint32_t i = 0; i < num_threads; ++i) { - threads.emplace_back(thread_factory_.createThread(thread_fn)); + std::string name = absl::StrCat("thread", i); + threads.emplace_back(thread_factory_.createThread(thread_fn, Options{name})); + EXPECT_EQ(name, threads.back()->name()); } EXPECT_EQ(0, calls); go.Notify(); @@ -190,6 +203,49 @@ TEST_F(ThreadAsyncPtrTest, ManagedAlloc) { } } +TEST_F(ThreadAsyncPtrTest, TruncateWait) { + absl::Notification notify; + auto thread = thread_factory_.createThread([&notify]() { notify.WaitForNotification(); }, + Options{"this name is way too long for posix"}); + notify.Notify(); + + // To make this test work on multiple platforms, just assume the first 10 characters + // are retained. + EXPECT_THAT(thread->name(), testing::StartsWith("this name ")); + thread->join(); +} + +TEST_F(ThreadAsyncPtrTest, TruncateNoWait) { + auto thread = + thread_factory_.createThread([]() {}, Options{"this name is way too long for posix"}); + + // In general, across platforms, just assume the first 10 characters are + // retained. + EXPECT_THAT(thread->name(), testing::StartsWith("this name ")); + + // On Linux we can check for 15 exactly. +#ifdef __linux__ + EXPECT_EQ("this name is wa", thread->name()) << "truncated to 15 chars"; +#endif + + thread->join(); +} + +TEST_F(ThreadAsyncPtrTest, NameNotSpecifiedWait) { + absl::Notification notify; + auto thread = thread_factory_.createThread([&notify]() { notify.WaitForNotification(); }); + notify.Notify(); + + // For linux builds, the thread name defaults to the name of the + // binary. However the name of the binary is different depending on whether + // this is a coverage test or not. Currently, this population does not occur + // for Mac or Windows. +#ifdef __linux__ + EXPECT_FALSE(thread->name().empty()); +#endif + thread->join(); +} + } // namespace } // namespace Thread } // namespace Envoy