diff --git a/deps/rabbit/test/feature_flags_v2_SUITE.erl b/deps/rabbit/test/feature_flags_v2_SUITE.erl index 60d78e5d46df..94ad1f999746 100644 --- a/deps/rabbit/test/feature_flags_v2_SUITE.erl +++ b/deps/rabbit/test/feature_flags_v2_SUITE.erl @@ -171,10 +171,16 @@ start_slave_node(Parent, Config, Testcase, N) -> Name = list_to_atom( rabbit_misc:format("~ts-~b", [Testcase, N])), ct:pal("- Starting slave node `~ts@...`", [Name]), + %% `wait_boot' is set explicitly because the 15-second default is too + %% tight when many nodes are started concurrently by parallel test + %% groups. On timeout, `peer:start/1' calls `erlang:exit(timeout)', + %% which propagates through the linked starter to the test case and + %% aborts the whole CT run. {ok, NodePid, Node} = peer:start(#{ name => Name, connection => standard_io, - shutdown => close + shutdown => close, + wait_boot => 60_000 }), peer:call(NodePid, net_kernel, set_net_ticktime, [5]), diff --git a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl index d09d60b9de5c..35524457d932 100644 --- a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl @@ -1911,10 +1911,15 @@ list_nodes(MapState) -> lists:sort(?MOD:list_nodes(state(MapState))). start_node(Name) -> + %% `wait_boot' is set explicitly because the 15-second default is too + %% tight when this suite runs alongside other parallel CT sets. On + %% timeout, `peer:start/1' calls `erlang:exit(timeout)', which would + %% crash the test case with the bare reason `timeout'. {ok, NodePid, Node} = peer:start(#{ name => Name, connection => standard_io, - shutdown => close + shutdown => close, + wait_boot => 60_000 }), {NodePid, Node}. 
diff --git a/deps/rabbitmq_cli/test/ctl/close_all_connections_command_test.exs b/deps/rabbitmq_cli/test/ctl/close_all_connections_command_test.exs index bda156823ade..f71a3ae37183 100644 --- a/deps/rabbitmq_cli/test/ctl/close_all_connections_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/close_all_connections_command_test.exs @@ -67,7 +67,12 @@ defmodule CloseAllConnectionsCommandTest do [[vhost: @vhost], [vhost: @vhost], [vhost: @vhost]] = fetch_connection_vhosts(node, nodes) opts = %{node: node, vhost: @vhost, global: false, per_connection_delay: 0, limit: 2} assert {:ok, "Closed 2 connections"} == @command.run(["test"], opts) - Process.sleep(100) + # Closing connections is asynchronous; poll for the expected count + # instead of assuming a fixed delay is enough. + await_condition( + fn -> length(fetch_connection_vhosts(node, nodes)) == 1 end, + 5_000 + ) assert fetch_connection_vhosts(node, nodes) == [[vhost: @vhost]] end) end diff --git a/deps/rabbitmq_cli/test/ctl/close_connection_command_test.exs b/deps/rabbitmq_cli/test/ctl/close_connection_command_test.exs index 362b71600956..4342a8eef695 100644 --- a/deps/rabbitmq_cli/test/ctl/close_connection_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/close_connection_command_test.exs @@ -48,7 +48,9 @@ defmodule CloseConnectionCommandTest do nodes = @helpers.nodes_in_cluster(node) [[pid: pid]] = fetch_connection_pids(node, nodes) assert :ok == @command.run([:rabbit_misc.pid_to_string(pid), "test"], %{node: node}) - Process.sleep(500) + # Closing a connection is asynchronous; poll instead of assuming a + # fixed delay is enough. 
+ await_no_client_connections(node, 5_000) assert fetch_connection_pids(node, nodes) == [] end) end diff --git a/deps/rabbitmq_cli/test/ctl/decode_command_test.exs b/deps/rabbitmq_cli/test/ctl/decode_command_test.exs index 4f52c6cd309c..84fa088bad4c 100644 --- a/deps/rabbitmq_cli/test/ctl/decode_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/decode_command_test.exs @@ -109,16 +109,20 @@ defmodule DecodeCommandTest do assert {:ok, secret} === @command.run([format_as_erlang_term(output), passphrase], context[:opts]) - # wrong passphrase - assert match?( - {:error, _}, - @command.run([format_as_erlang_term(encrypted), "wrong/passphrase"], context[:opts]) - ) + # Wrong passphrase: decryption usually errors out, but garbage bytes + # can occasionally form a valid Erlang term. Either way, the result + # must not be the original secret. + refute {:ok, secret} === + @command.run( + [format_as_erlang_term(encrypted), "wrong/passphrase"], + context[:opts] + ) - assert match?( - {:error, _}, - @command.run([format_as_erlang_term(output), "wrong passphrase"], context[:opts]) - ) + refute {:ok, secret} === + @command.run( + [format_as_erlang_term(output), "wrong passphrase"], + context[:opts] + ) end defp format_as_erlang_term(value) do diff --git a/deps/rabbitmq_cli/test/ctl/list_consumers_command_test.exs b/deps/rabbitmq_cli/test/ctl/list_consumers_command_test.exs index f4947786fcbd..b2b92ce46dba 100644 --- a/deps/rabbitmq_cli/test/ctl/list_consumers_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/list_consumers_command_test.exs @@ -84,7 +84,8 @@ defmodule ListConsumersCommandTest do with_channel(@vhost, fn channel -> {:ok, _} = AMQP.Basic.consume(channel, queue_name, nil, consumer_tag: consumer_tag) - :timer.sleep(100) + # Consumer registration is asynchronous; wait for it to be visible. 
+ await_consumer_count(context[:opts], 1) [[consumer]] = run_command_to_list(@command, [info_keys_s, context[:opts]]) assert info_keys_a == Keyword.keys(consumer) assert consumer[:consumer_tag] == consumer_tag @@ -111,7 +112,8 @@ defmodule ListConsumersCommandTest do {:ok, tag1} = AMQP.Basic.consume(channel, queue_name1) {:ok, tag2} = AMQP.Basic.consume(channel, queue_name2) {:ok, tag3} = AMQP.Basic.consume(channel, queue_name2) - :timer.sleep(100) + # Consumer registration is asynchronous; wait for them to be visible. + await_consumer_count(context[:opts], 3) try do consumers = @@ -149,7 +151,8 @@ defmodule ListConsumersCommandTest do {:ok, tag1} = AMQP.Basic.consume(channel, queue_name) {:ok, tag2} = AMQP.Basic.consume(channel, queue_name) {:ok, tag3} = AMQP.Basic.consume(channel, queue_name) - :timer.sleep(100) + # Consumer registration is asynchronous; wait for them to be visible. + await_consumer_count(context[:opts], 3) try do consumers = @@ -201,7 +204,8 @@ defmodule ListConsumersCommandTest do {:ok, tag1} = AMQP.Basic.consume(channel, queue_name) {:ok, tag2} = AMQP.Basic.consume(channel, queue_name) {:ok, tag3} = AMQP.Basic.consume(channel, queue_name) - :timer.sleep(100) + # Consumer registration is asynchronous; wait for them to be visible. + await_consumer_count(context[:opts], 3) try do consumers = @@ -226,7 +230,8 @@ defmodule ListConsumersCommandTest do ) AMQP.Basic.cancel(channel, tag1) - :timer.sleep(100) + # Consumer cancellation is asynchronous; wait for the count to drop. 
+ await_consumer_count(context[:opts], 2) consumers = List.first( @@ -300,4 +305,14 @@ defmodule ListConsumersCommandTest do ] ], {1, :continue}} end + + defp await_consumer_count(opts, expected_count) do + await_condition( + fn -> + consumers = run_command_to_list(@command, [["queue_name"], opts]) + Enum.reduce(consumers, 0, fn group, acc -> acc + length(group) end) == expected_count + end, + 10_000 + ) + end end diff --git a/deps/rabbitmq_cli/test/ctl/reconcile_vhosts_command_test.exs b/deps/rabbitmq_cli/test/ctl/reconcile_vhosts_command_test.exs index 13d01ca8bcdb..7379745b2e6a 100644 --- a/deps/rabbitmq_cli/test/ctl/reconcile_vhosts_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/reconcile_vhosts_command_test.exs @@ -58,8 +58,14 @@ defmodule ReconcileVhostsCommandTest do node_name = context[:opts][:node] force_vhost_failure(node_name, vhost) assert :ok == @command.run([], context[:opts]) - :timer.sleep(1000) - assert match?({:ok, _}, :rpc.call(node_name, :rabbit_vhost_sup_sup, :get_vhost_sup, [vhost])) + # Reconciliation is asynchronous; wait for the vhost supervisor to be + # restarted instead of assuming a fixed delay is enough. + await_condition( + fn -> + match?({:ok, _}, :rpc.call(node_name, :rabbit_vhost_sup_sup, :get_vhost_sup, [vhost])) + end, + 30_000 + ) end # @@ -68,8 +74,16 @@ defmodule ReconcileVhostsCommandTest do defp setup_vhosts do add_vhost(@vhost) - # give the vhost a chance to fully start and initialise - :timer.sleep(1000) + # Wait for the vhost supervisor to be fully started. 
+ await_condition( + fn -> + match?( + {:ok, _}, + :rpc.call(get_rabbit_hostname(), :rabbit_vhost_sup_sup, :get_vhost_sup, [@vhost]) + ) + end, + 30_000 + ) on_exit(fn -> delete_vhost(@vhost) diff --git a/deps/rabbitmq_cli/test/ctl/restart_vhost_command_test.exs b/deps/rabbitmq_cli/test/ctl/restart_vhost_command_test.exs index c5e3c3ddb176..18942667794d 100644 --- a/deps/rabbitmq_cli/test/ctl/restart_vhost_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/restart_vhost_command_test.exs @@ -72,8 +72,16 @@ defmodule RestartVhostCommandTest do defp setup_vhosts do add_vhost(@vhost) - # give the vhost a chance to fully start and initialise - :timer.sleep(1000) + # Wait for the vhost supervisor to be fully started. + await_condition( + fn -> + match?( + {:ok, _}, + :rpc.call(get_rabbit_hostname(), :rabbit_vhost_sup_sup, :get_vhost_sup, [@vhost]) + ) + end, + 30_000 + ) on_exit(fn -> delete_vhost(@vhost) diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl index 7c50a758af1a..7b85a9f0bbb5 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl @@ -762,6 +762,7 @@ do_start_rabbitmq_node(Config, NodeConfig, I) -> true -> ["RABBITMQ_KEEP_PID_FILE_ON_EXIT=yes" | ExtraArgs2]; _ -> ExtraArgs2 end, +<<<<<<< HEAD ExtraArgs4 = case WithPlugins of false -> ExtraArgs3; _ -> ["NOBUILD=1" | ExtraArgs3] @@ -882,6 +883,66 @@ do_start_rabbitmq_node(Config, NodeConfig, I) -> _ = rabbit_ct_helpers:exec([RunCmd | AbortCmd]), {skip, "Failed to initialize RabbitMQ"} end +======= + PeerEnv = if + UseSecondaryDist -> + [{"ERL_LIBS", filename:join(?config(secondary_dist, Config), "plugins")}|PeerEnv3]; + UseSecondaryUmbrella -> + [{"ERL_LIBS", ?config(secondary_erlang_mk_depsdir, Config)}|PeerEnv3]; + true -> + PeerEnv3 + end, + %% Start the peer node. 
+ %% + %% `wait_boot' is set explicitly because the 15-second default is too + %% tight when many nodes are started concurrently by parallel test + %% groups. On timeout, `peer:start/1' calls `erlang:exit(timeout)', + %% which propagates through the linked starter to the test case and + %% aborts the whole CT run. + {ok, PeerPid, Nodename} = peer:start(#{ + name => Nodename1, + longnames => false, + host => HostName1, + connection => standard_io, + exec => os:find_executable("erl"), + args => PeerArgs, + env => PeerEnv, + wait_boot => 60_000}), + %% Redirect the PeerPid's standard output to a file. + %% The standard output of the peer process is a redirect + %% from the peer node. We tie the file's process to PeerPid + %% so that it stays open until PeerPid exits. + %% + %% Both stdout and stderr are redirected to this file, + %% unlike when using make run-broker and friends. + _ = sys:replace_state(PeerPid, fun(St) -> + StdoutPath = filename:join([NodeDir, "log", "startup_log"]), + ok = filelib:ensure_dir(StdoutPath), + {ok, StdoutFd} = file:open(StdoutPath, [write, {encoding, utf8}]), + group_leader(StdoutFd, self()), + St + end), + %% Make sure the node runs in the right directory. + SetCwdPath = case UseSecondaryDist of + true -> ?config(secondary_dist, Config); + false -> SrcDir + end, + ok = peer:call(PeerPid, file, set_cwd, [SetCwdPath]), + %% Boot RabbitMQ. + try peer:call(PeerPid, rabbit, boot, [], 60_000) of + ok -> + store_peer_pid(Nodename, PeerPid), + NodeConfig1 = rabbit_ct_helpers:set_config( + NodeConfig, + [ + {use_secondary_umbrella, UseSecondaryUmbrella orelse UseSecondaryDist} + ]), + query_node(Config, NodeConfig1) + catch Class:Error -> + %% @todo Get rid of {skip,_} returns. + peer:stop(PeerPid), + {skip, {Class, Error}} +>>>>>>> d0204ef89f (Give `peer`-started nodes more than 15s to boot) end. query_node(Config, NodeConfig) ->