Skip to content

Conversation

@evanmcc
Copy link
Contributor

@evanmcc evanmcc commented Oct 16, 2013

add a simple, point-to-point capability check for batch message support; if the remote node lacks it, fall back to per-object messages.
@jonmeredith
Copy link
Contributor

+1 merge.

Tested by hand against 1.2.5anya, 1.3.2, 1.4.2, 1.4.2patched. Verified all vnode entries made it there using this hacky little thing. Populated cluster - added/removed nodes and verified vnode count/content was still correct after all transfers completed. AAE has to be disabled for it to be a valid test.

-module(handoffh).
-compile([export_all]).

-define(MD_VTAG,     <<"X-Riak-VTag">>).
-define(MD_LASTMOD,  <<"X-Riak-Last-Modified">>).

%% @doc Populate every partition in the ring using sequence number 0.
write() ->
    write(0).

%% @doc For each partition in the ring, remotely write that partition's
%% full set of entries (values tagged with Seq) on the owning node.
%% Crashes with a badmatch if any remote write returns anything but ok.
write(Seq) ->
    WriteOne =
        fun({Idx, {Partition, Node}}) ->
                Count = partition_entries(Idx),
                ok = rpc:call(Node, ?MODULE, write_vnode,
                              [Partition, Count, Seq])
        end,
    lists:foreach(WriteOne, get_parts()),
    ok.

%% @doc Verify every partition's entries for sequence number 0.
read() ->
    read(0).

%% @doc Read back every partition's entries for sequence Seq and return
%% a list of {Node, Idx, ValidCount} tuples for those partitions whose
%% count of correctly-read entries differs from the expected
%% partition_entries(Idx) — so an empty result means all data is intact.
read(Seq) ->
    Parts = get_parts(),
    %% The comprehension filter both binds Valid (the remote count of
    %% entries that read back with the expected value) and keeps only the
    %% mismatching partitions.
    [{Node, Idx, Valid} || {Idx, {Partition, Node}} <- Parts,
                           (Valid = rpc:call(Node, ?MODULE, read_vnode, [Partition, partition_entries(Idx), Seq])) /= partition_entries(Idx)].

%% @doc Return the ring's partition owners as {Index, {Partition, Node}}
%% pairs, where Index is the 1-based position in the owners list.
get_parts() ->
    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
    Owners = riak_core_ring:all_owners(Ring),
    Positions = lists:seq(1, length(Owners)),
    lists:zip(Positions, Owners).

%% @doc Write objects with entry numbers N..1 into the local vnode for
%% Partition, each value tagged with Seq. Crashes on the first failed
%% write via the ok badmatch.
write_vnode(_Partition, 0, _Seq) ->
    ok;
write_vnode(Partition, N, Seq) ->
    ok = write_entry(Partition, N, Seq),
    write_vnode(Partition, N - 1, Seq).

%% @doc Count how many of the Count expected entries in Partition read
%% back with the value expected for Seq.
read_vnode(Partition, Count, Seq) ->
    read_vnode(Partition, Count, Seq, 0).

%% Accumulator loop: walk entry numbers down to zero, adding one to the
%% tally for each entry that reads back correctly.
read_vnode(_Partition, 0, _Seq, Tally) ->
    Tally;
read_vnode(Partition, Entry, Seq, Tally) ->
    Incr = case read_entry(Partition, Entry, Seq) of
               true  -> 1;
               false -> 0
           end,
    read_vnode(Partition, Entry - 1, Seq, Tally + Incr).

%% @doc Store one object in the local vnode. A fully-acknowledged put
%% ({w,..} then {dw,..} for this partition) is normalised to ok; any
%% other reply is passed straight through to the caller.
write_entry(Partition, Entry, Seq) ->
    Reply = local_put(Partition, make_obj(Partition, Entry, Seq)),
    case Reply of
        {{w, Partition, _}, {dw, Partition, _}} -> ok;
        Other -> Other
    end.

%% @doc True when the object for Entry exists in Partition's vnode and
%% its value matches the expected value for Seq; false on a miss or any
%% get error.
read_entry(Partition, Entry, Seq) ->
    BKey = {<<"b">>, <<Entry:32>>},
    case local_get(Partition, BKey) of
        {ok, Obj} -> check_entry(Partition, Entry, Seq, Obj);
        _Other    -> false
    end.

%% @doc True when the stored object's value is exactly the deterministic
%% binary expected for this partition/entry/sequence triple.
check_entry(Partition, Entry, Seq, Obj) ->
    Expected = make_value(Partition, Entry, Seq),
    Expected =:= riak_object:get_value(Obj).

%% @doc Build the riak object for one entry: bucket <<"b">>, a 32-bit
%% binary key, the deterministic value, and freshly stamped metadata.
make_obj(Partition, Entry, Seq) ->
    Value = make_value(Partition, Entry, Seq),
    Obj = riak_object:new(<<"b">>, <<Entry:32>>, Value),
    riak_object:apply_updates(update_last_modified(Obj)).

%% @doc Deterministic per-object value: partition id, entry number and
%% sequence number packed into a single 28-byte binary, so a read can
%% verify exactly which write produced the object.
make_value(Partition, Entry, Seq) ->
    <<Partition:160/integer, Entry:32/integer, Seq:32/integer>>.

%% @doc Number of entries written for ring position Idx. The count is a
%% distinct linear function of the index so each vnode holds a
%% recognisably different amount of data.
partition_entries(Idx) ->
    1 + 19 * Idx.

%% @doc Stamp RObj's update metadata with a fresh last-modified time and
%% vtag, choosing a safe base metadata dict first (lifted from riak_kv's
%% put path so hand-written objects look like client writes).
update_last_modified(RObj) ->
    MD0 = case dict:find(clean, riak_object:get_update_metadata(RObj)) of
              {ok, true} ->
                  %% There have been no changes to updatemetadata. If we stash the
                  %% last modified in this dict, it will cause us to lose existing
                  %% metadata (bz://508). If there is only one instance of metadata,
                  %% we can safely update that one, but in the case of multiple siblings,
                  %% it's hard to know which one to use. In that situation, use the update
                  %% metadata as is.
                  case riak_object:get_metadatas(RObj) of
                      [MD] ->
                          MD;
                      _ ->
                          riak_object:get_update_metadata(RObj)
                  end;
               _ ->
                  riak_object:get_update_metadata(RObj)
          end,
    %% Post-0.14.2 changed vtags to be generated from node/now rather the vclocks.
    %% The vclock has not been updated at this point.  Vtags/etags should really
    %% be an external interface concern and are only used for sibling selection
    %% and if-modified type tests so they could be generated on retrieval instead.
    %% This changes from being a hash on the value to a likely-to-be-unique value
    %% which should serve the same purpose.  It was possible to generate two
    %% objects with the same vclock on 0.14.2 if the same clientid was used in
    %% the same second.  It can be revisited post-1.0.0.
    Now = os:timestamp(),
    NewMD = dict:store(?MD_VTAG, make_vtag(Now),
                       dict:store(?MD_LASTMOD, Now, MD0)),
    riak_object:update_metadata(RObj, NewMD).

%% @doc Derive a vtag from the node name and the supplied timestamp: the
%% md5 of the pair, rendered as a base-62 string.
%% crypto:md5/1 was deprecated in OTP 20 and removed in OTP 24; the
%% equivalent crypto:hash(md5, ...) works on all supported releases.
make_vtag(Now) ->
    <<HashAsNum:128/integer>> = crypto:hash(md5, term_to_binary({node(), Now})),
    riak_core_util:integer_to_list(HashAsNum, 62).

%% @doc Fetch BKey straight from the local vnode at Index, bypassing the
%% riak_kv get FSM. Returns the vnode's get result, {error, Reply} for
%% any other '$gen_event' message, or {error, timeout} after 30s — the
%% original receive had no 'after' clause and could block forever,
%% unlike local_put/3 which already used a 30s timeout.
local_get(Index, BKey) ->
    %% erlang:now/0 is deprecated; hash self + a fresh ref for a unique
    %% request id instead.
    ReqId = erlang:phash2({self(), make_ref()}),
    riak_kv_vnode:get({Index, node()}, BKey, ReqId),
    receive
        {'$gen_event', {r, Result, Index, ReqId}} ->
            Result;
        {'$gen_event', Reply} ->
            {error, Reply}
    after
        30000 ->
            {error, timeout}
    end.


%% @doc Put Obj into the local vnode at Index with no extra options.
local_put(Index, Obj) ->
    local_put(Index, Obj, []).

%% @doc Put Obj directly into the local vnode at Index, bypassing the
%% riak_kv put FSM. On success returns {{w,..}, {dw,..}}; a first reply
%% that is not a {w,_,_} ack (e.g. a failure tuple) is now returned as
%% {error, Reply} — the original inner case had only a {w,_,_} clause
%% and crashed with case_clause on anything else. Times out with
%% {error, timeout} (no first ack) or {error, timeout2} (no second ack)
%% after 30s each.
local_put(Index, Obj, Options) ->
    BKey = {riak_object:bucket(Obj), riak_object:key(Obj)},
    Ref = make_ref(),
    %% erlang:now/0 is deprecated; hash self + the ref for a unique id.
    ReqId = erlang:phash2({self(), Ref}),
    StartTime = riak_core_util:moment(),
    Sender = {raw, Ref, self()},
    riak_kv_vnode:put({Index, node()}, BKey, Obj, ReqId, StartTime, Options, Sender),
    receive
        {Ref, {w, _, _} = Reply} ->
            %% Got the w ack; wait for the matching dw (or failure) ack.
            receive
                {Ref, Reply2} ->
                    {Reply, Reply2}
            after
                30000 ->
                    {error, timeout2}
            end;
        {Ref, Reply} ->
            %% Non-w first reply: surface it instead of case_clause.
            {error, Reply}
    after
        30000 ->
            {error, timeout}
    end.

evanmcc added a commit that referenced this pull request Oct 17, 2013
@evanmcc evanmcc merged commit 9ced77a into 1.4 Oct 17, 2013
@evanmcc
Copy link
Contributor Author

evanmcc commented Oct 17, 2013

this should also go into develop at some point.

@seancribbs seancribbs deleted the pevm-handoff-back-compat branch April 1, 2015 23:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants