Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Implement Forceflush for Periodic Metric Reader #2064

Merged
merged 16 commits into from
Mar 27, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ class PeriodicExportingMetricReader : public MetricReader
std::thread worker_thread_;

/* Synchronization primitives */
std::condition_variable cv_;
std::mutex cv_m_;
std::atomic<bool> is_force_flush_pending_;
std::atomic<bool> is_force_wakeup_background_worker_;
std::atomic<bool> is_force_flush_notified_;
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_m_;
};

} // namespace metrics
Expand Down
103 changes: 95 additions & 8 deletions sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,19 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
auto end = std::chrono::steady_clock::now();
auto export_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms;
cv_.wait_for(lk, remaining_wait_interval_ms);
cv_.wait_for(lk, remaining_wait_interval_ms, [this]() {
if (is_force_wakeup_background_worker_.load(std::memory_order_acquire))
{
is_force_wakeup_background_worker_.store(false, std::memory_order_release);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can store fail then false should be returned?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if it can ever fail. It's atomic assignment operation so should eventually be successful. Also, this method returns void, not sure how to get any failure status from this method.
Let me know if I am missing something :)

return true;
}
if (IsShutdown())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: return IsShutdown();.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed. thanks

{
return true;
}
return false;
});
} while (IsShutdown() != true);
// One last Collect and Export before shutdown
auto status = CollectAndExportOnce();
if (!status)
{
OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.")
}
}

bool PeriodicExportingMetricReader::CollectAndExportOnce()
Expand All @@ -86,6 +91,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
return true;
});
});

std::future_status status;
do
{
Expand All @@ -96,12 +102,93 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
break;
}
} while (status != std::future_status::ready);
bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel);
if (notify_force_flush)
{
is_force_flush_notified_.store(true, std::memory_order_release);
force_flush_cv_.notify_one();
}

return true;
}

bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
{
return exporter_->ForceFlush(timeout);
std::unique_lock<std::mutex> lk_cv(force_flush_m_);
is_force_flush_pending_.store(true, std::memory_order_release);
auto break_condition = [this]() {
if (IsShutdown())
{
return true;
}

// Wake up the worker thread once.
if (is_force_flush_pending_.load(std::memory_order_acquire))
{
is_force_wakeup_background_worker_.store(true, std::memory_order_release);
cv_.notify_one();
}
return is_force_flush_notified_.load(std::memory_order_acquire);
};

auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
timeout, std::chrono::microseconds::zero());
std::chrono::steady_clock::duration timeout_steady =
std::chrono::duration_cast<std::chrono::steady_clock::duration>(wait_timeout);
if (timeout_steady <= std::chrono::steady_clock::duration::zero())
{
timeout_steady = std::chrono::steady_clock::duration::max();
}

bool result = false;
while (!result && timeout_steady > std::chrono::steady_clock::duration::zero())
{
// When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called
// between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait
// for ever
std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition);
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}

// If it will be already signaled, we must wait util notified.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// If it will be already signaled, we must wait util notified.
// If it will be already signaled, we must wait until notified.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed. thanks.

// We use a spin lock here
if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel))
{
for (int retry_waiting_times = 0;
false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times)
{
opentelemetry::common::SpinLockMutex::fast_yield();
if ((retry_waiting_times & 127) == 127)
{
std::this_thread::yield();
}
}
}
is_force_flush_notified_.store(false, std::memory_order_release);

if (result)
{
// - If original `timeout` is `zero`, use that in exporter::forceflush
// - Else if remaining `timeout_steady` more than zero, use that in exporter::forceflush
// - Else don't invoke exporter::forceflush ( as remaining time is zero or less)
if (timeout <= std::chrono::steady_clock::duration::zero())
{
result =
exporter_->ForceFlush(std::chrono::duration_cast<std::chrono::microseconds>(timeout));
}
else if (timeout_steady > std::chrono::steady_clock::duration::zero())
{
result = exporter_->ForceFlush(
std::chrono::duration_cast<std::chrono::microseconds>(timeout_steady));
}
else
{
// remaining timeout_steady is zero or less
result = false;
}
}
return result;
}

bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ TEST(PeriodicExporingMetricReader, BasicTests)
MockMetricProducer producer;
reader.SetMetricProducer(&producer);
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
EXPECT_NO_THROW(reader.ForceFlush());
reader.Shutdown();
EXPECT_EQ(static_cast<MockPushMetricExporter *>(exporter_ptr)->GetDataCount(),
static_cast<MockMetricProducer *>(&producer)->GetDataCount());
Expand Down