Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
# Release History

## 5.0.0-beta.7 (Unreleased)

### Features Added
## 5.0.0-beta.7 (2021-07-07)

### Breaking Changes

### Key Bugs Fixed

### Fixed

- Renamed `MaxBatchSize` to `MaxEventBatchSize` in `EventHubsOptions`.

## 5.0.0-beta.6 (2021-06-09)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public EventHubOptions() { }
public System.Uri CustomEndpointAddress { get { throw null; } set { } }
public Microsoft.Azure.WebJobs.EventHubs.InitialOffsetOptions InitialOffsetOptions { get { throw null; } }
public System.TimeSpan LoadBalancingUpdateInterval { get { throw null; } set { } }
public int MaxBatchSize { get { throw null; } set { } }
public int MaxEventBatchSize { get { throw null; } set { } }
public System.TimeSpan PartitionOwnershipExpirationInterval { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public long? PrefetchSizeInBytes { get { throw null; } set { } }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ internal EventProcessorHost GetEventProcessorHost(string eventHubName, string co
eventHubName: eventHubName,
credential: info.TokenCredential,
options: _options.EventProcessorOptions,
eventBatchMaximumCount: _options.MaxBatchSize,
eventBatchMaximumCount: _options.MaxEventBatchSize,
exceptionHandler: _options.ExceptionHandler);
}

return new EventProcessorHost(consumerGroup: consumerGroup,
connectionString: NormalizeConnectionString(info.ConnectionString, eventHubName),
eventHubName: eventHubName,
options: _options.EventProcessorOptions,
eventBatchMaximumCount: _options.MaxBatchSize,
eventBatchMaximumCount: _options.MaxEventBatchSize,
exceptionHandler: _options.ExceptionHandler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class EventHubOptions : IOptionsFormatter
{
public EventHubOptions()
{
MaxBatchSize = 10;
MaxEventBatchSize = 10;
ConnectionOptions = new EventHubConnectionOptions()
{
TransportType = EventHubsTransportType.AmqpTcp
Expand Down Expand Up @@ -112,22 +112,22 @@ public int BatchCheckpointFrequency
}
}

private int _maxBatchSize;
private int _maxEventBatchSize;

/// <summary>
/// Gets or sets the maximum number of events delivered in a batch. Default 10.
/// </summary>
public int MaxBatchSize
public int MaxEventBatchSize
{
get => _maxBatchSize;
get => _maxEventBatchSize;

set
{
if (value < 1)
{
throw new ArgumentException("Batch size must be larger than 0.");
}
_maxBatchSize = value;
_maxEventBatchSize = value;
}
}

Expand Down Expand Up @@ -187,7 +187,7 @@ string IOptionsFormatter.Format()
{
JObject options = new JObject
{
{ nameof(MaxBatchSize), MaxBatchSize },
{ nameof(MaxEventBatchSize), MaxEventBatchSize },
{ nameof(BatchCheckpointFrequency), BatchCheckpointFrequency },
{ nameof(TransportType), TransportType.ToString()},
{ nameof(WebProxy), WebProxy is WebProxy proxy ? proxy.Address.AbsoluteUri : string.Empty },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder, Action<
"EventProcessorOptions:EnableReceiverRuntimeMetric",
options.TrackLastEnqueuedEventProperties);

options.MaxBatchSize = section.GetValue(
options.MaxEventBatchSize = section.GetValue(
"EventProcessorOptions:MaxBatchSize",
options.MaxBatchSize);
options.MaxEventBatchSize);

options.PrefetchCount = section.GetValue(
"EventProcessorOptions:PrefetchCount",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void ConfigureOptions_AppliesValuesCorrectly()
{
EventHubOptions options = CreateOptionsFromConfig();

Assert.AreEqual(123, options.MaxBatchSize);
Assert.AreEqual(123, options.MaxEventBatchSize);
Assert.AreEqual(true, options.EventProcessorOptions.TrackLastEnqueuedEventProperties);
Assert.AreEqual(123, options.EventProcessorOptions.PrefetchCount);
Assert.AreEqual(5, options.BatchCheckpointFrequency);
Expand Down Expand Up @@ -69,7 +69,7 @@ public void ConfigureOptions_Format_Returns_Expected()
b => { b.AddEventHubs(); },
jsonStream: new BinaryData(jObject.ToString()).ToStream());

Assert.AreEqual(123, result.MaxBatchSize);
Assert.AreEqual(123, result.MaxEventBatchSize);
Assert.AreEqual(5, result.BatchCheckpointFrequency);
Assert.True(result.TrackLastEnqueuedEventProperties);
Assert.AreEqual(123, result.PrefetchCount);
Expand All @@ -92,7 +92,7 @@ public void ConfigureOptions_AppliesValuesCorrectly_BackCompat()
{
EventHubOptions options = CreateOptionsFromConfigBackCompat();

Assert.AreEqual(123, options.MaxBatchSize);
Assert.AreEqual(123, options.MaxEventBatchSize);
Assert.AreEqual(true, options.EventProcessorOptions.TrackLastEnqueuedEventProperties);
Assert.AreEqual(123, options.EventProcessorOptions.PrefetchCount);
Assert.AreEqual(5, options.BatchCheckpointFrequency);
Expand All @@ -116,7 +116,7 @@ public void ConfigureOptions_Format_Returns_Expected_BackCompat()
b => { b.AddEventHubs(); },
jsonStream: new BinaryData(jObject.ToString()).ToStream());

Assert.AreEqual(123, result.MaxBatchSize);
Assert.AreEqual(123, result.MaxEventBatchSize);
Assert.AreEqual(5, result.BatchCheckpointFrequency);
Assert.True(result.TrackLastEnqueuedEventProperties);
Assert.AreEqual(123, result.PrefetchCount);
Expand Down Expand Up @@ -219,7 +219,7 @@ private EventHubOptions CreateOptionsFromConfig()
string extensionPath = "AzureWebJobs:Extensions:EventHubs";
var values = new Dictionary<string, string>
{
{ $"{extensionPath}:MaxBatchSize", "123" },
{ $"{extensionPath}:MaxEventBatchSize", "123" },
{ $"{extensionPath}:TrackLastEnqueuedEventProperties", "true" },
{ $"{extensionPath}:PrefetchCount", "123" },
{ $"{extensionPath}:BatchCheckpointFrequency", "5" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void InitializeFromHostMetadata()
var eventProcessorOptions = options.EventProcessorOptions;
Assert.AreEqual(200, eventProcessorOptions.PrefetchCount);
Assert.AreEqual(5, options.BatchCheckpointFrequency);
Assert.AreEqual(100, options.MaxBatchSize);
Assert.AreEqual(100, options.MaxEventBatchSize);
Assert.AreEqual(31, options.EventProcessorOptions.PartitionOwnershipExpirationInterval.TotalSeconds);
Assert.AreEqual(21, options.EventProcessorOptions.LoadBalancingUpdateInterval.TotalSeconds);
Assert.AreEqual(EventPosition.Latest, eventProcessorOptions.DefaultStartingPosition);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
# Release History

## 5.0.0-beta.5 (Unreleased)

### Features Added
## 5.0.0-beta.5 (2021-07-07)

### Breaking Changes

### Key Bugs Fixed

### Fixed

- Renamed `ServiceBusEntityType` property to `EntityType`.
- Renamed `messageActions` and `sessionActions` parameters to `actions` in `MessageProcessor` and `SessionMessageProcessor`.
- Renamed `MaxBatchSize` to `MaxMessageBatchSize` in `ServiceBusOptions`.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pakrym, @jsquire do we want to rename the equivalent in EH to MaxEventBatchSize?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't in the board reviews, so please take my thoughts with a grain of salt... but I think that makes sense.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just based on a comment in the API review from @KrzysztofCwalina

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense.


## 5.0.0-beta.4 (2021-06-22)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ public ServiceBusAccountAttribute(string account) { }
[System.Diagnostics.DebuggerDisplayAttribute("{QueueOrTopicName,nq}")]
public sealed partial class ServiceBusAttribute : System.Attribute, Microsoft.Azure.WebJobs.IConnectionProvider
{
public ServiceBusAttribute(string queueOrTopicName, Microsoft.Azure.WebJobs.ServiceBus.ServiceBusEntityType serviceBusEntityType = Microsoft.Azure.WebJobs.ServiceBus.ServiceBusEntityType.Queue) { }
public ServiceBusAttribute(string queueOrTopicName, Microsoft.Azure.WebJobs.ServiceBus.ServiceBusEntityType entityType = Microsoft.Azure.WebJobs.ServiceBus.ServiceBusEntityType.Queue) { }
public string Connection { get { throw null; } set { } }
public Microsoft.Azure.WebJobs.ServiceBus.ServiceBusEntityType EntityType { get { throw null; } set { } }
public string QueueOrTopicName { get { throw null; } }
public Microsoft.Azure.WebJobs.ServiceBus.ServiceBusEntityType ServiceBusEntityType { get { throw null; } set { } }
}
[Microsoft.Azure.WebJobs.ConnectionProviderAttribute(typeof(Microsoft.Azure.WebJobs.ServiceBusAccountAttribute))]
[Microsoft.Azure.WebJobs.Description.BindingAttribute]
Expand All @@ -40,8 +40,8 @@ public partial class MessageProcessor
{
public MessageProcessor(Azure.Messaging.ServiceBus.ServiceBusProcessor processor) { }
protected internal Azure.Messaging.ServiceBus.ServiceBusProcessor Processor { get { throw null; } }
protected internal virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
protected internal virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
protected internal virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions actions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
protected internal virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions actions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
}
public partial class MessagingProvider
{
Expand Down Expand Up @@ -77,9 +77,9 @@ public ServiceBusOptions() { }
public System.Func<Azure.Messaging.ServiceBus.ProcessErrorEventArgs, System.Threading.Tasks.Task> ExceptionHandler { get { throw null; } set { } }
public Newtonsoft.Json.JsonSerializerSettings JsonSerializerSettings { get { throw null; } set { } }
public System.TimeSpan MaxAutoLockRenewalDuration { get { throw null; } set { } }
public int MaxBatchSize { get { throw null; } set { } }
public int MaxConcurrentCalls { get { throw null; } set { } }
public int MaxConcurrentSessions { get { throw null; } set { } }
public int MaxMessageBatchSize { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public System.TimeSpan? SessionIdleTimeout { get { throw null; } set { } }
public Azure.Messaging.ServiceBus.ServiceBusTransportType TransportType { get { throw null; } set { } }
Expand All @@ -96,8 +96,8 @@ public partial class SessionMessageProcessor
{
public SessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusSessionProcessor processor) { }
protected internal Azure.Messaging.ServiceBus.ServiceBusSessionProcessor Processor { get { throw null; } }
public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusSessionMessageActions sessionActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusSessionMessageActions sessionActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusSessionMessageActions actions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusSessionMessageActions actions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
}
}
namespace Microsoft.Extensions.Hosting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public async Task<IValueProvider> BindAsync(BindingContext context)
var entity = new ServiceBusEntity
{
MessageSender = messageSender,
ServiceBusEntityType = _attribute.ServiceBusEntityType,
ServiceBusEntityType = _attribute.EntityType,
};

return await BindAsync(entity, context.ValueContext).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public StringToServiceBusEntityConverter(ServiceBusAttribute attribute, IBindabl
{
_attribute = attribute;
_defaultPath = defaultPath;
_serviceBusEntityType = _attribute.ServiceBusEntityType;
_serviceBusEntityType = _attribute.EntityType;
_messagingProvider = messagingProvider;
_clientFactory = clientFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public int MaxConcurrentSessions
/// Gets or sets the maximum number of messages that will be passed to each function call. This only applies for functions that receive
/// a batch of messages. The default value is 1000.
/// </summary>
public int MaxBatchSize { get; set; } = 1000;
public int MaxMessageBatchSize { get; set; } = 1000;

/// <summary>
/// Gets or sets the maximum amount of time to wait for a message to be received for the
Expand Down Expand Up @@ -179,7 +179,7 @@ string IOptionsFormatter.Format()
{ nameof(MaxAutoLockRenewalDuration), MaxAutoLockRenewalDuration },
{ nameof(MaxConcurrentCalls), MaxConcurrentCalls },
{ nameof(MaxConcurrentSessions), MaxConcurrentSessions },
{ nameof(MaxBatchSize), MaxBatchSize },
{ nameof(MaxMessageBatchSize), MaxMessageBatchSize },
{ nameof(SessionIdleTimeout), SessionIdleTimeout.ToString() ?? string.Empty }
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ private async Task RunBatchReceiveLoopAsync(CancellationToken cancellationToken)

IReadOnlyList<ServiceBusReceivedMessage> messages =
await receiver.ReceiveMessagesAsync(
_serviceBusOptions.MaxBatchSize,
_serviceBusOptions.MaxMessageBatchSize,
cancellationToken: cancellationToken).AwaitWithCancellation(cancellationToken);

if (messages.Count > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public MessageProcessor(ServiceBusProcessor processor)
/// This method is called when there is a new message to process, before the job function is invoked.
/// This allows any preprocessing to take place on the message before processing begins.
/// </summary>
/// <param name="messageActions">The set of actions that can be performed on a <see cref="ServiceBusReceivedMessage"/>.</param>
/// <param name="actions">The set of actions that can be performed on a <see cref="ServiceBusReceivedMessage"/>.</param>
/// <param name="message">The <see cref="ServiceBusReceivedMessage"/> to process.</param>
/// <param name="cancellationToken">A cancellation token that will be cancelled when the processor is shutting down.</param>
/// <returns>A <see cref="Task"/> that returns true if the message processing should continue, false otherwise.</returns>
protected internal virtual Task<bool> BeginProcessingMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, CancellationToken cancellationToken)
protected internal virtual Task<bool> BeginProcessingMessageAsync(ServiceBusMessageActions actions, ServiceBusReceivedMessage message, CancellationToken cancellationToken)
{
return Task.FromResult<bool>(true);
}
Expand All @@ -49,12 +49,12 @@ protected internal virtual Task<bool> BeginProcessingMessageAsync(ServiceBusMess
/// is configured. E.g. if <see cref="ServiceBusProcessorOptions.AutoCompleteMessages"/> is false, it is up to the job function to complete
/// the message.
/// </remarks>
/// <param name="messageActions">The set of actions that can be performed on a <see cref="ServiceBusReceivedMessage"/>.</param>
/// <param name="actions">The set of actions that can be performed on a <see cref="ServiceBusReceivedMessage"/>.</param>
/// <param name="message">The <see cref="ServiceBusReceivedMessage"/> to process.</param>
/// <param name="result">The <see cref="FunctionResult"/> from the job invocation.</param>
/// <param name="cancellationToken">A cancellation token that will be cancelled when the processor is shutting down.</param>
/// <returns>A <see cref="Task"/> that will complete the message processing.</returns>
protected internal virtual Task CompleteProcessingMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, FunctionResult result, CancellationToken cancellationToken)
protected internal virtual Task CompleteProcessingMessageAsync(ServiceBusMessageActions actions, ServiceBusReceivedMessage message, FunctionResult result, CancellationToken cancellationToken)
{
if (message is null)
{
Expand Down
Loading