diff --git a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubAdapterReceiver.cs b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubAdapterReceiver.cs index 3e893543b28..29e1d785b00 100644 --- a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubAdapterReceiver.cs +++ b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubAdapterReceiver.cs @@ -107,7 +107,7 @@ private async Task Initialize() try { this.checkpointer = await this.checkpointerFactory(this.settings.Partition); - if(this.cache != null) + if (this.cache is not null) { this.cache.Dispose(); this.cache = null; @@ -178,17 +178,24 @@ public async Task> GetQueueMessagesAsync(int maxCount) this.monitor?.TrackMessagesReceived(messages.Count, oldestMessageEnqueueTime, newestMessageEnqueueTime); - List messageStreamPositions = this.cache.Add(messages, dequeueTimeUtc); - foreach (var streamPosition in messageStreamPositions) + // The this.cache field can be null if Shutdown() has been called + List messageStreamPositions = this.cache?.Add(messages, dequeueTimeUtc); + + if (messageStreamPositions is { Count: > 0 }) { - batches.Add(new StreamActivityNotificationBatch(streamPosition)); + foreach (var pos in messageStreamPositions) + { + batches.Add(new StreamActivityNotificationBatch(pos)); + } } + if (!this.checkpointer.CheckpointExists) { this.checkpointer.Update( messages[0].OffsetString, DateTime.UtcNow); } + return batches; } @@ -201,10 +208,11 @@ public bool TryPurgeFromCache(out IList purgedItems) { purgedItems = null; - //if not under pressure, signal the cache to do a time based purge - //if under pressure, which means consuming speed is less than producing speed, then shouldn't purge, and don't read more message into the cache + // if not under pressure, signal the cache to do a time based purge + // if under pressure, which means consuming speed is less than producing speed, then shouldn't purge, and don't read more message into the cache if (!this.IsUnderPressure()) - this.cache.SignalPurge(); + this.cache?.SignalPurge(); + return false; } @@ -247,6 +255,7 @@ public async Task Shutdown(TimeSpan timeout) { closeTask = localReceiver.CloseAsync(); } + // dispose of cache localCache?.Dispose();