Skip to content

Commit 51ebdb1

Browse files
authored
Remove abstraction layers from EventHubs WebJobs extensions (#17271)
There are some layers in EventHubs WebJobs extensions that were the artifact of Track 2 refactoring. Remove them to simplify the code a bit. Major changes: * Merge `Processor` and `EventProcessorHost` * Merge `ProcessorPartitionContext` and `Partition`, rename to `EventProcessorHostPartition` * Remove ICheckpointer
1 parent f4dc2e9 commit 51ebdb1

File tree

9 files changed

+212
-241
lines changed

9 files changed

+212
-241
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -254,11 +254,12 @@ internal EventProcessorHost GetEventProcessorHost(string eventHubName, string co
254254
consumerGroup ??= EventHubConsumerClient.DefaultConsumerGroupName;
255255

256256
// Use blob prefix support available in EPH starting in 2.2.6
257-
EventProcessorHost host = new EventProcessorHost(
257+
EventProcessorHost host = new EventProcessorHost(consumerGroup: consumerGroup,
258+
connectionString: creds.EventHubConnectionString,
258259
eventHubName: eventHubName,
259-
consumerGroupName: consumerGroup,
260-
eventHubConnectionString: creds.EventHubConnectionString,
261-
exceptionHandler: _exceptionHandler);
260+
options: this.EventProcessorOptions,
261+
eventBatchMaximumCount: _maxBatchSize,
262+
invokeProcessorAfterReceiveTimeout: InvokeProcessorAfterReceiveTimeout, exceptionHandler: _exceptionHandler);
262263

263264
return host;
264265
}

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs

Lines changed: 13 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ internal sealed class EventHubListener : IListener, IEventProcessorFactory, ISca
3030
private readonly BlobsCheckpointStore _checkpointStore;
3131
private readonly EventHubOptions _options;
3232
private readonly ILogger _logger;
33-
private bool _started;
3433

3534
private Lazy<EventHubsScaleMonitor> _scaleMonitor;
3635

@@ -70,17 +69,12 @@ void IDisposable.Dispose()
7069

7170
public async Task StartAsync(CancellationToken cancellationToken)
7271
{
73-
await _eventProcessorHost.RegisterEventProcessorFactoryAsync(this, _options.MaxBatchSize, _options.InvokeProcessorAfterReceiveTimeout, _checkpointStore, _options.EventProcessorOptions).ConfigureAwait(false);
74-
_started = true;
72+
await _eventProcessorHost.StartProcessingAsync(this, _checkpointStore, cancellationToken).ConfigureAwait(false);
7573
}
7674

7775
public async Task StopAsync(CancellationToken cancellationToken)
7876
{
79-
if (_started)
80-
{
81-
await _eventProcessorHost.UnregisterEventProcessorAsync().ConfigureAwait(false);
82-
}
83-
_started = false;
77+
await _eventProcessorHost.StopProcessingAsync(cancellationToken).ConfigureAwait(false);
8478
}
8579

8680
IEventProcessor IEventProcessorFactory.CreateEventProcessor()
@@ -93,37 +87,27 @@ public IScaleMonitor GetMonitor()
9387
return _scaleMonitor.Value;
9488
}
9589

96-
/// <summary>
97-
/// Wrapper for un-mockable checkpoint APIs to aid in unit testing
98-
/// </summary>
99-
internal interface ICheckpointer
100-
{
101-
Task CheckpointAsync(ProcessorPartitionContext context);
102-
}
103-
10490
// We get a new instance each time Start() is called.
10591
// We'll get a listener per partition - so they can potentialy run in parallel even on a single machine.
106-
internal class EventProcessor : IEventProcessor, IDisposable, ICheckpointer
92+
internal class EventProcessor : IEventProcessor, IDisposable
10793
{
10894
private readonly ITriggeredFunctionExecutor _executor;
10995
private readonly bool _singleDispatch;
11096
private readonly ILogger _logger;
11197
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
112-
private readonly ICheckpointer _checkpointer;
11398
private readonly int _batchCheckpointFrequency;
11499
private int _batchCounter;
115100
private bool _disposed;
116101

117-
public EventProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, ICheckpointer checkpointer = null)
102+
public EventProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch)
118103
{
119-
_checkpointer = checkpointer ?? this;
120104
_executor = executor;
121105
_singleDispatch = singleDispatch;
122106
_batchCheckpointFrequency = options.BatchCheckpointFrequency;
123107
_logger = logger;
124108
}
125109

126-
public Task CloseAsync(ProcessorPartitionContext context, ProcessingStoppedReason reason)
110+
public Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason)
127111
{
128112
// signal cancellation for any in progress executions
129113
_cts.Cancel();
@@ -132,13 +116,13 @@ public Task CloseAsync(ProcessorPartitionContext context, ProcessingStoppedReaso
132116
return Task.CompletedTask;
133117
}
134118

135-
public Task OpenAsync(ProcessorPartitionContext context)
119+
public Task OpenAsync(EventProcessorHostPartition context)
136120
{
137121
_logger.LogDebug(GetOperationDetails(context, "OpenAsync"));
138122
return Task.CompletedTask;
139123
}
140124

141-
public Task ProcessErrorAsync(ProcessorPartitionContext context, Exception error)
125+
public Task ProcessErrorAsync(EventProcessorHostPartition context, Exception error)
142126
{
143127
string errorDetails = $"Processing error (Partition Id: '{context.PartitionId}', Owner: '{context.Owner}', EventHubPath: '{context.EventHubPath}').";
144128

@@ -147,7 +131,7 @@ public Task ProcessErrorAsync(ProcessorPartitionContext context, Exception error
147131
return Task.CompletedTask;
148132
}
149133

150-
public async Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumerable<EventData> messages)
134+
public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages)
151135
{
152136
var triggerInput = new EventHubTriggerInput
153137
{
@@ -209,8 +193,7 @@ public async Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumera
209193
// code, and capture/log/persist failed events, since they won't be retried.
210194
if (messages.Any())
211195
{
212-
context.CheckpointEvent = messages.Last();
213-
await CheckpointAsync(context).ConfigureAwait(false);
196+
await CheckpointAsync(messages.Last(), context).ConfigureAwait(false);
214197
}
215198
}
216199

@@ -222,12 +205,12 @@ private async Task TryExecuteWithLoggingAsync(TriggeredFunctionData input, Event
222205
}
223206
}
224207

225-
private async Task CheckpointAsync(ProcessorPartitionContext context)
208+
private async Task CheckpointAsync(EventData checkpointEvent, EventProcessorHostPartition context)
226209
{
227210
bool checkpointed = false;
228211
if (_batchCheckpointFrequency == 1)
229212
{
230-
await _checkpointer.CheckpointAsync(context).ConfigureAwait(false);
213+
await context.CheckpointAsync(checkpointEvent).ConfigureAwait(false);
231214
checkpointed = true;
232215
}
233216
else
@@ -236,7 +219,7 @@ private async Task CheckpointAsync(ProcessorPartitionContext context)
236219
if (++_batchCounter >= _batchCheckpointFrequency)
237220
{
238221
_batchCounter = 0;
239-
await _checkpointer.CheckpointAsync(context).ConfigureAwait(false);
222+
await context.CheckpointAsync(checkpointEvent).ConfigureAwait(false);
240223
checkpointed = true;
241224
}
242225
}
@@ -264,11 +247,6 @@ public void Dispose()
264247
Dispose(true);
265248
}
266249

267-
async Task ICheckpointer.CheckpointAsync(ProcessorPartitionContext context)
268-
{
269-
await context.CheckpointAsync().ConfigureAwait(false);
270-
}
271-
272250
private static Dictionary<string, object> GetLinksScope(EventData message)
273251
{
274252
if (TryGetLinkedActivity(message, out var link))
@@ -319,7 +297,7 @@ private static bool TryGetLinkedActivity(EventData message, out Activity link)
319297
return false;
320298
}
321299

322-
private static string GetOperationDetails(ProcessorPartitionContext context, string operation)
300+
private static string GetOperationDetails(EventProcessorHostPartition context, string operation)
323301
{
324302
StringWriter sw = new StringWriter();
325303
using (JsonTextWriter writer = new JsonTextWriter(sw) { Formatting = Formatting.None })

0 commit comments

Comments
 (0)