Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ InstanceImpl::InstanceImpl(
: nullptr),
grpc_context_(store.symbolTable()), http_context_(store.symbolTable()),
router_context_(store.symbolTable()), process_context_(std::move(process_context)),
hooks_(hooks), server_contexts_(*this) {
hooks_(hooks), server_contexts_(*this), stats_flush_in_progress_(false) {
TRY_ASSERT_MAIN_THREAD {
if (!options.logPath().empty()) {
TRY_ASSERT_MAIN_THREAD {
Expand Down Expand Up @@ -206,6 +206,13 @@ void InstanceUtil::flushMetricsToSinks(const std::list<Stats::SinkPtr>& sinks, S
}

void InstanceImpl::flushStats() {
if (stats_flush_in_progress_) {
ENVOY_LOG(debug, "skipping stats flush as flush is already in progress");
server_stats_->dropped_stat_flushes_.inc();
return;
}

stats_flush_in_progress_ = true;
ENVOY_LOG(debug, "flushing stats");
// If Envoy is not fully initialized, workers will not be started and mergeHistograms
// completion callback is not called immediately. As a result of this server stats will
Expand Down Expand Up @@ -256,6 +263,8 @@ void InstanceImpl::flushStatsInternal() {
if (stat_flush_timer_ != nullptr) {
stat_flush_timer_->enableTimer(stats_config.flushInterval());
}

stats_flush_in_progress_ = false;
}

bool InstanceImpl::healthCheckFailed() { return !live_.load(); }
Expand Down
3 changes: 3 additions & 0 deletions source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct ServerCompilationSettingsStats {
COUNTER(envoy_bug_failures) \
COUNTER(dynamic_unknown_fields) \
COUNTER(static_unknown_fields) \
COUNTER(dropped_stat_flushes) \
GAUGE(concurrency, NeverImport) \
GAUGE(days_until_first_cert_expiring, Accumulate) \
GAUGE(seconds_until_first_ocsp_response_expiring, Accumulate) \
Expand Down Expand Up @@ -385,6 +386,8 @@ class InstanceImpl final : Logger::Loggable<Logger::Id::main>,

ServerFactoryContextImpl server_contexts_;

bool stats_flush_in_progress_ : 1;

template <class T>
class LifecycleCallbackHandle : public ServerLifecycleNotifier::Handle, RaiiListElement<T> {
public:
Expand Down
5 changes: 4 additions & 1 deletion test/integration/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,14 @@ class TestIsolatedStoreImpl : public StoreRoot {
void setHistogramSettings(HistogramSettingsConstPtr&&) override {}
void initializeThreading(Event::Dispatcher&, ThreadLocal::Instance&) override {}
void shutdownThreading() override {}
void mergeHistograms(PostMergeCb) override {}
void mergeHistograms(PostMergeCb cb) override { merge_cb_ = cb; }

void runMergeCallback() { merge_cb_(); }

private:
mutable Thread::MutexBasicLockable lock_;
IsolatedStoreImpl store_;
PostMergeCb merge_cb_;
};

} // namespace Stats
Expand Down
51 changes: 51 additions & 0 deletions test/server/server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,57 @@ TEST_P(ServerInstanceImplTest, FlushStatsOnAdmin) {
server_thread->join();
}

TEST_P(ServerInstanceImplTest, ConcurrentFlushes) {
CustomStatsSinkFactory factory;
Registry::InjectFactory<Server::Configuration::StatsSinkFactory> registered(factory);
options_.bootstrap_version_ = 3;

bool workers_started = false;
absl::Notification workers_started_fired;
// Run the server in a separate thread so we can test different lifecycle stages.
auto server_thread = Thread::threadFactoryForTest().createThread([&] {
auto hooks = CustomListenerHooks([&]() {
workers_started = true;
workers_started_fired.Notify();
});
initialize("test/server/test_data/server/stats_sink_manual_flush_bootstrap.yaml", false, hooks);
server_->run();
server_ = nullptr;
thread_local_ = nullptr;
});

workers_started_fired.WaitForNotification();
EXPECT_TRUE(workers_started);

// Flush three times in a row. Two of these should get dropped.
server_->dispatcher().post([&] {
server_->flushStats();
server_->flushStats();
server_->flushStats();
});

EXPECT_TRUE(
TestUtility::waitForCounterEq(stats_store_, "server.dropped_stat_flushes", 2, time_system_));

server_->dispatcher().post([&] { stats_store_.runMergeCallback(); });

EXPECT_TRUE(TestUtility::waitForCounterEq(stats_store_, "stats.flushed", 1, time_system_));

// Trigger another flush after the first one finished. This should go through an no drops should
// be recorded.
server_->dispatcher().post([&] { server_->flushStats(); });

server_->dispatcher().post([&] { stats_store_.runMergeCallback(); });

EXPECT_TRUE(TestUtility::waitForCounterEq(stats_store_, "stats.flushed", 2, time_system_));

EXPECT_TRUE(
TestUtility::waitForCounterEq(stats_store_, "server.dropped_stat_flushes", 2, time_system_));

server_->dispatcher().post([&] { server_->shutdown(); });
server_thread->join();
}

// Default validation mode
TEST_P(ServerInstanceImplTest, ValidationDefault) {
options_.service_cluster_name_ = "some_cluster_name";
Expand Down