diff --git a/tests/proxy_overload_recovery.erl b/tests/proxy_overload_recovery.erl
index 883ca19bd..03248d008 100644
--- a/tests/proxy_overload_recovery.erl
+++ b/tests/proxy_overload_recovery.erl
@@ -208,16 +208,13 @@ prepare(ThresholdSeed) ->
     {ok, VPid0} = riak_core_vnode_manager:get_vnode_pid(Id, riak_kv_vnode),
     sys:resume(VPid0),
     ok = supervisor:terminate_child(riak_core_vnode_sup, VPid0),
-    false = is_process_alive(VPid0),
     %% Reset the proxy pid to make sure it resets state and picks up the new
     %% environment variables
     ok = supervisor:terminate_child(riak_core_vnode_proxy_sup,
                                     {riak_kv_vnode, Id}),
     RegName = riak_core_vnode_proxy:reg_name(riak_kv_vnode, Index),
     undefined = whereis(RegName),
-    %% Fail if we get back the dead vnode
-    {ok, VPid1} = riak_core_vnode_manager:get_vnode_pid(Index, riak_kv_vnode),
-    ?assertNotEqual(VPid1, VPid0),
+    VPid1 = wait_for_vnode_change(VPid0, Index),
     {ok, PPid} = supervisor:restart_child(riak_core_vnode_proxy_sup,
                                           {riak_kv_vnode, Id}),
 
@@ -269,14 +266,14 @@ resume_args(#tstate{rt = RT}) ->
 resume(#rt{ppid = PPid, vpid = VPid}) ->
     sys:resume(VPid),
     %% Use the sys:get_status call to force a synchronous call
-    %% against the vnode proxy to ensure all messages sent by
+    %% against the vnode & the proxy to ensure all messages sent by
     %% this process have been serviced and there are no pending
     %% 'ping's in the vnode before we continue.
     %% Then drain the vnode to make sure any pending pongs have
-    %% been sent.
-    ok = drain(VPid),
+    %% been sent, and ensure the proxy has processed them.
+    _ = sys:get_status(PPid),
     _ = sys:get_status(VPid),
-    _ = sys:get_status(PPid).
+    ok = drain([VPid, PPid]).
 
 resume_next(S, _V, _A) ->
     S#tstate{vnode_running = true, proxy_msgs = 0, direct_msgs = 0}.
@@ -329,28 +326,28 @@ overloaded_args(#tstate{vnode_running = Running, rt = RT}) ->
 overloaded(Running, #rt{ppid = PPid, vpid = VPid}) ->
     case Running of
         true ->
-            ok = drain(PPid), % make sure all proxy msgs processed/dropped
-            ok = drain(VPid); % make sure any pending ping/pongs are processed
+            ok = drain([PPid, VPid]);
         _ ->
             ok
     end,
-    {riak_core_vnode_proxy:overloaded(PPid),
-     msgq_len(VPid), % just for debug so we can review in log output
-     sys:get_status(PPid)}. % ditto
+    {messages, PMsgs} = process_info(PPid, messages),
+    {messages, VMsgs} = process_info(VPid, messages),
+    Overloaded = riak_core_vnode_proxy:overloaded(PPid),
+    {Overloaded, {VMsgs, PMsgs}, sys:get_status(PPid)}.
 
 overloaded_post(#tstate{threshold = undefined}, _A,
-                {R, _VnodeQ, _ProxyStatus}) ->
+                {R, _Messages, _ProxyStatus}) ->
     %% If there are no thresholds there should never be an overload
     eq(R, false);
 overloaded_post(#tstate{vnode_running = true}, _A,
-                {R, _VnodeQ = 0, _ProxyStatus}) ->
+                {R, _Messages, _ProxyStatus}) ->
     %% If the vnode is running, we have cleared queues so
     %% should not be in overload.
     eq(R, false);
 overloaded_post(#tstate{vnode_running = false,
                         proxy_msgs = ProxyMsgs,
                         threshold = Threshold}, _A,
-                {ResultOverload, _VnodeQ, _ProxyStatus}) ->
+                {ResultOverload, _Messages, _ProxyStatus}) ->
     %% Either
     %%   mailbox is completely an estimate based on proxy msgs
     %%   or mailbox is a check + estimate since
@@ -397,16 +394,33 @@ prep_env(Var, Val) ->
 %% Wait until all messages are drained by the Pid. No guarantees
 %% about future messages being sent, or that responses for the
 %% last message consumed have been transmitted.
-%%
-drain(Pid) ->
-    case erlang:process_info(Pid, message_queue_len) of
-        {message_queue_len, 0} ->
+%% NOTE: The "drain 3 times in a row" was determined empirically,
+%% and may not be sufficient (2 was not). Given time constraints,
+%% living with it for now. If this fails, we should really add some
+%% tracing code around the send of messages to the vnode and proxy to
+%% determine where the extra messages are coming from, rather than just
+%% making this "try 4 times".
+%%
+drain(Pid) when is_pid(Pid) ->
+    drain([Pid], {-1, -1});
+
+drain(Pids) when is_list(Pids) ->
+    drain(Pids, {-1, -1}).
+drain(Pids, {PrevPrev, Prev}) ->
+    _ = [sys:suspend(Pid) || Pid <- Pids],
+    Len = lists:foldl(fun(Pid, Acc0) ->
+                          {message_queue_len, Len} = erlang:process_info(Pid, message_queue_len),
+                          Acc0 + Len
+                      end, 0, Pids),
+    _ = [sys:resume(Pid) || Pid <- Pids],
+    case {PrevPrev, Prev, Len} of
+        {0, 0, 0} ->
             ok;
-        {message_queue_len, L} when L > 0 ->
-            timer:sleep(1), % give it a millisecond to drain
-            drain(Pid);
-        ER ->
-            ER
+        _ ->
+            %% Attempt to ensure something else is scheduled before we try to drain again
+            erlang:yield(),
+            timer:sleep(1),
+            drain(Pids, {Prev, Len})
     end.
 
 %% Return the length of the message queue (or crash if proc dead)
@@ -462,3 +476,14 @@ confirm() ->
     pass.
 
 -endif.
+
+
+wait_for_vnode_change(VPid0, Index) ->
+    {ok, VPid1} = riak_core_vnode_manager:get_vnode_pid(Index, riak_kv_vnode),
+    case VPid1 of
+        VPid0 ->
+            timer:sleep(1),
+            wait_for_vnode_change(VPid0, Index);
+        _ ->
+            VPid1
+    end.
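
For reviewers: the NOTE added to drain/2 punts on root-causing the stray
messages. If the "three zero samples in a row" heuristic breaks again, a rough
sketch of the tracing it suggests is below, using erlang:trace/3 with the
'send' flag. The module and function names (drain_trace_example,
trace_sends_to/2) are hypothetical and not part of this change.

    %% Hypothetical sketch, not part of the patch: record every message
    %% sent to the given pids for DurationMs so we can see where extra
    %% messages to the vnode/proxy originate.
    -module(drain_trace_example).
    -export([trace_sends_to/2]).

    trace_sends_to(Targets, DurationMs) ->
        %% The calling process becomes the tracer; trace all sends.
        _ = erlang:trace(all, true, [send]),
        Deadline = erlang:monotonic_time(millisecond) + DurationMs,
        Events = collect(Targets, Deadline, []),
        _ = erlang:trace(all, false, [send]),
        Events.

    collect(Targets, Deadline, Acc) ->
        Timeout = max(0, Deadline - erlang:monotonic_time(millisecond)),
        receive
            %% Send-trace events arrive as {trace, From, send, Msg, To};
            %% keep only those addressed to one of the target pids.
            {trace, From, send, Msg, To} ->
                case lists:member(To, Targets) of
                    true  -> collect(Targets, Deadline, [{From, To, Msg} | Acc]);
                    false -> collect(Targets, Deadline, Acc)
                end
        after Timeout ->
            lists:reverse(Acc)
        end.

Something like drain_trace_example:trace_sends_to([VPid, PPid], 100) during a
failing run would show which processes are still sending when drain/2 keeps
seeing a non-empty queue.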