Skip to content

Commit

Permalink
[SDK] Fix forceflush may wait for ever (open-telemetry#2584)
Browse files Browse the repository at this point in the history
Co-authored-by: Marc Alff <[email protected]>
Co-authored-by: Tom Tan <[email protected]>
  • Loading branch information
3 people authored May 27, 2024
1 parent 78d488c commit 605c3e8
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 123 deletions.
15 changes: 10 additions & 5 deletions sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,20 +115,25 @@ class BatchLogRecordProcessor : public LogRecordProcessor

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_force_wakeup_background_worker{false};
std::atomic<bool> is_force_flush_pending{false};
std::atomic<bool> is_force_flush_notified{false};
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};
std::atomic<bool> is_shutdown{false};
std::atomic<uint64_t> force_flush_pending_sequence{0};
std::atomic<uint64_t> force_flush_notified_sequence{0};
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};

// Do not use SynchronizationData() = default; here, some versions of GCC&Clang have BUGs
// and may not initialize the member correctly. See also
// https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be
inline SynchronizationData() {}
};

/**
* @brief Notify completion of shutdown and force flush. This may be called from the any thread at
* any time
*
* @param notify_force_flush Flag to indicate whether to notify force flush completion.
* @param notify_force_flush Sequence to indicate whether to notify force flush completion.
* @param synchronization_data Synchronization data to be notified.
*/
static void NotifyCompletion(bool notify_force_flush,
static void NotifyCompletion(uint64_t notify_force_flush,
const std::unique_ptr<LogRecordExporter> &exporter,
const std::shared_ptr<SynchronizationData> &synchronization_data);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
Expand Down Expand Up @@ -50,9 +51,9 @@ class PeriodicExportingMetricReader : public MetricReader
std::thread worker_thread_;

/* Synchronization primitives */
std::atomic<bool> is_force_flush_pending_{false};
std::atomic<bool> is_force_wakeup_background_worker_{false};
std::atomic<bool> is_force_flush_notified_{false};
std::atomic<uint64_t> force_flush_pending_sequence_{0};
std::atomic<uint64_t> force_flush_notified_sequence_{0};
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_m_;
};
Expand Down
16 changes: 11 additions & 5 deletions sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
Expand Down Expand Up @@ -115,20 +116,25 @@ class BatchSpanProcessor : public SpanProcessor

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_force_wakeup_background_worker{false};
std::atomic<bool> is_force_flush_pending{false};
std::atomic<bool> is_force_flush_notified{false};
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};
std::atomic<bool> is_shutdown{false};
std::atomic<uint64_t> force_flush_pending_sequence{0};
std::atomic<uint64_t> force_flush_notified_sequence{0};
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};

// Do not use SynchronizationData() = default; here, some versions of GCC&Clang have BUGs
// and may not initialize the member correctly. See also
// https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be
inline SynchronizationData() {}
};

/**
* @brief Notify completion of shutdown and force flush. This may be called from the any thread at
* any time
*
* @param notify_force_flush Flag to indicate whether to notify force flush completion.
* @param notify_force_flush Sequence to indicate whether to notify force flush completion.
* @param synchronization_data Synchronization data to be notified.
*/
static void NotifyCompletion(bool notify_force_flush,
static void NotifyCompletion(uint64_t notify_force_flush,
const std::unique_ptr<SpanExporter> &exporter,
const std::shared_ptr<SynchronizationData> &synchronization_data);

Expand Down
74 changes: 37 additions & 37 deletions sdk/src/logs/batch_log_record_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void BatchLogRecordProcessor::OnEmit(std::unique_ptr<Recordable> &&record) noexc
{
// signal the worker thread
synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release);
synchronization_data_->cv.notify_one();
synchronization_data_->cv.notify_all();
}
}

Expand All @@ -79,21 +79,25 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
// Now wait for the worker thread to signal back from the Export method
std::unique_lock<std::mutex> lk_cv(synchronization_data_->force_flush_cv_m);

synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release);
std::uint64_t current_sequence =
synchronization_data_->force_flush_pending_sequence.fetch_add(1, std::memory_order_release) +
1;
synchronization_data_->force_flush_timeout_us.store(timeout.count(), std::memory_order_release);
auto break_condition = [this]() {
auto break_condition = [this, current_sequence]() {
if (synchronization_data_->is_shutdown.load() == true)
{
return true;
}

// Wake up the worker thread once.
if (synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire))
if (synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) >
synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire))
{
synchronization_data_->cv.notify_one();
synchronization_data_->cv.notify_all();
}

return synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire);
return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >=
current_sequence;
};

// Fix timeout to meet requirement of wait_for
Expand All @@ -110,35 +114,22 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
bool result = false;
while (!result && timeout_steady > std::chrono::steady_clock::duration::zero())
{
// When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called
// between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait
// for ever
// When force_flush_notified_sequence.compare_exchange_strong(...) and
// force_flush_cv.notify_all() is called between force_flush_pending_sequence.load(...) and
// force_flush_cv.wait(). We must not wait for ever
std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
result = synchronization_data_->force_flush_cv.wait_for(lk_cv, scheduled_delay_millis_,
break_condition);
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}
std::chrono::microseconds wait_timeout = scheduled_delay_millis_;

// If it's already signaled, we must wait util notified.
// We use a spin lock here
if (false ==
synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel))
{
for (int retry_waiting_times = 0;
false == synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire);
++retry_waiting_times)
if (wait_timeout > timeout_steady)
{
opentelemetry::common::SpinLockMutex::fast_yield();
if ((retry_waiting_times & 127) == 127)
{
std::this_thread::yield();
}
wait_timeout = std::chrono::duration_cast<std::chrono::microseconds>(timeout_steady);
}
result = synchronization_data_->force_flush_cv.wait_for(lk_cv, wait_timeout, break_condition);
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}

synchronization_data_->is_force_flush_notified.store(false, std::memory_order_release);

return result;
return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >=
current_sequence;
}

void BatchLogRecordProcessor::DoBackgroundWork()
Expand Down Expand Up @@ -182,8 +173,8 @@ void BatchLogRecordProcessor::Export()
{
std::vector<std::unique_ptr<Recordable>> records_arr;
size_t num_records_to_export;
bool notify_force_flush =
synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel);
std::uint64_t notify_force_flush =
synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire);
if (notify_force_flush)
{
num_records_to_export = buffer_.size();
Expand Down Expand Up @@ -217,7 +208,7 @@ void BatchLogRecordProcessor::Export()
}

void BatchLogRecordProcessor::NotifyCompletion(
bool notify_force_flush,
std::uint64_t notify_force_flush,
const std::unique_ptr<LogRecordExporter> &exporter,
const std::shared_ptr<SynchronizationData> &synchronization_data)
{
Expand All @@ -226,7 +217,8 @@ void BatchLogRecordProcessor::NotifyCompletion(
return;
}

if (notify_force_flush)
if (notify_force_flush >
synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire))
{
if (exporter)
{
Expand All @@ -236,8 +228,15 @@ void BatchLogRecordProcessor::NotifyCompletion(
std::chrono::microseconds::zero());
exporter->ForceFlush(timeout);
}
synchronization_data->is_force_flush_notified.store(true, std::memory_order_release);
synchronization_data->force_flush_cv.notify_one();

std::uint64_t notified_sequence =
synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire);
while (notify_force_flush > notified_sequence)
{
synchronization_data->force_flush_notified_sequence.compare_exchange_strong(
notified_sequence, notify_force_flush, std::memory_order_acq_rel);
synchronization_data->force_flush_cv.notify_all();
}
}
}

Expand All @@ -246,7 +245,8 @@ void BatchLogRecordProcessor::DrainQueue()
while (true)
{
if (buffer_.empty() &&
false == synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire))
synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) <=
synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire))
{
break;
}
Expand Down Expand Up @@ -285,7 +285,7 @@ bool BatchLogRecordProcessor::Shutdown(std::chrono::microseconds timeout) noexce
if (worker_thread_.joinable())
{
synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release);
synchronization_data_->cv.notify_one();
synchronization_data_->cv.notify_all();
worker_thread_.join();
}

Expand Down
66 changes: 29 additions & 37 deletions sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
});

std::future_status status;
std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire);
do
{
status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_));
Expand All @@ -99,12 +100,13 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
break;
}
} while (status != std::future_status::ready);
bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel);
if (notify_force_flush)

std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire);
while (notify_force_flush > notified_sequence)
{
std::unique_lock<std::mutex> lk(force_flush_m_);
is_force_flush_notified_.store(true, std::memory_order_release);
force_flush_cv_.notify_one();
force_flush_notified_sequence_.compare_exchange_strong(notified_sequence, notify_force_flush,
std::memory_order_acq_rel);
force_flush_cv_.notify_all();
}

return true;
Expand All @@ -113,24 +115,27 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
{
std::unique_lock<std::mutex> lk_cv(force_flush_m_);
is_force_flush_pending_.store(true, std::memory_order_release);
auto break_condition = [this]() {
std::uint64_t current_sequence =
force_flush_pending_sequence_.fetch_add(1, std::memory_order_release) + 1;
auto break_condition = [this, current_sequence]() {
if (IsShutdown())
{
return true;
}

// Wake up the worker thread once.
if (is_force_flush_pending_.load(std::memory_order_acquire))
// Wake up the worker thread.
if (force_flush_pending_sequence_.load(std::memory_order_acquire) >
force_flush_notified_sequence_.load(std::memory_order_acquire))
{
is_force_wakeup_background_worker_.store(true, std::memory_order_release);
cv_.notify_one();
cv_.notify_all();
}
return is_force_flush_notified_.load(std::memory_order_acquire);
return force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence;
};

auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
timeout, std::chrono::microseconds::zero());
std::chrono::microseconds wait_timeout =
opentelemetry::common::DurationUtil::AdjustWaitForTimeout(timeout,
std::chrono::microseconds::zero());
std::chrono::steady_clock::duration timeout_steady =
std::chrono::duration_cast<std::chrono::steady_clock::duration>(wait_timeout);
if (timeout_steady <= std::chrono::steady_clock::duration::zero())
Expand All @@ -141,29 +146,19 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo
bool result = false;
while (!result && timeout_steady > std::chrono::steady_clock::duration::zero())
{
// When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called
// between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait
// for ever
// When force_flush_notified_sequence_.compare_exchange_strong(...) and
// force_flush_cv_.notify_all() is called between force_flush_pending_sequence_.load(...) and
// force_flush_cv_.wait(). We must not wait for ever
std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition);
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}

// If it will be already signaled, we must wait until notified.
// We use a spin lock here
if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel))
{
for (int retry_waiting_times = 0;
false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times)
wait_timeout = export_interval_millis_;
if (wait_timeout > timeout_steady)
{
opentelemetry::common::SpinLockMutex::fast_yield();
if ((retry_waiting_times & 127) == 127)
{
std::this_thread::yield();
}
wait_timeout = std::chrono::duration_cast<std::chrono::microseconds>(timeout_steady);
}
result = force_flush_cv_.wait_for(lk_cv, wait_timeout, break_condition);
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}
is_force_flush_notified_.store(false, std::memory_order_release);

if (result)
{
Expand All @@ -186,18 +181,15 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo
result = false;
}
}
return result;
return result &&
force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence;
}

bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept
{
if (worker_thread_.joinable())
{
{
// ensure that `cv_` is awaiting, and the update doesn't get lost
std::unique_lock<std::mutex> lk(cv_m_);
cv_.notify_all();
}
cv_.notify_all();
worker_thread_.join();
}
return exporter_->Shutdown(timeout);
Expand Down
Loading

0 comments on commit 605c3e8

Please sign in to comment.