Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect and Export metric data before PeriodicMetricReader shutdown. #1860

Merged
merged 10 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class PeriodicExportingMetricReader : public MetricReader
std::chrono::milliseconds export_timeout_millis_;

void DoBackgroundWork();
bool CollectAndExportOnce();

/* The background worker thread */
std::thread worker_thread_;
Expand Down
68 changes: 40 additions & 28 deletions sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,40 +51,52 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
std::unique_lock<std::mutex> lk(cv_m_);
do
{
if (IsShutdown())
auto start = std::chrono::steady_clock::now();
auto status = CollectAndExportOnce();
if (!status)
{
break;
OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.")
}
std::atomic<bool> cancel_export_for_timeout{false};
auto start = std::chrono::steady_clock::now();
auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] {
Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) {
if (cancel_export_for_timeout)
{
OTEL_INTERNAL_LOG_ERROR(
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
<< export_timeout_millis_.count() << " ms, and timed out");
return false;
}
this->exporter_->Export(metric_data);
return true;
});
});
std::future_status status;
do
{
status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_));
if (status == std::future_status::timeout)
{
cancel_export_for_timeout = true;
break;
}
} while (status != std::future_status::ready);
auto end = std::chrono::steady_clock::now();
auto export_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms;
cv_.wait_for(lk, remaining_wait_interval_ms);
} while (true);
} while (IsShutdown() != true);
// One last Collect and Export before shutdown
auto status = CollectAndExportOnce();
if (!status)
{
OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.")
}
}

bool PeriodicExportingMetricReader::CollectAndExportOnce()
{
std::atomic<bool> cancel_export_for_timeout{false};
auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] {
Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) {
if (cancel_export_for_timeout)
{
OTEL_INTERNAL_LOG_ERROR(
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
<< export_timeout_millis_.count() << " ms, and timed out");
return false;
}
this->exporter_->Export(metric_data);
return true;
});
});
std::future_status status;
do
{
status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_));
if (status == std::future_status::timeout)
{
cancel_export_for_timeout = true;
break;
}
} while (status != std::future_status::ready);
return true;
}

bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
Expand Down
4 changes: 3 additions & 1 deletion sdk/src/metrics/metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ bool MetricReader::Collect(
OTEL_INTERNAL_LOG_WARN(
"MetricReader::Collect Cannot invoke Collect(). No MetricProducer registered for "
"collection!")
return false;
}
if (IsShutdown())
{
OTEL_INTERNAL_LOG_WARN("MetricReader::Collect Cannot invoke Collect(). Shutdown in progress!");
// Continue with warning, and let pull and push MetricReader state machine handle this.
OTEL_INTERNAL_LOG_WARN("MetricReader::Collect invoked while Shutdown in progress!");
}

return metric_producer_->Collect(callback);
Expand Down