-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Dispatcher: keeps a stack of tracked objects. #14573
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
07e671b
918383d
41d85bf
cb43d8a
98d318c
0940f3a
0fe46db
3f8ba3e
265b3a7
3434c7b
7182e95
3885c7a
0724016
be52ed4
984ffc0
e760c57
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,10 +7,12 @@ | |
| #include <vector> | ||
|
|
||
| #include "envoy/api/api.h" | ||
| #include "envoy/common/scope_tracker.h" | ||
| #include "envoy/network/listen_socket.h" | ||
| #include "envoy/network/listener.h" | ||
|
|
||
| #include "common/buffer/buffer_impl.h" | ||
| #include "common/common/assert.h" | ||
| #include "common/common/lock_guard.h" | ||
| #include "common/common/thread.h" | ||
| #include "common/event/file_event_impl.h" | ||
|
|
@@ -36,6 +38,12 @@ | |
|
|
||
| namespace Envoy { | ||
| namespace Event { | ||
| namespace { | ||
| // The tracked object stack likely won't grow larger than this initial | ||
| // reservation; this should make appends constant time since the stack | ||
| // shouldn't have to grow larger. | ||
| constexpr size_t ExpectedMaxTrackedObjectStackDepth = 10; | ||
| } // namespace | ||
|
|
||
| DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api, | ||
| Event::TimeSystem& time_system, | ||
|
|
@@ -49,6 +57,7 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api, | |
| post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })), | ||
| current_to_delete_(&to_delete_1_) { | ||
| ASSERT(!name_.empty()); | ||
| tracked_object_stack_.reserve(ExpectedMaxTrackedObjectStackDepth); | ||
| FatalErrorHandler::registerFatalErrorHandler(*this); | ||
| updateApproximateMonotonicTimeInternal(); | ||
| base_scheduler_.registerOnPrepareCallback( | ||
|
|
@@ -287,6 +296,16 @@ void DispatcherImpl::runPostCallbacks() { | |
| } | ||
| } | ||
|
|
||
| void DispatcherImpl::onFatalError(std::ostream& os) const { | ||
| // Dump the state of the tracked objects in the dispatcher if thread safe. This generally | ||
| // results in dumping the active state only for the thread which caused the fatal error. | ||
| if (isThreadSafe()) { | ||
| for (auto iter = tracked_object_stack_.rbegin(); iter != tracked_object_stack_.rend(); ++iter) { | ||
| (*iter)->dumpState(os); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Test that covers dumping of multiple tracked objects, including relative ordering of the dumps.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| } | ||
| } | ||
| } | ||
|
|
||
| void DispatcherImpl::runFatalActionsOnTrackedObject( | ||
| const FatalAction::FatalActionPtrList& actions) const { | ||
| // Only run if this is the dispatcher of the current thread and | ||
|
|
@@ -296,7 +315,7 @@ void DispatcherImpl::runFatalActionsOnTrackedObject( | |
| } | ||
|
|
||
| for (const auto& action : actions) { | ||
| action->run(current_object_); | ||
| action->run(tracked_object_stack_); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -306,5 +325,26 @@ void DispatcherImpl::touchWatchdog() { | |
| } | ||
| } | ||
|
|
||
| void DispatcherImpl::pushTrackedObject(const ScopeTrackedObject* object) { | ||
|
KBaichoo marked this conversation as resolved.
|
||
| ASSERT(isThreadSafe()); | ||
| ASSERT(object != nullptr); | ||
| tracked_object_stack_.push_back(object); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| } | ||
|
|
||
| void DispatcherImpl::popTrackedObject(const ScopeTrackedObject* expected_object) { | ||
|
antoniovicente marked this conversation as resolved.
|
||
| ASSERT(isThreadSafe()); | ||
| ASSERT(expected_object != nullptr); | ||
| if (tracked_object_stack_.empty()) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should probably be
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before on release it'd just end up returning and suppressing, but this is a good point. Done. |
||
| ASSERT(!expected_object, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. Done. |
||
| "Tracked object stack is empty yet we expected a non-null tracked object on the top."); | ||
| return; | ||
| } | ||
|
|
||
| const ScopeTrackedObject* top = tracked_object_stack_.back(); | ||
| tracked_object_stack_.pop_back(); | ||
| ASSERT(top == expected_object, | ||
| "Popped the top of the tracked object stack, but it wasn't the expected object!"); | ||
| } | ||
|
|
||
| } // namespace Event | ||
| } // namespace Envoy | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -74,25 +74,13 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>, | |
| void post(std::function<void()> callback) override; | ||
| void run(RunType type) override; | ||
| Buffer::WatermarkFactory& getWatermarkFactory() override { return *buffer_factory_; } | ||
| const ScopeTrackedObject* setTrackedObject(const ScopeTrackedObject* object) override { | ||
| const ScopeTrackedObject* return_object = current_object_; | ||
| current_object_ = object; | ||
| return return_object; | ||
| } | ||
| void pushTrackedObject(const ScopeTrackedObject* object) override; | ||
| void popTrackedObject(const ScopeTrackedObject* expected_object) override; | ||
| MonotonicTime approximateMonotonicTime() const override; | ||
| void updateApproximateMonotonicTime() override; | ||
|
|
||
| // FatalErrorInterface | ||
| void onFatalError(std::ostream& os) const override { | ||
| // Dump the state of the tracked object if it is in the current thread. This generally results | ||
| // in dumping the active state only for the thread which caused the fatal error. | ||
| if (isThreadSafe()) { | ||
| if (current_object_) { | ||
| current_object_->dumpState(os); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| void onFatalError(std::ostream& os) const override; | ||
| void | ||
| runFatalActionsOnTrackedObject(const FatalAction::FatalActionPtrList& actions) const override; | ||
|
|
||
|
|
@@ -150,7 +138,7 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>, | |
| std::vector<DeferredDeletablePtr>* current_to_delete_; | ||
| Thread::MutexBasicLockable post_lock_; | ||
| std::list<std::function<void()>> post_callbacks_ ABSL_GUARDED_BY(post_lock_); | ||
| const ScopeTrackedObject* current_object_{}; | ||
| std::vector<const ScopeTrackedObject*> tracked_object_stack_; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. absl::InlineVector?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| bool deferred_deleting_{}; | ||
| MonotonicTime approximate_monotonic_time_; | ||
| WatchdogRegistrationPtr watchdog_registration_; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| #include <iostream> | ||
|
|
||
| #include "common/api/api_impl.h" | ||
| #include "common/common/scope_tracker.h" | ||
| #include "common/event/dispatcher_impl.h" | ||
|
|
||
| #include "test/mocks/common.h" | ||
| #include "test/test_common/utility.h" | ||
|
|
||
| #include "gmock/gmock.h" | ||
| #include "gtest/gtest.h" | ||
|
|
||
| namespace Envoy { | ||
| namespace { | ||
|
|
||
| using testing::_; | ||
|
|
||
| TEST(ScopeTrackerScopeStateTest, ShouldManageTrackedObjectOnDispatcherStack) { | ||
| Api::ApiPtr api(Api::createApiForTest()); | ||
| Event::DispatcherPtr dispatcher(api->allocateDispatcher("test_thread")); | ||
| MockScopedTrackedObject tracked_object; | ||
| { | ||
| ScopeTrackerScopeState scope(&tracked_object, *dispatcher); | ||
| // Check that the tracked_object is on the tracked object stack | ||
| dispatcher->popTrackedObject(&tracked_object); | ||
|
|
||
| // Restore it to the top, it should be removed in the dtor of scope. | ||
| dispatcher->pushTrackedObject(&tracked_object); | ||
| } | ||
|
|
||
| // Check nothing is tracked now. | ||
| EXPECT_CALL(tracked_object, dumpState(_, _)).Times(0); | ||
| static_cast<Event::DispatcherImpl*>(dispatcher.get())->onFatalError(std::cerr); | ||
| } | ||
|
|
||
| } // namespace | ||
| } // namespace Envoy |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,13 @@ | ||
| #include <functional> | ||
|
|
||
| #include "envoy/common/scope_tracker.h" | ||
| #include "envoy/thread/thread.h" | ||
|
|
||
| #include "common/api/api_impl.h" | ||
| #include "common/api/os_sys_calls_impl.h" | ||
| #include "common/common/lock_guard.h" | ||
| #include "common/common/scope_tracker.h" | ||
| #include "common/common/utility.h" | ||
| #include "common/event/deferred_task.h" | ||
| #include "common/event/dispatcher_impl.h" | ||
| #include "common/event/timer_impl.h" | ||
|
|
@@ -533,9 +536,71 @@ TEST_F(DispatcherImplTest, IsThreadSafe) { | |
| EXPECT_FALSE(dispatcher_->isThreadSafe()); | ||
| } | ||
|
|
||
| TEST_F(DispatcherImplTest, ShouldDumpNothingIfNoTrackedObjects) { | ||
| std::array<char, 1024> buffer; | ||
| OutputBufferStream ostream{buffer.data(), buffer.size()}; | ||
|
|
||
| // Call on FatalError to trigger dumps of tracked objects. | ||
| dispatcher_->post([this, &ostream]() { | ||
| Thread::LockGuard lock(mu_); | ||
| static_cast<DispatcherImpl*>(dispatcher_.get())->onFatalError(ostream); | ||
| work_finished_ = true; | ||
| cv_.notifyOne(); | ||
| }); | ||
|
|
||
| Thread::LockGuard lock(mu_); | ||
| while (!work_finished_) { | ||
| cv_.wait(mu_); | ||
| } | ||
|
|
||
| // Check ostream still empty. | ||
| EXPECT_EQ(ostream.contents(), ""); | ||
| } | ||
|
|
||
| class MessageTrackedObject : public ScopeTrackedObject { | ||
| public: | ||
| MessageTrackedObject(absl::string_view sv) : sv_(sv) {} | ||
| void dumpState(std::ostream& os, int /*indent_level*/) const override { os << sv_; } | ||
|
|
||
| private: | ||
| absl::string_view sv_; | ||
| }; | ||
|
|
||
| TEST_F(DispatcherImplTest, ShouldDumpTrackedObjectsInFILO) { | ||
| std::array<char, 1024> buffer; | ||
| OutputBufferStream ostream{buffer.data(), buffer.size()}; | ||
|
|
||
| // Call on FatalError to trigger dumps of tracked objects. | ||
| dispatcher_->post([this, &ostream]() { | ||
| Thread::LockGuard lock(mu_); | ||
|
|
||
| // Add several tracked objects to the dispatcher | ||
| MessageTrackedObject first{"first"}; | ||
| ScopeTrackerScopeState first_state{&first, *dispatcher_}; | ||
| MessageTrackedObject second{"second"}; | ||
| ScopeTrackerScopeState second_state{&second, *dispatcher_}; | ||
| MessageTrackedObject third{"third"}; | ||
| ScopeTrackerScopeState third_state{&third, *dispatcher_}; | ||
|
|
||
| static_cast<DispatcherImpl*>(dispatcher_.get())->onFatalError(ostream); | ||
| work_finished_ = true; | ||
| cv_.notifyOne(); | ||
| }); | ||
|
|
||
| Thread::LockGuard lock(mu_); | ||
| while (!work_finished_) { | ||
| cv_.wait(mu_); | ||
| } | ||
|
|
||
| // Check the dump includes and registered objects in a FILO order. | ||
| EXPECT_EQ(ostream.contents(), "thirdsecondfirst"); | ||
| } | ||
|
|
||
| class TestFatalAction : public Server::Configuration::FatalAction { | ||
| public: | ||
| void run(const ScopeTrackedObject* /*current_object*/) override { ++times_ran_; } | ||
| void run(absl::Span<const ScopeTrackedObject* const> /*tracked_objects*/) override { | ||
| ++times_ran_; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add some tests that cover 0, 1, >1 entries in tracked_objects ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| } | ||
| bool isAsyncSignalSafe() const override { return true; } | ||
| int getNumTimesRan() { return times_ran_; } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.