From 63a10b51e3f58fd80b64d6ba2f001cb5d29f6c7f Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Mon, 13 Feb 2017 16:02:32 -0800 Subject: [PATCH] Add shutdown timeout to grpc completion queue --- src/nginx/grpc_queue.cc | 24 ++++++++++++++++++++++-- src/nginx/grpc_queue.h | 1 + src/nginx/t/grpc_interop_cancel.t | 5 ++--- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/nginx/grpc_queue.cc b/src/nginx/grpc_queue.cc index ced697e1c..520cea98e 100644 --- a/src/nginx/grpc_queue.cc +++ b/src/nginx/grpc_queue.cc @@ -95,6 +95,10 @@ namespace nginx { // GRPC team to create an API for integrating libgrpc into arbitrary // event loops. +namespace { +const std::chrono::seconds kShutdownTimeout(2); +} + std::weak_ptr NgxEspGrpcQueue::instance; std::shared_ptr NgxEspGrpcQueue::Instance() { @@ -125,7 +129,17 @@ void NgxEspGrpcQueue::NginxTagHandler(ngx_event_t *) { void NgxEspGrpcQueue::WorkerThread(NgxEspGrpcQueue *queue) { void *tag; bool ok; - while (queue->cq_->Next(&tag, &ok)) { + while (true) { + auto status = queue->cq_->AsyncNext( + &tag, &ok, std::chrono::system_clock::now() + kShutdownTimeout); + if (status == ::grpc::CompletionQueue::NextStatus::SHUTDOWN || + (status == ::grpc::CompletionQueue::NextStatus::TIMEOUT && + queue->shutting_down_)) { + break; + } + if (status == ::grpc::CompletionQueue::NextStatus::TIMEOUT) { + continue; + } std::unique_ptr cb(static_cast(tag)); if (cb) { bool notify_nginx = false; @@ -147,7 +161,9 @@ void NgxEspGrpcQueue::WorkerThread(NgxEspGrpcQueue *queue) { void NgxEspGrpcQueue::Deleter(NgxEspGrpcQueue *lib) { delete lib; } NgxEspGrpcQueue::NgxEspGrpcQueue() - : cq_(new ::grpc::CompletionQueue()), notified_(false) { + : cq_(new ::grpc::CompletionQueue()), + notified_(false), + shutting_down_(false) { worker_thread_ = std::thread(&NgxEspGrpcQueue::WorkerThread, this); } @@ -176,6 +192,10 @@ NgxEspGrpcQueue::~NgxEspGrpcQueue() { cq_->Shutdown(); + // TODO: This flag is a temporary workaround to force shutdown + // completing queue. To be removed. + shutting_down_ = true; + // N.B. Joining on the worker thread is essential, as that thread // maintains a raw pointer to this datastructure. worker_thread_.join(); diff --git a/src/nginx/grpc_queue.h b/src/nginx/grpc_queue.h index 318e9ce3c..a25bd3fbb 100644 --- a/src/nginx/grpc_queue.h +++ b/src/nginx/grpc_queue.h @@ -141,6 +141,7 @@ class NgxEspGrpcQueue : public AsyncGrpcQueue { std::unique_ptr<::grpc::CompletionQueue> cq_; std::deque pending_; bool notified_; + bool shutting_down_; std::thread worker_thread_; }; diff --git a/src/nginx/t/grpc_interop_cancel.t b/src/nginx/t/grpc_interop_cancel.t index 1e9e08adc..3f4dd5aa4 100644 --- a/src/nginx/t/grpc_interop_cancel.t +++ b/src/nginx/t/grpc_interop_cancel.t @@ -44,7 +44,7 @@ my $ServiceControlPort = ApiManager::pick_port(); my $GrpcBackendPort = ApiManager::pick_port(); my $HttpBackendPort = ApiManager::pick_port(); -my $t = Test::Nginx->new()->has(qw/http proxy/)->plan(4); +my $t = Test::Nginx->new()->has(qw/http proxy/)->plan(5); $t->write_file( 'service.pb.txt', @@ -85,8 +85,7 @@ is($t->waitforsocket("127.0.0.1:${Http2NginxPort}"), 1, 'Nginx socket ready.'); ################################################################################ my @test_cases = ( -# Temporary disabled per b/35314304 -# 'cancel_after_begin', + 'cancel_after_begin', 'cancel_after_first_response', );