From 01f5730a0d0103ebd7d4f05e5d3fd2eb359d6018 Mon Sep 17 00:00:00 2001 From: Yimin Jiang Date: Mon, 14 Oct 2019 18:45:05 +0800 Subject: [PATCH] zmq: fix exit hanging problem (#5) --- src/zmq_van.h | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/zmq_van.h b/src/zmq_van.h index 58736165e..9be27bfe9 100644 --- a/src/zmq_van.h +++ b/src/zmq_van.h @@ -74,6 +74,7 @@ class ZMQVan : public Van { // join all threads should_stop_ = true; for (auto t : thread_list_) t->join(); + PS_VLOG(1) << my_node_.ShortDebugString() << " all threads joined and destroyed"; // close sockets int linger = 0; int rc = zmq_setsockopt(receiver_, ZMQ_LINGER, &linger, sizeof(linger)); @@ -285,7 +286,7 @@ class ZMQVan : public Van { CHECK(socket); LOG(INFO) << "Start ZMQ recv thread"; - while (!should_stop_) { + while (true) { ZmqBufferContext *buf_ctx = new ZmqBufferContext(); for (int i = 0;; ++i) { @@ -295,7 +296,7 @@ class ZMQVan : public Van { std::lock_guard lk(mu_); // the zmq_msg_recv should be non-blocking, otherwise deadlock will happen int tag = ZMQ_DONTWAIT; - if (zmq_msg_recv(zmsg, socket, tag) != -1) break; + if (should_stop_ || zmq_msg_recv(zmsg, socket, tag) != -1) break; if (errno == EINTR) { std::cout << "interrupted"; continue; @@ -305,6 +306,7 @@ class ZMQVan : public Van { CHECK(0) << "failed to receive message. errno: " << errno << " " << zmq_strerror(errno); } + if (should_stop_) break; char* buf = CHECK_NOTNULL((char*)zmq_msg_data(zmsg)); size_t size = zmq_msg_size(zmsg); @@ -327,7 +329,7 @@ class ZMQVan : public Van { if (!more) break; } } // for - + if (should_stop_) break; recv_buffers_.Push(*buf_ctx); } // while }