Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix two flaky tests #1579

Merged
merged 1 commit into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/api/InternalConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ internal static class InternalConstants
/// This is not configurable, but was discovered while working on this issue:
/// https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/980
/// </summary>
internal const int DefaultRabbitMqMaxClientProvideNameLength = 3652;
internal const int DefaultRabbitMqMaxClientProvideNameLength = 3000;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public async Task<string> BasicConsumeAsync(string queue, bool autoAck, string c
{
string resultConsumerTag = await InnerChannel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal,
exclusive, arguments, consumer, cancellationToken)
.ConfigureAwait(false);
.ConfigureAwait(false) ?? throw new InvalidOperationException("basic.consume returned null consumer tag");
var rc = new RecordedConsumer(channel: this, consumer: consumer, consumerTag: resultConsumerTag,
queue: queue, autoAck: autoAck, exclusive: exclusive, arguments: arguments);
await _connection.RecordConsumerAsync(rc, recordedEntitiesSemaphoreHeld: false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -400,6 +401,11 @@ await _recordedEntitiesSemaphore.WaitAsync()

private void DoDeleteRecordedConsumer(string consumerTag)
{
if (consumerTag is null)
{
throw new ArgumentNullException(nameof(consumerTag));
}

if (_recordedConsumers.Remove(consumerTag, out RecordedConsumer recordedConsumer))
{
DeleteAutoDeleteQueue(recordedConsumer.Queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace RabbitMQ.Client.ConsumerDispatching
internal abstract class ConsumerDispatcherBase
{
private static readonly FallbackConsumer s_fallbackConsumer = new FallbackConsumer();
private readonly IDictionary<string, IBasicConsumer> _consumers = new ConcurrentDictionary<string, IBasicConsumer>();
private readonly ConcurrentDictionary<string, IBasicConsumer> _consumers = new ConcurrentDictionary<string, IBasicConsumer>();

public IBasicConsumer? DefaultConsumer { get; set; }

Expand Down
71 changes: 32 additions & 39 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public abstract class IntegrationFixture : IAsyncLifetime
protected readonly ushort _consumerDispatchConcurrency = 1;
protected readonly bool _openChannel = true;

public static readonly TimeSpan ShortSpan;
public static readonly TimeSpan WaitSpan;
public static readonly TimeSpan LongWaitSpan;
public static readonly TimeSpan RecoveryInterval = TimeSpan.FromSeconds(2);
Expand All @@ -95,12 +96,14 @@ static IntegrationFixture()

if (s_isRunningInCI)
{
ShortSpan = TimeSpan.FromSeconds(20);
WaitSpan = TimeSpan.FromSeconds(60);
LongWaitSpan = TimeSpan.FromSeconds(120);
RequestedConnectionTimeout = TimeSpan.FromSeconds(4);
}
else
{
ShortSpan = TimeSpan.FromSeconds(10);
WaitSpan = TimeSpan.FromSeconds(30);
LongWaitSpan = TimeSpan.FromSeconds(60);
}
Expand Down Expand Up @@ -160,9 +163,8 @@ public virtual async Task InitializeAsync()
if (IsVerbose)
{
AddCallbackShutdownHandlers();
AddCallbackExceptionHandlers();
}

AddCallbackExceptionHandlers();
}

if (_connFactory.AutomaticRecoveryEnabled)
Expand Down Expand Up @@ -221,59 +223,55 @@ protected virtual void DisposeAssertions()

protected void AddCallbackExceptionHandlers()
{
if (_conn != null)
AddCallbackExceptionHandlers(_conn, _channel);
}

protected void AddCallbackExceptionHandlers(IConnection conn, IChannel channel)
{
if (conn != null)
{
_conn.ConnectionRecoveryError += (s, ea) =>
conn.ConnectionRecoveryError += (s, ea) =>
{
_connectionRecoveryException = ea.Exception;

if (IsVerbose)
try
{
_output.WriteLine($"{0} connection recovery exception: {1}",
_testDisplayName, _connectionRecoveryException);
}
catch (InvalidOperationException)
{
try
{
_output.WriteLine($"{0} connection recovery exception: {1}",
_testDisplayName, _connectionRecoveryException);
}
catch (InvalidOperationException)
{
}
}
};

_conn.CallbackException += (o, ea) =>
conn.CallbackException += (o, ea) =>
{
_connectionCallbackException = ea.Exception;

if (IsVerbose)
try
{
_output.WriteLine("{0} connection callback exception: {1}",
_testDisplayName, _connectionCallbackException);
}
catch (InvalidOperationException)
{
try
{
_output.WriteLine("{0} connection callback exception: {1}",
_testDisplayName, _connectionCallbackException);
}
catch (InvalidOperationException)
{
}
}
};
}

if (_channel != null)
if (channel != null)
{
_channel.CallbackException += (o, ea) =>
channel.CallbackException += (o, ea) =>
{
_channelCallbackException = ea.Exception;

if (IsVerbose)
try
{
_output.WriteLine("{0} channel callback exception: {1}",
_testDisplayName, _channelCallbackException);
}
catch (InvalidOperationException)
{
try
{
_output.WriteLine("{0} channel callback exception: {1}",
_testDisplayName, _channelCallbackException);
}
catch (InvalidOperationException)
{
}
}
};
}
Expand Down Expand Up @@ -491,11 +489,6 @@ protected static void AssertPreconditionFailed(ShutdownEventArgs args)
AssertShutdownError(args, Constants.PreconditionFailed);
}

protected static Task AssertRanToCompletion(params Task[] tasks)
{
return DoAssertRanToCompletion(tasks);
}

protected static Task AssertRanToCompletion(IEnumerable<Task> tasks)
{
return DoAssertRanToCompletion(tasks);
Expand Down
27 changes: 15 additions & 12 deletions projects/Test/Common/Util.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,32 @@ public static async Task CloseConnectionAsync(IConnection conn)
connectionToClose = connections.Where(c0 =>
string.Equals((string)c0.ClientProperties["connection_name"], conn.ClientProvidedName,
StringComparison.InvariantCultureIgnoreCase)).FirstOrDefault();

if (connectionToClose == null)
{
tries++;
}
else
{
break;
}
}
catch (ArgumentNullException)
{
// Sometimes we see this in GitHub CI
tries++;
continue;
}
} while (tries <= 30);

if (connectionToClose != null)
{
try
{
await s_managementClient.CloseConnectionAsync(connectionToClose);
return;
}
catch (UnexpectedHttpStatusCodeException)
{
tries++;
}
}
} while (tries <= 10);

if (connectionToClose == null)
{
throw new InvalidOperationException($"Could not delete connection: '{conn.ClientProvidedName}'");
}

await s_managementClient.CloseConnectionAsync(connectionToClose);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public async Task TestExchangeRecoveryTest()
[Fact]
public async Task TestExchangeToExchangeBindingRecovery()
{
await _channel.ConfirmSelectAsync();

string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName;

string ex_source = GenerateExchangeName();
Expand All @@ -70,7 +72,8 @@ public async Task TestExchangeToExchangeBindingRecovery()
{
await CloseAndWaitForRecoveryAsync();
Assert.True(_channel.IsOpen);
await _channel.BasicPublishAsync(ex_source, "", _encoding.GetBytes("msg"));
await _channel.BasicPublishAsync(ex_source, "", _encoding.GetBytes("msg"), mandatory: true);
await _channel.WaitForConfirmsOrDieAsync();
await AssertMessageCountAsync(q, 1);
}
finally
Expand Down
Loading