Skip to content

Commit 93233a9

Browse files
authored
[Disco] Treat hangup of disco worker process as kShutdown (#16989)
Prior to this commit, each disco worker needed to receive `DiscoAction::kShutdown` in order to close cleanly. While this is sent from the destructor of `ProcessSessionObj`, which owns the worker processes, this does not guarantee that the disco workers will receive the shutdown command. For example, the controller process holding the `ProcessSessionObj` may reach a timeout and be terminated, preventing it from sending the `DiscoAction::kShutdown` command. This commit updates the disco worker to check for a closed pipe that occurs between two packets, and to treat this as if the `DiscoAction::kShutdown` command were received. A closed pipe that occurs at any other location is still treated as an error and reported.
1 parent c6a8a80 commit 93233a9

File tree

1 file changed

+34
-4
lines changed

1 file changed

+34
-4
lines changed

src/runtime/disco/process_session.cc

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,21 @@ class DiscoPipeMessageQueue : private dmlc::Stream, private DiscoProtocol<DiscoP
4848
}
4949

5050
TVMArgs Recv() {
51-
DequeueNextPacket();
51+
bool is_implicit_shutdown = DequeueNextPacket();
5252
TVMValue* values = nullptr;
5353
int* type_codes = nullptr;
5454
int num_args = 0;
55-
RPCReference::RecvPackedSeq(&values, &type_codes, &num_args, this);
55+
56+
if (is_implicit_shutdown) {
57+
num_args = 2;
58+
values = ArenaAlloc<TVMValue>(num_args);
59+
type_codes = ArenaAlloc<int>(num_args);
60+
TVMArgsSetter setter(values, type_codes);
61+
setter(0, static_cast<int>(DiscoAction::kShutDown));
62+
setter(1, 0);
63+
} else {
64+
RPCReference::RecvPackedSeq(&values, &type_codes, &num_args, this);
65+
}
5666
return TVMArgs(values, type_codes, num_args);
5767
}
5868

@@ -62,18 +72,38 @@ class DiscoPipeMessageQueue : private dmlc::Stream, private DiscoProtocol<DiscoP
6272
write_buffer_.clear();
6373
}
6474

65-
void DequeueNextPacket() {
75+
/* \brief Read next packet and reset unpacker
76+
*
77+
* Read the next packet into `read_buffer_`, releasing all arena
78+
* allocations performed by the unpacker and resetting the unpacker
79+
* to its initial state.
80+
*
81+
* \return A boolean value. If true, this packet should be treated
82+
* equivalently to a `DiscoAction::kShutdown` event. If false,
83+
* this packet should be unpacked.
84+
*/
85+
bool DequeueNextPacket() {
6686
uint64_t packet_nbytes = 0;
6787
int read_size = pipe_.Read(&packet_nbytes, sizeof(packet_nbytes));
88+
if (read_size == 0) {
89+
// Special case, connection dropped between packets. Treat as a
90+
// request to shutdown.
91+
return true;
92+
}
93+
6894
ICHECK_EQ(read_size, sizeof(packet_nbytes))
6995
<< "Pipe closed without proper shutdown. Please make sure to explicitly call "
7096
"`Session::Shutdown`";
7197
read_buffer_.resize(packet_nbytes);
72-
pipe_.Read(read_buffer_.data(), packet_nbytes);
98+
read_size = pipe_.Read(read_buffer_.data(), packet_nbytes);
99+
ICHECK_EQ(read_size, packet_nbytes)
100+
<< "Pipe closed without proper shutdown. Please make sure to explicitly call "
101+
"`Session::Shutdown`";
73102
read_offset_ = 0;
74103
this->RecycleAll();
75104
RPCCode code = RPCCode::kReturn;
76105
this->Read(&code);
106+
return false;
77107
}
78108

79109
size_t Read(void* data, size_t size) final {

0 commit comments

Comments
 (0)