Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions include/envoy/server/overload_manager.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#pragma once

#include <unordered_map>

#include "envoy/common/pure.h"
#include "envoy/thread_local/thread_local.h"

namespace Envoy {
namespace Server {
Expand All @@ -21,6 +24,32 @@ enum class OverloadActionState {
*/
typedef std::function<void(OverloadActionState)> OverloadActionCb;

/**
* Thread-local copy of the state of each configured overload action.
*/
class ThreadLocalOverloadState : public ThreadLocal::ThreadLocalObject {
public:
const OverloadActionState& getState(const std::string& action) {
auto it = actions_.find(action);
if (it == actions_.end()) {
it = actions_.insert(std::make_pair(action, OverloadActionState::Inactive)).first;
}
return it->second;
}

void setState(const std::string& action, OverloadActionState state) {
auto it = actions_.find(action);
if (it == actions_.end()) {
actions_[action] = state;
} else {
it->second = state;
}
}

private:
std::unordered_map<std::string, OverloadActionState> actions_;
};

/**
* The OverloadManager protects the Envoy instance from being overwhelmed by client
* requests. It monitors a set of resources and notifies registered listeners if
Expand All @@ -46,6 +75,12 @@ class OverloadManager {
*/
virtual void registerForAction(const std::string& action, Event::Dispatcher& dispatcher,
OverloadActionCb callback) PURE;

/**
* Get the thread-local overload action states. Lookups in this object can be used as
* an alternative to registering a callback for overload action state changes.
*/
virtual ThreadLocalOverloadState& getThreadLocalOverloadState() PURE;
};

} // namespace Server
Expand Down
1 change: 1 addition & 0 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ envoy_cc_library(
deps = [
"//include/envoy/server:overload_manager_interface",
"//include/envoy/stats:stats_interface",
"//include/envoy/thread_local:thread_local_interface",
"//source/common/common:logger_lib",
"//source/common/config:utility_lib",
"//source/server:resource_monitor_config_lib",
Expand Down
14 changes: 13 additions & 1 deletion source/server/overload_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ bool OverloadAction::isActive() const { return !fired_triggers_.empty(); }

OverloadManagerImpl::OverloadManagerImpl(
Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
ThreadLocal::SlotAllocator& slot_allocator,
const envoy::config::overload::v2alpha::OverloadManager& config)
: started_(false), dispatcher_(dispatcher),
: started_(false), dispatcher_(dispatcher), tls_(slot_allocator.allocateSlot()),
refresh_interval_(
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, refresh_interval, 1000))) {
Configuration::ResourceMonitorFactoryContextImpl context(dispatcher);
Expand Down Expand Up @@ -125,6 +126,10 @@ OverloadManagerImpl::OverloadManagerImpl(
resource_to_actions_.insert(std::make_pair(resource, name));
}
}

tls_->set([](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
return std::make_shared<ThreadLocalOverloadState>();
});
}

void OverloadManagerImpl::start() {
Expand Down Expand Up @@ -157,6 +162,10 @@ void OverloadManagerImpl::registerForAction(const std::string& action,
std::forward_as_tuple(dispatcher, callback));
}

ThreadLocalOverloadState& OverloadManagerImpl::getThreadLocalOverloadState() {
return tls_->getTyped<ThreadLocalOverloadState>();
}

void OverloadManagerImpl::updateResourcePressure(const std::string& resource, double pressure) {
auto action_range = resource_to_actions_.equal_range(resource);
std::for_each(action_range.first, action_range.second,
Expand All @@ -170,6 +179,9 @@ void OverloadManagerImpl::updateResourcePressure(const std::string& resource, do
is_active ? OverloadActionState::Active : OverloadActionState::Inactive;
ENVOY_LOG(info, "Overload action {} has become {}", action,
is_active ? "active" : "inactive");
tls_->runOnAllThreads([this, action, state] {
tls_->getTyped<ThreadLocalOverloadState>().setState(action, state);
});
auto callback_range = action_to_callbacks_.equal_range(action);
std::for_each(callback_range.first, callback_range.second,
[&](ActionToCallbackMap::value_type& cb_entry) {
Expand Down
4 changes: 4 additions & 0 deletions source/server/overload_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "envoy/server/resource_monitor.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats.h"
#include "envoy/thread_local/thread_local.h"

#include "common/common/logger.h"

Expand Down Expand Up @@ -50,12 +51,14 @@ class OverloadAction {
class OverloadManagerImpl : Logger::Loggable<Logger::Id::main>, public OverloadManager {
public:
OverloadManagerImpl(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
ThreadLocal::SlotAllocator& slot_allocator,
const envoy::config::overload::v2alpha::OverloadManager& config);

// Server::OverloadManager
void start() override;
void registerForAction(const std::string& action, Event::Dispatcher& dispatcher,
OverloadActionCb callback) override;
ThreadLocalOverloadState& getThreadLocalOverloadState() override;

private:
class Resource : public ResourceMonitor::Callbacks {
Expand Down Expand Up @@ -90,6 +93,7 @@ class OverloadManagerImpl : Logger::Loggable<Logger::Id::main>, public OverloadM

bool started_;
Event::Dispatcher& dispatcher_;
ThreadLocal::SlotPtr tls_;
const std::chrono::milliseconds refresh_interval_;
Event::TimerPtr timer_;
std::unordered_map<std::string, Resource> resources_;
Expand Down
8 changes: 4 additions & 4 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,6 @@ void InstanceImpl::initialize(Options& options,

loadServerFlags(initial_config.flagsPath());

// Initialize the overload manager early so other modules can register for actions.
overload_manager_.reset(
new OverloadManagerImpl(dispatcher(), stats(), bootstrap_.overload_manager()));

// Workers get created first so they register for thread local updates.
listener_manager_.reset(new ListenerManagerImpl(
*this, listener_component_factory_, worker_factory_, ProdSystemTimeSource::instance_));
Expand All @@ -259,6 +255,10 @@ void InstanceImpl::initialize(Options& options,
// whether it runs on the main thread or on workers can still use TLS.
thread_local_.registerThread(*dispatcher_, true);

// Initialize the overload manager early so other modules can register for actions.
overload_manager_.reset(
new OverloadManagerImpl(dispatcher(), stats(), threadLocal(), bootstrap_.overload_manager()));

// We can now initialize stats for threading.
stats_store_.initializeThreading(*dispatcher_, thread_local_);

Expand Down
1 change: 1 addition & 0 deletions test/mocks/server/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ class MockOverloadManager : public OverloadManager {
MOCK_METHOD0(start, void());
MOCK_METHOD3(registerForAction, void(const std::string& action, Event::Dispatcher& dispatcher,
OverloadActionCb callback));
MOCK_METHOD0(getThreadLocalOverloadState, ThreadLocalOverloadState&());
};

class MockInstance : public Instance {
Expand Down
1 change: 1 addition & 0 deletions test/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ envoy_cc_test(
"//source/extensions/resource_monitors/common:factory_base_lib",
"//source/server:overload_manager_lib",
"//test/mocks/event:event_mocks",
"//test/mocks/thread_local:thread_local_mocks",
"//test/test_common:registry_lib",
"//test/test_common:utility_lib",
],
Expand Down
55 changes: 34 additions & 21 deletions test/server/overload_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "extensions/resource_monitors/common/factory_base.h"

#include "test/mocks/event/mocks.h"
#include "test/mocks/thread_local/mocks.h"
#include "test/test_common/registry.h"
#include "test/test_common/utility.h"

Expand Down Expand Up @@ -118,46 +119,56 @@ class OverloadManagerImplTest : public testing::Test {
)EOF";
}

std::unique_ptr<OverloadManagerImpl> createOverloadManager(const std::string& config) {
return std::make_unique<OverloadManagerImpl>(dispatcher_, stats_, thread_local_,
parseConfig(config));
}

FakeResourceMonitorFactory factory1_;
FakeResourceMonitorFactory factory2_;
Registry::InjectFactory<Configuration::ResourceMonitorFactory> register_factory1_;
Registry::InjectFactory<Configuration::ResourceMonitorFactory> register_factory2_;
NiceMock<Event::MockDispatcher> dispatcher_;
Stats::IsolatedStoreImpl stats_;
NiceMock<ThreadLocal::MockInstance> thread_local_;
Event::TimerCb timer_cb_;
};

TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) {
setDispatcherExpectation();

OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig()));
auto manager(createOverloadManager(getConfig()));
bool is_active = false;
int cb_count = 0;
manager.registerForAction("envoy.overload_actions.dummy_action", dispatcher_,
[&](OverloadActionState state) {
is_active = state == OverloadActionState::Active;
cb_count++;
});
manager.registerForAction("envoy.overload_actions.unknown_action", dispatcher_,
[&](OverloadActionState) { EXPECT_TRUE(false); });
manager.start();
manager->registerForAction("envoy.overload_actions.dummy_action", dispatcher_,
[&](OverloadActionState state) {
is_active = state == OverloadActionState::Active;
cb_count++;
});
manager->registerForAction("envoy.overload_actions.unknown_action", dispatcher_,
[&](OverloadActionState) { EXPECT_TRUE(false); });
manager->start();

Stats::Gauge& active_gauge = stats_.gauge("overload.envoy.overload_actions.dummy_action.active");
Stats::Gauge& pressure_gauge1 =
stats_.gauge("overload.envoy.resource_monitors.fake_resource1.pressure");
Stats::Gauge& pressure_gauge2 =
stats_.gauge("overload.envoy.resource_monitors.fake_resource2.pressure");
const OverloadActionState& action_state =
manager->getThreadLocalOverloadState().getState("envoy.overload_actions.dummy_action");

factory1_.monitor_->setPressure(0.5);
timer_cb_();
EXPECT_FALSE(is_active);
EXPECT_EQ(action_state, OverloadActionState::Inactive);
EXPECT_EQ(0, cb_count);
EXPECT_EQ(0, active_gauge.value());
EXPECT_EQ(50, pressure_gauge1.value());

factory1_.monitor_->setPressure(0.95);
timer_cb_();
EXPECT_TRUE(is_active);
EXPECT_EQ(action_state, OverloadActionState::Active);
EXPECT_EQ(1, cb_count);
EXPECT_EQ(1, active_gauge.value());
EXPECT_EQ(95, pressure_gauge1.value());
Expand All @@ -166,6 +177,7 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) {
factory1_.monitor_->setPressure(0.94);
timer_cb_();
EXPECT_TRUE(is_active);
EXPECT_EQ(action_state, OverloadActionState::Active);
EXPECT_EQ(1, cb_count);
EXPECT_EQ(94, pressure_gauge1.value());

Expand All @@ -174,22 +186,24 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) {
factory2_.monitor_->setPressure(0.9);
timer_cb_();
EXPECT_TRUE(is_active);
EXPECT_EQ(action_state, OverloadActionState::Active);
EXPECT_EQ(1, cb_count);
EXPECT_EQ(50, pressure_gauge1.value());
EXPECT_EQ(90, pressure_gauge2.value());

factory2_.monitor_->setPressure(0.4);
timer_cb_();
EXPECT_FALSE(is_active);
EXPECT_EQ(action_state, OverloadActionState::Inactive);
EXPECT_EQ(2, cb_count);
EXPECT_EQ(0, active_gauge.value());
EXPECT_EQ(40, pressure_gauge2.value());
}

TEST_F(OverloadManagerImplTest, FailedUpdates) {
setDispatcherExpectation();
OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig()));
manager.start();
auto manager(createOverloadManager(getConfig()));
manager->start();
Stats::Counter& failed_updates =
stats_.counter("overload.envoy.resource_monitors.fake_resource1.failed_updates");

Expand All @@ -207,8 +221,8 @@ TEST_F(OverloadManagerImplTest, SkippedUpdates) {
Event::PostCb post_cb;
ON_CALL(dispatcher_, post(_)).WillByDefault(Invoke([&](Event::PostCb cb) { post_cb = cb; }));

OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig()));
manager.start();
auto manager(createOverloadManager(getConfig()));
manager->start();
Stats::Counter& skipped_updates =
stats_.counter("overload.envoy.resource_monitors.fake_resource1.skipped_updates");

Expand All @@ -233,8 +247,8 @@ TEST_F(OverloadManagerImplTest, DuplicateResourceMonitor) {
}
)EOF";

EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)),
EnvoyException, "Duplicate resource monitor .*");
EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException,
"Duplicate resource monitor .*");
}

TEST_F(OverloadManagerImplTest, DuplicateOverloadAction) {
Expand All @@ -247,8 +261,8 @@ TEST_F(OverloadManagerImplTest, DuplicateOverloadAction) {
}
)EOF";

EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)),
EnvoyException, "Duplicate overload action .*");
EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException,
"Duplicate overload action .*");
}

TEST_F(OverloadManagerImplTest, UnknownTrigger) {
Expand All @@ -264,8 +278,8 @@ TEST_F(OverloadManagerImplTest, UnknownTrigger) {
}
)EOF";

EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)),
EnvoyException, "Unknown trigger resource .*");
EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException,
"Unknown trigger resource .*");
}

TEST_F(OverloadManagerImplTest, DuplicateTrigger) {
Expand All @@ -290,8 +304,7 @@ TEST_F(OverloadManagerImplTest, DuplicateTrigger) {
}
)EOF";

EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)),
EnvoyException, "Duplicate trigger .*");
EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException, "Duplicate trigger .*");
}
} // namespace
} // namespace Server
Expand Down