 using System.Linq;
 using System.Reflection;
 using System.Threading;
+using System.Threading.Channels;
 using System.Threading.Tasks;
 using Microsoft.Extensions.Logging;
 using Microsoft.Extensions.Options;
 using Orleans.Configuration;
 using Orleans.Internal;
 using Orleans.Runtime.Utilities;
-using Orleans.Serialization;
 using Orleans.Serialization.TypeSystem;
 
 namespace Orleans.Runtime.MembershipService
@@ -38,6 +38,13 @@ internal class MembershipTableManager : IHealthCheckParticipant, ILifecycleParti
         private readonly SiloAddress myAddress;
         private readonly AsyncEnumerable<MembershipTableSnapshot> updates;
         private readonly IAsyncTimer membershipUpdateTimer;
+        private readonly CancellationTokenSource _shutdownCts = new();
+
+        private readonly Task _suspectOrKillsListTask;
+        private readonly Channel<SuspectOrKillRequest> _trySuspectOrKillChannel = Channel.CreateBounded<SuspectOrKillRequest>(new BoundedChannelOptions(100) { FullMode = BoundedChannelFullMode.DropOldest });
+
+        // For testing.
+        internal AutoResetEvent TestingSuspectOrKillIdle = new(false);
 
         private MembershipTableSnapshot snapshot;
 
@@ -71,6 +78,8 @@ public MembershipTableManager(
             this.membershipUpdateTimer = timerFactory.Create(
                 this.clusterMembershipOptions.TableRefreshTimeout,
                 nameof(PeriodicallyRefreshMembershipTable));
+
+            _suspectOrKillsListTask = Task.Run(ProcessSuspectOrKillLists);
         }
 
         internal Func<DateTime> GetDateTimeUtcNow { get; set; } = () => DateTime.UtcNow;
@@ -243,28 +252,32 @@ private async Task PeriodicallyRefreshMembershipTable()
             if (this.log.IsEnabled(LogLevel.Debug)) this.log.LogDebug("Starting periodic membership table refreshes");
             try
             {
-                var targetMilliseconds = (int)this.clusterMembershipOptions.TableRefreshTimeout.TotalMilliseconds;
-
-                TimeSpan? onceOffDelay = RandomTimeSpan.Next(this.clusterMembershipOptions.TableRefreshTimeout);
-                while (await this.membershipUpdateTimer.NextTick(onceOffDelay))
+                // Jitter the initial refresh delay.
+                TimeSpan? overrideDelayPeriod = RandomTimeSpan.Next(this.clusterMembershipOptions.TableRefreshTimeout);
+                var exponentialBackoff = new ExponentialBackoff(EXP_BACKOFF_CONTENTION_MIN, EXP_BACKOFF_CONTENTION_MAX, EXP_BACKOFF_STEP);
+                var runningFailures = 0;
+                while (await this.membershipUpdateTimer.NextTick(overrideDelayPeriod))
                 {
-                    onceOffDelay = default;
-
                     try
                     {
                         var stopwatch = ValueStopwatch.StartNew();
+
                         await this.Refresh();
                         if (this.log.IsEnabled(LogLevel.Trace)) this.log.LogTrace("Refreshing membership table took {Elapsed}", stopwatch.Elapsed);
+                        // Reset to the normal refresh period after a success.
+                        overrideDelayPeriod = default;
+                        runningFailures = 0;
                     }
                     catch (Exception exception)
                     {
+                        runningFailures += 1;
                         this.log.LogWarning(
                             (int)ErrorCode.MembershipUpdateIAmAliveFailure,
                             exception,
-                            "Failed to refresh membership table, will retry shortly");
+                            "Failed to refresh membership table, will retry shortly. Retry attempt {retries}", runningFailures);
 
-                        // Retry quickly
-                        onceOffDelay = TimeSpan.FromMilliseconds(200);
+                        // Retry quickly and then exponentially back off
+                        overrideDelayPeriod = exponentialBackoff.Next(runningFailures);
                     }
                 }
             }
@@ -584,19 +597,13 @@ private async Task<bool> CleanupMyTableEntries(MembershipTableData table)
             if (log.IsEnabled(LogLevel.Debug)) log.LogDebug("CleanupTableEntries: About to DeclareDead {Count} outdated silos in the table: {Silos}", silosToDeclareDead.Count,
                 Utils.EnumerableToString(silosToDeclareDead.Select(tuple => tuple.Item1)));
 
-            var result = true;
-            var nextVersion = table.Version;
-
             foreach (var siloData in silosToDeclareDead)
             {
-                MembershipEntry entry = siloData.Item1;
-                string eTag = siloData.Item2;
-                bool ok = await DeclareDead(entry, eTag, nextVersion, GetDateTimeUtcNow());
-                if (!ok) result = false;
-                nextVersion = nextVersion.Next(); // advance the table version (if write succeded, we advanced the version. if failed, someone else did. It is safe anyway).
+                await _trySuspectOrKillChannel.Writer.WriteAsync(
+                    SuspectOrKillRequest.CreateKillRequest(siloData.Item1.SiloAddress));
             }
 
-            return result;
+            return true;
         }
 
         private void KillMyselfLocally(string reason)
@@ -637,9 +644,89 @@ bool IsFunctionalForMembership(SiloStatus status)
             }
         }
 
+        private class SuspectOrKillRequest
+        {
+            public SiloAddress SiloAddress { get; set; }
+            public SiloAddress OtherSilo { get; set; }
+            public RequestType Type { get; set; }
+
+            public enum RequestType
+            {
+                Unknown = 0,
+                SuspectOrKill,
+                Kill
+            }
+
+            public static SuspectOrKillRequest CreateKillRequest(SiloAddress silo)
+            {
+                return new SuspectOrKillRequest
+                {
+                    SiloAddress = silo,
+                    OtherSilo = null,
+                    Type = RequestType.Kill
+                };
+            }
+
+            public static SuspectOrKillRequest CreateSuspectOrKillRequest(SiloAddress silo, SiloAddress otherSilo)
+            {
+                return new SuspectOrKillRequest
+                {
+                    SiloAddress = silo,
+                    OtherSilo = otherSilo,
+                    Type = RequestType.SuspectOrKill
+                };
+            }
+        }
+
         public async Task<bool> TryKill(SiloAddress silo)
         {
-            var table = await membershipTableProvider.ReadAll();
+            await _trySuspectOrKillChannel.Writer.WriteAsync(SuspectOrKillRequest.CreateKillRequest(silo));
+            return true;
+        }
+
+        public async Task ProcessSuspectOrKillLists()
+        {
+            var backoff = new ExponentialBackoff(EXP_BACKOFF_ERROR_MIN, EXP_BACKOFF_ERROR_MAX,
+                EXP_BACKOFF_STEP);
+            var runningFailureCount = 0;
+            var reader = _trySuspectOrKillChannel.Reader;
+            while (await reader.WaitToReadAsync(_shutdownCts.Token))
+            {
+                while (reader.TryRead(out var request))
+                {
+                    await Task.Delay(backoff.Next(runningFailureCount), _shutdownCts.Token);
+
+                    try
+                    {
+                        switch (request.Type)
+                        {
+                            case SuspectOrKillRequest.RequestType.Kill:
+                                await InnerTryKill(request.SiloAddress, _shutdownCts.Token);
+                                break;
+                            case SuspectOrKillRequest.RequestType.SuspectOrKill:
+                                await InnerTryToSuspectOrKill(request.SiloAddress, request.OtherSilo, _shutdownCts.Token);
+                                break;
+                        }
+                        runningFailureCount = 0;
+                    }
+                    catch (Exception ex)
+                    {
+                        runningFailureCount += 1;
+                        log.LogError(ex, "Error while processing suspect or kill lists. '{FailureCount}' consecutive failures.", runningFailureCount);
+                        await _trySuspectOrKillChannel.Writer.WriteAsync(request, _shutdownCts.Token);
+                    }
+
+                    if (!reader.TryPeek(out _))
+                    {
+                        TestingSuspectOrKillIdle.Set();
+                    }
+                }
+            }
+        }
+
+        private async Task<bool> InnerTryKill(SiloAddress silo, CancellationToken cancellationToken)
+        {
+            var table = await membershipTableProvider.ReadAll().WaitAsync(cancellationToken);
 
             if (log.IsEnabled(LogLevel.Debug))
             {
@@ -685,12 +772,18 @@ public async Task<bool> TryKill(SiloAddress silo)
                 (int)ErrorCode.MembershipMarkingAsDead,
                 "Going to mark silo {SiloAddress} dead as a result of a call to TryKill",
                 entry.SiloAddress);
-            return await DeclareDead(entry, eTag, table.Version, GetDateTimeUtcNow());
+            return await DeclareDead(entry, eTag, table.Version, GetDateTimeUtcNow()).WaitAsync(cancellationToken);
         }
 
         public async Task<bool> TryToSuspectOrKill(SiloAddress silo, SiloAddress indirectProbingSilo = null)
         {
-            var table = await membershipTableProvider.ReadAll();
+            await _trySuspectOrKillChannel.Writer.WriteAsync(SuspectOrKillRequest.CreateSuspectOrKillRequest(silo, indirectProbingSilo));
+            return true;
+        }
+
+        private async Task<bool> InnerTryToSuspectOrKill(SiloAddress silo, SiloAddress indirectProbingSilo, CancellationToken cancellationToken)
+        {
+            var table = await membershipTableProvider.ReadAll().WaitAsync(cancellationToken);
             var now = GetDateTimeUtcNow();
 
             if (log.IsEnabled(LogLevel.Debug)) log.LogDebug("TryToSuspectOrKill: Read Membership table {Table}", table.ToString());
@@ -720,7 +813,9 @@ public async Task<bool> TryToSuspectOrKill(SiloAddress silo, SiloAddress indirec
             {
                 // this should not happen ...
                 log.LogError((int)ErrorCode.MembershipFailedToReadSilo, "Could not find silo entry for silo {Silo} in the table.", silo);
-                throw new KeyNotFoundException($"Could not find silo entry for silo {silo} in the table.");
+                // There is nothing useful for the caller to do here: the silo is already absent from the table, which is the outcome we want.
+                //throw new KeyNotFoundException($"Could not find silo entry for silo {silo} in the table.");
+                return false;
             }
 
             var entry = tuple.Item1.Copy();
@@ -794,7 +889,7 @@ public async Task<bool> TryToSuspectOrKill(SiloAddress silo, SiloAddress indirec
                     activeNonStaleSilos,
                     PrintSuspectList(entry.SuspectTimes));
 
-                return await DeclareDead(entry, eTag, table.Version, now);
+                return await DeclareDead(entry, eTag, table.Version, now).WaitAsync(cancellationToken);
             }
 
             log.LogInformation(
@@ -807,10 +902,10 @@ public async Task<bool> TryToSuspectOrKill(SiloAddress silo, SiloAddress indirec
                 PrintSuspectList(freshVotes));
 
             // If we fail to update here we will retry later.
-            var ok = await membershipTableProvider.UpdateRow(entry, eTag, table.Version.Next());
+            var ok = await membershipTableProvider.UpdateRow(entry, eTag, table.Version.Next()).WaitAsync(cancellationToken);
             if (ok)
             {
-                table = await membershipTableProvider.ReadAll();
+                table = await membershipTableProvider.ReadAll().WaitAsync(cancellationToken);
                 this.ProcessTableUpdate(table, "TrySuspectOrKill");
 
                 // Gossip using the local silo status, since this is just informational to propagate the suspicion vote.
@@ -877,18 +972,22 @@ async Task OnRuntimeGrainServicesStart(CancellationToken ct)
 
             async Task OnRuntimeGrainServicesStop(CancellationToken ct)
             {
+                tasks.Add(_suspectOrKillsListTask);
+                _trySuspectOrKillChannel.Writer.TryComplete();
                 this.membershipUpdateTimer.Dispose();
+                _shutdownCts.Cancel();
 
                 // Allow some minimum time for graceful shutdown.
                 var gracePeriod = Task.WhenAll(Task.Delay(ClusterMembershipOptions.ClusteringShutdownGracePeriod), ct.WhenCancelled());
-                await Task.WhenAny(gracePeriod, Task.WhenAll(tasks));
+                await Task.WhenAny(gracePeriod, Task.WhenAll(tasks)).SuppressThrowing();
             }
         }
 
         public void Dispose()
         {
             this.updates.Dispose();
             this.membershipUpdateTimer.Dispose();
+            _shutdownCts.Dispose();
         }
     }
 }