From 5263f63e957d69c0ba5ce33c730de617101ee61e Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Fri, 4 Dec 2020 19:48:38 -0500 Subject: [PATCH] event: Remove a source of non-determinism by always running deferred deletion before post callbacks Signed-off-by: Antonio Vicente --- source/common/event/dispatcher_impl.cc | 5 ++ test/common/event/dispatcher_impl_test.cc | 18 +++++ test/integration/fake_upstream.cc | 93 ++++++++++++++++++++--- 3 files changed, 104 insertions(+), 12 deletions(-) diff --git a/source/common/event/dispatcher_impl.cc b/source/common/event/dispatcher_impl.cc index 382ea30f1d665..e83cd8c91f104 100644 --- a/source/common/event/dispatcher_impl.cc +++ b/source/common/event/dispatcher_impl.cc @@ -258,6 +258,11 @@ void DispatcherImpl::updateApproximateMonotonicTimeInternal() { } void DispatcherImpl::runPostCallbacks() { + // Clear the deferred delete list before running post callbacks to reduce non-determinism in + // callback processing, and more easily detect if a scheduled post callback refers to one of the + // objects that is being deferred deleted. + clearDeferredDeleteList(); + while (true) { // It is important that this declaration is inside the body of the loop so that the callback is // destructed while post_lock_ is not held. If callback is declared outside the loop and reused diff --git a/test/common/event/dispatcher_impl_test.cc b/test/common/event/dispatcher_impl_test.cc index 468c79bb62b7a..f5d724cc2b7e4 100644 --- a/test/common/event/dispatcher_impl_test.cc +++ b/test/common/event/dispatcher_impl_test.cc @@ -277,6 +277,24 @@ TEST(DeferredTaskTest, DeferredTask) { dispatcher->clearDeferredDeleteList(); } +TEST(DeferredDeleteTest, DeferredDeleteAndPostOrdering) { + InSequence s; + + Api::ApiPtr api = Api::createApiForTest(); + DispatcherPtr dispatcher(api->allocateDispatcher("test_thread")); + ReadyWatcher post_watcher; + ReadyWatcher delete_watcher; + + // DeferredDelete should always run before post callbacks. + EXPECT_CALL(delete_watcher, ready()); + EXPECT_CALL(post_watcher, ready()); + + dispatcher->post([&]() { post_watcher.ready(); }); + dispatcher->deferredDelete( + std::make_unique([&]() -> void { delete_watcher.ready(); })); + dispatcher->run(Dispatcher::RunType::NonBlock); +} + class DispatcherImplTest : public testing::Test { protected: DispatcherImplTest() diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 7e2fae5dc8f38..9fee25ff2f6cd 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -69,8 +69,16 @@ void FakeStream::postToConnectionThread(std::function cb) { void FakeStream::encode100ContinueHeaders(const Http::ResponseHeaderMap& headers) { std::shared_ptr headers_copy( Http::createHeaderMap(headers)); - parent_.connection().dispatcher().post( - [this, headers_copy]() -> void { encoder_.encode100ContinueHeaders(*headers_copy); }); + parent_.connection().dispatcher().post([this, headers_copy]() -> void { + { + absl::MutexLock lock(&lock_); + if (saw_reset_) { + // Encoded already deleted. + return; + } + } + encoder_.encode100ContinueHeaders(*headers_copy); + }); } void FakeStream::encodeHeaders(const Http::HeaderMap& headers, bool end_stream) { @@ -82,12 +90,26 @@ void FakeStream::encodeHeaders(const Http::HeaderMap& headers, bool end_stream) } parent_.connection().dispatcher().post([this, headers_copy, end_stream]() -> void { + { + absl::MutexLock lock(&lock_); + if (saw_reset_) { + // Encoded already deleted. + return; + } + } encoder_.encodeHeaders(*headers_copy, end_stream); }); } void FakeStream::encodeData(absl::string_view data, bool end_stream) { parent_.connection().dispatcher().post([this, data, end_stream]() -> void { + { + absl::MutexLock lock(&lock_); + if (saw_reset_) { + // Encoded already deleted. + return; + } + } Buffer::OwnedImpl fake_data(data.data(), data.size()); encoder_.encodeData(fake_data, end_stream); }); @@ -95,6 +117,13 @@ void FakeStream::encodeData(absl::string_view data, bool end_stream) { void FakeStream::encodeData(uint64_t size, bool end_stream) { parent_.connection().dispatcher().post([this, size, end_stream]() -> void { + { + absl::MutexLock lock(&lock_); + if (saw_reset_) { + // Encoded already deleted. + return; + } + } Buffer::OwnedImpl data(std::string(size, 'a')); encoder_.encodeData(data, end_stream); }); @@ -102,30 +131,70 @@ void FakeStream::encodeData(uint64_t size, bool end_stream) { void FakeStream::encodeData(Buffer::Instance& data, bool end_stream) { std::shared_ptr data_copy = std::make_shared(data); - parent_.connection().dispatcher().post( - [this, data_copy, end_stream]() -> void { encoder_.encodeData(*data_copy, end_stream); }); + parent_.connection().dispatcher().post([this, data_copy, end_stream]() -> void { + { + absl::MutexLock lock(&lock_); + if (saw_reset_) { + // Encoded already deleted. + return; + } + } + encoder_.encodeData(*data_copy, end_stream); + }); } void FakeStream::encodeTrailers(const Http::HeaderMap& trailers) { std::shared_ptr trailers_copy( Http::createHeaderMap(trailers)); - parent_.connection().dispatcher().post( - [this, trailers_copy]() -> void { encoder_.encodeTrailers(*trailers_copy); }); + parent_.connection().dispatcher().post([this, trailers_copy]() -> void { + { + absl::MutexLock lock(&lock_); + if (saw_reset_) { + // Encoded already deleted. + return; + } + } + encoder_.encodeTrailers(*trailers_copy); + }); } void FakeStream::encodeResetStream() { - parent_.connection().dispatcher().post( - [this]() -> void { encoder_.getStream().resetStream(Http::StreamResetReason::LocalReset); }); + parent_.connection().dispatcher().post([this]() -> void { + { + absl::MutexLock lock(&lock_); + if (saw_reset_) { + // Encoded already deleted. + return; + } + } + encoder_.getStream().resetStream(Http::StreamResetReason::LocalReset); + }); } void FakeStream::encodeMetadata(const Http::MetadataMapVector& metadata_map_vector) { - parent_.connection().dispatcher().post( - [this, &metadata_map_vector]() -> void { encoder_.encodeMetadata(metadata_map_vector); }); + parent_.connection().dispatcher().post([this, &metadata_map_vector]() -> void { + { + absl::MutexLock lock(&lock_); + if (saw_reset_) { + // Encoded already deleted. + return; + } + } + encoder_.encodeMetadata(metadata_map_vector); + }); } void FakeStream::readDisable(bool disable) { - parent_.connection().dispatcher().post( - [this, disable]() -> void { encoder_.getStream().readDisable(disable); }); + parent_.connection().dispatcher().post([this, disable]() -> void { + { + absl::MutexLock lock(&lock_); + if (saw_reset_) { + // Encoded already deleted. + return; + } + } + encoder_.getStream().readDisable(disable); + }); } void FakeStream::onResetStream(Http::StreamResetReason, absl::string_view) {