Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
namespace Azure.Messaging.ServiceBus
{
/// <summary>
/// Contains information about a receiver that has attempted to receive a message from the Azure Service Bus entity.
/// The <see cref="ProcessMessageEventArgs"/> contain event args that are specific
/// to the <see cref="ServiceBusReceivedMessage"/> that is being processed.
/// </summary>
public class ProcessMessageEventArgs : EventArgs
{
Expand All @@ -24,6 +25,12 @@ public class ProcessMessageEventArgs : EventArgs
/// </summary>
public CancellationToken CancellationToken { get; }

/// <summary>
/// Indicates whether the user has settled the message as part of their callback.
/// If they have done so, we will not autocomplete.
/// </summary>
internal bool IsMessageSettled { get; set; }

private readonly ServiceBusReceiver _receiver;

/// <summary>
Expand All @@ -50,9 +57,12 @@ internal ProcessMessageEventArgs(ServiceBusReceivedMessage message, ServiceBusRe
public async Task AbandonAsync(
ServiceBusReceivedMessage message,
IDictionary<string, object> propertiesToModify = default,
CancellationToken cancellationToken = default) =>
CancellationToken cancellationToken = default)
{
await _receiver.AbandonAsync(message, propertiesToModify, cancellationToken)
.ConfigureAwait(false);
IsMessageSettled = true;
}

/// <summary>
///
Expand All @@ -62,11 +72,14 @@ await _receiver.AbandonAsync(message, propertiesToModify, cancellationToken)
/// <returns></returns>
public async Task CompleteAsync(
ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default) =>
CancellationToken cancellationToken = default)
{
await _receiver.CompleteAsync(
message,
cancellationToken)
.ConfigureAwait(false);
IsMessageSettled = true;
}

/// <summary>
///
Expand All @@ -80,13 +93,16 @@ public async Task DeadLetterAsync(
ServiceBusReceivedMessage message,
string deadLetterReason,
string deadLetterErrorDescription = default,
CancellationToken cancellationToken = default) =>
CancellationToken cancellationToken = default)
{
await _receiver.DeadLetterAsync(
message,
deadLetterReason,
deadLetterErrorDescription,
cancellationToken)
.ConfigureAwait(false);
IsMessageSettled = true;
}

/// <summary>
///
Expand All @@ -98,12 +114,15 @@ await _receiver.DeadLetterAsync(
public async Task DeadLetterAsync(
ServiceBusReceivedMessage message,
IDictionary<string, object> propertiesToModify = default,
CancellationToken cancellationToken = default) =>
CancellationToken cancellationToken = default)
{
await _receiver.DeadLetterAsync(
message,
propertiesToModify,
cancellationToken)
.ConfigureAwait(false);
IsMessageSettled = true;
}

/// <summary>
///
Expand All @@ -115,25 +134,14 @@ await _receiver.DeadLetterAsync(
public async Task DeferAsync(
ServiceBusReceivedMessage message,
IDictionary<string, object> propertiesToModify = default,
CancellationToken cancellationToken = default) =>
CancellationToken cancellationToken = default)
{
await _receiver.DeferAsync(
message,
propertiesToModify,
cancellationToken)
.ConfigureAwait(false);

/// <summary>
///
/// </summary>
/// <param name="message"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task RenewMessageLockAsync(
ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default) =>
await _receiver.RenewMessageLockAsync(
message,
cancellationToken)
.ConfigureAwait(false);
IsMessageSettled = true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Messaging.ServiceBus
{
/// <summary>
/// The <see cref="ProcessSessionEventArgs"/> contain event args related to the session being processed.
/// </summary>
public class ProcessSessionEventArgs : EventArgs
{
/// <summary>
/// The processor's <see cref="System.Threading.CancellationToken"/> instance which will be
/// cancelled in the event that <see cref="ServiceBusProcessor.StopProcessingAsync"/> is called.
/// </summary>
public CancellationToken CancellationToken { get; }

/// <summary>
/// The <see cref="ServiceBusSessionReceiver"/> that will be used for setting and getting session
/// state.
/// </summary>
private readonly ServiceBusSessionReceiver _sessionReceiver;

/// <summary>
/// The Session Id associated with the session being processed.
/// </summary>
public string SessionId => _sessionReceiver.SessionId;

/// <summary>
/// Gets the <see cref="DateTimeOffset"/> that the current session is locked until.
/// </summary>
public DateTimeOffset SessionLockedUntil => _sessionReceiver.SessionLockedUntil;

/// <summary>
/// Initializes a new instance of the <see cref="ProcessSessionEventArgs"/> class.
/// </summary>
///
/// <param name="receiver">The <see cref="ServiceBusSessionReceiver"/> that will be used for all settlement methods
/// for the args.</param>
/// <param name="cancellationToken">The processor's <see cref="System.Threading.CancellationToken"/> instance which will be cancelled in the event that <see cref="ServiceBusProcessor.StopProcessingAsync"/> is called.</param>
internal ProcessSessionEventArgs(
ServiceBusSessionReceiver receiver,
CancellationToken cancellationToken)
{
_sessionReceiver = receiver;
CancellationToken = cancellationToken;
}

/// <summary>
/// Gets the session state.
/// </summary>
///
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>The session state as byte array.</returns>
public virtual async Task<byte[]> GetSessionStateAsync(
CancellationToken cancellationToken = default) =>
await _sessionReceiver.GetSessionStateAsync(cancellationToken).ConfigureAwait(false);

/// <summary>
/// Set a custom state on the session which can be later retrieved using <see cref="GetSessionStateAsync"/>
/// </summary>
///
/// <param name="sessionState">A byte array of session state</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>This state is stored on Service Bus forever unless you set an empty state on it.</remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public virtual async Task SetSessionStateAsync(
byte[] sessionState,
CancellationToken cancellationToken = default) =>
await _sessionReceiver.SetSessionStateAsync(sessionState, cancellationToken).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Messaging.ServiceBus
{
/// <summary>
/// The <see cref="ProcessSessionMessageEventArgs"/> contain event args that are specific
/// to the <see cref="ServiceBusReceivedMessage"/> that is being processed.
/// to the <see cref="ServiceBusReceivedMessage"/> and session that is being processed.
/// </summary>
public class ProcessSessionMessageEventArgs : EventArgs
{
Expand Down Expand Up @@ -43,7 +42,13 @@ public class ProcessSessionMessageEventArgs : EventArgs
public DateTimeOffset SessionLockedUntil => _sessionReceiver.SessionLockedUntil;

/// <summary>
/// Initializes a new instance of the <see cref="ProcessMessageEventArgs"/> class.
/// Indicates whether the user has settled the message as part of their callback.
/// If they have done so, we will not autocomplete.
/// </summary>
internal bool IsMessageSettled { get; set; }

/// <summary>
/// Initializes a new instance of the <see cref="ProcessSessionMessageEventArgs"/> class.
/// </summary>
///
/// <param name="message">The current <see cref="ServiceBusReceivedMessage"/>.</param>
Expand Down Expand Up @@ -86,25 +91,6 @@ public virtual async Task SetSessionStateAsync(
CancellationToken cancellationToken = default) =>
await _sessionReceiver.SetSessionStateAsync(sessionState, cancellationToken).ConfigureAwait(false);

/// <summary>
/// Renews the lock on the session specified by the <see cref="SessionId"/>. The lock will be renewed based on the setting specified on the entity.
/// </summary>
///
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>
/// <para>
/// When you get session receiver, the session is locked for this receiver by the service for a duration as specified during the Queue/Subscription creation.
/// If processing of the session requires longer than this duration, the session-lock needs to be renewed.
/// For each renewal, it resets the time the session is locked by the LockDuration set on the Entity.
/// </para>
/// <para>
/// Renewal of session renews all the messages in the session as well. Each individual message need not be renewed.
/// </para>
/// </remarks>
public virtual async Task RenewSessionLockAsync(CancellationToken cancellationToken = default) =>
await _sessionReceiver.RenewSessionLockAsync(cancellationToken).ConfigureAwait(false);

/// <summary>
/// Abandons a <see cref="ServiceBusReceivedMessage"/>. This will make the message available again for immediate processing as the lock on the message held by the processor will be released.
/// </summary>
Expand All @@ -123,9 +109,12 @@ public virtual async Task RenewSessionLockAsync(CancellationToken cancellationTo
public async Task AbandonAsync(
ServiceBusReceivedMessage message,
IDictionary<string, object> propertiesToModify = default,
CancellationToken cancellationToken = default) =>
CancellationToken cancellationToken = default)
{
await _sessionReceiver.AbandonAsync(message, propertiesToModify, cancellationToken)
.ConfigureAwait(false);
.ConfigureAwait(false);
IsMessageSettled = true;
}

/// <summary>
/// Completes a <see cref="ServiceBusReceivedMessage"/>. This will delete the message from the service.
Expand All @@ -141,11 +130,14 @@ await _sessionReceiver.AbandonAsync(message, propertiesToModify, cancellationTok
/// <returns>A task to be resolved on when the operation has completed.</returns>
public async Task CompleteAsync(
ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default) =>
CancellationToken cancellationToken = default)
{
await _sessionReceiver.CompleteAsync(
message,
cancellationToken)
.ConfigureAwait(false);
IsMessageSettled = true;
}

/// <summary>
/// Moves a message to the deadletter sub-queue.
Expand All @@ -167,13 +159,16 @@ public async Task DeadLetterAsync(
ServiceBusReceivedMessage message,
string deadLetterReason,
string deadLetterErrorDescription = default,
CancellationToken cancellationToken = default) =>
CancellationToken cancellationToken = default)
{
await _sessionReceiver.DeadLetterAsync(
message,
deadLetterReason,
deadLetterErrorDescription,
cancellationToken)
.ConfigureAwait(false);
IsMessageSettled = true;
}

/// <summary>
/// Moves a message to the deadletter sub-queue.
Expand All @@ -193,12 +188,15 @@ await _sessionReceiver.DeadLetterAsync(
public async Task DeadLetterAsync(
ServiceBusReceivedMessage message,
IDictionary<string, object> propertiesToModify = default,
CancellationToken cancellationToken = default) =>
CancellationToken cancellationToken = default)
{
await _sessionReceiver.DeadLetterAsync(
message,
propertiesToModify,
cancellationToken)
.ConfigureAwait(false);
IsMessageSettled = true;
}

/// <summary> Defers the processing for a message.</summary>
///
Expand All @@ -219,11 +217,14 @@ await _sessionReceiver.DeadLetterAsync(
public async Task DeferAsync(
ServiceBusReceivedMessage message,
IDictionary<string, object> propertiesToModify = default,
CancellationToken cancellationToken = default) =>
CancellationToken cancellationToken = default)
{
await _sessionReceiver.DeferAsync(
message,
propertiesToModify,
cancellationToken)
.ConfigureAwait(false);
IsMessageSettled = true;
}
}
}
Loading