8 changes: 8 additions & 0 deletions intercepts/riak_kv_bitcask_backend_intercepts.erl
@@ -48,3 +48,11 @@ corrupting_get(Bucket, Key, ModState) ->

corrupt_binary(O) ->
    crypto:rand_bytes(byte_size(O)).

always_corrupt_get(Bucket, Key, ModState) ->
    case ?M:get_orig(Bucket, Key, ModState) of
        {ok, BinVal0, UpdModState} ->
            BinVal = corrupt_binary(BinVal0),
            {ok, BinVal, UpdModState};
        Else -> Else
    end.
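For context, a hedged sketch of how a test might install this intercept: rt_intercept:add/2 is riak_test's helper for loading intercepts onto a node, and mapping {get, 3} to always_corrupt_get assumes the backend's get/3 is the function being wrapped (Node is an illustrative variable).

    %% Illustrative only: after this call, every get against Node's
    %% bitcask backend returns a corrupted binary rather than the
    %% stored value.
    rt_intercept:add(Node, {riak_kv_bitcask_backend,
                            [{{get, 3}, always_corrupt_get}]}).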
25 changes: 25 additions & 0 deletions src/rt.erl
@@ -561,6 +561,20 @@ heal({_NewCookie, OldCookie, P1, P2}) ->
    {_GN, []} = rpc:sbcast(Cluster, riak_core_node_watcher, broadcast),
    ok.

%% @doc Heal the partition created by a call to partition/2, but if
%% some node in P1 is down, just skip it rather than failing. Returns
%% {ok, list(node())} where the list contains the nodes that were
%% down and therefore not healed/reconnected.
heal_upnodes({_NewCookie, OldCookie, P1, P2}) ->
    %% set OldCookie on the UP P1 nodes
    Res = [{N, rpc:call(N, erlang, set_cookie, [N, OldCookie])} || N <- P1],
    UpForReconnect = [N || {N, true} <- Res],
    DownForReconnect = [N || {N, RPC} <- Res, RPC /= true],
    Cluster = UpForReconnect ++ P2,
    wait_until_connected(Cluster),
    {_GN, []} = rpc:sbcast(Cluster, riak_core_node_watcher, broadcast),
    {ok, DownForReconnect}.
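A hedged usage sketch (node names illustrative): rt:partition/2 returns the tuple heal_upnodes/1 expects, so a test can partition the cluster, lose a node, and still heal whatever is reachable.

    %% Sketch: heal the reachable nodes; Down lists the nodes skipped
    %% because they were unreachable at heal time.
    Partition = rt:partition([Node1, Node2], [Node3, Node4]),
    rt:stop_and_wait(Node2),
    %% ... exercise the cluster under partition ...
    {ok, Down} = rt:heal_upnodes(Partition),
    lager:info("Nodes not healed (down): ~p", [Down]).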

%% @doc Spawn `Cmd' on the machine running the test harness
spawn_cmd(Cmd) ->
    ?HARNESS:spawn_cmd(Cmd).
@@ -778,6 +792,17 @@ wait_until_transfers_complete([Node0|_]) ->
    ?assertEqual(ok, wait_until(Node0, F)),
    ok.

%% @doc Waits until outbound (hinted) handoffs from `Node0' are complete
wait_until_node_handoffs_complete(Node0) ->
    lager:info("Wait until handoffs from ~p complete", [Node0]),
    F = fun(Node) ->
                Handoffs = rpc:call(Node, riak_core_handoff_manager, status, [{direction, outbound}]),
                lager:info("Handoffs: ~p", [Handoffs]),
                Handoffs =:= []
        end,
    ?assertEqual(ok, wait_until(Node0, F)),
    ok.
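A hedged example of where this helper fits (names illustrative): after restarting a fallback, force handoff and block until the node has pushed its data before asserting on the primaries, mirroring the force_handoffs call used by the kv679 test below.

    %% Sketch: make FallbackNode hand off, then wait for it to finish.
    rt:start_and_wait(FallbackNode),
    rpc:call(FallbackNode, riak_core_vnode_manager, force_handoffs, []),
    ok = rt:wait_until_node_handoffs_complete(FallbackNode).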

wait_for_service(Node, Services) when is_list(Services) ->
    F = fun(N) ->
                case rpc:call(N, riak_core_node_watcher, services, [N]) of
185 changes: 185 additions & 0 deletions tests/kv679_dataloss_fb.erl
@@ -0,0 +1,185 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2017 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%% @copyright (C) 2017, Basho Technologies
%%% @doc
%%% riak_test for the kv679 lost clock/fallback/handoff flavour.
%%%
%%% Issue kv679 is a possible dataloss issue, basically caused by the
%%% fact that per-key logical clocks can go backwards in time in
%%% certain situations. The situation under test here is as follows:
%%%
%%% A coords a write to K [{a, 1}] and replicates to fallbacks D, E
%%% A coords a write to K [{a, 2}] and replicates to primaries B, C
%%% A coords a write to K [{a, 3}] and replicates to primaries B, C
%%% A loses its clock for K (so far this is like the lost clock case above)
%%% Read of A, D, E read repairs A with K=[{a, 1}]
%%% A coords a write, issues [{a, 2}] again
%%% Acked write is lost
%%%
%%% @end
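To make the regression concrete, a minimal dominance-check sketch (shell-style Erlang, not Riak's implementation) of why the re-issued [{a, 2}] is silently discarded by replicas holding [{a, 3}]:

    %% A per-key logical clock as [{Actor, Count}]: C1 dominates C2 if
    %% every event in C2 is already covered by C1.
    Dominates = fun(C1, C2) ->
                        lists:all(fun({Actor, Cnt}) ->
                                          proplists:get_value(Actor, C1, 0) >= Cnt
                                  end, C2)
                end,
    %% B and C hold the frontier clock [{a, 3}]; after repair, A
    %% re-coordinates from [{a, 1}] and acks a new write at [{a, 2}]:
    true = Dominates([{a, 3}], [{a, 2}]).
    %% The acked write looks stale to the primaries and is dropped.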

-module(kv679_dataloss_fb).
-behavior(riak_test).
-compile([export_all]).
-export([confirm/0]).

-include_lib("eunit/include/eunit.hrl").

-define(BUCKET, <<"kv679">>).
-define(KEY, <<"test">>).

confirm() ->
    Conf = [
            {riak_kv, [{anti_entropy, {off, []}}]},
            {riak_core, [{default_bucket_props, [{allow_mult, true},
                                                 {dvv_enabled, true},
                                                 {ring_creation_size, 8},
                                                 {vnode_management_timer, 1000},
                                                 {handoff_concurrency, 100},
                                                 {vnode_inactivity_timeout, 1000}]}]},
            {bitcask, [{sync_strategy, o_sync}, {io_mode, nif}]}],

    %% Six nodes because we want a perfect preflist when 2 primaries
    %% are down, i.e. we want to kill the fallbacks before they can
    %% hand off, without affecting the primaries
    Nodes = rt:build_cluster(6, Conf),

    Clients = kv679_tombstone:create_pb_clients(Nodes),

    %% Get preflist for key
    PL = kv679_tombstone:get_preflist(hd(Nodes)),

    ?assert(kv679_tombstone2:perfect_preflist(PL)),

    lager:info("Got preflist"),

    {CoordNode, _} = CoordClient = kv679_tombstone:coordinating_client(Clients, PL),

    OtherPrimaries = [Node || {{_Idx, Node}, Type} <- PL,
                              Type == primary,
                              Node /= CoordNode],

    [rt:stop_and_wait(N) || N <- OtherPrimaries],

    lager:info("Killed 2 primaries"),

    rt:wait_until(fun() ->
                          NewPL = kv679_tombstone:get_preflist(CoordNode),
                          primary_and_fallback_counts(NewPL) == {1, 2}
                  end),

    FBPL = kv679_tombstone:get_preflist(CoordNode),

    lager:info("Got a preflist with coord and 2 fbs ~p~n", [FBPL]),

    %% Write the key twice at the remaining, coordinating primary
    kv679_tombstone:write_key(CoordClient, [<<"bob">>, <<"jim">>]),
    kv679_tombstone2:dump_clock(CoordClient),
    lager:info("Clock at 2 fallbacks"),

    %% Kill the fallbacks before they can hand off
    Fallbacks = [Node || {{_Idx, Node}, Type} <- FBPL,
                         Type == fallback],

    [rt:brutal_kill(FB) || FB <- Fallbacks],

    %% Bring back the primaries and do some more writes
    [rt:start_and_wait(P) || P <- OtherPrimaries],
    lager:info("started primaries back up"),
    rt:wait_until(fun() ->
                          NewPL = kv679_tombstone:get_preflist(CoordNode),
                          NewPL == PL
                  end),
    kv679_tombstone:write_key(CoordClient, [<<"jon">>, <<"joe">>]),
    kv679_tombstone2:dump_clock(CoordClient),

    %% Kill those primaries with their frontier clocks
    [rt:brutal_kill(P) || P <- OtherPrimaries],
    lager:info("killed primaries again"),

    %% Delete the local data for the key at the coordinator
    kv679_dataloss:delete_datadir(hd(PL)),

    %% Start up those fallbacks
    [rt:start_and_wait(F) || F <- Fallbacks],
    lager:info("restart fallbacks"),
    %% Wait for the fallback preflist
    rt:wait_until(fun() ->
                          NewPL = kv679_tombstone:get_preflist(CoordNode),
                          NewPL == FBPL
                  end),

    %% Read the key; read repair means the vnode whose data was
    %% deleted will have an old clock (gone back in time!)
    await_read_repair(CoordClient),
    kv679_tombstone2:dump_clock(CoordClient),

    %% Write a new value; this _should_ be a sibling of what is on
    %% the crashed primaries
    kv679_tombstone:write_key(CoordClient, <<"anne">>),
    kv679_tombstone2:dump_clock(CoordClient),

    %% Time to start up those primaries, let handoff happen, and see
    %% what happened to that last write
    [rt:start_and_wait(P) || P <- OtherPrimaries],
    lager:info("restart primaries _again_"),
    rt:wait_until(fun() ->
                          NewPL = kv679_tombstone:get_preflist(CoordNode),
                          NewPL == PL
                  end),

    lager:info("wait for handoffs"),
    [begin
         rpc:call(FB, riak_core_vnode_manager, force_handoffs, []),
         rt:wait_until_transfers_complete([FB])
     end || FB <- Fallbacks],

lager:info("final get"),

Res = kv679_tombstone:read_key(CoordClient),
?assertMatch({ok, _}, Res),
{ok, O} = Res,

%% A nice riak would have somehow managed to make a sibling of the
%% last write
?assertEqual([<<"anne">>, <<"joe">>], riakc_obj:get_values(O)),
lager:info("Final Object ~p~n", [O]),
pass.

primary_and_fallback_counts(PL) ->
    lists:foldl(fun({{_, _}, primary}, {P, F}) ->
                        {P+1, F};
                   ({{_, _}, fallback}, {P, F}) ->
                        {P, F+1}
                end,
                {0, 0},
                PL).

%% Wait until a read repair has occurred, or at least, wait until
%% there is a value on disk at the coordinating/primary vnode (and
%% assume it must have got there via read repair)
await_read_repair(Client) ->
    rt:wait_until(fun() ->
                          {ok, _O} = kv679_tombstone:read_key(Client),
                          %% pr=1, r=1, sloppy_quorum=false forces the
                          %% read to be served by the primary vnode only
                          {T, V} = kv679_tombstone:read_key(Client, [{pr, 1}, {r, 1}, {sloppy_quorum, false}]),
                          lager:info("pr=1 fetch res ~p ~p", [T, V]),
                          T /= error
                  end).