diff --git a/ebin/riak_core.app b/ebin/riak_core.app index 624a5ca1a..8148ac404 100644 --- a/ebin/riak_core.app +++ b/ebin/riak_core.app @@ -20,6 +20,7 @@ riak_core_app, riak_core_bucket, riak_core_cinfo_core, + riak_core_claimant, riak_core_claim, riak_core_new_claim, riak_core_config, @@ -108,7 +109,7 @@ {vnode_inactivity_timeout, 60000}, %% Number of VNodes allowed to do handoff concurrently. - {handoff_concurrency, 1}, + {handoff_concurrency, 2}, %% Disable Nagle on HTTP sockets {disable_http_nagle, false}, diff --git a/src/riak_core.erl b/src/riak_core.erl index 2f11017b8..4fde274e7 100644 --- a/src/riak_core.erl +++ b/src/riak_core.erl @@ -20,8 +20,8 @@ %% %% ------------------------------------------------------------------- -module(riak_core). --export([stop/0, stop/1, join/1, join/4, remove/1, down/1, leave/0, - remove_from_cluster/1]). +-export([stop/0, stop/1, join/1, join/5, staged_join/1, remove/1, down/1, + leave/0, remove_from_cluster/1]). -export([vnode_modules/0]). -export([register/1, register/2, bucket_fixups/0, bucket_validators/0]). @@ -54,19 +54,28 @@ stop(Reason) -> %% %% @doc Join the ring found on the specified remote node %% -join(NodeStr) when is_list(NodeStr) -> - join(riak_core_util:str_to_node(NodeStr)); -join(Node) when is_atom(Node) -> - join(node(), Node). +join(Node) -> + join(Node, true). -join(Node, Node) -> +%% @doc Join the remote cluster without automatically claiming ring +%% ownership. Used to stage a join in the newer plan/commit +%% approach to cluster administration. See {@link riak_core_claimant} +staged_join(Node) -> + join(Node, false). + +join(NodeStr, Auto) when is_list(NodeStr) -> + join(riak_core_util:str_to_node(NodeStr), Auto); +join(Node, Auto) when is_atom(Node) -> + join(node(), Node, Auto). + +join(Node, Node, _) -> {error, self_join}; -join(_, Node) -> - join(riak_core_gossip:legacy_gossip(), node(), Node, false). +join(_, Node, Auto) -> + join(riak_core_gossip:legacy_gossip(), node(), Node, false, Auto). -join(true, _, Node, _Rejoin) -> +join(true, _, Node, _Rejoin, _Auto) -> legacy_join(Node); -join(false, _, Node, Rejoin) -> +join(false, _, Node, Rejoin, Auto) -> case net_adm:ping(Node) of pang -> {error, not_reachable}; @@ -78,7 +87,7 @@ join(false, _, Node, Rejoin) -> %% Failure due to trying to join older node that %% doesn't define legacy_gossip will be handled %% in standard_join based on seeing a legacy ring. - standard_join(Node, Rejoin) + standard_join(Node, Rejoin, Auto) end end. @@ -95,7 +104,7 @@ get_other_ring(Node) -> Error end. -standard_join(Node, Rejoin) when is_atom(Node) -> +standard_join(Node, Rejoin, Auto) when is_atom(Node) -> case net_adm:ping(Node) of pong -> case get_other_ring(Node) of @@ -104,7 +113,7 @@ standard_join(Node, Rejoin) when is_atom(Node) -> true -> legacy_join(Node); false -> - standard_join(Node, Ring, Rejoin) + standard_join(Node, Ring, Rejoin, Auto) end; _ -> {error, unable_to_get_join_ring} @@ -113,7 +122,7 @@ standard_join(Node, Rejoin) when is_atom(Node) -> {error, not_reachable} end. -standard_join(Node, Ring, Rejoin) -> +standard_join(Node, Ring, Rejoin, Auto) -> {ok, MyRing} = riak_core_ring_manager:get_raw_ring(), SameSize = (riak_core_ring:num_partitions(MyRing) =:= riak_core_ring:num_partitions(Ring)), @@ -134,10 +143,16 @@ standard_join(Node, Ring, Rejoin) -> node(), gossip_vsn, GossipVsn), - riak_core_ring_manager:set_my_ring(Ring4), + Ring5 = maybe_auto_join(Auto, node(), Ring4), + riak_core_ring_manager:set_my_ring(Ring5), riak_core_gossip:send_ring(Node, node()) end. 
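For orientation, the split between the existing automatic join and the new staged join introduced above works roughly as follows. This is an illustrative sketch only (hypothetical node name), not part of the patch:

    %% Classic join: the node joins and claims partitions automatically once
    %% gossip settles ('$autojoin' is recorded via maybe_auto_join below).
    ok = riak_core:join("riak@node1.example.com"),

    %% Staged join: the node only becomes a `joining' member; it owns no
    %% partitions until the claimant commits a plan (see riak_core_claimant).
    ok = riak_core:staged_join("riak@node1.example.com").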
+maybe_auto_join(false, _Node, Ring) ->
+    Ring;
+maybe_auto_join(true, Node, Ring) ->
+    riak_core_ring:update_member_meta(Node, Ring, Node, '$autojoin', true).
+
 legacy_join(Node) when is_atom(Node) ->
     {ok, OurRingSize} = application:get_env(riak_core, ring_creation_size),
     case net_adm:ping(Node) of
diff --git a/src/riak_core_claim.erl b/src/riak_core_claim.erl
index c773cb9a5..c14ada07a 100644
--- a/src/riak_core_claim.erl
+++ b/src/riak_core_claim.erl
@@ -172,7 +172,8 @@ choose_claim_v2(Ring, Node) ->
     RingSize = riak_core_ring:num_partitions(Ring),
     NodeCount = erlang:length(Active),
     Avg = RingSize div NodeCount,
-    Deltas = [{Member, Avg - Count} || {Member, Count} <- Counts],
+    ActiveDeltas = [{Member, Avg - Count} || {Member, Count} <- Counts],
+    Deltas = add_default_deltas(Owners, ActiveDeltas, 0),
     {_, Want} = lists:keyfind(Node, 1, Deltas),
     TargetN = app_helper:get_env(riak_core, target_n_val),
     AllIndices = lists:zip(lists:seq(0, length(Owners)-1),
@@ -424,6 +425,13 @@ get_counts(Nodes, Ring) ->
                      end, dict:from_list(Empty), Ring),
     dict:to_list(Counts).
 
+%% @private
+add_default_deltas(IdxOwners, Deltas, Default) ->
+    {_, Owners} = lists:unzip(IdxOwners),
+    Owners2 = lists:usort(Owners),
+    Defaults = [{Member, Default} || Member <- Owners2],
+    lists:usort(Deltas ++ Defaults).
+
 %% @private
 get_expected_partitions(Ring, Node) ->
     riak_core_ring:num_partitions(Ring) div get_member_count(Ring, Node).
diff --git a/src/riak_core_claimant.erl b/src/riak_core_claimant.erl
new file mode 100644
index 000000000..f890f84fb
--- /dev/null
+++ b/src/riak_core_claimant.erl
@@ -0,0 +1,919 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_core_claimant).
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0]).
+-export([leave_member/1,
+         remove_member/1,
+         force_replace/2,
+         replace/2,
+         plan/0,
+         commit/0,
+         clear/0,
+         ring_changed/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-type action() :: leave
+                | remove
+                | {replace, node()}
+                | {force_replace, node()}.
+
+-type riak_core_ring() :: riak_core_ring:riak_core_ring().
+
+%% A tuple representing a given cluster transition:
+%%   {Ring, NewRing} where NewRing = f(Ring)
+-type ring_transition() :: {riak_core_ring(), riak_core_ring()}.
+
+-record(state, {
+          %% The set of staged cluster changes
+          changes :: [{node(), action()}],
+
+          %% Ring computed during the last planning stage based on
+          %% applying a set of staged cluster changes. When committing
+          %% changes, the computed ring must match the previous planned
+          %% ring to be allowed.
+          next_ring :: riak_core_ring(),
+
+          %% Random number seed passed to remove_node to ensure the
+          %% current randomized remove algorithm is deterministic
+          %% between plan and commit phases
+          seed}).
+
+-define(ROUT(S,A),ok).
+%%-define(ROUT(S,A),?debugFmt(S,A)).
+%%-define(ROUT(S,A),io:format(S,A)).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%% @doc Spawn and register the riak_core_claimant server
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% @doc Determine how the cluster will be affected by the staged changes,
+%%      returning the set of pending changes as well as a list of ring
+%%      modifications that correspond to each resulting cluster transition
+%%      (eg. the initial transition that applies the staged changes, and
+%%      any additional transitions triggered by later rebalancing).
+-spec plan() -> {ok, [action()], [ring_transition()]} | {error, atom()}.
+plan() ->
+    gen_server:call(claimant(), plan, infinity).
+
+%% @doc Commit the set of staged cluster changes, returning `ok' on success.
+%%      A commit is only allowed to succeed if the ring is ready and if the
+%%      current set of changes matches those computed by the most recent
+%%      call to plan/0.
+-spec commit() -> ok | error | {error, atom()}.
+commit() ->
+    gen_server:call(claimant(), commit, infinity).
+
+%% @doc Stage a request for `Node' to leave the cluster. If committed, `Node'
+%%      will handoff all of its data to other nodes in the cluster and then
+%%      shutdown.
+leave_member(Node) ->
+    stage(Node, leave).
+
+%% @doc Stage a request for `Node' to be forcefully removed from the cluster.
+%%      If committed, all partitions owned by `Node' will immediately be
+%%      re-assigned to other nodes. No data on `Node' will be transferred to
+%%      other nodes, and all replicas on `Node' will be lost.
+remove_member(Node) ->
+    stage(Node, remove).
+
+%% @doc Stage a request for `Node' to be replaced by `NewNode'. If committed,
+%%      `Node' will handoff all of its data to `NewNode' and then shutdown.
+%%      The current implementation requires `NewNode' to be a fresh node that
+%%      is joining the cluster and does not yet own any partitions of its own.
+replace(Node, NewNode) ->
+    stage(Node, {replace, NewNode}).
+
+%% @doc Stage a request for `Node' to be forcefully replaced by `NewNode'.
+%%      If committed, all partitions owned by `Node' will immediately be
+%%      re-assigned to `NewNode'. No data on `Node' will be transferred,
+%%      and all replicas on `Node' will be lost. The current implementation
+%%      requires `NewNode' to be a fresh node that is joining the cluster
+%%      and does not yet own any partitions of its own.
+force_replace(Node, NewNode) ->
+    stage(Node, {force_replace, NewNode}).
+
+%% @doc Clear the current set of staged transfers
+clear() ->
+    gen_server:call(claimant(), clear, infinity).
+
+%% @doc This function is called as part of the ring reconciliation logic
+%%      triggered by the gossip subsystem. This is only called on the one
+%%      node that is currently the claimant. This function is the top-level
+%%      entry point to the claimant logic that orchestrates cluster state
+%%      transitions. The current code path:
+%%        riak_core_gossip:reconcile/2
+%%        --> riak_core_ring:ring_changed/2
+%%        -----> riak_core_ring:internal_ring_changed/2
+%%        --------> riak_core_claimant:ring_changed/2
+ring_changed(Node, Ring) ->
+    internal_ring_changed(Node, Ring).
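Taken together, the exported functions above form a stage/plan/commit cycle. A minimal sketch of how a management tool might drive it from the claimant node (hypothetical node name; return shapes follow the specs above and the gen_server handlers below):

    %% Stage a change, inspect the resulting plan, then commit it.
    ok = riak_core_claimant:leave_member('riak@node3.example.com'),
    {ok, Changes, NextRings} = riak_core_claimant:plan(),
    %% Changes   :: [{node(), action()}], e.g. [{'riak@node3.example.com', leave}]
    %% NextRings :: [{Ring, NextRing}], one tuple per cluster transition
    ok = riak_core_claimant:commit().
    %% (Alternatively, riak_core_claimant:clear() discards the staged changes.)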
+ +%%%=================================================================== +%%% Internal API helpers +%%%=================================================================== + +stage(Node, Action) -> + gen_server:call(claimant(), {stage, Node, Action}, infinity). + +claimant() -> + {ok, Ring} = riak_core_ring_manager:get_my_ring(), + {?MODULE, riak_core_ring:claimant(Ring)}. + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([]) -> + {ok, #state{changes=[], seed=erlang:now()}}. + +handle_call(clear, _From, State) -> + State2 = clear_staged(State), + {reply, ok, State2}; + +handle_call({stage, Node, Action}, _From, State) -> + {ok, Ring} = riak_core_ring_manager:get_raw_ring(), + {Reply, State2} = maybe_stage(Node, Action, Ring, State), + {reply, Reply, State2}; + +handle_call(plan, _From, State) -> + {ok, Ring} = riak_core_ring_manager:get_raw_ring(), + case riak_core_ring:ring_ready(Ring) of + false -> + Reply = {error, ring_not_ready}, + {reply, Reply, State}; + true -> + {Reply, State2} = generate_plan(Ring, State), + {reply, Reply, State2} + end; + +handle_call(commit, _From, State) -> + {Reply, State2} = commit_staged(State), + {reply, Reply, State2}; + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% @private +%% @doc Verify that a cluster change request is valid and add it to +%% the list of staged changes. +maybe_stage(Node, Action, Ring, State=#state{changes=Changes}) -> + case valid_request(Node, Action, Changes, Ring) of + true -> + Changes2 = orddict:store(Node, Action, Changes), + Changes3 = filter_changes(Changes2, Ring), + State2 = State#state{changes=Changes3}, + {ok, State2}; + Error -> + {Error, State} + end. + +%% @private +%% @doc Determine how the staged set of cluster changes will affect +%% the cluster. See {@link plan/0} for additional details. +generate_plan(Ring, State=#state{changes=Changes}) -> + Changes2 = filter_changes(Changes, Ring), + Joining = [{Node, join} || Node <- riak_core_ring:members(Ring, [joining])], + AllChanges = lists:ukeysort(1, Changes2 ++ Joining), + State2 = State#state{changes=Changes2}, + generate_plan(AllChanges, Ring, State2). + +generate_plan([], _, State) -> + %% There are no changes to apply + {{ok, [], []}, State}; +generate_plan(Changes, Ring, State=#state{seed=Seed}) -> + case compute_all_next_rings(Changes, Seed, Ring) of + legacy -> + {{error, legacy}, State}; + {ok, NextRings} -> + {_, NextRing} = hd(NextRings), + State2 = State#state{next_ring=NextRing}, + Reply = {ok, Changes, NextRings}, + {Reply, State2} + end. + +%% @private +%% @doc Commit the set of staged cluster changes. See {@link commit/0} +%% for additional details. +commit_staged(State=#state{next_ring=undefined}) -> + {{error, nothing_planned}, State}; +commit_staged(State) -> + case maybe_commit_staged(State) of + {ok, _} -> + State2 = State#state{next_ring=undefined, + changes=[], + seed=erlang:now()}, + {ok, State2}; + not_changed -> + {error, State}; + {not_changed, Reason} -> + {{error, Reason}, State} + end. 
+ +%% @private +maybe_commit_staged(State) -> + riak_core_ring_manager:ring_trans(fun maybe_commit_staged/2, State). + +%% @private +maybe_commit_staged(Ring, State=#state{changes=Changes, seed=Seed}) -> + Changes2 = filter_changes(Changes, Ring), + case compute_next_ring(Changes2, Seed, Ring) of + {legacy, _} -> + {ignore, legacy}; + {ok, NextRing} -> + maybe_commit_staged(Ring, NextRing, State) + end. + +%% @private +maybe_commit_staged(Ring, NextRing, #state{next_ring=PlannedRing}) -> + Claimant = riak_core_ring:claimant(Ring), + IsReady = riak_core_ring:ring_ready(Ring), + IsClaimant = (Claimant == node()), + IsSamePlan = same_plan(PlannedRing, NextRing), + case {IsReady, IsClaimant, IsSamePlan} of + {false, _, _} -> + {ignore, ring_not_ready}; + {_, false, _} -> + ignore; + {_, _, false} -> + {ignore, plan_changed}; + _ -> + NewRing = riak_core_ring:increment_vclock(Claimant, NextRing), + {new_ring, NewRing} + end. + +%% @private +%% @doc Clear the current set of staged transfers. Since `joining' nodes +%% are determined based on the node's actual state, rather than a +%% staged action, the only way to clear pending joins is to remove +%% the `joining' nodes from the cluster. Used by the public API +%% call {@link clear/0}. +clear_staged(State) -> + remove_joining_nodes(), + State#state{changes=[], seed=erlang:now()}. + +%% @private +remove_joining_nodes() -> + riak_core_ring_manager:ring_trans(fun remove_joining_nodes/2, ok). + +%% @private +remove_joining_nodes(Ring, _) -> + Claimant = riak_core_ring:claimant(Ring), + IsClaimant = (Claimant == node()), + Joining = riak_core_ring:members(Ring, [joining]), + AreJoining = (Joining /= []), + case IsClaimant and AreJoining of + false -> + ignore; + true -> + NewRing = remove_joining_nodes_from_ring(Claimant, Joining, Ring), + {new_ring, NewRing} + end. + +%% @private +remove_joining_nodes_from_ring(Claimant, Joining, Ring) -> + NewRing = + lists:foldl(fun(Node, RingAcc) -> + riak_core_ring:set_member(Claimant, RingAcc, Node, + invalid, same_vclock) + end, Ring, Joining), + NewRing2 = riak_core_ring:increment_vclock(Claimant, NewRing), + NewRing2. + +%% @private +valid_request(Node, Action, Changes, Ring) -> + case Action of + leave -> + valid_leave_request(Node, Ring); + remove -> + valid_remove_request(Node, Ring); + {replace, NewNode} -> + valid_replace_request(Node, NewNode, Changes, Ring); + {force_replace, NewNode} -> + valid_force_replace_request(Node, NewNode, Changes, Ring) + end. + +%% @private +valid_leave_request(Node, Ring) -> + case {riak_core_ring:all_members(Ring), + riak_core_ring:member_status(Ring, Node)} of + {_, invalid} -> + {error, not_member}; + {[Node], _} -> + {error, only_member}; + {_, valid} -> + true; + {_, joining} -> + true; + {_, _} -> + {error, already_leaving} + end. + +%% @private +valid_remove_request(Node, Ring) -> + IsClaimant = (Node == riak_core_ring:claimant(Ring)), + case {IsClaimant, + riak_core_ring:all_members(Ring), + riak_core_ring:member_status(Ring, Node)} of + {true, _, _} -> + {error, is_claimant}; + {_, _, invalid} -> + {error, not_member}; + {_, [Node], _} -> + {error, only_member}; + _ -> + true + end. 
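Both maybe_commit_staged/2 and remove_joining_nodes/2 above are used as riak_core_ring_manager:ring_trans/2 callbacks: as the code shows, such a callback receives the candidate ring plus an argument and answers {new_ring, Ring}, ignore, or {ignore, Reason}. A minimal sketch of that contract with a hypothetical no-op callback:

    %% Hypothetical callback: never modifies the ring, only reports why.
    Noop = fun(Ring, _Arg) ->
                   case riak_core_ring:claimant(Ring) == node() of
                       true  -> ignore;                 %% nothing to change
                       false -> {ignore, not_claimant}  %% bail out with a reason
                   end
           end,
    riak_core_ring_manager:ring_trans(Noop, []).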
+ +%% @private +valid_replace_request(Node, NewNode, Changes, Ring) -> + AlreadyReplacement = lists:member(NewNode, existing_replacements(Changes)), + NewJoining = + (riak_core_ring:member_status(Ring, NewNode) == joining) + and (not orddict:is_key(NewNode, Changes)), + case {riak_core_ring:member_status(Ring, Node), + AlreadyReplacement, + NewJoining} of + {invalid, _, _} -> + {error, not_member}; + {leaving, _, _} -> + {error, already_leaving}; + {_, true, _} -> + {error, already_replacement}; + {_, _, false} -> + {error, invalid_replacement}; + _ -> + true + end. + +%% @private +valid_force_replace_request(Node, NewNode, Changes, Ring) -> + IsClaimant = (Node == riak_core_ring:claimant(Ring)), + AlreadyReplacement = lists:member(NewNode, existing_replacements(Changes)), + NewJoining = + (riak_core_ring:member_status(Ring, NewNode) == joining) + and (not orddict:is_key(NewNode, Changes)), + case {IsClaimant, + riak_core_ring:member_status(Ring, Node), + AlreadyReplacement, + NewJoining} of + {true, _, _, _} -> + {error, is_claimant}; + {_, invalid, _, _} -> + {error, not_member}; + {_, _, true, _} -> + {error, already_replacement}; + {_, _, _, false} -> + {error, invalid_replacement}; + _ -> + true + end. + +%% @private +%% @doc Filter out any staged changes that are no longer valid. Changes +%% can become invalid based on other staged changes, or by cluster +%% changes that bypass the staging system. +filter_changes(Changes, Ring) -> + orddict:filter(fun(Node, Change) -> + filter_changes_pred(Node, Change, Changes, Ring) + end, Changes). + +%% @private +filter_changes_pred(Node, {Change, NewNode}, Changes, Ring) + when (Change == replace) or (Change == force_replace) -> + IsMember = (riak_core_ring:member_status(Ring, Node) /= invalid), + IsJoining = (riak_core_ring:member_status(Ring, NewNode) == joining), + NotChanging = (not orddict:is_key(NewNode, Changes)), + IsMember and IsJoining and NotChanging; +filter_changes_pred(Node, _, _, Ring) -> + IsMember = (riak_core_ring:member_status(Ring, Node) /= invalid), + IsMember. + +%% @private +existing_replacements(Changes) -> + [Node || {_, {Change, Node}} <- Changes, + (Change == replace) or (Change == force_replace)]. + +%% @private +%% Determine if two rings have logically equal cluster state +same_plan(RingA, RingB) -> + (riak_core_ring:all_member_status(RingA) == riak_core_ring:all_member_status(RingB)) andalso + (riak_core_ring:all_owners(RingA) == riak_core_ring:all_owners(RingB)) andalso + (riak_core_ring:pending_changes(RingA) == riak_core_ring:pending_changes(RingB)). + +%% ========================================================================= +%% Claimant rebalance/reassign logic +%% ========================================================================= + +%% @private +compute_all_next_rings(Changes, Seed, Ring) -> + compute_all_next_rings(Changes, Seed, Ring, []). + +%% @private +compute_all_next_rings(Changes, Seed, Ring, Acc) -> + case compute_next_ring(Changes, Seed, Ring) of + {legacy, _} -> + legacy; + {ok, NextRing} -> + Acc2 = [{Ring, NextRing}|Acc], + case not same_plan(Ring, NextRing) of + true -> + FutureRing = riak_core_ring:future_ring(NextRing), + compute_all_next_rings([], Seed, FutureRing, Acc2); + false -> + {ok, lists:reverse(Acc2)} + end + end. 
+ +%% @private +compute_next_ring(Changes, Seed, Ring) -> + Replacing = [{Node, NewNode} || {Node, {replace, NewNode}} <- Changes], + + Ring2 = apply_changes(Ring, Changes), + {_, Ring3} = maybe_handle_joining(node(), Ring2), + {_, Ring4} = do_claimant_quiet(node(), Ring3, Replacing, Seed), + Members = riak_core_ring:all_members(Ring4), + case riak_core_gossip:any_legacy_gossip(Ring4, Members) of + true -> + {legacy, Ring}; + false -> + {ok, Ring4} + end. + +%% @private +apply_changes(Ring, Changes) -> + NewRing = + lists:foldl( + fun({Node, Cmd}, RingAcc2) -> + RingAcc3 = change({Cmd, Node}, RingAcc2), + RingAcc3 + end, Ring, Changes), + NewRing. + +%% @private +change({join, Node}, Ring) -> + Ring2 = riak_core_ring:add_member(Node, Ring, Node), + Ring2; +change({leave, Node}, Ring) -> + Members = riak_core_ring:all_members(Ring), + lists:member(Node, Members) orelse throw(invalid_member), + Ring2 = riak_core_ring:leave_member(Node, Ring, Node), + Ring2; +change({remove, Node}, Ring) -> + Members = riak_core_ring:all_members(Ring), + lists:member(Node, Members) orelse throw(invalid_member), + Ring2 = riak_core_ring:remove_member(Node, Ring, Node), + Ring2; +change({{replace, _NewNode}, Node}, Ring) -> + %% Just treat as a leave, reassignment happens elsewhere + Ring2 = riak_core_ring:leave_member(Node, Ring, Node), + Ring2; +change({{force_replace, NewNode}, Node}, Ring) -> + Indices = riak_core_ring:indices(Ring, Node), + Reassign = [{Idx, NewNode} || Idx <- Indices], + Ring2 = riak_core_ring:add_member(NewNode, Ring, NewNode), + Ring3 = riak_core_ring:change_owners(Ring2, Reassign), + Ring4 = riak_core_ring:remove_member(Node, Ring3, Node), + Ring4. + +internal_ring_changed(Node, CState) -> + {Changed, CState5} = do_claimant(Node, CState, fun log/2), + inform_removed_nodes(Node, CState, CState5), + + %% Start/stop converge and rebalance delay timers + %% (converge delay) + %% -- Starts when claimant changes the ring + %% -- Stops when the ring converges (ring_ready) + %% (rebalance delay) + %% -- Starts when next changes from empty to non-empty + %% -- Stops when next changes from non-empty to empty + %% + IsClaimant = (riak_core_ring:claimant(CState5) =:= Node), + WasPending = ([] /= riak_core_ring:pending_changes(CState)), + IsPending = ([] /= riak_core_ring:pending_changes(CState5)), + + %% Outer case statement already checks for ring_ready + case {IsClaimant, Changed} of + {true, true} -> + riak_core_stat:update(converge_timer_end), + riak_core_stat:update(converge_timer_begin); + {true, false} -> + riak_core_stat:update(converge_timer_end); + _ -> + ok + end, + + case {IsClaimant, WasPending, IsPending} of + {true, false, true} -> + riak_core_stat:update(rebalance_timer_begin); + {true, true, false} -> + riak_core_stat:update(rebalance_timer_end); + _ -> + ok + end, + + %% Set cluster name if it is undefined + case {IsClaimant, riak_core_ring:cluster_name(CState5)} of + {true, undefined} -> + ClusterName = {Node, erlang:now()}, + riak_core_util:rpc_every_member(riak_core_ring_manager, + set_cluster_name, + [ClusterName], + 1000), + ok; + _ -> + ClusterName = riak_core_ring:cluster_name(CState5), + ok + end, + + case Changed of + true -> + CState6 = riak_core_ring:set_cluster_name(CState5, ClusterName), + riak_core_ring:increment_vclock(Node, CState6); + false -> + CState5 + end. 
+ +inform_removed_nodes(Node, OldRing, NewRing) -> + CName = riak_core_ring:cluster_name(NewRing), + Exiting = riak_core_ring:members(OldRing, [exiting]) -- [Node], + Invalid = riak_core_ring:members(NewRing, [invalid]), + Changed = ordsets:intersection(ordsets:from_list(Exiting), + ordsets:from_list(Invalid)), + lists:map(fun(ExitingNode) -> + %% Tell exiting node to shutdown. + riak_core_ring_manager:refresh_ring(ExitingNode, CName) + end, Changed), + ok. + +do_claimant_quiet(Node, CState, Replacing, Seed) -> + do_claimant(Node, CState, Replacing, Seed, fun no_log/2). + +do_claimant(Node, CState, Log) -> + do_claimant(Node, CState, [], erlang:now(), Log). + +do_claimant(Node, CState, Replacing, Seed, Log) -> + AreJoining = are_joining_nodes(CState), + {C1, CState2} = maybe_update_claimant(Node, CState), + {C2, CState3} = maybe_handle_auto_joining(Node, CState2), + case AreJoining of + true -> + %% Do not rebalance if there are joining nodes + Changed = C1 or C2, + CState5 = CState3; + false -> + {C3, CState4} = + maybe_update_ring(Node, CState3, Replacing, Seed, Log), + {C4, CState5} = maybe_remove_exiting(Node, CState4), + Changed = (C1 or C2 or C3 or C4) + end, + {Changed, CState5}. + +%% @private +maybe_update_claimant(Node, CState) -> + Members = riak_core_ring:members(CState, [valid, leaving]), + Claimant = riak_core_ring:claimant(CState), + NextClaimant = hd(Members ++ [undefined]), + ClaimantMissing = not lists:member(Claimant, Members), + + case {ClaimantMissing, NextClaimant} of + {true, Node} -> + %% Become claimant + CState2 = riak_core_ring:set_claimant(CState, Node), + CState3 = riak_core_ring:increment_ring_version(Claimant, CState2), + {true, CState3}; + _ -> + {false, CState} + end. + +%% @private +maybe_update_ring(Node, CState, Replacing, Seed, Log) -> + Claimant = riak_core_ring:claimant(CState), + case Claimant of + Node -> + case riak_core_ring:claiming_members(CState) of + [] -> + %% Consider logging an error/warning here or even + %% intentionally crashing. This state makes no logical + %% sense given that it represents a cluster without any + %% active nodes. + {false, CState}; + _ -> + {Changed, CState2} = + update_ring(Node, CState, Replacing, Seed, Log), + {Changed, CState2} + end; + _ -> + {false, CState} + end. + +%% @private +maybe_remove_exiting(Node, CState) -> + Claimant = riak_core_ring:claimant(CState), + case Claimant of + Node -> + %% Change exiting nodes to invalid, skipping this node. + Exiting = riak_core_ring:members(CState, [exiting]) -- [Node], + Changed = (Exiting /= []), + CState2 = + lists:foldl(fun(ENode, CState0) -> + riak_core_ring:set_member(Node, CState0, ENode, + invalid, same_vclock) + end, CState, Exiting), + {Changed, CState2}; + _ -> + {false, CState} + end. + +%% @private +are_joining_nodes(CState) -> + Joining = riak_core_ring:members(CState, [joining]), + Joining /= []. + +%% @private +maybe_handle_auto_joining(Node, CState) -> + Joining = riak_core_ring:members(CState, [joining]), + Auto = [Member || Member <- Joining, + riak_core_ring:get_member_meta(CState, + Member, + '$autojoin') == true], + maybe_handle_joining(Node, Auto, CState). + +%% @private +maybe_handle_joining(Node, CState) -> + Joining = riak_core_ring:members(CState, [joining]), + maybe_handle_joining(Node, Joining, CState). 
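The auto-join path above hinges on member metadata: riak_core:join/2 records '$autojoin' on the joining node (see maybe_auto_join/3 earlier in this patch), and maybe_handle_auto_joining/2 promotes only members carrying that flag. A small sketch, assuming Ring is a ring state and Node the joining node:

    %% Write the flag the way maybe_auto_join/3 does, then read it back the
    %% way maybe_handle_auto_joining/2 does.
    Ring2 = riak_core_ring:update_member_meta(Node, Ring, Node, '$autojoin', true),
    true = (riak_core_ring:get_member_meta(Ring2, Node, '$autojoin') == true).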
+
+%% @private
+maybe_handle_joining(Node, Joining, CState) ->
+    Claimant = riak_core_ring:claimant(CState),
+    case Claimant of
+        Node ->
+            Changed = (Joining /= []),
+            CState2 =
+                lists:foldl(fun(JNode, CState0) ->
+                                    riak_core_ring:set_member(Node, CState0, JNode,
+                                                              valid, same_vclock)
+                            end, CState, Joining),
+            {Changed, CState2};
+        _ ->
+            {false, CState}
+    end.
+
+%% @private
+update_ring(CNode, CState, Replacing, Seed, Log) ->
+    Next0 = riak_core_ring:pending_changes(CState),
+
+    ?ROUT("Members: ~p~n", [riak_core_ring:members(CState, [joining, valid,
+                                                            leaving, exiting,
+                                                            invalid])]),
+    ?ROUT("Updating ring :: next0 : ~p~n", [Next0]),
+
+    %% Remove tuples from next for removed nodes
+    InvalidMembers = riak_core_ring:members(CState, [invalid]),
+    Next2 = lists:filter(fun(NInfo) ->
+                                 {Owner, NextOwner, _} = riak_core_ring:next_owner(NInfo),
+                                 not lists:member(Owner, InvalidMembers) and
+                                 not lists:member(NextOwner, InvalidMembers)
+                         end, Next0),
+    CState2 = riak_core_ring:set_pending_changes(CState, Next2),
+
+    %% Transfer ownership after completed handoff
+    {RingChanged1, CState3} = transfer_ownership(CState2, Log),
+    ?ROUT("Updating ring :: next1 : ~p~n",
+          [riak_core_ring:pending_changes(CState3)]),
+
+    %% Reassign leaving/inactive indices
+    {RingChanged2, CState4} = reassign_indices(CState3, Replacing, Seed, Log),
+    ?ROUT("Updating ring :: next2 : ~p~n",
+          [riak_core_ring:pending_changes(CState4)]),
+
+    %% Rebalance the ring as necessary
+    Next3 = rebalance_ring(CNode, CState4),
+    Log(debug,{"Pending ownership transfers: ~b~n",
+               [length(riak_core_ring:pending_changes(CState4))]}),
+
+    %% Remove transfers to/from down nodes
+    Next4 = handle_down_nodes(CState4, Next3),
+
+    NextChanged = (Next0 /= Next4),
+    Changed = (NextChanged or RingChanged1 or RingChanged2),
+    case Changed of
+        true ->
+            OldS = ordsets:from_list([{Idx,O,NO} || {Idx,O,NO,_,_} <- Next0]),
+            NewS = ordsets:from_list([{Idx,O,NO} || {Idx,O,NO,_,_} <- Next4]),
+            Diff = ordsets:subtract(NewS, OldS),
+            [Log(next, NChange) || NChange <- Diff],
+            ?ROUT("Updating ring :: next3 : ~p~n", [Next4]),
+            CState5 = riak_core_ring:set_pending_changes(CState4, Next4),
+            CState6 = riak_core_ring:increment_ring_version(CNode, CState5),
+            {true, CState6};
+        false ->
+            {false, CState}
+    end.
+
+%% @private
+transfer_ownership(CState, Log) ->
+    Next = riak_core_ring:pending_changes(CState),
+    %% Remove already completed and transferred changes
+    Next2 = lists:filter(fun(NInfo={Idx, _, _, _, _}) ->
+                                 {_, NewOwner, S} = riak_core_ring:next_owner(NInfo),
+                                 not ((S == complete) and
+                                      (riak_core_ring:index_owner(CState, Idx) =:= NewOwner))
+                         end, Next),
+
+    CState2 = lists:foldl(
+                fun(NInfo={Idx, _, _, _, _}, CState0) ->
+                        case riak_core_ring:next_owner(NInfo) of
+                            {_, Node, complete} ->
+                                Log(ownership, {Idx, Node, CState0}),
+                                riak_core_ring:transfer_node(Idx, Node,
+                                                             CState0);
+                            _ ->
+                                CState0
+                        end
+                end, CState, Next2),
+
+    NextChanged = (Next2 /= Next),
+    RingChanged = (riak_core_ring:all_owners(CState) /= riak_core_ring:all_owners(CState2)),
+    Changed = (NextChanged or RingChanged),
+    CState3 = riak_core_ring:set_pending_changes(CState2, Next2),
+    {Changed, CState3}.
+ +%% @private +reassign_indices(CState, Replacing, Seed, Log) -> + Next = riak_core_ring:pending_changes(CState), + Invalid = riak_core_ring:members(CState, [invalid]), + CState2 = + lists:foldl(fun(Node, CState0) -> + remove_node(CState0, Node, invalid, + Replacing, Seed, Log) + end, CState, Invalid), + CState3 = case Next of + [] -> + Leaving = riak_core_ring:members(CState, [leaving]), + lists:foldl(fun(Node, CState0) -> + remove_node(CState0, Node, leaving, + Replacing, Seed, Log) + end, CState2, Leaving); + _ -> + CState2 + end, + Owners1 = riak_core_ring:all_owners(CState), + Owners2 = riak_core_ring:all_owners(CState3), + RingChanged = (Owners1 /= Owners2), + NextChanged = (Next /= riak_core_ring:pending_changes(CState3)), + {RingChanged or NextChanged, CState3}. + +%% @private +rebalance_ring(CNode, CState) -> + Next = riak_core_ring:pending_changes(CState), + rebalance_ring(CNode, Next, CState). + +rebalance_ring(_CNode, [], CState) -> + Members = riak_core_ring:claiming_members(CState), + CState2 = lists:foldl(fun(Node, Ring0) -> + riak_core_gossip:claim_until_balanced(Ring0, + Node) + end, CState, Members), + Owners1 = riak_core_ring:all_owners(CState), + Owners2 = riak_core_ring:all_owners(CState2), + Owners3 = lists:zip(Owners1, Owners2), + Next = [{Idx, PrevOwner, NewOwner, [], awaiting} + || {{Idx, PrevOwner}, {Idx, NewOwner}} <- Owners3, + PrevOwner /= NewOwner], + Next; +rebalance_ring(_CNode, Next, _CState) -> + Next. + +%% @private +handle_down_nodes(CState, Next) -> + LeavingMembers = riak_core_ring:members(CState, [leaving, invalid]), + DownMembers = riak_core_ring:members(CState, [down]), + Next2 = [begin + OwnerLeaving = lists:member(O, LeavingMembers), + NextDown = lists:member(NO, DownMembers), + case (OwnerLeaving and NextDown) of + true -> + Active = riak_core_ring:active_members(CState) -- [O], + RNode = lists:nth(random:uniform(length(Active)), + Active), + {Idx, O, RNode, Mods, Status}; + _ -> + T + end + end || T={Idx, O, NO, Mods, Status} <- Next], + Next3 = [T || T={_, O, NO, _, _} <- Next2, + not lists:member(O, DownMembers), + not lists:member(NO, DownMembers)], + Next3. + +%% @private +reassign_indices_to(Node, NewNode, Ring) -> + Indices = riak_core_ring:indices(Ring, Node), + Reassign = [{Idx, NewNode} || Idx <- Indices], + Ring2 = riak_core_ring:change_owners(Ring, Reassign), + Ring2. + +%% @private +remove_node(CState, Node, Status, Replacing, Seed, Log) -> + Indices = riak_core_ring:indices(CState, Node), + remove_node(CState, Node, Status, Replacing, Seed, Log, Indices). 
+ +%% @private +remove_node(CState, _Node, _Status, _Log, _Replacing, _Seed, []) -> + CState; +remove_node(CState, Node, Status, Replacing, Seed, Log, Indices) -> + CStateT1 = riak_core_ring:change_owners(CState, + riak_core_ring:all_next_owners(CState)), + case orddict:find(Node, Replacing) of + {ok, NewNode} -> + CStateT2 = reassign_indices_to(Node, NewNode, CStateT1); + error -> + CStateT2 = riak_core_gossip:remove_from_cluster(CStateT1, Node, Seed) + end, + + Owners1 = riak_core_ring:all_owners(CState), + Owners2 = riak_core_ring:all_owners(CStateT2), + Owners3 = lists:zip(Owners1, Owners2), + RemovedIndices = case Status of + invalid -> + Indices; + leaving -> + [] + end, + Reassign = [{Idx, NewOwner} || {Idx, NewOwner} <- Owners2, + lists:member(Idx, RemovedIndices)], + Next = [{Idx, PrevOwner, NewOwner, [], awaiting} + || {{Idx, PrevOwner}, {Idx, NewOwner}} <- Owners3, + PrevOwner /= NewOwner, + not lists:member(Idx, RemovedIndices)], + + [Log(reassign, {Idx, NewOwner, CState}) || {Idx, NewOwner} <- Reassign], + + %% Unlike rebalance_ring, remove_node can be called when Next is non-empty, + %% therefore we need to merge the values. Original Next has priority. + Next2 = lists:ukeysort(1, riak_core_ring:pending_changes(CState) ++ Next), + CState2 = riak_core_ring:change_owners(CState, Reassign), + CState3 = riak_core_ring:set_pending_changes(CState2, Next2), + CState3. + +no_log(_, _) -> + ok. + +log(debug, {Msg, Args}) -> + lager:debug(Msg, Args); +log(ownership, {Idx, NewOwner, CState}) -> + Owner = riak_core_ring:index_owner(CState, Idx), + lager:debug("(new-owner) ~b :: ~p -> ~p~n", [Idx, Owner, NewOwner]); +log(reassign, {Idx, NewOwner, CState}) -> + Owner = riak_core_ring:index_owner(CState, Idx), + lager:debug("(reassign) ~b :: ~p -> ~p~n", [Idx, Owner, NewOwner]); +log(next, {Idx, Owner, NewOwner}) -> + lager:debug("(pending) ~b :: ~p -> ~p~n", [Idx, Owner, NewOwner]); +log(_, _) -> + ok. diff --git a/src/riak_core_console.erl b/src/riak_core_console.erl index 3d4668def..b1383ac33 100644 --- a/src/riak_core_console.erl +++ b/src/riak_core_console.erl @@ -19,13 +19,25 @@ %% ------------------------------------------------------------------- -module(riak_core_console). --export([member_status/1, ring_status/1]). +-export([member_status/1, ring_status/1, print_member_status/2, + stage_leave/1, stage_remove/1, stage_replace/1, + stage_force_replace/1, print_staged/1, commit_staged/1, + clear_staged/1, transfer_limit/1]). member_status([]) -> + {ok, Ring} = riak_core_ring_manager:get_my_ring(), + print_member_status(Ring, legacy_gossip(Ring)). + +legacy_gossip(Ring) -> + Members = riak_core_ring:all_members(Ring), + LegacyGossip = + [{Node, riak_core_gossip:legacy_gossip(Node)} || Node <- Members], + orddict:from_list(LegacyGossip). 
+
+print_member_status(Ring, LegacyGossip) ->
     io:format("~33..=s Membership ~34..=s~n", ["", ""]),
     io:format("Status Ring Pending Node~n"),
     io:format("~79..-s~n", [""]),
-    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
     AllStatus = lists:keysort(2, riak_core_ring:all_member_status(Ring)),
     RingSize = riak_core_ring:num_partitions(Ring),
     IsPending = ([] /= riak_core_ring:pending_changes(Ring)),
@@ -40,7 +52,7 @@ member_status([]) ->
             NextPercent = length(NextIndices) * 100 / RingSize,
 
             StatusOut =
-                case riak_core_gossip:legacy_gossip(Node) of
+                case orddict:fetch(Node, LegacyGossip) of
                     true -> "(legacy)";
                     false -> Status
                 end,
@@ -168,3 +180,362 @@ unreachable_status(Down) ->
               "forcibly remove the nodes from the cluster (riak-admin~n"
               "force-remove NODE) to allow the remaining nodes to settle.~n"),
     ok.
+
+stage_leave([]) ->
+    stage_leave(node());
+stage_leave([NodeStr]) ->
+    stage_leave(list_to_atom(NodeStr));
+stage_leave(Node) ->
+    try
+        case riak_core_claimant:leave_member(Node) of
+            ok ->
+                io:format("Success: staged leave request for ~p~n", [Node]),
+                ok;
+            {error, already_leaving} ->
+                io:format("~p is already in the process of leaving the "
+                          "cluster.~n", [Node]),
+                ok;
+            {error, not_member} ->
+                io:format("Failed: ~p is not a member of the cluster.~n",
+                          [Node]),
+                error;
+            {error, only_member} ->
+                io:format("Failed: ~p is the only member.~n", [Node]),
+                error
+        end
+    catch
+        Exception:Reason ->
+            lager:error("Leave failed ~p:~p", [Exception, Reason]),
+            io:format("Leave failed, see log for details~n"),
+            error
+    end.
+
+stage_remove([NodeStr]) ->
+    stage_remove(list_to_atom(NodeStr));
+stage_remove(Node) ->
+    try
+        case riak_core_claimant:remove_member(Node) of
+            ok ->
+                io:format("Success: staged remove request for ~p~n", [Node]),
+                ok;
+            {error, not_member} ->
+                io:format("Failed: ~p is not a member of the cluster.~n",
+                          [Node]),
+                error;
+            {error, is_claimant} ->
+                is_claimant_error(Node, "remove"),
+                error;
+            {error, only_member} ->
+                io:format("Failed: ~p is the only member.~n", [Node]),
+                error
+        end
+    catch
+        Exception:Reason ->
+            lager:error("Remove failed ~p:~p", [Exception, Reason]),
+            io:format("Remove failed, see log for details~n"),
+            error
+    end.
+
+stage_replace([NodeStr1, NodeStr2]) ->
+    stage_replace(list_to_atom(NodeStr1), list_to_atom(NodeStr2)).
+stage_replace(Node1, Node2) ->
+    try
+        case riak_core_claimant:replace(Node1, Node2) of
+            ok ->
+                io:format("Success: staged replacement of ~p with ~p~n",
+                          [Node1, Node2]),
+                ok;
+            {error, already_leaving} ->
+                io:format("~p is already in the process of leaving the "
+                          "cluster.~n", [Node1]),
+                ok;
+            {error, not_member} ->
+                io:format("Failed: ~p is not a member of the cluster.~n",
+                          [Node1]),
+                error;
+            {error, invalid_replacement} ->
+                io:format("Failed: ~p is not a valid replacement candidate.~n"
+                          "Only newly joining nodes can be used for "
+                          "replacement.~n", [Node2]),
+                error;
+            {error, already_replacement} ->
+                io:format("Failed: ~p is already staged to replace another "
+                          "node.~n", [Node2]),
+                error
+        end
+    catch
+        Exception:Reason ->
+            lager:error("Node replacement failed ~p:~p", [Exception, Reason]),
+            io:format("Node replacement failed, see log for details~n"),
+            error
+    end.
+
+stage_force_replace([NodeStr1, NodeStr2]) ->
+    stage_force_replace(list_to_atom(NodeStr1), list_to_atom(NodeStr2)).
+stage_force_replace(Node1, Node2) ->
+    try
+        case riak_core_claimant:force_replace(Node1, Node2) of
+            ok ->
+                io:format("Success: staged forced replacement of ~p with ~p~n",
+                          [Node1, Node2]),
+                ok;
+            {error, not_member} ->
+                io:format("Failed: ~p is not a member of the cluster.~n",
+                          [Node1]),
+                error;
+            {error, is_claimant} ->
+                is_claimant_error(Node1, "replace"),
+                error;
+            {error, invalid_replacement} ->
+                io:format("Failed: ~p is not a valid replacement candidate.~n"
+                          "Only newly joining nodes can be used for "
+                          "replacement.~n", [Node2]),
+                error;
+            {error, already_replacement} ->
+                io:format("Failed: ~p is already staged to replace another "
+                          "node.~n", [Node2]),
+                error
+        end
+    catch
+        Exception:Reason ->
+            lager:error("Forced node replacement failed ~p:~p",
+                        [Exception, Reason]),
+            io:format("Forced node replacement failed, see log for details~n"),
+            error
+    end.
+
+clear_staged([]) ->
+    try
+        case riak_core_claimant:clear() of
+            ok ->
+                io:format("Cleared staged cluster changes~n"),
+                ok
+        end
+    catch
+        Exception:Reason ->
+            lager:error("Failed to clear staged cluster changes ~p:~p",
+                        [Exception, Reason]),
+            io:format("Failed to clear staged cluster changes, see log "
+                      "for details~n"),
+            error
+    end.
+
+is_claimant_error(Node, Action) ->
+    io:format("Failed: ~p is the claimant (see: riak-admin ring_status).~n",
+              [Node]),
+    io:format(
+      "The claimant is the node responsible for initiating cluster changes,~n"
+      "and cannot forcefully ~s itself. You can use 'riak-admin down' to~n"
+      "mark the node as offline, which will trigger a new claimant to take~n"
+      "over. However, this will clear any staged changes.~n", [Action]).
+
+print_staged([]) ->
+    case riak_core_claimant:plan() of
+        {error, legacy} ->
+            io:format("The cluster is running in legacy mode and does not "
+                      "support plan/commit.~n");
+        {error, ring_not_ready} ->
+            io:format("Cannot plan until cluster state has converged.~n"
+                      "Check 'Ring Ready' in 'riak-admin ring_status'~n");
+        {ok, Changes, NextRings} ->
+            {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+            %% The last next ring is always the final ring after all changes,
+            %% which is uninteresting to show. Only print N-1 rings.
+            NextRings2 = lists:sublist(NextRings,
+                                       erlang:max(0, length(NextRings)-1)),
+            print_plan(Changes, Ring, NextRings2),
+            ok
+    end.
+ +print_plan([], _, _) -> + io:format("There are no staged changes~n"); +print_plan(Changes, _Ring, NextRings) -> + io:format("~31..=s Staged Changes ~32..=s~n", ["", ""]), + io:format("Action Nodes(s)~n"), + io:format("~79..-s~n", [""]), + + lists:map(fun({Node, join}) -> + io:format("join ~p~n", [Node]); + ({Node, leave}) -> + io:format("leave ~p~n", [Node]); + ({Node, remove}) -> + io:format("force-remove ~p~n", [Node]); + ({Node, {replace, NewNode}}) -> + io:format("replace ~p with ~p~n", [Node, NewNode]); + ({Node, {force_replace, NewNode}}) -> + io:format("force-replace ~p with ~p~n", [Node, NewNode]) + end, Changes), + io:format("~79..-s~n", [""]), + io:format("~n"), + + lists:map(fun({Node, remove}) -> + io:format("WARNING: All of ~p replicas will be lost~n", [Node]); + ({Node, {force_replace, _}}) -> + io:format("WARNING: All of ~p replicas will be lost~n", [Node]); + (_) -> + ok + end, Changes), + io:format("~n"), + + Transitions = length(NextRings), + case Transitions of + 1 -> + io:format("NOTE: Applying these changes will result in 1 " + "cluster transition~n~n"); + _ -> + io:format("NOTE: Applying these changes will result in ~b " + "cluster transitions~n~n", [Transitions]) + end, + + lists:mapfoldl(fun({Ring1, Ring2}, I) -> + io:format("~79..#s~n", [""]), + io:format("~24.. s After cluster transition ~b/~b~n", + ["", I, Transitions]), + io:format("~79..#s~n~n", [""]), + output(Ring1, Ring2), + {ok, I+1} + end, 1, NextRings), + ok. + +output(Ring, NextRing) -> + Members = riak_core_ring:all_members(NextRing), + LegacyGossip = orddict:from_list([{Node, false} || Node <- Members]), + riak_core_console:print_member_status(NextRing, LegacyGossip), + io:format("~n"), + + FutureRing = riak_core_ring:future_ring(NextRing), + case riak_core_ring_util:check_ring(FutureRing) of + [] -> + ok; + _ -> + io:format("WARNING: Not all replicas will be on distinct nodes~n~n") + end, + + Owners1 = riak_core_ring:all_owners(Ring), + Owners2 = riak_core_ring:all_owners(NextRing), + Owners3 = lists:zip(Owners1, Owners2), + Reassigned = [{Idx, PrevOwner, NewOwner} + || {{Idx, PrevOwner}, {Idx, NewOwner}} <- Owners3, + PrevOwner /= NewOwner], + ReassignedTally = tally(Reassigned), + + Pending = riak_core_ring:pending_changes(NextRing), + Next = [{Idx, PrevOwner, NewOwner} || {Idx, PrevOwner, NewOwner, _, _} <- Pending], + NextTally = tally(Next), + + case Reassigned of + [] -> + ok; + _ -> + io:format("Partitions reassigned from cluster changes: ~p~n", + [length(Reassigned)]), + [io:format(" ~b reassigned from ~p to ~p~n", [Count, PrevOwner, NewOwner]) + || {{PrevOwner, NewOwner}, Count} <- ReassignedTally], + io:format("~n"), + ok + end, + + case Next of + [] -> + ok; + _ -> + io:format("Transfers resulting from cluster changes: ~p~n", + [length(Next)]), + [io:format(" ~b transfers from ~p to ~p~n", [Count, PrevOwner, NewOwner]) + || {{PrevOwner, NewOwner}, Count} <- NextTally], + ok, + io:format("~n") + end, + ok. + +tally(Changes) -> + Tally = + lists:foldl(fun({_, PrevOwner, NewOwner}, Tally) -> + dict:update_counter({PrevOwner, NewOwner}, 1, Tally) + end, dict:new(), Changes), + dict:to_list(Tally). 
+
+commit_staged([]) ->
+    case riak_core_claimant:commit() of
+        ok ->
+            io:format("Cluster changes committed~n");
+        {error, legacy} ->
+            io:format("The cluster is running in legacy mode and does not "
+                      "support plan/commit.~n");
+        {error, nothing_planned} ->
+            io:format("You must verify the plan with "
+                      "'riak-admin cluster plan' before committing~n");
+        {error, ring_not_ready} ->
+            io:format("Cannot commit until cluster state has converged.~n"
+                      "Check 'Ring Ready' in 'riak-admin ring_status'~n");
+        {error, plan_changed} ->
+            io:format("The plan has changed. Verify with "
+                      "'riak-admin cluster plan' before committing~n");
+        _ ->
+            io:format("Unable to commit cluster changes. Plan "
+                      "may have changed, please verify the~n"
+                      "plan and try to commit again~n")
+    end.
+
+transfer_limit([]) ->
+    {Limits, Down} =
+        riak_core_util:rpc_every_member_ann(riak_core_handoff_manager,
+                                            get_concurrency, [], 5000),
+    io:format("~s~n", [string:centre(" Transfer Limit ", 79, $=)]),
+    io:format("Limit Node~n"),
+    io:format("~79..-s~n", [""]),
+    lists:foreach(fun({Node, Limit}) ->
+                          io:format("~5b ~p~n", [Limit, Node])
+                  end, Limits),
+    lists:foreach(fun(Node) ->
+                          io:format("(offline) ~p~n", [Node])
+                  end, Down),
+    io:format("~79..-s~n", [""]),
+    io:format("Note: You can change transfer limits with "
+              "'riak-admin transfer_limit <limit>'~n"
+              "      and 'riak-admin transfer_limit <node> <limit>'~n"),
+    ok;
+transfer_limit([LimitStr]) ->
+    {Valid, Limit} = check_limit(LimitStr),
+    case Valid of
+        false ->
+            io:format("Invalid limit: ~s~n", [LimitStr]),
+            error;
+        true ->
+            io:format("Setting transfer limit to ~b across the cluster~n",
+                      [Limit]),
+            {_, Down} =
+                riak_core_util:rpc_every_member_ann(riak_core_handoff_manager,
+                                                    set_concurrency,
+                                                    [Limit], 5000),
+            (Down == []) orelse
+                io:format("Failed to set limit for: ~p~n", [Down]),
+            ok
+    end;
+transfer_limit([NodeStr, LimitStr]) ->
+    Node = list_to_atom(NodeStr),
+    {Valid, Limit} = check_limit(LimitStr),
+    case Valid of
+        false ->
+            io:format("Invalid limit: ~s~n", [LimitStr]),
+            error;
+        true ->
+            case rpc:call(Node, riak_core_handoff_manager,
+                          set_concurrency, [Limit]) of
+                {badrpc, _} ->
+                    io:format("Failed to set transfer limit for ~p~n", [Node]);
+                _ ->
+                    io:format("Set transfer limit for ~p to ~b~n",
+                              [Node, Limit])
+            end,
+            ok
+    end.
+
+check_limit(Str) ->
+    try
+        Int = list_to_integer(Str),
+        {Int >= 0, Int}
+    catch
+        _:_ ->
+            {false, 0}
+    end.
diff --git a/src/riak_core_gossip.erl b/src/riak_core_gossip.erl
index 4f8201a1b..8b444eff2 100644
--- a/src/riak_core_gossip.erl
+++ b/src/riak_core_gossip.erl
@@ -37,9 +37,10 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 -export ([distribute_ring/1, send_ring/1, send_ring/2, remove_from_cluster/2,
-          claim_until_balanced/2, random_gossip/1,
+          remove_from_cluster/3, claim_until_balanced/2, random_gossip/1,
           recursive_gossip/1, random_recursive_gossip/1, rejoin/2,
-          gossip_version/0, legacy_gossip/0, legacy_gossip/1]).
+          gossip_version/0, legacy_gossip/0, legacy_gossip/1,
+          any_legacy_gossip/2]).
 
 -include("riak_core_ring.hrl").
 
@@ -89,6 +90,20 @@ legacy_gossip() ->
 legacy_gossip(Node) ->
     gen_server:call(?MODULE, {legacy_gossip, Node}).
 
+%% @doc Determine if any of the `Nodes' are using legacy gossip by querying
+%%      each node's capability directly over RPC. The proper way to check
+%%      for legacy gossip is to use {@link legacy_gossip/1}. This function
+%%      is used to support staged clustering in `riak_core_claimant'.
+any_legacy_gossip(_Ring, []) -> + false; +any_legacy_gossip(Ring, [Node|Nodes]) -> + case rpc_gossip_version(Ring, Node) of + ?LEGACY_RING_VSN -> + true; + _ -> + any_legacy_gossip(Ring, Nodes) + end. + %% @doc Gossip state to a random node in the ring. random_gossip(Ring) -> case riak_core_ring:random_other_active_node(Ring) of @@ -308,7 +323,7 @@ handle_cast({rejoin, RingIn}, State) -> true -> Legacy = check_legacy_gossip(Ring, State), OtherNode = riak_core_ring:owner_node(OtherRing), - riak_core:join(Legacy, node(), OtherNode, true), + riak_core:join(Legacy, node(), OtherNode, true, true), {noreply, State}; false -> {noreply, State} @@ -425,12 +440,15 @@ claim_until_balanced(Ring, Node) -> end. remove_from_cluster(Ring, ExitingNode) -> + remove_from_cluster(Ring, ExitingNode, erlang:now()). + +remove_from_cluster(Ring, ExitingNode, Seed) -> % Get a list of indices owned by the ExitingNode... AllOwners = riak_core_ring:all_owners(Ring), % Transfer indexes to other nodes... ExitRing = - case attempt_simple_transfer(Ring, AllOwners, ExitingNode) of + case attempt_simple_transfer(Seed, Ring, AllOwners, ExitingNode) of {ok, NR} -> NR; target_n_fail -> @@ -450,14 +468,14 @@ remove_from_cluster(Ring, ExitingNode) -> end, ExitRing. -attempt_simple_transfer(Ring, Owners, ExitingNode) -> +attempt_simple_transfer(Seed, Ring, Owners, ExitingNode) -> TargetN = app_helper:get_env(riak_core, target_n_val), - attempt_simple_transfer(Ring, Owners, + attempt_simple_transfer(Seed, Ring, Owners, TargetN, ExitingNode, 0, [{O,-TargetN} || O <- riak_core_ring:claiming_members(Ring), O /= ExitingNode]). -attempt_simple_transfer(Ring, [{P, Exit}|Rest], TargetN, Exit, Idx, Last) -> +attempt_simple_transfer(Seed, Ring, [{P, Exit}|Rest], TargetN, Exit, Idx, Last) -> %% handoff case [ N || {N, I} <- Last, Idx-I >= TargetN ] of [] -> @@ -479,18 +497,19 @@ attempt_simple_transfer(Ring, [{P, Exit}|Rest], TargetN, Exit, Idx, Last) -> target_n_fail; Qualifiers -> %% these nodes don't violate target_n forward - Chosen = lists:nth(random:uniform(length(Qualifiers)), - Qualifiers), + {Rand, Seed2} = random:uniform_s(length(Qualifiers), Seed), + Chosen = lists:nth(Rand, Qualifiers), %% choose one, and do the rest of the ring attempt_simple_transfer( + Seed2, riak_core_ring:transfer_node(P, Chosen, Ring), Rest, TargetN, Exit, Idx+1, lists:keyreplace(Chosen, 1, Last, {Chosen, Idx})) end end; -attempt_simple_transfer(Ring, [{_, N}|Rest], TargetN, Exit, Idx, Last) -> +attempt_simple_transfer(Seed, Ring, [{_, N}|Rest], TargetN, Exit, Idx, Last) -> %% just keep track of seeing this node - attempt_simple_transfer(Ring, Rest, TargetN, Exit, Idx+1, + attempt_simple_transfer(Seed, Ring, Rest, TargetN, Exit, Idx+1, lists:keyreplace(N, 1, Last, {N, Idx})); -attempt_simple_transfer(Ring, [], _, _, _, _) -> +attempt_simple_transfer(_, Ring, [], _, _, _, _) -> {ok, Ring}. diff --git a/src/riak_core_handoff_manager.erl b/src/riak_core_handoff_manager.erl index 821dd488a..b85b29d87 100644 --- a/src/riak_core_handoff_manager.erl +++ b/src/riak_core_handoff_manager.erl @@ -39,6 +39,7 @@ status/1, status_update/2, set_concurrency/1, + get_concurrency/0, kill_handoffs/0 ]). @@ -104,6 +105,9 @@ status_update(ModIdx, Stats) -> set_concurrency(Limit) -> gen_server:call(?MODULE,{set_concurrency,Limit}). +get_concurrency() -> + gen_server:call(?MODULE, get_concurrency). + kill_handoffs() -> set_concurrency(0). 
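The new get_concurrency/0 pairs with the existing set_concurrency/1; this is what riak_core_console:transfer_limit/1 calls on every member over RPC. A quick local sketch (values are examples only):

    Limit = riak_core_handoff_manager:get_concurrency(),        %% e.g. 2
    ok    = riak_core_handoff_manager:set_concurrency(Limit + 2).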
@@ -159,7 +163,11 @@ handle_call({set_concurrency,Limit},_From,State=#state{handoffs=HS}) -> {reply, ok, State}; false -> {reply, ok, State} - end. + end; + +handle_call(get_concurrency, _From, State) -> + Concurrency = get_concurrency_limit(), + {reply, Concurrency, State}. handle_cast({del_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) -> {noreply, State#state{excl=ordsets:del_element({Mod, Idx}, Excl)}}; diff --git a/src/riak_core_ring.erl b/src/riak_core_ring.erl index d349ed124..d8f193c9d 100644 --- a/src/riak_core_ring.erl +++ b/src/riak_core_ring.erl @@ -72,6 +72,14 @@ leave_member/3, exit_member/3, down_member/3, + set_member/4, + set_member/5, + members/2, + set_claimant/2, + increment_vclock/2, + ring_version/1, + increment_ring_version/2, + set_pending_changes/2, active_members/1, claiming_members/1, ready_members/1, @@ -80,10 +88,14 @@ set_owner/2, indices/2, future_indices/2, + future_ring/1, disowning_indices/2, pending_changes/1, + next_owner/1, next_owner/2, next_owner/3, + all_next_owners/1, + change_owners/2, handoff_complete/3, ring_ready/0, ring_ready/1, @@ -100,10 +112,6 @@ -include_lib("eunit/include/eunit.hrl"). -endif. --define(ROUT(S,A),ok). -%%-define(ROUT(S,A),?debugFmt(S,A)). -%%-define(ROUT(S,A),io:format(S,A)). - -define(CHSTATE, #chstate_v2). -record(chstate_v2, { nodename :: node(), % the Node responsible for this chstate @@ -243,6 +251,9 @@ is_primary(Ring, IdxNode) -> all_members(?CHSTATE{members=Members}) -> get_members(Members). +members(?CHSTATE{members=Members}, Types) -> + get_members(Members, Types). + %% @doc Produce a list of all active (not marked as down) cluster members active_members(?CHSTATE{members=Members}) -> get_members(Members, [joining, valid, leaving, exiting]). @@ -486,6 +497,9 @@ update_meta(Key, Val, State) -> claimant(?CHSTATE{claimant=Claimant}) -> Claimant. +set_claimant(State, Claimant) -> + State?CHSTATE{claimant=Claimant}. + %% @doc Returns the unique identifer for this cluster. -spec cluster_name(State :: chstate()) -> term(). cluster_name(State) -> @@ -504,7 +518,18 @@ reconcile_names(RingA=?CHSTATE{clustername=NameA}, false -> {RingA, RingB} end. - + +increment_vclock(Node, State) -> + VClock = vclock:increment(Node, State?CHSTATE.vclock), + State?CHSTATE{vclock=VClock}. + +ring_version(?CHSTATE{rvsn=RVsn}) -> + RVsn. + +increment_ring_version(Node, State) -> + RVsn = vclock:increment(Node, State?CHSTATE.rvsn), + State?CHSTATE{rvsn=RVsn}. + %% @doc Returns the current membership status for a node in the cluster. -spec member_status(State :: chstate(), Node :: node()) -> member_status(). member_status(?CHSTATE{members=Members}, Node) -> @@ -571,6 +596,21 @@ exit_member(PNode, State, Node) -> down_member(PNode, State, Node) -> set_member(PNode, State, Node, down). +set_member(Node, CState, Member, Status) -> + VClock = vclock:increment(Node, CState?CHSTATE.vclock), + CState2 = set_member(Node, CState, Member, Status, same_vclock), + CState2?CHSTATE{vclock=VClock}. + +set_member(Node, CState, Member, Status, same_vclock) -> + Members2 = orddict:update(Member, + fun({_, VC, MD}) -> + {Status, vclock:increment(Node, VC), MD} + end, + {Status, vclock:increment(Node, + vclock:fresh()), []}, + CState?CHSTATE.members), + CState?CHSTATE{members=Members2}. + %% @doc Return a list of all members of the cluster that are eligible to %% claim partitions. -spec claiming_members(State :: chstate()) -> [Node :: node()]. 
@@ -600,6 +640,17 @@ future_indices(State, Node) -> FutureState = change_owners(State, all_next_owners(State)), indices(FutureState, Node). +%% @private +all_next_owners(CState) -> + Next = riak_core_ring:pending_changes(CState), + [{Idx, NextOwner} || {Idx, _, NextOwner, _, _} <- Next]. + +%% @private +change_owners(CState, Reassign) -> + lists:foldl(fun({Idx, NewOwner}, CState0) -> + riak_core_ring:transfer_node(Idx, NewOwner, CState0) + end, CState, Reassign). + %% @doc Return all indices that a node is scheduled to give to another. disowning_indices(State, Node) -> [Idx || {Idx, Owner, _NextOwner, _Mods, _Status} <- State?CHSTATE.next, @@ -610,6 +661,9 @@ pending_changes(State) -> %% For now, just return next directly. State?CHSTATE.next. +set_pending_changes(State, Transfers) -> + State?CHSTATE{next=Transfers}. + %% @doc Return details for a pending partition ownership change. -spec next_owner(State :: chstate(), Idx :: integer()) -> pending_change(). next_owner(State, Idx) -> @@ -638,6 +692,10 @@ next_owner(State, Idx, Mod) -> end end. +%% @private +next_owner({_, Owner, NextOwner, _Transfers, Status}) -> + {Owner, NextOwner, Status}. + %% @doc Returns true if all cluster members have seen the current ring. -spec ring_ready(State :: chstate()) -> boolean(). ring_ready(State0) -> @@ -695,6 +753,25 @@ ring_changed(Node, State) -> "Error: riak_core_ring/ring_changed called on tainted ring"), internal_ring_changed(Node, State). +%% @doc Return the ring that will exist after all pending ownership transfers +%% have completed. +future_ring(State) -> + FutureState = change_owners(State, all_next_owners(State)), + %% Individual nodes will move themselves from leaving to exiting if they + %% have no ring ownership, this is implemented in riak_core_ring_handler. + %% Emulate it here to return similar ring. + Leaving = get_members(FutureState?CHSTATE.members, [leaving]), + FutureState2 = + lists:foldl(fun(Node, StateAcc) -> + case indices(StateAcc, Node) of + [] -> + riak_core_ring:exit_member(Node, StateAcc, Node); + _ -> + StateAcc + end + end, FutureState, Leaving), + FutureState2?CHSTATE{next=[]}. + pretty_print(Ring, Opts) -> OptNumeric = lists:member(numeric, Opts), OptLegend = lists:member(legend, Opts), @@ -818,10 +895,9 @@ legacy_reconcile(MyNodeName, StateA, StateB) -> chring=CHRing, meta=Meta}. 
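The future_ring/1 helper added above is what the claimant and console use to reason about the post-transfer cluster. A short sketch of the invariant it provides, following its definition (the returned ring has all pending transfers applied and an empty next list):

    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
    Pending = riak_core_ring:pending_changes(Ring),   %% [{Idx, Owner, NextOwner, Mods, Status}]
    Future  = riak_core_ring:future_ring(Ring),
    []      = riak_core_ring:pending_changes(Future).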
-%% ========================================================================= -%% Claimant rebalance/reassign logic -%% (TODO: Consider refactoring into riak_core_gossip or riak_core_claimant) -%% ========================================================================= +%% ==================================================================== +%% Internal functions +%% ==================================================================== %% @private internal_ring_changed(Node, CState0) -> @@ -830,333 +906,9 @@ internal_ring_changed(Node, CState0) -> false -> CState; true -> - {C1, CState2} = maybe_update_claimant(Node, CState), - {C2, CState3} = maybe_handle_joining(Node, CState2), - case C2 of - true -> - Changed = true, - CState5 = CState3; - false -> - {C3, CState4} = maybe_update_ring(Node, CState3), - {C4, CState5} = maybe_remove_exiting(Node, CState4), - Changed = (C1 or C2 or C3 or C4) - end, - - %% Start/stop converge and rebalance delay timers - %% (converge delay) - %% -- Starts when claimant changes the ring - %% -- Stops when the ring converges (ring_ready) - %% (rebalance delay) - %% -- Starts when next changes from empty to non-empty - %% -- Stops when next changes from non-empty to empty - %% - IsClaimant = (CState5?CHSTATE.claimant =:= Node), - WasPending = ([] /= pending_changes(CState)), - IsPending = ([] /= pending_changes(CState5)), - - %% Outer case statement already checks for ring_ready - case {IsClaimant, Changed} of - {true, true} -> - riak_core_stat:update(converge_timer_end), - riak_core_stat:update(converge_timer_begin); - {true, false} -> - riak_core_stat:update(converge_timer_end); - _ -> - ok - end, - - case {IsClaimant, WasPending, IsPending} of - {true, false, true} -> - riak_core_stat:update(rebalance_timer_begin); - {true, true, false} -> - riak_core_stat:update(rebalance_timer_end); - _ -> - ok - end, - - %% Set cluster name if it is undefined - case {IsClaimant, cluster_name(CState5)} of - {true, undefined} -> - ClusterName = {Node, erlang:now()}, - riak_core_util:rpc_every_member(riak_core_ring_manager, - set_cluster_name, - [ClusterName], - 1000), - ok; - _ -> - ClusterName = cluster_name(CState5), - ok - end, - - case Changed of - true -> - VClock = vclock:increment(Node, CState5?CHSTATE.vclock), - CState5?CHSTATE{vclock=VClock, clustername=ClusterName}; - false -> - CState5 - end - end. - -%% @private -maybe_update_claimant(Node, CState) -> - Members = get_members(CState?CHSTATE.members, [valid, leaving]), - Claimant = CState?CHSTATE.claimant, - RVsn = CState?CHSTATE.rvsn, - NextClaimant = hd(Members ++ [undefined]), - ClaimantMissing = not lists:member(Claimant, Members), - - case {ClaimantMissing, NextClaimant} of - {true, Node} -> - %% Become claimant - RVsn2 = vclock:increment(Claimant, RVsn), - CState2 = CState?CHSTATE{claimant=Node, rvsn=RVsn2}, - {true, CState2}; - _ -> - {false, CState} - end. - -%% @private -maybe_update_ring(Node, CState) -> - Claimant = CState?CHSTATE.claimant, - case Claimant of - Node -> - case claiming_members(CState) of - [] -> - %% Consider logging an error/warning here or even - %% intentionally crashing. This state makes no logical - %% sense given that it represents a cluster without any - %% active nodes. - {false, CState}; - _ -> - {Changed, CState2} = update_ring(Node, CState), - {Changed, CState2} - end; - _ -> - {false, CState} - end. - -%% @private -maybe_remove_exiting(Node, CState) -> - Claimant = CState?CHSTATE.claimant, - case Claimant of - Node -> - %% Change exiting nodes to invalid, skipping this node. 
- Exiting = get_members(CState?CHSTATE.members, [exiting]) -- [Node], - Changed = (Exiting /= []), - CState2 = - lists:foldl(fun(ENode, CState0) -> - %% Tell exiting node to shutdown. - CName = cluster_name(CState), - riak_core_ring_manager:refresh_ring(ENode, - CName), - set_member(Node, CState0, ENode, - invalid, same_vclock) - end, CState, Exiting), - {Changed, CState2}; - _ -> - {false, CState} - end. - -%% @private -maybe_handle_joining(Node, CState) -> - Claimant = CState?CHSTATE.claimant, - case Claimant of - Node -> - Joining = get_members(CState?CHSTATE.members, [joining]), - Changed = (Joining /= []), - CState2 = - lists:foldl(fun(JNode, CState0) -> - set_member(Node, CState0, JNode, - valid, same_vclock) - end, CState, Joining), - {Changed, CState2}; - _ -> - {false, CState} - end. - -%% @private -update_ring(CNode, CState) -> - Next0 = CState?CHSTATE.next, - - ?ROUT("Members: ~p~n", [CState?CHSTATE.members]), - ?ROUT("Updating ring :: next0 : ~p~n", [Next0]), - - %% Remove tuples from next for removed nodes - InvalidMembers = get_members(CState?CHSTATE.members, [invalid]), - Next2 = lists:filter(fun(NInfo) -> - {Owner, NextOwner, _} = next_owner(NInfo), - not lists:member(Owner, InvalidMembers) and - not lists:member(NextOwner, InvalidMembers) - end, Next0), - CState2 = CState?CHSTATE{next=Next2}, - - %% Transfer ownership after completed handoff - {RingChanged1, CState3} = transfer_ownership(CState2), - ?ROUT("Updating ring :: next1 : ~p~n", [CState3?CHSTATE.next]), - - %% Ressign leaving/inactive indices - {RingChanged2, CState4} = reassign_indices(CState3), - ?ROUT("Updating ring :: next2 : ~p~n", [CState4?CHSTATE.next]), - - %% Rebalance the ring as necessary - Next3 = rebalance_ring(CNode, CState4), - lager:debug("Pending ownership transfers: ~b~n", - [length(pending_changes(CState4))]), - - %% Remove transfers to/from down nodes - Next4 = handle_down_nodes(CState4, Next3), - - NextChanged = (Next0 /= Next4), - Changed = (NextChanged or RingChanged1 or RingChanged2), - case Changed of - true -> - OldS = ordsets:from_list([{Idx,O,NO} || {Idx,O,NO,_,_} <- Next0]), - NewS = ordsets:from_list([{Idx,O,NO} || {Idx,O,NO,_,_} <- Next4]), - Diff = ordsets:subtract(NewS, OldS), - [log(next, NChange) || NChange <- Diff], - RVsn2 = vclock:increment(CNode, CState4?CHSTATE.rvsn), - ?ROUT("Updating ring :: next3 : ~p~n", [Next4]), - {true, CState4?CHSTATE{next=Next4, rvsn=RVsn2}}; - false -> - {false, CState} + riak_core_claimant:ring_changed(Node, CState) end. -%% @private -transfer_ownership(CState=?CHSTATE{next=Next}) -> - %% Remove already completed and transfered changes - Next2 = lists:filter(fun(NInfo={Idx, _, _, _, _}) -> - {_, NewOwner, S} = next_owner(NInfo), - not ((S == complete) and - (index_owner(CState, Idx) =:= NewOwner)) - end, Next), - - CState2 = lists:foldl( - fun(NInfo={Idx, _, _, _, _}, CState0) -> - case next_owner(NInfo) of - {_, Node, complete} -> - log(ownership, {Idx, Node, CState0}), - riak_core_ring:transfer_node(Idx, Node, - CState0); - _ -> - CState0 - end - end, CState, Next2), - - NextChanged = (Next2 /= Next), - RingChanged = (all_owners(CState) /= all_owners(CState2)), - Changed = (NextChanged or RingChanged), - {Changed, CState2?CHSTATE{next=Next2}}. 
- -%% @private -reassign_indices(CState=?CHSTATE{next=Next}) -> - Invalid = get_members(CState?CHSTATE.members, [invalid]), - CState2 = - lists:foldl(fun(Node, CState0) -> - remove_node(CState0, Node, invalid) - end, CState, Invalid), - CState3 = case Next of - [] -> - Leaving = get_members(CState?CHSTATE.members, [leaving]), - lists:foldl(fun(Node, CState0) -> - remove_node(CState0, Node, leaving) - end, CState2, Leaving); - _ -> - CState2 - end, - Owners1 = all_owners(CState), - Owners2 = all_owners(CState3), - RingChanged = (Owners1 /= Owners2), - NextChanged = (Next /= CState3?CHSTATE.next), - {RingChanged or NextChanged, CState3}. - -%% @private -rebalance_ring(_CNode, CState=?CHSTATE{next=[]}) -> - Members = claiming_members(CState), - CState2 = lists:foldl(fun(Node, Ring0) -> - riak_core_gossip:claim_until_balanced(Ring0, - Node) - end, CState, Members), - Owners1 = all_owners(CState), - Owners2 = all_owners(CState2), - Owners3 = lists:zip(Owners1, Owners2), - Next = [{Idx, PrevOwner, NewOwner, [], awaiting} - || {{Idx, PrevOwner}, {Idx, NewOwner}} <- Owners3, - PrevOwner /= NewOwner], - Next; -rebalance_ring(_CNode, _CState=?CHSTATE{next=Next}) -> - Next. - -%% @private -handle_down_nodes(CState, Next) -> - LeavingMembers = get_members(CState?CHSTATE.members, [leaving, invalid]), - DownMembers = get_members(CState?CHSTATE.members, [down]), - Next2 = [begin - OwnerLeaving = lists:member(O, LeavingMembers), - NextDown = lists:member(NO, DownMembers), - case (OwnerLeaving and NextDown) of - true -> - Active = riak_core_ring:active_members(CState) -- [O], - RNode = lists:nth(random:uniform(length(Active)), - Active), - {Idx, O, RNode, Mods, Status}; - _ -> - T - end - end || T={Idx, O, NO, Mods, Status} <- Next], - Next3 = [T || T={_, O, NO, _, _} <- Next2, - not lists:member(O, DownMembers), - not lists:member(NO, DownMembers)], - Next3. - -%% @private -all_next_owners(?CHSTATE{next=Next}) -> - [{Idx, NextOwner} || {Idx, _, NextOwner, _, _} <- Next]. - -%% @private -change_owners(CState, Reassign) -> - lists:foldl(fun({Idx, NewOwner}, CState0) -> - riak_core_ring:transfer_node(Idx, NewOwner, CState0) - end, CState, Reassign). - -%% @private -remove_node(CState, Node, Status) -> - Indices = indices(CState, Node), - remove_node(CState, Node, Status, Indices). - -%% @private -remove_node(CState, _Node, _Status, []) -> - CState; -remove_node(CState, Node, Status, Indices) -> - CStateT1 = change_owners(CState, all_next_owners(CState)), - CStateT2 = riak_core_gossip:remove_from_cluster(CStateT1, Node), - Owners1 = all_owners(CState), - Owners2 = all_owners(CStateT2), - Owners3 = lists:zip(Owners1, Owners2), - RemovedIndices = case Status of - invalid -> - Indices; - leaving -> - [] - end, - Reassign = [{Idx, NewOwner} || {Idx, NewOwner} <- Owners2, - lists:member(Idx, RemovedIndices)], - Next = [{Idx, PrevOwner, NewOwner, [], awaiting} - || {{Idx, PrevOwner}, {Idx, NewOwner}} <- Owners3, - PrevOwner /= NewOwner, - not lists:member(Idx, RemovedIndices)], - - [log(reassign, {Idx, NewOwner, CState}) || {Idx, NewOwner} <- Reassign], - - %% Unlike rebalance_ring, remove_node can be called when Next is non-empty, - %% therefore we need to merge the values. Original Next has priority. - Next2 = lists:ukeysort(1, CState?CHSTATE.next ++ Next), - CState2 = change_owners(CState, Reassign), - CState2?CHSTATE{next=Next2}. 
- -%% ==================================================================== -%% Internal functions -%% ==================================================================== - %% @private merge_meta(M1,M2) -> dict:merge(fun(_,D1,D2) -> pick_val(D1,D2) end, M1, M2). @@ -1416,10 +1168,6 @@ transfer_complete(CState=?CHSTATE{next=Next, vclock=VClock}, Idx, Mod) -> VClock2 = vclock:increment(Owner, VClock), CState?CHSTATE{next=Next2, vclock=VClock2}. -%% @private -next_owner({_, Owner, NextOwner, _Transfers, Status}) -> - {Owner, NextOwner, Status}. - %% @private get_members(Members) -> get_members(Members, [joining, valid, leaving, exiting, down]). @@ -1428,23 +1176,6 @@ get_members(Members) -> get_members(Members, Types) -> [Node || {Node, {V, _, _}} <- Members, lists:member(V, Types)]. -%% @private -set_member(Node, CState, Member, Status) -> - VClock = vclock:increment(Node, CState?CHSTATE.vclock), - CState2 = set_member(Node, CState, Member, Status, same_vclock), - CState2?CHSTATE{vclock=VClock}. - -%% @private -set_member(Node, CState, Member, Status, same_vclock) -> - Members2 = orddict:update(Member, - fun({_, VC, MD}) -> - {Status, vclock:increment(Node, VC), MD} - end, - {Status, vclock:increment(Node, - vclock:fresh()), []}, - CState?CHSTATE.members), - CState?CHSTATE{members=Members2}. - %% @private update_seen(Node, CState=?CHSTATE{vclock=VClock, seen=Seen}) -> Seen2 = orddict:update(Node, @@ -1515,17 +1246,6 @@ filtered_seen(State=?CHSTATE{seen=Seen}) -> orddict:filter(fun(N, _) -> lists:member(N, Members) end, Seen) end. -log(ownership, {Idx, NewOwner, CState}) -> - Owner = index_owner(CState, Idx), - lager:debug("(new-owner) ~b :: ~p -> ~p~n", [Idx, Owner, NewOwner]); -log(reassign, {Idx, NewOwner, CState}) -> - Owner = index_owner(CState, Idx), - lager:debug("(reassign) ~b :: ~p -> ~p~n", [Idx, Owner, NewOwner]); -log(next, {Idx, Owner, NewOwner}) -> - lager:debug("(pending) ~b :: ~p -> ~p~n", [Idx, Owner, NewOwner]); -log(_, _) -> - ok. - %% =================================================================== %% EUnit tests %% =================================================================== diff --git a/src/riak_core_ring_manager.erl b/src/riak_core_ring_manager.erl index cc6102aee..0b6ed8860 100644 --- a/src/riak_core_ring_manager.erl +++ b/src/riak_core_ring_manager.erl @@ -256,6 +256,8 @@ handle_call({ring_trans, Fun, Args}, _From, State=#state{raw_ring=Ring}) -> {reply, {ok, NewRing}, State#state{raw_ring=NewRing}}; ignore -> {reply, not_changed, State}; + {ignore, Reason} -> + {reply, {not_changed, Reason}, State}; Other -> lager:error("ring_trans: invalid return value: ~p", [Other]), diff --git a/src/riak_core_sup.erl b/src/riak_core_sup.erl index 13ee0c669..5df6117a1 100644 --- a/src/riak_core_sup.erl +++ b/src/riak_core_sup.erl @@ -70,6 +70,7 @@ init([]) -> ?CHILD(riak_core_node_watcher, worker), ?CHILD(riak_core_vnode_manager, worker), ?CHILD(riak_core_gossip, worker), + ?CHILD(riak_core_claimant, worker), RiakWebs ]),
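The riak_core_ring_manager change above extends the ring_trans contract: a transition fun may now return {ignore, Reason}, and the caller gets {not_changed, Reason} instead of a bare not_changed, which lets the plan/commit path report why a staged change was rejected. A minimal sketch of a transition fun written against that contract, assuming the usual {new_ring, NewRing} success return; safe_update/2 and the pending_transfers reason are illustrative, not part of this patch:

    %% Hypothetical ring_trans fun: only touch the ring when no ownership
    %% transfers are pending; otherwise tell the caller why nothing changed.
    safe_update(Ring, _Args) ->
        case riak_core_ring:pending_changes(Ring) of
            [] ->
                {new_ring, riak_core_ring:increment_vclock(node(), Ring)};
            _ ->
                {ignore, pending_transfers}
        end.

    %% Invoked via riak_core_ring_manager:ring_trans/2, this now yields
    %% {not_changed, pending_transfers} while handoffs are outstanding.

The final hunk registers riak_core_claimant as a supervised worker under riak_core_sup, starting it alongside the gossip and vnode managers so the staged-change machinery is available as soon as the node boots.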