Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recovery does not appear to save consumer arguments #1293

Closed
lukebakken opened this issue Jan 25, 2023 · 4 comments · Fixed by #1305
Closed

Recovery does not appear to save consumer arguments #1293

lukebakken opened this issue Jan 25, 2023 · 4 comments · Fixed by #1305
Assignees
Milestone

Comments

@lukebakken
Copy link
Contributor

Reported here: https://groups.google.com/g/rabbitmq-users/c/rHfpTUFudeM

Code provided to reproduce:

ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri(connectionString);

var rmqConnection = factory.CreateConnection();
IModel _channel =  rmqConnection.CreateModel();

add arguments 
var arguments = new Dictionary<string, object>();
arguments.Add("x-stream-offset", _streamOffset);

_channel.BasicConsume(streamId, false, "", arguments, messageReceiver);

Note: I have not yet reproduced this.

@lukebakken lukebakken added this to the 6.4.1 milestone Jan 25, 2023
@lukebakken lukebakken self-assigned this Jan 25, 2023
@DmytroMyslyvets
Copy link

DmytroMyslyvets commented Feb 2, 2023

So, about this issue. As I see it, it would be convenient to have an event that is called before the channels are restored. And the ability to update Consumer arguments. This is what I miss so much..
At the moment, I see only one way is manual recovery after a connection loss.

@Zerpet Zerpet self-assigned this Feb 7, 2023
@Zerpet
Copy link
Contributor

Zerpet commented Feb 13, 2023

I've managed to reproduce the behaviour described. The library records the consumer arguments when AutorecoveringModel.BasicConsume() is called:

public string BasicConsume(
string queue,
bool autoAck,
string consumerTag,
bool noLocal,
bool exclusive,
IDictionary<string, object> arguments,
IBasicConsumer consumer)
{
string result = InnerChannel.BasicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer);
_connection.RecordConsumer(new RecordedConsumer(this, consumer, queue, autoAck, result, exclusive, arguments));
return result;
}

At that time, we keep a copy of the consumer arguments (among other things):

internal void RecordConsumer(in RecordedConsumer consumer)
{
if (!_config.TopologyRecoveryEnabled)
{
return;
}
lock (_recordedEntitiesLock)
{
_recordedConsumers[consumer.ConsumerTag] = consumer;
}
}

If the consumer arguments are modified afterwards, the library "misses" those updates.

This makes using Streams over AMQP a bit painful to use, because consumers are expected to track the offset themselves. However, during topology recovery, consumers don't have a mechanism to update the consumer arguments i.e. x-stream-offset value.

Whilst this is not technically a bug, it is a undesirable situation, because users are forced to opt-out from topology recovery in order to use Streams over AMQP effectively.

@Zerpet
Copy link
Contributor

Zerpet commented Feb 15, 2023

I've been looking at potential options to address this. I'm not convinced that firing an event before recovering a consumer will fix this problem. We could fire an event and pass the consumer arguments (or a reference to the arguments). The first counter-argument that comes to mind is that, we will have to fire an event for each consumer about to recover, wait for the event handler to finish, then proceed with the recovery. This could potentially delay the recovery, if the event handler, for some reason, takes a long time to finish. In addition, what should we do if the event handler throws an exception, or encounters an error? Should we abort the recovery? Should we just move on?

The alternative of disabling topology recovery is not catastrophic, but inconvenient. You can register an event handler for IAutorecoveringConnection.RecoverySucceeded and re-register your consumers (assuming your consumers keep track of their offsets, and assuming your class derives from a consumer class). Something like this, with better error checking and re-try logic.

    public void Subscribe(string queueName)
    {
        _consumerChannel = _connection.CreateModel();
        _consumerChannel.BasicQos(0, 100, false);
        var arguments = new Dictionary<string, object> { { "x-stream-offset", StreamOffset } };
        _consumerTag = _consumerChannel.BasicConsume(queueName, false, "", arguments, this);
    }

// -----

        var myConsumers = new List<MyConsumer>(1);
        myConsumers.Add(new MyConsumer(0L, rmqConnection));
        myConsumers[0].Subscribe(StreamId);

        rmqConnection.RecoverySucceeded += (sender, eventArgs) =>
        {
            foreach (var c in myConsumers)
            {
                c.Subscribe(StreamId);
            }
        };

@michaelklishin
Copy link
Member

@Zerpet or connections or channels could "reload" their argument list from somewhere right before they start topology recovery. Not sure where but I assume all user API actions are on the Connection or IModel instances, so this sounds plausible to me.

@lukebakken lukebakken modified the milestones: 6.4.1, 6.5.0 Feb 21, 2023
Zerpet added a commit that referenced this issue Feb 22, 2023
Related to #1293. In the context of Streams over AMQP,
the consumer must keep track of the consumer offset.
When a consumer subscribes to a stream-type queue, it
may provide a consumer argument to specify a "point"
in the stream to attach to.

This library records consumers, and their arguments,
for topology recovery purposes. When a consumer is
declared, it starts reading at an arbitrary point e.g.
offset=123. After receiving messages, the offset "moves"
forward. In the event of a connection recovery due to
e.g. network error, the consumer is re-declared with
the offset recorded when it was first declared i.e.
offset=123. This is not correct, because the consumer
has received some messages, and the offset value when
it was first declared is not accurate anymore.

This commit adds an event that fire when a consumer is
about to be recovered, but has not started recovering yet.
This event exposes the consumer arguments as a reference
type, allowing an event handler to update the consumer
offset, or any other consumer argument.

Signed-off-by: Aitor Perez Cedres <[email protected]>
Zerpet added a commit that referenced this issue Feb 23, 2023
Related to #1293. In the context of Streams over AMQP,
the consumer must keep track of the consumer offset.
When a consumer subscribes to a stream-type queue, it
may provide a consumer argument to specify a "point"
in the stream to attach to.

This library records consumers, and their arguments,
for topology recovery purposes. When a consumer is
declared, it starts reading at an arbitrary point e.g.
offset=123. After receiving messages, the offset "moves"
forward. In the event of a connection recovery due to
e.g. network error, the consumer is re-declared with
the offset recorded when it was first declared i.e.
offset=123. This is not correct, because the consumer
has received some messages, and the offset value when
it was first declared is not accurate anymore.

This commit adds an event that fire when a consumer is
about to be recovered, but has not started recovering yet.
This event exposes the consumer arguments as a reference
type, allowing an event handler to update the consumer
offset, or any other consumer argument.

Signed-off-by: Aitor Perez Cedres <[email protected]>
Zerpet added a commit that referenced this issue Feb 23, 2023
Related to #1293. In the context of Streams over AMQP,
the consumer must keep track of the consumer offset.
When a consumer subscribes to a stream-type queue, it
may provide a consumer argument to specify a "point"
in the stream to attach to.

This library records consumers, and their arguments,
for topology recovery purposes. When a consumer is
declared, it starts reading at an arbitrary point e.g.
offset=123. After receiving messages, the offset "moves"
forward. In the event of a connection recovery due to
e.g. network error, the consumer is re-declared with
the offset recorded when it was first declared i.e.
offset=123. This is not correct, because the consumer
has received some messages, and the offset value when
it was first declared is not accurate anymore.

This commit adds an event that fire when a consumer is
about to be recovered, but has not started recovering yet.
This event exposes the consumer arguments as a reference
type, allowing an event handler to update the consumer
offset, or any other consumer argument.

Signed-off-by: Aitor Perez Cedres <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants