75 changes: 67 additions & 8 deletions src/yokozuna_rt.erl
@@ -23,8 +23,12 @@
-include("yokozuna_rt.hrl").

-export([check_exists/2,
clear_trees/1,
commit/2,
expire_trees/1,
gen_keys/1,
host_entries/1,
override_schema/5,
remove_index_dirs/2,
rolling_upgrade/2,
rolling_upgrade/3,
@@ -33,6 +37,7 @@
search_expect/5,
search_expect/6,
search_expect/7,
assert_search/6,
verify_num_found_query/3,
wait_for_aae/1,
wait_for_full_exchange_round/2,
@@ -48,11 +53,20 @@
-type json_string() :: atom() | string() | binary().

-define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))).
-define(SOFTCOMMIT, 1000).

-spec host_entries(rt:conn_info()) -> [{host(), portnum()}].
host_entries(ClusterConnInfo) ->
[riak_http(I) || {_,I} <- ClusterConnInfo].

%% @doc Generate up to `SeqMax' keys; keys containing bytes above 127
%% are dropped, since Yokozuna supports only UTF-8 compatible keys.
-spec gen_keys(pos_integer()) -> list().
gen_keys(SeqMax) ->
[<<N:64/integer>> || N <- lists:seq(1, SeqMax),
not lists:any(
fun(E) -> E > 127 end,
binary_to_list(<<N:64/integer>>))].
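
%% Illustration only, not part of this change: because gen_keys/1 drops
%% any key containing a byte above 127, the result can be shorter than
%% `SeqMax'. A hypothetical shell session, assuming this module is loaded:
%%
%%   1> length(yokozuna_rt:gen_keys(200)).
%%   127   % N = 128..200 each contain a byte > 127 and are dropped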

%% @doc Write `Keys' via the PB interface to a `Bucket' and have them
%% searchable in an `Index'.
-spec write_data([node()], pid(), index_name(), bucket(), [binary()]) -> ok.
@@ -212,6 +226,15 @@ expire_trees(Cluster) ->
timer:sleep(100),
ok.

%% @doc Clear YZ trees
-spec clear_trees([node()]) -> ok.
clear_trees(Cluster) ->
lager:info("Clear all trees"),
_ = [ok = rpc:call(Node, yz_entropy_mgr, clear_trees, [])
|| Node <- Cluster],
ok.

%% @doc Remove index directories, removing the index.
-spec remove_index_dirs([node()], index_name()) -> ok.
remove_index_dirs(Nodes, IndexName) ->
@@ -261,6 +284,20 @@ search_expect(solr, {Host, Port}, Index, Name0, Term0, Shards, Expect)
{ok, "200", _, R} = ibrowse:send_req(URL, [], get, [], Opts),
verify_count_http(Expect, R).

assert_search(Pid, Cluster, Index, Search, SearchExpect, Params) ->
F = fun(_) ->
lager:info("Searching ~p and asserting it exists",
[SearchExpect]),
{ok,{search_results,[{_Index,Fields}], _Score, Found}} =
riakc_pb_socket:search(Pid, Index, Search, Params),
?assert(lists:member(SearchExpect, Fields)),
case Found of
1 -> true;
0 -> false
end
end,
rt:wait_until(Cluster, F).
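
%% Illustration only, not part of this change: a hypothetical call,
%% assuming an index holding one document with a `name_s' field:
%%
%%   yokozuna_rt:assert_search(Pid, Cluster, <<"an_index">>,
%%                             <<"name_s:yokozuna">>,
%%                             {<<"name_s">>, <<"yokozuna">>},
%%                             []).
%%
%% rt:wait_until/2 retries the fun until the query returns exactly one
%% result whose fields contain the expected pair.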

search(HP, Index, Name, Term) ->
search(yokozuna, HP, Index, Name, Term).

@@ -289,7 +326,7 @@ search(Type, {Host, Port}, Index, Name0, Term0) ->
verify_count_http(Expected, Resp) ->
Count = get_count_http(Resp),
lager:info("Expected: ~p, Actual: ~p", [Expected, Count]),
Expected == Count.
?assertEqual(Expected, Count).

-spec get_count_http(json_string()) -> count().
get_count_http(Resp) ->
@@ -333,20 +370,24 @@ create_and_set_index(Cluster, Pid, Bucket, Index) ->
ok = riakc_pb_socket:create_search_index(Pid, Index),
%% For possible legacy upgrade reasons, wrap create index in a wait
wait_for_index(Cluster, Index),
set_index(Pid, Bucket, Index).
set_index(Pid, hd(Cluster), Bucket, Index).
-spec create_and_set_index([node()], pid(), bucket(), index_name(),
schema_name()) -> ok.
create_and_set_index(Cluster, Pid, Bucket, Index, Schema) ->
%% Create a search index and associate with a bucket
lager:info("Create a search index ~s with a custom schema named ~s and
associate it with bucket ~s", [Index, Schema, Bucket]),
lager:info("Create a search index ~s with a custom schema named ~s and " ++
"associate it with bucket ~p", [Index, Schema, Bucket]),
ok = riakc_pb_socket:create_search_index(Pid, Index, Schema, []),
%% For possible legacy upgrade reasons, wrap create index in a wait
wait_for_index(Cluster, Index),
set_index(Pid, Bucket, Index).

-spec set_index(pid(), bucket(), index_name()) -> ok.
set_index(Pid, Bucket, Index) ->
set_index(Pid, hd(Cluster), Bucket, Index).

-spec set_index(pid(), node(), bucket(), index_name()) -> ok.
set_index(_Pid, Node, {BucketType, _Bucket}, Index) ->
lager:info("Create and activate map-based bucket type ~s and tie it to search_index ~s",
[BucketType, Index]),
rt:create_and_activate_bucket_type(Node, BucketType, [{search_index, Index}]);
set_index(Pid, _Node, Bucket, Index) ->
ok = riakc_pb_socket:set_search_index(Pid, Bucket, Index).

internal_solr_url(Host, Port, Index) ->
@@ -360,3 +401,21 @@ internal_solr_url(Host, Port, Index, Name, Term, Shards) ->
quote_unicode(Value) ->
mochiweb_util:quote_plus(binary_to_list(
unicode:characters_to_binary(Value))).

-spec commit([node()], index_name()) -> ok.
commit(Nodes, Index) ->
%% Wait for yokozuna index to trigger, then force a commit
timer:sleep(?SOFTCOMMIT),
lager:info("Commit search writes to ~s at softcommit (default) ~p",
[Index, ?SOFTCOMMIT]),
rpc:multicall(Nodes, yz_solr, commit, [Index]),
ok.
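
%% Illustration only, not part of this change: callers replace ad-hoc
%% waits such as timer:sleep(1100) with an explicit commit, e.g.
%%
%%   yokozuna_rt:write_data(Cluster, Pid, Index, Bucket, Keys),
%%   yokozuna_rt:commit(Cluster, Index),
%%
%% The 1000 ms pause assumes a one-second Solr soft-commit interval;
%% the yz_solr:commit RPC then makes the writes searchable before any
%% assertions run.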

-spec override_schema(pid(), [node()], index_name(), schema_name(), string()) ->
{ok, [node()]}.
override_schema(Pid, Cluster, Index, Schema, RawUpdate) ->
lager:info("Overwrite schema with updated schema"),
ok = riakc_pb_socket:create_search_schema(Pid, Schema, RawUpdate),
yokozuna_rt:wait_for_schema(Cluster, Schema, RawUpdate),
[Node|_] = Cluster,
{ok, _} = rpc:call(Node, yz_index, reload, [Index]).
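
%% Illustration only, not part of this change: a hypothetical caller
%% holding the full updated schema XML as `RawUpdate' would run
%%
%%   {ok, _} = yokozuna_rt:override_schema(Pid, Cluster, Index,
%%                                         Schema, RawUpdate),
%%
%% which uploads the schema, waits for it to propagate across the
%% cluster, and reloads the index on one node.
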
27 changes: 24 additions & 3 deletions tests/verify_snmp_repl.erl
@@ -24,11 +24,17 @@
-include_lib("eunit/include/eunit.hrl").
-compile({parse_transform, rt_intercept_pt}).

-define(FIRST_CLUSTER, "cluster-1").
-define(OTHER_CLUSTERS, ["cluster-2", "cluster-3"]).
-define(CLUSTERS, [?FIRST_CLUSTER] ++ ?OTHER_CLUSTERS).

confirm() ->
Clusters = make_clusters(["cluster-1", "cluster-2", "cluster-3"], 1),
Clusters = make_clusters(?CLUSTERS, 1),
[{_, Leader, _}|_] = Clusters,
intercept_riak_snmp_stat_poller(Leader),
wait_for_snmp_stat_poller().
wait_for_snmp_stat_poller(),
verify_snmp_cluster_stats(Clusters),
pass.

make_clusters(Names, NodeCount) ->
ClusterCount = length(Names),
@@ -102,7 +108,22 @@ wait_until_leader_converge({_Name, Nodes}) ->
enable_realtime([{_, Node, _}|OtherClusters]) ->
lists:foreach(
fun({Cluster, _, _}) ->
repl_util:enable_realtime(Node, Cluster)
repl_util:enable_realtime(Node, Cluster),
timer:sleep(1000)
end,
OtherClusters).

%% snmpwalk gives the following output containing the OIDs we're interested in:
%% SNMPv2-SMI::enterprises.31130.200.1.1.1.99.108.117.115.116.101.114.45.50 = STRING: "cluster-2"
%% SNMPv2-SMI::enterprises.31130.200.1.1.1.99.108.117.115.116.101.114.45.51 = STRING: "cluster-3"
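%% The OID tail is just the cluster name as ASCII bytes: the prefix
%% [99,108,117,115,116,101,114,45] spells "cluster-" and the final
%% component is the digit character, e.g. $2 = 50. Illustration only,
%% not part of this change:
%%
%%   1> "cluster-" =:= [99,108,117,115,116,101,114,45].
%%   true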
-define(CLUSTER_OIDS,
[[1,3,6,1,4,1,31130,200,1,1,1,99,108,117,115,116,101,114,45] ++ [Tail]
|| Tail <- [50, 51]]).

verify_snmp_cluster_stats(Clusters) ->
[{_Name, Leader, [_Nodes]} | _Rest] = Clusters,
rpc:call(Leader, riak_core, wait_for_application, [snmp]),
rpc:call(Leader, riak_core, wait_for_application, [riak_snmp]),
ClusterJoins = rpc:call(Leader, snmpa, get, [snmp_master_agent, ?CLUSTER_OIDS]),
?assertEqual(?OTHER_CLUSTERS, ClusterJoins).

8 changes: 4 additions & 4 deletions tests/yz_core_properties_create_unload.erl
@@ -67,7 +67,7 @@ confirm() ->
%% Write keys and wait for soft commit
lager:info("Writing ~p keys", [KeyCount]),
[ok = rt:pbc_write(Pid, ?BUCKET, Key, Key, "text/plain") || Key <- Keys],
timer:sleep(1100),
yokozuna_rt:commit(Cluster, ?INDEX),

verify_count(Pid, KeyCount),

@@ -87,7 +87,7 @@ test_core_props_removal(Cluster, RandNodes, KeyCount, Pid) ->

lager:info("Write one more piece of data"),
ok = rt:pbc_write(Pid, ?BUCKET, <<"foo">>, <<"foo">>, "text/plain"),
timer:sleep(1100),
yokozuna_rt:commit(Cluster, ?INDEX),

verify_count(Pid, KeyCount + 1).

@@ -102,7 +102,7 @@ test_remove_index_dirs(Cluster, RandNodes, KeyCount, Pid) ->

lager:info("Write second piece of data"),
ok = rt:pbc_write(Pid, ?BUCKET, <<"food">>, <<"foody">>, "text/plain"),
timer:sleep(1100),
yokozuna_rt:commit(Cluster, ?INDEX),

verify_count(Pid, KeyCount + 2).

@@ -121,7 +121,7 @@ test_remove_segment_infos_and_rebuild(Cluster, RandNodes, KeyCount, Pid) ->

lager:info("Write third piece of data"),
ok = rt:pbc_write(Pid, ?BUCKET, <<"baz">>, <<"bar">>, "text/plain"),
timer:sleep(1100),
yokozuna_rt:commit(Cluster, ?INDEX),

verify_count(Pid, KeyCount + 3).

3 changes: 1 addition & 2 deletions tests/yz_crdt.erl
@@ -59,8 +59,7 @@ confirm() ->
?KEY,
riakc_map:to_op(Map2)),

%% Wait for yokozuna index to trigger.
timer:sleep(1000),
yokozuna_rt:commit(Nodes, ?INDEX),

%% Perform simple queries, check for register, set fields.
{ok, {search_results, Results1a, _, _}} = riakc_pb_socket:search(
11 changes: 3 additions & 8 deletions tests/yz_default_bucket_type_upgrade.erl
@@ -59,19 +59,14 @@

[rt:assert_capability(ANode, ?YZ_CAP, {unknown_capability, ?YZ_CAP}) || ANode <- Cluster],

%% Generate keys, YZ only supports UTF-8 compatible keys
GenKeys = [<<N:64/integer>> || N <- lists:seq(1, ?SEQMAX),
not lists:any(
fun(E) -> E > 127 end,
binary_to_list(<<N:64/integer>>))],
GenKeys = yokozuna_rt:gen_keys(?SEQMAX),
KeyCount = length(GenKeys),
lager:info("KeyCount ~p", [KeyCount]),

OldPid = rt:pbc(Node),

yokozuna_rt:write_data(Cluster, OldPid, ?INDEX, ?BUCKET, GenKeys),
%% wait for solr soft commit
timer:sleep(1100),
yokozuna_rt:commit(Cluster, ?INDEX),

yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount),

@@ -86,7 +81,7 @@ confirm() ->
lager:info("Write one more piece of data"),
Pid = rt:pbc(Node),
ok = rt:pbc_write(Pid, ?BUCKET, <<"foo">>, <<"foo">>, "text/plain"),
timer:sleep(1100),
yokozuna_rt:commit(Cluster, ?INDEX),

yokozuna_rt:expire_trees(Cluster),
yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 1),
10 changes: 5 additions & 5 deletions tests/yz_ensemble.erl
@@ -34,26 +34,26 @@ confirm() ->
create_index(Node, Index),
set_bucket_props(Node, Bucket, Index),

verify_ensemble_delete_support(Node, Bucket, Index),
verify_ensemble_delete_support(Nodes, Bucket, Index),

pass.


%% @private
%% @doc Populates then deletes from SC bucket
verify_ensemble_delete_support(Node, Bucket, Index) ->
verify_ensemble_delete_support(Cluster, Bucket, Index) ->
%% Yz only supports UTF-8 compatible keys
Keys = [<<N:64/integer>> || N <- lists:seq(1,2000),
not lists:any(fun(E) -> E > 127 end,binary_to_list(<<N:64/integer>>))],

PBC = rt:pbc(Node),
PBC = rt:pbc(hd(Cluster)),

lager:info("Writing ~p keys", [length(Keys)]),
[ok = rt:pbc_write(PBC, Bucket, Key, Key, "text/plain") || Key <- Keys],
yokozuna_rt:commit(Cluster, Index),

%% after the commit, check that the last key is indexed
lager:info("Search for keys to verify they exist"),
timer:sleep(1000),
LKey = lists:last(Keys),
rt:wait_until(fun() ->
{M, _} = riakc_pb_socket:search(PBC, Index, query_value(LKey)),
@@ -64,7 +64,7 @@ verify_ensemble_delete_support(Node, Bucket, Index) ->

lager:info("Deleting keys"),
[riakc_pb_socket:delete(PBC, Bucket, Key) || Key <- Keys],
timer:sleep(1000),
yokozuna_rt:commit(Cluster, Index),
rt:wait_until(fun() ->
case riakc_pb_socket:search(PBC, Index, query_value(LKey)) of
{ok,{search_results,Res,_,_}} ->