From e526f9085d94f9e142a330e20f152caecf180ebb Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Mon, 7 Aug 2023 16:24:23 -0700
Subject: [PATCH 01/19] Event Hubs - Don't pass cancelled token to TryExecute
when draining
---
.../EventHubListener.PartitionProcessor.cs | 14 +-
.../src/Listeners/EventHubListener.cs | 17 ++-
.../src/Processor/EventProcessorHost.cs | 18 ++-
.../tests/EventHubEndToEndTests.cs | 133 +++++++++++++-----
.../tests/EventHubListenerTests.cs | 30 ++--
5 files changed, 150 insertions(+), 62 deletions(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
index 9af963451722..0efeac8a1334 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
@@ -44,13 +44,14 @@ internal class PartitionProcessor : IEventProcessor, IDisposable
private Task _cachedEventsBackgroundTask;
private CancellationTokenSource _cachedEventsBackgroundTaskCts;
private SemaphoreSlim _cachedEventsGuard;
+ private readonly CancellationToken _disposingToken;
///
/// When we have a minimum batch size greater than 1, this class manages caching events.
///
internal PartitionProcessorEventsManager CachedEventsManager { get; }
- public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch)
+ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, CancellationToken disposingToken)
{
_executor = executor;
_singleDispatch = singleDispatch;
@@ -58,7 +59,8 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex
_logger = logger;
_firstFunctionInvocation = true;
_maxWaitTime = options.MaxWaitTime;
- _minimumBatchesEnabled = options.MinEventBatchSize > 1; // 1 is the default
+ _minimumBatchesEnabled = options.MinEventBatchSize > 1; // 1 is the default'
+ _disposingToken = disposingToken;
// Events are only cached when building a batch of minimum size.
if (_minimumBatchesEnabled)
@@ -135,7 +137,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
TriggerDetails = eventHubTriggerInput.GetTriggerDetails(context)
};
- await _executor.TryExecuteAsync(input, linkedCts.Token).ConfigureAwait(false);
+ await _executor.TryExecuteAsync(input, _disposingToken).ConfigureAwait(false);
_firstFunctionInvocation = false;
eventToCheckpoint = events[i];
}
@@ -168,7 +170,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
_logger.LogDebug($"Partition Processor received events and is attempting to invoke function ({details})");
UpdateCheckpointContext(triggerEvents, context);
- await TriggerExecute(triggerEvents, context, linkedCts.Token).ConfigureAwait(false);
+ await TriggerExecute(triggerEvents, context, _disposingToken).ConfigureAwait(false);
eventToCheckpoint = triggerEvents.Last();
// If there is a background timer task, cancel it and dispose of the cancellation token. If there
@@ -201,7 +203,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
else
{
UpdateCheckpointContext(events, context);
- await TriggerExecute(events, context, linkedCts.Token).ConfigureAwait(false);
+ await TriggerExecute(events, context, _disposingToken).ConfigureAwait(false);
eventToCheckpoint = events.LastOrDefault();
}
@@ -276,7 +278,7 @@ private async Task MonitorCachedEvents(DateTimeOffset? lastCheckpointTime, Cance
var details = GetOperationDetails(_mostRecentPartitionContext, "MaxWaitTimeElapsed");
_logger.LogDebug($"Partition Processor has waited MaxWaitTime since last invocation and is attempting to invoke function on all held events ({details})");
- await TriggerExecute(triggerEvents, _mostRecentPartitionContext, backgroundCancellationTokenSource.Token).ConfigureAwait(false);
+ await TriggerExecute(triggerEvents, _mostRecentPartitionContext, _disposingToken).ConfigureAwait(false);
if (!backgroundCancellationTokenSource.Token.IsCancellationRequested)
{
await CheckpointAsync(triggerEvents.Last(), _mostRecentPartitionContext).ConfigureAwait(false);
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
index b53176779c84..7729dba49a12 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
@@ -27,6 +27,7 @@ internal sealed partial class EventHubListener : IListener, IEventProcessorFacto
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
private string _details;
+ private CancellationTokenSource _disposingCancellationTokenSource;
public EventHubListener(
string functionId,
@@ -45,6 +46,7 @@ public EventHubListener(
_checkpointStore = checkpointStore;
_options = options;
_logger = _loggerFactory.CreateLogger();
+ _disposingCancellationTokenSource = new CancellationTokenSource();
EventHubMetricsProvider metricsProvider = new EventHubMetricsProvider(functionId, consumerClient, checkpointStore, _loggerFactory.CreateLogger());
@@ -68,19 +70,22 @@ public EventHubListener(
}
///
- /// Cancel any in progress listen operation.
+ /// Cancel should be called prior to Dispose. We just validate that we are not already disposed.
+ /// This is consistent with the Service Bus listener behavior.
///
void IListener.Cancel()
{
-#pragma warning disable AZC0102
- StopAsync(CancellationToken.None).GetAwaiter().GetResult();
-#pragma warning restore AZC0102
+ if (_disposingCancellationTokenSource.IsCancellationRequested)
+ {
+ throw new ObjectDisposedException(nameof(IListener));
+ }
}
void IDisposable.Dispose()
{
+ _disposingCancellationTokenSource.Cancel();
#pragma warning disable AZC0102
- StopAsync(CancellationToken.None).GetAwaiter().GetResult();
+ _eventProcessorHost.DisposeAsync().GetAwaiter().GetResult();
#pragma warning restore AZC0102
}
@@ -101,7 +106,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
IEventProcessor IEventProcessorFactory.CreatePartitionProcessor()
{
- return new PartitionProcessor(_options, _executor, _loggerFactory.CreateLogger(), _singleDispatch);
+ return new PartitionProcessor(_options, _executor, _loggerFactory.CreateLogger(), _singleDispatch, _disposingCancellationTokenSource.Token);
}
public IScaleMonitor GetMonitor()
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
index 0fee40d75aed..d31143598f98 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
@@ -21,6 +21,8 @@ internal class EventProcessorHost : EventProcessor
private BlobCheckpointStoreInternal _checkpointStore;
private ConcurrentDictionary _lastReadCheckpoint = new();
+ private EventHubConnection _connection;
+ private Func ConnectionFactory { get; }
///
/// Mocking constructor
@@ -36,6 +38,7 @@ public EventProcessorHost(string consumerGroup,
int eventBatchMaximumCount,
Action exceptionHandler) : base(eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options)
{
+ ConnectionFactory = () => new EventHubConnection(connectionString, eventHubName, options.ConnectionOptions);
_exceptionHandler = exceptionHandler;
}
@@ -47,6 +50,7 @@ public EventProcessorHost(string consumerGroup,
int eventBatchMaximumCount,
Action exceptionHandler) : base(eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options)
{
+ ConnectionFactory = () => new EventHubConnection(fullyQualifiedNamespace, eventHubName, credential, options.ConnectionOptions);
_exceptionHandler = exceptionHandler;
}
@@ -66,7 +70,8 @@ protected override async Task GetCheckpointAsync(strin
if (checkpoint is BlobCheckpointStoreInternal.BlobStorageCheckpoint blobCheckpoint && blobCheckpoint is not null)
{
- _lastReadCheckpoint[partitionId] = new CheckpointInfo(blobCheckpoint.Offset ?? -1, blobCheckpoint.SequenceNumber ?? -1, blobCheckpoint.LastModified);
+ _lastReadCheckpoint[partitionId] = new CheckpointInfo(blobCheckpoint.Offset ?? -1, blobCheckpoint.SequenceNumber ?? -1,
+ blobCheckpoint.LastModified);
}
return checkpoint;
@@ -136,6 +141,17 @@ protected override Task OnPartitionProcessingStoppedAsync(EventProcessorHostPart
return partition.EventProcessor.CloseAsync(partition, reason);
}
+ protected override EventHubConnection CreateConnection()
+ {
+ _connection = ConnectionFactory();
+ return _connection;
+ }
+
+ public async Task DisposeAsync()
+ {
+ await _connection.DisposeAsync().ConfigureAwait(false);
+ }
+
public async Task StartProcessingAsync(
IEventProcessorFactory processorFactory,
BlobCheckpointStoreInternal checkpointStore,
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
index 17590873e0c1..ea60621c29b3 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
@@ -31,7 +31,8 @@ public class EventHubEndToEndTests : WebJobsEventHubTestBase
{
private static readonly TimeSpan NoEventReadTimeout = TimeSpan.FromSeconds(5);
- private static EventWaitHandle _eventWait;
+ private static EventWaitHandle _eventWait1;
+ private static EventWaitHandle _eventWait2;
private static List _results;
private static DateTimeOffset _initialOffsetEnqueuedTimeUTC;
@@ -39,13 +40,14 @@ public class EventHubEndToEndTests : WebJobsEventHubTestBase
public void SetUp()
{
_results = new List();
- _eventWait = new ManualResetEvent(initialState: false);
+ _eventWait1 = new ManualResetEvent(initialState: false);
+ _eventWait2 = new ManualResetEvent(initialState: false);
}
[TearDown]
public void TearDown()
{
- _eventWait.Dispose();
+ _eventWait1.Dispose();
}
[Test]
@@ -56,8 +58,9 @@ public async Task EventHub_PocoBinding()
{
await jobHost.CallAsync(nameof(EventHubTestBindToPocoJobs.SendEvent_TestHub));
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage);
@@ -75,8 +78,9 @@ public async Task EventHub_StringBinding()
{
await jobHost.CallAsync(nameof(EventHubTestBindToStringJobs.SendEvent_TestHub), new { input = "data" });
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage);
CollectionAssert.Contains(logs, $"Input(data)");
@@ -94,13 +98,27 @@ public async Task EventHub_SingleDispatch()
{
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobs.SendEvent_TestHub), new { input = "data" });
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
AssertSingleDispatchLogs(host);
}
+ [Test]
+ public async Task EventHub_SingleDispatch_Dispose()
+ {
+ await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName);
+ await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });
+ var (_, host) = BuildHost(ConfigureTestEventHub);
+
+ bool result = _eventWait1.WaitOne(Timeout);
+ Assert.True(result);
+ host.Dispose();
+ _eventWait2.Set();
+ }
+
[Test]
public async Task EventHub_SingleDispatch_ConsumerGroup()
{
@@ -120,8 +138,9 @@ public async Task EventHub_SingleDispatch_ConsumerGroup()
{
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchWithConsumerGroupJobs.SendEvent_TestHub));
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
}
@@ -133,7 +152,7 @@ public async Task EventHub_SingleDispatch_BinaryData()
{
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobsBinaryData.SendEvent_TestHub), new { input = "data" });
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
}
@@ -148,8 +167,9 @@ public async Task EventHub_ProducerClient()
{
await jobHost.CallAsync(nameof(EventHubTestClientDispatch.SendEvents));
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
}
@@ -161,8 +181,9 @@ public async Task EventHub_Collector()
{
await jobHost.CallAsync(nameof(EventHubTestCollectorDispatch.SendEvents));
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
}
@@ -174,8 +195,9 @@ public async Task EventHub_CollectorPartitionKey()
{
await jobHost.CallAsync(nameof(EventHubTestCollectorDispatch.SendEventsWithKey));
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
}
@@ -292,8 +314,9 @@ public async Task AssertCanSendReceiveMessage(Action hostConfigura
{
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobWithConnection.SendEvent_TestHub), new { input = "data" });
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
}
@@ -306,8 +329,9 @@ public async Task EventHub_MultipleDispatch()
int numEvents = 5;
await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchJobs.SendEvents_TestHub), new { numEvents = numEvents, input = "data" });
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
AssertMultipleDispatchLogs(host);
@@ -322,8 +346,9 @@ public async Task EventHub_MultipleDispatch_BinaryData()
int numEvents = 5;
await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchJobsBinaryData.SendEvents_TestHub), new { numEvents = numEvents, input = "data" });
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
AssertMultipleDispatchLogs(host);
@@ -357,13 +382,27 @@ public async Task EventHub_MultipleDispatch_MinBatchSize()
int numEvents = 5;
await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchMinBatchSizeJobs.SendEvents_TestHub), new { numEvents = numEvents, input = "data" });
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
AssertMultipleDispatchLogsMinBatch(host);
}
+ [Test]
+ public async Task EventHub_MultipleDispatch_Dispose()
+ {
+ await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName);
+ await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });
+ var (_, host) = BuildHost();
+
+ bool result = _eventWait1.WaitOne(Timeout);
+ Assert.True(result);
+ host.Dispose();
+ _eventWait2.Set();
+ }
+
private static void AssertMultipleDispatchLogsMinBatch(IHost host)
{
IEnumerable logMessages = host.GetTestLoggerProvider()
@@ -419,9 +458,10 @@ public async Task EventHub_PartitionKey()
using (jobHost)
{
await jobHost.CallAsync(nameof(EventHubPartitionKeyTestJobs.SendEvents_TestHub), new { input = "data" });
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
}
@@ -445,8 +485,9 @@ public async Task EventHub_InitialOffsetFromStart()
});
using (jobHost)
{
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
}
@@ -472,7 +513,7 @@ public async Task EventHub_InitialOffsetFromEnd()
using (jobHost)
{
// We don't expect to get signaled as there should be no messages received with a FromEnd initial offset
- bool result = _eventWait.WaitOne(NoEventReadTimeout);
+ bool result = _eventWait1.WaitOne(NoEventReadTimeout);
Assert.False(result, "An event was received while none were expected.");
// send events which should be received. To ensure that the test is
@@ -489,12 +530,13 @@ public async Task EventHub_InitialOffsetFromEnd()
}
});
- result = _eventWait.WaitOne(Timeout);
+ result = _eventWait1.WaitOne(Timeout);
cts.Cancel();
try { await sendTask; } catch { /* Ignore, we're not testing sends */ }
Assert.True(result);
+ await jobHost.StopAsync();
}
}
@@ -539,8 +581,9 @@ public async Task EventHub_InitialOffsetFromEnqueuedTime()
});
using (jobHost)
{
- bool result = _eventWait.WaitOne(Timeout);
+ bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
}
@@ -574,7 +617,29 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
Assert.AreNotEqual(default(LastEnqueuedEventProperties), triggerPartitionContext.ReadLastEnqueuedEventProperties());
Assert.True(triggerPartitionContext.IsCheckpointingAfterInvocation);
- _eventWait.Set();
+ _eventWait1.Set();
+ }
+ }
+
+ public class EventHubTestSingleDispatchJobs_Dispose
+ {
+ public static void SendEvent_TestHub([EventHubTrigger(TestHubName, Connection = TestHubName)] string evt, CancellationToken cancellationToken)
+ {
+ _eventWait1.Set();
+ bool result = _eventWait2.WaitOne(Timeout);
+ Assert.IsTrue(result);
+ Assert.IsTrue(cancellationToken.IsCancellationRequested);
+ }
+ }
+
+ public class EventHubTestMultipleDispatchJobs_Dispose
+ {
+ public static void SendEvent_TestHub([EventHubTrigger(TestHubName, Connection = TestHubName)] string[] evt, CancellationToken cancellationToken)
+ {
+ _eventWait1.Set();
+ bool result = _eventWait2.WaitOne(Timeout);
+ Assert.IsTrue(result);
+ Assert.IsTrue(cancellationToken.IsCancellationRequested);
}
}
@@ -605,7 +670,7 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
Assert.AreEqual(eventData.PartitionKey, s_partitionKey);
}
- _eventWait.Set();
+ _eventWait1.Set();
}
}
@@ -622,7 +687,7 @@ await producer.SendAsync(new[]
public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection = TestHubName)] EventData eventData)
{
Assert.AreEqual(eventData.EventBody.ToString(), "Event 1");
- _eventWait.Set();
+ _eventWait1.Set();
}
}
@@ -637,7 +702,7 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
{
Assert.AreEqual(evt, nameof(EventHubTestSingleDispatchWithConsumerGroupJobs));
- _eventWait.Set();
+ _eventWait1.Set();
}
}
@@ -653,7 +718,7 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
IDictionary systemProperties)
{
Assert.AreEqual("data", evt.ToString());
- _eventWait.Set();
+ _eventWait1.Set();
}
}
@@ -669,7 +734,7 @@ public static void BindToPoco([EventHubTrigger(TestHubName, Connection = TestHub
Assert.AreEqual(input.Value, "data");
Assert.AreEqual(input.Name, "foo");
logger.LogInformation($"PocoValues(foo,data)");
- _eventWait.Set();
+ _eventWait1.Set();
}
}
@@ -683,7 +748,7 @@ public static void SendEvent_TestHub(string input, [EventHub(TestHubName, Connec
public static void BindToString([EventHubTrigger(TestHubName, Connection = TestHubName)] string input, ILogger logger)
{
logger.LogInformation($"Input({input})");
- _eventWait.Set();
+ _eventWait1.Set();
}
}
@@ -729,7 +794,7 @@ public static void ProcessMultipleEvents([EventHubTrigger(TestHubName, Connectio
if (s_processedEventCount == s_eventCount)
{
_results.AddRange(events);
- _eventWait.Set();
+ _eventWait1.Set();
}
}
}
@@ -762,7 +827,7 @@ public static void ProcessMultipleEventsBinaryData([EventHubTrigger(TestHubName,
// filter for the ID the current test is using
if (s_processedEventCount == s_eventCount)
{
- _eventWait.Set();
+ _eventWait1.Set();
}
}
}
@@ -827,7 +892,7 @@ public static void ProcessMultipleEvents([EventHubTrigger(TestHubName, Connectio
if (s_processedEventCount >= s_eventCount)
{
- _eventWait.Set();
+ _eventWait1.Set();
}
}
}
@@ -875,7 +940,7 @@ public static void ProcessMultiplePartitionEvents([EventHubTrigger(TestHubName,
if (_results.Count > 0)
{
- _eventWait.Set();
+ _eventWait1.Set();
}
}
}
@@ -894,7 +959,7 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
Assert.AreEqual("value1", properties["TestProp1"]);
Assert.AreEqual("value2", properties["TestProp2"]);
- _eventWait.Set();
+ _eventWait1.Set();
}
}
public class TestPoco
@@ -909,7 +974,7 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
string partitionKey, DateTime enqueuedTimeUtc, IDictionary properties,
IDictionary systemProperties)
{
- _eventWait.Set();
+ _eventWait1.Set();
}
}
@@ -934,7 +999,7 @@ public static void ProcessMultipleEvents([EventHubTrigger(TestHubName, Connectio
{
Assert.GreaterOrEqual(DateTimeOffset.Parse(result), earliestAllowedOffset);
}
- _eventWait.Set();
+ _eventWait1.Set();
}
}
}
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
index 406c6711dd12..42bc8b20d1bf 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
@@ -49,7 +49,7 @@ public async Task ProcessEvents_SingleDispatch_CheckpointsCorrectly(int batchChe
var loggerMock = new Mock();
var executor = new Mock(MockBehavior.Strict);
executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new FunctionResult(true));
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default);
for (int i = 0; i < 100; i++)
{
@@ -89,7 +89,7 @@ public async Task ProcessEvents_MultipleDispatch_CheckpointsCorrectly(int batchC
var loggerMock = new Mock();
var executor = new Mock(MockBehavior.Strict);
executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new FunctionResult(true));
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false, default);
for (int i = 0; i < 100; i++)
{
@@ -135,7 +135,7 @@ public async Task ProcessEvents_MultipleDispatch_MinBatch_CheckpointsCorrectly_N
var executor = new Mock(MockBehavior.Strict);
executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new FunctionResult(true));
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false, default);
for (int i = 0; i < 60; i++)
{
@@ -191,7 +191,7 @@ public async Task ProcessEvents_MultipleDispatch_MinBatch_CheckpointsCorrectly_R
var executor = new Mock(MockBehavior.Strict);
executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new FunctionResult(true));
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false, default);
for (int i = 0; i < 60; i++)
{
@@ -245,7 +245,7 @@ public async Task ProcessEvents_MultipleDispatch_MinBatch_CheckpointsCorrectly_O
var executor = new Mock(MockBehavior.Strict);
executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new FunctionResult(true));
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false, default);
for (int i = 0; i < 60; i++)
{
@@ -313,7 +313,7 @@ public async Task ProcessEvents_MultipleDispatch_MinBatch_BackgroundInvokesParti
})
.ReturnsAsync(new FunctionResult(true));
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, false, default);
for (int i = 0; i < 60; i++)
{
@@ -370,7 +370,7 @@ public async Task ProcessEvents_Failure_Checkpoints()
var loggerMock = new Mock();
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default);
await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
@@ -397,7 +397,7 @@ public async Task ProcessEvents_OwnershipLost_DoesNotCheckpoint()
var loggerMock = new Mock();
var executor = new Mock(MockBehavior.Strict);
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default);
List events = new List();
List results = new List();
@@ -452,7 +452,7 @@ public async Task ProcessEvents_Succeeds_ShuttingDown_DoesNotCheckpoint()
var executor = new Mock(MockBehavior.Strict);
int execution = 0;
var loggerMock = new Mock();
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default);
executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() =>
{
@@ -496,7 +496,7 @@ public async Task ProcessEvents_Fails_ShuttingDown_DoesNotCheckpoint()
var executor = new Mock(MockBehavior.Strict);
int execution = 0;
var loggerMock = new Mock();
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default);
executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() =>
{
@@ -528,7 +528,7 @@ public async Task CloseAsync_Shutdown_DoesNotCheckpoint()
var executor = new Mock(MockBehavior.Strict);
var loggerMock = new Mock();
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default);
await eventProcessor.CloseAsync(partitionContext, ProcessingStoppedReason.Shutdown);
@@ -550,7 +550,7 @@ public async Task Partition_OwnershipLost_DropsEvents()
var executor = new Mock(MockBehavior.Strict);
var loggerMock = new Mock();
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default);
var mockStoredEvents = new Queue();
mockStoredEvents.Enqueue(new EventData("E1"));
eventProcessor.CachedEventsManager.CachedEvents = mockStoredEvents;
@@ -567,7 +567,7 @@ public async Task ProcessErrorsAsync_LoggedAsError()
var options = new EventHubOptions();
var executor = new Mock(MockBehavior.Strict);
var testLogger = new TestLogger("Test");
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, testLogger, true);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, testLogger, true, default);
var ex = new InvalidOperationException("My InvalidOperationException!");
@@ -585,7 +585,7 @@ public async Task ProcessErrorsAsync_RebalancingExceptions_LoggedAsInformation()
var options = new EventHubOptions();
var executor = new Mock(MockBehavior.Strict);
var testLogger = new TestLogger("Test");
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, testLogger, true);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, testLogger, true, default);
var disconnectedEx = new EventHubsException(true, "My ReceiverDisconnectedException!", EventHubsException.FailureReason.ConsumerDisconnected);
@@ -696,7 +696,7 @@ public async Task ProcessEvents_CancellationToken_CancelsExecution()
}
})
.ReturnsAsync(new FunctionResult(true));
- var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true);
+ var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default);
List events = new List() { new EventData(new byte[0]) };
CancellationTokenSource source = new CancellationTokenSource();
// Start another thread to cancel execution
From 7341cb428ebe949ae0f75694fe4fe66d0515c7ed Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Mon, 7 Aug 2023 16:29:06 -0700
Subject: [PATCH 02/19] test
---
.../tests/EventHubEndToEndTests.cs | 1 +
1 file changed, 1 insertion(+)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
index ea60621c29b3..e6fae49be0e4 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
@@ -154,6 +154,7 @@ public async Task EventHub_SingleDispatch_BinaryData()
bool result = _eventWait1.WaitOne(Timeout);
Assert.True(result);
+ await jobHost.StopAsync();
}
AssertSingleDispatchLogs(host);
From c0df431f12d9562fce972a8efa7a9c79d546ece8 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Mon, 7 Aug 2023 17:45:23 -0700
Subject: [PATCH 03/19] Fix nullref and add comment
---
.../src/Listeners/EventHubListener.cs | 5 +++++
.../src/Processor/EventProcessorHost.cs | 5 ++++-
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
index 7729dba49a12..d875097eee4d 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
@@ -84,9 +84,14 @@ void IListener.Cancel()
void IDisposable.Dispose()
{
_disposingCancellationTokenSource.Cancel();
+
#pragma warning disable AZC0102
_eventProcessorHost.DisposeAsync().GetAwaiter().GetResult();
#pragma warning restore AZC0102
+
+ // No need to dispose the _disposingCancellationTokenSource since we don't create it as a linked token and
+ // it won't use a timer, so the Dispose method is essentially a no-op. The downside to disposing it is that
+ // any customers who are trying to use it to cancel their own operations will get an ObjectDisposedException.
}
public async Task StartAsync(CancellationToken cancellationToken)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
index d31143598f98..c4e4f129ab2d 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
@@ -149,7 +149,10 @@ protected override EventHubConnection CreateConnection()
public async Task DisposeAsync()
{
- await _connection.DisposeAsync().ConfigureAwait(false);
+ if (_connection != null)
+ {
+ await _connection.DisposeAsync().ConfigureAwait(false);
+ }
}
public async Task StartProcessingAsync(
From 34e010cac639ae97b4d6e50ebcadedeb5d1f43ac Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Mon, 7 Aug 2023 17:48:51 -0700
Subject: [PATCH 04/19] Update
sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
---
.../src/Listeners/EventHubListener.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
index d875097eee4d..a7055c1b8df5 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
@@ -91,7 +91,7 @@ void IDisposable.Dispose()
// No need to dispose the _disposingCancellationTokenSource since we don't create it as a linked token and
// it won't use a timer, so the Dispose method is essentially a no-op. The downside to disposing it is that
- // any customers who are trying to use it to cancel their own operations will get an ObjectDisposedException.
+ // any customers who are trying to use it to cancel their own operations would get an ObjectDisposedException.
}
public async Task StartAsync(CancellationToken cancellationToken)
From 1260e353985dfeafd238561bc8e2fbc9252c0818 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Mon, 7 Aug 2023 18:41:12 -0700
Subject: [PATCH 05/19] Fix test
---
.../src/Processor/EventProcessorHost.cs | 2 +-
.../tests/EventHubListenerTests.cs | 8 ++++----
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
index c4e4f129ab2d..4ae2fc50acbc 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
@@ -147,7 +147,7 @@ protected override EventHubConnection CreateConnection()
return _connection;
}
- public async Task DisposeAsync()
+ public virtual async Task DisposeAsync()
{
if (_connection != null)
{
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
index 42bc8b20d1bf..c94d16637039 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
@@ -643,7 +643,7 @@ public void GetMonitor_ReturnsExpectedValue()
}
[Test]
- public void Dispose_StopsTheProcessor()
+ public void Dispose_ClosesTheConnection()
{
var functionId = "FunctionId";
var eventHubName = "EventHubName";
@@ -670,10 +670,10 @@ public void Dispose_StopsTheProcessor()
Mock.Of());
(listener as IListener).Dispose();
- host.Verify(h => h.StopProcessingAsync(CancellationToken.None), Times.Once);
+ host.Verify(h => h.StopProcessingAsync(CancellationToken.None), Times.Never);
+ host.Verify(h => h.DisposeAsync(), Times.Once);
- (listener as IListener).Cancel();
- host.Verify(h => h.StopProcessingAsync(CancellationToken.None), Times.Exactly(2));
+ Assert.Throws(() => (listener as IListener).Cancel());
}
[Test]
From 6c130f543e98a731a2f70d0c1d6b1f800e39234d Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Tue, 8 Aug 2023 12:57:46 -0700
Subject: [PATCH 06/19] PR fb
---
.../src/Listeners/EventHubListener.cs | 2 +-
.../src/Processor/EventProcessorHost.cs | 18 ----
.../tests/EventHubEndToEndTests.cs | 92 +++++++++----------
.../tests/EventHubListenerTests.cs | 5 +-
4 files changed, 47 insertions(+), 70 deletions(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
index a7055c1b8df5..90f2ae1c7df6 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
@@ -86,7 +86,7 @@ void IDisposable.Dispose()
_disposingCancellationTokenSource.Cancel();
#pragma warning disable AZC0102
- _eventProcessorHost.DisposeAsync().GetAwaiter().GetResult();
+ _eventProcessorHost.StopProcessingAsync().GetAwaiter().GetResult();
#pragma warning restore AZC0102
// No need to dispose the _disposingCancellationTokenSource since we don't create it as a linked token and
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
index 4ae2fc50acbc..3da4f8e57292 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
@@ -21,8 +21,6 @@ internal class EventProcessorHost : EventProcessor
private BlobCheckpointStoreInternal _checkpointStore;
private ConcurrentDictionary _lastReadCheckpoint = new();
- private EventHubConnection _connection;
- private Func ConnectionFactory { get; }
///
/// Mocking constructor
@@ -38,7 +36,6 @@ public EventProcessorHost(string consumerGroup,
int eventBatchMaximumCount,
Action exceptionHandler) : base(eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options)
{
- ConnectionFactory = () => new EventHubConnection(connectionString, eventHubName, options.ConnectionOptions);
_exceptionHandler = exceptionHandler;
}
@@ -50,7 +47,6 @@ public EventProcessorHost(string consumerGroup,
int eventBatchMaximumCount,
Action exceptionHandler) : base(eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options)
{
- ConnectionFactory = () => new EventHubConnection(fullyQualifiedNamespace, eventHubName, credential, options.ConnectionOptions);
_exceptionHandler = exceptionHandler;
}
@@ -141,20 +137,6 @@ protected override Task OnPartitionProcessingStoppedAsync(EventProcessorHostPart
return partition.EventProcessor.CloseAsync(partition, reason);
}
- protected override EventHubConnection CreateConnection()
- {
- _connection = ConnectionFactory();
- return _connection;
- }
-
- public virtual async Task DisposeAsync()
- {
- if (_connection != null)
- {
- await _connection.DisposeAsync().ConfigureAwait(false);
- }
- }
-
public async Task StartProcessingAsync(
IEventProcessorFactory processorFactory,
BlobCheckpointStoreInternal checkpointStore,
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
index e6fae49be0e4..adc8d94e6883 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
@@ -31,8 +31,7 @@ public class EventHubEndToEndTests : WebJobsEventHubTestBase
{
private static readonly TimeSpan NoEventReadTimeout = TimeSpan.FromSeconds(5);
- private static EventWaitHandle _eventWait1;
- private static EventWaitHandle _eventWait2;
+ private static EventWaitHandle _eventWait;
private static List _results;
private static DateTimeOffset _initialOffsetEnqueuedTimeUTC;
@@ -40,14 +39,13 @@ public class EventHubEndToEndTests : WebJobsEventHubTestBase
public void SetUp()
{
_results = new List();
- _eventWait1 = new ManualResetEvent(initialState: false);
- _eventWait2 = new ManualResetEvent(initialState: false);
+ _eventWait = new ManualResetEvent(initialState: false);
}
[TearDown]
public void TearDown()
{
- _eventWait1.Dispose();
+ _eventWait.Dispose();
}
[Test]
@@ -58,7 +56,7 @@ public async Task EventHub_PocoBinding()
{
await jobHost.CallAsync(nameof(EventHubTestBindToPocoJobs.SendEvent_TestHub));
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
@@ -78,7 +76,7 @@ public async Task EventHub_StringBinding()
{
await jobHost.CallAsync(nameof(EventHubTestBindToStringJobs.SendEvent_TestHub), new { input = "data" });
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
@@ -98,7 +96,7 @@ public async Task EventHub_SingleDispatch()
{
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobs.SendEvent_TestHub), new { input = "data" });
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
@@ -113,10 +111,9 @@ public async Task EventHub_SingleDispatch_Dispose()
await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });
var (_, host) = BuildHost(ConfigureTestEventHub);
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
host.Dispose();
- _eventWait2.Set();
}
[Test]
@@ -138,7 +135,7 @@ public async Task EventHub_SingleDispatch_ConsumerGroup()
{
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchWithConsumerGroupJobs.SendEvent_TestHub));
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
@@ -152,7 +149,7 @@ public async Task EventHub_SingleDispatch_BinaryData()
{
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobsBinaryData.SendEvent_TestHub), new { input = "data" });
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
@@ -168,7 +165,7 @@ public async Task EventHub_ProducerClient()
{
await jobHost.CallAsync(nameof(EventHubTestClientDispatch.SendEvents));
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
@@ -182,7 +179,7 @@ public async Task EventHub_Collector()
{
await jobHost.CallAsync(nameof(EventHubTestCollectorDispatch.SendEvents));
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
@@ -196,7 +193,7 @@ public async Task EventHub_CollectorPartitionKey()
{
await jobHost.CallAsync(nameof(EventHubTestCollectorDispatch.SendEventsWithKey));
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
@@ -315,7 +312,7 @@ public async Task AssertCanSendReceiveMessage(Action hostConfigura
{
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobWithConnection.SendEvent_TestHub), new { input = "data" });
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
@@ -330,7 +327,7 @@ public async Task EventHub_MultipleDispatch()
int numEvents = 5;
await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchJobs.SendEvents_TestHub), new { numEvents = numEvents, input = "data" });
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
@@ -347,7 +344,7 @@ public async Task EventHub_MultipleDispatch_BinaryData()
int numEvents = 5;
await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchJobsBinaryData.SendEvents_TestHub), new { numEvents = numEvents, input = "data" });
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
@@ -383,7 +380,7 @@ public async Task EventHub_MultipleDispatch_MinBatchSize()
int numEvents = 5;
await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchMinBatchSizeJobs.SendEvents_TestHub), new { numEvents = numEvents, input = "data" });
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
@@ -398,10 +395,9 @@ public async Task EventHub_MultipleDispatch_Dispose()
await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });
var (_, host) = BuildHost();
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
host.Dispose();
- _eventWait2.Set();
}
private static void AssertMultipleDispatchLogsMinBatch(IHost host)
@@ -459,7 +455,7 @@ public async Task EventHub_PartitionKey()
using (jobHost)
{
await jobHost.CallAsync(nameof(EventHubPartitionKeyTestJobs.SendEvents_TestHub), new { input = "data" });
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
@@ -486,7 +482,7 @@ public async Task EventHub_InitialOffsetFromStart()
});
using (jobHost)
{
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
@@ -514,7 +510,7 @@ public async Task EventHub_InitialOffsetFromEnd()
using (jobHost)
{
// We don't expect to get signaled as there should be no messages received with a FromEnd initial offset
- bool result = _eventWait1.WaitOne(NoEventReadTimeout);
+ bool result = _eventWait.WaitOne(NoEventReadTimeout);
Assert.False(result, "An event was received while none were expected.");
// send events which should be received. To ensure that the test is
@@ -531,7 +527,7 @@ public async Task EventHub_InitialOffsetFromEnd()
}
});
- result = _eventWait1.WaitOne(Timeout);
+ result = _eventWait.WaitOne(Timeout);
cts.Cancel();
try { await sendTask; } catch { /* Ignore, we're not testing sends */ }
@@ -582,7 +578,7 @@ public async Task EventHub_InitialOffsetFromEnqueuedTime()
});
using (jobHost)
{
- bool result = _eventWait1.WaitOne(Timeout);
+ bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
@@ -618,28 +614,28 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
Assert.AreNotEqual(default(LastEnqueuedEventProperties), triggerPartitionContext.ReadLastEnqueuedEventProperties());
Assert.True(triggerPartitionContext.IsCheckpointingAfterInvocation);
- _eventWait1.Set();
+ _eventWait.Set();
}
}
public class EventHubTestSingleDispatchJobs_Dispose
{
- public static void SendEvent_TestHub([EventHubTrigger(TestHubName, Connection = TestHubName)] string evt, CancellationToken cancellationToken)
+ public static async Task SendEvent_TestHub([EventHubTrigger(TestHubName, Connection = TestHubName)] string evt, CancellationToken cancellationToken)
{
- _eventWait1.Set();
- bool result = _eventWait2.WaitOne(Timeout);
- Assert.IsTrue(result);
+ _eventWait.Set();
+ // wait a small amount of time for the host to call dispose
+ await Task.Delay(2000, CancellationToken.None);
Assert.IsTrue(cancellationToken.IsCancellationRequested);
}
}
public class EventHubTestMultipleDispatchJobs_Dispose
{
- public static void SendEvent_TestHub([EventHubTrigger(TestHubName, Connection = TestHubName)] string[] evt, CancellationToken cancellationToken)
+ public static async Task SendEvent_TestHub([EventHubTrigger(TestHubName, Connection = TestHubName)] string[] evt, CancellationToken cancellationToken)
{
- _eventWait1.Set();
- bool result = _eventWait2.WaitOne(Timeout);
- Assert.IsTrue(result);
+ _eventWait.Set();
+ // wait a small amount of time for the host to call dispose
+ await Task.Delay(2000, CancellationToken.None);
Assert.IsTrue(cancellationToken.IsCancellationRequested);
}
}
@@ -671,7 +667,7 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
Assert.AreEqual(eventData.PartitionKey, s_partitionKey);
}
- _eventWait1.Set();
+ _eventWait.Set();
}
}
@@ -688,7 +684,7 @@ await producer.SendAsync(new[]
public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection = TestHubName)] EventData eventData)
{
Assert.AreEqual(eventData.EventBody.ToString(), "Event 1");
- _eventWait1.Set();
+ _eventWait.Set();
}
}
@@ -703,7 +699,7 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
{
Assert.AreEqual(evt, nameof(EventHubTestSingleDispatchWithConsumerGroupJobs));
- _eventWait1.Set();
+ _eventWait.Set();
}
}
@@ -719,7 +715,7 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
IDictionary systemProperties)
{
Assert.AreEqual("data", evt.ToString());
- _eventWait1.Set();
+ _eventWait.Set();
}
}
@@ -735,7 +731,7 @@ public static void BindToPoco([EventHubTrigger(TestHubName, Connection = TestHub
Assert.AreEqual(input.Value, "data");
Assert.AreEqual(input.Name, "foo");
logger.LogInformation($"PocoValues(foo,data)");
- _eventWait1.Set();
+ _eventWait.Set();
}
}
@@ -749,7 +745,7 @@ public static void SendEvent_TestHub(string input, [EventHub(TestHubName, Connec
public static void BindToString([EventHubTrigger(TestHubName, Connection = TestHubName)] string input, ILogger logger)
{
logger.LogInformation($"Input({input})");
- _eventWait1.Set();
+ _eventWait.Set();
}
}
@@ -795,7 +791,7 @@ public static void ProcessMultipleEvents([EventHubTrigger(TestHubName, Connectio
if (s_processedEventCount == s_eventCount)
{
_results.AddRange(events);
- _eventWait1.Set();
+ _eventWait.Set();
}
}
}
@@ -828,7 +824,7 @@ public static void ProcessMultipleEventsBinaryData([EventHubTrigger(TestHubName,
// filter for the ID the current test is using
if (s_processedEventCount == s_eventCount)
{
- _eventWait1.Set();
+ _eventWait.Set();
}
}
}
@@ -893,7 +889,7 @@ public static void ProcessMultipleEvents([EventHubTrigger(TestHubName, Connectio
if (s_processedEventCount >= s_eventCount)
{
- _eventWait1.Set();
+ _eventWait.Set();
}
}
}
@@ -941,7 +937,7 @@ public static void ProcessMultiplePartitionEvents([EventHubTrigger(TestHubName,
if (_results.Count > 0)
{
- _eventWait1.Set();
+ _eventWait.Set();
}
}
}
@@ -960,7 +956,7 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
Assert.AreEqual("value1", properties["TestProp1"]);
Assert.AreEqual("value2", properties["TestProp2"]);
- _eventWait1.Set();
+ _eventWait.Set();
}
}
public class TestPoco
@@ -975,7 +971,7 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
string partitionKey, DateTime enqueuedTimeUtc, IDictionary properties,
IDictionary systemProperties)
{
- _eventWait1.Set();
+ _eventWait.Set();
}
}
@@ -1000,7 +996,7 @@ public static void ProcessMultipleEvents([EventHubTrigger(TestHubName, Connectio
{
Assert.GreaterOrEqual(DateTimeOffset.Parse(result), earliestAllowedOffset);
}
- _eventWait1.Set();
+ _eventWait.Set();
}
}
}
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
index c94d16637039..3b7c88589a8f 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
@@ -643,7 +643,7 @@ public void GetMonitor_ReturnsExpectedValue()
}
[Test]
- public void Dispose_ClosesTheConnection()
+ public void Dispose_StopsTheProcessor()
{
var functionId = "FunctionId";
var eventHubName = "EventHubName";
@@ -670,8 +670,7 @@ public void Dispose_ClosesTheConnection()
Mock.Of());
(listener as IListener).Dispose();
- host.Verify(h => h.StopProcessingAsync(CancellationToken.None), Times.Never);
- host.Verify(h => h.DisposeAsync(), Times.Once);
+ host.Verify(h => h.StopProcessingAsync(CancellationToken.None), Times.Once);
Assert.Throws(() => (listener as IListener).Cancel());
}
From 4f9295c384cb30e7d03f89b05810c1f70bdff605 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Tue, 8 Aug 2023 13:01:00 -0700
Subject: [PATCH 07/19] fix
---
.../src/Listeners/EventHubListener.PartitionProcessor.cs | 2 +-
.../src/Listeners/EventHubListener.cs | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
index 0efeac8a1334..20657c8cbe5b 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
@@ -59,7 +59,7 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex
_logger = logger;
_firstFunctionInvocation = true;
_maxWaitTime = options.MaxWaitTime;
- _minimumBatchesEnabled = options.MinEventBatchSize > 1; // 1 is the default'
+ _minimumBatchesEnabled = options.MinEventBatchSize > 1; // 1 is the default
_disposingToken = disposingToken;
// Events are only cached when building a batch of minimum size.
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
index 90f2ae1c7df6..dbf9237e0619 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
@@ -86,7 +86,7 @@ void IDisposable.Dispose()
_disposingCancellationTokenSource.Cancel();
#pragma warning disable AZC0102
- _eventProcessorHost.StopProcessingAsync().GetAwaiter().GetResult();
+ StopAsync(CancellationToken.None).GetAwaiter().GetResult();
#pragma warning restore AZC0102
// No need to dispose the _disposingCancellationTokenSource since we don't create it as a linked token and
From e5fd097a8762b2bcc859840b2bd0ac23631e0adf Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 10 Aug 2023 10:38:20 -0700
Subject: [PATCH 08/19] Respect drain mode
---
.../Config/EventHubExtensionConfigProvider.cs | 13 ++++++++--
.../src/Listeners/EventHubListener.cs | 21 +++++++++++-----
...EventHubTriggerAttributeBindingProvider.cs | 9 +++++--
.../tests/EventHubEndToEndTests.cs | 24 +++++++++++++++++++
.../tests/EventHubListenerTests.cs | 7 ++++--
...HubTriggerAttributeBindingProviderTests.cs | 3 ++-
6 files changed, 64 insertions(+), 13 deletions(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs
index 2af642f46a81..417f967c3aaa 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs
@@ -9,6 +9,7 @@
using Microsoft.Azure.WebJobs.Description;
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
using Microsoft.Azure.WebJobs.EventHubs.Processor;
+using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.Configuration;
@@ -28,19 +29,22 @@ internal class EventHubExtensionConfigProvider : IExtensionConfigProvider
private readonly IConverterManager _converterManager;
private readonly IWebJobsExtensionConfiguration _configuration;
private readonly EventHubClientFactory _clientFactory;
+ private readonly IDrainModeManager _drainModeManager;
public EventHubExtensionConfigProvider(
IOptions options,
ILoggerFactory loggerFactory,
IConverterManager converterManager,
IWebJobsExtensionConfiguration configuration,
- EventHubClientFactory clientFactory)
+ EventHubClientFactory clientFactory,
+ IDrainModeManager drainModeManager)
{
_options = options;
_loggerFactory = loggerFactory;
_converterManager = converterManager;
_configuration = configuration;
_clientFactory = clientFactory;
+ _drainModeManager = drainModeManager;
}
internal Action ExceptionHandler { get; set; }
@@ -71,7 +75,12 @@ public void Initialize(ExtensionConfigContext context)
.AddOpenConverter(ConvertPocoToEventData);
// register our trigger binding provider
- var triggerBindingProvider = new EventHubTriggerAttributeBindingProvider(_converterManager, _options, _loggerFactory, _clientFactory);
+ var triggerBindingProvider = new EventHubTriggerAttributeBindingProvider(
+ _converterManager,
+ _options,
+ _loggerFactory,
+ _clientFactory,
+ _drainModeManager);
context.AddBindingRule()
.BindToTrigger(triggerBindingProvider);
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
index dbf9237e0619..490cc0c4a5c7 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
@@ -7,6 +7,7 @@
using Azure.Messaging.EventHubs.Primitives;
using Microsoft.Azure.WebJobs.EventHubs.Processor;
using Microsoft.Azure.WebJobs.Extensions.EventHubs.Listeners;
+using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Scale;
@@ -27,7 +28,8 @@ internal sealed partial class EventHubListener : IListener, IEventProcessorFacto
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
private string _details;
- private CancellationTokenSource _disposingCancellationTokenSource;
+ private CancellationTokenSource _functionExecutionCancellationTokenSource;
+ private readonly IDrainModeManager _drainModeManager;
public EventHubListener(
string functionId,
@@ -37,7 +39,8 @@ public EventHubListener(
IEventHubConsumerClient consumerClient,
BlobCheckpointStoreInternal checkpointStore,
EventHubOptions options,
- ILoggerFactory loggerFactory)
+ ILoggerFactory loggerFactory,
+ IDrainModeManager drainModeManager)
{
_loggerFactory = loggerFactory;
_executor = executor;
@@ -46,7 +49,8 @@ public EventHubListener(
_checkpointStore = checkpointStore;
_options = options;
_logger = _loggerFactory.CreateLogger();
- _disposingCancellationTokenSource = new CancellationTokenSource();
+ _functionExecutionCancellationTokenSource = new CancellationTokenSource();
+ _drainModeManager = drainModeManager;
EventHubMetricsProvider metricsProvider = new EventHubMetricsProvider(functionId, consumerClient, checkpointStore, _loggerFactory.CreateLogger());
@@ -75,7 +79,7 @@ public EventHubListener(
///
void IListener.Cancel()
{
- if (_disposingCancellationTokenSource.IsCancellationRequested)
+ if (_functionExecutionCancellationTokenSource.IsCancellationRequested)
{
throw new ObjectDisposedException(nameof(IListener));
}
@@ -83,7 +87,7 @@ void IListener.Cancel()
void IDisposable.Dispose()
{
- _disposingCancellationTokenSource.Cancel();
+ _functionExecutionCancellationTokenSource.Cancel();
#pragma warning disable AZC0102
StopAsync(CancellationToken.None).GetAwaiter().GetResult();
@@ -104,6 +108,11 @@ public async Task StartAsync(CancellationToken cancellationToken)
public async Task StopAsync(CancellationToken cancellationToken)
{
+ if (!_drainModeManager.IsDrainModeEnabled)
+ {
+ _functionExecutionCancellationTokenSource.Cancel();
+ }
+
await _eventProcessorHost.StopProcessingAsync(cancellationToken).ConfigureAwait(false);
_logger.LogDebug($"EventHub listener stopped ({_details})");
@@ -111,7 +120,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
IEventProcessor IEventProcessorFactory.CreatePartitionProcessor()
{
- return new PartitionProcessor(_options, _executor, _loggerFactory.CreateLogger(), _singleDispatch, _disposingCancellationTokenSource.Token);
+ return new PartitionProcessor(_options, _executor, _loggerFactory.CreateLogger(), _singleDispatch, _functionExecutionCancellationTokenSource.Token);
}
public IScaleMonitor GetMonitor()
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs
index d3c89b55bc49..7967eff8fbce 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs
@@ -7,6 +7,7 @@
using Azure.Messaging.EventHubs.Core;
using Azure.Messaging.EventHubs.Primitives;
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
+using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Triggers;
@@ -21,17 +22,20 @@ internal class EventHubTriggerAttributeBindingProvider : ITriggerBindingProvider
private readonly IOptions _options;
private readonly EventHubClientFactory _clientFactory;
private readonly IConverterManager _converterManager;
+ private readonly IDrainModeManager _drainModeManager;
public EventHubTriggerAttributeBindingProvider(
IConverterManager converterManager,
IOptions options,
ILoggerFactory loggerFactory,
- EventHubClientFactory clientFactory)
+ EventHubClientFactory clientFactory,
+ IDrainModeManager drainModeManager)
{
_converterManager = converterManager;
_options = options;
_clientFactory = clientFactory;
_loggerFactory = loggerFactory;
+ _drainModeManager = drainModeManager;
}
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
@@ -67,7 +71,8 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex
_clientFactory.GetEventHubConsumerClient(attribute.EventHubName, attribute.Connection, attribute.ConsumerGroup),
checkpointStore,
options,
- _loggerFactory);
+ _loggerFactory,
+ _drainModeManager);
return Task.FromResult(listener);
};
#pragma warning disable 618
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
index adc8d94e6883..51fec383f60c 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
@@ -116,6 +116,18 @@ public async Task EventHub_SingleDispatch_Dispose()
host.Dispose();
}
+ [Test]
+ public async Task EventHub_SingleDispatch_StopWithoutDrain()
+ {
+ await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName);
+ await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });
+ var (_, host) = BuildHost(ConfigureTestEventHub);
+
+ bool result = _eventWait.WaitOne(Timeout);
+ Assert.True(result);
+ await host.StopAsync();
+ }
+
[Test]
public async Task EventHub_SingleDispatch_ConsumerGroup()
{
@@ -400,6 +412,18 @@ public async Task EventHub_MultipleDispatch_Dispose()
host.Dispose();
}
+ [Test]
+ public async Task EventHub_MultipleDispatch_StopWithoutDrain()
+ {
+ await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName);
+ await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });
+ var (_, host) = BuildHost();
+
+ bool result = _eventWait.WaitOne(Timeout);
+ Assert.True(result);
+ await host.StopAsync();
+ }
+
private static void AssertMultipleDispatchLogsMinBatch(IHost host)
{
IEnumerable logMessages = host.GetTestLoggerProvider()
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
index 3b7c88589a8f..b038d3e342b2 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
@@ -14,6 +14,7 @@
using Azure.Messaging.EventHubs.Tests;
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
using Microsoft.Azure.WebJobs.EventHubs.Processor;
+using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Scale;
@@ -630,7 +631,8 @@ public void GetMonitor_ReturnsExpectedValue()
consumerClientMock.Object,
Mock.Of(),
new EventHubOptions(),
- Mock.Of());
+ Mock.Of(),
+ Mock.Of());
IScaleMonitor scaleMonitor = listener.GetMonitor();
@@ -667,7 +669,8 @@ public void Dispose_StopsTheProcessor()
consumerClientMock.Object,
Mock.Of(),
new EventHubOptions(),
- Mock.Of());
+ Mock.Of(),
+ Mock.Of());
(listener as IListener).Dispose();
host.Verify(h => h.StopProcessingAsync(CancellationToken.None), Times.Once);
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTriggerAttributeBindingProviderTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTriggerAttributeBindingProviderTests.cs
index 32ddf5803c2d..8a4042a2b21a 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTriggerAttributeBindingProviderTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTriggerAttributeBindingProviderTests.cs
@@ -13,6 +13,7 @@
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
using Microsoft.Azure.WebJobs.EventHubs.Processor;
using Microsoft.Azure.WebJobs.EventHubs.Tests;
+using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Protocols;
@@ -54,7 +55,7 @@ public EventHubTriggerAttributeBindingProviderTests()
It.IsAny())).Returns(blobServiceClient.Object);
var factory = ConfigurationUtilities.CreateFactory(configuration, options, componentFactory.Object);
- _provider = new EventHubTriggerAttributeBindingProvider(convertManager.Object, Options.Create(options), NullLoggerFactory.Instance, factory);
+ _provider = new EventHubTriggerAttributeBindingProvider(convertManager.Object, Options.Create(options), NullLoggerFactory.Instance, factory, Mock.Of());
}
[Test]
From ca92580b52172490bb95114226e0b1ed74d1c018 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 10 Aug 2023 10:40:15 -0700
Subject: [PATCH 09/19] rename
---
.../EventHubListener.PartitionProcessor.cs | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
index 20657c8cbe5b..18a00c699a16 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
@@ -44,14 +44,14 @@ internal class PartitionProcessor : IEventProcessor, IDisposable
private Task _cachedEventsBackgroundTask;
private CancellationTokenSource _cachedEventsBackgroundTaskCts;
private SemaphoreSlim _cachedEventsGuard;
- private readonly CancellationToken _disposingToken;
+ private readonly CancellationToken _functionExecutionToken;
///
/// When we have a minimum batch size greater than 1, this class manages caching events.
///
internal PartitionProcessorEventsManager CachedEventsManager { get; }
- public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, CancellationToken disposingToken)
+ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, CancellationToken functionExecutionToken)
{
_executor = executor;
_singleDispatch = singleDispatch;
@@ -60,7 +60,7 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex
_firstFunctionInvocation = true;
_maxWaitTime = options.MaxWaitTime;
_minimumBatchesEnabled = options.MinEventBatchSize > 1; // 1 is the default
- _disposingToken = disposingToken;
+ _functionExecutionToken = functionExecutionToken;
// Events are only cached when building a batch of minimum size.
if (_minimumBatchesEnabled)
@@ -137,7 +137,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
TriggerDetails = eventHubTriggerInput.GetTriggerDetails(context)
};
- await _executor.TryExecuteAsync(input, _disposingToken).ConfigureAwait(false);
+ await _executor.TryExecuteAsync(input, _functionExecutionToken).ConfigureAwait(false);
_firstFunctionInvocation = false;
eventToCheckpoint = events[i];
}
@@ -170,7 +170,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
_logger.LogDebug($"Partition Processor received events and is attempting to invoke function ({details})");
UpdateCheckpointContext(triggerEvents, context);
- await TriggerExecute(triggerEvents, context, _disposingToken).ConfigureAwait(false);
+ await TriggerExecute(triggerEvents, context, _functionExecutionToken).ConfigureAwait(false);
eventToCheckpoint = triggerEvents.Last();
// If there is a background timer task, cancel it and dispose of the cancellation token. If there
@@ -203,7 +203,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
else
{
UpdateCheckpointContext(events, context);
- await TriggerExecute(events, context, _disposingToken).ConfigureAwait(false);
+ await TriggerExecute(events, context, _functionExecutionToken).ConfigureAwait(false);
eventToCheckpoint = events.LastOrDefault();
}
@@ -278,7 +278,7 @@ private async Task MonitorCachedEvents(DateTimeOffset? lastCheckpointTime, Cance
var details = GetOperationDetails(_mostRecentPartitionContext, "MaxWaitTimeElapsed");
_logger.LogDebug($"Partition Processor has waited MaxWaitTime since last invocation and is attempting to invoke function on all held events ({details})");
- await TriggerExecute(triggerEvents, _mostRecentPartitionContext, _disposingToken).ConfigureAwait(false);
+ await TriggerExecute(triggerEvents, _mostRecentPartitionContext, _functionExecutionToken).ConfigureAwait(false);
if (!backgroundCancellationTokenSource.Token.IsCancellationRequested)
{
await CheckpointAsync(triggerEvents.Last(), _mostRecentPartitionContext).ConfigureAwait(false);
From 4943356ec1ba60456b929e2cab866c35d7caba8d Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 10 Aug 2023 10:45:59 -0700
Subject: [PATCH 10/19] Fix disposed check
---
.../src/Listeners/EventHubListener.cs | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
index 490cc0c4a5c7..817be67056b0 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
@@ -30,6 +30,7 @@ internal sealed partial class EventHubListener : IListener, IEventProcessorFacto
private string _details;
private CancellationTokenSource _functionExecutionCancellationTokenSource;
private readonly IDrainModeManager _drainModeManager;
+ private volatile bool _disposed;
public EventHubListener(
string functionId,
@@ -79,7 +80,7 @@ public EventHubListener(
///
void IListener.Cancel()
{
- if (_functionExecutionCancellationTokenSource.IsCancellationRequested)
+ if (_disposed)
{
throw new ObjectDisposedException(nameof(IListener));
}
@@ -96,6 +97,7 @@ void IDisposable.Dispose()
// No need to dispose the _disposingCancellationTokenSource since we don't create it as a linked token and
// it won't use a timer, so the Dispose method is essentially a no-op. The downside to disposing it is that
// any customers who are trying to use it to cancel their own operations would get an ObjectDisposedException.
+ _disposed = true;
}
public async Task StartAsync(CancellationToken cancellationToken)
From d56621f0adf691d81f6d18729f83ffe7a49abb66 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 10 Aug 2023 12:58:04 -0700
Subject: [PATCH 11/19] Fix token behavior
---
.../EventHubListener.PartitionProcessor.cs | 14 +++++-------
.../tests/EventHubListenerTests.cs | 22 +++++--------------
2 files changed, 11 insertions(+), 25 deletions(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
index 18a00c699a16..1757ad1213b2 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
@@ -28,8 +28,6 @@ internal sealed partial class EventHubListener
///
internal class PartitionProcessor : IEventProcessor, IDisposable
{
- private readonly CancellationTokenSource _cts = new();
-
private readonly ITriggeredFunctionExecutor _executor;
private readonly bool _singleDispatch;
private readonly ILogger _logger;
@@ -72,8 +70,7 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex
public Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason)
{
- // signal cancellation for any in progress executions and clear the cached events
- _cts.Cancel();
+ // clear the cached events
CachedEventsManager?.ClearEventCache();
_logger.LogDebug(GetOperationDetails(context, $"CloseAsync, {reason}"));
@@ -104,7 +101,7 @@ public Task ProcessErrorAsync(EventProcessorHostPartition context, Exception err
///
public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable messages, CancellationToken partitionProcessingCancellationToken)
{
- using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, partitionProcessingCancellationToken);
+ using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_functionExecutionToken, partitionProcessingCancellationToken);
_mostRecentPartitionContext = context;
var events = messages.ToArray();
EventData eventToCheckpoint = null;
@@ -151,10 +148,10 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
try
{
// Try to acquire the semaphore. This protects the cached events.
- if (!_cachedEventsGuard.Wait(0, linkedCts.Token))
+ if (!_cachedEventsGuard.Wait(0, _functionExecutionToken))
{
// This will throw if the cancellation token is canceled.
- await _cachedEventsGuard.WaitAsync(linkedCts.Token).ConfigureAwait(false);
+ await _cachedEventsGuard.WaitAsync(_functionExecutionToken).ConfigureAwait(false);
}
acquiredSemaphore = true;
@@ -188,7 +185,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
if (_cachedEventsBackgroundTaskCts == null && CachedEventsManager.HasCachedEvents)
{
// If there are events waiting to be processed, and no background task running, start a monitoring cycle.
- _cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token);
+ _cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(linkedCts.Token);
_cachedEventsBackgroundTask = MonitorCachedEvents(context.ProcessorHost.GetLastReadCheckpoint(context.PartitionId)?.LastModified, _cachedEventsBackgroundTaskCts);
}
}
@@ -410,7 +407,6 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
- _cts.Dispose();
_cachedEventsBackgroundTaskCts?.Dispose();
_cachedEventsGuard?.Dispose();
}
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
index b038d3e342b2..518225ad6aab 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
@@ -409,19 +409,15 @@ public async Task ProcessEvents_OwnershipLost_DoesNotCheckpoint()
}
int execution = 0;
- var cts = new CancellationTokenSource();
executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() =>
{
- if (execution == 0)
- {
- eventProcessor.CloseAsync(partitionContext, ProcessingStoppedReason.OwnershipLost).GetAwaiter().GetResult();
- }
var result = results[execution++];
return result;
});
- await eventProcessor.ProcessEventsAsync(partitionContext, events, cts.Token);
+ // Pass a cancellation token that is already signaled to simulate ownership loss
+ await eventProcessor.ProcessEventsAsync(partitionContext, events, new CancellationToken(true));
processor.Verify(
p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()),
@@ -457,15 +453,12 @@ public async Task ProcessEvents_Succeeds_ShuttingDown_DoesNotCheckpoint()
executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() =>
{
- if (execution == 0)
- {
- eventProcessor.CloseAsync(partitionContext, ProcessingStoppedReason.Shutdown).GetAwaiter().GetResult();
- }
var result = results[execution++];
return result;
});
- await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
+ // Pass a cancellation token that is already signaled to simulate shutdown
+ await eventProcessor.ProcessEventsAsync(partitionContext, events, new CancellationToken(true));
processor.Verify(
p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()),
@@ -501,15 +494,12 @@ public async Task ProcessEvents_Fails_ShuttingDown_DoesNotCheckpoint()
executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() =>
{
- if (execution == 0)
- {
- eventProcessor.CloseAsync(partitionContext, ProcessingStoppedReason.Shutdown).GetAwaiter().GetResult();
- }
var result = results[execution++];
return result;
});
- await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
+ // Pass a cancellation token that is already signaled to simulate shutdown
+ await eventProcessor.ProcessEventsAsync(partitionContext, events, new CancellationToken(true));
processor.Verify(
p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()),
From 722f04c31388f6360bae4f1205abd33093383584 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 10 Aug 2023 13:01:09 -0700
Subject: [PATCH 12/19] Fix
---
.../src/Listeners/EventHubListener.PartitionProcessor.cs | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
index 1757ad1213b2..b5bd4818c446 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
@@ -148,10 +148,10 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
try
{
// Try to acquire the semaphore. This protects the cached events.
- if (!_cachedEventsGuard.Wait(0, _functionExecutionToken))
+ if (!_cachedEventsGuard.Wait(0, linkedCts.Token))
{
// This will throw if the cancellation token is canceled.
- await _cachedEventsGuard.WaitAsync(_functionExecutionToken).ConfigureAwait(false);
+ await _cachedEventsGuard.WaitAsync(linkedCts.Token).ConfigureAwait(false);
}
acquiredSemaphore = true;
From 2061f788b61500b3c6b7e7e9d4d4c352c0774dd2 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 10 Aug 2023 13:03:08 -0700
Subject: [PATCH 13/19] Fix namespace on test class
---
.../tests/ScaleHostEndToEndTests.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/ScaleHostEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/ScaleHostEndToEndTests.cs
index 3d2c39baf16c..b2f85d8a5385 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/ScaleHostEndToEndTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/ScaleHostEndToEndTests.cs
@@ -26,7 +26,7 @@
using Newtonsoft.Json.Linq;
using NUnit.Framework;
-namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests
+namespace Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests
{
[NonParallelizable]
[LiveOnly(true)]
From 87af388e7bc1e19b2d6109f79794b28681c97f74 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 10 Aug 2023 13:20:09 -0700
Subject: [PATCH 14/19] Fix token link
---
.../src/Listeners/EventHubListener.PartitionProcessor.cs | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
index b5bd4818c446..5120d4a3f8f2 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
@@ -185,7 +185,8 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
if (_cachedEventsBackgroundTaskCts == null && CachedEventsManager.HasCachedEvents)
{
// If there are events waiting to be processed, and no background task running, start a monitoring cycle.
- _cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(linkedCts.Token);
+ // Don't reference linkedCts in the class level background task, as it will be disposed when the method goes out of scope.
+ _cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_functionExecutionToken, partitionProcessingCancellationToken);
_cachedEventsBackgroundTask = MonitorCachedEvents(context.ProcessorHost.GetLastReadCheckpoint(context.PartitionId)?.LastModified, _cachedEventsBackgroundTaskCts);
}
}
From 0e7e6f6a807b7db260eb144cfbcbef9c3909de7e Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 10 Aug 2023 13:43:34 -0700
Subject: [PATCH 15/19] revert stopasync calls in tests
---
.../tests/EventHubEndToEndTests.cs | 16 ----------------
1 file changed, 16 deletions(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
index 51fec383f60c..5163d1d41d3d 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
@@ -58,7 +58,6 @@ public async Task EventHub_PocoBinding()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage);
@@ -78,7 +77,6 @@ public async Task EventHub_StringBinding()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage);
CollectionAssert.Contains(logs, $"Input(data)");
@@ -98,7 +96,6 @@ public async Task EventHub_SingleDispatch()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
AssertSingleDispatchLogs(host);
@@ -149,7 +146,6 @@ public async Task EventHub_SingleDispatch_ConsumerGroup()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
}
@@ -163,7 +159,6 @@ public async Task EventHub_SingleDispatch_BinaryData()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
AssertSingleDispatchLogs(host);
@@ -179,7 +174,6 @@ public async Task EventHub_ProducerClient()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
}
@@ -193,7 +187,6 @@ public async Task EventHub_Collector()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
}
@@ -207,7 +200,6 @@ public async Task EventHub_CollectorPartitionKey()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
}
@@ -326,7 +318,6 @@ public async Task AssertCanSendReceiveMessage(Action hostConfigura
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
}
@@ -341,7 +332,6 @@ public async Task EventHub_MultipleDispatch()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
AssertMultipleDispatchLogs(host);
@@ -358,7 +348,6 @@ public async Task EventHub_MultipleDispatch_BinaryData()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
AssertMultipleDispatchLogs(host);
@@ -394,7 +383,6 @@ public async Task EventHub_MultipleDispatch_MinBatchSize()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
AssertMultipleDispatchLogsMinBatch(host);
@@ -482,7 +470,6 @@ public async Task EventHub_PartitionKey()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
}
@@ -508,7 +495,6 @@ public async Task EventHub_InitialOffsetFromStart()
{
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
}
@@ -557,7 +543,6 @@ public async Task EventHub_InitialOffsetFromEnd()
try { await sendTask; } catch { /* Ignore, we're not testing sends */ }
Assert.True(result);
- await jobHost.StopAsync();
}
}
@@ -604,7 +589,6 @@ public async Task EventHub_InitialOffsetFromEnqueuedTime()
{
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
- await jobHost.StopAsync();
}
}
From f2e5cb16dd24d418b28d33d14a270d042fd2ed7b Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 10 Aug 2023 14:28:59 -0700
Subject: [PATCH 16/19] Use separate token source for ownership loss
---
.../EventHubListener.PartitionProcessor.cs | 14 ++++--
.../src/Processor/EventProcessorHost.cs | 2 +-
.../src/Processor/IEventProcessor.cs | 2 +-
.../tests/EventHubListenerTests.cs | 47 +++++++++++--------
4 files changed, 40 insertions(+), 25 deletions(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
index 5120d4a3f8f2..89fb922c578f 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs
@@ -43,6 +43,7 @@ internal class PartitionProcessor : IEventProcessor, IDisposable
private CancellationTokenSource _cachedEventsBackgroundTaskCts;
private SemaphoreSlim _cachedEventsGuard;
private readonly CancellationToken _functionExecutionToken;
+ private readonly CancellationTokenSource _ownershipLostTokenSource;
///
/// When we have a minimum batch size greater than 1, this class manages caching events.
@@ -59,6 +60,7 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex
_maxWaitTime = options.MaxWaitTime;
_minimumBatchesEnabled = options.MinEventBatchSize > 1; // 1 is the default
_functionExecutionToken = functionExecutionToken;
+ _ownershipLostTokenSource = new CancellationTokenSource();
// Events are only cached when building a batch of minimum size.
if (_minimumBatchesEnabled)
@@ -70,6 +72,11 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex
public Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason)
{
+ if (reason == ProcessingStoppedReason.OwnershipLost)
+ {
+ _ownershipLostTokenSource.Cancel();
+ }
+
// clear the cached events
CachedEventsManager?.ClearEventCache();
@@ -97,11 +104,10 @@ public Task ProcessErrorAsync(EventProcessorHostPartition context, Exception err
///
/// The partition information for this partition.
/// The events to process.
- /// The cancellation token to respect if processing for the partition is canceled.
///
- public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable messages, CancellationToken partitionProcessingCancellationToken)
+ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable messages)
{
- using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_functionExecutionToken, partitionProcessingCancellationToken);
+ using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_functionExecutionToken, _ownershipLostTokenSource.Token);
_mostRecentPartitionContext = context;
var events = messages.ToArray();
EventData eventToCheckpoint = null;
@@ -186,7 +192,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
{
// If there are events waiting to be processed, and no background task running, start a monitoring cycle.
// Don't reference linkedCts in the class level background task, as it will be disposed when the method goes out of scope.
- _cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_functionExecutionToken, partitionProcessingCancellationToken);
+ _cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_functionExecutionToken, _ownershipLostTokenSource.Token);
_cachedEventsBackgroundTask = MonitorCachedEvents(context.ProcessorHost.GetLastReadCheckpoint(context.PartitionId)?.LastModified, _cachedEventsBackgroundTaskCts);
}
}
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
index 3da4f8e57292..fe854b6f2836 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
@@ -113,7 +113,7 @@ protected override Task OnProcessingEventBatchAsync(IEnumerable event
return Task.CompletedTask;
}
- return partition.EventProcessor.ProcessEventsAsync(partition, events, cancellationToken);
+ return partition.EventProcessor.ProcessEventsAsync(partition, events);
}
protected override async Task OnInitializingPartitionAsync(EventProcessorHostPartition partition, CancellationToken cancellationToken)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs
index fb213da5db5d..709345880446 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/IEventProcessor.cs
@@ -15,6 +15,6 @@ internal interface IEventProcessor
Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason);
Task OpenAsync(EventProcessorHostPartition context);
Task ProcessErrorAsync(EventProcessorHostPartition context, Exception error);
- Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable messages, CancellationToken cancellationToken);
+ Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable messages);
}
}
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
index 518225ad6aab..be11955f9b0e 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
@@ -55,7 +55,7 @@ public async Task ProcessEvents_SingleDispatch_CheckpointsCorrectly(int batchChe
for (int i = 0; i < 100; i++)
{
List events = new List() { new EventData(new byte[0]) };
- await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
+ await eventProcessor.ProcessEventsAsync(partitionContext, events);
}
try
@@ -95,7 +95,7 @@ public async Task ProcessEvents_MultipleDispatch_CheckpointsCorrectly(int batchC
for (int i = 0; i < 100; i++)
{
List events = new List() { new EventData(new byte[0]), new EventData(new byte[0]), new EventData(new byte[0]) };
- await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
+ await eventProcessor.ProcessEventsAsync(partitionContext, events);
}
try
@@ -141,7 +141,7 @@ public async Task ProcessEvents_MultipleDispatch_MinBatch_CheckpointsCorrectly_N
for (int i = 0; i < 60; i++)
{
List events = new List() { new EventData("event1"), new EventData("event2"), new EventData("event3"), new EventData("event4"), new EventData("event5") };
- await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
+ await eventProcessor.ProcessEventsAsync(partitionContext, events);
}
try
@@ -197,7 +197,7 @@ public async Task ProcessEvents_MultipleDispatch_MinBatch_CheckpointsCorrectly_R
for (int i = 0; i < 60; i++)
{
List events = new List() { new EventData("event1"), new EventData("event2"), new EventData("event3"), new EventData("event4"), new EventData("event5") };
- await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
+ await eventProcessor.ProcessEventsAsync(partitionContext, events);
}
try
@@ -251,7 +251,7 @@ public async Task ProcessEvents_MultipleDispatch_MinBatch_CheckpointsCorrectly_O
for (int i = 0; i < 60; i++)
{
List events = new List() { new EventData("event1"), new EventData("event2"), new EventData("event3"), new EventData("event4"), new EventData("event5") };
- await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
+ await eventProcessor.ProcessEventsAsync(partitionContext, events);
}
try
@@ -319,7 +319,7 @@ public async Task ProcessEvents_MultipleDispatch_MinBatch_BackgroundInvokesParti
for (int i = 0; i < 60; i++)
{
List events = new List() { new EventData("event1"), new EventData("event2"), new EventData("event3"), new EventData("event4"), new EventData("event5") };
- await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
+ await eventProcessor.ProcessEventsAsync(partitionContext, events);
}
await completionSource.Task.TimeoutAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
@@ -373,7 +373,7 @@ public async Task ProcessEvents_Failure_Checkpoints()
var eventProcessor = new EventHubListener.PartitionProcessor(options, executor.Object, loggerMock.Object, true, default);
- await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
+ await eventProcessor.ProcessEventsAsync(partitionContext, events);
processor.Verify(
p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()),
@@ -412,12 +412,15 @@ public async Task ProcessEvents_OwnershipLost_DoesNotCheckpoint()
executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() =>
{
+ if (execution == 0)
+ {
+ eventProcessor.CloseAsync(partitionContext, ProcessingStoppedReason.OwnershipLost).GetAwaiter().GetResult();
+ }
var result = results[execution++];
return result;
});
- // Pass a cancellation token that is already signaled to simulate ownership loss
- await eventProcessor.ProcessEventsAsync(partitionContext, events, new CancellationToken(true));
+ await eventProcessor.ProcessEventsAsync(partitionContext, events);
processor.Verify(
p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()),
@@ -425,7 +428,7 @@ public async Task ProcessEvents_OwnershipLost_DoesNotCheckpoint()
}
///
- /// If function execution succeeds when the function host is shutting down, we should NOT checkpoint.
+ /// If function execution succeeds when the function host is shutting down, we should checkpoint.
///
[Test]
public async Task ProcessEvents_Succeeds_ShuttingDown_DoesNotCheckpoint()
@@ -453,23 +456,26 @@ public async Task ProcessEvents_Succeeds_ShuttingDown_DoesNotCheckpoint()
executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() =>
{
+ if (execution == 0)
+ {
+ eventProcessor.CloseAsync(partitionContext, ProcessingStoppedReason.Shutdown).GetAwaiter().GetResult();
+ }
var result = results[execution++];
return result;
});
- // Pass a cancellation token that is already signaled to simulate shutdown
- await eventProcessor.ProcessEventsAsync(partitionContext, events, new CancellationToken(true));
+ await eventProcessor.ProcessEventsAsync(partitionContext, events);
processor.Verify(
p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()),
- Times.Never);
+ Times.Once);
}
///
- /// If function execution fails when the function host is shutting down, we should NOT checkpoint.
+ /// If function execution fails when the function host is shutting down, we should checkpoint.
///
[Test]
- public async Task ProcessEvents_Fails_ShuttingDown_DoesNotCheckpoint()
+ public async Task ProcessEvents_Fails_ShuttingDown_DoesCheckpoint()
{
var partitionContext = EventHubTests.GetPartitionContext();
var options = new EventHubOptions();
@@ -494,16 +500,19 @@ public async Task ProcessEvents_Fails_ShuttingDown_DoesNotCheckpoint()
executor.Setup(p => p.TryExecuteAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() =>
{
+ if (execution == 0)
+ {
+ eventProcessor.CloseAsync(partitionContext, ProcessingStoppedReason.Shutdown).GetAwaiter().GetResult();
+ }
var result = results[execution++];
return result;
});
- // Pass a cancellation token that is already signaled to simulate shutdown
- await eventProcessor.ProcessEventsAsync(partitionContext, events, new CancellationToken(true));
+ await eventProcessor.ProcessEventsAsync(partitionContext, events);
processor.Verify(
p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny(), It.IsAny()),
- Times.Never);
+ Times.Once);
}
[Test]
@@ -696,7 +705,7 @@ public async Task ProcessEvents_CancellationToken_CancelsExecution()
{
await Task.Delay(500);
});
- await eventProcessor.ProcessEventsAsync(partitionContext, events, source.Token);
+ await eventProcessor.ProcessEventsAsync(partitionContext, events);
}
}
}
From c1b3eb4f485c5515f700a6ae5e7bd8b5366223b5 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 10 Aug 2023 15:35:05 -0700
Subject: [PATCH 17/19] Fix tests
---
.../tests/EventHubEndToEndTests.cs | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
index 5163d1d41d3d..c9fdc9978fce 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
@@ -96,6 +96,8 @@ public async Task EventHub_SingleDispatch()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
+
+ await StopWithDrainAsync(host);
}
AssertSingleDispatchLogs(host);
@@ -159,6 +161,8 @@ public async Task EventHub_SingleDispatch_BinaryData()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
+
+ await StopWithDrainAsync(host);
}
AssertSingleDispatchLogs(host);
@@ -332,6 +336,8 @@ public async Task EventHub_MultipleDispatch()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
+
+ await StopWithDrainAsync(host);
}
AssertMultipleDispatchLogs(host);
@@ -348,11 +354,21 @@ public async Task EventHub_MultipleDispatch_BinaryData()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
+
+ await StopWithDrainAsync(host);
}
AssertMultipleDispatchLogs(host);
}
+ private static async Task StopWithDrainAsync(IHost host)
+ {
+ // Enable drain mode so checkpointing occurs when stopping
+ var drainModeManager = host.Services.GetService();
+ await drainModeManager.EnableDrainModeAsync(CancellationToken.None);
+ await host.StopAsync();
+ }
+
[Test]
public async Task EventHub_MultipleDispatch_MinBatchSize()
{
@@ -383,6 +399,8 @@ public async Task EventHub_MultipleDispatch_MinBatchSize()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
+
+ await StopWithDrainAsync(host);
}
AssertMultipleDispatchLogsMinBatch(host);
From f74b8476ad06a158ab2dd973d0a649c596be3180 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 10 Aug 2023 15:38:30 -0700
Subject: [PATCH 18/19] move test helper methods to one place
---
.../tests/EventHubEndToEndTests.cs | 112 +++++++++---------
1 file changed, 56 insertions(+), 56 deletions(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
index c9fdc9978fce..e71ed83f6770 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
@@ -361,14 +361,6 @@ public async Task EventHub_MultipleDispatch_BinaryData()
AssertMultipleDispatchLogs(host);
}
- private static async Task StopWithDrainAsync(IHost host)
- {
- // Enable drain mode so checkpointing occurs when stopping
- var drainModeManager = host.Services.GetService();
- await drainModeManager.EnableDrainModeAsync(CancellationToken.None);
- await host.StopAsync();
- }
-
[Test]
public async Task EventHub_MultipleDispatch_MinBatchSize()
{
@@ -430,54 +422,6 @@ public async Task EventHub_MultipleDispatch_StopWithoutDrain()
await host.StopAsync();
}
- private static void AssertMultipleDispatchLogsMinBatch(IHost host)
- {
- IEnumerable logMessages = host.GetTestLoggerProvider()
- .GetAllLogMessages();
-
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("Trigger Details:")
- && x.FormattedMessage.Contains("Offset:")).Any());
-
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("OpenAsync")).Any());
-
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("CheckpointAsync")
- && x.FormattedMessage.Contains("lease")
- && x.FormattedMessage.Contains("offset")
- && x.FormattedMessage.Contains("sequenceNumber")).Any());
-
- // Events are being sent in the EventHubTestMultipleDispatchMinBatchSizeJobs
- // class directly for this test
-
- AssertAzureSdkLogs(logMessages);
- }
-
- private static void AssertMultipleDispatchLogs(IHost host)
- {
- IEnumerable logMessages = host.GetTestLoggerProvider()
- .GetAllLogMessages();
-
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("Trigger Details:")
- && x.FormattedMessage.Contains("Offset:")).Any());
-
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("OpenAsync")).Any());
-
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("CheckpointAsync")
- && x.FormattedMessage.Contains("lease")
- && x.FormattedMessage.Contains("offset")
- && x.FormattedMessage.Contains("sequenceNumber")).Any());
-
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("Sending events to EventHub")).Any());
-
- AssertAzureSdkLogs(logMessages);
- }
-
[Test]
public async Task EventHub_PartitionKey()
{
@@ -610,6 +554,62 @@ public async Task EventHub_InitialOffsetFromEnqueuedTime()
}
}
+ private static void AssertMultipleDispatchLogsMinBatch(IHost host)
+ {
+ IEnumerable logMessages = host.GetTestLoggerProvider()
+ .GetAllLogMessages();
+
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("Trigger Details:")
+ && x.FormattedMessage.Contains("Offset:")).Any());
+
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("OpenAsync")).Any());
+
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("CheckpointAsync")
+ && x.FormattedMessage.Contains("lease")
+ && x.FormattedMessage.Contains("offset")
+ && x.FormattedMessage.Contains("sequenceNumber")).Any());
+
+ // Events are being sent in the EventHubTestMultipleDispatchMinBatchSizeJobs
+ // class directly for this test
+
+ AssertAzureSdkLogs(logMessages);
+ }
+
+ private static void AssertMultipleDispatchLogs(IHost host)
+ {
+ IEnumerable logMessages = host.GetTestLoggerProvider()
+ .GetAllLogMessages();
+
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("Trigger Details:")
+ && x.FormattedMessage.Contains("Offset:")).Any());
+
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("OpenAsync")).Any());
+
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("CheckpointAsync")
+ && x.FormattedMessage.Contains("lease")
+ && x.FormattedMessage.Contains("offset")
+ && x.FormattedMessage.Contains("sequenceNumber")).Any());
+
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("Sending events to EventHub")).Any());
+
+ AssertAzureSdkLogs(logMessages);
+ }
+
+ private static async Task StopWithDrainAsync(IHost host)
+ {
+ // Enable drain mode so checkpointing occurs when stopping
+ var drainModeManager = host.Services.GetService();
+ await drainModeManager.EnableDrainModeAsync(CancellationToken.None);
+ await host.StopAsync();
+ }
+
private static void AssertAzureSdkLogs(IEnumerable logMessages)
{
Assert.True(logMessages.Any(x => x.Category.StartsWith("Azure.")));
From 964a09efdfc07db3fc4a4bc3a441a194265e928e Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 10 Aug 2023 16:20:59 -0700
Subject: [PATCH 19/19] Increase test delay
---
.../tests/EventHubEndToEndTests.cs | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
index e71ed83f6770..68d81103e6f3 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
@@ -650,7 +650,7 @@ public static async Task SendEvent_TestHub([EventHubTrigger(TestHubName, Connect
{
_eventWait.Set();
// wait a small amount of time for the host to call dispose
- await Task.Delay(2000, CancellationToken.None);
+ await Task.Delay(3000, CancellationToken.None);
Assert.IsTrue(cancellationToken.IsCancellationRequested);
}
}
@@ -661,7 +661,7 @@ public static async Task SendEvent_TestHub([EventHubTrigger(TestHubName, Connect
{
_eventWait.Set();
// wait a small amount of time for the host to call dispose
- await Task.Delay(2000, CancellationToken.None);
+ await Task.Delay(3000, CancellationToken.None);
Assert.IsTrue(cancellationToken.IsCancellationRequested);
}
}