Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ebin/riak_core.app
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
riak_core_app,
riak_core_bucket,
riak_core_cinfo_core,
riak_core_claimant,
riak_core_claim,
riak_core_new_claim,
riak_core_config,
Expand Down Expand Up @@ -108,7 +109,7 @@
{vnode_inactivity_timeout, 60000},

%% Number of VNodes allowed to do handoff concurrently.
{handoff_concurrency, 1},
{handoff_concurrency, 2},

%% Disable Nagle on HTTP sockets
{disable_http_nagle, false},
Expand Down
47 changes: 31 additions & 16 deletions src/riak_core.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
%%
%% -------------------------------------------------------------------
-module(riak_core).
-export([stop/0, stop/1, join/1, join/4, remove/1, down/1, leave/0,
remove_from_cluster/1]).
-export([stop/0, stop/1, join/1, join/5, staged_join/1, remove/1, down/1,
leave/0, remove_from_cluster/1]).
-export([vnode_modules/0]).
-export([register/1, register/2, bucket_fixups/0, bucket_validators/0]).

Expand Down Expand Up @@ -54,19 +54,28 @@ stop(Reason) ->
%%
%% @doc Join the ring found on the specified remote node
%%
join(NodeStr) when is_list(NodeStr) ->
join(riak_core_util:str_to_node(NodeStr));
join(Node) when is_atom(Node) ->
join(node(), Node).
join(Node) ->
join(Node, true).

join(Node, Node) ->
%% @doc Join the remote cluster without automatically claiming ring
%% ownership. Used to stage a join in the newer plan/commit
%% approach to cluster administration. See {@link riak_core_claimant}
staged_join(Node) ->
join(Node, false).

join(NodeStr, Auto) when is_list(NodeStr) ->
join(riak_core_util:str_to_node(NodeStr), Auto);
join(Node, Auto) when is_atom(Node) ->
join(node(), Node, Auto).

join(Node, Node, _) ->
{error, self_join};
join(_, Node) ->
join(riak_core_gossip:legacy_gossip(), node(), Node, false).
join(_, Node, Auto) ->
join(riak_core_gossip:legacy_gossip(), node(), Node, false, Auto).

join(true, _, Node, _Rejoin) ->
join(true, _, Node, _Rejoin, _Auto) ->
legacy_join(Node);
join(false, _, Node, Rejoin) ->
join(false, _, Node, Rejoin, Auto) ->
case net_adm:ping(Node) of
pang ->
{error, not_reachable};
Expand All @@ -78,7 +87,7 @@ join(false, _, Node, Rejoin) ->
%% Failure due to trying to join older node that
%% doesn't define legacy_gossip will be handled
%% in standard_join based on seeing a legacy ring.
standard_join(Node, Rejoin)
standard_join(Node, Rejoin, Auto)
end
end.

Expand All @@ -95,7 +104,7 @@ get_other_ring(Node) ->
Error
end.

standard_join(Node, Rejoin) when is_atom(Node) ->
standard_join(Node, Rejoin, Auto) when is_atom(Node) ->
case net_adm:ping(Node) of
pong ->
case get_other_ring(Node) of
Expand All @@ -104,7 +113,7 @@ standard_join(Node, Rejoin) when is_atom(Node) ->
true ->
legacy_join(Node);
false ->
standard_join(Node, Ring, Rejoin)
standard_join(Node, Ring, Rejoin, Auto)
end;
_ ->
{error, unable_to_get_join_ring}
Expand All @@ -113,7 +122,7 @@ standard_join(Node, Rejoin) when is_atom(Node) ->
{error, not_reachable}
end.

standard_join(Node, Ring, Rejoin) ->
standard_join(Node, Ring, Rejoin, Auto) ->
{ok, MyRing} = riak_core_ring_manager:get_raw_ring(),
SameSize = (riak_core_ring:num_partitions(MyRing) =:=
riak_core_ring:num_partitions(Ring)),
Expand All @@ -134,10 +143,16 @@ standard_join(Node, Ring, Rejoin) ->
node(),
gossip_vsn,
GossipVsn),
riak_core_ring_manager:set_my_ring(Ring4),
Ring5 = maybe_auto_join(Auto, node(), Ring4),
riak_core_ring_manager:set_my_ring(Ring5),
riak_core_gossip:send_ring(Node, node())
end.

maybe_auto_join(false, _Node, Ring) ->
Ring;
maybe_auto_join(true, Node, Ring) ->
riak_core_ring:update_member_meta(Node, Ring, Node, '$autojoin', true).

legacy_join(Node) when is_atom(Node) ->
{ok, OurRingSize} = application:get_env(riak_core, ring_creation_size),
case net_adm:ping(Node) of
Expand Down
10 changes: 9 additions & 1 deletion src/riak_core_claim.erl
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ choose_claim_v2(Ring, Node) ->
RingSize = riak_core_ring:num_partitions(Ring),
NodeCount = erlang:length(Active),
Avg = RingSize div NodeCount,
Deltas = [{Member, Avg - Count} || {Member, Count} <- Counts],
ActiveDeltas = [{Member, Avg - Count} || {Member, Count} <- Counts],
Deltas = add_default_deltas(Owners, ActiveDeltas, 0),
{_, Want} = lists:keyfind(Node, 1, Deltas),
TargetN = app_helper:get_env(riak_core, target_n_val),
AllIndices = lists:zip(lists:seq(0, length(Owners)-1),
Expand Down Expand Up @@ -424,6 +425,13 @@ get_counts(Nodes, Ring) ->
end, dict:from_list(Empty), Ring),
dict:to_list(Counts).

%% @private
add_default_deltas(IdxOwners, Deltas, Default) ->
{_, Owners} = lists:unzip(IdxOwners),
Owners2 = lists:usort(Owners),
Defaults = [{Member, Default} || Member <- Owners2],
lists:usort(Deltas ++ Defaults).

%% @private
get_expected_partitions(Ring, Node) ->
riak_core_ring:num_partitions(Ring) div get_member_count(Ring, Node).
Expand Down
Loading