diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs index ebdcabdd8..9f462684e 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs @@ -47,7 +47,9 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, var envelope = mapper!.CreateEnvelope(result.Topic, message); envelope.Offset = result.Offset.Value; envelope.MessageType ??= _messageTypeName; - if (topic.StampConsumerGroupIdOnEnvelope) envelope.GroupId = config.GroupId; + + if (topic.StampConsumerGroupIdOnEnvelope) + envelope.GroupId = config.GroupId; await receiver.ReceivedAsync(this, envelope); } @@ -105,10 +107,11 @@ public ValueTask DeferAsync(Envelope envelope) return _receiver.ReceivedAsync(this, envelope); } - public ValueTask DisposeAsync() + public async ValueTask DisposeAsync() { - Dispose(); - return ValueTask.CompletedTask; + await StopAsync(); + _consumer.SafeDispose(); + _runner.Dispose(); } public Uri Address { get; } @@ -165,6 +168,8 @@ public async Task MoveToErrorsAsync(Envelope envelope, Exception exception) public void Dispose() { + _cancellation.Cancel(); + _runner.Wait(); _consumer.SafeDispose(); _runner.Dispose(); } diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTopicGroupListener.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTopicGroupListener.cs index cb5b1d365..a4705ea43 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTopicGroupListener.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTopicGroupListener.cs @@ -45,7 +45,9 @@ public KafkaTopicGroupListener(KafkaTopicGroup endpoint, ConsumerConfig config, var envelope = mapper!.CreateEnvelope(result.Topic, message); envelope.Offset = result.Offset.Value; - if (endpoint.StampConsumerGroupIdOnEnvelope) envelope.GroupId = config.GroupId; + + if (endpoint.StampConsumerGroupIdOnEnvelope) + envelope.GroupId = config.GroupId; await receiver.ReceivedAsync(this, envelope); } @@ -101,10 +103,11 @@ public ValueTask DeferAsync(Envelope envelope) return _receiver.ReceivedAsync(this, envelope); } - public ValueTask DisposeAsync() + public async ValueTask DisposeAsync() { - Dispose(); - return ValueTask.CompletedTask; + await StopAsync(); + _consumer.SafeDispose(); + _runner.Dispose(); } public Uri Address { get; } @@ -162,6 +165,8 @@ public async Task MoveToErrorsAsync(Envelope envelope, Exception exception) public void Dispose() { + _cancellation.Cancel(); + _runner.Wait(); _consumer.SafeDispose(); _runner.Dispose(); }