From 9463812e9e468e45c29a4568e928ec3ba48942d2 Mon Sep 17 00:00:00 2001 From: "t.sarmis" Date: Fri, 1 Jul 2022 17:00:22 +0300 Subject: [PATCH] feat: restore queue model and consumer on disconnection --- .../DefaultRabbitMQBus.cs | 57 ++++++++++++++----- .../Bus/IEasyCachingSubscriber.cs | 3 +- .../Bus/NullEasyCachingBus.cs | 5 +- .../EasyCachingAbstractBus.cs | 10 +++- .../Configurations/HybridCachingOptions.cs | 8 +++ .../HybridCachingProvider.cs | 17 +++++- test/EasyCaching.UnitTests/Fake/FakeBus.cs | 2 +- 7 files changed, 83 insertions(+), 19 deletions(-) diff --git a/bus/EasyCaching.Bus.RabbitMQ/DefaultRabbitMQBus.cs b/bus/EasyCaching.Bus.RabbitMQ/DefaultRabbitMQBus.cs index 973cc103..5ef8a1a2 100644 --- a/bus/EasyCaching.Bus.RabbitMQ/DefaultRabbitMQBus.cs +++ b/bus/EasyCaching.Bus.RabbitMQ/DefaultRabbitMQBus.cs @@ -150,20 +150,51 @@ public override void BaseSubscribe(string topic, Action acti queueName = _options.QueueName; } - Task.Factory.StartNew(() => + Task.Factory.StartNew( + () => StartConsumer(queueName, topic), + TaskCreationOptions.LongRunning); + } + + + private void StartConsumer(string queueName, string topic) + { + var model = _subConnection.CreateModel(); + + model.ExchangeDeclare(_options.TopicExchangeName, ExchangeType.Topic, true, false, null); + model.QueueDeclare(queueName, false, false, true, null); + // bind the queue with the exchange. + model.QueueBind(queueName, _options.TopicExchangeName, topic); + var consumer = new EventingBasicConsumer(model); + consumer.Received += OnMessage; + consumer.Shutdown += (sender, e) => { - var model = _subConnection.CreateModel(); - model.ExchangeDeclare(_options.TopicExchangeName, ExchangeType.Topic, true, false, null); - model.QueueDeclare(queueName, false, false, true, null); - // bind the queue with the exchange. - model.QueueBind(queueName, _options.TopicExchangeName, topic); - var consumer = new EventingBasicConsumer(model); - consumer.Received += OnMessage; - consumer.Shutdown += OnConsumerShutdown; - - model.BasicConsume(queueName, true, consumer); - - }, TaskCreationOptions.LongRunning); + OnConsumerShutdown(sender, e); + OnConsumerError(queueName, topic, model); + }; + + consumer.ConsumerCancelled += (s, e) => + { + OnConsumerError(queueName, topic, model); + }; + + model.BasicConsume(queueName, true, consumer); + } + + private void OnConsumerError(string queueName, string topic, IModel model) + { + StartConsumer(queueName, topic); + BaseOnReconnect(); + try + { + if (model?.IsOpen == true) + { + model?.Dispose(); + } + } + catch + { + // nothing to do + } } /// diff --git a/src/EasyCaching.Core/Bus/IEasyCachingSubscriber.cs b/src/EasyCaching.Core/Bus/IEasyCachingSubscriber.cs index 768005ce..24fc9eff 100644 --- a/src/EasyCaching.Core/Bus/IEasyCachingSubscriber.cs +++ b/src/EasyCaching.Core/Bus/IEasyCachingSubscriber.cs @@ -12,6 +12,7 @@ public interface IEasyCachingSubscriber /// /// Topic. /// Action. - void Subscribe(string topic, Action action); + /// Reconnect Action. + void Subscribe(string topic, Action action, Action reconnectAction = null); } } diff --git a/src/EasyCaching.Core/Bus/NullEasyCachingBus.cs b/src/EasyCaching.Core/Bus/NullEasyCachingBus.cs index a64a16e8..f58f6574 100644 --- a/src/EasyCaching.Core/Bus/NullEasyCachingBus.cs +++ b/src/EasyCaching.Core/Bus/NullEasyCachingBus.cs @@ -26,7 +26,7 @@ public class NullEasyCachingBus : IEasyCachingBus /// so the garbage collector can reclaim the memory that /// the was occupying. public void Dispose() { } - + /// /// Publish the specified topic and message. /// @@ -54,7 +54,8 @@ public void Publish(string topic, EasyCachingMessage message) /// /// Topic. /// Action. - public void Subscribe(string topic, Action action) + /// Reconnect Action. + public void Subscribe(string topic, Action action, Action reconnectAction = null) { } diff --git a/src/EasyCaching.Core/EasyCachingAbstractBus.cs b/src/EasyCaching.Core/EasyCachingAbstractBus.cs index 5031bd27..28d7a25a 100644 --- a/src/EasyCaching.Core/EasyCachingAbstractBus.cs +++ b/src/EasyCaching.Core/EasyCachingAbstractBus.cs @@ -18,6 +18,8 @@ public abstract class EasyCachingAbstractBus : IEasyCachingBus protected Action _handler; + protected Action _reconnectHandler; + protected string BusName { get; set; } public string Name => this.BusName; @@ -74,9 +76,10 @@ public void Publish(string topic, EasyCachingMessage message) } } - public void Subscribe(string topic, Action action) + public void Subscribe(string topic, Action action, Action reconnectAction) { _handler = action; + _reconnectHandler = reconnectAction; BaseSubscribe(topic, action); } @@ -105,5 +108,10 @@ public virtual void BaseOnMessage(EasyCachingMessage message) } } } + + public virtual void BaseOnReconnect() + { + _reconnectHandler?.Invoke(); + } } } diff --git a/src/EasyCaching.HybridCache/Configurations/HybridCachingOptions.cs b/src/EasyCaching.HybridCache/Configurations/HybridCachingOptions.cs index 0840f493..83608b9a 100644 --- a/src/EasyCaching.HybridCache/Configurations/HybridCachingOptions.cs +++ b/src/EasyCaching.HybridCache/Configurations/HybridCachingOptions.cs @@ -43,5 +43,13 @@ public class HybridCachingOptions /// When sending message failed, we will retry some times, default is 3 times. /// public int BusRetryCount { get; set; } = 3; + + /// + /// Flush the local cache on bus disconnection/reconnection + /// + /// + /// Flushing the local cache will avoid using stale data but may cause app jitters until the local cache get's re-populated. + /// + public bool FlushLocalCacheOnBusReconnection { get; set; } = false; } } diff --git a/src/EasyCaching.HybridCache/HybridCachingProvider.cs b/src/EasyCaching.HybridCache/HybridCachingProvider.cs index e8594fa8..8183408c 100644 --- a/src/EasyCaching.HybridCache/HybridCachingProvider.cs +++ b/src/EasyCaching.HybridCache/HybridCachingProvider.cs @@ -93,7 +93,7 @@ string name else this._distributedCache = distributed; this._bus = bus ?? NullEasyCachingBus.Instance; - this._bus.Subscribe(_options.TopicName, OnMessage); + this._bus.Subscribe(_options.TopicName, OnMessage, OnReconnect); this._cacheId = Guid.NewGuid().ToString("N"); @@ -159,6 +159,21 @@ private void OnMessage(EasyCachingMessage message) } } + /// + /// On reconnect (flushes local memory as it could be stale). + /// + + private void OnReconnect() + { + if (!_options.FlushLocalCacheOnBusReconnection) + { + return; + } + + LogMessage("Flushing local cache due to bus reconnection"); + _localCache.Flush(); + } + /// /// Exists the specified cacheKey. /// diff --git a/test/EasyCaching.UnitTests/Fake/FakeBus.cs b/test/EasyCaching.UnitTests/Fake/FakeBus.cs index db96bc11..f4077c26 100644 --- a/test/EasyCaching.UnitTests/Fake/FakeBus.cs +++ b/test/EasyCaching.UnitTests/Fake/FakeBus.cs @@ -19,7 +19,7 @@ public void Publish(string topic, EasyCachingMessage message) return Task.CompletedTask; } - public void Subscribe(string topic, Action action) + public void Subscribe(string topic, Action action, Action reconnectAction = null) { }