Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/rt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
stop/1,
join/2,
leave/1,
admin/2,
wait_until_pingable/1,
wait_until_unpingable/1,
wait_until_ready/1,
Expand Down Expand Up @@ -78,6 +79,9 @@ leave(Node) ->
?assertEqual(ok, R),
ok.

%% @doc Delegate to `admin/2' of the module named by the `?HARNESS' macro,
%% passing `Node' and `Args' through unchanged and returning its result.
admin(Node, Args) ->
?HARNESS:admin(Node, Args).

%% @doc Have `Node' remove `OtherNode' from the cluster
remove(Node, OtherNode) ->
?assertEqual(ok,
Expand Down
18 changes: 18 additions & 0 deletions src/rtdev.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ gitcmd(Path, Cmd) ->
io_lib:format("git --git-dir=\"~s/dev/.git\" --work-tree=\"~s/dev\" ~s",
[Path, Path, Cmd]).

%% @doc Build the shell command line that runs `riak-admin' for dev node
%% number `N' under `Path', followed by `Args'.
%%
%% `Args' must be a list of strings; any non-list element raises `badarg'.
%% Each argument is wrapped in double quotes so that embedded whitespace
%% stays within a single shell word. Returns the chardata produced by
%% `io_lib:format/2' (not flattened).
%%
%% Review note carried over from the pull-request thread: "Maybe this should
%% also escape quote characters in Arg before wrapping it?" -- done here, and
%% extended to every character that keeps a special meaning inside double
%% quotes, so an argument can neither terminate the quoting early nor trigger
%% variable/command substitution.
riak_admin_cmd(Path, N, Args) ->
    Quoted = [quote_shell_arg(Arg) || Arg <- Args],
    ArgStr = string:join(Quoted, " "),
    io_lib:format("~s/dev/dev~b/bin/riak-admin ~s", [Path, N, ArgStr]).

%% Wrap one argument in double quotes, escaping shell-special characters.
quote_shell_arg(Arg) when is_list(Arg) ->
    Escaped = lists:flatmap(fun escape_shell_char/1, lists:flatten(Arg)),
    [$"] ++ Escaped ++ [$"];
quote_shell_arg(_) ->
    erlang:error(badarg).

%% Backslash-escape the characters that are still interpreted by the shell
%% inside a double-quoted word: `"', `\', `$' and backtick.
escape_shell_char(C) when C =:= $"; C =:= $\\; C =:= $$; C =:= $` ->
    [$\\, C];
escape_shell_char(C) ->
    [C].

%% @doc Build the git command line for `Path' and `Cmd' with `gitcmd/2',
%% log it at debug level, execute it with `os:cmd/1' and return the output.
run_git(Path, Cmd) ->
    GitCommand = gitcmd(Path, Cmd),
    lager:debug("Running: ~s", [GitCommand]),
    os:cmd(GitCommand).
Expand Down Expand Up @@ -96,6 +106,14 @@ start(Node) ->
run_riak(node_id(Node), ?PATH, "start"),
ok.

%% @doc Run `riak-admin' with `Args' for the dev node that `Node' maps to.
%%
%% The command line comes from `riak_admin_cmd/3', is logged at debug level
%% and executed with `os:cmd/1'; whatever the command prints is echoed with
%% `io:format/2'. The output is not inspected: the function always returns
%% `ok'.
admin(Node, Args) ->
    Command = riak_admin_cmd(?PATH, node_id(Node), Args),
    lager:debug("Running: ~s", [Command]),
    Output = os:cmd(Command),
    io:format("~s", [Output]),
    ok.

%% @doc Look up the value stored for `Node' in the `rt_nodes' orddict
%% obtained from `rt:config/1'. Fails (via `orddict:fetch/2') when `Node'
%% is not a key of that orddict.
node_id(Node) ->
    orddict:fetch(Node, rt:config(rt_nodes)).
Expand Down
152 changes: 152 additions & 0 deletions tests/verify_staged_clustering.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
-module(verify_staged_clustering).
-export([verify_staged_clustering/0]).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").

%% @doc Exercise the staged clustering commands end to end on four nodes.
%%
%% Steps, in order: stage joins of Node2/Node3 to Node1 and commit; join
%% Node4 and stage a replace of Node2 with Node4; restart Node2, re-join it
%% and stage a force-replace of Node3 with Node2; stage a leave and a
%% force-remove, clear them, then stage them again and commit so that only
%% Node1 remains. After each commit the expected owners are asserted with
%% `rt:owners_according_to/1'. Returns `ok'; any failed assertion raises.
verify_staged_clustering() ->
    Nodes = rt:deploy_nodes(4),
    [Node1, Node2, Node3, Node4] = Nodes,
    Nodes123 = [Node1, Node2, Node3],
    Nodes23 = [Node2, Node3],

    lager:info("Join ~p and ~p to ~p", [Node2, Node3, Node1]),
    lists:foreach(fun(Node) -> stage_join(Node, Node1) end, Nodes23),
    ?assertEqual(ok, rt:wait_until_all_members(Nodes123)),
    ?assertEqual(ok, rt:wait_until_no_pending_changes(Nodes123)),

    lager:info("Ensure that ~p has not yet claimed partitions", [Node2]),
    lists:foreach(
      fun(Node) ->
              ?assertEqual([Node1], rt:owners_according_to(Node))
      end, Nodes123),

    lager:info("Commit without first printing the plan. This should fail"),
    commit_staged(Node1),
    %% TODO(review): the log line says "This should fail", but the result of
    %% commit_staged/1 is ignored here, so the failure is never checked.
    %% Is there a way it could be checked?

    lager:info("Print staged plan and then commit"),
    print_staged(Node1),
    commit_staged(Node1),

    lager:info("Ensure that ~p now own all partitions", [Nodes123]),
    ?assertEqual(ok, rt:wait_until_nodes_ready(Nodes123)),
    ?assertEqual(ok, rt:wait_until_no_pending_changes(Nodes123)),
    lists:foreach(
      fun(Node) ->
              ?assertEqual(Nodes123, rt:owners_according_to(Node))
      end, Nodes123),

    lager:info("Join ~p to the cluster", [Node4]),
    stage_join(Node4, Node1),
    ?assertEqual(ok, rt:wait_until_all_members(Nodes)),

    lager:info("Stage replacement of ~p with ~p", [Node2, Node4]),
    stage_replace(Node1, Node2, Node4),

    lager:info("Print staged plan and commit"),
    print_staged(Node1),
    commit_staged(Node1),

    Nodes134 = [Node1, Node3, Node4],
    lager:info("Ensure that ~p now own all partitions", [Nodes134]),
    ?assertEqual(ok, rt:wait_until_nodes_ready(Nodes134)),
    ?assertEqual(ok, rt:wait_until_no_pending_changes(Nodes134)),
    lists:foreach(
      fun(Node) ->
              ?assertEqual(Nodes134, rt:owners_according_to(Node))
      end, Nodes134),

    lager:info("Verify that ~p shutdown after being replaced", [Node2]),
    ?assertEqual(ok, rt:wait_until_unpingable(Node2)),

    lager:info("Restart ~p and re-join to cluster", [Node2]),
    rt:start(Node2),
    stage_join(Node2, Node1),
    ?assertEqual(ok, rt:wait_until_all_members(Nodes)),

    lager:info("Schedule force-replace of ~p with ~p", [Node3, Node2]),
    stage_force_replace(Node4, Node3, Node2),

    lager:info("Print staged plan and commit"),
    print_staged(Node4),
    commit_staged(Node4),

    Nodes124 = [Node1, Node2, Node4],
    lager:info("Ensure that ~p now own all partitions", [Nodes124]),
    ?assertEqual(ok, rt:wait_until_nodes_ready(Nodes124)),
    ?assertEqual(ok, rt:wait_until_no_pending_changes(Nodes124)),
    lists:foreach(
      fun(Node) ->
              ?assertEqual(Nodes124, rt:owners_according_to(Node))
      end, Nodes124),

    lager:info("Stage leave of ~p", [Node2]),
    stage_leave(Node1, Node2),
    lager:info("Stage force-remove of ~p", [Node4]),
    stage_remove(Node1, Node4),

    lager:info("Print staged plan and verify clear_staged works"),
    print_staged(Node1),
    clear_staged(Node1),
    commit_staged(Node1),
    %% TODO(review): this part of the test could generate confusing output.
    %% If I read it right, nothing fails here when clear_staged/1 is broken;
    %% instead the stage_leave/2 call below would fail because Node2 has
    %% already been removed from the cluster. Could the result of
    %% clear_staged/1 be checked more directly?

    lager:info("Re-stage leave of ~p and force-remove of ~p", [Node2, Node4]),
    stage_leave(Node1, Node2),
    stage_remove(Node1, Node4),
    lager:info("Print staged plan and commit"),
    print_staged(Node1),
    commit_staged(Node1),

    lager:info("Verify that ~p is the only remaining cluster member", [Node1]),
    ?assertEqual(ok, rt:wait_until_no_pending_changes([Node1])),
    ?assertEqual([Node1], rt:owners_according_to(Node1)),
    ?assertEqual(ok, rt:wait_until_all_members([Node1])),

    lager:info("Test verify_staged_clustering: Passed"),
    ok.

%% @doc Convert an atom (used here for node names) to its string form so it
%% can be passed as a command-line argument.
n(Atom) ->
atom_to_list(Atom).

%% @doc Call `rt:admin/2' on `Node' with the arguments
%% `cluster join OtherNode' and return its result.
stage_join(Node, OtherNode) ->
%% rpc:call(Node, riak_kv_console, staged_join, [[n(OtherNode)]]).
rt:admin(Node, ["cluster", "join", n(OtherNode)]).

%% @doc Call `rt:admin/2' on `Node' with the arguments
%% `cluster leave OtherNode' and return its result.
stage_leave(Node, OtherNode) ->
%% rpc:call(Node, riak_core_console, stage_leave, [[n(OtherNode)]]).
rt:admin(Node, ["cluster", "leave", n(OtherNode)]).

%% @doc Call `rt:admin/2' on `Node' with the arguments
%% `cluster force-remove OtherNode' and return its result.
%% Note that the command issued is `force-remove', not a plain remove.
stage_remove(Node, OtherNode) ->
%% rpc:call(Node, riak_core_console, stage_remove, [[n(OtherNode)]]).
rt:admin(Node, ["cluster", "force-remove", n(OtherNode)]).

%% @doc Call `rt:admin/2' on `Node' with the arguments
%% `cluster replace Node1 Node2' and return its result.
stage_replace(Node, Node1, Node2) ->
%% rpc:call(Node, riak_core_console, stage_replace, [[n(Node1), n(Node2)]]).
rt:admin(Node, ["cluster", "replace", n(Node1), n(Node2)]).

%% @doc Call `rt:admin/2' on `Node' with the arguments
%% `cluster force-replace Node1 Node2' and return its result.
stage_force_replace(Node, Node1, Node2) ->
%% rpc:call(Node, riak_core_console, stage_force_replace, [[n(Node1), n(Node2)]]).
rt:admin(Node, ["cluster", "force-replace", n(Node1), n(Node2)]).

%% @doc Call `rt:admin/2' on `Node' with the arguments `cluster plan'
%% and return its result.
print_staged(Node) ->
%% rpc:call(Node, riak_core_console, print_staged, [[]]).
rt:admin(Node, ["cluster", "plan"]).

%% @doc Call `rt:admin/2' on `Node' with the arguments `cluster commit'
%% and return its result.
commit_staged(Node) ->
%% rpc:call(Node, riak_core_console, commit_staged, [[]]).
rt:admin(Node, ["cluster", "commit"]).

%% @doc Call `rt:admin/2' on `Node' with the arguments `cluster clear'
%% and return its result.
clear_staged(Node) ->
%% rpc:call(Node, riak_core_console, clear_staged, [[]]).
rt:admin(Node, ["cluster", "clear"]).

%% @doc Call `riak_core:staged_join(OtherNode)' on `Node' through
%% `rpc:call/4' and return whatever `rpc:call/4' returns.
stage_join_rpc(Node, OtherNode) ->
rpc:call(Node, riak_core, staged_join, [OtherNode]).

%% @doc Call `riak_core_claimant:leave_member(OtherNode)' on `Node' through
%% `rpc:call/4' and return whatever `rpc:call/4' returns.
stage_leave_rpc(Node, OtherNode) ->
rpc:call(Node, riak_core_claimant, leave_member, [OtherNode]).

%% @doc Call `riak_core_claimant:remove_member(OtherNode)' on `Node' through
%% `rpc:call/4' and return whatever `rpc:call/4' returns.
stage_remove_rpc(Node, OtherNode) ->
rpc:call(Node, riak_core_claimant, remove_member, [OtherNode]).

%% @doc Call `riak_core_claimant:replace(Node1, Node2)' on `Node' through
%% `rpc:call/4' and return whatever `rpc:call/4' returns.
stage_replace_rpc(Node, Node1, Node2) ->
rpc:call(Node, riak_core_claimant, replace, [Node1, Node2]).

%% @doc Call `riak_core_claimant:force_replace(Node1, Node2)' on `Node'
%% through `rpc:call/4' and return whatever `rpc:call/4' returns.
stage_force_replace_rpc(Node, Node1, Node2) ->
rpc:call(Node, riak_core_claimant, force_replace, [Node1, Node2]).

%% @doc Call `riak_core_claimant:plan()' on `Node' through `rpc:call/4'
%% and return whatever `rpc:call/4' returns.
plan_staged_rpc(Node) ->
rpc:call(Node, riak_core_claimant, plan, []).

%% @doc Call `riak_core_claimant:commit()' on `Node' through `rpc:call/4'
%% and return whatever `rpc:call/4' returns.
commit_staged_rpc(Node) ->
rpc:call(Node, riak_core_claimant, commit, []).

%% @doc Call `riak_core_claimant:clear()' on `Node' through `rpc:call/4'
%% and return whatever `rpc:call/4' returns.
clear_staged_rpc(Node) ->
rpc:call(Node, riak_core_claimant, clear, []).