Skip to content

Commit 010c35a

Browse files
committed
Merge pull request #947 from basho/dr/forward/proxy_overload_recovery_fixes
Forward port changes to proxy_overload_recovery to make it pass in the face of indeterminate behavior.
2 parents 288fc66 + 57659d3 commit 010c35a

File tree

1 file changed

+50
-25
lines changed

1 file changed

+50
-25
lines changed

tests/proxy_overload_recovery.erl

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -208,16 +208,13 @@ prepare(ThresholdSeed) ->
208208
{ok, VPid0} = riak_core_vnode_manager:get_vnode_pid(Id, riak_kv_vnode),
209209
sys:resume(VPid0),
210210
ok = supervisor:terminate_child(riak_core_vnode_sup, VPid0),
211-
false = is_process_alive(VPid0),
212211

213212
%% Reset the proxy pid to make sure it resets state and picks up the new
214213
%% environment variables
215214
ok = supervisor:terminate_child(riak_core_vnode_proxy_sup, {riak_kv_vnode, Id}),
216215
RegName = riak_core_vnode_proxy:reg_name(riak_kv_vnode, Index),
217216
undefined = whereis(RegName),
218-
%% Fail if we get back the dead vnode
219-
{ok, VPid1} = riak_core_vnode_manager:get_vnode_pid(Index, riak_kv_vnode),
220-
?assertNotEqual(VPid1, VPid0),
217+
VPid1 = wait_for_vnode_change(VPid0, Index),
221218

222219
{ok, PPid} = supervisor:restart_child(riak_core_vnode_proxy_sup, {riak_kv_vnode, Id}),
223220

@@ -269,14 +266,14 @@ resume_args(#tstate{rt = RT}) ->
269266
resume(#rt{ppid = PPid, vpid = VPid}) ->
270267
sys:resume(VPid),
271268
%% Use the sys:get_status call to force a synchronous call
272-
%% against the vnode proxy to ensure all messages sent by
269+
%% against the vnode & the proxy to ensure all messages sent by
273270
%% this process have been serviced and there are no pending
274271
%% 'ping's in the vnode before we continue.
275272
%% Then drain the vnode to make sure any pending pongs have
276-
%% been sent.
277-
ok = drain(VPid),
273+
%% been sent, and ensure the proxy has
274+
_ = sys:get_status(PPid),
278275
_ = sys:get_status(VPid),
279-
_ = sys:get_status(PPid).
276+
ok = drain([VPid, PPid]).
280277

281278
resume_next(S, _V, _A) ->
282279
S#tstate{vnode_running = true, proxy_msgs = 0, direct_msgs = 0}.
@@ -329,28 +326,28 @@ overloaded_args(#tstate{vnode_running = Running, rt = RT}) ->
329326
overloaded(Running, #rt{ppid = PPid, vpid = VPid}) ->
330327
case Running of
331328
true ->
332-
ok = drain(PPid), % make sure all proxy msgs processed/dropped
333-
ok = drain(VPid); % make sure any pending ping/pongs are processed
329+
ok = drain([PPid, VPid]);
334330
_ ->
335331
ok
336332
end,
337-
{riak_core_vnode_proxy:overloaded(PPid),
338-
msgq_len(VPid), % just for debug so we can review in log output
339-
sys:get_status(PPid)}. % ditto
333+
{messages, PMsgs} = process_info(PPid, messages),
334+
{messages, VMsgs} = process_info(VPid, messages),
335+
Overloaded = riak_core_vnode_proxy:overloaded(PPid),
336+
{Overloaded, {VMsgs, PMsgs}, sys:get_status(PPid)}.
340337

341338
overloaded_post(#tstate{threshold = undefined}, _A,
342-
{R, _VnodeQ, _ProxyStatus}) ->
339+
{R, _Messages, _ProxyStatus}) ->
343340
%% If there are no thresholds there should never be an overload
344341
eq(R, false);
345342
overloaded_post(#tstate{vnode_running = true}, _A,
346-
{R, _VnodeQ = 0, _ProxyStatus}) ->
343+
{R, _Messages, _ProxyStatus}) ->
347344
%% If the vnode is running, we have cleared queues so
348345
%% should not be in overload.
349346
eq(R, false);
350347
overloaded_post(#tstate{vnode_running = false,
351348
proxy_msgs = ProxyMsgs,
352349
threshold = Threshold}, _A,
353-
{ResultOverload, _VnodeQ, _ProxyStatus}) ->
350+
{ResultOverload, _Messages, _ProxyStatus}) ->
354351
%% Either
355352
%% mailbox is completely an estimate based on proxy msgs
356353
%% or mailbox is a check + estimate since
@@ -397,16 +394,33 @@ prep_env(Var, Val) ->
397394
%% Wait until all messages are drained by the Pid. No guarantees
398395
%% about future messages being sent, or that responses for the
399396
%% last message consumed have been transmitted.
400-
%%
401-
drain(Pid) ->
402-
case erlang:process_info(Pid, message_queue_len) of
403-
{message_queue_len, 0} ->
397+
%% NOTE: The "drain 3 times in a row" was determined empirically,
398+
%% and may not be sufficient (2 was not). Given time constraints,
399+
%% living with it for now. If this fails, we should really add some
400+
%% tracing code around the send of messages to Vnode and Proxy to
401+
%% determine where extra messages are coming from rather than just
402+
%% make this "try 4 times"
403+
%%
404+
drain(Pid) when is_pid(Pid) ->
405+
drain([Pid], {-1, -1});
406+
407+
drain(Pids) when is_list(Pids) ->
408+
drain(Pids, {-1, -1}).
409+
drain(Pids, {PrevPrev, Prev}) ->
410+
_ = [sys:suspend(Pid) || Pid <- Pids],
411+
Len = lists:foldl(fun(Pid, Acc0) ->
412+
{message_queue_len, Len} = erlang:process_info(Pid, message_queue_len),
413+
Acc0 + Len
414+
end, 0, Pids),
415+
_ = [sys:resume(Pid) || Pid <- Pids],
416+
case {PrevPrev, Prev, Len} of
417+
{0, 0, 0} ->
404418
ok;
405-
{message_queue_len, L} when L > 0 ->
406-
timer:sleep(1), % give it a millisecond to drain
407-
drain(Pid);
408-
ER ->
409-
ER
419+
_ ->
420+
%% Attempt to ensure something else is scheduled before we try to drain again
421+
erlang:yield(),
422+
timer:sleep(1),
423+
drain(Pids, {Prev, Len})
410424
end.
411425

412426
%% Return the length of the message queue (or crash if proc dead)
@@ -462,3 +476,14 @@ confirm() ->
462476
pass.
463477

464478
-endif.
479+
480+
481+
wait_for_vnode_change(VPid0, Index) ->
482+
{ok, VPid1} = riak_core_vnode_manager:get_vnode_pid(Index, riak_kv_vnode),
483+
case VPid1 of
484+
VPid0 ->
485+
timer:sleep(1),
486+
wait_for_vnode_change(VPid0, Index);
487+
_ ->
488+
VPid1
489+
end.

0 commit comments

Comments
 (0)