Skip to content

Commit bd7a3c7

Browse files
authored
[Metrics SDK] Implement Forceflush for Periodic Metric Reader (open-telemetry#2064)
1 parent 380f0f2 commit bd7a3c7

File tree

3 files changed

+97
-10
lines changed

3 files changed

+97
-10
lines changed

sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h

+5-2
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,11 @@ class PeriodicExportingMetricReader : public MetricReader
6363
std::thread worker_thread_;
6464

6565
/* Synchronization primitives */
66-
std::condition_variable cv_;
67-
std::mutex cv_m_;
66+
std::atomic<bool> is_force_flush_pending_;
67+
std::atomic<bool> is_force_wakeup_background_worker_;
68+
std::atomic<bool> is_force_flush_notified_;
69+
std::condition_variable cv_, force_flush_cv_;
70+
std::mutex cv_m_, force_flush_m_;
6871
};
6972

7073
} // namespace metrics

sdk/src/metrics/export/periodic_exporting_metric_reader.cc

+91-8
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,15 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
6060
auto end = std::chrono::steady_clock::now();
6161
auto export_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
6262
auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms;
63-
cv_.wait_for(lk, remaining_wait_interval_ms);
63+
cv_.wait_for(lk, remaining_wait_interval_ms, [this]() {
64+
if (is_force_wakeup_background_worker_.load(std::memory_order_acquire))
65+
{
66+
is_force_wakeup_background_worker_.store(false, std::memory_order_release);
67+
return true;
68+
}
69+
return IsShutdown();
70+
});
6471
} while (IsShutdown() != true);
65-
// One last Collect and Export before shutdown
66-
auto status = CollectAndExportOnce();
67-
if (!status)
68-
{
69-
OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.")
70-
}
7172
}
7273

7374
bool PeriodicExportingMetricReader::CollectAndExportOnce()
@@ -86,6 +87,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
8687
return true;
8788
});
8889
});
90+
8991
std::future_status status;
9092
do
9193
{
@@ -96,12 +98,93 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
9698
break;
9799
}
98100
} while (status != std::future_status::ready);
101+
bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel);
102+
if (notify_force_flush)
103+
{
104+
is_force_flush_notified_.store(true, std::memory_order_release);
105+
force_flush_cv_.notify_one();
106+
}
107+
99108
return true;
100109
}
101110

102111
bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
103112
{
104-
return exporter_->ForceFlush(timeout);
113+
std::unique_lock<std::mutex> lk_cv(force_flush_m_);
114+
is_force_flush_pending_.store(true, std::memory_order_release);
115+
auto break_condition = [this]() {
116+
if (IsShutdown())
117+
{
118+
return true;
119+
}
120+
121+
// Wake up the worker thread once.
122+
if (is_force_flush_pending_.load(std::memory_order_acquire))
123+
{
124+
is_force_wakeup_background_worker_.store(true, std::memory_order_release);
125+
cv_.notify_one();
126+
}
127+
return is_force_flush_notified_.load(std::memory_order_acquire);
128+
};
129+
130+
auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
131+
timeout, std::chrono::microseconds::zero());
132+
std::chrono::steady_clock::duration timeout_steady =
133+
std::chrono::duration_cast<std::chrono::steady_clock::duration>(wait_timeout);
134+
if (timeout_steady <= std::chrono::steady_clock::duration::zero())
135+
{
136+
timeout_steady = std::chrono::steady_clock::duration::max();
137+
}
138+
139+
bool result = false;
140+
while (!result && timeout_steady > std::chrono::steady_clock::duration::zero())
141+
{
142+
// When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called
143+
// between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait
144+
// for ever
145+
std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
146+
result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition);
147+
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
148+
}
149+
150+
// If it will be already signaled, we must wait until notified.
151+
// We use a spin lock here
152+
if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel))
153+
{
154+
for (int retry_waiting_times = 0;
155+
false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times)
156+
{
157+
opentelemetry::common::SpinLockMutex::fast_yield();
158+
if ((retry_waiting_times & 127) == 127)
159+
{
160+
std::this_thread::yield();
161+
}
162+
}
163+
}
164+
is_force_flush_notified_.store(false, std::memory_order_release);
165+
166+
if (result)
167+
{
168+
// - If original `timeout` is `zero`, use that in exporter::forceflush
169+
// - Else if remaining `timeout_steady` more than zero, use that in exporter::forceflush
170+
// - Else don't invoke exporter::forceflush ( as remaining time is zero or less)
171+
if (timeout <= std::chrono::steady_clock::duration::zero())
172+
{
173+
result =
174+
exporter_->ForceFlush(std::chrono::duration_cast<std::chrono::microseconds>(timeout));
175+
}
176+
else if (timeout_steady > std::chrono::steady_clock::duration::zero())
177+
{
178+
result = exporter_->ForceFlush(
179+
std::chrono::duration_cast<std::chrono::microseconds>(timeout_steady));
180+
}
181+
else
182+
{
183+
// remaining timeout_steady is zero or less
184+
result = false;
185+
}
186+
}
187+
return result;
105188
}
106189

107190
bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept

sdk/test/metrics/periodic_exporting_metric_reader_test.cc

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ TEST(PeriodicExporingMetricReader, BasicTests)
7070
MockMetricProducer producer;
7171
reader.SetMetricProducer(&producer);
7272
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
73+
EXPECT_NO_THROW(reader.ForceFlush());
7374
reader.Shutdown();
7475
EXPECT_EQ(static_cast<MockPushMetricExporter *>(exporter_ptr)->GetDataCount(),
7576
static_cast<MockMetricProducer *>(&producer)->GetDataCount());

0 commit comments

Comments
 (0)