Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1ca3cb1
Introduce CancellationToken to `WithCircuitBreaker`
Arkatufus May 6, 2025
46efbb3
Update API Approval list
Arkatufus May 6, 2025
422d2d0
Merge branch 'dev' into Harden-CircuitBreaker
Arkatufus May 6, 2025
72589eb
Unify TaskCancelledException to TimeoutException (old behavior)
Arkatufus May 7, 2025
9ea5975
Merge branch 'dev' into Harden-CircuitBreaker
Arkatufus May 7, 2025
8238c0d
Make Akka.Persistence API changes backward compatible
Arkatufus May 7, 2025
9eb97fc
Merge branch 'Harden-CircuitBreaker' of github.com:Arkatufus/akka.net…
Arkatufus May 7, 2025
704731a
cleanup obsolete warnings
Arkatufus May 7, 2025
75c1660
Merge branch 'dev' into Harden-CircuitBreaker
Aaronontheweb May 8, 2025
fa4485a
Update API Approval list
Arkatufus May 8, 2025
4d8ef03
Merge branch 'Harden-CircuitBreaker' of github.com:Arkatufus/akka.net…
Arkatufus May 8, 2025
ec19876
Cleanup refactor warnings
Arkatufus May 8, 2025
f154a82
Merge branch 'dev' into Harden-CircuitBreaker
Arkatufus May 8, 2025
d714290
Merge branch 'dev' into Harden-CircuitBreaker
Aaronontheweb May 8, 2025
f0dfad8
Revert `.ContinueWith()` refactor
Arkatufus May 9, 2025
7f1a0ce
Merge branch 'Harden-CircuitBreaker' of github.com:Arkatufus/akka.net…
Arkatufus May 9, 2025
98b351d
Code cleanup, remove stupid codes
Arkatufus May 9, 2025
7a5a20d
Code cleanup, make sure `AtomicState.Callthrough()` have the same beh…
Arkatufus May 9, 2025
19e73df
Update API Approval list
Arkatufus May 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,14 @@ protected internal override bool AroundReceive(Receive receive, object message)
return true;
}



/// <summary>
/// TBD
/// </summary>
/// <param name="messages">TBD</param>
/// <exception cref="TimeoutException">
/// This exception is thrown when the store has not been initialized.
/// </exception>
/// <returns>TBD</returns>
protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken)
{
var trueMsgs = messages.ToArray();

if (_store == null)
return StoreNotInitialized<IImmutableList<Exception>>();

return _store.Ask<object>(sender => new WriteMessages(trueMsgs, sender, 1), Timeout, CancellationToken.None)
return _store.Ask<object>(sender => new WriteMessages(trueMsgs, sender, 1), Timeout, cancellationToken)
.ContinueWith(r =>
{
if (r.IsCanceled)
Expand All @@ -190,23 +180,14 @@ protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerabl
}, TaskContinuationOptions.ExecuteSynchronously);
}

/// <summary>
/// TBD
/// </summary>
/// <param name="persistenceId">TBD</param>
/// <param name="toSequenceNr">TBD</param>
/// <exception cref="TimeoutException">
/// This exception is thrown when the store has not been initialized.
/// </exception>
/// <returns>TBD</returns>
protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken)
{
if (_store == null)
return StoreNotInitialized<object>();

var result = new TaskCompletionSource<object>();

_store.Ask<object>(sender => new DeleteMessagesTo(persistenceId, toSequenceNr, sender), Timeout, CancellationToken.None).ContinueWith(r =>
_store.Ask<object>(sender => new DeleteMessagesTo(persistenceId, toSequenceNr, sender), Timeout, cancellationToken).ContinueWith(r =>
{
if (r.IsFaulted)
result.TrySetException(r.Exception);
Expand Down Expand Up @@ -245,23 +226,14 @@ public override Task ReplayMessagesAsync(IActorContext context, string persisten
return replayCompletionPromise.Task;
}

/// <summary>
/// TBD
/// </summary>
/// <param name="persistenceId">TBD</param>
/// <param name="fromSequenceNr">TBD</param>
/// <exception cref="TimeoutException">
/// This exception is thrown when the store has not been initialized.
/// </exception>
/// <returns>TBD</returns>
public override Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr)
public override Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken)
{
if (_store == null)
return StoreNotInitialized<long>();

var result = new TaskCompletionSource<long>();

_store.Ask<object>(sender => new ReplayMessages(0, 0, 0, persistenceId, sender), Timeout, CancellationToken.None)
_store.Ask<object>(sender => new ReplayMessages(0, 0, 0, persistenceId, sender), Timeout, cancellationToken)
.ContinueWith(t =>
{
if (t.IsFaulted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
Expand Down Expand Up @@ -89,14 +90,15 @@ protected internal override bool AroundReceive(Receive receive, object message)
return true;
}

protected async override Task DeleteAsync(SnapshotMetadata metadata)
protected override async Task DeleteAsync(
SnapshotMetadata metadata,
CancellationToken cancellationToken)
{
if (_store == null)
throw new TimeoutException("Store not intialized.");
var s = Sender;
throw new TimeoutException("Store not initialized.");
try
{
var response = await _store.Ask(new DeleteSnapshot(metadata), Timeout);
var response = await _store.Ask(new DeleteSnapshot(metadata), Timeout, cancellationToken);
if (response is DeleteSnapshotFailure f)
{
ExceptionDispatchInfo.Capture(f.Cause).Throw();
Expand All @@ -108,14 +110,16 @@ protected async override Task DeleteAsync(SnapshotMetadata metadata)
}
}

protected async override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override async Task DeleteAsync(
string persistenceId,
SnapshotSelectionCriteria criteria,
CancellationToken cancellationToken)
{
if (_store == null)
throw new TimeoutException("Store not intialized.");
var s = Sender;
throw new TimeoutException("Store not initialized.");
try
{
var response = await _store.Ask(new DeleteSnapshots(persistenceId, criteria), Timeout);
var response = await _store.Ask(new DeleteSnapshots(persistenceId, criteria), Timeout, cancellationToken);
if (response is DeleteSnapshotsFailure f)
{
ExceptionDispatchInfo.Capture(f.Cause).Throw();
Expand All @@ -127,14 +131,16 @@ protected async override Task DeleteAsync(string persistenceId, SnapshotSelectio
}
}

protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override async Task<SelectedSnapshot> LoadAsync(
string persistenceId,
SnapshotSelectionCriteria criteria,
CancellationToken cancellationToken)
{
if (_store == null)
throw new TimeoutException("Store not intialized.");
var s = Sender;
throw new TimeoutException("Store not initialized.");
try
{
var response = await _store.Ask(new LoadSnapshot(persistenceId, criteria, criteria.MaxSequenceNr), Timeout);
var response = await _store.Ask(new LoadSnapshot(persistenceId, criteria, criteria.MaxSequenceNr), Timeout, cancellationToken);
switch (response)
{
case LoadSnapshotResult ls:
Expand All @@ -154,14 +160,16 @@ protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId,
throw new TimeoutException();
}

protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot)
protected override async Task SaveAsync(
SnapshotMetadata metadata,
object snapshot,
CancellationToken cancellationToken)
{
if (_store == null)
throw new TimeoutException("Store not intialized.");
var s = Sender;
throw new TimeoutException("Store not initialized.");
try
{
var response = await _store.Ask(new SaveSnapshot(metadata, snapshot), Timeout);
var response = await _store.Ask(new SaveSnapshot(metadata, snapshot), Timeout, cancellationToken);
if (response is SaveSnapshotFailure f)
{
ExceptionDispatchInfo.Capture(f.Cause).Throw();
Expand Down
25 changes: 18 additions & 7 deletions src/contrib/cluster/Akka.Cluster.Sharding.Tests/Bugfix7399Specs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
Expand Down Expand Up @@ -150,22 +151,24 @@ public override Task ReplayMessagesAsync(IActorContext context, string persisten
recoveryCallback);
}

protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken)
{
if (!Working)
{
throw new ApplicationException("Failed");
}

return base.WriteMessagesAsync(messages);
return base.WriteMessagesAsync(messages, cancellationToken);
}
}

public class FailingSnapshot : SnapshotStore
{
public static bool Working = false;

protected override Task DeleteAsync(SnapshotMetadata metadata)
protected override Task DeleteAsync(
SnapshotMetadata metadata,
CancellationToken cancellationToken)
{
if (!Working)
{
Expand All @@ -175,7 +178,10 @@ protected override Task DeleteAsync(SnapshotMetadata metadata)
return Task.CompletedTask;
}

protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override Task DeleteAsync(
string persistenceId,
SnapshotSelectionCriteria criteria,
CancellationToken cancellationToken)
{
if (!Working)
{
Expand All @@ -185,8 +191,10 @@ protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCrite
return Task.CompletedTask;
}

protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId,
SnapshotSelectionCriteria criteria)
protected override async Task<SelectedSnapshot> LoadAsync(
string persistenceId,
SnapshotSelectionCriteria criteria,
CancellationToken cancellationToken)
{
if (!Working)
{
Expand All @@ -196,7 +204,10 @@ protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId,
return null;
}

protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot)
protected override Task SaveAsync(
SnapshotMetadata metadata,
object snapshot,
CancellationToken cancellationToken)
{
if (!Working)
{
Expand Down

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Original file line number Diff line number Diff line change
Expand Up @@ -4529,10 +4529,22 @@ namespace Akka.Pattern
public Akka.Pattern.CircuitBreaker OnHalfOpen(System.Action callback) { }
public Akka.Pattern.CircuitBreaker OnOpen(System.Action callback) { }
public void Succeed() { }
[System.ObsoleteAttribute("Use WithCircuitBreaker() that accepts functions with CancellationToken parameter." +
" Since 1.5.42")]
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T>(System.Func<System.Threading.Tasks.Task<T>> body) { }
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T>(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<T>> body) { }
[System.ObsoleteAttribute("Use WithCircuitBreaker() that accepts functions with CancellationToken parameter." +
" Since 1.5.42")]
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T, TState>(TState state, System.Func<TState, System.Threading.Tasks.Task<T>> body) { }
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T, TState>(TState state, System.Func<TState, System.Threading.CancellationToken, System.Threading.Tasks.Task<T>> body) { }
[System.ObsoleteAttribute("Use WithCircuitBreaker() that accepts functions with CancellationToken parameter." +
" Since 1.5.42")]
public System.Threading.Tasks.Task WithCircuitBreaker(System.Func<System.Threading.Tasks.Task> body) { }
public System.Threading.Tasks.Task WithCircuitBreaker(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task> body) { }
[System.ObsoleteAttribute("Use WithCircuitBreaker() that accepts functions with CancellationToken parameter." +
" Since 1.5.42")]
public System.Threading.Tasks.Task WithCircuitBreaker<TState>(TState state, System.Func<TState, System.Threading.Tasks.Task> body) { }
public System.Threading.Tasks.Task WithCircuitBreaker<TState>(TState state, System.Func<TState, System.Threading.CancellationToken, System.Threading.Tasks.Task> body) { }
public Akka.Pattern.CircuitBreaker WithExponentialBackoff(System.TimeSpan maxResetTimeout) { }
public Akka.Pattern.CircuitBreaker WithRandomFactor(double randomFactor) { }
public void WithSyncCircuitBreaker(System.Action body) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4519,10 +4519,22 @@ namespace Akka.Pattern
public Akka.Pattern.CircuitBreaker OnHalfOpen(System.Action callback) { }
public Akka.Pattern.CircuitBreaker OnOpen(System.Action callback) { }
public void Succeed() { }
[System.ObsoleteAttribute("Use WithCircuitBreaker() that accepts functions with CancellationToken parameter." +
" Since 1.5.42")]
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T>(System.Func<System.Threading.Tasks.Task<T>> body) { }
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T>(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<T>> body) { }
[System.ObsoleteAttribute("Use WithCircuitBreaker() that accepts functions with CancellationToken parameter." +
" Since 1.5.42")]
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T, TState>(TState state, System.Func<TState, System.Threading.Tasks.Task<T>> body) { }
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T, TState>(TState state, System.Func<TState, System.Threading.CancellationToken, System.Threading.Tasks.Task<T>> body) { }
[System.ObsoleteAttribute("Use WithCircuitBreaker() that accepts functions with CancellationToken parameter." +
" Since 1.5.42")]
public System.Threading.Tasks.Task WithCircuitBreaker(System.Func<System.Threading.Tasks.Task> body) { }
public System.Threading.Tasks.Task WithCircuitBreaker(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task> body) { }
[System.ObsoleteAttribute("Use WithCircuitBreaker() that accepts functions with CancellationToken parameter." +
" Since 1.5.42")]
public System.Threading.Tasks.Task WithCircuitBreaker<TState>(TState state, System.Func<TState, System.Threading.Tasks.Task> body) { }
public System.Threading.Tasks.Task WithCircuitBreaker<TState>(TState state, System.Func<TState, System.Threading.CancellationToken, System.Threading.Tasks.Task> body) { }
public Akka.Pattern.CircuitBreaker WithExponentialBackoff(System.TimeSpan maxResetTimeout) { }
public Akka.Pattern.CircuitBreaker WithRandomFactor(double randomFactor) { }
public void WithSyncCircuitBreaker(System.Action body) { }
Expand Down
Loading
Loading