Skip to content

Commit

Permalink
Implement basic.return support.
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Oct 3, 2024
1 parent a12d192 commit ec993b3
Show file tree
Hide file tree
Showing 11 changed files with 233 additions and 131 deletions.
6 changes: 6 additions & 0 deletions projects/RabbitMQ.Client/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,11 @@ public static class Constants
/// for setting this value for a particular channel.
/// </summary>
public const ushort DefaultConsumerDispatchConcurrency = 1;

/// <summary>
/// The message header used to track publish sequence numbers, to allow correlation when
/// <c>basic.return</c> is sent via the broker.
/// </summary>
public const string PublishSequenceNumberHeader = "x-dotnet-pub-seq-no";
}
}
6 changes: 4 additions & 2 deletions projects/RabbitMQ.Client/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,11 @@ Task<string> BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b
/// <param name="basicProperties">The message properties.</param>
/// <param name="body">The message body.</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
/// <returns>Returns <c>true</c> if publisher confirmations enabled and the message was ack-ed</returns>
/// <remarks>
/// Routing key must be shorter than 255 bytes.
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
ValueTask<bool> BasicPublishAsync<TProperties>(string exchange, string routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
Expand All @@ -219,10 +220,11 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
/// <param name="basicProperties">The message properties.</param>
/// <param name="body">The message body.</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
/// <returns>Returns <c>true</c> if publisher confirmations enabled and the message was ack-ed</returns>
/// <remarks>
/// Routing key must be shorter than 255 bytes.
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
ValueTask<bool> BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
Expand Down
10 changes: 5 additions & 5 deletions projects/RabbitMQ.Client/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static Task<string> BasicConsumeAsync(this IChannel channel,
/// <remarks>
/// The publication occurs with mandatory=false.
/// </remarks>
public static ValueTask BasicPublishAsync<T>(this IChannel channel,
public static ValueTask<bool> BasicPublishAsync<T>(this IChannel channel,
PublicationAddress addr,
T basicProperties,
ReadOnlyMemory<byte> body,
Expand All @@ -94,7 +94,7 @@ public static ValueTask BasicPublishAsync<T>(this IChannel channel,
/// <remarks>
/// The publication occurs with mandatory=false and empty BasicProperties
/// </remarks>
public static ValueTask BasicPublishAsync(this IChannel channel,
public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
string exchange,
string routingKey,
ReadOnlyMemory<byte> body,
Expand All @@ -109,7 +109,7 @@ public static ValueTask BasicPublishAsync(this IChannel channel,
/// <remarks>
/// The publication occurs with mandatory=false and empty BasicProperties
/// </remarks>
public static ValueTask BasicPublishAsync(this IChannel channel,
public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
CachedString exchange,
CachedString routingKey,
ReadOnlyMemory<byte> body,
Expand All @@ -124,7 +124,7 @@ public static ValueTask BasicPublishAsync(this IChannel channel,
/// <remarks>
/// The publication occurs with empty BasicProperties
/// </remarks>
public static ValueTask BasicPublishAsync(this IChannel channel,
public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
string exchange,
string routingKey,
bool mandatory,
Expand All @@ -140,7 +140,7 @@ public static ValueTask BasicPublishAsync(this IChannel channel,
/// <remarks>
/// The publication occurs with empty BasicProperties
/// </remarks>
public static ValueTask BasicPublishAsync(this IChannel channel,
public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
CachedString exchange,
CachedString routingKey,
bool mandatory,
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,15 @@ await _connection.RecordConsumerAsync(rc, recordedEntitiesSemaphoreHeld: false)
public Task<BasicGetResult?> BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken)
=> InnerChannel.BasicGetAsync(queue, autoAck, cancellationToken);

public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
public ValueTask<bool> BasicPublishAsync<TProperties>(string exchange, string routingKey,
bool mandatory,
TProperties basicProperties,
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, cancellationToken);

public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
public ValueTask<bool> BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
bool mandatory,
TProperties basicProperties,
ReadOnlyMemory<byte> body,
Expand Down
Loading

0 comments on commit ec993b3

Please sign in to comment.