Merged
2 changes: 2 additions & 0 deletions library/common/engine.cc
@@ -175,6 +175,8 @@ envoy_status_t Engine::terminate() {
main_thread_.join();
}

dispatcher_->terminate();
Contributor

Should this be before the join? I'm not that familiar with this, but isn't the dispatcher running on main_thread_? If so, by the time we call it here the dispatcher has already stopped running.

Contributor Author

I don't know. I can move it and see if the tests in #2129 still pass.

Contributor Author

Tests still pass with #2129, pushed in 27c9e25.

Contributor Author

Actually, this causes a data race, caught by the TSan tests: https://github.com/envoyproxy/envoy-mobile/runs/6026097993?check_suite_focus=true

WARNING: ThreadSanitizer: data race (pid=22)
  Read of size 8 at 0x7b5400021280 by main thread:
    #0 memcpy ??:? (main_interface_test+0x19060ae)
    #1 Envoy::Thread::ThreadId::isEmpty() const ??:? (main_interface_test+0x1b9d8ee)
    #2 Envoy::Event::DispatcherImpl::runFatalActionsOnTrackedObject(std::__cxx11::list<std::unique_ptr<Envoy::Server::Configuration::FatalAction, std::default_delete<Envoy::Server::Configuration::FatalAction> >, std::allocator<std::unique_ptr<Envoy::Server::Configuration::FatalAction, std::default_delete<Envoy::Server::Configuration::FatalAction> > > > const&) const ??:? (main_interface_test+0x3eac80c)

Reverting.


return ENVOY_SUCCESS;
}

16 changes: 16 additions & 0 deletions library/common/event/provisional_dispatcher.cc
@@ -12,6 +12,12 @@ void ProvisionalDispatcher::drain(Event::Dispatcher& event_dispatcher) {
// because of behavioral oddities in Event::Dispatcher: event_dispatcher_->isThreadSafe() will
// crash.
Thread::LockGuard lock(state_lock_);

// Don't perform any work on the dispatcher if marked as terminated.
if (terminated_) {
return;
}
Comment on lines +16 to +19
Contributor

Can this ever happen? Can we terminate before we've drained?

Contributor Author

I'm not sure, but it seems safest to check here: engine termination is exposed in the public API, so a user could invoke it at any time.

We've also already acquired the lock here, which is the expensive part, so the boolean check should add effectively no overhead.
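
For readers following the thread, here is a minimal sketch of the pattern under discussion: a boolean that is set and read only while the state lock is held, so the extra check costs nothing beyond the lock the method already takes. Every name here (`SketchDispatcher`, `SketchStatus`, `SKETCH_SUCCESS`, `SKETCH_FAILURE`) is a hypothetical stand-in using `std::mutex`, not the envoy-mobile types. Unlike the real `drain()`, this one simply runs the queued callbacks instead of handing them to an `Event::Dispatcher`.

```cpp
#include <cassert>
#include <functional>
#include <list>
#include <mutex>
#include <utility>

// Hypothetical stand-ins for illustration; these are not the envoy-mobile types.
enum SketchStatus { SKETCH_SUCCESS = 0, SKETCH_FAILURE = 1 };

class SketchDispatcher {
public:
  using Callback = std::function<void()>;

  // Queues a callback, or refuses it once terminate() has been called.
  SketchStatus post(Callback callback) {
    std::lock_guard<std::mutex> lock(state_lock_);
    if (terminated_) {
      return SKETCH_FAILURE;
    }
    queue_.push_back(std::move(callback));
    return SKETCH_SUCCESS;
  }

  // Runs everything queued so far; does nothing after terminate().
  void drain() {
    std::list<Callback> pending;
    {
      std::lock_guard<std::mutex> lock(state_lock_);
      if (terminated_) {
        return;
      }
      pending.swap(queue_);
    }
    // Callbacks run outside the lock so they may call post() themselves.
    for (auto& callback : pending) {
      callback();
    }
  }

  // Marks the dispatcher as terminated; later post() calls fail and drain() is a no-op.
  void terminate() {
    std::lock_guard<std::mutex> lock(state_lock_);
    terminated_ = true;
  }

private:
  std::mutex state_lock_;
  std::list<Callback> queue_; // guarded by state_lock_
  bool terminated_{false};    // guarded by state_lock_
};
```

In this sketch, work that was queued but not yet drained when `terminate()` is called is silently dropped, which is the "terminate before we've drained" case the reviewer asks about.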


RELEASE_ASSERT(!drained_, "ProvisionalDispatcher::drain must only occur once");
drained_ = true;
event_dispatcher_ = &event_dispatcher;
@@ -24,6 +30,11 @@ void ProvisionalDispatcher::drain(Event::Dispatcher& event_dispatcher) {
envoy_status_t ProvisionalDispatcher::post(Event::PostCb callback) {
Thread::LockGuard lock(state_lock_);

// Don't perform any work on the dispatcher if marked as terminated.
if (terminated_) {
return ENVOY_FAILURE;
}

if (drained_) {
event_dispatcher_->post(callback);
return ENVOY_SUCCESS;
@@ -68,5 +79,10 @@ bool ProvisionalDispatcher::trackedObjectStackIsEmpty() const {

TimeSource& ProvisionalDispatcher::timeSource() { return event_dispatcher_->timeSource(); }

void ProvisionalDispatcher::terminate() {
Thread::LockGuard lock(state_lock_);
terminated_ = true;
}

} // namespace Event
} // namespace Envoy
6 changes: 6 additions & 0 deletions library/common/event/provisional_dispatcher.h
@@ -58,6 +58,11 @@ class ProvisionalDispatcher : public ScopeTracker {
*/
virtual TimeSource& timeSource();

/**
* Marks the dispatcher as terminated, preventing any future work from being enqueued.
*/
virtual void terminate();

// Used for testing.
Thread::ThreadSynchronizer& synchronizer() { return synchronizer_; }

@@ -69,6 +74,7 @@ class ProvisionalDispatcher : public ScopeTracker {
std::list<Event::PostCb> init_queue_ GUARDED_BY(state_lock_);
Event::Dispatcher* event_dispatcher_{};
Thread::ThreadSynchronizer synchronizer_;
bool terminated_ GUARDED_BY(state_lock_){};
Contributor

It doesn't matter a ton, but if you made this atomic it could be checked outside of the lock.

Contributor Author

Considering that this only needs to be checked in places where we've already acquired the lock, it seems we should leverage that.
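
To make the trade-off in this thread concrete, here is a rough sketch of the atomic variant the reviewer suggests. `AtomicFlagQueue` is a hypothetical class written for illustration with `std::atomic<bool>` and `std::mutex`; it is not part of envoy-mobile. The flag can be read before taking the lock, but any path that goes on to touch the queue still needs the lock and a second check, since `terminate()` may run in between.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical class for illustration; not part of envoy-mobile.
class AtomicFlagQueue {
public:
  // Returns false if the queue has been terminated, true if the item was stored.
  bool post(int item) {
    // Fast path: reject without taking the mutex.
    if (terminated_.load()) {
      return false;
    }
    std::lock_guard<std::mutex> lock(state_lock_);
    // terminate() may have run between the check above and acquiring the lock,
    // so the flag is checked again before enqueueing.
    if (terminated_.load()) {
      return false;
    }
    items_.push_back(item);
    return true;
  }

  // Takes the lock so that no post() can enqueue once this call has returned.
  void terminate() {
    std::lock_guard<std::mutex> lock(state_lock_);
    terminated_.store(true);
  }

  std::size_t size() {
    std::lock_guard<std::mutex> lock(state_lock_);
    return items_.size();
  }

private:
  std::mutex state_lock_;
  std::vector<int> items_; // guarded by state_lock_
  std::atomic<bool> terminated_{false};
};
```

The only thing the atomic buys in this sketch is skipping the mutex for calls made after termination. When every check site already holds the lock, as the author notes, a plain guarded `bool` gives the same behavior with less machinery.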

};

using ProvisionalDispatcherPtr = std::unique_ptr<ProvisionalDispatcher>;
10 changes: 7 additions & 3 deletions test/common/engine_test.cc
@@ -53,7 +53,7 @@ struct TestEngineHandle {
run_engine(handle_, MINIMAL_TEST_CONFIG.c_str(), level.c_str());
}

-~TestEngineHandle() { terminate_engine(handle_); }
+void terminate() { terminate_engine(handle_); }
};

class EngineTest : public testing::Test {
@@ -85,10 +85,12 @@ TEST_F(EngineTest, EarlyExit) {
envoy_engine_t handle = engine_->handle_;
ASSERT_TRUE(test_context.on_engine_running.WaitForNotificationWithTimeout(absl::Seconds(3)));

-engine_.reset();
+engine_->terminate();
 ASSERT_TRUE(test_context.on_exit.WaitForNotificationWithTimeout(absl::Seconds(3)));

 start_stream(handle, 0, {}, false);
+
+engine_.reset();
}

TEST_F(EngineTest, AccessEngineAfterInitialization) {
@@ -117,11 +119,13 @@ TEST_F(EngineTest, AccessEngineAfterInitialization) {
// Validate that we actually invoked the function.
EXPECT_TRUE(getClusterManagerInvoked.WaitForNotificationWithTimeout(absl::Seconds(1)));

-engine_.reset();
+engine_->terminate();

 // Now that the engine has been shut down, we no longer expect scheduling to work.
 EXPECT_EQ(ENVOY_FAILURE, EngineHandle::runOnEngineDispatcher(
 handle, [](Envoy::Engine& engine) { engine.getClusterManager(); }));
+
+engine_.reset();
}

} // namespace Envoy