Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/envoy/network/drain_decision.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class DrainDecision {
* @return TRUE if a connection should be drained and closed. It is up to individual network
* filters to determine when this should be called for the least impact possible.
*/
virtual bool drainClose() PURE;
virtual bool drainClose() const PURE;
};

} // namespace Network
Expand Down
3 changes: 2 additions & 1 deletion include/envoy/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ envoy_cc_library(
name = "filter_config_interface",
hdrs = ["filter_config.h"],
deps = [
":drain_manager_interface",
"//include/envoy/access_log:access_log_interface",
"//include/envoy/http:filter_interface",
"//include/envoy/init:init_interface",
"//include/envoy/local_info:local_info_interface",
"//include/envoy/network:drain_decision_interface",
"//include/envoy/ratelimit:ratelimit_interface",
"//include/envoy/runtime:runtime_interface",
"//include/envoy/thread_local:thread_local_interface",
Expand All @@ -117,6 +117,7 @@ envoy_cc_library(
name = "listener_manager_interface",
hdrs = ["listener_manager.h"],
deps = [
":drain_manager_interface",
":filter_config_interface",
":guarddog_interface",
"//include/envoy/json:json_object_interface",
Expand Down
15 changes: 6 additions & 9 deletions include/envoy/server/drain_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,17 @@ namespace Envoy {
namespace Server {

/**
* Handles connection draining. An instance is generally shared across the entire server.
* Handles connection draining. This concept is used globally during hot restart / server draining
* as well as on individual listeners when they are being dynamically removed.
*/
class DrainManager : public Network::DrainDecision {
public:
/**
* @return TRUE if the manager is currently draining connections.
* Invoked to begin the drain procedure, making drain close operations more likely.
* @param completion supplies the completion that will be called when the drain sequence is
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this is optional (could be unassigned std::function) from the code below. Might be useful to call that out here.

* finished. The parameter is optional and can be an unassigned function.
*/
virtual bool draining() PURE;

/**
* Invoked in the secondary process to begin the drain procedure. (Making drain close operations
* more likely).
*/
virtual void startDrainSequence() PURE;
virtual void startDrainSequence(std::function<void()> completion) PURE;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we drain globally on each LDS update? This seems reasonable given the current semantics (each LDS refresh is an update of the complete set of listeners), but won't this also preclude the possibility of skipping the drain on listeners that aren't being modified (i.e. they have the same config hash)?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see, we create a new drain manager per listener as well as the singleton per-server. Makes sense (could be called out in some comment as well).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK will find a comment to add to.


/**
* Invoked in the newly launched primary process to begin the parent shutdown sequence. At the end
Expand Down
7 changes: 4 additions & 3 deletions include/envoy/server/filter_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
#include "envoy/http/filter.h"
#include "envoy/init/init.h"
#include "envoy/json/json_object.h"
#include "envoy/network/drain_decision.h"
#include "envoy/network/filter.h"
#include "envoy/ratelimit/ratelimit.h"
#include "envoy/runtime/runtime.h"
#include "envoy/server/drain_manager.h"
#include "envoy/tracing/http_tracer.h"
#include "envoy/upstream/cluster_manager.h"

Expand Down Expand Up @@ -47,9 +47,10 @@ class FactoryContext {
virtual Event::Dispatcher& dispatcher() PURE;

/**
* @return DrainManager& singleton for use by the entire server.
* @return const Network::DrainDecision& a drain decision that filters can use to determine if
* they should be doing graceful closes on connections when possible.
*/
virtual DrainManager& drainManager() PURE;
virtual const Network::DrainDecision& drainDecision() PURE;

/**
* @return whether external healthchecks are currently failed or not.
Expand Down
10 changes: 2 additions & 8 deletions include/envoy/server/instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,15 @@ class Instance {
*/
virtual Network::DnsResolverSharedPtr dnsResolver() PURE;

/**
* @return TRUE if the server is currently draining. No new connections will be received and
* filters should shed connections where possible.
*/
virtual bool draining() PURE;

/**
* Close the server's listening sockets and begin draining the listeners.
*/
virtual void drainListeners() PURE;

/**
* @return DrainManager& singleton for use by the entire server.
* @return const DrainManager& singleton for use by the entire server.
*/
virtual DrainManager& drainManager() PURE;
virtual const DrainManager& drainManager() PURE;

/**
* @return AccessLogManager for use by the entire server.
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/server/listener_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "envoy/json/json_object.h"
#include "envoy/network/filter.h"
#include "envoy/network/listen_socket.h"
#include "envoy/server/drain_manager.h"
#include "envoy/server/filter_config.h"
#include "envoy/server/guarddog.h"
#include "envoy/ssl/context.h"
Expand Down Expand Up @@ -36,6 +37,11 @@ class ListenerComponentFactory {
createFilterFactoryList(const std::vector<Json::ObjectSharedPtr>& filters,
Configuration::FactoryContext& context) PURE;

/**
* @return DrainManagerPtr a new drain manager.
*/
virtual DrainManagerPtr createDrainManager() PURE;

/**
* @return uint64_t a listener tag usable for connection handler tracking.
*/
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ConnectionManagerTracingStats ConnectionManagerImpl::generateTracingStats(const
}

ConnectionManagerImpl::ConnectionManagerImpl(ConnectionManagerConfig& config,
Network::DrainDecision& drain_close,
const Network::DrainDecision& drain_close,
Runtime::RandomGenerator& random_generator,
Tracing::HttpTracer& tracer, Runtime::Loader& runtime,
const LocalInfo::LocalInfo& local_info)
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
public ServerConnectionCallbacks,
public Network::ConnectionCallbacks {
public:
ConnectionManagerImpl(ConnectionManagerConfig& config, Network::DrainDecision& drain_close,
ConnectionManagerImpl(ConnectionManagerConfig& config, const Network::DrainDecision& drain_close,
Runtime::RandomGenerator& random_generator, Tracing::HttpTracer& tracer,
Runtime::Loader& runtime, const LocalInfo::LocalInfo& local_info);
~ConnectionManagerImpl();
Expand Down Expand Up @@ -524,7 +524,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
ServerConnectionPtr codec_;
std::list<ActiveStreamPtr> streams_;
Stats::TimespanPtr conn_length_;
Network::DrainDecision& drain_close_;
const Network::DrainDecision& drain_close_;
DrainState drain_state_{DrainState::NotDraining};
UserAgent user_agent_;
Event::TimerPtr idle_timer_;
Expand Down
1 change: 1 addition & 0 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ envoy_cc_library(
hdrs = ["listener_manager_impl.h"],
deps = [
":configuration_lib",
":drain_manager_lib",
":init_manager_lib",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
Expand Down
2 changes: 1 addition & 1 deletion source/server/config/network/http_connection_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ HttpConnectionManagerFilterConfigFactory::createFilterFactory(const Json::Object
new HttpConnectionManagerConfig(config, context));
return [http_config, &context](Network::FilterManager& filter_manager) mutable -> void {
filter_manager.addReadFilter(Network::ReadFilterSharedPtr{new Http::ConnectionManagerImpl(
*http_config, context.drainManager(), context.random(), context.httpTracer(),
*http_config, context.drainDecision(), context.random(), context.httpTracer(),
context.runtime(), context.localInfo())});
};
}
Expand Down
2 changes: 1 addition & 1 deletion source/server/config_validation/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ class ValidationInstance : Logger::Loggable<Logger::Id::main>,
Ssl::ContextManager& sslContextManager() override { return *ssl_context_manager_; }
Event::Dispatcher& dispatcher() override { return *dispatcher_; }
Network::DnsResolverSharedPtr dnsResolver() override { return dns_resolver_; }
bool draining() override { NOT_IMPLEMENTED; }
void drainListeners() override { NOT_IMPLEMENTED; }
DrainManager& drainManager() override { NOT_IMPLEMENTED; }
AccessLog::AccessLogManager& accessLogManager() override { return access_log_manager_; }
Expand Down Expand Up @@ -98,6 +97,7 @@ class ValidationInstance : Logger::Loggable<Logger::Id::main>,
// validation mock.
return nullptr;
}
DrainManagerPtr createDrainManager() override { return nullptr; }
uint64_t nextListenerTag() override { return 0; }

// Server::WorkerFactory
Expand Down
7 changes: 5 additions & 2 deletions source/server/drain_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Server {

DrainManagerImpl::DrainManagerImpl(Instance& server) : server_(server) {}

bool DrainManagerImpl::drainClose() {
bool DrainManagerImpl::drainClose() const {
// If we are actively HC failed, always drain close.
if (server_.healthCheckFailed()) {
return true;
Expand All @@ -37,10 +37,13 @@ void DrainManagerImpl::drainSequenceTick() {

if (drain_time_completed_ < server_.options().drainTime()) {
drain_tick_timer_->enableTimer(std::chrono::milliseconds(1000));
} else if (drain_sequence_completion_) {
drain_sequence_completion_();
}
}

void DrainManagerImpl::startDrainSequence() {
void DrainManagerImpl::startDrainSequence(std::function<void()> completion) {
drain_sequence_completion_ = completion;
ASSERT(!drain_tick_timer_);
drain_tick_timer_ = server_.dispatcher().createTimer([this]() -> void { drainSequenceTick(); });
drainSequenceTick();
Expand Down
7 changes: 4 additions & 3 deletions source/server/drain_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ class DrainManagerImpl : Logger::Loggable<Logger::Id::main>, public DrainManager
DrainManagerImpl(Instance& server);

// Server::DrainManager
bool draining() override { return drain_tick_timer_ != nullptr; }
bool drainClose() override;
void startDrainSequence() override;
bool drainClose() const override;
void startDrainSequence(std::function<void()> completion) override;
void startParentShutdownSequence() override;

private:
bool draining() const { return drain_tick_timer_ != nullptr; }
void drainSequenceTick();

Instance& server_;
Event::TimerPtr drain_tick_timer_;
std::chrono::seconds drain_time_completed_{};
Event::TimerPtr parent_shutdown_timer_;
std::function<void()> drain_sequence_completion_;
};

} // namespace Server
Expand Down
59 changes: 44 additions & 15 deletions source/server/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <memory>

#include "common/ssl/context_config_impl.h"

#include "server/configuration_impl.h" // TODO(mattklein123): Remove post 1.4.0
#include "server/drain_manager_impl.h"

namespace Envoy {
namespace Server {
Expand Down Expand Up @@ -87,6 +88,10 @@ ProdListenerComponentFactory::createListenSocket(Network::Address::InstanceConst
}
}

/**
 * Create a new drain manager scoped to a single listener (as opposed to the server-wide
 * singleton). Each dynamically managed listener gets its own instance so it can be drained
 * and removed independently.
 * @return DrainManagerPtr the newly created drain manager.
 */
DrainManagerPtr ProdListenerComponentFactory::createDrainManager() {
  // std::make_unique is exception-safe and avoids a raw `new`; the unique_ptr<DrainManagerImpl>
  // converts implicitly to DrainManagerPtr (unique_ptr to the DrainManager base).
  return std::make_unique<DrainManagerImpl>(server_);
}

ListenerImpl::ListenerImpl(const Json::Object& json, ListenerManagerImpl& parent,
const std::string& name, bool workers_started, uint64_t hash)
: Json::Validator(json, Json::Schema::LISTENER_SCHEMA), parent_(parent),
Expand All @@ -98,7 +103,8 @@ ListenerImpl::ListenerImpl(const Json::Object& json, ListenerManagerImpl& parent
per_connection_buffer_limit_bytes_(
json.getInteger("per_connection_buffer_limit_bytes", 1024 * 1024)),
listener_tag_(parent_.factory_.nextListenerTag()), name_(name),
workers_started_(workers_started), hash_(hash) {
workers_started_(workers_started), hash_(hash),
local_drain_manager_(parent.factory_.createDrainManager()) {

// ':' is a reserved char in statsd. Do the translation here to avoid costly inline translations
// later.
Expand Down Expand Up @@ -130,6 +136,13 @@ bool ListenerImpl::createFilterChain(Network::Connection& connection) {
return Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories_);
}

bool ListenerImpl::drainClose() const {
  // A connection on this listener should be gracefully closed when EITHER drain manager says so:
  // the per-listener drain manager (set when this listener is being dynamically removed) or the
  // server-wide drain manager (e.g., /healthcheck/fail or hot restart). Keeping the two decisions
  // separate lets individual listeners drain independently of a whole-server drain event.
  if (local_drain_manager_->drainClose()) {
    return true;
  }
  // Only consult the server-wide manager if the listener-local one declined, preserving the
  // short-circuit evaluation of the original `||` expression.
  return parent_.server_.drainManager().drainClose();
}

void ListenerImpl::infoLog(const std::string& message) {
ENVOY_LOG(info, "{}: name={}, hash={}, address={}", message, name_, hash_, address_->asString());
}
Expand Down Expand Up @@ -261,25 +274,39 @@ bool ListenerManagerImpl::addOrUpdateListener(const Json::Object& json) {
}

void ListenerManagerImpl::drainListener(ListenerImplPtr&& listener) {
// TODO(mattklein123): Actually implement timed draining in a follow up. Currently we just
// correctly synchronize removal across all workers.
// First add the listener to the draining list. This must be done under the lock since it
// can race with remove completions.
std::list<DrainingListener>::iterator draining_it;
{
std::lock_guard<std::mutex> guard(draining_listeners_lock_);
draining_it = draining_listeners_.emplace(draining_listeners_.begin(), std::move(listener),
workers_.size());
}

draining_it->listener_->infoLog("removing listener");
// Tell all workers to stop accepting new connections on this listener.
draining_it->listener_->infoLog("draining listener");
for (const auto& worker : workers_) {
worker->removeListener(*draining_it->listener_, [this, draining_it]() -> void {
std::lock_guard<std::mutex> guard(draining_listeners_lock_);
if (--draining_it->workers_pending_removal_ == 0) {
draining_it->listener_->infoLog("listener removal complete");
draining_listeners_.erase(draining_it);
}
});
worker->stopListener(*draining_it->listener_);
}

// The following sets up a two-level lambda. The first completes when the listener's drain manager
// has completed draining at whatever the server configured drain times are. Once the drain time
// has completed via the drain manager's timer, we tell the workers to remove the listener. The
// 2nd lambda acquires the lock and determines when we can remove the listener from the draining
// list. This makes sure that we don't destroy the listener while filters might still be using its
// context (stats, etc.).
draining_it->listener_->localDrainManager().startDrainSequence([this, draining_it]() -> void {
draining_it->listener_->infoLog("removing listener");
for (const auto& worker : workers_) {
worker->removeListener(*draining_it->listener_, [this, draining_it]() -> void {
std::lock_guard<std::mutex> guard(draining_listeners_lock_);
if (--draining_it->workers_pending_removal_ == 0) {
draining_it->listener_->infoLog("listener removal complete");
draining_listeners_.erase(draining_it);
}
});
}
});
}

ListenerManagerImpl::ListenerList::iterator
Expand All @@ -306,6 +333,12 @@ std::vector<std::reference_wrapper<Listener>> ListenerManagerImpl::listeners() {
}

void ListenerManagerImpl::onListenerWarmed(ListenerImpl& listener) {
// The warmed listener should be added first so that the worker will accept new connections
// when it stops listening on the old listener.
for (const auto& worker : workers_) {
worker->addListener(listener);
}

auto existing_active_listener = getListenerByName(active_listeners_, listener.name());
auto existing_warming_listener = getListenerByName(warming_listeners_, listener.name());
(*existing_warming_listener)->infoLog("warm complete. updating active listener");
Expand All @@ -317,10 +350,6 @@ void ListenerManagerImpl::onListenerWarmed(ListenerImpl& listener) {
}

warming_listeners_.erase(existing_warming_listener);

for (const auto& worker : workers_) {
worker->addListener(listener);
}
}

uint64_t ListenerManagerImpl::numConnections() {
Expand Down
Loading