Skip to content

Commit

Permalink
[Metrics SDK] Implement Forceflush for Periodic Metric Reader (#2064)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Mar 27, 2023
1 parent 380f0f2 commit bd7a3c7
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ class PeriodicExportingMetricReader : public MetricReader
std::thread worker_thread_;

/* Synchronization primitives */
std::condition_variable cv_;
std::mutex cv_m_;
std::atomic<bool> is_force_flush_pending_;
std::atomic<bool> is_force_wakeup_background_worker_;
std::atomic<bool> is_force_flush_notified_;
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_m_;
};

} // namespace metrics
Expand Down
99 changes: 91 additions & 8 deletions sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,15 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
auto end = std::chrono::steady_clock::now();
auto export_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms;
cv_.wait_for(lk, remaining_wait_interval_ms);
cv_.wait_for(lk, remaining_wait_interval_ms, [this]() {
if (is_force_wakeup_background_worker_.load(std::memory_order_acquire))
{
is_force_wakeup_background_worker_.store(false, std::memory_order_release);
return true;
}
return IsShutdown();
});
} while (IsShutdown() != true);
// One last Collect and Export before shutdown
auto status = CollectAndExportOnce();
if (!status)
{
OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.")
}
}

bool PeriodicExportingMetricReader::CollectAndExportOnce()
Expand All @@ -86,6 +87,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
return true;
});
});

std::future_status status;
do
{
Expand All @@ -96,12 +98,93 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
break;
}
} while (status != std::future_status::ready);
bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel);
if (notify_force_flush)
{
is_force_flush_notified_.store(true, std::memory_order_release);
force_flush_cv_.notify_one();
}

return true;
}

bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
{
return exporter_->ForceFlush(timeout);
std::unique_lock<std::mutex> lk_cv(force_flush_m_);
is_force_flush_pending_.store(true, std::memory_order_release);
auto break_condition = [this]() {
if (IsShutdown())
{
return true;
}

// Wake up the worker thread once.
if (is_force_flush_pending_.load(std::memory_order_acquire))
{
is_force_wakeup_background_worker_.store(true, std::memory_order_release);
cv_.notify_one();
}
return is_force_flush_notified_.load(std::memory_order_acquire);
};

auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
timeout, std::chrono::microseconds::zero());
std::chrono::steady_clock::duration timeout_steady =
std::chrono::duration_cast<std::chrono::steady_clock::duration>(wait_timeout);
if (timeout_steady <= std::chrono::steady_clock::duration::zero())
{
timeout_steady = std::chrono::steady_clock::duration::max();
}

bool result = false;
while (!result && timeout_steady > std::chrono::steady_clock::duration::zero())
{
// When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called
// between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait
// for ever
std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition);
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}

// If it will be already signaled, we must wait until notified.
// We use a spin lock here
if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel))
{
for (int retry_waiting_times = 0;
false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times)
{
opentelemetry::common::SpinLockMutex::fast_yield();
if ((retry_waiting_times & 127) == 127)
{
std::this_thread::yield();
}
}
}
is_force_flush_notified_.store(false, std::memory_order_release);

if (result)
{
// - If original `timeout` is `zero`, use that in exporter::forceflush
// - Else if remaining `timeout_steady` more than zero, use that in exporter::forceflush
// - Else don't invoke exporter::forceflush ( as remaining time is zero or less)
if (timeout <= std::chrono::steady_clock::duration::zero())
{
result =
exporter_->ForceFlush(std::chrono::duration_cast<std::chrono::microseconds>(timeout));
}
else if (timeout_steady > std::chrono::steady_clock::duration::zero())
{
result = exporter_->ForceFlush(
std::chrono::duration_cast<std::chrono::microseconds>(timeout_steady));
}
else
{
// remaining timeout_steady is zero or less
result = false;
}
}
return result;
}

bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept
Expand Down
1 change: 1 addition & 0 deletions sdk/test/metrics/periodic_exporting_metric_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ TEST(PeriodicExporingMetricReader, BasicTests)
MockMetricProducer producer;
reader.SetMetricProducer(&producer);
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
EXPECT_NO_THROW(reader.ForceFlush());
reader.Shutdown();
EXPECT_EQ(static_cast<MockPushMetricExporter *>(exporter_ptr)->GetDataCount(),
static_cast<MockMetricProducer *>(&producer)->GetDataCount());
Expand Down

0 comments on commit bd7a3c7

Please sign in to comment.