Skip to content

Commit

Permalink
refactor: reimplement the supervisor strategy (#625)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Feb 13, 2025
1 parent 056ddcd commit 15a5c94
Show file tree
Hide file tree
Showing 21 changed files with 886 additions and 509 deletions.
61 changes: 35 additions & 26 deletions actor/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (x *actorSystem) Start(ctx context.Context) error {
AddError(x.spawnUserGuardian(ctx)).
AddError(x.spawnRebalancer(ctx)).
AddError(x.spawnJanitor(ctx)).
AddError(x.spawnDeadletters(ctx)).
AddError(x.spawnDeadletter(ctx)).
Error(); err != nil {
return errorschain.
New(errorschain.ReturnAll()).
Expand Down Expand Up @@ -1762,8 +1762,8 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o
}

// set the supervisor strategies when defined
if len(spawnConfig.supervisorStrategies) != 0 {
pidOpts = append(pidOpts, withSupervisorStrategies(spawnConfig.supervisorStrategies...))
if spawnConfig.supervisor != nil {
pidOpts = append(pidOpts, withSupervisor(spawnConfig.supervisor))
}

// enable stash
Expand Down Expand Up @@ -1884,13 +1884,11 @@ func (x *actorSystem) spawnRootGuardian(ctx context.Context) error {
func (x *actorSystem) spawnSystemGuardian(ctx context.Context) error {
var err error
actorName := x.reservedName(systemGuardianType)

x.systemGuardian, err = x.configPID(ctx,
actorName,
newSystemGuardian(),
WithSupervisorStrategies(
NewSupervisorStrategy(PanicError{}, NewStopDirective()),
),
)
WithSupervisor(NewSupervisor()))
if err != nil {
return fmt.Errorf("actor=%s failed to start system guardian: %w", actorName, err)
}
Expand All @@ -1907,9 +1905,7 @@ func (x *actorSystem) spawnUserGuardian(ctx context.Context) error {
x.userGuardian, err = x.configPID(ctx,
actorName,
newUserGuardian(),
WithSupervisorStrategies(
NewSupervisorStrategy(PanicError{}, NewStopDirective()),
))
WithSupervisor(NewSupervisor()))
if err != nil {
return fmt.Errorf("actor=%s failed to start user guardian: %w", actorName, err)
}
Expand All @@ -1923,13 +1919,18 @@ func (x *actorSystem) spawnUserGuardian(ctx context.Context) error {
func (x *actorSystem) spawnJanitor(ctx context.Context) error {
var err error
actorName := x.reservedName(deathWatchType)

// define the supervisor strategy to use
supervisor := NewSupervisor(
WithStrategy(OneForOneStrategy),
WithDirective(PanicError{}, StopDirective),
WithDirective(InternalError{}, StopDirective),
)

x.deathWatch, err = x.configPID(ctx,
actorName,
newDeathWatch(),
WithSupervisorStrategies(
NewSupervisorStrategy(PanicError{}, NewStopDirective()),
NewSupervisorStrategy(InternalError{}, NewStopDirective()),
),
WithSupervisor(supervisor),
)
if err != nil {
return fmt.Errorf("actor=%s failed to start the deathWatch: %w", actorName, err)
Expand All @@ -1945,16 +1946,21 @@ func (x *actorSystem) spawnRebalancer(ctx context.Context) error {
if x.clusterEnabled.Load() {
var err error
actorName := x.reservedName(rebalancerType)

// define the supervisor strategy to use
supervisor := NewSupervisor(
WithStrategy(OneForOneStrategy),
WithDirective(PanicError{}, RestartDirective),
WithDirective(&runtime.PanicNilError{}, RestartDirective),
WithDirective(rebalancingError{}, RestartDirective),
WithDirective(InternalError{}, ResumeDirective),
WithDirective(SpawnError{}, ResumeDirective),
)

x.rebalancer, err = x.configPID(ctx,
actorName,
newRebalancer(x.reflection, x.remoting),
WithSupervisorStrategies(
NewSupervisorStrategy(PanicError{}, NewRestartDirective()),
NewSupervisorStrategy(&runtime.PanicNilError{}, NewRestartDirective()),
NewSupervisorStrategy(rebalancingError{}, NewRestartDirective()),
NewSupervisorStrategy(InternalError{}, NewResumeDirective()),
NewSupervisorStrategy(SpawnError{}, NewResumeDirective()),
),
WithSupervisor(supervisor),
)
if err != nil {
return fmt.Errorf("actor=%s failed to start cluster rebalancer: %w", actorName, err)
Expand All @@ -1966,16 +1972,19 @@ func (x *actorSystem) spawnRebalancer(ctx context.Context) error {
return nil
}

// spawnDeadletters creates the deadletters synthetic actor
func (x *actorSystem) spawnDeadletters(ctx context.Context) error {
// spawnDeadletter creates the deadletters synthetic actor
func (x *actorSystem) spawnDeadletter(ctx context.Context) error {
var err error
actorName := x.reservedName(deadletterType)
x.deadletters, err = x.configPID(ctx,
actorName,
newDeadLetter(),
WithSupervisorStrategies(
NewSupervisorStrategy(PanicError{}, NewResumeDirective()),
NewSupervisorStrategy(&runtime.PanicNilError{}, NewResumeDirective()),
WithSupervisor(
NewSupervisor(
WithStrategy(OneForOneStrategy),
WithDirective(PanicError{}, ResumeDirective),
WithDirective(&runtime.PanicNilError{}, ResumeDirective),
),
),
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions actor/dead_letter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type deadLetter struct {
pid *PID
logger log.Logger
counter *atomic.Int64
letters *syncmap.SyncMap[string, *goaktpb.Deadletter]
counters *syncmap.SyncMap[string, *atomic.Int64]
letters *syncmap.Map[string, *goaktpb.Deadletter]
counters *syncmap.Map[string, *atomic.Int64]
}

// enforce the implementation of the Actor interface
Expand Down
4 changes: 0 additions & 4 deletions actor/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,10 +520,6 @@ func startClusterSystem(t *testing.T, serverAddr string, opts ...testClusterOpti
return system, provider
}

type unhandledSupervisorDirective struct{}

func (x unhandledSupervisorDirective) isSupervisorDirective() {}

type mockRouterActor struct {
counter int
logger log.Logger
Expand Down
Loading

0 comments on commit 15a5c94

Please sign in to comment.