Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -441,20 +441,35 @@ public virtual async Task<PartitionProperties> GetPartitionPropertiesAsync(strin

/// <summary>
/// A set of information about the state of publishing for a partition, as observed by the <see cref="EventHubProducerClient" />. This
/// data can always be read, but will only be populated with information relevant to the features which are active for the producer client.
/// data can always be read, but will only be populated with information relevant to the active features for the producer client.
/// </summary>
///
/// <param name="partitionId">The unique identifier of a partition associated with the Event Hub.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request to cancel the operation.</param>
///
/// <returns>The set of information about the publishing state of the requested partition, within the context of this producer.</returns>
///
/// <remarks>
/// The state of a partition is only understood by the <see cref="EventHubProducerClient" /> after events have been published to that
/// partition; calling this method for a partition before events have been published to it will return an empty set of properties.
/// </remarks>
///
public virtual async Task<PartitionPublishingProperties> GetPartitionPublishingPropertiesAsync(string partitionId,
CancellationToken cancellationToken = default)
{
Argument.AssertNotClosed(IsClosed, nameof(EventHubProducerClient));
Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId));

// If the producer does not require stateful partitions, return an empty
// instance.

if (!RequiresStatefulPartitions(Options))
{
return PartitionPublishingProperties.Empty;
}

// If the state has not yet been initialized, then do so now.

var partitionState = PartitionState.GetOrAdd(partitionId, new PartitionPublishingState(partitionId));
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ public class PartitionPublishingOptions
/// The starting number that should be used for the automatic sequencing of events for the associated partition, when published by this producer.
/// </summary>
///
/// <value>The starting sequence number to associate with the partition; if <c>null</c>, the Event Hubs service will control the value.</value>
/// <value>
/// <para>The starting sequence number to associate with the partition; if <c>null</c>, the Event Hubs service will control the value.</para>
///
/// <para>The sequence number will be in the range of <c>0</c> - <see cref="int.MaxValue"/> (inclusive) and will increase as events are published.
/// When more than <see cref="int.MaxValue" /> events have been published, the sequence number will roll over to <c>0</c>.</para>
/// </value>
///
/// <remarks>
/// The starting sequence number is only recognized and relevant when certain features of the producer are enabled. For example, it is used by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,27 @@ namespace Azure.Messaging.EventHubs.Producer
///
public class PartitionPublishingProperties
{
/// <summary>An empty set of properties.</summary>
private static PartitionPublishingProperties s_emptyInstance;

/// <summary>
/// Returns a set of properties that represents an empty set of properties
/// suitable for use when partitions are not inherently stateful.
/// </summary>
///
internal static PartitionPublishingProperties Empty
{
get
{
// The race condition here is benign; because the resulting properties
// are not mutable, there is not impact to having the reference updated after
// initial creation.

s_emptyInstance ??= new PartitionPublishingProperties(false, null, null, null);
return s_emptyInstance;
}
}

/// <summary>
/// Indicates whether or not idempotent publishing is enabled for the producer and, by extension, the associated partition.
/// </summary>
Expand All @@ -36,6 +57,12 @@ public class PartitionPublishingProperties
/// successfully.
/// </summary>
///
/// <value>
/// The sequence number will be in the range of <c>0</c> - <see cref="int.MaxValue"/> (inclusive) and will
/// increase as events are published. When more than <see cref="int.MaxValue" /> events have been published,
/// the sequence number will roll over to <c>0</c>.
/// </value>
///
public int? LastPublishedSequenceNumber { get; }

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ internal class PartitionPublishingState
/// successfully.
/// </summary>
///
/// <value>
/// The sequence number will be in the range of <c>0</c> - <see cref="int.MaxValue"/> (inclusive) and will
/// increase as events are published. When more than <see cref="int.MaxValue" /> events have been published,
/// the sequence number will roll over to <c>0</c>.
/// </value>
///
public int? LastPublishedSequenceNumber { get; set; }

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ public async Task ReadPartitionPublishingPropertiesAsyncInitializesPartitionStat
/// </summary>
///
[Test]
public async Task ReadPartitionPublishingPropertiesAsyncReturnsPartitionState()
public async Task ReadPartitionPublishingPropertiesAsyncReturnsPartitionStateWhenIdempotentPublishingEnabled()
{
var expectedPartition = "5";
var expectedProperties = new PartitionPublishingProperties(true, 123, 456, 798);
Expand Down Expand Up @@ -479,6 +479,41 @@ public async Task ReadPartitionPublishingPropertiesAsyncReturnsPartitionState()
"Partition state should not have been initialized twice.");
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubProducerClient.GetPartitionPublishingPropertiesAsync" />
/// method.
/// </summary>
///
[Test]
public async Task ReadPartitionPublishingPropertiesAsyncReturnsEmptyPartitionStateWhenIdempotentPublishingDisabled()
{
var expectedPartition = "5";
var expectedProperties = PartitionPublishingProperties.Empty;
var mockTransport = new Mock<TransportProducer>();
var connection = new MockConnection(() => mockTransport.Object);

var producer = new EventHubProducerClient(connection, new EventHubProducerClientOptions
{
EnableIdempotentPartitions = false
});

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

var readProperties = await producer.GetPartitionPublishingPropertiesAsync(expectedPartition, cancellationSource.Token);

Assert.That(readProperties, Is.Not.Null, "The read properties should have been created.");
Assert.That(readProperties.ProducerGroupId, Is.EqualTo(expectedProperties.ProducerGroupId), "The producer group should match.");
Assert.That(readProperties.OwnerLevel, Is.EqualTo(expectedProperties.OwnerLevel), "The owner level should match.");
Assert.That(readProperties.LastPublishedSequenceNumber, Is.EqualTo(expectedProperties.LastPublishedSequenceNumber), "The sequence number should match.");

mockTransport
.Verify(transportProducer => transportProducer.ReadInitializationPublishingPropertiesAsync(
It.IsAny<CancellationToken>()),
Times.Never,
"Partition state should not have been initialized.");
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubProducerClient.SendAsync" />
/// method.
Expand Down Expand Up @@ -724,6 +759,8 @@ public void SendIdempotentRequiresThePartition()
EnableIdempotentPartitions = true
});

Assert.That(async () => await producer.SendAsync(events), Throws.InstanceOf<InvalidOperationException>(), "Idempotent publishing requires the send options.");

var sendOptions = new SendEventOptions();
Assert.That(async () => await producer.SendAsync(events, sendOptions), Throws.InstanceOf<InvalidOperationException>(), "Automatic routing cannot be used with idempotent publishing.");

Expand Down