From ad537f96694dbc9b1fed315493bc1ebd2aa39157 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 8 Apr 2026 16:58:23 -0700 Subject: [PATCH 1/4] Give `peer`-started nodes more than 15s to boot To prevent CI flakes. --- deps/rabbit/test/feature_flags_v2_SUITE.erl | 8 +++++++- deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl | 7 ++++++- .../rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl | 9 ++++++++- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/test/feature_flags_v2_SUITE.erl b/deps/rabbit/test/feature_flags_v2_SUITE.erl index 7ba5edb1cd0f..8d196abfa8df 100644 --- a/deps/rabbit/test/feature_flags_v2_SUITE.erl +++ b/deps/rabbit/test/feature_flags_v2_SUITE.erl @@ -159,10 +159,16 @@ start_slave_node(Parent, Config, Testcase, N) -> Name = list_to_atom( rabbit_misc:format("~ts-~b", [Testcase, N])), ct:pal("- Starting slave node `~ts@...`", [Name]), + %% `wait_boot' is set explicitly because the 15-second default is too + %% tight when many nodes are started concurrently by parallel test + %% groups. On timeout, `peer:start/1' calls `erlang:exit(timeout)', + %% which propagates through the linked starter to the test case and + %% aborts the whole CT run. {ok, NodePid, Node} = peer:start(#{ name => Name, connection => standard_io, - shutdown => close + shutdown => close, + wait_boot => 60_000 }), peer:call(NodePid, net_kernel, set_net_ticktime, [5]), diff --git a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl index 9b8980498983..96e84268674e 100644 --- a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl @@ -2063,10 +2063,15 @@ evaluate_group_ensure_monitors_test(_) -> ok. start_node(Name) -> + %% `wait_boot' is set explicitly because the 15-second default is too + %% tight when this suite runs alongside other parallel CT sets. On + %% timeout, `peer:start/1' calls `erlang:exit(timeout)', which would + %% crash the test case with the bare reason `timeout'. {ok, NodePid, Node} = peer:start(#{ name => Name, connection => standard_io, - shutdown => close + shutdown => close, + wait_boot => 60_000 }), {NodePid, Node}. diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl index 2e8002478565..c2a2f24477a9 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl @@ -768,6 +768,12 @@ do_start_rabbitmq_node(Config, NodeConfig, I) -> PeerEnv3 end, %% Start the peer node. + %% + %% `wait_boot' is set explicitly because the 15-second default is too + %% tight when many nodes are started concurrently by parallel test + %% groups. On timeout, `peer:start/1' calls `erlang:exit(timeout)', + %% which propagates through the linked starter to the test case and + %% aborts the whole CT run. {ok, PeerPid, Nodename} = peer:start(#{ name => Nodename1, longnames => false, @@ -775,7 +781,8 @@ do_start_rabbitmq_node(Config, NodeConfig, I) -> connection => standard_io, exec => os:find_executable("erl"), args => PeerArgs, - env => PeerEnv}), + env => PeerEnv, + wait_boot => 60_000}), %% Redirect the PeerPid's standard output to a file. %% The standard output of the peer process is a redirect %% from the peer node. We tie the file's process to PeerPid From 8ce2bc58a67b1e5f542026a523194e62aa91cc63 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 8 Apr 2026 17:36:46 -0700 Subject: [PATCH 2/4] Fix a CLI test flake that had to do with... random data looking like valid Erlang terms --- .../test/ctl/decode_command_test.exs | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/deps/rabbitmq_cli/test/ctl/decode_command_test.exs b/deps/rabbitmq_cli/test/ctl/decode_command_test.exs index 24ad4fad61e3..3b1a5ac00c04 100644 --- a/deps/rabbitmq_cli/test/ctl/decode_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/decode_command_test.exs @@ -109,16 +109,20 @@ defmodule DecodeCommandTest do assert {:ok, secret} === @command.run([format_as_erlang_term(output), passphrase], context[:opts]) - # wrong passphrase - assert match?( - {:error, _}, - @command.run([format_as_erlang_term(encrypted), "wrong/passphrase"], context[:opts]) - ) + # Wrong passphrase: decryption usually errors out, but garbage bytes + # can occasionally form a valid Erlang term. Either way, the result + # must not be the original secret. + refute {:ok, secret} === + @command.run( + [format_as_erlang_term(encrypted), "wrong/passphrase"], + context[:opts] + ) - assert match?( - {:error, _}, - @command.run([format_as_erlang_term(output), "wrong passphrase"], context[:opts]) - ) + refute {:ok, secret} === + @command.run( + [format_as_erlang_term(output), "wrong passphrase"], + context[:opts] + ) end defp format_as_erlang_term(value) do From 2ace9b57709eb42ef6704e3669481e1d9d31e44f Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 8 Apr 2026 17:47:06 -0700 Subject: [PATCH 3/4] CLI tests: adopt `await_condition/2` and other eventual matchers in a few places --- .../close_all_connections_command_test.exs | 7 +++++- .../ctl/close_connection_command_test.exs | 4 +++- .../ctl/reconcile_vhosts_command_test.exs | 22 +++++++++++++++---- .../test/ctl/restart_vhost_command_test.exs | 12 ++++++++-- 4 files changed, 37 insertions(+), 8 deletions(-) diff --git a/deps/rabbitmq_cli/test/ctl/close_all_connections_command_test.exs b/deps/rabbitmq_cli/test/ctl/close_all_connections_command_test.exs index 279b61703197..401d8d794607 100644 --- a/deps/rabbitmq_cli/test/ctl/close_all_connections_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/close_all_connections_command_test.exs @@ -67,7 +67,12 @@ defmodule CloseAllConnectionsCommandTest do [[vhost: @vhost], [vhost: @vhost], [vhost: @vhost]] = fetch_connection_vhosts(node, nodes) opts = %{node: node, vhost: @vhost, global: false, per_connection_delay: 0, limit: 2} assert {:ok, "Closed 2 connections"} == @command.run(["test"], opts) - Process.sleep(100) + # Closing connections is asynchronous; poll for the expected count + # instead of assuming a fixed delay is enough. + await_condition( + fn -> length(fetch_connection_vhosts(node, nodes)) == 1 end, + 5_000 + ) assert fetch_connection_vhosts(node, nodes) == [[vhost: @vhost]] end) end diff --git a/deps/rabbitmq_cli/test/ctl/close_connection_command_test.exs b/deps/rabbitmq_cli/test/ctl/close_connection_command_test.exs index 45967e6b7afb..13e45cbd9188 100644 --- a/deps/rabbitmq_cli/test/ctl/close_connection_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/close_connection_command_test.exs @@ -48,7 +48,9 @@ defmodule CloseConnectionCommandTest do nodes = @helpers.nodes_in_cluster(node) [[pid: pid]] = fetch_connection_pids(node, nodes) assert :ok == @command.run([:rabbit_misc.pid_to_string(pid), "test"], %{node: node}) - Process.sleep(500) + # Closing a connection is asynchronous; poll instead of assuming a + # fixed delay is enough. + await_no_client_connections(node, 5_000) assert fetch_connection_pids(node, nodes) == [] end) end diff --git a/deps/rabbitmq_cli/test/ctl/reconcile_vhosts_command_test.exs b/deps/rabbitmq_cli/test/ctl/reconcile_vhosts_command_test.exs index 37981cd7c1f1..4cf036f9f76a 100644 --- a/deps/rabbitmq_cli/test/ctl/reconcile_vhosts_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/reconcile_vhosts_command_test.exs @@ -58,8 +58,14 @@ defmodule ReconcileVhostsCommandTest do node_name = context[:opts][:node] force_vhost_failure(node_name, vhost) assert :ok == @command.run([], context[:opts]) - :timer.sleep(1000) - assert match?({:ok, _}, :rpc.call(node_name, :rabbit_vhost_sup_sup, :get_vhost_sup, [vhost])) + # Reconciliation is asynchronous; wait for the vhost supervisor to be + # restarted instead of assuming a fixed delay is enough. + await_condition( + fn -> + match?({:ok, _}, :rpc.call(node_name, :rabbit_vhost_sup_sup, :get_vhost_sup, [vhost])) + end, + 30_000 + ) end # @@ -68,8 +74,16 @@ defmodule ReconcileVhostsCommandTest do defp setup_vhosts do add_vhost(@vhost) - # give the vhost a chance to fully start and initialise - :timer.sleep(1000) + # Wait for the vhost supervisor to be fully started. + await_condition( + fn -> + match?( + {:ok, _}, + :rpc.call(get_rabbit_hostname(), :rabbit_vhost_sup_sup, :get_vhost_sup, [@vhost]) + ) + end, + 30_000 + ) on_exit(fn -> delete_vhost(@vhost) diff --git a/deps/rabbitmq_cli/test/ctl/restart_vhost_command_test.exs b/deps/rabbitmq_cli/test/ctl/restart_vhost_command_test.exs index 789e5d845efc..bb90c8fdc76e 100644 --- a/deps/rabbitmq_cli/test/ctl/restart_vhost_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/restart_vhost_command_test.exs @@ -72,8 +72,16 @@ defmodule RestartVhostCommandTest do defp setup_vhosts do add_vhost(@vhost) - # give the vhost a chance to fully start and initialise - :timer.sleep(1000) + # Wait for the vhost supervisor to be fully started. + await_condition( + fn -> + match?( + {:ok, _}, + :rpc.call(get_rabbit_hostname(), :rabbit_vhost_sup_sup, :get_vhost_sup, [@vhost]) + ) + end, + 30_000 + ) on_exit(fn -> delete_vhost(@vhost) From 746250e642a5a9adbae7b4395f3fa8a668888af4 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 8 Apr 2026 17:50:07 -0700 Subject: [PATCH 4/4] CLI tests: adopt `await_condition/2` and other eventual matchers in a few places (encore, round 2) --- .../test/ctl/list_consumers_command_test.exs | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/deps/rabbitmq_cli/test/ctl/list_consumers_command_test.exs b/deps/rabbitmq_cli/test/ctl/list_consumers_command_test.exs index 475ce52bfef4..93b18bdf2d3e 100644 --- a/deps/rabbitmq_cli/test/ctl/list_consumers_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/list_consumers_command_test.exs @@ -84,7 +84,8 @@ defmodule ListConsumersCommandTest do with_channel(@vhost, fn channel -> {:ok, _} = AMQP.Basic.consume(channel, queue_name, nil, consumer_tag: consumer_tag) - :timer.sleep(100) + # Consumer registration is asynchronous; wait for it to be visible. + await_consumer_count(context[:opts], 1) [[consumer]] = run_command_to_list(@command, [info_keys_s, context[:opts]]) assert info_keys_a == Keyword.keys(consumer) assert consumer[:consumer_tag] == consumer_tag @@ -111,7 +112,8 @@ defmodule ListConsumersCommandTest do {:ok, tag1} = AMQP.Basic.consume(channel, queue_name1) {:ok, tag2} = AMQP.Basic.consume(channel, queue_name2) {:ok, tag3} = AMQP.Basic.consume(channel, queue_name2) - :timer.sleep(100) + # Consumer registration is asynchronous; wait for them to be visible. + await_consumer_count(context[:opts], 3) try do consumers = @@ -149,7 +151,8 @@ defmodule ListConsumersCommandTest do {:ok, tag1} = AMQP.Basic.consume(channel, queue_name) {:ok, tag2} = AMQP.Basic.consume(channel, queue_name) {:ok, tag3} = AMQP.Basic.consume(channel, queue_name) - :timer.sleep(100) + # Consumer registration is asynchronous; wait for them to be visible. + await_consumer_count(context[:opts], 3) try do consumers = @@ -201,7 +204,8 @@ defmodule ListConsumersCommandTest do {:ok, tag1} = AMQP.Basic.consume(channel, queue_name) {:ok, tag2} = AMQP.Basic.consume(channel, queue_name) {:ok, tag3} = AMQP.Basic.consume(channel, queue_name) - :timer.sleep(100) + # Consumer registration is asynchronous; wait for them to be visible. + await_consumer_count(context[:opts], 3) try do consumers = @@ -226,7 +230,8 @@ defmodule ListConsumersCommandTest do ) AMQP.Basic.cancel(channel, tag1) - :timer.sleep(100) + # Consumer cancellation is asynchronous; wait for the count to drop. + await_consumer_count(context[:opts], 2) consumers = List.first( @@ -300,4 +305,14 @@ defmodule ListConsumersCommandTest do ] ], {1, :continue}} end + + defp await_consumer_count(opts, expected_count) do + await_condition( + fn -> + consumers = run_command_to_list(@command, [["queue_name"], opts]) + Enum.reduce(consumers, 0, fn group, acc -> acc + length(group) end) == expected_count + end, + 10_000 + ) + end end