Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions include/envoy/common/time.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
#pragma once

#include "envoy/common/pure.h"

/**
* Less typing for common system time type.
*/
typedef std::chrono::time_point<std::chrono::system_clock> SystemTime;

/**
* Abstraction for getting the current system time. Useful for testing.
*/
class SystemTimeSource {
public:
virtual ~SystemTimeSource() {}

/**
* @return the current system time.
*/
virtual SystemTime currentSystemTime() PURE;
};
2 changes: 1 addition & 1 deletion source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ void ClusterManagerImpl::loadCluster(const Json::Object& cluster, Stats::Store&
}

new_cluster->setOutlierDetector(OutlierDetectorImplFactory::createForCluster(
*new_cluster, cluster, dns_resolver.dispatcher()));
*new_cluster, cluster, dns_resolver.dispatcher(), runtime, stats));
primary_clusters_.emplace(new_cluster->name(), new_cluster);
}

Expand Down
121 changes: 117 additions & 4 deletions source/common/upstream/outlier_detection_impl.cc
Original file line number Diff line number Diff line change
@@ -1,22 +1,50 @@
#include "outlier_detection_impl.h"

#include "envoy/event/dispatcher.h"

#include "common/common/assert.h"
#include "common/http/codes.h"

namespace Upstream {

OutlierDetectorPtr OutlierDetectorImplFactory::createForCluster(Cluster& cluster,
const Json::Object& cluster_config,
Event::Dispatcher& dispatcher) {
Event::Dispatcher& dispatcher,
Runtime::Loader& runtime,
Stats::Store& stats) {
// Right now we don't support any configuration but in order to make the config backwards
// compatible we just look for an empty object.
if (cluster_config.hasObject("outlier_detection")) {
return OutlierDetectorPtr{new OutlierDetectorImpl(cluster, dispatcher)};
return OutlierDetectorPtr{new ProdOutlierDetectorImpl(cluster, dispatcher, runtime, stats)};
} else {
return nullptr;
}
}

OutlierDetectorImpl::OutlierDetectorImpl(Cluster& cluster, Event::Dispatcher&) {
void OutlierDetectorHostSinkImpl::eject(SystemTime ejection_time) {
ASSERT(!host_.lock()->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK));
host_.lock()->healthFlagSet(Host::HealthFlag::FAILED_OUTLIER_CHECK);
num_ejections_++;
ejection_time_ = ejection_time;
}

void OutlierDetectorHostSinkImpl::putHttpResponseCode(uint64_t response_code) {
if (Http::CodeUtility::is5xx(response_code)) {
if (++consecutive_5xx_ ==
detector_.runtime().snapshot().getInteger("outlier_detection.consecutive_5xx", 5)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create jira on docs for outlier detection and runtime params?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

detector_.onConsecutive5xx(host_.lock());
}
} else {
consecutive_5xx_ = 0;
}
}

OutlierDetectorImpl::OutlierDetectorImpl(Cluster& cluster, Event::Dispatcher& dispatcher,
Runtime::Loader& runtime, Stats::Store& stats,
SystemTimeSource& time_source)
: dispatcher_(dispatcher), runtime_(runtime), time_source_(time_source),
stats_(generateStats(cluster.name(), stats)),
interval_timer_(dispatcher.createTimer([this]() -> void { onIntervalTimer(); })) {
for (HostPtr host : cluster.hosts()) {
addHostSink(host);
}
Expand All @@ -29,16 +57,101 @@ OutlierDetectorImpl::OutlierDetectorImpl(Cluster& cluster, Event::Dispatcher&) {

for (HostPtr host : hosts_removed) {
ASSERT(host_sinks_.count(host) == 1);
if (host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
ASSERT(stats_.ejections_active_.value() > 0);
stats_.ejections_active_.dec();
}

host_sinks_.erase(host);
}
});

armIntervalTimer();
}

void OutlierDetectorImpl::addHostSink(HostPtr host) {
ASSERT(host_sinks_.count(host) == 0);
OutlierDetectorHostSinkImpl* sink = new OutlierDetectorHostSinkImpl();
OutlierDetectorHostSinkImpl* sink = new OutlierDetectorHostSinkImpl(*this, host);
host_sinks_[host] = sink;
host->setOutlierDetector(OutlierDetectorHostSinkPtr{sink});
}

void OutlierDetectorImpl::armIntervalTimer() {
interval_timer_->enableTimer(std::chrono::milliseconds(
runtime_.snapshot().getInteger("outlier_detection.interval_ms", 10000)));
}

void OutlierDetectorImpl::checkHostForUneject(HostPtr host, OutlierDetectorHostSinkImpl* sink,
SystemTime now) {
if (!host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
return;
}

std::chrono::milliseconds base_eject_time = std::chrono::milliseconds(
runtime_.snapshot().getInteger("outlier_detection.base_ejection_time_ms", 30000));
ASSERT(sink->numEjections() > 0)
if ((base_eject_time * sink->numEjections()) <= (now - sink->ejectionTime())) {
stats_.ejections_active_.dec();
host->healthFlagClear(Host::HealthFlag::FAILED_OUTLIER_CHECK);
runCallbacks(host);
}
}

void OutlierDetectorImpl::ejectHost(HostPtr host) {
uint64_t max_ejection_percent =
std::min(100UL, runtime_.snapshot().getInteger("outlier_detection.max_ejection_percent", 10));
if ((stats_.ejections_active_.value() / host_sinks_.size()) < max_ejection_percent) {
stats_.ejections_total_.inc();
if (runtime_.snapshot().featureEnabled("outlier_detection.enforcing", 100)) {
stats_.ejections_active_.inc();
host_sinks_[host]->eject(time_source_.currentSystemTime());
runCallbacks(host);
}
} else {
stats_.ejections_overflow_.inc();
}
}

OutlierDetectionStats OutlierDetectorImpl::generateStats(const std::string& name,
Stats::Store& store) {
std::string prefix(fmt::format("cluster.{}.outlier_detection.", name));
return {ALL_OUTLIER_DETECTION_STATS(POOL_COUNTER_PREFIX(store, prefix),
POOL_GAUGE_PREFIX(store, prefix))};
}

void OutlierDetectorImpl::onConsecutive5xx(HostPtr host) {
// This event will come from all threads, so we synchronize with a post to the main thread.
dispatcher_.post([this, host]() -> void { onConsecutive5xxWorker(host); });
}

void OutlierDetectorImpl::onConsecutive5xxWorker(HostPtr host) {
// This comes in cross thread. There is a chance that the host has already been removed from
// the set. If so, just ignore it.
if (host_sinks_.count(host) == 0) {
return;
}

if (host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
return;
}

stats_.ejections_consecutive_5xx_.inc();
ejectHost(host);
}

void OutlierDetectorImpl::onIntervalTimer() {
SystemTime now = time_source_.currentSystemTime();
for (auto host : host_sinks_) {
checkHostForUneject(host.first, host.second, now);
}

armIntervalTimer();
}

void OutlierDetectorImpl::runCallbacks(HostPtr host) {
for (ChangeStateCb cb : callbacks_) {
cb(host);
}
}

} // Upstream
71 changes: 68 additions & 3 deletions source/common/upstream/outlier_detection_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#pragma once

#include "envoy/common/time.h"
#include "envoy/event/timer.h"
#include "envoy/runtime/runtime.h"
#include "envoy/upstream/outlier_detection.h"
#include "envoy/upstream/upstream.h"

Expand All @@ -23,17 +26,52 @@ class OutlierDetectorHostSinkNullImpl : public OutlierDetectorHostSink {
class OutlierDetectorImplFactory {
public:
static OutlierDetectorPtr createForCluster(Cluster& cluster, const Json::Object& cluster_config,
Event::Dispatcher& dispatcher);
Event::Dispatcher& dispatcher,
Runtime::Loader& runtime, Stats::Store& stats);
};

class OutlierDetectorImpl;

/**
* Implementation of OutlierDetectorHostSink for the generic detector.
*/
class OutlierDetectorHostSinkImpl : public OutlierDetectorHostSink {
public:
OutlierDetectorHostSinkImpl(OutlierDetectorImpl& detector, HostPtr host)
: detector_(detector), host_(host) {}

void eject(SystemTime ejection_time);
SystemTime ejectionTime() { return ejection_time_; }
uint32_t numEjections() { return num_ejections_; }

// Upstream::OutlierDetectorHostSink
void putHttpResponseCode(uint64_t) override {}
void putHttpResponseCode(uint64_t response_code) override;
void putResponseTime(std::chrono::milliseconds) override {}

private:
OutlierDetectorImpl& detector_;
std::weak_ptr<Host> host_;
std::atomic<uint32_t> consecutive_5xx_{0};
SystemTime ejection_time_;
uint32_t num_ejections_{};
};

/**
* All outlier detection stats. @see stats_macros.h
*/
// clang-format off
#define ALL_OUTLIER_DETECTION_STATS(COUNTER, GAUGE) \
COUNTER(ejections_total) \
GAUGE (ejections_active) \
COUNTER(ejections_overflow) \
COUNTER(ejections_consecutive_5xx)
// clang-format on

/**
* Struct definition for all outlier detection stats. @see stats_macros.h
*/
struct OutlierDetectionStats {
ALL_OUTLIER_DETECTION_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
};

/**
Expand All @@ -43,16 +81,43 @@ class OutlierDetectorHostSinkImpl : public OutlierDetectorHostSink {
*/
class OutlierDetectorImpl : public OutlierDetector {
public:
OutlierDetectorImpl(Cluster& cluster, Event::Dispatcher& dispatcher);
void onConsecutive5xx(HostPtr host);
Runtime::Loader& runtime() { return runtime_; }

// Upstream::OutlierDetector
void addChangedStateCb(ChangeStateCb cb) override { callbacks_.push_back(cb); }

protected:
OutlierDetectorImpl(Cluster& cluster, Event::Dispatcher& dispatcher, Runtime::Loader& runtime,
Stats::Store& stats, SystemTimeSource& time_source);

private:
void addHostSink(HostPtr host);
void armIntervalTimer();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: call that in a way to easy get that this is associated with host unejection

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I named it like this because eventually it will be used for histogram computation, not just unejection.

void checkHostForUneject(HostPtr host, OutlierDetectorHostSinkImpl* sink, SystemTime now);
void ejectHost(HostPtr host);
static OutlierDetectionStats generateStats(const std::string& name, Stats::Store& store);
void onConsecutive5xxWorker(HostPtr host);
void onIntervalTimer();
void runCallbacks(HostPtr host);

Event::Dispatcher& dispatcher_;
Runtime::Loader& runtime_;
SystemTimeSource& time_source_;
OutlierDetectionStats stats_;
Event::TimerPtr interval_timer_;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: uneject_timer_ may be

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I named it like this because eventually it will be used for histogram computation, not just unejection.

std::list<ChangeStateCb> callbacks_;
std::unordered_map<HostPtr, OutlierDetectorHostSinkImpl*> host_sinks_;
};

class ProdOutlierDetectorImpl : public OutlierDetectorImpl, public SystemTimeSource {
public:
ProdOutlierDetectorImpl(Cluster& cluster, Event::Dispatcher& dispatcher, Runtime::Loader& runtime,
Stats::Store& stats)
: OutlierDetectorImpl(cluster, dispatcher, runtime, stats, *this) {}

// SystemTimeSource
SystemTime currentSystemTime() override { return std::chrono::system_clock::now(); }
};

} // Upstream
Loading