Skip to content

Commit 561db3b

Browse files
malmaud authored and amitmurthy committed
Add boundaries to wire format
1 parent 844f284 commit 561db3b

File tree

2 files changed

+130
-55
lines changed

2 files changed

+130
-55
lines changed

base/multi.jl

+120-55
Original file line numberDiff line numberDiff line change
@@ -37,30 +37,47 @@ end
3737
hash(r::RRID, h::UInt) = hash(r.whence, hash(r.id, h))
3838
==(r::RRID, s::RRID) = (r.whence==s.whence && r.id==s.id)
3939

40+
## Wire format description
41+
#
42+
# Each message has three parts, which are written in order to the worker's stream.
43+
# 1) A header of type MsgHeader is serialized to the stream (via `serialize`).
44+
# 2) A message of type AbstractMsg is then serialized.
45+
# 3) Finally, a fixed boundary of 10 bytes is written.
46+
47+
# Message header stored separately from body to be able to send back errors if
48+
# a deserialization error occurs when reading the message body.
49+
type MsgHeader
50+
response_oid::RRID
51+
notify_oid::RRID
52+
end
53+
54+
# Special oid (0,0) used to indicate a null ID.
55+
# Used instead of Nullable to decrease wire size of header.
56+
null_id(id) = id == RRID(0, 0)
57+
58+
MsgHeader(;response_oid::RRID=RRID(0,0), notify_oid::RRID=RRID(0,0)) =
59+
MsgHeader(response_oid, notify_oid)
4060

4161
type CallMsg{Mode} <: AbstractMsg
4262
f::Function
4363
args::Tuple
4464
kwargs::Array
45-
response_oid::RRID
4665
end
4766
type CallWaitMsg <: AbstractMsg
4867
f::Function
4968
args::Tuple
5069
kwargs::Array
51-
response_oid::RRID
52-
notify_oid::RRID
5370
end
5471
type RemoteDoMsg <: AbstractMsg
5572
f::Function
5673
args::Tuple
5774
kwargs::Array
5875
end
5976
type ResultMsg <: AbstractMsg
60-
response_oid::RRID
6177
value::Any
6278
end
6379

80+
6481
# Worker initialization messages
6582
type IdentifySocketMsg <: AbstractMsg
6683
from_pid::Int
@@ -70,34 +87,32 @@ end
7087
type JoinPGRPMsg <: AbstractMsg
7188
self_pid::Int
7289
other_workers::Array
73-
notify_oid::RRID
7490
topology::Symbol
7591
worker_pool
7692
end
7793
type JoinCompleteMsg <: AbstractMsg
78-
notify_oid::RRID
7994
cpu_cores::Int
8095
ospid::Int
8196
end
8297

83-
function send_msg_unknown(s::IO, msg)
98+
function send_msg_unknown(s::IO, header, msg)
8499
error("attempt to send to unknown socket")
85100
end
86101

87-
function send_msg(s::IO, msg)
102+
function send_msg(s::IO, header, msg)
88103
id = worker_id_from_socket(s)
89104
if id > -1
90-
return send_msg(worker_from_id(id), msg)
105+
return send_msg(worker_from_id(id), header, msg)
91106
end
92-
send_msg_unknown(s, msg)
107+
send_msg_unknown(s, header, msg)
93108
end
94109

95-
function send_msg_now(s::IO, msg::AbstractMsg)
110+
function send_msg_now(s::IO, msghdr, msg::AbstractMsg)
96111
id = worker_id_from_socket(s)
97112
if id > -1
98-
return send_msg_now(worker_from_id(id), msg)
113+
return send_msg_now(worker_from_id(id), msghdr, msg)
99114
end
100-
send_msg_unknown(s, msg)
115+
send_msg_unknown(s, msghdr, msg)
101116
end
102117

103118
abstract ClusterManager
@@ -197,12 +212,12 @@ function set_worker_state(w, state)
197212
notify(w.c_state; all=true)
198213
end
199214

200-
function send_msg_now(w::Worker, msg)
201-
send_msg_(w, msg, true)
215+
function send_msg_now(w::Worker, msghdr, msg)
216+
send_msg_(w, msghdr, msg, true)
202217
end
203218

204-
function send_msg(w::Worker, msg)
205-
send_msg_(w, msg, false)
219+
function send_msg(w::Worker, msghdr, msg)
220+
send_msg_(w, msghdr, msg, false)
206221
end
207222

208223
function flush_gc_msgs(w::Worker)
@@ -241,14 +256,20 @@ function check_worker_state(w::Worker)
241256
end
242257
end
243258

259+
# Boundary inserted between messages on the wire, used for recovering
260+
# from deserialization errors. Picked arbitrarily.
261+
# A size of 10 bytes gives 2^80 (~1.2e24) possible boundaries, so the chance of collision with message contents is negligible.
262+
const MSG_BOUNDARY = UInt8[0x79, 0x8e, 0x8e, 0xf5, 0x6e, 0x9b, 0x2e, 0x97, 0xd5, 0x7d]
244263

245-
function send_msg_(w::Worker, msg, now::Bool)
264+
function send_msg_(w::Worker, header, msg, now::Bool)
246265
check_worker_state(w)
247266
io = w.w_stream
248267
lock(io.lock)
249268
try
250269
reset_state(w.w_serializer)
270+
serialize(w.w_serializer, header)
251271
serialize(w.w_serializer, msg) # io is wrapped in w_serializer
272+
write(io, MSG_BOUNDARY)
252273

253274
if !now && w.gcflag
254275
flush_gc_msgs(w)
@@ -768,7 +789,6 @@ function showerror(io::IO, re::RemoteException)
768789
showerror(io, re.captured)
769790
end
770791

771-
772792
function run_work_thunk(thunk, print_error)
773793
local result
774794
try
@@ -811,7 +831,7 @@ end
811831
function remotecall(f, w::Worker, args...; kwargs...)
812832
rr = Future(w)
813833
#println("$(myid()) asking for $rr")
814-
send_msg(w, CallMsg{:call}(f, args, kwargs, remoteref_id(rr)))
834+
send_msg(w, MsgHeader(response_oid=remoteref_id(rr)), CallMsg{:call}(f, args, kwargs))
815835
rr
816836
end
817837

@@ -829,7 +849,7 @@ function remotecall_fetch(f, w::Worker, args...; kwargs...)
829849
oid = RRID()
830850
rv = lookup_ref(oid)
831851
rv.waitingfor = w.id
832-
send_msg(w, CallMsg{:call_fetch}(f, args, kwargs, oid))
852+
send_msg(w, MsgHeader(response_oid=oid), CallMsg{:call_fetch}(f, args, kwargs))
833853
v = take!(rv)
834854
delete!(PGRP.refs, oid)
835855
isa(v, RemoteException) ? throw(v) : v
@@ -846,7 +866,7 @@ function remotecall_wait(f, w::Worker, args...; kwargs...)
846866
rv = lookup_ref(prid)
847867
rv.waitingfor = w.id
848868
rr = Future(w)
849-
send_msg(w, CallWaitMsg(f, args, kwargs, remoteref_id(rr), prid))
869+
send_msg(w, MsgHeader(response_oid=remoteref_id(rr), notify_oid=prid), CallWaitMsg(f, args, kwargs))
850870
v = fetch(rv.c)
851871
delete!(PGRP.refs, prid)
852872
isa(v, RemoteException) && throw(v)
@@ -866,7 +886,7 @@ function remote_do(f, w::LocalProcess, args...; kwargs...)
866886
end
867887

868888
function remote_do(f, w::Worker, args...; kwargs...)
869-
send_msg(w, RemoteDoMsg(f, args, kwargs))
889+
send_msg(w, MsgHeader(), RemoteDoMsg(f, args, kwargs))
870890
nothing
871891
end
872892

@@ -952,13 +972,13 @@ close(rr::RemoteChannel) = call_on_owner(close_ref, rr)
952972

953973
function deliver_result(sock::IO, msg, oid, value)
954974
#print("$(myid()) sending result $oid\n")
955-
if is(msg,:call_fetch) || isa(value, RemoteException)
975+
if is(msg, :call_fetch) || isa(value, RemoteException)
956976
val = value
957977
else
958978
val = :OK
959979
end
960980
try
961-
send_msg_now(sock, ResultMsg(oid, val))
981+
send_msg_now(sock, MsgHeader(response_oid=oid), ResultMsg(val))
962982
catch e
963983
# terminate connection in case of serialization error
964984
# otherwise the reading end would hang
@@ -996,28 +1016,73 @@ function process_messages(r_stream::IO, w_stream::IO, incoming=true)
9961016
end
9971017

9981018
function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
1019+
wpid=0 # the worker r_stream is connected to.
1020+
boundary = similar(MSG_BOUNDARY)
9991021
try
10001022
version = process_hdr(r_stream, incoming)
10011023
serializer = ClusterSerializer(r_stream)
1024+
1025+
# The first message will associate wpid with r_stream
1026+
msghdr = deserialize(serializer)
1027+
msg = deserialize(serializer)
1028+
readbytes!(r_stream, boundary, length(MSG_BOUNDARY))
1029+
1030+
handle_msg(msg, msghdr, r_stream, w_stream, version)
1031+
wpid = worker_id_from_socket(r_stream)
1032+
1033+
@assert wpid > 0
1034+
10021035
while true
10031036
reset_state(serializer)
1004-
msg = deserialize(serializer)
1005-
# println("got msg: ", msg)
1006-
handle_msg(msg, r_stream, w_stream, version)
1037+
msghdr = deserialize(serializer)
1038+
# println("msghdr: ", msghdr)
1039+
1040+
try
1041+
msg = deserialize(serializer)
1042+
catch e
1043+
# Deserialization error; discard bytes in stream until boundary found
1044+
boundary_idx = 1
1045+
while true
1046+
# This may throw an EOF error if the terminal boundary was not written
1047+
# correctly, triggering the higher-scoped catch block below
1048+
byte = read(r_stream, UInt8)
1049+
if byte == MSG_BOUNDARY[boundary_idx]
1050+
boundary_idx += 1
1051+
if boundary_idx > length(MSG_BOUNDARY)
1052+
break
1053+
end
1054+
else
1055+
boundary_idx = 1
1056+
end
1057+
end
1058+
# println("Deserialization error.")
1059+
remote_err = RemoteException(myid(), CapturedException(e, catch_backtrace()))
1060+
if !null_id(msghdr.response_oid)
1061+
ref = lookup_ref(msghdr.response_oid)
1062+
put!(ref, remote_err)
1063+
end
1064+
if !null_id(msghdr.notify_oid)
1065+
deliver_result(w_stream, :call_fetch, msghdr.notify_oid, remote_err)
1066+
end
1067+
continue
1068+
end
1069+
readbytes!(r_stream, boundary, length(MSG_BOUNDARY))
1070+
1071+
# println("got msg: ", typeof(msg))
1072+
handle_msg(msg, msghdr, r_stream, w_stream, version)
10071073
end
10081074
catch e
10091075
# println(STDERR, "Process($(myid())) - Exception ", e)
1010-
iderr = worker_id_from_socket(r_stream)
1011-
if (iderr < 1)
1076+
if (wpid < 1)
10121077
println(STDERR, e)
10131078
println(STDERR, "Process($(myid())) - Unknown remote, closing connection.")
10141079
else
1015-
werr = worker_from_id(iderr)
1080+
werr = worker_from_id(wpid)
10161081
oldstate = werr.state
10171082
set_worker_state(werr, W_TERMINATED)
10181083

1019-
# If error occured talking to pid 1, commit harakiri
1020-
if iderr == 1
1084+
# If an unhandleable error occurred talking to pid 1, exit
1085+
if wpid == 1
10211086
if isopen(w_stream)
10221087
print(STDERR, "fatal error on ", myid(), ": ")
10231088
display_error(e, catch_backtrace())
@@ -1028,15 +1093,15 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
10281093
# Will treat any exception as death of node and cleanup
10291094
# since currently we do not have a mechanism for workers to reconnect
10301095
# to each other on unhandled errors
1031-
deregister_worker(iderr)
1096+
deregister_worker(wpid)
10321097
end
10331098

10341099
isopen(r_stream) && close(r_stream)
10351100
isopen(w_stream) && close(w_stream)
10361101

1037-
if (myid() == 1) && (iderr > 1)
1102+
if (myid() == 1) && (wpid > 1)
10381103
if oldstate != W_TERMINATING
1039-
println(STDERR, "Worker $iderr terminated.")
1104+
println(STDERR, "Worker $wpid terminated.")
10401105
rethrow(e)
10411106
end
10421107
end
@@ -1071,44 +1136,44 @@ function process_hdr(s, validate_cookie)
10711136
return VersionNumber(strip(String(version)))
10721137
end
10731138

1074-
function handle_msg(msg::CallMsg{:call}, r_stream, w_stream, version)
1075-
schedule_call(msg.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
1139+
function handle_msg(msg::CallMsg{:call}, msghdr, r_stream, w_stream, version)
1140+
schedule_call(msghdr.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
10761141
end
1077-
function handle_msg(msg::CallMsg{:call_fetch}, r_stream, w_stream, version)
1142+
function handle_msg(msg::CallMsg{:call_fetch}, msghdr, r_stream, w_stream, version)
10781143
@schedule begin
10791144
v = run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), false)
1080-
deliver_result(w_stream, :call_fetch, msg.response_oid, v)
1145+
deliver_result(w_stream, :call_fetch, msghdr.response_oid, v)
10811146
end
10821147
end
10831148

1084-
function handle_msg(msg::CallWaitMsg, r_stream, w_stream, version)
1149+
function handle_msg(msg::CallWaitMsg, msghdr, r_stream, w_stream, version)
10851150
@schedule begin
1086-
rv = schedule_call(msg.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
1087-
deliver_result(w_stream, :call_wait, msg.notify_oid, fetch(rv.c))
1151+
rv = schedule_call(msghdr.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
1152+
deliver_result(w_stream, :call_wait, msghdr.notify_oid, fetch(rv.c))
10881153
end
10891154
end
10901155

1091-
function handle_msg(msg::RemoteDoMsg, r_stream, w_stream, version)
1156+
function handle_msg(msg::RemoteDoMsg, msghdr, r_stream, w_stream, version)
10921157
@schedule run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true)
10931158
end
10941159

1095-
function handle_msg(msg::ResultMsg, r_stream, w_stream, version)
1096-
put!(lookup_ref(msg.response_oid), msg.value)
1160+
function handle_msg(msg::ResultMsg, msghdr, r_stream, w_stream, version)
1161+
put!(lookup_ref(msghdr.response_oid), msg.value)
10971162
end
10981163

1099-
function handle_msg(msg::IdentifySocketMsg, r_stream, w_stream, version)
1164+
function handle_msg(msg::IdentifySocketMsg, msghdr, r_stream, w_stream, version)
11001165
# register a new peer worker connection
11011166
w=Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version)
11021167
send_connection_hdr(w, false)
1103-
send_msg_now(w, IdentifySocketAckMsg())
1168+
send_msg_now(w, MsgHeader(), IdentifySocketAckMsg())
11041169
end
11051170

1106-
function handle_msg(msg::IdentifySocketAckMsg, r_stream, w_stream, version)
1171+
function handle_msg(msg::IdentifySocketAckMsg, msghdr, r_stream, w_stream, version)
11071172
w = map_sock_wrkr[r_stream]
11081173
w.version = version
11091174
end
11101175

1111-
function handle_msg(msg::JoinPGRPMsg, r_stream, w_stream, version)
1176+
function handle_msg(msg::JoinPGRPMsg, msghdr, r_stream, w_stream, version)
11121177
LPROC.id = msg.self_pid
11131178
controller = Worker(1, r_stream, w_stream, cluster_manager; version=version)
11141179
register_worker(LPROC)
@@ -1129,7 +1194,7 @@ function handle_msg(msg::JoinPGRPMsg, r_stream, w_stream, version)
11291194

11301195
set_default_worker_pool(msg.worker_pool)
11311196
send_connection_hdr(controller, false)
1132-
send_msg_now(controller, JoinCompleteMsg(msg.notify_oid, Sys.CPU_CORES, getpid()))
1197+
send_msg_now(controller, MsgHeader(notify_oid=msghdr.notify_oid), JoinCompleteMsg(Sys.CPU_CORES, getpid()))
11331198
end
11341199

11351200
function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig)
@@ -1138,23 +1203,23 @@ function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConf
11381203
w = Worker(rpid, r_s, w_s, manager; config=wconfig)
11391204
process_messages(w.r_stream, w.w_stream, false)
11401205
send_connection_hdr(w, true)
1141-
send_msg_now(w, IdentifySocketMsg(myid()))
1206+
send_msg_now(w, MsgHeader(), IdentifySocketMsg(myid()))
11421207
catch e
11431208
display_error(e, catch_backtrace())
11441209
println(STDERR, "Error [$e] on $(myid()) while connecting to peer $rpid. Exiting.")
11451210
exit(1)
11461211
end
11471212
end
11481213

1149-
function handle_msg(msg::JoinCompleteMsg, r_stream, w_stream, version)
1214+
function handle_msg(msg::JoinCompleteMsg, msghdr, r_stream, w_stream, version)
11501215
w = map_sock_wrkr[r_stream]
11511216
environ = get(w.config.environ, Dict())
11521217
environ[:cpu_cores] = msg.cpu_cores
11531218
w.config.environ = environ
11541219
w.config.ospid = msg.ospid
11551220
w.version = version
11561221

1157-
ntfy_channel = lookup_ref(msg.notify_oid)
1222+
ntfy_channel = lookup_ref(msghdr.notify_oid)
11581223
put!(ntfy_channel, w.id)
11591224

11601225
push!(default_worker_pool(), w)
@@ -1478,7 +1543,7 @@ function create_worker(manager, wconfig)
14781543

14791544
all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id) : ((), x.id, true), join_list)
14801545
send_connection_hdr(w, true)
1481-
send_msg_now(w, JoinPGRPMsg(w.id, all_locs, ntfy_oid, PGRP.topology, default_worker_pool()))
1546+
send_msg_now(w, MsgHeader(notify_oid=ntfy_oid), JoinPGRPMsg(w.id, all_locs, PGRP.topology, default_worker_pool()))
14821547

14831548
@schedule manage(w.manager, w.id, w.config, :register)
14841549
wait(rr_ntfy_join)

0 commit comments

Comments
 (0)