Skip to content

Commit

Permalink
Spike an exception based approach (#1698)
Browse files Browse the repository at this point in the history
* Spike an exception based approach (misses removing the bool value return type)

* Extend the use of `_confirmSemaphore` to the duration of when exceptions could be caught.

* * Restore how @danielmarbach serialized the publish sequence number.

* Fix test

* * Fix bug in how headers are added to `BasicProperties` that don't already have them.

* Use `ValueTask` as the `BasicPublishAsync` return value.

---------

Co-authored-by: Daniel Marbach <[email protected]>
Co-authored-by: Luke Bakken <[email protected]>
  • Loading branch information
3 people authored Oct 7, 2024
1 parent 04c2aaf commit bc1204b
Show file tree
Hide file tree
Showing 18 changed files with 204 additions and 255 deletions.
10 changes: 6 additions & 4 deletions projects/RabbitMQ.Client/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,12 @@ Task<string> BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b
/// <param name="basicProperties">The message properties.</param>
/// <param name="body">The message body.</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
/// <returns>Returns <c>true</c> if publisher confirmations enabled and the message was ack-ed</returns>
/// <remarks>
/// Routing key must be shorter than 255 bytes.
/// TODO
/// Throws <see cref="Exception"/> if a nack or basic.return is returned for the message.
/// </remarks>
ValueTask<bool> BasicPublishAsync<TProperties>(string exchange, string routingKey,
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
Expand All @@ -220,11 +221,12 @@ ValueTask<bool> BasicPublishAsync<TProperties>(string exchange, string routingKe
/// <param name="basicProperties">The message properties.</param>
/// <param name="body">The message body.</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
/// <returns>Returns <c>true</c> if publisher confirmations enabled and the message was ack-ed</returns>
/// <remarks>
/// Routing key must be shorter than 255 bytes.
/// TODO
/// Throws <see cref="Exception"/> if a nack or basic.return is returned for the message.
/// </remarks>
ValueTask<bool> BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
Expand Down
10 changes: 5 additions & 5 deletions projects/RabbitMQ.Client/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static Task<string> BasicConsumeAsync(this IChannel channel,
/// <remarks>
/// The publication occurs with mandatory=false.
/// </remarks>
public static ValueTask<bool> BasicPublishAsync<T>(this IChannel channel,
public static ValueTask BasicPublishAsync<T>(this IChannel channel,
PublicationAddress addr,
T basicProperties,
ReadOnlyMemory<byte> body,
Expand All @@ -94,7 +94,7 @@ public static ValueTask<bool> BasicPublishAsync<T>(this IChannel channel,
/// <remarks>
/// The publication occurs with mandatory=false and empty BasicProperties
/// </remarks>
public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
public static ValueTask BasicPublishAsync(this IChannel channel,
string exchange,
string routingKey,
ReadOnlyMemory<byte> body,
Expand All @@ -109,7 +109,7 @@ public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
/// <remarks>
/// The publication occurs with mandatory=false and empty BasicProperties
/// </remarks>
public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
public static ValueTask BasicPublishAsync(this IChannel channel,
CachedString exchange,
CachedString routingKey,
ReadOnlyMemory<byte> body,
Expand All @@ -124,7 +124,7 @@ public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
/// <remarks>
/// The publication occurs with empty BasicProperties
/// </remarks>
public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
public static ValueTask BasicPublishAsync(this IChannel channel,
string exchange,
string routingKey,
bool mandatory,
Expand All @@ -140,7 +140,7 @@ public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
/// <remarks>
/// The publication occurs with empty BasicProperties
/// </remarks>
public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
public static ValueTask BasicPublishAsync(this IChannel channel,
CachedString exchange,
CachedString routingKey,
bool mandatory,
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,15 @@ await _connection.RecordConsumerAsync(rc, recordedEntitiesSemaphoreHeld: false)
public Task<BasicGetResult?> BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken)
=> InnerChannel.BasicGetAsync(queue, autoAck, cancellationToken);

public ValueTask<bool> BasicPublishAsync<TProperties>(string exchange, string routingKey,
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
bool mandatory,
TProperties basicProperties,
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, cancellationToken);

public ValueTask<bool> BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
bool mandatory,
TProperties basicProperties,
ReadOnlyMemory<byte> body,
Expand Down
Loading

0 comments on commit bc1204b

Please sign in to comment.