Skip to content

Commit e747281

Browse files
committed
vhost_prefix
1 parent 7781ba9 commit e747281

File tree

3 files changed

+29
-11
lines changed

3 files changed

+29
-11
lines changed

apps/faxe/priv/faxe.schema

+6
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,12 @@
523523
[{default, "x_"}, {datatype, string}]
524524
}.
525525

526+
%% rabbitmq default prefix for every vhost
527+
%% only applies, if vhost is NOT the default: '/'
528+
{mapping, "rabbitmq.vhost_prefix", "faxe.rabbitmq.vhost_prefix",
529+
[{default, "dh_"}, {datatype, string}]
530+
}.
531+
526532
%% @doc ------------------------------------------------------------------------------
527533
%% Queue takeover
528534
%% -----------------------------------------------------------------------------------

apps/faxe/src/components/esp_amqp_consume.erl

+15-7
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
check_options/0,
2727
handle_ack/3]).
2828

29-
29+
-define(VHOST_DEFAULT, <<"/">>).
3030
-define(QUEUE_TYPES, [<<"classic">>, <<"quorum">>, <<>>]).
3131

3232
%% state for direct publish mode
@@ -89,6 +89,8 @@ options() -> [
8989
{pass, string, {amqp, pass}},
9090
{ssl, is_set, false},
9191
{vhost, string, <<"/">>},
92+
%% only applies, if vhost is NOT the default: '/', applies to both vhost params
93+
{vhost_prefix, string, {rabbitmq, vhost_prefix}},
9294
{routing_key, string, undefined},
9395
{bindings, string_list, undefined},
9496
{qx_name, string, undefined}, %% not used currently
@@ -137,9 +139,9 @@ metrics() ->
137139
].
138140

139141
init({GraphId, NodeId} = Idx, _Ins,
140-
#{ host := Host0, port := Port, user := _User, pass := _Pass, vhost := VHost, queue := Q0, queue_type := QType0,
142+
#{ host := Host0, port := Port, user := _User, pass := _Pass, vhost := VHost0, queue := Q0, queue_type := QType0,
141143
exchange := Ex0, qx_name := _QxName, prefetch := Prefetch, routing_key := RoutingKey0, bindings := Bindings0,
142-
dt_field := DTField, dt_format := DTFormat, ssl := UseSSL, include_topic := IncludeTopic,
144+
dt_field := DTField, dt_format := DTFormat, ssl := UseSSL, include_topic := IncludeTopic, vhost_prefix := VHostPrefix,
143145
topic_as := TopicKey, ack_every := AckEvery0, ack_after := AckTimeout0, as := As, consumer_tag := CTag0,
144146
queue_prefix := QPrefix, root_exchange := RExchange, exchange_prefix := XPrefix
145147
, use_flow_ack := FlowAck, clean_field_names := Clean,
@@ -150,6 +152,7 @@ init({GraphId, NodeId} = Idx, _Ins,
150152
'_parent_pid' := ParentPid, '_parent_subscriptions' := ParentSubs, passive := Passive
151153
} = Opts0) ->
152154

155+
VHost = case VHost0 of ?VHOST_DEFAULT -> VHost0; _ -> faxe_util:prefix_binary(VHost0, VHostPrefix) end,
153156
%% lager:warning("opts ~p", [Opts0]),
154157
Q = eval_name(Q0, Opts0, Idx),
155158
QName = faxe_util:prefix_binary(Q, QPrefix),
@@ -205,14 +208,14 @@ init({GraphId, NodeId} = Idx, _Ins,
205208
%% lager:info("opts before: ~p",[Opts0]),
206209

207210
Opts = Opts0#{
208-
host => Host, consumer_tag => CTag,
211+
host => Host, consumer_tag => CTag, vhost => VHost,
209212
exchange => faxe_util:prefix_binary(Ex, XPrefix),
210213
root_exchange => case RExchange of undefined -> undefined; _ -> RExchange end,
211214
queue => QName, queue_type => QType,
212215
routing_key => faxe_util:to_rkey(RoutingKey0),
213216
bindings => faxe_util:to_rkey(Bindings0)
214217
},
215-
%% lager:info("opts: ~p",[Opts]),
218+
lager:info("opts: ~p",[Opts]),
216219

217220
State = State1#state{
218221
opts = Opts, ack_after = AckTimeout, queue_type = QType,
@@ -236,8 +239,13 @@ check_unique_q(_, _) ->
236239

237240
init_takeover_consumer(ParentPid, IdxParent, CTag,
238241
Opts = #{takeover_queue := Q0, takeover_queue_type := QType0, '_name' := Name, takeover_queue_vhost := TVHost,
239-
vhost := VHost}) ->
240-
TakeoverVHost = case TVHost of undefined -> VHost; _ -> TVHost end,
242+
vhost := VHost, vhost_prefix := VHostPrefix}) ->
243+
TakeoverVHost0 = case TVHost of undefined -> VHost; _ -> TVHost end,
244+
TakeoverVHost =
245+
case TakeoverVHost0 of
246+
?VHOST_DEFAULT -> TakeoverVHost0;
247+
_ -> faxe_util:prefix_binary(TakeoverVHost0, VHostPrefix)
248+
end,
241249
NewOpts = Opts#{
242250
%% do not use esq for takeover
243251
safe => false,

apps/faxe/src/components/esp_amqp_publish.erl

+8-4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
%% API
2121
-export([init/3, process/3, options/0, handle_info/2, shutdown/1, metrics/0, check_options/0]).
2222

23+
-define(VHOST_DEFAULT, <<"/">>).
24+
2325
%% state for direct publish mode
2426
-record(state, {
2527
client,
@@ -45,7 +47,9 @@ options() -> [
4547
{port, integer, {amqp, port}},
4648
{user, string, {amqp, user}},
4749
{pass, string, {amqp, pass}},
48-
{vhost, string, <<"/">>},
50+
{vhost, string, ?VHOST_DEFAULT},
51+
%% only applies, if vhost is NOT the default: '/'
52+
{vhost_prefix, string, {rabbitmq, vhost_prefix}},
4953
{routing_key, string, undefined},
5054
{routing_key_lambda, lambda, undefined},
5155
{routing_key_field, string, undefined},
@@ -80,11 +84,11 @@ metrics() ->
8084
].
8185

8286
init({_GraphId, _NodeId} = Idx, _Ins,
83-
#{ host := Host0, port := Port, user := _User, pass := _Pass, vhost := _VHost, exchange := Ex,
87+
#{ host := Host0, port := Port, user := _User, pass := _Pass, vhost := VHost0, vhost_prefix := VHostPrefix, exchange := Ex,
8488
routing_key := RoutingKey, routing_key_lambda := RkLambda, routing_key_field := RkField, ssl := UseSSL,
8589
persistent := _Persist} = Opts0) ->
86-
87-
Opts1 = #{safe_mode := SafeMode, use_queue := UseInternalQueue} = eval_qos(Opts0),
90+
VHost = case VHost0 of ?VHOST_DEFAULT -> VHost0; _ -> faxe_util:prefix_binary(VHost0, VHostPrefix) end,
91+
Opts1 = #{safe_mode := SafeMode, use_queue := UseInternalQueue} = eval_qos(Opts0#{vhost => VHost}),
8892

8993
process_flag(trap_exit, true),
9094
Host = binary_to_list(Host0),

0 commit comments

Comments
 (0)