diff --git a/src/yokozuna_rt.erl b/src/yokozuna_rt.erl
index 7063ec0db..84e7149b4 100644
--- a/src/yokozuna_rt.erl
+++ b/src/yokozuna_rt.erl
@@ -23,8 +23,12 @@
 -include("yokozuna_rt.hrl").
 
 -export([check_exists/2,
+         clear_trees/1,
+         commit/2,
          expire_trees/1,
+         gen_keys/1,
          host_entries/1,
+         override_schema/5,
          remove_index_dirs/2,
          rolling_upgrade/2,
          rolling_upgrade/3,
@@ -33,6 +37,7 @@
          search_expect/5,
          search_expect/6,
          search_expect/7,
+         assert_search/6,
          verify_num_found_query/3,
          wait_for_aae/1,
          wait_for_full_exchange_round/2,
@@ -48,11 +53,20 @@
 -type json_string() :: atom | string() | binary().
 
 -define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))).
+-define(SOFTCOMMIT, 1000).
 
 -spec host_entries(rt:conn_info()) -> [{host(), portnum()}].
 host_entries(ClusterConnInfo) ->
     [riak_http(I) || {_,I} <- ClusterConnInfo].
 
+%% @doc Generate `SeqMax' keys. Yokozuna supports only UTF-8 compatible keys.
+-spec gen_keys(pos_integer()) -> list().
+gen_keys(SeqMax) ->
+    [<<N:64/integer>> || N <- lists:seq(1, SeqMax),
+                         not lists:any(
+                               fun(E) -> E > 127 end,
+                               binary_to_list(<<N:64/integer>>))].
+
 %% @doc Write `Keys' via the PB interface to a `Bucket' and have them
 %%      searchable in an `Index'.
 -spec write_data([node()], pid(), index_name(), bucket(), [binary()]) -> ok.
@@ -212,6 +226,15 @@ expire_trees(Cluster) ->
     timer:sleep(100),
     ok.
 
+%% @doc Clear YZ trees
+-spec clear_trees([node()]) -> ok.
+clear_trees(Cluster) ->
+    lager:info("Clear all trees"),
+    _ = [ok = rpc:call(Node, yz_entropy_mgr, clear_trees, [])
+         || Node <- Cluster],
+    ok.
+
+
 %% @doc Remove index directories, removing the index.
 -spec remove_index_dirs([node()], index_name()) -> ok.
 remove_index_dirs(Nodes, IndexName) ->
@@ -261,6 +284,20 @@ search_expect(solr, {Host, Port}, Index, Name0, Term0, Shards, Expect)
     {ok, "200", _, R} = ibrowse:send_req(URL, [], get, [], Opts),
     verify_count_http(Expect, R).
 
+assert_search(Pid, Cluster, Index, Search, SearchExpect, Params) ->
+    F = fun(_) ->
+                lager:info("Searching ~p and asserting it exists",
+                           [SearchExpect]),
+                {ok,{search_results,[{_Index,Fields}], _Score, Found}} =
+                    riakc_pb_socket:search(Pid, Index, Search, Params),
+                ?assert(lists:member(SearchExpect, Fields)),
+                case Found of
+                    1 -> true;
+                    0 -> false
+                end
+        end,
+    rt:wait_until(Cluster, F).
+
 search(HP, Index, Name, Term) ->
     search(yokozuna, HP, Index, Name, Term).
 
@@ -289,7 +326,7 @@ search(Type, {Host, Port}, Index, Name0, Term0) ->
 verify_count_http(Expected, Resp) ->
     Count = get_count_http(Resp),
     lager:info("Expected: ~p, Actual: ~p", [Expected, Count]),
-    Expected == Count.
+    ?assertEqual(Expected, Count).
 
 -spec get_count_http(json_string()) -> count().
 get_count_http(Resp) ->
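The gen_keys/1 comprehension keeps only those integers whose 64-bit big-endian encoding contains no byte above 127, i.e. keys that survive as valid UTF-8. A quick shell check of the filter (illustrative only, not part of the patch):

    %% N = 1..127 encode as seven zero bytes plus N, so all of them pass;
    %% N = 128..200 end in a byte greater than 127 and are dropped.
    1> length(yokozuna_rt:gen_keys(200)).
    127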
@@ -333,20 +370,24 @@ create_and_set_index(Cluster, Pid, Bucket, Index) ->
     ok = riakc_pb_socket:create_search_index(Pid, Index),
     %% For possible legacy upgrade reasons, wrap create index in a wait
     wait_for_index(Cluster, Index),
-    set_index(Pid, Bucket, Index).
+    set_index(Pid, hd(Cluster), Bucket, Index).
 
 -spec create_and_set_index([node()], pid(), bucket(), index_name(),
                            schema_name()) -> ok.
 create_and_set_index(Cluster, Pid, Bucket, Index, Schema) ->
     %% Create a search index and associate with a bucket
-    lager:info("Create a search index ~s with a custom schema named ~s and
-        associate it with bucket ~s", [Index, Schema, Bucket]),
+    lager:info("Create a search index ~s with a custom schema named ~s and " ++
+               "associate it with bucket ~p", [Index, Schema, Bucket]),
     ok = riakc_pb_socket:create_search_index(Pid, Index, Schema, []),
     %% For possible legacy upgrade reasons, wrap create index in a wait
     wait_for_index(Cluster, Index),
-    set_index(Pid, Bucket, Index).
-
--spec set_index(pid(), bucket(), index_name()) -> ok.
-set_index(Pid, Bucket, Index) ->
+    set_index(Pid, hd(Cluster), Bucket, Index).
+
+-spec set_index(pid(), node(), bucket(), index_name()) -> ok.
+set_index(_Pid, Node, {BucketType, _Bucket}, Index) ->
+    lager:info("Create and activate map-based bucket type ~s and tie it to search_index ~s",
+               [BucketType, Index]),
+    rt:create_and_activate_bucket_type(Node, BucketType, [{search_index, Index}]);
+set_index(Pid, _Node, Bucket, Index) ->
     ok = riakc_pb_socket:set_search_index(Pid, Bucket, Index).
 
 internal_solr_url(Host, Port, Index) ->
@@ -360,3 +401,21 @@ internal_solr_url(Host, Port, Index, Name, Term, Shards) ->
 quote_unicode(Value) ->
     mochiweb_util:quote_plus(binary_to_list(
                                unicode:characters_to_binary(Value))).
+
+-spec commit([node()], index_name()) -> ok.
+commit(Nodes, Index) ->
+    %% Wait for yokozuna index to trigger, then force a commit
+    timer:sleep(?SOFTCOMMIT),
+    lager:info("Commit search writes to ~s at softcommit (default) ~p",
+               [Index, ?SOFTCOMMIT]),
+    rpc:multicall(Nodes, yz_solr, commit, [Index]),
+    ok.
+
+-spec override_schema(pid(), [node()], index_name(), schema_name(), string()) ->
+                             {ok, [node()]}.
+override_schema(Pid, Cluster, Index, Schema, RawUpdate) ->
+    lager:info("Overwrite schema with updated schema"),
+    ok = riakc_pb_socket:create_search_schema(Pid, Schema, RawUpdate),
+    yokozuna_rt:wait_for_schema(Cluster, Schema, RawUpdate),
+    [Node|_] = Cluster,
+    {ok, _} = rpc:call(Node, yz_index, reload, [Index]).
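Together, gen_keys/1, commit/2 and assert_search/6 replace the ad-hoc key generation and bare timer:sleep/1 soft-commit waits scattered through the test modules below. A minimal sketch of the intended calling pattern (the index and bucket names here are illustrative, not from the patch):

    Keys = yokozuna_rt:gen_keys(100),
    Pid = rt:pbc(hd(Cluster)),
    yokozuna_rt:write_data(Cluster, Pid, <<"an_index">>, <<"a_bucket">>, Keys),
    %% sleep one soft-commit interval, then force a Solr commit on every node
    yokozuna_rt:commit(Cluster, <<"an_index">>),
    yokozuna_rt:assert_search(Pid, Cluster, <<"an_index">>,
                              <<"text:foo">>, {<<"text">>, <<"foo">>}, []).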
+%% snmpwalk gives the following output containing the OIDs we're interested in:
+%% SNMPv2-SMI::enterprises.31130.200.1.1.1.99.108.117.115.116.101.114.45.50 = STRING: "cluster-2"
+%% SNMPv2-SMI::enterprises.31130.200.1.1.1.99.108.117.115.116.101.114.45.51 = STRING: "cluster-3"
+-define(CLUSTER_OIDS,
+        [[1,3,6,1,4,1,31130,200,1,1,1,99,108,117,115,116,101,114,45] ++ [Tail]
+         || Tail <- [50, 51]]).
+
+verify_snmp_cluster_stats(Clusters) ->
+    [{_Name, Leader, [_Nodes]} | _Rest] = Clusters,
+    rpc:call(Leader, riak_core, wait_for_application, [snmp]),
+    rpc:call(Leader, riak_core, wait_for_application, [riak_snmp]),
+    ClusterJoins = rpc:call(Leader, snmpa, get, [snmp_master_agent, ?CLUSTER_OIDS]),
+    ?assertEqual(?OTHER_CLUSTERS, ClusterJoins).
+
diff --git a/tests/yz_core_properties_create_unload.erl b/tests/yz_core_properties_create_unload.erl
index 29a6180d3..70bf967b7 100644
--- a/tests/yz_core_properties_create_unload.erl
+++ b/tests/yz_core_properties_create_unload.erl
@@ -67,7 +67,7 @@ confirm() ->
     %% Write keys and wait for soft commit
     lager:info("Writing ~p keys", [KeyCount]),
     [ok = rt:pbc_write(Pid, ?BUCKET, Key, Key, "text/plain") || Key <- Keys],
-    timer:sleep(1100),
+    yokozuna_rt:commit(Cluster, ?INDEX),
 
     verify_count(Pid, KeyCount),
 
@@ -87,7 +87,7 @@ test_core_props_removal(Cluster, RandNodes, KeyCount, Pid) ->
     lager:info("Write one more piece of data"),
     ok = rt:pbc_write(Pid, ?BUCKET, <<"foo">>, <<"foo">>, "text/plain"),
-    timer:sleep(1100),
+    yokozuna_rt:commit(Cluster, ?INDEX),
 
     verify_count(Pid, KeyCount + 1).
 
@@ -102,7 +102,7 @@ test_remove_index_dirs(Cluster, RandNodes, KeyCount, Pid) ->
     lager:info("Write second piece of data"),
     ok = rt:pbc_write(Pid, ?BUCKET, <<"food">>, <<"foody">>, "text/plain"),
-    timer:sleep(1100),
+    yokozuna_rt:commit(Cluster, ?INDEX),
 
     verify_count(Pid, KeyCount + 2).
 
@@ -121,7 +121,7 @@ test_remove_segment_infos_and_rebuild(Cluster, RandNodes, KeyCount, Pid) ->
     lager:info("Write third piece of data"),
     ok = rt:pbc_write(Pid, ?BUCKET, <<"baz">>, <<"bar">>, "text/plain"),
-    timer:sleep(1100),
+    yokozuna_rt:commit(Cluster, ?INDEX),
 
     verify_count(Pid, KeyCount + 3).
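The CLUSTER_OIDS define in verify_snmp_repl.erl above builds each OID by appending the ASCII bytes of the cluster name to the riak_snmp enterprise prefix, which is exactly what the snmpwalk output shows. Since Erlang strings are lists of character codes, the correspondence is easy to check in a shell:

    1> "cluster-2" =:= [99,108,117,115,116,101,114,45,50].
    true
    2> "cluster-" ++ [50].
    "cluster-2"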
diff --git a/tests/yz_crdt.erl b/tests/yz_crdt.erl
index 815f9bd98..1a103815e 100644
--- a/tests/yz_crdt.erl
+++ b/tests/yz_crdt.erl
@@ -59,8 +59,7 @@ confirm() ->
         ?KEY,
         riakc_map:to_op(Map2)),
 
-    %% Wait for yokozuna index to trigger.
-    timer:sleep(1000),
+    yokozuna_rt:commit(Nodes, ?INDEX),
 
     %% Perform simple queries, check for register, set fields.
     {ok, {search_results, Results1a, _, _}} = riakc_pb_socket:search(
diff --git a/tests/yz_default_bucket_type_upgrade.erl b/tests/yz_default_bucket_type_upgrade.erl
index 20d197779..f3d94e849 100644
--- a/tests/yz_default_bucket_type_upgrade.erl
+++ b/tests/yz_default_bucket_type_upgrade.erl
@@ -59,19 +59,14 @@ confirm() ->
     [rt:assert_capability(ANode, ?YZ_CAP, {unknown_capability, ?YZ_CAP}) || ANode <- Cluster],
 
-    %% Generate keys, YZ only supports UTF-8 compatible keys
-    GenKeys = [<<N:64/integer>> || N <- lists:seq(1, ?SEQMAX),
-                                   not lists:any(
-                                         fun(E) -> E > 127 end,
-                                         binary_to_list(<<N:64/integer>>))],
+    GenKeys = yokozuna_rt:gen_keys(?SEQMAX),
     KeyCount = length(GenKeys),
 
     lager:info("KeyCount ~p", [KeyCount]),
 
     OldPid = rt:pbc(Node),
 
     yokozuna_rt:write_data(Cluster, OldPid, ?INDEX, ?BUCKET, GenKeys),
-
-    %% wait for solr soft commit
-    timer:sleep(1100),
+    yokozuna_rt:commit(Cluster, ?INDEX),
 
     yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount),
 
@@ -86,7 +81,7 @@ confirm() ->
     lager:info("Write one more piece of data"),
     Pid = rt:pbc(Node),
     ok = rt:pbc_write(Pid, ?BUCKET, <<"foo">>, <<"foo">>, "text/plain"),
-    timer:sleep(1100),
+    yokozuna_rt:commit(Cluster, ?INDEX),
 
     yokozuna_rt:expire_trees(Cluster),
     yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 1),
diff --git a/tests/yz_ensemble.erl b/tests/yz_ensemble.erl
index 5b0361e58..f4a604592 100644
--- a/tests/yz_ensemble.erl
+++ b/tests/yz_ensemble.erl
@@ -34,26 +34,26 @@ confirm() ->
     create_index(Node, Index),
     set_bucket_props(Node, Bucket, Index),
 
-    verify_ensemble_delete_support(Node, Bucket, Index),
+    verify_ensemble_delete_support(Nodes, Bucket, Index),
 
     pass.
 
 
 %% @private
 %% @doc Populates then deletes from SC bucket
-verify_ensemble_delete_support(Node, Bucket, Index) ->
+verify_ensemble_delete_support(Cluster, Bucket, Index) ->
     %% Yz only supports UTF-8 compatible keys
     Keys = [<<N:64/integer>> || N <- lists:seq(1,2000),
        not lists:any(fun(E) -> E > 127 end,binary_to_list(<<N:64/integer>>))],
 
-    PBC = rt:pbc(Node),
+    PBC = rt:pbc(hd(Cluster)),
 
     lager:info("Writing ~p keys", [length(Keys)]),
     [ok = rt:pbc_write(PBC, Bucket, Key, Key, "text/plain") || Key <- Keys],
+    yokozuna_rt:commit(Cluster, Index),
 
-    %% soft commit wait, then check that last key is indexed
+    %% check that last key is indexed
     lager:info("Search for keys to verify they exist"),
-    timer:sleep(1000),
     LKey = lists:last(Keys),
     rt:wait_until(fun() ->
         {M, _} = riakc_pb_socket:search(PBC, Index, query_value(LKey)),
@@ -64,7 +64,7 @@ verify_ensemble_delete_support(Node, Bucket, Index) ->
     lager:info("Deleting keys"),
     [riakc_pb_socket:delete(PBC, Bucket, Key) || Key <- Keys],
-    timer:sleep(1000),
+    yokozuna_rt:commit(Cluster, Index),
     rt:wait_until(fun() ->
         case riakc_pb_socket:search(PBC, Index, query_value(LKey)) of
             {ok,{search_results,Res,_,_}} ->
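yz_extractors.erl below exercises the extractor map through a rolling upgrade. For orientation: the map is a proplist of {content_type, extractor_module} pairs (?DEFAULT_MAP before the custom registration, ?EXTRACTMAPEXPECT after), and the module's get_map/1 helper is a thin wrapper over an rpc call:

    %% fetch the extractor map as seen by one node
    rpc:call(Node, yz_extractor, get_map, []).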
diff --git a/tests/yz_extractors.erl b/tests/yz_extractors.erl
index 252701afc..33d6ee8e9 100644
--- a/tests/yz_extractors.erl
+++ b/tests/yz_extractors.erl
@@ -27,10 +27,97 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("riakc/include/riakc.hrl").
 
+-define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))).
+-define(TYPE1, <<"extractors_in_paradise">>).
+-define(TYPE2, <<"extractors_in_paradiso">>).
 -define(INDEX1, <<"test_idx1">>).
--define(BUCKET1, <<"test_bkt1">>).
+-define(BUCKET1, {?TYPE1, <<"test_bkt1">>}).
 -define(INDEX2, <<"test_idx2">>).
--define(BUCKET2, <<"test_bkt2">>).
+-define(BUCKET2, {?TYPE2, <<"test_bkt2">>}).
+-define(SCHEMANAME, <<"test">>).
+%% Minimal custom schema: Yokozuna's required _yz_* fields plus the
+%% fields these tests search on (`text' for plain-text writes and
+%% `host' for the httpheader extractor output).
+-define(TEST_SCHEMA,
+<<"<schema name=\"test\" version=\"1.5\">
+<fields>
+   <field name=\"text\" type=\"string\" indexed=\"true\" stored=\"true\"/>
+   <field name=\"host\" type=\"string\" indexed=\"true\" stored=\"true\"/>
+   <field name=\"_yz_id\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\" required=\"true\"/>
+   <field name=\"_yz_ed\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\" stored=\"false\"/>
+   <field name=\"_yz_pn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\" stored=\"false\"/>
+   <field name=\"_yz_fpn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\" stored=\"false\"/>
+   <field name=\"_yz_vtag\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\" stored=\"false\"/>
+   <field name=\"_yz_rk\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
+   <field name=\"_yz_rt\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
+   <field name=\"_yz_rb\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
+   <field name=\"_yz_err\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\" stored=\"false\"/>
+</fields>
+<uniqueKey>_yz_id</uniqueKey>
+<types>
+   <fieldType name=\"string\" class=\"solr.StrField\" sortMissingLast=\"true\"/>
+   <fieldType name=\"_yz_str\" class=\"solr.StrField\" sortMissingLast=\"true\"/>
+</types>
+</schema>">>).
+%% Same as ?TEST_SCHEMA with an additional indexed `method' field.
+-define(TEST_SCHEMA_UPGRADE,
+<<"<schema name=\"test\" version=\"1.5\">
+<fields>
+   <field name=\"text\" type=\"string\" indexed=\"true\" stored=\"true\"/>
+   <field name=\"host\" type=\"string\" indexed=\"true\" stored=\"true\"/>
+   <field name=\"method\" type=\"string\" indexed=\"true\" stored=\"true\"/>
+   <field name=\"_yz_id\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\" required=\"true\"/>
+   <field name=\"_yz_ed\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\" stored=\"false\"/>
+   <field name=\"_yz_pn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\" stored=\"false\"/>
+   <field name=\"_yz_fpn\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\" stored=\"false\"/>
+   <field name=\"_yz_vtag\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\" stored=\"false\"/>
+   <field name=\"_yz_rk\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
+   <field name=\"_yz_rt\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
+   <field name=\"_yz_rb\" type=\"_yz_str\" indexed=\"true\" stored=\"true\" multiValued=\"false\"/>
+   <field name=\"_yz_err\" type=\"_yz_str\" indexed=\"true\" multiValued=\"false\" stored=\"false\"/>
+</fields>
+<uniqueKey>_yz_id</uniqueKey>
+<types>
+   <fieldType name=\"string\" class=\"solr.StrField\" sortMissingLast=\"true\"/>
+   <fieldType name=\"_yz_str\" class=\"solr.StrField\" sortMissingLast=\"true\"/>
+</types>
+</schema>">>).
 -define(YZ_CAP, {yokozuna, extractor_map_in_cmd}).
 -define(GET_MAP_RING_MFA, {yz_extractor, get_map, 1}).
 -define(GET_MAP_MFA, {yz_extractor, get_map, 0}).
@@ -38,6 +125,8 @@
 -define(YZ_META_EXTRACTORS, {yokozuna, extractors}).
 -define(YZ_EXTRACTOR_MAP, yokozuna_extractor_map).
 -define(NEW_EXTRACTOR, {"application/httpheader", yz_noop_extractor}).
+-define(EXTRACTOR_CT, element(1, ?NEW_EXTRACTOR)).
+-define(EXTRACTOR_MOD, element(2, ?NEW_EXTRACTOR)).
 -define(DEFAULT_MAP, [{default, yz_noop_extractor},
                       {"application/json",yz_json_extractor},
                       {"application/riak_counter", yz_dt_extractor},
@@ -51,6 +140,13 @@
 -define(SEQMAX, 20).
 -define(CFG,
         [
+         {riak_kv,
+          [
+           %% allow AAE to build trees and exchange rapidly
+           {anti_entropy_build_limit, {100, 1000}},
+           {anti_entropy_concurrency, 8},
+           {anti_entropy_tick, 1000}
+          ]},
          {yokozuna,
           [
            {enabled, true}
@@ -69,22 +165,17 @@ confirm() ->
 
     OldPid = rt:pbc(Node),
 
-    %% Generate keys, YZ only supports UTF-8 compatible keys
-    GenKeys = [<<N:64/integer>> || N <- lists:seq(1, ?SEQMAX),
-                                   not lists:any(
-                                         fun(E) -> E > 127 end,
-                                         binary_to_list(<<N:64/integer>>))],
+    GenKeys = yokozuna_rt:gen_keys(?SEQMAX),
     KeyCount = length(GenKeys),
 
     rt:count_calls(Cluster, [?GET_MAP_RING_MFA, ?GET_MAP_MFA]),
 
-    yokozuna_rt:write_data(Cluster, OldPid, ?INDEX1, ?BUCKET1, GenKeys),
+    yokozuna_rt:write_data(Cluster, OldPid, ?INDEX1,
+                           {?SCHEMANAME, ?TEST_SCHEMA}, ?BUCKET1, GenKeys),
+    yokozuna_rt:commit(Cluster, ?INDEX1),
 
     ok = rt:stop_tracing(),
 
-    %% wait for solr soft commit
-    timer:sleep(1100),
-
     {ok, BProps} = riakc_pb_socket:get_bucket(OldPid, ?BUCKET1),
     N = proplists:get_value(n_val, BProps),
 
@@ -108,9 +199,8 @@ confirm() ->
 
     ?assertEqual(?DEFAULT_MAP, get_map(Node)),
 
     %% Custom Register
-    ExtractMap = register_extractor(Node, element(1, ?NEW_EXTRACTOR),
-                                    element(2, ?NEW_EXTRACTOR)),
+    ExtractMap = register_extractor(Node, ?EXTRACTOR_CT, ?EXTRACTOR_MOD),
 
     ?assertEqual(?EXTRACTMAPEXPECT, ExtractMap),
 
@@ -118,7 +208,8 @@ confirm() ->
     yokozuna_rt:rolling_upgrade(Cluster, current),
 
     [rt:assert_capability(ANode, ?YZ_CAP, true) || ANode <- Cluster],
-    [rt:assert_supported(rt:capability(ANode, all), ?YZ_CAP, [true, false]) || ANode <- Cluster],
+    [rt:assert_supported(rt:capability(ANode, all), ?YZ_CAP, [true, false]) ||
+        ANode <- Cluster],
 
     %% test query count again
     yokozuna_rt:verify_num_found_query(Cluster, ?INDEX1, KeyCount),
@@ -128,13 +219,13 @@ confirm() ->
     rt:count_calls(Cluster, [?GET_MAP_RING_MFA, ?GET_MAP_MFA,
                              ?GET_MAP_READTHROUGH_MFA]),
 
-    yokozuna_rt:write_data(Cluster, Pid, ?INDEX2, ?BUCKET2, GenKeys),
-    riakc_pb_socket:stop(Pid),
+    yokozuna_rt:write_data(Cluster, Pid, ?INDEX2, {?SCHEMANAME, ?TEST_SCHEMA},
+                           ?BUCKET2, GenKeys),
+    yokozuna_rt:commit(Cluster, ?INDEX2),
 
     ok = rt:stop_tracing(),
 
-    %% wait for solr soft commit
-    timer:sleep(1100),
+    riakc_pb_socket:stop(Pid),
 
     CurrGetMapRingCC = rt:get_call_count(Cluster, ?GET_MAP_RING_MFA),
     CurrGetMapCC = rt:get_call_count(Cluster, ?GET_MAP_MFA),
@@ -148,7 +239,7 @@ confirm() ->
     ?assert(CurrGetMapCC =< PrevGetMapCC),
     lager:info("Number of calls to get_map_read_through/0: ~p~n, Number of calls to get_map/0: ~p~n",
                [CurrGetMapRTCC, CurrGetMapCC]),
-    ?assert(CurrGetMapRTCC < CurrGetMapCC),
+    ?assert(CurrGetMapRTCC =< CurrGetMapCC),
 
     {_RingVal2, MDVal2} = get_ring_and_cmd_vals(Node, ?YZ_META_EXTRACTORS,
                                                 ?YZ_EXTRACTOR_MAP),
 
     ?assertEqual(?EXTRACTMAPEXPECT, MDVal2),
 
     ?assertEqual(?EXTRACTMAPEXPECT, get_map(Node)),
 
-    rt_intercept:add(Node, {yz_noop_extractor,
-                            [{{extract, 1}, extract_httpheader}]}),
-    rt_intercept:wait_until_loaded(Node),
+    Packet = <<"GET http://www.google.com HTTP/1.1\n">>,
+    test_extractor_works(Cluster, Packet),
+    test_extractor_with_aae_expire(Cluster, ?INDEX2, ?BUCKET2, Packet),
 
-    ExpectedExtraction = [{method,'GET'},
-                          {host,<<"www.google.com">>},
-                          {uri,<<"/">>}],
-    ?assertEqual(ExpectedExtraction,
-                 verify_extractor(Node,
-                                  <<"GET http://www.google.com HTTP/1.1\n">>,
-                                  element(2, ?NEW_EXTRACTOR))),
+    rt:clean_cluster(Cluster),
 
     pass.
 
@@ -194,3 +279,70 @@ get_map(Node) ->
 
 verify_extractor(Node, PacketData, Mod) ->
     rpc:call(Node, yz_extractor, run, [PacketData, Mod]).
+
+bucket_url({Host,Port}, {BType, BName}, Key) ->
+    ?FMT("http://~s:~B/types/~s/buckets/~s/keys/~s",
+         [Host, Port, BType, BName, Key]).
+
+test_extractor_works(Cluster, Packet) ->
+    [rt_intercept:add(ANode, {yz_noop_extractor,
+                              [{{extract, 1}, extract_httpheader}]}) ||
+        ANode <- Cluster],
+    [rt_intercept:wait_until_loaded(ANode) || ANode <- Cluster],
+
+    ExpectedExtraction = [{method, 'GET'},
+                          {host, <<"www.google.com">>},
+                          {uri, <<"/">>}],
+    ?assertEqual(ExpectedExtraction,
+                 verify_extractor(rt:select_random(Cluster), Packet,
+                                  ?EXTRACTOR_MOD)).
+
+test_extractor_with_aae_expire(Cluster, Index, Bucket, Packet) ->
+    %% Now make sure we register extractor across all nodes
+    [register_extractor(ANode, ?EXTRACTOR_CT, ?EXTRACTOR_MOD) ||
+        ANode <- Cluster],
+
+    Key = <<"google">>,
+
+    {Host, Port} = rt:select_random(yokozuna_rt:host_entries(
+                                      rt:connection_info(Cluster))),
+    URL = bucket_url({Host, Port}, Bucket,
+                     mochiweb_util:quote_plus(Key)),
+
+    CT = ?EXTRACTOR_CT,
+    {ok, "204", _, _} = ibrowse:send_req(
+                          URL, [{"Content-Type", CT}], put, Packet),
+
+    yokozuna_rt:commit(Cluster, Index),
+
+    yokozuna_rt:search_expect({Host, Port}, Index, <<"host">>,
+                              <<"www*">>, 1),
+
+    yokozuna_rt:expire_trees(Cluster),
+    yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()),
+
+    yokozuna_rt:search_expect({Host, Port}, Index, <<"host">>,
+                              <<"www*">>, 1),
+
+    APid = rt:pbc(rt:select_random(Cluster)),
+    yokozuna_rt:override_schema(APid, Cluster, Index, ?SCHEMANAME,
+                                ?TEST_SCHEMA_UPGRADE),
+
+    {ok, "200", RHeaders, _} = ibrowse:send_req(URL, [{"Content-Type", CT}], get,
+                                                [], []),
+    VC = proplists:get_value("X-Riak-Vclock", RHeaders),
+
+    {ok, "204", _, _} = ibrowse:send_req(
+                          URL, [{"Content-Type", CT}, {"X-Riak-Vclock", VC}],
+                          put, Packet),
+    yokozuna_rt:commit(Cluster, Index),
+
+    yokozuna_rt:search_expect({Host, Port}, Index, <<"method">>,
+                              <<"GET">>, 1),
+
+    yokozuna_rt:expire_trees(Cluster),
+    yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()),
+
+    yokozuna_rt:search_expect({Host, Port}, Index, <<"method">>,
+                              <<"GET">>, 1),
+    riakc_pb_socket:stop(APid).
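In test_extractor_with_aae_expire above, each query is asserted twice: once right after the commit, and again after expiring the hash trees and waiting out a full exchange round. The second pass is what actually proves the claim: if AAE re-indexed the object with the wrong extractor or schema, the repaired index would no longer match. Condensed, the pattern is (names as used above):

    yokozuna_rt:search_expect({Host, Port}, Index, <<"host">>, <<"www*">>, 1),
    yokozuna_rt:expire_trees(Cluster),
    yokozuna_rt:wait_for_full_exchange_round(Cluster, erlang:now()),
    %% the same assertion must still hold after the AAE repair pass
    yokozuna_rt:search_expect({Host, Port}, Index, <<"host">>, <<"www*">>, 1).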
diff --git a/tests/yz_handoff.erl b/tests/yz_handoff.erl
index 0c4ac95cd..4c5b1af6d 100644
--- a/tests/yz_handoff.erl
+++ b/tests/yz_handoff.erl
@@ -78,7 +78,7 @@ confirm() ->
     Pid = rt:pbc(Node2),
 
     yokozuna_rt:write_data(Nodes, Pid, ?INDEX, ?BUCKET, Keys),
-    timer:sleep(1100),
+    yokozuna_rt:commit(Nodes, ?INDEX),
 
     %% Separate out shards for multiple runs
     [Shard1|Shards2Rest] = Shards,
@@ -99,8 +99,7 @@ confirm() ->
                      join_node = Node1,
                      admin_node = Node2}],
 
-    %% Run Shell Script to count/test # of replicas and leave/join
-    %% nodes from the cluster
+    %% Run set of leave/join trials and count/test #'s from the cluster
     [[begin
           check_data(Nodes, KeyCount, BucketURL, SearchURL, State),
          check_counts(Pid, KeyCount, BucketURL)
<<"hello_i:36">>, {<<"hello_i">>, <<"36">>}, []), + yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, + <<"hello_i:36">>, {<<"hello_i">>, <<"36">>}, []), lager:info("Write and check age at string per schema update"), @@ -230,10 +229,12 @@ confirm() -> "application/json"), {ok, _Obj3} = riakc_pb_socket:put(Pid, NewObj3, [return_head]), + yokozuna_rt:commit(Cluster, ?INDEX), yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 5), - assert_search(Pid, Cluster, <<"age:3jlkjkl">>, - {<<"age">>, <<"3jlkjkl">>}, []), + yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, + <<"age:3jlkjkl">>, {<<"age">>, <<"3jlkjkl">>}, + []), lager:info("Expire and re-check count to make sure we're correctly indexed by the new schema"), @@ -244,7 +245,7 @@ confirm() -> yokozuna_rt:verify_num_found_query(Cluster, ?INDEX, KeyCount + 5), HP = rt:select_random(yokozuna_rt:host_entries(rt:connection_info(Cluster))), - yokozuna_rt:search_expect(HP, ?INDEX, <<"age">>, <<"*">>, 2), + yokozuna_rt:search_expect(HP, ?INDEX, <<"age">>, <<"*">>, 3), lager:info("Re-Put because AAE won't find a diff even though the types have changed, as it only compares based on bkey currently. @@ -252,9 +253,10 @@ confirm() -> with allow_mult=false... no siblings"), {ok, _Obj4} = riakc_pb_socket:put(Pid, NewObj1A, [return_head]), - timer:sleep(1100), + yokozuna_rt:commit(Cluster, ?INDEX), - assert_search(Pid, Cluster, <<"age:26">>, {<<"age">>, <<"26">>}, []), + yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, + <<"age:26">>, {<<"age">>, <<"26">>}, []), lager:info("Re-Put Map data by dec/inc counter to account for *change* and allow previously unindexed counter to be searchable"), @@ -272,11 +274,15 @@ confirm() -> <<"keyMap1">>, riakc_map:to_op(Map5)), - timer:sleep(1100), - assert_search(Pid, Cluster, <<"0_foo_register:44ab">>, {<<"0_foo_register">>, - <<"44ab">>}, []), - assert_search(Pid, Cluster, <<"1_baz_counter:10">>, {<<"1_baz_counter">>, - <<"10">>}, []), + yokozuna_rt:commit(Cluster, ?INDEX), + yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, + <<"0_foo_register:44ab">>, + {<<"0_foo_register">>, <<"44ab">>}, + []), + yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, + <<"1_baz_counter:10">>, + {<<"1_baz_counter">>, <<"10">>}, + []), lager:info("Test nested json searches w/ unsearched fields ignored"), @@ -286,30 +292,13 @@ confirm() -> "application/json"), {ok, _Obj5} = riakc_pb_socket:put(Pid, NewObj5, [return_head]), - timer:sleep(1100), - assert_search(Pid, Cluster, <<"paths.quip:88">>, - {<<"paths.quip">>, <<"88">>}, []), + yokozuna_rt:commit(Cluster, ?INDEX), + yokozuna_rt:assert_search(Pid, Cluster, ?INDEX, + <<"paths.quip:88">>, + {<<"paths.quip">>, <<"88">>}, + []), riakc_pb_socket:stop(Pid), pass. -override_schema(Pid, Cluster, Index, Schema, RawUpdate) -> - ok = riakc_pb_socket:create_search_schema(Pid, Schema, RawUpdate), - yokozuna_rt:wait_for_schema(Cluster, Schema, RawUpdate), - [Node|_] = Cluster, - {ok, _} = rpc:call(Node, yz_index, reload, [Index]). - -assert_search(Pid, Cluster, Search, SearchExpect, Params) -> - F = fun(_) -> - lager:info("Searching ~p and asserting it exists", - [SearchExpect]), - {ok,{search_results,[{_Index,Fields}], _Score, Found}} = - riakc_pb_socket:search(Pid, ?INDEX, Search, Params), - ?assert(lists:member(SearchExpect, Fields)), - case Found of - 1 -> true; - 0 -> false - end - end, - rt:wait_until(Cluster, F).