Skip to content

Commit

Permalink
Fix consumer recovery with server-named queues
Browse files Browse the repository at this point in the history
Port of #1324 to main

Fixes #1238

* Add failing test
* Fix `RecordedConsumer` to allow the empty string for a queue name
* Add `CurrentQueue` to `IChannel` to keep track of the last declared queue name as defined in the AMQP 091 spec
* Fix `RecordedConsumer` to use `CurrentQueue` when passed in name is `string.Empty`
* Fixup API Approval
  • Loading branch information
lukebakken committed Mar 23, 2023
1 parent 2b06f48 commit b438e8e
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 13 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ build/

BenchmarkDotNet.Artifacts/*

APIApproval.Approve.received.txt
projects/Unit/APIApproval.Approve.received.txt
projects/Unit/APIApproval.Approve.*.received.txt

# Visual Studio 2015 cache/options directory
.vs/
Expand Down
8 changes: 8 additions & 0 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ public interface IChannel : IDisposable
/// </summary>
ulong NextPublishSeqNo { get; }

/// <summary>
/// The name of the last queue declared on this channel.
/// </summary>
/// <remarks>
/// https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.queue-name
/// </remarks>
string CurrentQueue { get; }

/// <summary>
/// Signalled when a Basic.Ack command arrives from the broker.
/// </summary>
Expand Down
67 changes: 60 additions & 7 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,23 +123,76 @@ public event EventHandler<EventArgs> Recovery
remove { InnerChannel.Recovery -= value; }
}

public IEnumerable<string> ConsumerTags => _recordedConsumerTags;
public IEnumerable<string> ConsumerTags
{
get
{
ThrowIfDisposed();
return _recordedConsumerTags;
}
}

public int ChannelNumber => InnerChannel.ChannelNumber;
public int ChannelNumber
{
get
{
ThrowIfDisposed();
return InnerChannel.ChannelNumber;
}
}

public ShutdownEventArgs CloseReason => InnerChannel.CloseReason;
public ShutdownEventArgs CloseReason
{
get
{
ThrowIfDisposed();
return InnerChannel.CloseReason;
}
}

public IBasicConsumer DefaultConsumer
{
get => InnerChannel.DefaultConsumer;
set => InnerChannel.DefaultConsumer = value;
get
{
ThrowIfDisposed();
return InnerChannel.DefaultConsumer;
}

set
{
ThrowIfDisposed();
InnerChannel.DefaultConsumer = value;
}
}

public bool IsClosed => !IsOpen;

public bool IsOpen => _innerChannel != null && _innerChannel.IsOpen;
public bool IsOpen
{
get
{
ThrowIfDisposed();
return _innerChannel != null && _innerChannel.IsOpen;
}
}

public ulong NextPublishSeqNo
{
get
{
ThrowIfDisposed();
return InnerChannel.NextPublishSeqNo;
}
}

public ulong NextPublishSeqNo => InnerChannel.NextPublishSeqNo;
public string CurrentQueue
{
get
{
ThrowIfDisposed();
return InnerChannel.CurrentQueue;
}
}

internal void AutomaticallyRecover(AutorecoveringConnection conn, bool recoverConsumers)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private void UpdateConsumerQueue(string oldName, string newName)
{
if (consumer.Queue == oldName)
{
_recordedConsumers[consumer.ConsumerTag] = RecordedConsumer.WithNewQueueNameTag(newName, consumer);
_recordedConsumers[consumer.ConsumerTag] = RecordedConsumer.WithNewQueueName(newName, consumer);
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ public IBasicConsumer DefaultConsumer

public ulong NextPublishSeqNo { get; private set; }

public string CurrentQueue { get; private set; }

public ISession Session { get; private set; }

protected void TakeOver(ChannelBase other)
Expand Down Expand Up @@ -1169,7 +1171,9 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo
_Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
k.GetReply(ContinuationTimeout);
}
return k.m_result;
QueueDeclareOk result = k.m_result;
CurrentQueue = result.QueueName;
return result;
}


Expand Down
16 changes: 13 additions & 3 deletions projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,21 @@ public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer,
}
_consumer = consumer;

if (string.IsNullOrEmpty(queue))
if (queue is null)
{
throw new ArgumentNullException(nameof(queue));
}
_queue = queue;
else
{
if (queue == string.Empty)
{
_queue = _channel.CurrentQueue;
}
else
{
_queue = queue;
}
}

if (string.IsNullOrEmpty(consumerTag))
{
Expand All @@ -89,7 +99,7 @@ public static RecordedConsumer WithNewConsumerTag(string newTag, in RecordedCons
return new RecordedConsumer(old.Channel, old.Consumer, newTag, old.Queue, old.AutoAck, old.Exclusive, old.Arguments);
}

public static RecordedConsumer WithNewQueueNameTag(string newQueueName, in RecordedConsumer old)
public static RecordedConsumer WithNewQueueName(string newQueueName, in RecordedConsumer old)
{
return new RecordedConsumer(old.Channel, old.Consumer, old.ConsumerTag, newQueueName, old.AutoAck, old.Exclusive, old.Arguments);
}
Expand Down
1 change: 1 addition & 0 deletions projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ namespace RabbitMQ.Client
int ChannelNumber { get; }
RabbitMQ.Client.ShutdownEventArgs CloseReason { get; }
System.TimeSpan ContinuationTimeout { get; set; }
string CurrentQueue { get; }
RabbitMQ.Client.IBasicConsumer DefaultConsumer { get; set; }
bool IsClosed { get; }
bool IsOpen { get; }
Expand Down
33 changes: 33 additions & 0 deletions projects/Unit/TestConnectionRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,39 @@ public void TestConsumerRecoveryOnClientNamedQueueWithOneRecovery()
}
}

[Fact]
public void TestConsumerRecoveryWithServerNamedQueue()
{
// https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1238
using (AutorecoveringConnection c = CreateAutorecoveringConnection())
{
IChannel ch = c.CreateChannel();
QueueDeclareOk queueDeclareResult = ch.QueueDeclare(queue: string.Empty, durable: false, exclusive: true, autoDelete: true, arguments: null);
string qname = queueDeclareResult.QueueName;
Assert.False(string.IsNullOrEmpty(qname));

var cons = new EventingBasicConsumer(ch);
ch.BasicConsume(string.Empty, true, cons);
AssertConsumerCount(ch, qname, 1);

bool queueNameBeforeIsEqual = false;
bool queueNameChangeAfterRecoveryCalled = false;
string qnameAfterRecovery = null;
c.QueueNameChangeAfterRecovery += (source, ea) =>
{
queueNameChangeAfterRecoveryCalled = true;
queueNameBeforeIsEqual = qname.Equals(ea.NameBefore);
qnameAfterRecovery = ea.NameAfter;
};

CloseAndWaitForRecovery(c);

AssertConsumerCount(ch, qnameAfterRecovery, 1);
Assert.True(queueNameChangeAfterRecoveryCalled);
Assert.True(queueNameBeforeIsEqual);
}
}

[Fact]
public void TestConsumerRecoveryWithManyConsumers()
{
Expand Down

0 comments on commit b438e8e

Please sign in to comment.