diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs index 2af642f46a81..417f967c3aaa 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs @@ -9,6 +9,7 @@ using Microsoft.Azure.WebJobs.Description; using Microsoft.Azure.WebJobs.EventHubs.Listeners; using Microsoft.Azure.WebJobs.EventHubs.Processor; +using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Bindings; using Microsoft.Azure.WebJobs.Host.Config; using Microsoft.Azure.WebJobs.Host.Configuration; @@ -28,19 +29,22 @@ internal class EventHubExtensionConfigProvider : IExtensionConfigProvider private readonly IConverterManager _converterManager; private readonly IWebJobsExtensionConfiguration _configuration; private readonly EventHubClientFactory _clientFactory; + private readonly IDrainModeManager _drainModeManager; public EventHubExtensionConfigProvider( IOptions options, ILoggerFactory loggerFactory, IConverterManager converterManager, IWebJobsExtensionConfiguration configuration, - EventHubClientFactory clientFactory) + EventHubClientFactory clientFactory, + IDrainModeManager drainModeManager) { _options = options; _loggerFactory = loggerFactory; _converterManager = converterManager; _configuration = configuration; _clientFactory = clientFactory; + _drainModeManager = drainModeManager; } internal Action ExceptionHandler { get; set; } @@ -71,7 +75,12 @@ public void Initialize(ExtensionConfigContext context) .AddOpenConverter(ConvertPocoToEventData); // register our trigger binding provider - var triggerBindingProvider = new EventHubTriggerAttributeBindingProvider(_converterManager, _options, _loggerFactory, _clientFactory); + var triggerBindingProvider = new EventHubTriggerAttributeBindingProvider( + _converterManager, + _options, + _loggerFactory, + _clientFactory, + _drainModeManager); context.AddBindingRule() .BindToTrigger(triggerBindingProvider); diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs index 9af963451722..89fb922c578f 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs @@ -28,8 +28,6 @@ internal sealed partial class EventHubListener /// internal class PartitionProcessor : IEventProcessor, IDisposable { - private readonly CancellationTokenSource _cts = new(); - private readonly ITriggeredFunctionExecutor _executor; private readonly bool _singleDispatch; private readonly ILogger _logger; @@ -44,13 +42,15 @@ internal class PartitionProcessor : IEventProcessor, IDisposable private Task _cachedEventsBackgroundTask; private CancellationTokenSource _cachedEventsBackgroundTaskCts; private SemaphoreSlim _cachedEventsGuard; + private readonly CancellationToken _functionExecutionToken; + private readonly CancellationTokenSource _ownershipLostTokenSource; /// /// When we have a minimum batch size greater than 1, this class manages caching events. /// internal PartitionProcessorEventsManager CachedEventsManager { get; } - public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch) + public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, CancellationToken functionExecutionToken) { _executor = executor; _singleDispatch = singleDispatch; @@ -59,6 +59,8 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex _firstFunctionInvocation = true; _maxWaitTime = options.MaxWaitTime; _minimumBatchesEnabled = options.MinEventBatchSize > 1; // 1 is the default + _functionExecutionToken = functionExecutionToken; + _ownershipLostTokenSource = new CancellationTokenSource(); // Events are only cached when building a batch of minimum size. if (_minimumBatchesEnabled) @@ -70,8 +72,12 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex public Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason) { - // signal cancellation for any in progress executions and clear the cached events - _cts.Cancel(); + if (reason == ProcessingStoppedReason.OwnershipLost) + { + _ownershipLostTokenSource.Cancel(); + } + + // clear the cached events CachedEventsManager?.ClearEventCache(); _logger.LogDebug(GetOperationDetails(context, $"CloseAsync, {reason}")); @@ -98,11 +104,10 @@ public Task ProcessErrorAsync(EventProcessorHostPartition context, Exception err /// /// The partition information for this partition. /// The events to process. - /// The cancellation token to respect if processing for the partition is canceled. /// - public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable messages, CancellationToken partitionProcessingCancellationToken) + public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable messages) { - using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, partitionProcessingCancellationToken); + using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_functionExecutionToken, _ownershipLostTokenSource.Token); _mostRecentPartitionContext = context; var events = messages.ToArray(); EventData eventToCheckpoint = null; @@ -135,7 +140,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume TriggerDetails = eventHubTriggerInput.GetTriggerDetails(context) }; - await _executor.TryExecuteAsync(input, linkedCts.Token).ConfigureAwait(false); + await _executor.TryExecuteAsync(input, _functionExecutionToken).ConfigureAwait(false); _firstFunctionInvocation = false; eventToCheckpoint = events[i]; } @@ -168,7 +173,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume _logger.LogDebug($"Partition Processor received events and is attempting to invoke function ({details})"); UpdateCheckpointContext(triggerEvents, context); - await TriggerExecute(triggerEvents, context, linkedCts.Token).ConfigureAwait(false); + await TriggerExecute(triggerEvents, context, _functionExecutionToken).ConfigureAwait(false); eventToCheckpoint = triggerEvents.Last(); // If there is a background timer task, cancel it and dispose of the cancellation token. If there @@ -186,7 +191,8 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume if (_cachedEventsBackgroundTaskCts == null && CachedEventsManager.HasCachedEvents) { // If there are events waiting to be processed, and no background task running, start a monitoring cycle. - _cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token); + // Don't reference linkedCts in the class level background task, as it will be disposed when the method goes out of scope. + _cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_functionExecutionToken, _ownershipLostTokenSource.Token); _cachedEventsBackgroundTask = MonitorCachedEvents(context.ProcessorHost.GetLastReadCheckpoint(context.PartitionId)?.LastModified, _cachedEventsBackgroundTaskCts); } } @@ -201,7 +207,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume else { UpdateCheckpointContext(events, context); - await TriggerExecute(events, context, linkedCts.Token).ConfigureAwait(false); + await TriggerExecute(events, context, _functionExecutionToken).ConfigureAwait(false); eventToCheckpoint = events.LastOrDefault(); } @@ -276,7 +282,7 @@ private async Task MonitorCachedEvents(DateTimeOffset? lastCheckpointTime, Cance var details = GetOperationDetails(_mostRecentPartitionContext, "MaxWaitTimeElapsed"); _logger.LogDebug($"Partition Processor has waited MaxWaitTime since last invocation and is attempting to invoke function on all held events ({details})"); - await TriggerExecute(triggerEvents, _mostRecentPartitionContext, backgroundCancellationTokenSource.Token).ConfigureAwait(false); + await TriggerExecute(triggerEvents, _mostRecentPartitionContext, _functionExecutionToken).ConfigureAwait(false); if (!backgroundCancellationTokenSource.Token.IsCancellationRequested) { await CheckpointAsync(triggerEvents.Last(), _mostRecentPartitionContext).ConfigureAwait(false); @@ -408,7 +414,6 @@ protected virtual void Dispose(bool disposing) { if (disposing) { - _cts.Dispose(); _cachedEventsBackgroundTaskCts?.Dispose(); _cachedEventsGuard?.Dispose(); } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs index b53176779c84..817be67056b0 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs @@ -7,6 +7,7 @@ using Azure.Messaging.EventHubs.Primitives; using Microsoft.Azure.WebJobs.EventHubs.Processor; using Microsoft.Azure.WebJobs.Extensions.EventHubs.Listeners; +using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Scale; @@ -27,6 +28,9 @@ internal sealed partial class EventHubListener : IListener, IEventProcessorFacto private readonly ILoggerFactory _loggerFactory; private readonly ILogger _logger; private string _details; + private CancellationTokenSource _functionExecutionCancellationTokenSource; + private readonly IDrainModeManager _drainModeManager; + private volatile bool _disposed; public EventHubListener( string functionId, @@ -36,7 +40,8 @@ public EventHubListener( IEventHubConsumerClient consumerClient, BlobCheckpointStoreInternal checkpointStore, EventHubOptions options, - ILoggerFactory loggerFactory) + ILoggerFactory loggerFactory, + IDrainModeManager drainModeManager) { _loggerFactory = loggerFactory; _executor = executor; @@ -45,6 +50,8 @@ public EventHubListener( _checkpointStore = checkpointStore; _options = options; _logger = _loggerFactory.CreateLogger(); + _functionExecutionCancellationTokenSource = new CancellationTokenSource(); + _drainModeManager = drainModeManager; EventHubMetricsProvider metricsProvider = new EventHubMetricsProvider(functionId, consumerClient, checkpointStore, _loggerFactory.CreateLogger()); @@ -68,20 +75,29 @@ public EventHubListener( } /// - /// Cancel any in progress listen operation. + /// Cancel should be called prior to Dispose. We just validate that we are not already disposed. + /// This is consistent with the Service Bus listener behavior. /// void IListener.Cancel() { -#pragma warning disable AZC0102 - StopAsync(CancellationToken.None).GetAwaiter().GetResult(); -#pragma warning restore AZC0102 + if (_disposed) + { + throw new ObjectDisposedException(nameof(IListener)); + } } void IDisposable.Dispose() { + _functionExecutionCancellationTokenSource.Cancel(); + #pragma warning disable AZC0102 StopAsync(CancellationToken.None).GetAwaiter().GetResult(); #pragma warning restore AZC0102 + + // No need to dispose the _disposingCancellationTokenSource since we don't create it as a linked token and + // it won't use a timer, so the Dispose method is essentially a no-op. The downside to disposing it is that + // any customers who are trying to use it to cancel their own operations would get an ObjectDisposedException. + _disposed = true; } public async Task StartAsync(CancellationToken cancellationToken) @@ -94,6 +110,11 @@ public async Task StartAsync(CancellationToken cancellationToken) public async Task StopAsync(CancellationToken cancellationToken) { + if (!_drainModeManager.IsDrainModeEnabled) + { + _functionExecutionCancellationTokenSource.Cancel(); + } + await _eventProcessorHost.StopProcessingAsync(cancellationToken).ConfigureAwait(false); _logger.LogDebug($"EventHub listener stopped ({_details})"); @@ -101,7 +122,7 @@ public async Task StopAsync(CancellationToken cancellationToken) IEventProcessor IEventProcessorFactory.CreatePartitionProcessor() { - return new PartitionProcessor(_options, _executor, _loggerFactory.CreateLogger(), _singleDispatch); + return new PartitionProcessor(_options, _executor, _loggerFactory.CreateLogger(), _singleDispatch, _functionExecutionCancellationTokenSource.Token); } public IScaleMonitor GetMonitor() diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs index 0fee40d75aed..fe854b6f2836 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs @@ -66,7 +66,8 @@ protected override async Task GetCheckpointAsync(strin if (checkpoint is BlobCheckpointStoreInternal.BlobStorageCheckpoint blobCheckpoint && blobCheckpoint is not null) { - _lastReadCheckpoint[partitionId] = new CheckpointInfo(blobCheckpoint.Offset ?? -1, blobCheckpoint.SequenceNumber ?? -1, blobCheckpoint.LastModified); + _lastReadCheckpoint[partitionId] = new CheckpointInfo(blobCheckpoint.Offset ?? -1, blobCheckpoint.SequenceNumber ?? -1, + blobCheckpoint.LastModified); } return checkpoint; @@ -112,7 +113,7 @@ protected override Task OnProcessingEventBatchAsync(IEnumerable event return Task.CompletedTask; } - return partition.EventProcessor.ProcessEventsAsync(partition, events, cancellationToken); + return partition.EventProcessor.ProcessEventsAsync(partition, events); } protected override async Task OnInitializingPartitionAsync(EventProcessorHostPartition partition, CancellationToken cancellationToken) diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs index fb213da5db5d..709345880446 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs @@ -15,6 +15,6 @@ internal interface IEventProcessor Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason); Task OpenAsync(EventProcessorHostPartition context); Task ProcessErrorAsync(EventProcessorHostPartition context, Exception error); - Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable messages, CancellationToken cancellationToken); + Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable messages); } } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs index d3c89b55bc49..7967eff8fbce 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs @@ -7,6 +7,7 @@ using Azure.Messaging.EventHubs.Core; using Azure.Messaging.EventHubs.Primitives; using Microsoft.Azure.WebJobs.EventHubs.Listeners; +using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Bindings; using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Triggers; @@ -21,17 +22,20 @@ internal class EventHubTriggerAttributeBindingProvider : ITriggerBindingProvider private readonly IOptions _options; private readonly EventHubClientFactory _clientFactory; private readonly IConverterManager _converterManager; + private readonly IDrainModeManager _drainModeManager; public EventHubTriggerAttributeBindingProvider( IConverterManager converterManager, IOptions options, ILoggerFactory loggerFactory, - EventHubClientFactory clientFactory) + EventHubClientFactory clientFactory, + IDrainModeManager drainModeManager) { _converterManager = converterManager; _options = options; _clientFactory = clientFactory; _loggerFactory = loggerFactory; + _drainModeManager = drainModeManager; } [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")] @@ -67,7 +71,8 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex _clientFactory.GetEventHubConsumerClient(attribute.EventHubName, attribute.Connection, attribute.ConsumerGroup), checkpointStore, options, - _loggerFactory); + _loggerFactory, + _drainModeManager); return Task.FromResult(listener); }; #pragma warning disable 618 diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs index 17590873e0c1..68d81103e6f3 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs @@ -96,11 +96,37 @@ public async Task EventHub_SingleDispatch() bool result = _eventWait.WaitOne(Timeout); Assert.True(result); + + await StopWithDrainAsync(host); } AssertSingleDispatchLogs(host); } + [Test] + public async Task EventHub_SingleDispatch_Dispose() + { + await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName); + await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) }); + var (_, host) = BuildHost(ConfigureTestEventHub); + + bool result = _eventWait.WaitOne(Timeout); + Assert.True(result); + host.Dispose(); + } + + [Test] + public async Task EventHub_SingleDispatch_StopWithoutDrain() + { + await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName); + await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) }); + var (_, host) = BuildHost(ConfigureTestEventHub); + + bool result = _eventWait.WaitOne(Timeout); + Assert.True(result); + await host.StopAsync(); + } + [Test] public async Task EventHub_SingleDispatch_ConsumerGroup() { @@ -135,6 +161,8 @@ public async Task EventHub_SingleDispatch_BinaryData() bool result = _eventWait.WaitOne(Timeout); Assert.True(result); + + await StopWithDrainAsync(host); } AssertSingleDispatchLogs(host); @@ -308,6 +336,8 @@ public async Task EventHub_MultipleDispatch() bool result = _eventWait.WaitOne(Timeout); Assert.True(result); + + await StopWithDrainAsync(host); } AssertMultipleDispatchLogs(host); @@ -324,6 +354,8 @@ public async Task EventHub_MultipleDispatch_BinaryData() bool result = _eventWait.WaitOne(Timeout); Assert.True(result); + + await StopWithDrainAsync(host); } AssertMultipleDispatchLogs(host); @@ -359,57 +391,35 @@ public async Task EventHub_MultipleDispatch_MinBatchSize() bool result = _eventWait.WaitOne(Timeout); Assert.True(result); + + await StopWithDrainAsync(host); } AssertMultipleDispatchLogsMinBatch(host); } - private static void AssertMultipleDispatchLogsMinBatch(IHost host) + [Test] + public async Task EventHub_MultipleDispatch_Dispose() { - IEnumerable logMessages = host.GetTestLoggerProvider() - .GetAllLogMessages(); - - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("Trigger Details:") - && x.FormattedMessage.Contains("Offset:")).Any()); - - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("OpenAsync")).Any()); - - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("CheckpointAsync") - && x.FormattedMessage.Contains("lease") - && x.FormattedMessage.Contains("offset") - && x.FormattedMessage.Contains("sequenceNumber")).Any()); - - // Events are being sent in the EventHubTestMultipleDispatchMinBatchSizeJobs - // class directly for this test + await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName); + await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) }); + var (_, host) = BuildHost(); - AssertAzureSdkLogs(logMessages); - } + bool result = _eventWait.WaitOne(Timeout); + Assert.True(result); + host.Dispose(); + } - private static void AssertMultipleDispatchLogs(IHost host) + [Test] + public async Task EventHub_MultipleDispatch_StopWithoutDrain() { - IEnumerable logMessages = host.GetTestLoggerProvider() - .GetAllLogMessages(); - - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("Trigger Details:") - && x.FormattedMessage.Contains("Offset:")).Any()); - - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("OpenAsync")).Any()); - - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("CheckpointAsync") - && x.FormattedMessage.Contains("lease") - && x.FormattedMessage.Contains("offset") - && x.FormattedMessage.Contains("sequenceNumber")).Any()); - - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("Sending events to EventHub")).Any()); + await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName); + await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) }); + var (_, host) = BuildHost(); - AssertAzureSdkLogs(logMessages); + bool result = _eventWait.WaitOne(Timeout); + Assert.True(result); + await host.StopAsync(); } [Test] @@ -544,6 +554,62 @@ public async Task EventHub_InitialOffsetFromEnqueuedTime() } } + private static void AssertMultipleDispatchLogsMinBatch(IHost host) + { + IEnumerable logMessages = host.GetTestLoggerProvider() + .GetAllLogMessages(); + + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("Trigger Details:") + && x.FormattedMessage.Contains("Offset:")).Any()); + + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("OpenAsync")).Any()); + + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("CheckpointAsync") + && x.FormattedMessage.Contains("lease") + && x.FormattedMessage.Contains("offset") + && x.FormattedMessage.Contains("sequenceNumber")).Any()); + + // Events are being sent in the EventHubTestMultipleDispatchMinBatchSizeJobs + // class directly for this test + + AssertAzureSdkLogs(logMessages); + } + + private static void AssertMultipleDispatchLogs(IHost host) + { + IEnumerable logMessages = host.GetTestLoggerProvider() + .GetAllLogMessages(); + + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("Trigger Details:") + && x.FormattedMessage.Contains("Offset:")).Any()); + + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("OpenAsync")).Any()); + + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("CheckpointAsync") + && x.FormattedMessage.Contains("lease") + && x.FormattedMessage.Contains("offset") + && x.FormattedMessage.Contains("sequenceNumber")).Any()); + + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("Sending events to EventHub")).Any()); + + AssertAzureSdkLogs(logMessages); + } + + private static async Task StopWithDrainAsync(IHost host) + { + // Enable drain mode so checkpointing occurs when stopping + var drainModeManager = host.Services.GetService(); + await drainModeManager.EnableDrainModeAsync(CancellationToken.None); + await host.StopAsync(); + } + private static void AssertAzureSdkLogs(IEnumerable logMessages) { Assert.True(logMessages.Any(x => x.Category.StartsWith("Azure."))); @@ -578,6 +644,28 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection = } } + public class EventHubTestSingleDispatchJobs_Dispose + { + public static async Task SendEvent_TestHub([EventHubTrigger(TestHubName, Connection = TestHubName)] string evt, CancellationToken cancellationToken) + { + _eventWait.Set(); + // wait a small amount of time for the host to call dispose + await Task.Delay(3000, CancellationToken.None); + Assert.IsTrue(cancellationToken.IsCancellationRequested); + } + } + + public class EventHubTestMultipleDispatchJobs_Dispose + { + public static async Task SendEvent_TestHub([EventHubTrigger(TestHubName, Connection = TestHubName)] string[] evt, CancellationToken cancellationToken) + { + _eventWait.Set(); + // wait a small amount of time for the host to call dispose + await Task.Delay(3000, CancellationToken.None); + Assert.IsTrue(cancellationToken.IsCancellationRequested); + } + } + public class EventHubTestCollectorDispatch { private static string s_partitionKey = null; diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs index 406c6711dd12..be11955f9b0e 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs @@ -14,6 +14,7 @@ using Azure.Messaging.EventHubs.Tests; using Microsoft.Azure.WebJobs.EventHubs.Listeners; using Microsoft.Azure.WebJobs.EventHubs.Processor; +using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Scale; @@ -49,12 +50,12 @@ public async Task ProcessEvents_SingleDispatch_CheckpointsCorrectly(int batchChe var loggerMock = new Mock(); var executor = new Mock(MockBehavior.Strict); executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new FunctionResult(true)); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default); for (int i = 0; i < 100; i++) { List events = new List() { new EventData(new byte[0]) }; - await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None); + await eventProcessor.ProcessEventsAsync(partitionContext, events); } try @@ -89,12 +90,12 @@ public async Task ProcessEvents_MultipleDispatch_CheckpointsCorrectly(int batchC var loggerMock = new Mock(); var executor = new Mock(MockBehavior.Strict); executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new FunctionResult(true)); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false, default); for (int i = 0; i < 100; i++) { List events = new List() { new EventData(new byte[0]), new EventData(new byte[0]), new EventData(new byte[0]) }; - await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None); + await eventProcessor.ProcessEventsAsync(partitionContext, events); } try @@ -135,12 +136,12 @@ public async Task ProcessEvents_MultipleDispatch_MinBatch_CheckpointsCorrectly_N var executor = new Mock(MockBehavior.Strict); executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new FunctionResult(true)); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false, default); for (int i = 0; i < 60; i++) { List events = new List() { new EventData("event1"), new EventData("event2"), new EventData("event3"), new EventData("event4"), new EventData("event5") }; - await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None); + await eventProcessor.ProcessEventsAsync(partitionContext, events); } try @@ -191,12 +192,12 @@ public async Task ProcessEvents_MultipleDispatch_MinBatch_CheckpointsCorrectly_R var executor = new Mock(MockBehavior.Strict); executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new FunctionResult(true)); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false, default); for (int i = 0; i < 60; i++) { List events = new List() { new EventData("event1"), new EventData("event2"), new EventData("event3"), new EventData("event4"), new EventData("event5") }; - await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None); + await eventProcessor.ProcessEventsAsync(partitionContext, events); } try @@ -245,12 +246,12 @@ public async Task ProcessEvents_MultipleDispatch_MinBatch_CheckpointsCorrectly_O var executor = new Mock(MockBehavior.Strict); executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new FunctionResult(true)); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false, default); for (int i = 0; i < 60; i++) { List events = new List() { new EventData("event1"), new EventData("event2"), new EventData("event3"), new EventData("event4"), new EventData("event5") }; - await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None); + await eventProcessor.ProcessEventsAsync(partitionContext, events); } try @@ -313,12 +314,12 @@ public async Task ProcessEvents_MultipleDispatch_MinBatch_BackgroundInvokesParti }) .ReturnsAsync(new FunctionResult(true)); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false, default); for (int i = 0; i < 60; i++) { List events = new List() { new EventData("event1"), new EventData("event2"), new EventData("event3"), new EventData("event4"), new EventData("event5") }; - await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None); + await eventProcessor.ProcessEventsAsync(partitionContext, events); } await completionSource.Task.TimeoutAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); @@ -370,9 +371,9 @@ public async Task ProcessEvents_Failure_Checkpoints() var loggerMock = new Mock(); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default); - await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None); + await eventProcessor.ProcessEventsAsync(partitionContext, events); processor.Verify( p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()), @@ -397,7 +398,7 @@ public async Task ProcessEvents_OwnershipLost_DoesNotCheckpoint() var loggerMock = new Mock(); var executor = new Mock(MockBehavior.Strict); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default); List events = new List(); List results = new List(); @@ -408,7 +409,6 @@ public async Task ProcessEvents_OwnershipLost_DoesNotCheckpoint() } int execution = 0; - var cts = new CancellationTokenSource(); executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() => { @@ -420,7 +420,7 @@ public async Task ProcessEvents_OwnershipLost_DoesNotCheckpoint() return result; }); - await eventProcessor.ProcessEventsAsync(partitionContext, events, cts.Token); + await eventProcessor.ProcessEventsAsync(partitionContext, events); processor.Verify( p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()), @@ -428,7 +428,7 @@ public async Task ProcessEvents_OwnershipLost_DoesNotCheckpoint() } /// - /// If function execution succeeds when the function host is shutting down, we should NOT checkpoint. + /// If function execution succeeds when the function host is shutting down, we should checkpoint. /// [Test] public async Task ProcessEvents_Succeeds_ShuttingDown_DoesNotCheckpoint() @@ -452,7 +452,7 @@ public async Task ProcessEvents_Succeeds_ShuttingDown_DoesNotCheckpoint() var executor = new Mock(MockBehavior.Strict); int execution = 0; var loggerMock = new Mock(); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default); executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() => { @@ -464,18 +464,18 @@ public async Task ProcessEvents_Succeeds_ShuttingDown_DoesNotCheckpoint() return result; }); - await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None); + await eventProcessor.ProcessEventsAsync(partitionContext, events); processor.Verify( p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()), - Times.Never); + Times.Once); } /// - /// If function execution fails when the function host is shutting down, we should NOT checkpoint. + /// If function execution fails when the function host is shutting down, we should checkpoint. /// [Test] - public async Task ProcessEvents_Fails_ShuttingDown_DoesNotCheckpoint() + public async Task ProcessEvents_Fails_ShuttingDown_DoesCheckpoint() { var partitionContext = EventHubTests.GetPartitionContext(); var options = new EventHubOptions(); @@ -496,7 +496,7 @@ public async Task ProcessEvents_Fails_ShuttingDown_DoesNotCheckpoint() var executor = new Mock(MockBehavior.Strict); int execution = 0; var loggerMock = new Mock(); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default); executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() => { @@ -508,11 +508,11 @@ public async Task ProcessEvents_Fails_ShuttingDown_DoesNotCheckpoint() return result; }); - await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None); + await eventProcessor.ProcessEventsAsync(partitionContext, events); processor.Verify( p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()), - Times.Never); + Times.Once); } [Test] @@ -528,7 +528,7 @@ public async Task CloseAsync_Shutdown_DoesNotCheckpoint() var executor = new Mock(MockBehavior.Strict); var loggerMock = new Mock(); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default); await eventProcessor.CloseAsync(partitionContext, ProcessingStoppedReason.Shutdown); @@ -550,7 +550,7 @@ public async Task Partition_OwnershipLost_DropsEvents() var executor = new Mock(MockBehavior.Strict); var loggerMock = new Mock(); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default); var mockStoredEvents = new Queue(); mockStoredEvents.Enqueue(new EventData("E1")); eventProcessor.CachedEventsManager.CachedEvents = mockStoredEvents; @@ -567,7 +567,7 @@ public async Task ProcessErrorsAsync_LoggedAsError() var options = new EventHubOptions(); var executor = new Mock(MockBehavior.Strict); var testLogger = new TestLogger("Test"); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, testLogger, true); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, testLogger, true, default); var ex = new InvalidOperationException("My InvalidOperationException!"); @@ -585,7 +585,7 @@ public async Task ProcessErrorsAsync_RebalancingExceptions_LoggedAsInformation() var options = new EventHubOptions(); var executor = new Mock(MockBehavior.Strict); var testLogger = new TestLogger("Test"); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, testLogger, true); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, testLogger, true, default); var disconnectedEx = new EventHubsException(true, "My ReceiverDisconnectedException!", EventHubsException.FailureReason.ConsumerDisconnected); @@ -630,7 +630,8 @@ public void GetMonitor_ReturnsExpectedValue() consumerClientMock.Object, Mock.Of(), new EventHubOptions(), - Mock.Of()); + Mock.Of(), + Mock.Of()); IScaleMonitor scaleMonitor = listener.GetMonitor(); @@ -667,13 +668,13 @@ public void Dispose_StopsTheProcessor() consumerClientMock.Object, Mock.Of(), new EventHubOptions(), - Mock.Of()); + Mock.Of(), + Mock.Of()); (listener as IListener).Dispose(); host.Verify(h => h.StopProcessingAsync(CancellationToken.None), Times.Once); - (listener as IListener).Cancel(); - host.Verify(h => h.StopProcessingAsync(CancellationToken.None), Times.Exactly(2)); + Assert.Throws(() => (listener as IListener).Cancel()); } [Test] @@ -696,7 +697,7 @@ public async Task ProcessEvents_CancellationToken_CancelsExecution() } }) .ReturnsAsync(new FunctionResult(true)); - var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true); + var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default); List events = new List() { new EventData(new byte[0]) }; CancellationTokenSource source = new CancellationTokenSource(); // Start another thread to cancel execution @@ -704,7 +705,7 @@ public async Task ProcessEvents_CancellationToken_CancelsExecution() { await Task.Delay(500); }); - await eventProcessor.ProcessEventsAsync(partitionContext, events, source.Token); + await eventProcessor.ProcessEventsAsync(partitionContext, events); } } } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTriggerAttributeBindingProviderTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTriggerAttributeBindingProviderTests.cs index 32ddf5803c2d..8a4042a2b21a 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTriggerAttributeBindingProviderTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTriggerAttributeBindingProviderTests.cs @@ -13,6 +13,7 @@ using Microsoft.Azure.WebJobs.EventHubs.Listeners; using Microsoft.Azure.WebJobs.EventHubs.Processor; using Microsoft.Azure.WebJobs.EventHubs.Tests; +using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Protocols; @@ -54,7 +55,7 @@ public EventHubTriggerAttributeBindingProviderTests() It.IsAny())).Returns(blobServiceClient.Object); var factory = ConfigurationUtilities.CreateFactory(configuration, options, componentFactory.Object); - _provider = new EventHubTriggerAttributeBindingProvider(convertManager.Object, Options.Create(options), NullLoggerFactory.Instance, factory); + _provider = new EventHubTriggerAttributeBindingProvider(convertManager.Object, Options.Create(options), NullLoggerFactory.Instance, factory, Mock.Of()); } [Test] diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/ScaleHostEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/ScaleHostEndToEndTests.cs index 3d2c39baf16c..b2f85d8a5385 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/ScaleHostEndToEndTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/ScaleHostEndToEndTests.cs @@ -26,7 +26,7 @@ using Newtonsoft.Json.Linq; using NUnit.Framework; -namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests +namespace Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests { [NonParallelizable] [LiveOnly(true)]