Skip to content

Commit d19765b

Browse files
committed
queue takeover via config
1 parent 718031b commit d19765b

File tree

4 files changed

+75
-61
lines changed

4 files changed

+75
-61
lines changed

apps/faxe/priv/faxe.schema

+27
Original file line numberDiff line numberDiff line change
@@ -508,14 +508,41 @@
508508
[{default, "q_"}, {datatype, string}]
509509
}.
510510

511+
%% rabbitmq default prefix for every queue
512+
%% the amqp_consume node will use this to ensure this prefix is there for every queue
513+
%% this is mostly to make rabbitmq ha and other policies work automatically
514+
{mapping, "rabbitmq.queue_type", "faxe.rabbitmq.queue_type",
515+
[{default, "classic"} , {datatype, string}]
516+
%, {datatype, {enum,["classic", "quorum", ""]} } ]
517+
}.
518+
511519
%% rabbitmq default prefix for every exchange
512520
%% the amqp_consume node will use this to ensure this prefix is there for every exchange
513521
%% this is mostly to make rabbitmq ha and other policies work automatically
514522
{mapping, "rabbitmq.exchange_prefix", "faxe.rabbitmq.exchange_prefix",
515523
[{default, "x_"}, {datatype, string}]
516524
}.
517525

526+
%% @doc ------------------------------------------------------------------------------
527+
%% Queue takeover
528+
%% -----------------------------------------------------------------------------------
529+
%% rabbitmq default prefix for every takeover-queue
530+
%% the amqp_consume node will use this to ensure this prefix is there for every queue
531+
%% such a prefix is often used to select policies by (the start of) the queue name
532+
{mapping, "rabbitmq.takeover_queue_prefix", "faxe.rabbitmq.takeover_queue_prefix",
533+
[{default, "q_"}, {datatype, string}]
534+
}.
535+
536+
%% type of queue for the takeover queues
537+
{mapping, "rabbitmq.takeover_queue_type", "faxe.rabbitmq.takeover_queue_type",
538+
[{default, "classic"}, {datatype, string}]
539+
%, {datatype, {enum,["classic", "quorum", ""]} } ]
540+
}.
518541

542+
%% enable queue takeover
543+
{mapping, "rabbitmq.takeover", "faxe.rabbitmq.takeover",
544+
[{default, off}, {datatype, flag}, {commented, off}]
545+
}.
519546
%% @doc -------------------------------------------------------------------------------
520547
%% CrateDB defaults (postgreSQL connect)
521548
%% -------------------------------------------------------------------------------

apps/faxe/src/components/esp_amqp_consume.erl

+34-17
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,13 @@ options() -> [
9393
{bindings, string_list, undefined},
9494
{qx_name, string, undefined}, %% not used currently
9595
{queue, any, undefined},
96-
{queue_type, string, <<>>},
97-
{takeover, boolean, false},
96+
{queue_type, string, {rabbitmq, queue_type}},
97+
{takeover, boolean, {rabbitmq, takeover}},
9898
{takeover_timeout, duration, <<"5m">>},
9999
{takeover_queue, string, undefined},
100-
{takeover_queue_prefix, string, {rabbitmq, queue_prefix}},
101-
{takeover_queue_type, string, <<>>},
100+
{takeover_queue_prefix, string, {rabbitmq, takeover_queue_prefix}},
101+
{takeover_queue_type, string, {rabbitmq, takeover_queue_type}},
102+
%% defaults to "vhost"
102103
{takeover_queue_vhost, string, undefined},
103104
{queue_prefix, string, {rabbitmq, queue_prefix}},
104105
{consumer_tag, string, undefined},
@@ -136,7 +137,7 @@ metrics() ->
136137
].
137138

138139
init({GraphId, NodeId} = Idx, _Ins,
139-
#{ host := Host0, port := Port, user := _User, pass := _Pass, vhost := _VHost, queue := Q0, queue_type := QType0,
140+
#{ host := Host0, port := Port, user := _User, pass := _Pass, vhost := VHost, queue := Q0, queue_type := QType0,
140141
exchange := Ex0, qx_name := _QxName, prefetch := Prefetch, routing_key := RoutingKey0, bindings := Bindings0,
141142
dt_field := DTField, dt_format := DTFormat, ssl := UseSSL, include_topic := IncludeTopic,
142143
topic_as := TopicKey, ack_every := AckEvery0, ack_after := AckTimeout0, as := As, consumer_tag := CTag0,
@@ -149,10 +150,10 @@ init({GraphId, NodeId} = Idx, _Ins,
149150
'_parent_pid' := ParentPid, '_parent_subscriptions' := ParentSubs, passive := Passive
150151
} = Opts0) ->
151152

152-
%% lager:warning("init ~p", [Idx]),
153-
153+
%% lager:warning("opts ~p", [Opts0]),
154154
Q = eval_name(Q0, Opts0, Idx),
155-
QType = binary_to_list(QType0),
155+
QName = faxe_util:prefix_binary(Q, QPrefix),
156+
QType = faxe_util:to_list(QType0),
156157
CTag = case CTag0 of undefined -> <<"c_", GraphId/binary, "_", NodeId/binary>>; _ -> CTag0 end,
157158
State0 = #state{
158159
include_topic = IncludeTopic, topic_key = TopicKey, as = As, dedup_queue = memory_queue:new(DedupSize),
@@ -169,17 +170,27 @@ init({GraphId, NodeId} = Idx, _Ins,
169170
true ->
170171
TakeoverQ1 = eval_name(TakeoverQ0, Opts0, Idx),
171172
TakeoverQ2 = faxe_util:prefix_binary(TakeoverQ1, TakeoverQPrefix),
173+
172174
TakeoverTime = faxe_time:duration_to_ms(TakeoverTimeout),
173175
NewTOpts = Opts0#{takeover_queue => TakeoverQ2, takeover_time => TakeoverTime},
174176
TakeoverOpts0 = init_takeover_consumer(self(), Idx, CTag, NewTOpts),
175-
%% start takeover consumer
176-
State01 = State0#state{takeover_consumer_opts = TakeoverOpts0},
177-
TakeoverPid = start_takeover_consumer(State01),
178-
TakeoverData = memory_queue:new(DedupSize),
179-
{TakeoverOpts0, State01#state{takeover_consumer_pid = TakeoverPid, takeover_data = TakeoverData}}
177+
CompQ = #{queue => QName, queue_type => QType, vhost => VHost},
178+
case check_unique_q(TakeoverOpts0, CompQ) of
179+
true ->
180+
%% start takeover consumer
181+
State01 = State0#state{takeover_consumer_opts = TakeoverOpts0},
182+
TakeoverPid = start_takeover_consumer(State01),
183+
TakeoverData = memory_queue:new(DedupSize),
184+
{TakeoverOpts0, State01#state{takeover_consumer_pid = TakeoverPid, takeover_data = TakeoverData}};
185+
false ->
186+
lager:warning(
187+
"cannot start takeover action, because both queues are the same ~p, will continue without takeover",
188+
[CompQ]),
189+
{undefined, State0}
190+
end
180191
end,
181192

182-
TakeoverQType = binary_to_list(TakeoverQType0),
193+
TakeoverQType = faxe_util:to_list(TakeoverQType0),
183194
TakeoverQ = case is_map(TakeoverOpts) andalso is_map_key(queue, TakeoverOpts) of
184195
true -> maps:get(queue, TakeoverOpts);
185196
false -> undefined
@@ -192,7 +203,7 @@ init({GraphId, NodeId} = Idx, _Ins,
192203

193204
Host = binary_to_list(Host0),
194205
%% lager:info("opts before: ~p",[Opts0]),
195-
QName = faxe_util:prefix_binary(Q, QPrefix),
206+
196207
Opts = Opts0#{
197208
host => Host, consumer_tag => CTag,
198209
exchange => faxe_util:prefix_binary(Ex, XPrefix),
@@ -201,7 +212,7 @@ init({GraphId, NodeId} = Idx, _Ins,
201212
routing_key => faxe_util:to_rkey(RoutingKey0),
202213
bindings => faxe_util:to_rkey(Bindings0)
203214
},
204-
%% lager:info("opts: ~p",[Opts]),
215+
lager:info("opts: ~p",[Opts]),
205216

206217
State = State1#state{
207218
opts = Opts, ack_after = AckTimeout, queue_type = QType,
@@ -217,6 +228,12 @@ init({GraphId, NodeId} = Idx, _Ins,
217228

218229
{ok, start_consumer(NewState)}.
219230

231+
check_unique_q(#{queue := Q, queue_type := QType, vhost := VHost}, #{queue := Q, queue_type := QType, vhost := VHost}) ->
232+
false;
233+
check_unique_q(_, _) ->
234+
true.
235+
236+
220237
init_takeover_consumer(ParentPid, IdxParent, CTag,
221238
Opts = #{takeover_queue := Q0, takeover_queue_type := QType0, '_name' := Name, takeover_queue_vhost := TVHost,
222239
vhost := VHost}) ->
@@ -228,7 +245,7 @@ init_takeover_consumer(ParentPid, IdxParent, CTag,
228245
use_flow_ack => false,
229246
%% for the takeover-consumer, the takeover options become the "normal" q opts
230247
queue => Q0,
231-
queue_type => QType0,
248+
queue_type => faxe_util:to_list(QType0),
232249
vhost => TakeoverVHost,
233250
%% cannot use queue prefix
234251
queue_prefix => <<>>,

apps/faxe/src/faxe.erl

+3-32
Original file line numberDiff line numberDiff line change
@@ -489,30 +489,6 @@ start_task(TaskId, Permanent) when Permanent == true orelse Permanent == false -
489489
start_task(TaskId, GraphRunMode, Permanent) ->
490490
start_task(TaskId, #task_modes{run_mode = GraphRunMode, permanent = Permanent}).
491491

492-
%%do_start_task(T = #task{name = Name, definition = GraphDef},
493-
%% #task_modes{concurrency = Concurrency, permanent = Perm} = Mode) ->
494-
%% case dataflow:create_graph(Name, GraphDef) of
495-
%% {ok, Graph} ->
496-
%% try dataflow:start_graph(Graph, Mode) of
497-
%% _ ->
498-
%% faxe_db:save_task(T#task{pid = Graph, last_start = faxe_time:now_date(), permanent = Perm}),
499-
%% Res =
500-
%% case Concurrency of
501-
%% 1 -> {ok, Graph};
502-
%% Num when Num > 1 ->
503-
%% start_concurrent(T, Mode),
504-
%% {ok, Graph}
505-
%% end,
506-
%%%% flow_changed({task, Name, start}),
507-
%% Res
508-
%% catch
509-
%% _:_ = E ->
510-
%% lager:error("graph_start_error: ~p",[E]),
511-
%% {error, {graph_start_error, E}}
512-
%% end;
513-
%% {error, {already_started, _Pid}} -> {error, already_started}
514-
%% end.
515-
516492
start_concurrent(Task = #task{}, #task_modes{concurrency = Con} = Mode) ->
517493
F = fun(Num) -> start_copy(Task, Mode, Num) end,
518494
lists:map(F, lists:seq(2, Con)).
@@ -599,16 +575,11 @@ get_group_leader([T=#task{group_leader = true} | _R]) ->
599575

600576
-spec stop_task(integer()|binary()|#task{}) -> ok.
601577
%% @doc just stop the graph process and its children
602-
stop_task(_T=#task{pid = Graph}) when is_pid(Graph) ->
603-
case is_process_alive(Graph) of
604-
true ->
605-
df_graph:stop(Graph);
606-
false ->
607-
{error, not_running}
608-
end;
609-
578+
stop_task(T=#task{pid = _Graph}) ->
579+
stop_task(T, false);
610580
stop_task(TaskId) ->
611581
stop_task(TaskId, false).
582+
612583
-spec stop_task(#task{}|integer()|binary(), true|false) -> ok.
613584
stop_task(T = #task{}, Permanent) ->
614585
do_stop_task(T, Permanent);

apps/faxe/src/faxe_dfs.erl

+11-12
Original file line numberDiff line numberDiff line change
@@ -218,15 +218,15 @@ eval_options([#{name := OptName, type := OptType, default := CKey}|Opts], Acc) w
218218
eval_options([{OptName, OptType, CKey}|Opts], Acc);
219219
eval_options([{OptName, OptType, CKey}|Opts], Acc) when is_tuple(CKey) ->
220220
%% lager:info("call conf_val with: ~p for: ~p with Acc: ~p", [CKey, {OptName, OptType, CKey}, Acc]),
221-
OptVal = conf_val(CKey),
221+
OptVal = conf_val(CKey, OptType),
222222
eval_options(Opts, Acc ++ [{OptName, OptType, OptVal}]);
223223
eval_options([Opt|Opts], Acc) ->
224224
eval_options(Opts, Acc ++ [Opt]).
225225

226-
conf_val({ConfigKey, ConfigSubKey}) ->
226+
conf_val({ConfigKey, ConfigSubKey}, OptionType) ->
227227
ConfigData = application:get_env(faxe, ConfigKey, []),
228-
conv_config_val(proplists:get_value(ConfigSubKey, ConfigData));
229-
conf_val({ConfigKey, ConfigSubKey, ConfigSubSubKey}) ->
228+
conv_config_val(proplists:get_value(ConfigSubKey, ConfigData), OptionType);
229+
conf_val({ConfigKey, ConfigSubKey, ConfigSubSubKey}, OptionType) ->
230230
Value =
231231
case application:get_env(faxe, ConfigKey, []) of
232232
[] -> [];
@@ -236,16 +236,15 @@ conf_val({ConfigKey, ConfigSubKey, ConfigSubSubKey}) ->
236236
ConfSubData when is_list(ConfSubData) -> proplists:get_value(ConfigSubSubKey, ConfSubData)
237237
end
238238
end,
239-
conv_config_val(Value).
239+
conv_config_val(Value, OptionType).
240240

241241
%% @doc we know config does not give us any binary values, but in faxe we only use
242-
%% binaries for strings, so we convert any string(list) val to binary
243-
%% at the moment this is dangerous, as we do not know exactly whether the list was meant
244-
%% to be a string
245-
conv_config_val([]) -> [];
246-
conv_config_val(Val) when is_list(Val) ->
247-
list_to_binary(Val);
248-
conv_config_val(Val) -> Val.
242+
%% binaries for strings, so we convert any binary/string option type to binary
243+
conv_config_val([], binary) -> <<>>;
244+
conv_config_val([], string) -> <<>>;
245+
conv_config_val(Val, binary) -> faxe_util:to_bin(Val);
246+
conv_config_val(Val, string) -> faxe_util:to_bin(Val);
247+
conv_config_val(Val, _Type) -> Val.
249248

250249
node_id({Name, Id}) when is_binary(Name) andalso is_integer(Id) ->
251250
<<Name/binary, (integer_to_binary(Id))/binary>>.

0 commit comments

Comments
 (0)