Skip to content

Commit

Permalink
Support AMQP filter expressions (#12415)
Browse files Browse the repository at this point in the history
* Support AMQP filter expressions

 ## What?

This PR implements the following property filter expressions for AMQP clients
consuming from streams as defined in
[AMQP Filter Expressions Version 1.0 Working Draft 09](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227):
* properties filters [section 4.2.4]
* application-properties filters [section 4.2.5]

String prefix and suffix matching is also supported.

This PR also fixes a bug where RabbitMQ would accept wrong filters.
Specifically, prior to this PR the values of the filter-set's map were
allowed to be symbols. However, "every value MUST be either null or of a
described type which provides the archetype filter."

 ## Why?

This feature adds the ability to RabbitMQ to have multiple concurrent clients
each consuming only a subset of messages while maintaining message order.

This feature also reduces network traffic between RabbitMQ and clients by
only dispatching those messages that the clients are actually interested in.

Note that AMQP filter expressions are more fine grained than the [bloom filter based
stream filtering](https://www.rabbitmq.com/blog/2023/10/16/stream-filtering) because
* they do not suffer false positives
* the unit of filtering is per-message instead of per-chunk
* matching can be performed on **multiple** values in the properties and
  application-properties sections
* prefix and suffix matching on the actual values is supported.

Both, AMQP filter expressions and bloom filters can be used together.

 ## How?

If a filter isn't valid, RabbitMQ ignores the filter. RabbitMQ only
replies with filters it actually supports and validated successfully to
comply with:
"The receiving endpoint sets its desired filter, the sending endpoint
[RabbitMQ] sets the filter actually in place (including any filters defaulted at
the node)."

* Delete streams test case

The test suite constructed a wrong filter-set.
Specifically the value of the filter-set didn't use a described type as
mandated by the spec.
Using https://azure.github.io/amqpnetlite/api/Amqp.Types.DescribedValue.html
throws errors that the descriptor can't be encoded. Given that this code
path is already tests via the amqp_filtex_SUITE, this F# test gets
therefore deleted.

* Re-introduce the AMQP filter-set bug

Since clients might rely on the wrong filter-set value type, we support
the bug behind a deprecated feature flag and gradually remove support
this bug.

* Revert "Delete streams test case"

This reverts commit c95cfea.
  • Loading branch information
ansd authored and michaelklishin committed Oct 7, 2024
1 parent 4c991a1 commit f556f11
Show file tree
Hide file tree
Showing 24 changed files with 1,275 additions and 303 deletions.
8 changes: 3 additions & 5 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -737,15 +737,13 @@ translate_terminus_durability(configuration) -> 1;
translate_terminus_durability(unsettled_state) -> 2.

translate_filters(Filters)
when is_map(Filters) andalso
map_size(Filters) == 0 ->
when map_size(Filters) =:= 0 ->
undefined;
translate_filters(Filters)
when is_map(Filters) ->
translate_filters(Filters) ->
{map,
maps:fold(
fun
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
%% special case conversion
Key = sym(K),
[{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc];
Expand Down
5 changes: 4 additions & 1 deletion deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,10 @@ wrap_ap_value(V) when is_integer(V) ->
case V < 0 of
true -> {int, V};
false -> {uint, V}
end.
end;
wrap_ap_value(V) when is_number(V) ->
%% AMQP double and Erlang float are both 64-bit.
{double, V}.

%% LOCAL
header_value(durable, undefined) -> false;
Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_common/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def all_srcs(name = "all_srcs"):
)
filegroup(
name = "public_hdrs",
srcs = ["include/amqp10_framing.hrl", "include/amqp10_types.hrl"],
srcs = ["include/amqp10_filtex.hrl", "include/amqp10_framing.hrl", "include/amqp10_types.hrl"],
)
filegroup(
name = "private_hdrs",
Expand Down
15 changes: 15 additions & 0 deletions deps/amqp10_common/include/amqp10_filtex.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.


%% AMQP Filter Expressions Version 1.0 Working Draft 09
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227

-define(DESCRIPTOR_NAME_PROPERTIES_FILTER, <<"amqp:properties-filter">>).
-define(DESCRIPTOR_CODE_PROPERTIES_FILTER, 16#173).

-define(DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER, <<"amqp:application-properties-filter">>).
-define(DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER, 16#174).
16 changes: 16 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,7 @@ rabbitmq_integration_suite(
name = "amqp_client_SUITE",
size = "large",
additional_beam = [
":test_amqp_utils_beam",
":test_event_recorder_beam",
],
shard_count = 3,
Expand All @@ -1215,6 +1216,16 @@ rabbitmq_integration_suite(
],
)

rabbitmq_integration_suite(
name = "amqp_filtex_SUITE",
additional_beam = [
":test_amqp_utils_beam",
],
runtime_deps = [
"//deps/rabbitmq_amqp_client:erlang_app",
],
)

rabbitmq_integration_suite(
name = "amqp_proxy_protocol_SUITE",
size = "medium",
Expand All @@ -1235,6 +1246,7 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "amqp_auth_SUITE",
additional_beam = [
":test_amqp_utils_beam",
":test_event_recorder_beam",
],
shard_count = 2,
Expand All @@ -1246,6 +1258,9 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "amqp_address_SUITE",
shard_count = 2,
additional_beam = [
":test_amqp_utils_beam",
],
runtime_deps = [
"//deps/rabbitmq_amqp_client:erlang_app",
],
Expand Down Expand Up @@ -1358,6 +1373,7 @@ eunit(
":test_clustering_utils_beam",
":test_event_recorder_beam",
":test_rabbit_ct_hook_beam",
":test_amqp_utils_beam",
],
target = ":test_erlang_app",
test_env = {
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ define ct_master.erl
endef

PARALLEL_CT_SET_1_A = amqp_client unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit

Expand Down
20 changes: 20 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqp1_0.erl",
"src/rabbit_amqp_filtex.erl",
"src/rabbit_amqp_management.erl",
"src/rabbit_amqp_reader.erl",
"src/rabbit_amqp_session.erl",
Expand Down Expand Up @@ -302,6 +303,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqp1_0.erl",
"src/rabbit_amqp_filtex.erl",
"src/rabbit_amqp_management.erl",
"src/rabbit_amqp_reader.erl",
"src/rabbit_amqp_session.erl",
Expand Down Expand Up @@ -578,6 +580,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqp1_0.erl",
"src/rabbit_amqp_filtex.erl",
"src/rabbit_amqp_management.erl",
"src/rabbit_amqp_reader.erl",
"src/rabbit_amqp_session.erl",
Expand Down Expand Up @@ -2195,3 +2198,20 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "amqp_filtex_SUITE_beam_files",
testonly = True,
srcs = ["test/amqp_filtex_SUITE.erl"],
outs = ["test/amqp_filtex_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp10_common:erlang_app"],
)
erlang_bytecode(
name = "test_amqp_utils_beam",
testonly = True,
srcs = ["test/amqp_utils.erl"],
outs = ["test/amqp_utils.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
1 change: 1 addition & 0 deletions deps/rabbit/ct.test.spec
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
, amqp_auth_SUITE
, amqp_client_SUITE
, amqp_credit_api_v2_SUITE
, amqp_filtex_SUITE
, amqp_proxy_protocol_SUITE
, amqp_system_SUITE
, amqpl_consumer_ack_SUITE
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ message_id(BasicMsg) ->
mc_compat:message_id(BasicMsg).

-spec property(atom(), state()) ->
{utf8, binary()} | undefined.
tagged_value().
property(Property, #?MODULE{protocol = Proto,
data = Data}) ->
Proto:property(Property, Data);
Expand Down
42 changes: 29 additions & 13 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

-define(MESSAGE_ANNOTATIONS_GUESS_SIZE, 100).

-define(SIMPLE_VALUE(V),
-define(IS_SIMPLE_VALUE(V),
is_binary(V) orelse
is_number(V) orelse
is_boolean(V)).
Expand Down Expand Up @@ -145,16 +145,32 @@ property(Prop, #v1{bare_and_footer = Bin,
Props = amqp10_framing:decode(PropsDescribed),
property0(Prop, Props).

property0(correlation_id, #'v1_0.properties'{correlation_id = Corr}) ->
Corr;
property0(message_id, #'v1_0.properties'{message_id = MsgId}) ->
MsgId;
property0(user_id, #'v1_0.properties'{user_id = UserId}) ->
UserId;
property0(subject, #'v1_0.properties'{subject = Subject}) ->
Subject;
property0(to, #'v1_0.properties'{to = To}) ->
To;
property0(message_id, #'v1_0.properties'{message_id = Val}) ->
Val;
property0(user_id, #'v1_0.properties'{user_id = Val}) ->
Val;
property0(to, #'v1_0.properties'{to = Val}) ->
Val;
property0(subject, #'v1_0.properties'{subject = Val}) ->
Val;
property0(reply_to, #'v1_0.properties'{reply_to = Val}) ->
Val;
property0(correlation_id, #'v1_0.properties'{correlation_id = Val}) ->
Val;
property0(content_type, #'v1_0.properties'{content_type = Val}) ->
Val;
property0(content_encoding, #'v1_0.properties'{content_encoding = Val}) ->
Val;
property0(absolute_expiry_time, #'v1_0.properties'{absolute_expiry_time = Val}) ->
Val;
property0(creation_time, #'v1_0.properties'{creation_time = Val}) ->
Val;
property0(group_id, #'v1_0.properties'{group_id = Val}) ->
Val;
property0(group_sequence, #'v1_0.properties'{group_sequence = Val}) ->
Val;
property0(reply_to_group_id, #'v1_0.properties'{reply_to_group_id = Val}) ->
Val;
property0(_Prop, #'v1_0.properties'{}) ->
undefined.

Expand Down Expand Up @@ -454,7 +470,7 @@ message_annotations_as_simple_map(#v1{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content) ->
%% the section record format really is terrible
lists:filtermap(fun({{symbol, K}, {_T, V}})
when ?SIMPLE_VALUE(V) ->
when ?IS_SIMPLE_VALUE(V) ->
{true, {K, V}};
(_) ->
false
Expand All @@ -480,7 +496,7 @@ application_properties_as_simple_map(
application_properties_as_simple_map0(Content, L) ->
%% the section record format really is terrible
lists:foldl(fun({{utf8, K}, {_T, V}}, Acc)
when ?SIMPLE_VALUE(V) ->
when ?IS_SIMPLE_VALUE(V) ->
[{K, V} | Acc];
({{utf8, K}, V}, Acc)
when V =:= undefined orelse is_boolean(V) ->
Expand Down
Loading

0 comments on commit f556f11

Please sign in to comment.