From 605c3e8ea7307dbc8d2c54db2587476cc6a0cbb7 Mon Sep 17 00:00:00 2001 From: WenTao Ou Date: Tue, 28 May 2024 03:50:01 +0800 Subject: [PATCH] [SDK] Fix forceflush may wait for ever (#2584) Co-authored-by: Marc Alff Co-authored-by: Tom Tan --- .../sdk/logs/batch_log_record_processor.h | 15 ++-- .../export/periodic_exporting_metric_reader.h | 5 +- .../sdk/trace/batch_span_processor.h | 16 ++-- sdk/src/logs/batch_log_record_processor.cc | 74 +++++++++--------- .../periodic_exporting_metric_reader.cc | 66 +++++++--------- sdk/src/trace/batch_span_processor.cc | 75 ++++++++++--------- 6 files changed, 128 insertions(+), 123 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h index d6a44df142..01a39348e2 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h @@ -115,20 +115,25 @@ class BatchLogRecordProcessor : public LogRecordProcessor /* Important boolean flags to handle the workflow of the processor */ std::atomic is_force_wakeup_background_worker{false}; - std::atomic is_force_flush_pending{false}; - std::atomic is_force_flush_notified{false}; - std::atomic force_flush_timeout_us{0}; std::atomic is_shutdown{false}; + std::atomic force_flush_pending_sequence{0}; + std::atomic force_flush_notified_sequence{0}; + std::atomic force_flush_timeout_us{0}; + + // Do not use SynchronizationData() = default; here, some versions of GCC&Clang have BUGs + // and may not initialize the member correctly. See also + // https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be + inline SynchronizationData() {} }; /** * @brief Notify completion of shutdown and force flush. This may be called from the any thread at * any time * - * @param notify_force_flush Flag to indicate whether to notify force flush completion. + * @param notify_force_flush Sequence to indicate whether to notify force flush completion. * @param synchronization_data Synchronization data to be notified. */ - static void NotifyCompletion(bool notify_force_flush, + static void NotifyCompletion(uint64_t notify_force_flush, const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data); diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index b9221193fb..9fb0fbf5a0 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -50,9 +51,9 @@ class PeriodicExportingMetricReader : public MetricReader std::thread worker_thread_; /* Synchronization primitives */ - std::atomic is_force_flush_pending_{false}; std::atomic is_force_wakeup_background_worker_{false}; - std::atomic is_force_flush_notified_{false}; + std::atomic force_flush_pending_sequence_{0}; + std::atomic force_flush_notified_sequence_{0}; std::condition_variable cv_, force_flush_cv_; std::mutex cv_m_, force_flush_m_; }; diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h index afbf4486b0..e23b0a2df2 100644 --- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -115,20 +116,25 @@ class BatchSpanProcessor : public SpanProcessor /* Important boolean flags to handle the workflow of the processor */ std::atomic is_force_wakeup_background_worker{false}; - std::atomic is_force_flush_pending{false}; - std::atomic is_force_flush_notified{false}; - std::atomic force_flush_timeout_us{0}; std::atomic is_shutdown{false}; + std::atomic force_flush_pending_sequence{0}; + std::atomic force_flush_notified_sequence{0}; + std::atomic force_flush_timeout_us{0}; + + // Do not use SynchronizationData() = default; here, some versions of GCC&Clang have BUGs + // and may not initialize the member correctly. See also + // https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be + inline SynchronizationData() {} }; /** * @brief Notify completion of shutdown and force flush. This may be called from the any thread at * any time * - * @param notify_force_flush Flag to indicate whether to notify force flush completion. + * @param notify_force_flush Sequence to indicate whether to notify force flush completion. * @param synchronization_data Synchronization data to be notified. */ - static void NotifyCompletion(bool notify_force_flush, + static void NotifyCompletion(uint64_t notify_force_flush, const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data); diff --git a/sdk/src/logs/batch_log_record_processor.cc b/sdk/src/logs/batch_log_record_processor.cc index 44e623e4f7..c29e27bdd3 100644 --- a/sdk/src/logs/batch_log_record_processor.cc +++ b/sdk/src/logs/batch_log_record_processor.cc @@ -65,7 +65,7 @@ void BatchLogRecordProcessor::OnEmit(std::unique_ptr &&record) noexc { // signal the worker thread synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); } } @@ -79,21 +79,25 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex // Now wait for the worker thread to signal back from the Export method std::unique_lock lk_cv(synchronization_data_->force_flush_cv_m); - synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release); + std::uint64_t current_sequence = + synchronization_data_->force_flush_pending_sequence.fetch_add(1, std::memory_order_release) + + 1; synchronization_data_->force_flush_timeout_us.store(timeout.count(), std::memory_order_release); - auto break_condition = [this]() { + auto break_condition = [this, current_sequence]() { if (synchronization_data_->is_shutdown.load() == true) { return true; } // Wake up the worker thread once. - if (synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire)) + if (synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) > + synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire)) { - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); } - return synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire); + return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >= + current_sequence; }; // Fix timeout to meet requirement of wait_for @@ -110,35 +114,22 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex bool result = false; while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) { - // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called - // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait - // for ever + // When force_flush_notified_sequence.compare_exchange_strong(...) and + // force_flush_cv.notify_all() is called between force_flush_pending_sequence.load(...) and + // force_flush_cv.wait(). We must not wait for ever std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); - result = synchronization_data_->force_flush_cv.wait_for(lk_cv, scheduled_delay_millis_, - break_condition); - timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; - } + std::chrono::microseconds wait_timeout = scheduled_delay_millis_; - // If it's already signaled, we must wait util notified. - // We use a spin lock here - if (false == - synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel)) - { - for (int retry_waiting_times = 0; - false == synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire); - ++retry_waiting_times) + if (wait_timeout > timeout_steady) { - opentelemetry::common::SpinLockMutex::fast_yield(); - if ((retry_waiting_times & 127) == 127) - { - std::this_thread::yield(); - } + wait_timeout = std::chrono::duration_cast(timeout_steady); } + result = synchronization_data_->force_flush_cv.wait_for(lk_cv, wait_timeout, break_condition); + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } - synchronization_data_->is_force_flush_notified.store(false, std::memory_order_release); - - return result; + return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >= + current_sequence; } void BatchLogRecordProcessor::DoBackgroundWork() @@ -182,8 +173,8 @@ void BatchLogRecordProcessor::Export() { std::vector> records_arr; size_t num_records_to_export; - bool notify_force_flush = - synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel); + std::uint64_t notify_force_flush = + synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire); if (notify_force_flush) { num_records_to_export = buffer_.size(); @@ -217,7 +208,7 @@ void BatchLogRecordProcessor::Export() } void BatchLogRecordProcessor::NotifyCompletion( - bool notify_force_flush, + std::uint64_t notify_force_flush, const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data) { @@ -226,7 +217,8 @@ void BatchLogRecordProcessor::NotifyCompletion( return; } - if (notify_force_flush) + if (notify_force_flush > + synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire)) { if (exporter) { @@ -236,8 +228,15 @@ void BatchLogRecordProcessor::NotifyCompletion( std::chrono::microseconds::zero()); exporter->ForceFlush(timeout); } - synchronization_data->is_force_flush_notified.store(true, std::memory_order_release); - synchronization_data->force_flush_cv.notify_one(); + + std::uint64_t notified_sequence = + synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire); + while (notify_force_flush > notified_sequence) + { + synchronization_data->force_flush_notified_sequence.compare_exchange_strong( + notified_sequence, notify_force_flush, std::memory_order_acq_rel); + synchronization_data->force_flush_cv.notify_all(); + } } } @@ -246,7 +245,8 @@ void BatchLogRecordProcessor::DrainQueue() while (true) { if (buffer_.empty() && - false == synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire)) + synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) <= + synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire)) { break; } @@ -285,7 +285,7 @@ bool BatchLogRecordProcessor::Shutdown(std::chrono::microseconds timeout) noexce if (worker_thread_.joinable()) { synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); worker_thread_.join(); } diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index b79b8a32e6..1de1ea71c8 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -90,6 +90,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() }); std::future_status status; + std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire); do { status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); @@ -99,12 +100,13 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() break; } } while (status != std::future_status::ready); - bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel); - if (notify_force_flush) + + std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire); + while (notify_force_flush > notified_sequence) { - std::unique_lock lk(force_flush_m_); - is_force_flush_notified_.store(true, std::memory_order_release); - force_flush_cv_.notify_one(); + force_flush_notified_sequence_.compare_exchange_strong(notified_sequence, notify_force_flush, + std::memory_order_acq_rel); + force_flush_cv_.notify_all(); } return true; @@ -113,24 +115,27 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept { std::unique_lock lk_cv(force_flush_m_); - is_force_flush_pending_.store(true, std::memory_order_release); - auto break_condition = [this]() { + std::uint64_t current_sequence = + force_flush_pending_sequence_.fetch_add(1, std::memory_order_release) + 1; + auto break_condition = [this, current_sequence]() { if (IsShutdown()) { return true; } - // Wake up the worker thread once. - if (is_force_flush_pending_.load(std::memory_order_acquire)) + // Wake up the worker thread. + if (force_flush_pending_sequence_.load(std::memory_order_acquire) > + force_flush_notified_sequence_.load(std::memory_order_acquire)) { is_force_wakeup_background_worker_.store(true, std::memory_order_release); - cv_.notify_one(); + cv_.notify_all(); } - return is_force_flush_notified_.load(std::memory_order_acquire); + return force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence; }; - auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( - timeout, std::chrono::microseconds::zero()); + std::chrono::microseconds wait_timeout = + opentelemetry::common::DurationUtil::AdjustWaitForTimeout(timeout, + std::chrono::microseconds::zero()); std::chrono::steady_clock::duration timeout_steady = std::chrono::duration_cast(wait_timeout); if (timeout_steady <= std::chrono::steady_clock::duration::zero()) @@ -141,29 +146,19 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo bool result = false; while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) { - // When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called - // between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait - // for ever + // When force_flush_notified_sequence_.compare_exchange_strong(...) and + // force_flush_cv_.notify_all() is called between force_flush_pending_sequence_.load(...) and + // force_flush_cv_.wait(). We must not wait for ever std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); - result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition); - timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; - } - // If it will be already signaled, we must wait until notified. - // We use a spin lock here - if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel)) - { - for (int retry_waiting_times = 0; - false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times) + wait_timeout = export_interval_millis_; + if (wait_timeout > timeout_steady) { - opentelemetry::common::SpinLockMutex::fast_yield(); - if ((retry_waiting_times & 127) == 127) - { - std::this_thread::yield(); - } + wait_timeout = std::chrono::duration_cast(timeout_steady); } + result = force_flush_cv_.wait_for(lk_cv, wait_timeout, break_condition); + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } - is_force_flush_notified_.store(false, std::memory_order_release); if (result) { @@ -186,18 +181,15 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo result = false; } } - return result; + return result && + force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence; } bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept { if (worker_thread_.joinable()) { - { - // ensure that `cv_` is awaiting, and the update doesn't get lost - std::unique_lock lk(cv_m_); - cv_.notify_all(); - } + cv_.notify_all(); worker_thread_.join(); } return exporter_->Shutdown(timeout); diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index d5b96f2cc8..3827fad495 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -62,7 +62,7 @@ void BatchSpanProcessor::OnEnd(std::unique_ptr &&span) noexcept if (buffer_size >= max_queue_size_ / 2 || buffer_size >= max_export_batch_size_) { // signal the worker thread - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); } } @@ -76,23 +76,27 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept // Now wait for the worker thread to signal back from the Export method std::unique_lock lk_cv(synchronization_data_->force_flush_cv_m); - synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release); + std::uint64_t current_sequence = + synchronization_data_->force_flush_pending_sequence.fetch_add(1, std::memory_order_release) + + 1; synchronization_data_->force_flush_timeout_us.store(timeout.count(), std::memory_order_release); - auto break_condition = [this]() { + auto break_condition = [this, current_sequence]() { if (synchronization_data_->is_shutdown.load() == true) { return true; } - // Wake up the worker thread once. - if (synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire)) + // Wake up the worker thread. + if (synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) > + synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire)) { synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); } - return synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire); + return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >= + current_sequence; }; // Fix timeout to meet requirement of wait_for @@ -108,34 +112,23 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept bool result = false; while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) { - // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called - // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait - // for ever + // When force_flush_notified_sequence.compare_exchange_strong(...) and + // force_flush_cv.notify_all() is called between force_flush_pending_sequence.load(...) and + // force_flush_cv.wait(). We must not wait for ever std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); - result = synchronization_data_->force_flush_cv.wait_for(lk_cv, schedule_delay_millis_, - break_condition); - timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; - } - // If it will be already signaled, we must wait util notified. - // We use a spin lock here - if (false == - synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel)) - { - for (int retry_waiting_times = 0; - false == synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire); - ++retry_waiting_times) + std::chrono::microseconds wait_timeout = schedule_delay_millis_; + + if (wait_timeout > timeout_steady) { - opentelemetry::common::SpinLockMutex::fast_yield(); - if ((retry_waiting_times & 127) == 127) - { - std::this_thread::yield(); - } + wait_timeout = std::chrono::duration_cast(timeout_steady); } + result = synchronization_data_->force_flush_cv.wait_for(lk_cv, wait_timeout, break_condition); + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } - synchronization_data_->is_force_flush_notified.store(false, std::memory_order_release); - return result; + return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >= + current_sequence; } void BatchSpanProcessor::DoBackgroundWork() @@ -179,8 +172,8 @@ void BatchSpanProcessor::Export() { std::vector> spans_arr; size_t num_records_to_export; - bool notify_force_flush = - synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel); + std::uint64_t notify_force_flush = + synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire); if (notify_force_flush) { num_records_to_export = buffer_.size(); @@ -212,7 +205,7 @@ void BatchSpanProcessor::Export() } void BatchSpanProcessor::NotifyCompletion( - bool notify_force_flush, + std::uint64_t notify_force_flush, const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data) { @@ -221,7 +214,8 @@ void BatchSpanProcessor::NotifyCompletion( return; } - if (notify_force_flush) + if (notify_force_flush > + synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire)) { if (exporter) { @@ -232,8 +226,14 @@ void BatchSpanProcessor::NotifyCompletion( exporter->ForceFlush(timeout); } - synchronization_data->is_force_flush_notified.store(true, std::memory_order_release); - synchronization_data->force_flush_cv.notify_one(); + std::uint64_t notified_sequence = + synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire); + while (notify_force_flush > notified_sequence) + { + synchronization_data->force_flush_notified_sequence.compare_exchange_strong( + notified_sequence, notify_force_flush, std::memory_order_acq_rel); + synchronization_data->force_flush_cv.notify_all(); + } } } @@ -242,7 +242,8 @@ void BatchSpanProcessor::DrainQueue() while (true) { if (buffer_.empty() && - false == synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire)) + synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) <= + synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire)) { break; } @@ -280,7 +281,7 @@ bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept if (worker_thread_.joinable()) { synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); worker_thread_.join(); }