Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 44 additions & 13 deletions bus/EasyCaching.Bus.RabbitMQ/DefaultRabbitMQBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,20 +150,51 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
queueName = _options.QueueName;
}

Task.Factory.StartNew(() =>
Task.Factory.StartNew(
() => StartConsumer(queueName, topic),
TaskCreationOptions.LongRunning);
}


private void StartConsumer(string queueName, string topic)
{
var model = _subConnection.CreateModel();

model.ExchangeDeclare(_options.TopicExchangeName, ExchangeType.Topic, true, false, null);
model.QueueDeclare(queueName, false, false, true, null);
// bind the queue with the exchange.
model.QueueBind(queueName, _options.TopicExchangeName, topic);
var consumer = new EventingBasicConsumer(model);
consumer.Received += OnMessage;
consumer.Shutdown += (sender, e) =>
{
var model = _subConnection.CreateModel();
model.ExchangeDeclare(_options.TopicExchangeName, ExchangeType.Topic, true, false, null);
model.QueueDeclare(queueName, false, false, true, null);
// bind the queue with the exchange.
model.QueueBind(queueName, _options.TopicExchangeName, topic);
var consumer = new EventingBasicConsumer(model);
consumer.Received += OnMessage;
consumer.Shutdown += OnConsumerShutdown;

model.BasicConsume(queueName, true, consumer);

}, TaskCreationOptions.LongRunning);
OnConsumerShutdown(sender, e);
OnConsumerError(queueName, topic, model);
};

consumer.ConsumerCancelled += (s, e) =>
{
OnConsumerError(queueName, topic, model);
};

model.BasicConsume(queueName, true, consumer);
}

private void OnConsumerError(string queueName, string topic, IModel model)
{
StartConsumer(queueName, topic);
BaseOnReconnect();
try
{
if (model?.IsOpen == true)
{
model?.Dispose();
}
}
catch
{
// nothing to do
}
}

/// <summary>
Expand Down
3 changes: 2 additions & 1 deletion src/EasyCaching.Core/Bus/IEasyCachingSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public interface IEasyCachingSubscriber
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
void Subscribe(string topic, Action<EasyCachingMessage> action);
/// <param name="reconnectAction"> Reconnect Action.</param>
void Subscribe(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null);
}
}
5 changes: 3 additions & 2 deletions src/EasyCaching.Core/Bus/NullEasyCachingBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class NullEasyCachingBus : IEasyCachingBus
/// <see cref="T:EasyCaching.Core.Bus.NullEasyCachingBus"/> so the garbage collector can reclaim the memory that
/// the <see cref="T:EasyCaching.Core.Bus.NullEasyCachingBus"/> was occupying.</remarks>
public void Dispose() { }

/// <summary>
/// Publish the specified topic and message.
/// </summary>
Expand Down Expand Up @@ -54,7 +54,8 @@ public void Publish(string topic, EasyCachingMessage message)
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
public void Subscribe(string topic, Action<EasyCachingMessage> action)
/// <param name="reconnectAction">Reconnect Action.</param>
public void Subscribe(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null)
{

}
Expand Down
10 changes: 9 additions & 1 deletion src/EasyCaching.Core/EasyCachingAbstractBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public abstract class EasyCachingAbstractBus : IEasyCachingBus

protected Action<EasyCachingMessage> _handler;

protected Action _reconnectHandler;

protected string BusName { get; set; }

public string Name => this.BusName;
Expand Down Expand Up @@ -74,9 +76,10 @@ public void Publish(string topic, EasyCachingMessage message)
}
}

public void Subscribe(string topic, Action<EasyCachingMessage> action)
public void Subscribe(string topic, Action<EasyCachingMessage> action, Action reconnectAction)
{
_handler = action;
_reconnectHandler = reconnectAction;
BaseSubscribe(topic, action);
}

Expand Down Expand Up @@ -105,5 +108,10 @@ public virtual void BaseOnMessage(EasyCachingMessage message)
}
}
}

public virtual void BaseOnReconnect()
{
_reconnectHandler?.Invoke();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,13 @@ public class HybridCachingOptions
/// When sending message failed, we will retry some times, default is 3 times.
/// </remarks>
public int BusRetryCount { get; set; } = 3;

/// <summary>
/// Flush the local cache on bus disconnection/reconnection
/// </summary>
/// <remarks>
/// Flushing the local cache will avoid using stale data but may cause app jitters until the local cache get's re-populated.
/// </remarks>
public bool FlushLocalCacheOnBusReconnection { get; set; } = false;
}
}
17 changes: 16 additions & 1 deletion src/EasyCaching.HybridCache/HybridCachingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ string name
else this._distributedCache = distributed;

this._bus = bus ?? NullEasyCachingBus.Instance;
this._bus.Subscribe(_options.TopicName, OnMessage);
this._bus.Subscribe(_options.TopicName, OnMessage, OnReconnect);

this._cacheId = Guid.NewGuid().ToString("N");

Expand Down Expand Up @@ -159,6 +159,21 @@ private void OnMessage(EasyCachingMessage message)
}
}

/// <summary>
/// On reconnect (flushes local memory as it could be stale).
/// </summary>

private void OnReconnect()
{
if (!_options.FlushLocalCacheOnBusReconnection)
{
return;
}

LogMessage("Flushing local cache due to bus reconnection");
_localCache.Flush();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why flushing in-memory keys here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a disconnection happens (ie the load balancer between app and rabbit fails) then the queue will be deleted and any invalidations will be missed. I believe the correct approach is to flush the in-memory buffer in order to avoid having stale data.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a good idea to flush the in memory caching here!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that understand. Do you suggest to keep the (possibly stale) in-memory cache as is after a disconnection or do you have in mind a better way to refresh the in-memory cache?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you suggest to keep the (possibly stale) in-memory cache as is after a disconnection

Yes.

When a reconnection occurs, not all memory caches are expired, and if flush is executed at this time, it may cause application jitter.

In addition, all in-memory caches have an expiration time, and they are automatically removed after they expire.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that's an understandable argument. Our use case requires however to be sure that the case is not stale. Would it be ok if we do either of these things:

a) Add an option to FlushOnReconnect (default: false)
b) Add a callback in order for the application to decide how to handle a reconnection

I would prefer (a) but I can do (b)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(a) is a good idea.
(b) is ok as well.

😄

}

/// <summary>
/// Exists the specified cacheKey.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion test/EasyCaching.UnitTests/Fake/FakeBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public void Publish(string topic, EasyCachingMessage message)
return Task.CompletedTask;
}

public void Subscribe(string topic, Action<EasyCachingMessage> action)
public void Subscribe(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null)
{

}
Expand Down