Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/envoy/server/listener_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,9 @@ class ListenerManager {
/**
* Start all workers accepting new connections on all added listeners.
* @param guard_dog supplies the guard dog to use for thread watching.
* @param callback supplies the callback to complete server initialization.
*/
virtual void startWorkers(GuardDog& guard_dog) PURE;
virtual void startWorkers(GuardDog& guard_dog, std::function<void()> callback) PURE;

/**
* Stop all listeners from accepting new connections without actually removing any of them. This
Expand Down
6 changes: 6 additions & 0 deletions source/server/listener_hooks.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ class ListenerHooks {
*/
virtual void onWorkerListenerRemoved() PURE;

/**
* Called when all workers have started.
*/
virtual void onWorkersStarted() PURE;

/**
* Called when the Runtime::ScopedLoaderSingleton is created by the server.
*/
Expand All @@ -36,6 +41,7 @@ class DefaultListenerHooks : public ListenerHooks {
// ListenerHooks
void onWorkerListenerAdded() override {}
void onWorkerListenerRemoved() override {}
void onWorkersStarted() override {}
void onRuntimeCreated() override {}
};

Expand Down
15 changes: 9 additions & 6 deletions source/server/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ bool ListenerManagerImpl::removeListenerInternal(const std::string& name,
return true;
}

void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog, std::function<void()> callback) {
ENVOY_LOG(info, "all dependencies initialized. starting workers");
ASSERT(!workers_started_);
workers_started_ = true;
Expand All @@ -899,11 +899,13 @@ void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
ENVOY_LOG(debug, "starting worker {}", i);
ASSERT(warming_listeners_.empty());
for (const auto& listener : active_listeners_) {
addListenerToWorker(*worker, absl::nullopt, *listener, [this, listeners_pending_init]() {
if (--(*listeners_pending_init) == 0) {
stats_.workers_started_.set(1);
}
});
addListenerToWorker(*worker, absl::nullopt, *listener,
[this, listeners_pending_init, callback]() {
if (--(*listeners_pending_init) == 0) {
stats_.workers_started_.set(1);
callback();
}
});
}
worker->start(guard_dog);
if (enable_dispatcher_stats_) {
Expand All @@ -913,6 +915,7 @@ void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
}
if (active_listeners_.empty()) {
stats_.workers_started_.set(1);
callback();
}
}

Expand Down
2 changes: 1 addition & 1 deletion source/server/listener_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class ListenerManagerImpl : public ListenerManager, Logger::Loggable<Logger::Id:
listeners(ListenerState state = ListenerState::ACTIVE) override;
uint64_t numConnections() const override;
bool removeListener(const std::string& listener_name) override;
void startWorkers(GuardDog& guard_dog) override;
void startWorkers(GuardDog& guard_dog, std::function<void()> callback) override;
void stopListeners(StopListenersType stop_listeners_type) override;
void stopWorkers() override;
void beginListenerUpdate() override { error_state_tracker_.clear(); }
Expand Down
27 changes: 17 additions & 10 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ InstanceImpl::InstanceImpl(
: nullptr),
grpc_context_(store.symbolTable()), http_context_(store.symbolTable()),
router_context_(store.symbolTable()), process_context_(std::move(process_context)),
main_thread_id_(std::this_thread::get_id()), server_contexts_(*this) {
main_thread_id_(std::this_thread::get_id()), hooks_(hooks), server_contexts_(*this) {
try {
if (!options.logPath().empty()) {
try {
Expand Down Expand Up @@ -609,15 +609,22 @@ void InstanceImpl::onRuntimeReady() {
}

void InstanceImpl::startWorkers() {
listener_manager_->startWorkers(*worker_guard_dog_);
initialization_timer_->complete();
// Update server stats as soon as initialization is done.
updateServerStats();
workers_started_ = true;
// At this point we are ready to take traffic and all listening ports are up. Notify our parent
// if applicable that they can stop listening and drain.
restarter_.drainParentListeners();
drain_manager_->startParentShutdownSequence();
// The callback will be called after workers are started.
listener_manager_->startWorkers(*worker_guard_dog_, [this]() {
if (isShutdown()) {
return;
}
Comment on lines +614 to +616
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have test coverage of this case? (Put an ASSERT in there and see if it's hit or look at coverage report)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test for this in the new commit.


initialization_timer_->complete();
// Update server stats as soon as initialization is done.
updateServerStats();
workers_started_ = true;
hooks_.onWorkersStarted();
// At this point we are ready to take traffic and all listening ports are up. Notify our
// parent if applicable that they can stop listening and drain.
restarter_.drainParentListeners();
drain_manager_->startParentShutdownSequence();
});
}

Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server,
Expand Down
1 change: 1 addition & 0 deletions source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ class InstanceImpl final : Logger::Loggable<Logger::Id::main>,
// initialization_time is a histogram for tracking the initialization time across hot restarts
// whenever we have support for histogram merge across hot restarts.
Stats::TimespanPtr initialization_timer_;
ListenerHooks& hooks_;

ServerFactoryContextImpl server_contexts_;

Expand Down
1 change: 1 addition & 0 deletions test/integration/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ class IntegrationTestServer : public Logger::Loggable<Logger::Id::testing>,
on_server_ready_cb_ = std::move(on_server_ready);
}
void onRuntimeCreated() override {}
void onWorkersStarted() override {}

void start(const Network::Address::IpVersion version,
std::function<void()> on_server_init_function, bool deterministic,
Expand Down
2 changes: 1 addition & 1 deletion test/mocks/server/listener_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class MockListenerManager : public ListenerManager {
(ListenerState state));
MOCK_METHOD(uint64_t, numConnections, (), (const));
MOCK_METHOD(bool, removeListener, (const std::string& listener_name));
MOCK_METHOD(void, startWorkers, (GuardDog & guard_dog));
MOCK_METHOD(void, startWorkers, (GuardDog & guard_dog, std::function<void()> callback));
MOCK_METHOD(void, stopListeners, (StopListenersType listeners_type));
MOCK_METHOD(void, stopWorkers, ());
MOCK_METHOD(void, beginListenerUpdate, ());
Expand Down
Loading