Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
2cb2617
add streaming variant of the admin API
jmarantz Feb 13, 2024
d2412c6
remove superfluous diff
jmarantz Feb 13, 2024
48f114d
add streaming test
jmarantz Feb 13, 2024
d890eb3
add cancellation semantics and tests.
jmarantz Feb 18, 2024
b56ecb4
try to fix coverage and compiling without admin.
jmarantz Feb 18, 2024
aed8263
checkpoint
jmarantz Feb 19, 2024
3973c92
comment and cleanup
jmarantz Feb 19, 2024
9237ff9
format
jmarantz Feb 19, 2024
895e552
add asserts, refactor tests, and add quit/cancel race.
jmarantz Feb 19, 2024
4784a23
add cancel/quit race test
jmarantz Feb 19, 2024
d252659
tighten up logic and asserts.
jmarantz Feb 19, 2024
8ba301d
Cover a few missing lines.
jmarantz Feb 20, 2024
e6a10f3
format
jmarantz Feb 20, 2024
05c1127
hit one more early-exit line in main_common with a contrived test.
jmarantz Feb 20, 2024
c40e30a
improve consitency and handling of cancel function.
jmarantz Feb 20, 2024
8cfed12
add early exit in handleHeaders to force it to be tested, and refacto…
jmarantz Feb 20, 2024
ce30118
add comments.
jmarantz Feb 20, 2024
cd88721
cover more lines.
jmarantz Feb 21, 2024
4262d8d
tighten up coverage
jmarantz Feb 21, 2024
699f357
Merge branch 'main' into admin-streaming-c++-api
jmarantz Feb 21, 2024
47a51ca
mutex-protect destructor accesss to AdminResponse::terminated_. Make …
jmarantz Feb 22, 2024
f6edee6
add a TerminateNotifier to provide a context with appropriate lifetim…
jmarantz Feb 23, 2024
eda6834
remove superfluous function, fix compile-time issue with admin disabled.
jmarantz Feb 23, 2024
7239d03
add more tests and cleanup
jmarantz Feb 23, 2024
7fe67f3
fix initialization and expected results.
jmarantz Feb 23, 2024
7a433d1
Merge branch 'main' into admin-streaming-c++-api
jmarantz Feb 26, 2024
aa70f33
Split out AdminContext into its own source, hdr, test.
jmarantz Feb 27, 2024
a6d49b1
remove no-longer-needed 'friend' declarations and rename TerminateNot…
jmarantz Feb 27, 2024
153b2a2
format
jmarantz Feb 27, 2024
36195fe
cleanup & format
jmarantz Feb 27, 2024
4fb7898
fix admin-disabled build
jmarantz Feb 27, 2024
0cf32c1
address some review comments (others remain)
jmarantz Feb 28, 2024
03ce2c7
review comments
jmarantz Feb 29, 2024
48ece2d
remove ifdef'd out test helper.
jmarantz Feb 29, 2024
b006d79
add reference to FSM drawing
jmarantz Feb 29, 2024
f799b84
grammar
jmarantz Feb 29, 2024
e1640d3
fix race.
jmarantz Feb 29, 2024
d957b58
Merge branch 'main' into admin-streaming-c++-api
jmarantz Mar 1, 2024
06df4ec
Merge branch 'main' into admin-streaming-c++-api
jmarantz Mar 6, 2024
0e6a275
Better interlock for test infrastructure, hopefully still hitting all…
jmarantz Mar 6, 2024
23caf5e
remove run_before_resume hack.
jmarantz Mar 7, 2024
2427170
clean up
jmarantz Mar 7, 2024
bd8db80
Remove half-baked README.md, and add Tianyu's note to PtrSet comment.
jmarantz Mar 7, 2024
4fa21ea
Merge branch 'main' into admin-streaming-c++-api
jmarantz Mar 7, 2024
e1ce367
use lifecycle callbacks instead of PtrSet.
jmarantz Mar 13, 2024
cca6956
Merge branch 'main' into admin-streaming-c++-api-lifecycle
jmarantz Mar 18, 2024
c03d2ed
tests working.
jmarantz Mar 19, 2024
1a7cd3e
Merge branch 'main' into admin-streaming-c++-api-lifecycle
jmarantz Mar 19, 2024
c7e5305
Merge branch 'main' into admin-streaming-c++-api-lifecycle
jmarantz Mar 19, 2024
6908656
format
jmarantz Mar 19, 2024
d949aac
Merge branch 'main' into admin-streaming-c++-api-lifecycle
jmarantz Mar 19, 2024
d4e3c0c
Merge branch 'main' into admin-streaming-c++-api-lifecycle
jmarantz Mar 19, 2024
dd30b84
make lifecycle callbacks thread-safe.
jmarantz Mar 22, 2024
baa01ee
tests working
jmarantz Mar 22, 2024
9cfe3e5
fix race
jmarantz Mar 22, 2024
756d369
format
jmarantz Mar 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions source/exe/admin_response.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,24 @@
namespace Envoy {

AdminResponse::AdminResponse(Server::Instance& server, absl::string_view path,
absl::string_view method, SharedPtrSet response_set)
: server_(server), opt_admin_(server.admin()), shared_response_set_(response_set) {
request_headers_->setMethod(method);
request_headers_->setPath(path);
absl::string_view method /*, SharedPtrSet response_set*/)
: server_(server), opt_admin_(server.admin()),
// shared_response_set_(response_set)
lifecycle_notifier_(server.lifecycleNotifier().registerCallback(
Server::ServerLifecycleNotifier::Stage::ShutdownExit, [this] { terminate(); })) {
if (lifecycle_notifier_ == nullptr) {
terminate();
} else {
ENVOY_LOG_MISC(error, "AdminResponse(): {}", static_cast<void*>(this));
request_headers_->setMethod(method);
request_headers_->setPath(path);
}
}

AdminResponse::~AdminResponse() {
ENVOY_LOG_MISC(error, "~AdminResponse(): {}", static_cast<void*>(this));
cancel();
shared_response_set_->detachResponse(this);
// shared_response_set_->detachResponse(this);
}

void AdminResponse::getHeaders(HeadersFn fn) {
Expand Down Expand Up @@ -87,10 +96,12 @@ bool AdminResponse::cancelled() const {
// admin response, and so calls to getHeader and nextChunk remain valid,
// resulting in 503 and an empty body.
void AdminResponse::terminate() {
ENVOY_LOG_MISC(error, "terminate(): {}", static_cast<void*>(this));
ASSERT_IS_MAIN_OR_TEST_THREAD();
absl::MutexLock lock(&mutex_);
if (!terminated_) {
terminated_ = true;
lifecycle_notifier_.reset();
sendErrorLockHeld();
sendAbortChunkLockHeld();
}
Expand Down Expand Up @@ -159,7 +170,7 @@ void AdminResponse::sendErrorLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
}
}

void AdminResponse::PtrSet::terminateAdminRequests() {
/*void AdminResponse::PtrSet::terminateAdminRequests() {
ASSERT_IS_MAIN_OR_TEST_THREAD();

absl::MutexLock lock(&mutex_);
Expand All @@ -186,6 +197,6 @@ void AdminResponse::PtrSet::attachResponse(AdminResponse* response) {
void AdminResponse::PtrSet::detachResponse(AdminResponse* response) {
absl::MutexLock lock(&mutex_);
response_set_.erase(response);
}
}*/

} // namespace Envoy
10 changes: 7 additions & 3 deletions source/exe/admin_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class AdminResponse : public std::enable_shared_from_this<AdminResponse> {
// In summary:
// * MainCommonBase can outlive AdminResponse so we need detachResponse.
// * AdminResponse can outlive MainCommonBase, so we need shared_ptr.
#if 0
class PtrSet {
public:
/**
Expand Down Expand Up @@ -80,9 +81,10 @@ class AdminResponse : public std::enable_shared_from_this<AdminResponse> {
bool accepting_admin_requests_ ABSL_GUARDED_BY(mutex_) = true;
};
using SharedPtrSet = std::shared_ptr<PtrSet>;
#endif

AdminResponse(Server::Instance& server, absl::string_view path, absl::string_view method,
SharedPtrSet response_set);
AdminResponse(Server::Instance& server, absl::string_view path, absl::string_view method
/*, SharedPtrSet response_set*/);
~AdminResponse();

/**
Expand Down Expand Up @@ -150,6 +152,7 @@ class AdminResponse : public std::enable_shared_from_this<AdminResponse> {
void requestNextChunk();
void sendAbortChunkLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void sendErrorLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// void clearLifecycleTerminator();

Server::Instance& server_;
OptRef<Server::Admin> opt_admin_;
Expand Down Expand Up @@ -181,7 +184,8 @@ class AdminResponse : public std::enable_shared_from_this<AdminResponse> {
BodyFn body_fn_ ABSL_GUARDED_BY(mutex_);
mutable absl::Mutex mutex_;

SharedPtrSet shared_response_set_;
// SharedPtrSet shared_response_set_;
Server::ServerLifecycleNotifier::HandlePtr lifecycle_notifier_ ABSL_GUARDED_BY(mutex_);
};
using AdminResponseSharedPtr = std::shared_ptr<AdminResponse>;

Expand Down
11 changes: 4 additions & 7 deletions source/exe/main_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ MainCommonBase::MainCommonBase(const Server::Options& options, Event::TimeSystem
std::move(platform_impl), std::move(random_generator),
std::move(process_context), createFunction())
#ifdef ENVOY_ADMIN_FUNCTIONALITY
,
shared_response_set_(std::make_shared<AdminResponse::PtrSet>())
//,
// shared_response_set_(std::make_shared<AdminResponse::PtrSet>())
#endif
{
}
Expand All @@ -74,7 +74,7 @@ bool MainCommonBase::run() {
case Server::Mode::Serve:
runServer();
#ifdef ENVOY_ADMIN_FUNCTIONALITY
shared_response_set_->terminateAdminRequests();
// shared_response_set_->terminateAdminRequests();
#endif
ret = true;
break;
Expand Down Expand Up @@ -113,10 +113,7 @@ void MainCommonBase::adminRequest(absl::string_view path_and_query, absl::string

AdminResponseSharedPtr MainCommonBase::adminRequest(absl::string_view path_and_query,
absl::string_view method) {
auto response =
std::make_shared<AdminResponse>(*server(), path_and_query, method, shared_response_set_);
shared_response_set_->attachResponse(response.get());
return response;
return std::make_shared<AdminResponse>(*server(), path_and_query, method);
}
#endif

Expand Down
4 changes: 3 additions & 1 deletion source/exe/main_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ class MainCommonBase : public StrippedMainBase {
*/
AdminResponseSharedPtr adminRequest(absl::string_view path_and_query, absl::string_view method);

using LifecycleHandleSharedPtr = std::shared_ptr<Server::ServerLifecycleNotifier::HandlePtr>;

private:
AdminResponse::SharedPtrSet shared_response_set_;
// AdminResponse::SharedPtrSet shared_response_set_;
#endif
};

Expand Down
34 changes: 25 additions & 9 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ InstanceBase::InstanceBase(Init::Manager& init_manager, const Options& options,
Filesystem::Instance& file_system,
std::unique_ptr<ProcessContext> process_context,
Buffer::WatermarkFactorySharedPtr watermark_factory)
: init_manager_(init_manager), live_(false), options_(options),
: init_manager_(init_manager), options_(options),
validation_context_(options_.allowUnknownStaticFields(),
!options.rejectUnknownDynamicFields(),
options.ignoreUnknownDynamicFields()),
Expand Down Expand Up @@ -1054,7 +1054,10 @@ Runtime::Loader& InstanceBase::runtime() { return *runtime_; }

void InstanceBase::shutdown() {
ENVOY_LOG(info, "shutting down server instance");
shutdown_ = true;
{
absl::MutexLock lock(&stage_callbacks_mutex_);
shutdown_ = true;
}
restarter_.sendParentTerminateRequest();
notifyCallbacksForStage(Stage::ShutdownExit, [this] { dispatcher_->exit(); });
}
Expand All @@ -1074,26 +1077,39 @@ void InstanceBase::shutdownAdmin() {

ServerLifecycleNotifier::HandlePtr InstanceBase::registerCallback(Stage stage,
StageCallback callback) {
absl::MutexLock lock(&stage_callbacks_mutex_);
if (shutdown_) {
return nullptr;
}
auto& callbacks = stage_callbacks_[stage];
return std::make_unique<LifecycleCallbackHandle<StageCallback>>(callbacks, callback);
return std::make_unique<LifecycleCallbackHandle<StageCallback>>(callbacks, callback,
stage_callbacks_mutex_);
}

ServerLifecycleNotifier::HandlePtr
InstanceBase::registerCallback(Stage stage, StageCallbackWithCompletion callback) {
ASSERT(stage == Stage::ShutdownExit);
auto& callbacks = stage_completable_callbacks_[stage];
return std::make_unique<LifecycleCallbackHandle<StageCallbackWithCompletion>>(callbacks,
callback);
return std::make_unique<LifecycleCallbackHandle<StageCallbackWithCompletion>>(
callbacks, callback, stage_callbacks_mutex_);
}

void InstanceBase::notifyCallbacksForStage(Stage stage, std::function<void()> completion_cb) {
ASSERT_IS_MAIN_OR_TEST_THREAD();
const auto it = stage_callbacks_.find(stage);
if (it != stage_callbacks_.end()) {
for (const StageCallback& callback : it->second) {
callback();

std::vector<StageCallback> callbacks_to_run;
{
absl::MutexLock lock(&stage_callbacks_mutex_);
const auto it = stage_callbacks_.find(stage);
if (it != stage_callbacks_.end()) {
LifecycleNotifierCallbacks& callbacks = it->second;
callbacks_to_run.insert(callbacks_to_run.end(), callbacks.begin(), callbacks.end());
}
}
for (StageCallback callback : callbacks_to_run) {
callback();
}
callbacks_to_run.clear();

// Wrap completion_cb so that it only gets invoked when all callbacks for this stage
// have finished their work.
Expand Down
28 changes: 21 additions & 7 deletions source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ class InstanceBase : Logger::Loggable<Logger::Id::main>,
void terminate();
void notifyCallbacksForStage(
Stage stage, std::function<void()> completion_cb = [] {});
void notifyCallbacksForStage(Stage stage, std::function<void()> completion_cb = [] {});
void onRuntimeReady();
void onClusterManagerPrimaryInitializationComplete();

Expand All @@ -353,8 +354,8 @@ class InstanceBase : Logger::Loggable<Logger::Id::main>,
// - There may be active connections referencing it.
std::unique_ptr<Secret::SecretManager> secret_manager_;
bool workers_started_{false};
std::atomic<bool> live_;
bool shutdown_{false};
std::atomic<bool> live_{false};
std::atomic<bool> shutdown_{false};
const Options& options_;
ProtobufMessage::ProdValidationContextImpl validation_context_;
TimeSource& time_source_;
Expand Down Expand Up @@ -385,7 +386,9 @@ class InstanceBase : Logger::Loggable<Logger::Id::main>,
std::unique_ptr<Runtime::Loader> runtime_;
ProdWorkerFactory worker_factory_;
std::unique_ptr<ListenerManager> listener_manager_;
absl::node_hash_map<Stage, LifecycleNotifierCallbacks> stage_callbacks_;
absl::node_hash_map<Stage, LifecycleNotifierCallbacks>
stage_callbacks_ ABSL_GUARDED_BY(stage_callbacks_mutex_);
absl::Mutex stage_callbacks_mutex_;
absl::node_hash_map<Stage, LifecycleNotifierCompletionCallbacks> stage_completable_callbacks_;
Configuration::MainImpl config_;
Network::DnsResolverSharedPtr dns_resolver_;
Expand Down Expand Up @@ -419,11 +422,22 @@ class InstanceBase : Logger::Loggable<Logger::Id::main>,

bool stats_flush_in_progress_ : 1;

template <class T>
class LifecycleCallbackHandle : public ServerLifecycleNotifier::Handle, RaiiListElement<T> {
template <class T> class LifecycleCallbackHandle : public ServerLifecycleNotifier::Handle {
public:
LifecycleCallbackHandle(std::list<T>& callbacks, T& callback)
: RaiiListElement<T>(callbacks, callback) {}
LifecycleCallbackHandle(std::list<T>& callbacks, T& callback, absl::Mutex& mutex)
: mutex_(mutex) {
// absl::MutexLock lock(&mutex_);
rail_list_element_ = std::make_unique<RaiiListElement<T>>(callbacks, callback);
}

~LifecycleCallbackHandle() {
absl::MutexLock lock(&mutex_);
rail_list_element_.reset();
}

private:
absl::Mutex& mutex_;
std::unique_ptr<RaiiListElement<T>> rail_list_element_;
};

#ifdef ENVOY_PERFETTO
Expand Down