diff --git a/tests/nextgenrepl_rtq_peerdiscovery.erl b/tests/nextgenrepl_rtq_peerdiscovery.erl index 9ad860012..94cc4c4a1 100644 --- a/tests/nextgenrepl_rtq_peerdiscovery.erl +++ b/tests/nextgenrepl_rtq_peerdiscovery.erl @@ -163,6 +163,31 @@ cluster_test(ClusterA, ClusterB, ClusterC, Protocol) -> lager:info("Every peer should change on reset after start"), ?assertMatch(LA, length(reset_cluster_peers(NodeA, cluster_a))), ?assertMatch(LB, length(reset_cluster_peers(NodeB, cluster_b))), + + + lager:info("Confirm peer discovery handles suspended queue"), + PidA = get_peerdiscovery_pid(NodeA), + ok = sink_action(NodeA, {suspend, cluster_a}), + UA0 = update_discovery(NodeA, cluster_a), + PidB = get_peerdiscovery_pid(NodeA), + ok = sink_action(NodeA, {resume, cluster_a}), + + ?assert(not UA0), + ?assertMatch(PidA, PidB), + + lager:info("Confirm peer discovery handles disabled sink"), + ok = sink_action(NodeA, disable), + UA1 = update_discovery(NodeA, cluster_a), + PidC = get_peerdiscovery_pid(NodeA), + ok = sink_action(NodeA, enable), + UA2 = update_discovery(NodeA, cluster_a), + PidD = get_peerdiscovery_pid(NodeA), + + ?assert(not UA1), + ?assertMatch(PidA, PidC), + ?assert(UA2), + ?assertMatch(PidA, PidD), + pass. compare_peer_info(ExpectedPeers, Protocol) -> @@ -487,4 +512,25 @@ set_max_delay([Node|Rest], S) -> set_max_delay(Rest, S). check_peers_stable(Node, QueueName) -> + rpc:call(Node, riak_kv_replrtq_peer, update_discovery, [QueueName]). + +get_peerdiscovery_pid(Node) -> + rpc:call(Node, erlang, whereis, [riak_kv_replrtq_peer]). + +sink_action(Node, {suspend, QueueName}) -> + rpc:call(Node, riak_kv_replrtq_snk, suspend_snkqueue, [QueueName]); +sink_action(Node, {resume, QueueName}) -> + rpc:call(Node, riak_kv_replrtq_snk, resume_snkqueue, [QueueName]); +sink_action(Node, disable) -> + rpc:call(Node, application, set_env, [riak_kv, replrtq_enablesink, false]), + P = rpc:call(Node, erlang, whereis, [riak_kv_replrtq_snk]), + rpc:call(Node, erlang, exit, [P, kill]), + ok; +sink_action(Node, enable) -> + rpc:call(Node, application, set_env, [riak_kv, replrtq_enablesink, true]), + P = rpc:call(Node, erlang, whereis, [riak_kv_replrtq_snk]), + rpc:call(Node, erlang, exit, [P, kill]), + ok. + +update_discovery(Node, QueueName) -> rpc:call(Node, riak_kv_replrtq_peer, update_discovery, [QueueName]). \ No newline at end of file diff --git a/tests/repl_rt_overload.erl b/tests/repl_rt_overload.erl index 1440d5b1e..404c42d83 100644 --- a/tests/repl_rt_overload.erl +++ b/tests/repl_rt_overload.erl @@ -81,7 +81,7 @@ verify_overload_writes(LeaderA, LeaderB) -> <> <= erlang:md5(term_to_binary(os:timestamp()))]), TestBucket = <>, First = 1, - Last = 10000, + Last = 100000, %% Write some objects to the source cluster (A), lager:info("Writing ~p keys to ~p, to ~p",