Skip to content

Commit 8480b43

Browse files
author
Jon Meredith
committed
Merge branch 'gh177-staged-clustering' into jdm-merge-staged-clustering
Resolves merge conflicts between staged clustering and claim sim/claimv3 branches. Conflicts: src/riak_core.erl src/riak_core_gossip.erl src/riak_core_ring.erl
2 parents bbf95d1 + 952dd27 commit 8480b43

11 files changed

+1475
-439
lines changed

ebin/riak_core.app

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
riak_core_app,
2121
riak_core_bucket,
2222
riak_core_cinfo_core,
23+
riak_core_claimant,
2324
riak_core_claim,
2425
riak_core_claim_sim,
2526
riak_core_claim_util,
@@ -113,7 +114,7 @@
113114
{vnode_inactivity_timeout, 60000},
114115

115116
%% Number of VNodes allowed to do handoff concurrently.
116-
{handoff_concurrency, 1},
117+
{handoff_concurrency, 2},
117118

118119
%% Disable Nagle on HTTP sockets
119120
{disable_http_nagle, true},

src/riak_core.erl

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
%%
2121
%% -------------------------------------------------------------------
2222
-module(riak_core).
23-
-export([stop/0, stop/1, join/1, join/4, remove/1, down/1, leave/0,
24-
remove_from_cluster/1]).
23+
-export([stop/0, stop/1, join/1, join/5, staged_join/1, remove/1, down/1,
24+
leave/0, remove_from_cluster/1]).
2525
-export([vnode_modules/0]).
2626
-export([register/1, register/2, bucket_fixups/0, bucket_validators/0]).
2727

@@ -54,19 +54,28 @@ stop(Reason) ->
5454
%%
5555
%% @doc Join the ring found on the specified remote node
5656
%%
57-
join(NodeStr) when is_list(NodeStr) ->
58-
join(riak_core_util:str_to_node(NodeStr));
59-
join(Node) when is_atom(Node) ->
60-
join(node(), Node).
57+
join(Node) ->
58+
join(Node, true).
6159

62-
join(Node, Node) ->
60+
%% @doc Join the remote cluster without automatically claiming ring
61+
%% ownership. Used to stage a join in the newer plan/commit
62+
%% approach to cluster administration. See {@link riak_core_claimant}
63+
staged_join(Node) ->
64+
join(Node, false).
65+
66+
join(NodeStr, Auto) when is_list(NodeStr) ->
67+
join(riak_core_util:str_to_node(NodeStr), Auto);
68+
join(Node, Auto) when is_atom(Node) ->
69+
join(node(), Node, Auto).
70+
71+
join(Node, Node, _) ->
6372
{error, self_join};
64-
join(_, Node) ->
65-
join(riak_core_gossip:legacy_gossip(), node(), Node, false).
73+
join(_, Node, Auto) ->
74+
join(riak_core_gossip:legacy_gossip(), node(), Node, false, Auto).
6675

67-
join(true, _, Node, _Rejoin) ->
76+
join(true, _, Node, _Rejoin, _Auto) ->
6877
legacy_join(Node);
69-
join(false, _, Node, Rejoin) ->
78+
join(false, _, Node, Rejoin, Auto) ->
7079
case net_adm:ping(Node) of
7180
pang ->
7281
{error, not_reachable};
@@ -78,7 +87,7 @@ join(false, _, Node, Rejoin) ->
7887
%% Failure due to trying to join older node that
7988
%% doesn't define legacy_gossip will be handled
8089
%% in standard_join based on seeing a legacy ring.
81-
standard_join(Node, Rejoin)
90+
standard_join(Node, Rejoin, Auto)
8291
end
8392
end.
8493

@@ -95,7 +104,7 @@ get_other_ring(Node) ->
95104
Error
96105
end.
97106

98-
standard_join(Node, Rejoin) when is_atom(Node) ->
107+
standard_join(Node, Rejoin, Auto) when is_atom(Node) ->
99108
case net_adm:ping(Node) of
100109
pong ->
101110
case get_other_ring(Node) of
@@ -104,7 +113,7 @@ standard_join(Node, Rejoin) when is_atom(Node) ->
104113
true ->
105114
legacy_join(Node);
106115
false ->
107-
standard_join(Node, Ring, Rejoin)
116+
standard_join(Node, Ring, Rejoin, Auto)
108117
end;
109118
_ ->
110119
{error, unable_to_get_join_ring}
@@ -113,7 +122,7 @@ standard_join(Node, Rejoin) when is_atom(Node) ->
113122
{error, not_reachable}
114123
end.
115124

116-
standard_join(Node, Ring, Rejoin) ->
125+
standard_join(Node, Ring, Rejoin, Auto) ->
117126
{ok, MyRing} = riak_core_ring_manager:get_raw_ring(),
118127
SameSize = (riak_core_ring:num_partitions(MyRing) =:=
119128
riak_core_ring:num_partitions(Ring)),
@@ -135,10 +144,16 @@ standard_join(Node, Ring, Rejoin) ->
135144
gossip_vsn,
136145
GossipVsn),
137146
{_, Ring5} = riak_core_capability:update_ring(Ring4),
138-
riak_core_ring_manager:set_my_ring(Ring5),
147+
Ring6 = maybe_auto_join(Auto, node(), Ring5),
148+
riak_core_ring_manager:set_my_ring(Ring6),
139149
riak_core_gossip:send_ring(Node, node())
140150
end.
141151

152+
maybe_auto_join(false, _Node, Ring) ->
153+
Ring;
154+
maybe_auto_join(true, Node, Ring) ->
155+
riak_core_ring:update_member_meta(Node, Ring, Node, '$autojoin', true).
156+
142157
legacy_join(Node) when is_atom(Node) ->
143158
{ok, OurRingSize} = application:get_env(riak_core, ring_creation_size),
144159
case net_adm:ping(Node) of

src/riak_core_claim.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,8 @@ choose_claim_v2(Ring, Node, Params0) ->
291291
RingSize = riak_core_ring:num_partitions(Ring),
292292
NodeCount = erlang:length(Active),
293293
Avg = RingSize div NodeCount,
294-
Deltas = [{Member, Avg - Count} || {Member, Count} <- Counts],
294+
ActiveDeltas = [{Member, Avg - Count} || {Member, Count} <- Counts],
295+
Deltas = add_default_deltas(Owners, ActiveDeltas, 0),
295296
{_, Want} = lists:keyfind(Node, 1, Deltas),
296297
TargetN = proplists:get_value(target_n_val, Params),
297298
AllIndices = lists:zip(lists:seq(0, length(Owners)-1),
@@ -672,6 +673,13 @@ get_counts(Nodes, Ring) ->
672673
end, dict:from_list(Empty), Ring),
673674
dict:to_list(Counts).
674675

676+
%% @private
677+
add_default_deltas(IdxOwners, Deltas, Default) ->
678+
{_, Owners} = lists:unzip(IdxOwners),
679+
Owners2 = lists:usort(Owners),
680+
Defaults = [{Member, Default} || Member <- Owners2],
681+
lists:usort(Deltas ++ Defaults).
682+
675683
%% @private
676684
get_expected_partitions(Ring, Node) ->
677685
riak_core_ring:num_partitions(Ring) div get_member_count(Ring, Node).

src/riak_core_claim_sim.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ dryrun1(Ring00, CmdsL, #simopts{wants = Wants,
272272
cancel ->
273273
riak_core_ring:cancel_transfers(Ring00);
274274
finish ->
275-
riak_core_ring:finish_transfers(Ring00)
275+
riak_core_ring:future_ring(Ring00)
276276
end,
277277

278278
Prepared(Ring01, Prepare),
@@ -287,13 +287,13 @@ dryrun1(Ring00, CmdsL, #simopts{wants = Wants,
287287
PerCmd(RingAcc3, Cmd),
288288
RingAcc3
289289
end, RingAcc1, Cmds),
290-
{_, NewRing2} = riak_core_ring:reassign_indices(NewRing),
290+
{_, NewRing2} = riak_core_claimant:reassign_indices(NewRing),
291291
Pending = riak_core_ring:pending_changes(NewRing2),
292292
case Pending of
293293
[] ->
294294
NewRing3 = NewRing2;
295295
_ ->
296-
NewRing3 = riak_core_ring:finish_transfers(NewRing2),
296+
NewRing3 = riak_core_ring:future_ring(NewRing2),
297297
PostXfer(NewRing3)
298298
end,
299299
NewRing4 = run_rebalance(NewRing3, Wants, Choose, Rebalance),

0 commit comments

Comments
 (0)