-
Notifications
You must be signed in to change notification settings - Fork 438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SDK] Implement Forceflush for Periodic Metric Reader #2064
Changes from 12 commits
17a70e9
85fd20a
1e96f20
2e24554
df46084
57dac9c
fdaf848
9ff3d23
94315f3
3a86962
5f9c783
5973ec7
ea5ca54
450cd97
8a3eb55
1b1bce9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -60,14 +60,19 @@ void PeriodicExportingMetricReader::DoBackgroundWork() | |||||
auto end = std::chrono::steady_clock::now(); | ||||||
auto export_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start); | ||||||
auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms; | ||||||
cv_.wait_for(lk, remaining_wait_interval_ms); | ||||||
cv_.wait_for(lk, remaining_wait_interval_ms, [this]() { | ||||||
if (is_force_wakeup_background_worker_.load(std::memory_order_acquire)) | ||||||
{ | ||||||
is_force_wakeup_background_worker_.store(false, std::memory_order_release); | ||||||
return true; | ||||||
} | ||||||
if (IsShutdown()) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed. thanks |
||||||
{ | ||||||
return true; | ||||||
} | ||||||
return false; | ||||||
}); | ||||||
} while (IsShutdown() != true); | ||||||
// One last Collect and Export before shutdown | ||||||
auto status = CollectAndExportOnce(); | ||||||
if (!status) | ||||||
{ | ||||||
OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.") | ||||||
} | ||||||
} | ||||||
|
||||||
bool PeriodicExportingMetricReader::CollectAndExportOnce() | ||||||
|
@@ -86,6 +91,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() | |||||
return true; | ||||||
}); | ||||||
}); | ||||||
|
||||||
std::future_status status; | ||||||
do | ||||||
{ | ||||||
|
@@ -96,12 +102,103 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() | |||||
break; | ||||||
} | ||||||
} while (status != std::future_status::ready); | ||||||
bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel); | ||||||
if (notify_force_flush) | ||||||
{ | ||||||
is_force_flush_notified_.store(true, std::memory_order_release); | ||||||
force_flush_cv_.notify_one(); | ||||||
} | ||||||
|
||||||
return true; | ||||||
} | ||||||
|
||||||
bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept | ||||||
{ | ||||||
return exporter_->ForceFlush(timeout); | ||||||
std::unique_lock<std::mutex> lk_cv(force_flush_m_); | ||||||
is_force_flush_pending_.store(true, std::memory_order_release); | ||||||
auto break_condition = [this]() { | ||||||
if (IsShutdown()) | ||||||
{ | ||||||
return true; | ||||||
} | ||||||
|
||||||
// Wake up the worker thread once. | ||||||
if (is_force_flush_pending_.load(std::memory_order_acquire)) | ||||||
{ | ||||||
is_force_wakeup_background_worker_.store(true, std::memory_order_release); | ||||||
cv_.notify_one(); | ||||||
} | ||||||
return is_force_flush_notified_.load(std::memory_order_acquire); | ||||||
}; | ||||||
|
||||||
auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( | ||||||
timeout, std::chrono::microseconds::zero()); | ||||||
std::chrono::steady_clock::duration timeout_steady = | ||||||
std::chrono::duration_cast<std::chrono::steady_clock::duration>(wait_timeout); | ||||||
if (timeout_steady <= std::chrono::steady_clock::duration::zero()) | ||||||
{ | ||||||
timeout_steady = std::chrono::steady_clock::duration::max(); | ||||||
} | ||||||
|
||||||
bool result = false; | ||||||
while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) | ||||||
{ | ||||||
// When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called | ||||||
// between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait | ||||||
// for ever | ||||||
std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); | ||||||
result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition); | ||||||
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; | ||||||
} | ||||||
|
||||||
// If it will be already signaled, we must wait util notified. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed. thanks. |
||||||
// We use a spin lock here | ||||||
if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel)) | ||||||
{ | ||||||
for (int retry_waiting_times = 0; | ||||||
false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times) | ||||||
{ | ||||||
opentelemetry::common::SpinLockMutex::fast_yield(); | ||||||
if ((retry_waiting_times & 127) == 127) | ||||||
{ | ||||||
std::this_thread::yield(); | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
is_force_flush_notified_.store(false, std::memory_order_release); | ||||||
|
||||||
if (timeout == std::chrono::steady_clock::duration::zero()) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need remove this if branch?It does nothing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes removed thanks. |
||||||
{ | ||||||
} | ||||||
if (timeout_steady <= std::chrono::steady_clock::duration::zero()) | ||||||
{ | ||||||
// forceflush timeout, exporter force-flush won't be triggered. | ||||||
result = false; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need reset result to false here?I think result will keep false and break the loop above when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes the while loop for |
||||||
} | ||||||
|
||||||
if (result) | ||||||
{ | ||||||
// - If original `timeout` is `zero`, use that in exporter::forceflush | ||||||
// - Else if remaining `timeout_steady` more than zero, use that in exporter::forceflush | ||||||
// - Else don't invoke exporter::forceflush ( as remaining time is zero or less) | ||||||
if (timeout <= std::chrono::steady_clock::duration::zero()) | ||||||
{ | ||||||
result = | ||||||
exporter_->ForceFlush(std::chrono::duration_cast<std::chrono::microseconds>(timeout)); | ||||||
} | ||||||
else if (timeout_steady > std::chrono::steady_clock::duration::zero()) | ||||||
{ | ||||||
result = exporter_->ForceFlush( | ||||||
std::chrono::duration_cast<std::chrono::microseconds>(timeout_steady)); | ||||||
} | ||||||
else | ||||||
{ | ||||||
// remaining timeout_steady is zero or less | ||||||
result = false; | ||||||
} | ||||||
} | ||||||
return result; | ||||||
} | ||||||
|
||||||
bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can store fail then false should be returned?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know if it can ever fail. It's atomic assignment operation so should eventually be successful. Also, this method returns
void
, not sure how to get any failure status from this method.Let me know if I am missing something :)