Skip to content

Commit

Permalink
Merge pull request #12268 from rabbitmq/mergify/bp/v4.0.x/pr-11225
Browse files Browse the repository at this point in the history
Reorganize data in the Khepri store (backport #11225)
  • Loading branch information
dumbbell authored Sep 10, 2024
2 parents b73fa52 + adfcb5e commit 067b038
Show file tree
Hide file tree
Showing 17 changed files with 101 additions and 87 deletions.
1 change: 1 addition & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ def all_srcs(name = "all_srcs"):
"include/amqqueue.hrl",
"include/amqqueue_v2.hrl",
"include/internal_user.hrl",
"include/khepri.hrl",
"include/mc.hrl",
"include/rabbit_amqp.hrl",
"include/rabbit_global_counters.hrl",
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/include/khepri.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2024 Broadcom. All Rights Reserved. The term “Broadcom”
%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-define(KHEPRI_ROOT_PATH, [rabbitmq]).
23 changes: 10 additions & 13 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@

-export([
khepri_route_path/1, khepri_route_path/5,
khepri_route_path_to_args/1,
khepri_route_exchange_path/1
khepri_route_path_to_args/1
]).

%% Recovery is only needed for transient entities. Once mnesia is removed, these
Expand Down Expand Up @@ -202,8 +201,6 @@ create_in_khepri(#binding{source = SrcName,
MaybeSerial = rabbit_exchange:serialise_events(Src),
Serial = rabbit_khepri:transaction(
fun() ->
ExchangePath = khepri_route_exchange_path(SrcName),
ok = khepri_tx:put(ExchangePath, #{type => Src#exchange.type}),
case khepri_tx:get(RoutePath) of
{ok, Set} ->
case sets:is_element(Binding, Set) of
Expand Down Expand Up @@ -1010,18 +1007,21 @@ clear_in_khepri() ->
%% --------------------------------------------------------------

khepri_route_path(
#binding{source = #resource{virtual_host = VHost, name = SrcName},
destination = #resource{kind = Kind, name = DstName},
#binding{source = #resource{virtual_host = VHost,
kind = exchange,
name = SrcName},
destination = #resource{virtual_host = VHost,
kind = Kind,
name = DstName},
key = RoutingKey}) ->
khepri_route_path(VHost, SrcName, Kind, DstName, RoutingKey).

khepri_route_path(VHost, SrcName, Kind, DstName, RoutingKey)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(SrcName) andalso
?IS_KHEPRI_PATH_CONDITION(Kind) andalso
when ?IS_KHEPRI_PATH_CONDITION(Kind) andalso
?IS_KHEPRI_PATH_CONDITION(DstName) andalso
?IS_KHEPRI_PATH_CONDITION(RoutingKey) ->
[?MODULE, routes, VHost, SrcName, Kind, DstName, RoutingKey].
ExchangePath = rabbit_db_exchange:khepri_exchange_path(VHost, SrcName),
ExchangePath ++ [bindings, Kind, DstName, RoutingKey].

khepri_route_path_to_args(Path) ->
Pattern = khepri_route_path(
Expand All @@ -1047,9 +1047,6 @@ khepri_route_path_to_args(
'$RoutingKey' := RoutingKey}) ->
{VHost, SrcName, Kind, DstName, RoutingKey}.

khepri_route_exchange_path(#resource{virtual_host = VHost, name = SrcName}) ->
[?MODULE, routes, VHost, SrcName].

%% --------------------------------------------------------------
%% Internal
%% --------------------------------------------------------------
Expand Down
8 changes: 1 addition & 7 deletions deps/rabbit/src/rabbit_db_binding_m2k_converter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ init_copy_to_khepri(_StoreId, _MigrationId, Tables) ->
%% @private

copy_to_khepri(rabbit_route = Table,
#route{binding = #binding{source = XName} = Binding},
#route{binding = #binding{} = Binding},
State) ->
?LOG_DEBUG(
"Mnesia->Khepri data copy: [~0p] key: ~0p",
Expand All @@ -55,18 +55,12 @@ copy_to_khepri(rabbit_route = Table,
rabbit_db_m2k_converter:with_correlation_id(
fun(CorrId) ->
Extra = #{async => CorrId},
XPath = rabbit_db_binding:khepri_route_exchange_path(XName),
?LOG_DEBUG(
"Mnesia->Khepri data copy: [~0p] path: ~0p corr: ~0p",
[Table, Path, CorrId],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
rabbit_khepri:transaction(
fun() ->
%% Store the exchange's type in the exchange name
%% branch of the tree.
[#exchange{type = XType}] =
rabbit_db_exchange:get_in_khepri_tx(XName),
ok = khepri_tx:put(XPath, #{type => XType}),
%% Add the binding to the set at the binding's
%% path.
Set = case khepri_tx:get(Path) of
Expand Down
18 changes: 8 additions & 10 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
-include_lib("khepri/include/khepri.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

-include("include/khepri.hrl").

-export([
get_all/0,
get_all/1,
Expand Down Expand Up @@ -894,15 +896,11 @@ maybe_auto_delete_in_khepri(XName, OnlyDurable) ->
khepri_exchange_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_exchange_path(VHost, Name).

khepri_exchange_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
[?MODULE, exchanges, VHost, Name].
khepri_exchange_path(VHost, Name) when ?IS_KHEPRI_PATH_CONDITION(Name) ->
rabbit_db_vhost:khepri_vhost_path(VHost) ++ [exchanges, Name].

khepri_exchange_serial_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_exchange_serial_path(VHost, Name).
khepri_exchange_serial_path(#resource{} = Resource) ->
khepri_exchange_path(Resource) ++ [serial].

khepri_exchange_serial_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
[?MODULE, exchange_serials, VHost, Name].
khepri_exchange_serial_path(VHost, Name) ->
khepri_exchange_path(VHost, Name) ++ [serial].
4 changes: 3 additions & 1 deletion deps/rabbit/src/rabbit_db_maintenance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
-include_lib("khepri/include/khepri.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

-include("include/khepri.hrl").

-export([
table_definitions/0,
set/1,
Expand Down Expand Up @@ -168,4 +170,4 @@ get_consistent_in_khepri(Node) ->
%% -------------------------------------------------------------------

khepri_maintenance_path(Node) when ?IS_KHEPRI_PATH_CONDITION(Node) ->
[?MODULE, maintenance, Node].
?KHEPRI_ROOT_PATH ++ [node_maintenance, Node].
6 changes: 4 additions & 2 deletions deps/rabbit/src/rabbit_db_msup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
-include_lib("khepri/include/khepri.hrl").
-include("mirrored_supervisor.hrl").

-include("include/khepri.hrl").

-export([
create_tables/0,
table_definitions/0,
Expand Down Expand Up @@ -326,8 +328,8 @@ clear_in_khepri() ->
khepri_mirrored_supervisor_path(Group, Id)
when ?IS_KHEPRI_PATH_CONDITION(Group) andalso
?IS_KHEPRI_PATH_CONDITION(Id) ->
[?MODULE, mirrored_supervisor_childspec, Group, Id];
?KHEPRI_ROOT_PATH ++ [mirrored_supervisors, Group, Id];
khepri_mirrored_supervisor_path(Group, Id)
when is_atom(Group) ->
IdPath = Group:id_to_khepri_path(Id),
[?MODULE, mirrored_supervisor_childspec, Group] ++ IdPath.
?KHEPRI_ROOT_PATH ++ [mirrored_supervisors, Group] ++ IdPath.
6 changes: 2 additions & 4 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,5 @@ list_with_possible_retry_in_khepri(Fun) ->
khepri_queue_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_queue_path(VHost, Name).

khepri_queue_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
[?MODULE, queues, VHost, Name].
khepri_queue_path(VHost, Name) when ?IS_KHEPRI_PATH_CONDITION(Name) ->
rabbit_db_vhost:khepri_vhost_path(VHost) ++ [queues, Name].
10 changes: 6 additions & 4 deletions deps/rabbit/src/rabbit_db_rtparams.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
-include_lib("khepri/include/khepri.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

-include("include/khepri.hrl").

-export([set/2, set/4,
get/1,
get_all/0, get_all/2,
Expand Down Expand Up @@ -362,10 +364,10 @@ khepri_rp_path(Key) ->
khepri_global_rp_path(Key).

khepri_global_rp_path(Key) when ?IS_KHEPRI_PATH_CONDITION(Key) ->
[?MODULE, global, Key].
?KHEPRI_ROOT_PATH ++ [runtime_params, Key].

khepri_vhost_rp_path(VHost, Component, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Component) andalso
when ?IS_KHEPRI_PATH_CONDITION(Component) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
[?MODULE, per_vhost, VHost, Component, Name].
VHostPath = rabbit_db_vhost:khepri_vhost_path(VHost),
VHostPath ++ [runtime_params, Component, Name].
31 changes: 15 additions & 16 deletions deps/rabbit/src/rabbit_db_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
-include_lib("khepri/include/khepri.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

-include("include/khepri.hrl").

-export([create/1,
update/2,
get/1,
Expand Down Expand Up @@ -489,13 +491,12 @@ set_user_permissions_in_khepri(Username, VHostName, UserPermission) ->
end)), rw).

set_user_permissions_in_khepri_tx(Username, VHostName, UserPermission) ->
%% TODO: Check user presence in a transaction.
Path = khepri_user_permission_path(
#if_all{conditions =
[Username,
#if_node_exists{exists = true}]},
Username,
VHostName),
Extra = #{keep_while =>
#{rabbit_db_vhost:khepri_vhost_path(VHostName) =>
#{rabbit_db_user:khepri_user_path(Username) =>
#if_node_exists{exists = true}}},
Ret = khepri_tx:put(
Path, UserPermission, Extra),
Expand Down Expand Up @@ -877,14 +878,13 @@ set_topic_permissions_in_khepri(Username, VHostName, TopicPermission) ->
set_topic_permissions_in_khepri_tx(Username, VHostName, TopicPermission) ->
#topic_permission{topic_permission_key =
#topic_permission_key{exchange = ExchangeName}} = TopicPermission,
%% TODO: Check user presence in a transaction.
Path = khepri_topic_permission_path(
#if_all{conditions =
[Username,
#if_node_exists{exists = true}]},
Username,
VHostName,
ExchangeName),
Extra = #{keep_while =>
#{rabbit_db_vhost:khepri_vhost_path(VHostName) =>
#{rabbit_db_user:khepri_user_path(Username) =>
#if_node_exists{exists = true}}},
Ret = khepri_tx:put(Path, TopicPermission, Extra),
case Ret of
Expand Down Expand Up @@ -1094,15 +1094,14 @@ clear_in_khepri() ->

khepri_user_path(Username)
when ?IS_KHEPRI_PATH_CONDITION(Username) ->
[?MODULE, users, Username].
?KHEPRI_ROOT_PATH ++ [users, Username].

khepri_user_permission_path(Username, VHostName)
when ?IS_KHEPRI_PATH_CONDITION(Username) andalso
?IS_KHEPRI_PATH_CONDITION(VHostName) ->
[?MODULE, users, Username, user_permissions, VHostName].
when ?IS_KHEPRI_PATH_CONDITION(Username) ->
(rabbit_db_vhost:khepri_vhost_path(VHostName) ++
[user_permissions, Username]).

khepri_topic_permission_path(Username, VHostName, Exchange)
when ?IS_KHEPRI_PATH_CONDITION(Username) andalso
?IS_KHEPRI_PATH_CONDITION(VHostName) andalso
?IS_KHEPRI_PATH_CONDITION(Exchange) ->
[?MODULE, users, Username, topic_permissions, VHostName, Exchange].
when ?IS_KHEPRI_PATH_CONDITION(Username) ->
(rabbit_db_exchange:khepri_exchange_path(VHostName, Exchange) ++
[user_permissions, Username]).
12 changes: 4 additions & 8 deletions deps/rabbit/src/rabbit_db_user_m2k_converter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,12 @@ copy_to_khepri(
[Table, Username, VHost],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
Path = rabbit_db_user:khepri_user_permission_path(
#if_all{conditions =
[Username,
#if_node_exists{exists = true}]},
Username,
VHost),
rabbit_db_m2k_converter:with_correlation_id(
fun(CorrId) ->
Extra = #{keep_while =>
#{rabbit_db_vhost:khepri_vhost_path(VHost) =>
#{rabbit_db_user:khepri_user_path(Username) =>
#if_node_exists{exists = true}},
async => CorrId},
?LOG_DEBUG(
Expand All @@ -103,15 +101,13 @@ copy_to_khepri(
[Table, Username, VHost],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
Path = rabbit_db_user:khepri_topic_permission_path(
#if_all{conditions =
[Username,
#if_node_exists{exists = true}]},
Username,
VHost,
Exchange),
rabbit_db_m2k_converter:with_correlation_id(
fun(CorrId) ->
Extra = #{keep_while =>
#{rabbit_db_vhost:khepri_vhost_path(VHost) =>
#{rabbit_db_user:khepri_user_path(Username) =>
#if_node_exists{exists = true}},
async => CorrId},
?LOG_DEBUG(
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_db_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
-include_lib("rabbit_common/include/logging.hrl").
-include_lib("khepri/include/khepri.hrl").

-include("include/khepri.hrl").
-include("vhost.hrl").

-export([create_or_get/3,
Expand Down Expand Up @@ -532,4 +533,4 @@ clear_in_khepri() ->
%% --------------------------------------------------------------

khepri_vhost_path(VHost) when ?IS_KHEPRI_PATH_CONDITION(VHost) ->
[?MODULE, VHost].
?KHEPRI_ROOT_PATH ++ [vhosts, VHost].
24 changes: 19 additions & 5 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@
-include_lib("rabbit_common/include/logging.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

-include("include/khepri.hrl").

-export([setup/0,
setup/1,
init/0,
Expand Down Expand Up @@ -145,6 +147,7 @@

dir/0,
info/0,
root_path/0,

handle_async_ret/1,

Expand Down Expand Up @@ -895,6 +898,15 @@ cluster_status_from_khepri() ->
{error, khepri_not_running}
end.

-spec root_path() -> RootPath when
RootPath :: khepri_path:path().
%% @doc Returns the path where RabbitMQ stores every metadata.
%%
%% This path must be prepended to all paths used by RabbitMQ subsystems.

root_path() ->
?KHEPRI_ROOT_PATH.

%% -------------------------------------------------------------------
%% "Proxy" functions to Khepri API.
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -1213,10 +1225,11 @@ register_rabbit_index_route_projection() ->
Options = #{type => bag, keypos => #index_route.source_key},
Projection = khepri_projection:new(
rabbit_khepri_index_route, ProjectionFun, Options),
DirectOrFanout = #if_data_matches{pattern = #{type => '$1'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic}}]},
DirectOrFanout = #if_data_matches{
pattern = #exchange{type = '$1', _ = '_'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic}}]},
PathPattern = rabbit_db_binding:khepri_route_path(
_VHost = ?KHEPRI_WILDCARD_STAR,
_Exchange = DirectOrFanout,
Expand Down Expand Up @@ -1319,7 +1332,8 @@ register_rabbit_topic_graph_projection() ->
Projection = khepri_projection:new(Name, ProjectionFun, Options),
PathPattern = rabbit_db_binding:khepri_route_path(
_VHost = ?KHEPRI_WILDCARD_STAR,
_Exchange = #if_data_matches{pattern = #{type => topic}},
_Exchange = #if_data_matches{
pattern = #exchange{type = topic, _ = '_'}},
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ khepri_consistent_hash_path(#exchange{name = Name}) ->
khepri_consistent_hash_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_consistent_hash_path(VHost, Name).

khepri_consistent_hash_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
[?MODULE, exchange_type_consistent_hash_ring_state, VHost, Name].
khepri_consistent_hash_path(VHost, Name) ->
ExchangePath = rabbit_db_exchange:khepri_exchange_path(VHost, Name),
ExchangePath ++ [consistent_hash_ring_state].
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ remove_items(Dict, [Key | Keys]) -> remove_items(dict:erase(Key, Dict), Keys).
khepri_jms_topic_exchange_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_jms_topic_exchange_path(VHost, Name).

khepri_jms_topic_exchange_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
[?MODULE, jms_topic_exchange, VHost, Name].
khepri_jms_topic_exchange_path(VHost, Name) ->
ExchangePath = rabbit_db_exchange:khepri_exchange_path(VHost, Name),
ExchangePath ++ [jms_topic].
Loading

0 comments on commit 067b038

Please sign in to comment.