Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 50 additions & 25 deletions tests/proxy_overload_recovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,13 @@ prepare(ThresholdSeed) ->
{ok, VPid0} = riak_core_vnode_manager:get_vnode_pid(Id, riak_kv_vnode),
sys:resume(VPid0),
ok = supervisor:terminate_child(riak_core_vnode_sup, VPid0),
false = is_process_alive(VPid0),

%% Reset the proxy pid to make sure it resets state and picks up the new
%% environment variables
ok = supervisor:terminate_child(riak_core_vnode_proxy_sup, {riak_kv_vnode, Id}),
RegName = riak_core_vnode_proxy:reg_name(riak_kv_vnode, Index),
undefined = whereis(RegName),
%% Fail if we get back the dead vnode
{ok, VPid1} = riak_core_vnode_manager:get_vnode_pid(Index, riak_kv_vnode),
?assertNotEqual(VPid1, VPid0),
VPid1 = wait_for_vnode_change(VPid0, Index),

{ok, PPid} = supervisor:restart_child(riak_core_vnode_proxy_sup, {riak_kv_vnode, Id}),

Expand Down Expand Up @@ -269,14 +266,14 @@ resume_args(#tstate{rt = RT}) ->
%% Resume a previously suspended vnode and wait for the vnode and its
%% proxy to settle before returning.
%%
%% Takes the #rt{} runtime record holding the proxy pid (ppid) and the
%% vnode pid (vpid). Returns ok; crashes with badmatch if drain/1 does
%% not return ok.
resume(#rt{ppid = PPid, vpid = VPid}) ->
    sys:resume(VPid),
    %% Use the sys:get_status call to force a synchronous call
    %% against the vnode & the proxy to ensure all messages sent by
    %% this process have been serviced and there are no pending
    %% 'ping's in the vnode before we continue.
    _ = sys:get_status(PPid),
    _ = sys:get_status(VPid),
    %% Then drain both the vnode and the proxy, so that any pending
    %% pongs have been sent by the vnode and the proxy's mailbox has
    %% been emptied as well.
    ok = drain([VPid, PPid]).

%% Model-state update paired with resume/1: marks the vnode as running
%% again and zeroes both message counters. The second and third
%% arguments are ignored.
resume_next(S, _V, _A) ->
    Cleared = S#tstate{proxy_msgs = 0, direct_msgs = 0},
    Cleared#tstate{vnode_running = true}.
Expand Down Expand Up @@ -329,28 +326,28 @@ overloaded_args(#tstate{vnode_running = Running, rt = RT}) ->
%% Ask the vnode proxy whether it currently considers itself overloaded.
%%
%% Running - true when the vnode is running (not suspended); only then
%%           are the proxy and vnode mailboxes drained first, so that
%%           all proxy messages are processed/dropped and any pending
%%           ping/pongs are handled before the check.
%% #rt{}   - runtime record holding the proxy pid (ppid) and the vnode
%%           pid (vpid).
%%
%% Returns {Overloaded, {VnodeMsgs, ProxyMsgs}, ProxyStatus}. The mailbox
%% snapshots and the proxy status are included only for debugging, so
%% they can be reviewed in log output. Crashes with badmatch if either
%% process is dead (process_info/2 returns undefined).
overloaded(Running, #rt{ppid = PPid, vpid = VPid}) ->
    case Running of
        true ->
            ok = drain([PPid, VPid]);
        _ ->
            ok
    end,
    %% Snapshot the mailboxes before querying the proxy.
    {messages, PMsgs} = process_info(PPid, messages),
    {messages, VMsgs} = process_info(VPid, messages),
    Overloaded = riak_core_vnode_proxy:overloaded(PPid),
    {Overloaded, {VMsgs, PMsgs}, sys:get_status(PPid)}.

overloaded_post(#tstate{threshold = undefined}, _A,
{R, _VnodeQ, _ProxyStatus}) ->
{R, _Messages, _ProxyStatus}) ->
%% If there are no thresholds there should never be an overload
eq(R, false);
overloaded_post(#tstate{vnode_running = true}, _A,
{R, _VnodeQ = 0, _ProxyStatus}) ->
{R, _Messages, _ProxyStatus}) ->
%% If the vnode is running, we have cleared queues so
%% should not be in overload.
eq(R, false);
overloaded_post(#tstate{vnode_running = false,
proxy_msgs = ProxyMsgs,
threshold = Threshold}, _A,
{ResultOverload, _VnodeQ, _ProxyStatus}) ->
{ResultOverload, _Messages, _ProxyStatus}) ->
%% Either
%% mailbox is completely an estimate based on proxy msgs
%% or mailbox is a check + estimate since
Expand Down Expand Up @@ -397,16 +394,33 @@ prep_env(Var, Val) ->
%% Wait until all messages are drained by the Pid (or by every pid in a
%% list of pids). No guarantees about future messages being sent, or
%% that responses for the last message consumed have been transmitted.
%%
%% The mailboxes count as drained once the combined message queue length
%% of all pids has been sampled as zero three times in a row. Every pid
%% is suspended with sys:suspend/1 while it is sampled, so each one must
%% be a process that handles OTP system messages. Returns ok.
%%
%% NOTE: The "drain 3 times in a row" was determined empirically,
%% and may not be sufficient (2 was not). Given time constraints,
%% living with it for now. If this fails, we should really add some
%% tracing code around the send of messages to Vnode and Proxy to
%% determine where extra messages are coming from rather than just
%% make this "try 4 times"
%%
drain(Pid) when is_pid(Pid) ->
    drain([Pid]);
drain(Pids) when is_list(Pids) ->
    %% -1 marks "no sample taken yet", so it can never match the
    %% all-zero termination condition below.
    drain(Pids, {-1, -1}).

%% {PrevPrev, Prev} are the combined queue lengths seen on the two
%% previous samples.
drain(Pids, {PrevPrev, Prev}) ->
    _ = [sys:suspend(Pid) || Pid <- Pids],
    Len = lists:foldl(
            fun(Pid, Acc) ->
                    {message_queue_len, QLen} =
                        erlang:process_info(Pid, message_queue_len),
                    Acc + QLen
            end, 0, Pids),
    _ = [sys:resume(Pid) || Pid <- Pids],
    case {PrevPrev, Prev, Len} of
        {0, 0, 0} ->
            ok;
        _ ->
            %% Attempt to ensure something else is scheduled before we
            %% try to drain again
            erlang:yield(),
            timer:sleep(1),
            drain(Pids, {Prev, Len})
    end.

%% Return the length of the message queue (or crash if proc dead)
Expand Down Expand Up @@ -462,3 +476,14 @@ confirm() ->
pass.

-endif.


%% Poll the vnode manager until it reports a riak_kv_vnode pid for Index
%% that differs from VPid0, and return that new pid.
%%
%% VPid0 - the pid that is expected to be replaced.
%% Index - the index passed to riak_core_vnode_manager:get_vnode_pid/2.
%%
%% Polling is bounded so a vnode that never changes fails the test
%% instead of hanging it: after 30000 attempts (1 ms sleep between each)
%% it raises error({vnode_pid_unchanged, Index, VPid0}). Crashes with
%% case_clause if the manager does not return {ok, Pid}.
wait_for_vnode_change(VPid0, Index) ->
    wait_for_vnode_change(VPid0, Index, 30000).

%% Retry helper; Retries is the number of further polls allowed.
wait_for_vnode_change(VPid0, Index, Retries) ->
    case riak_core_vnode_manager:get_vnode_pid(Index, riak_kv_vnode) of
        {ok, VPid0} when Retries > 0 ->
            %% Still the old pid: give the manager a moment and retry.
            timer:sleep(1),
            wait_for_vnode_change(VPid0, Index, Retries - 1);
        {ok, VPid0} ->
            erlang:error({vnode_pid_unchanged, Index, VPid0});
        {ok, VPid1} ->
            VPid1
    end.