From 15a5c9412286eae76c53b1576efe2ceaa6aa769b Mon Sep 17 00:00:00 2001 From: Arsene Date: Thu, 13 Feb 2025 22:05:57 +0000 Subject: [PATCH] refactor: reimplement the supervisor strategy (#625) --- actor/actor_system.go | 61 ++++--- actor/dead_letter.go | 4 +- actor/mock_test.go | 4 - actor/pid.go | 181 ++++++++++++-------- actor/pid_node.go | 6 +- actor/pid_option.go | 12 +- actor/pid_option_test.go | 10 +- actor/pid_test.go | 217 ++++++++++++++++++------ actor/pid_tree.go | 143 ++++++++++------ actor/pid_tree_test.go | 26 ++- actor/spawn_option.go | 20 ++- actor/spawn_option_test.go | 6 +- actor/supervisor.go | 231 ++++++++++++++++++++++++++ actor/supervisor_directive.go | 90 ---------- actor/supervisor_strategy.go | 118 ------------- actor/supervisor_test.go | 59 +++++++ actor/util.go | 4 +- internal/internalpb/supervision.pb.go | 144 +++++++++++----- internal/slice/sync.go | 22 +-- internal/syncmap/syncmap.go | 30 ++-- protos/internal/supervision.proto | 7 + 21 files changed, 886 insertions(+), 509 deletions(-) create mode 100644 actor/supervisor.go delete mode 100644 actor/supervisor_directive.go delete mode 100644 actor/supervisor_strategy.go create mode 100644 actor/supervisor_test.go diff --git a/actor/actor_system.go b/actor/actor_system.go index 1c4b894d..054144e3 100644 --- a/actor/actor_system.go +++ b/actor/actor_system.go @@ -447,7 +447,7 @@ func (x *actorSystem) Start(ctx context.Context) error { AddError(x.spawnUserGuardian(ctx)). AddError(x.spawnRebalancer(ctx)). AddError(x.spawnJanitor(ctx)). - AddError(x.spawnDeadletters(ctx)). + AddError(x.spawnDeadletter(ctx)). Error(); err != nil { return errorschain. New(errorschain.ReturnAll()). @@ -1762,8 +1762,8 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o } // set the supervisor strategies when defined - if len(spawnConfig.supervisorStrategies) != 0 { - pidOpts = append(pidOpts, withSupervisorStrategies(spawnConfig.supervisorStrategies...)) + if spawnConfig.supervisor != nil { + pidOpts = append(pidOpts, withSupervisor(spawnConfig.supervisor)) } // enable stash @@ -1884,13 +1884,11 @@ func (x *actorSystem) spawnRootGuardian(ctx context.Context) error { func (x *actorSystem) spawnSystemGuardian(ctx context.Context) error { var err error actorName := x.reservedName(systemGuardianType) + x.systemGuardian, err = x.configPID(ctx, actorName, newSystemGuardian(), - WithSupervisorStrategies( - NewSupervisorStrategy(PanicError{}, NewStopDirective()), - ), - ) + WithSupervisor(NewSupervisor())) if err != nil { return fmt.Errorf("actor=%s failed to start system guardian: %w", actorName, err) } @@ -1907,9 +1905,7 @@ func (x *actorSystem) spawnUserGuardian(ctx context.Context) error { x.userGuardian, err = x.configPID(ctx, actorName, newUserGuardian(), - WithSupervisorStrategies( - NewSupervisorStrategy(PanicError{}, NewStopDirective()), - )) + WithSupervisor(NewSupervisor())) if err != nil { return fmt.Errorf("actor=%s failed to start user guardian: %w", actorName, err) } @@ -1923,13 +1919,18 @@ func (x *actorSystem) spawnUserGuardian(ctx context.Context) error { func (x *actorSystem) spawnJanitor(ctx context.Context) error { var err error actorName := x.reservedName(deathWatchType) + + // define the supervisor strategy to use + supervisor := NewSupervisor( + WithStrategy(OneForOneStrategy), + WithDirective(PanicError{}, StopDirective), + WithDirective(InternalError{}, StopDirective), + ) + x.deathWatch, err = x.configPID(ctx, actorName, newDeathWatch(), - WithSupervisorStrategies( - NewSupervisorStrategy(PanicError{}, NewStopDirective()), - NewSupervisorStrategy(InternalError{}, NewStopDirective()), - ), + WithSupervisor(supervisor), ) if err != nil { return fmt.Errorf("actor=%s failed to start the deathWatch: %w", actorName, err) @@ -1945,16 +1946,21 @@ func (x *actorSystem) spawnRebalancer(ctx context.Context) error { if x.clusterEnabled.Load() { var err error actorName := x.reservedName(rebalancerType) + + // define the supervisor strategy to use + supervisor := NewSupervisor( + WithStrategy(OneForOneStrategy), + WithDirective(PanicError{}, RestartDirective), + WithDirective(&runtime.PanicNilError{}, RestartDirective), + WithDirective(rebalancingError{}, RestartDirective), + WithDirective(InternalError{}, ResumeDirective), + WithDirective(SpawnError{}, ResumeDirective), + ) + x.rebalancer, err = x.configPID(ctx, actorName, newRebalancer(x.reflection, x.remoting), - WithSupervisorStrategies( - NewSupervisorStrategy(PanicError{}, NewRestartDirective()), - NewSupervisorStrategy(&runtime.PanicNilError{}, NewRestartDirective()), - NewSupervisorStrategy(rebalancingError{}, NewRestartDirective()), - NewSupervisorStrategy(InternalError{}, NewResumeDirective()), - NewSupervisorStrategy(SpawnError{}, NewResumeDirective()), - ), + WithSupervisor(supervisor), ) if err != nil { return fmt.Errorf("actor=%s failed to start cluster rebalancer: %w", actorName, err) @@ -1966,16 +1972,19 @@ func (x *actorSystem) spawnRebalancer(ctx context.Context) error { return nil } -// spawnDeadletters creates the deadletters synthetic actor -func (x *actorSystem) spawnDeadletters(ctx context.Context) error { +// spawnDeadletter creates the deadletters synthetic actor +func (x *actorSystem) spawnDeadletter(ctx context.Context) error { var err error actorName := x.reservedName(deadletterType) x.deadletters, err = x.configPID(ctx, actorName, newDeadLetter(), - WithSupervisorStrategies( - NewSupervisorStrategy(PanicError{}, NewResumeDirective()), - NewSupervisorStrategy(&runtime.PanicNilError{}, NewResumeDirective()), + WithSupervisor( + NewSupervisor( + WithStrategy(OneForOneStrategy), + WithDirective(PanicError{}, ResumeDirective), + WithDirective(&runtime.PanicNilError{}, ResumeDirective), + ), ), ) if err != nil { diff --git a/actor/dead_letter.go b/actor/dead_letter.go index 7ab8c1fd..9ae6c560 100644 --- a/actor/dead_letter.go +++ b/actor/dead_letter.go @@ -44,8 +44,8 @@ type deadLetter struct { pid *PID logger log.Logger counter *atomic.Int64 - letters *syncmap.SyncMap[string, *goaktpb.Deadletter] - counters *syncmap.SyncMap[string, *atomic.Int64] + letters *syncmap.Map[string, *goaktpb.Deadletter] + counters *syncmap.Map[string, *atomic.Int64] } // enforce the implementation of the Actor interface diff --git a/actor/mock_test.go b/actor/mock_test.go index 6147302a..420cd98e 100644 --- a/actor/mock_test.go +++ b/actor/mock_test.go @@ -520,10 +520,6 @@ func startClusterSystem(t *testing.T, serverAddr string, opts ...testClusterOpti return system, provider } -type unhandledSupervisorDirective struct{} - -func (x unhandledSupervisorDirective) isSupervisorDirective() {} - type mockRouterActor struct { counter int logger log.Logger diff --git a/actor/pid.go b/actor/pid.go index 082e9c17..50055899 100644 --- a/actor/pid.go +++ b/actor/pid.go @@ -134,7 +134,7 @@ type PID struct { processedCount *atomic.Int64 // supervisor strategy - supervisorStrategies *strategiesMap + supervisor *Supervisor supervisionChan chan error supervisionStopSignal chan types.Unit @@ -178,7 +178,7 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ... supervisionStopSignal: make(chan types.Unit, 1), remoting: NewRemoting(), goScheduler: newGoScheduler(300), - supervisorStrategies: newStrategiesMap(), + supervisor: NewSupervisor(), startedAt: atomic.NewInt64(0), } @@ -191,11 +191,6 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ... pid.initTimeout.Store(DefaultInitTimeout) pid.processing.Store(int32(idle)) - // set default strategies mappings - for _, strategy := range DefaultSupervisorStrategies { - pid.supervisorStrategies.Put(strategy) - } - for _, opt := range opts { opt(pid) } @@ -558,8 +553,8 @@ func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts . } // set the supervisor strategies when defined - if len(spawnConfig.supervisorStrategies) != 0 { - pidOptions = append(pidOptions, withSupervisorStrategies(spawnConfig.supervisorStrategies...)) + if spawnConfig.supervisor != nil { + pidOptions = append(pidOptions, withSupervisor(spawnConfig.supervisor)) } // disable passivation for system actor @@ -1220,7 +1215,7 @@ func (pid *PID) receiveLoop() { case *goaktpb.PoisonPill: _ = pid.Shutdown(received.Context()) case *internalpb.HandleFault: - pid.handleFaultyChild(msg) + pid.handleFaultyChild(received.Sender(), msg) default: pid.handleReceived(received) } @@ -1327,7 +1322,7 @@ func (pid *PID) reset() { pid.startedAt.Store(0) pid.stopping.Store(false) pid.suspended.Store(false) - pid.supervisorStrategies.Reset() + pid.supervisor.Reset() pid.mailbox.Dispose() } @@ -1602,8 +1597,7 @@ func (pid *PID) notifyParent(err error) { return } - // check my supervisor strategies based upon the error type - strategy, ok := pid.supervisorStrategies.Get(err) + directive, ok := pid.supervisor.Directive(err) if !ok { pid.logger.Debugf("no supervisor directive found for error: %s", errorType(err)) pid.suspend(err.Error()) @@ -1615,29 +1609,37 @@ func (pid *PID) notifyParent(err error) { Message: err.Error(), } - directive := strategy.Directive() - switch d := directive.(type) { - case *StopDirective: + switch directive { + case StopDirective: msg.Directive = &internalpb.HandleFault_Stop{ Stop: new(internalpb.StopDirective), } - case *RestartDirective: + case RestartDirective: msg.Directive = &internalpb.HandleFault_Restart{ Restart: &internalpb.RestartDirective{ - MaxRetries: d.maxNumRetries, - Timeout: int64(d.timeout), + MaxRetries: pid.supervisor.MaxRetries(), + Timeout: int64(pid.supervisor.Timeout()), }, } - case *ResumeDirective: + case ResumeDirective: msg.Directive = &internalpb.HandleFault_Resume{ Resume: &internalpb.ResumeDirective{}, } default: - pid.logger.Debugf("unknown directive: %T found for error: %s", d, errorType(err)) + pid.logger.Debugf("unknown directive: %T found for error: %s", directive, errorType(err)) pid.suspend(err.Error()) return } + switch pid.supervisor.Strategy() { + case OneForOneStrategy: + msg.Strategy = internalpb.Strategy_STRATEGY_ONE_FOR_ONE + case OneForAllStrategy: + msg.Strategy = internalpb.Strategy_STRATEGY_ONE_FOR_ALL + default: + msg.Strategy = internalpb.Strategy_STRATEGY_ONE_FOR_ONE + } + if parent := pid.Parent(); parent != nil && !parent.Equals(NoSender) { _ = pid.Tell(context.Background(), parent, msg) } @@ -1722,65 +1724,108 @@ func (pid *PID) handleCompletion(ctx context.Context, completion *taskCompletion } // handleFaultyChild watches for child actor's failure and act based upon the supervisory strategy -func (pid *PID) handleFaultyChild(msg *internalpb.HandleFault) { - tree := pid.ActorSystem().tree() - if descendants, ok := tree.Descendants(pid); ok { - for _, descendant := range descendants { - cid := descendant.GetValue() - if cid.ID() == msg.GetActorId() { - message := msg.GetMessage() - directive := msg.GetDirective() - pid.logger.Errorf("child actor=(%s) is failing: Err=%s", cid.Name(), message) - switch d := directive.(type) { - case *internalpb.HandleFault_Stop: - pid.handleStopDirective(cid) - case *internalpb.HandleFault_Restart: - pid.handleRestartDirective(cid, d.Restart.GetMaxRetries(), time.Duration(d.Restart.GetTimeout())) - case *internalpb.HandleFault_Resume: - // pass - default: - pid.handleStopDirective(cid) - } - return - } +func (pid *PID) handleFaultyChild(cid *PID, msg *internalpb.HandleFault) { + if cid.ID() == msg.GetActorId() { + message := msg.GetMessage() + directive := msg.GetDirective() + includeSiblings := msg.GetStrategy() == internalpb.Strategy_STRATEGY_ONE_FOR_ALL + + pid.logger.Errorf("child actor=(%s) is failing: Err=%s", cid.Name(), message) + + switch d := directive.(type) { + case *internalpb.HandleFault_Stop: + pid.handleStopDirective(cid, includeSiblings) + case *internalpb.HandleFault_Restart: + pid.handleRestartDirective(cid, + d.Restart.GetMaxRetries(), + time.Duration(d.Restart.GetTimeout()), + includeSiblings) + case *internalpb.HandleFault_Resume: + // pass + default: + pid.handleStopDirective(cid, includeSiblings) } + return } } -// handleStopDirective handles the Supervisor stop directive -func (pid *PID) handleStopDirective(cid *PID) { - pid.UnWatch(cid) +// handleStopDirective handles the Behavior stop directive +func (pid *PID) handleStopDirective(cid *PID, includeSiblings bool) { + ctx := context.Background() tree := pid.ActorSystem().tree() - if err := cid.Shutdown(context.Background()); err != nil { - // this can enter into some infinite loop if we panic - // since we are just shutting down the actor we can just log the error - // TODO: rethink properly about PostStop error handling - pid.logger.Error(err) + pids := []*PID{cid} + + if includeSiblings { + if siblings, ok := tree.Siblings(cid); ok { + for _, sibling := range siblings { + pids = append(pids, sibling.GetValue()) + } + } } - tree.DeleteNode(cid) -} + eg, ctx := errgroup.WithContext(ctx) + for _, spid := range pids { + spid := spid + eg.Go(func() error { + pid.UnWatch(spid) + if err := spid.Shutdown(ctx); err != nil { + // just log the error and suspend the given sibling + pid.logger.Error(fmt.Errorf("failed to shutdown actor=(%s): %w", spid.Name(), err)) + // we need to suspend the actor since its shutdown is the result of + // one of its faulty siblings + spid.suspend(err.Error()) + // no need to return an error + return nil + } + // remove the sibling tree node + tree.DeleteNode(spid) + return nil + }) + } -// handleRestartDirective handles the Supervisor restart directive -func (pid *PID) handleRestartDirective(cid *PID, maxRetries uint32, timeout time.Duration) { - pid.UnWatch(cid) + // no error to handle + _ = eg.Wait() +} +// handleRestartDirective handles the Behavior restart directive +func (pid *PID) handleRestartDirective(cid *PID, maxRetries uint32, timeout time.Duration, includeSiblings bool) { ctx := context.Background() - var err error - if maxRetries == 0 || timeout <= 0 { - err = cid.Restart(ctx) - } else { - // TODO: handle the initial delay - retrier := retry.NewRetrier(int(maxRetries), 100*time.Millisecond, timeout) - err = retrier.RunContext(ctx, cid.Restart) - } + tree := pid.ActorSystem().tree() + pids := []*PID{cid} - if err != nil { - pid.logger.Error(err) - if err := cid.Shutdown(ctx); err != nil { - pid.logger.Error(err) + if includeSiblings { + if siblings, ok := tree.Siblings(cid); ok { + for _, sibling := range siblings { + pids = append(pids, sibling.GetValue()) + } } - return + } + + eg, ctx := errgroup.WithContext(ctx) + for _, spid := range pids { + spid := spid + eg.Go(func() error { + pid.UnWatch(spid) + var err error + + switch { + case maxRetries == 0 || timeout <= 0: + err = spid.Restart(ctx) + default: + retrier := retry.NewRetrier(int(maxRetries), timeout, timeout) + err = retrier.RunContext(ctx, cid.Restart) + } + + if err != nil { + pid.logger.Error(err) + if err := spid.Shutdown(ctx); err != nil { + pid.logger.Error(err) + // we need to suspend the actor since it is faulty + spid.suspend(err.Error()) + } + } + return nil + }) } } diff --git a/actor/pid_node.go b/actor/pid_node.go index 6a71f2d9..1f150ca0 100644 --- a/actor/pid_node.go +++ b/actor/pid_node.go @@ -46,9 +46,9 @@ func (v *pidValue) Value() *PID { type pidNode struct { ID string value atomic.Pointer[pidValue] - Descendants *slice.SyncSlice[*pidNode] - Watchers *slice.SyncSlice[*pidNode] - Watchees *slice.SyncSlice[*pidNode] + Descendants *slice.Sync[*pidNode] + Watchers *slice.Sync[*pidNode] + Watchees *slice.Sync[*pidNode] } // SetValue sets a node value diff --git a/actor/pid_option.go b/actor/pid_option.go index bfc4eb09..509cc9f9 100644 --- a/actor/pid_option.go +++ b/actor/pid_option.go @@ -62,16 +62,10 @@ func withActorSystem(sys ActorSystem) pidOption { } } -// withSupervisorStrategies defines the supervisor strategies -func withSupervisorStrategies(strategies ...*SupervisorStrategy) pidOption { +// withSupervisor defines the supervisor +func withSupervisor(supervisor *Supervisor) pidOption { return func(pid *PID) { - if pid.supervisorStrategies == nil { - pid.supervisorStrategies = newStrategiesMap() - } - - for _, strategy := range strategies { - pid.supervisorStrategies.Put(strategy) - } + pid.supervisor = supervisor } } diff --git a/actor/pid_option_test.go b/actor/pid_option_test.go index 731816d4..3574a72d 100644 --- a/actor/pid_option_test.go +++ b/actor/pid_option_test.go @@ -37,9 +37,7 @@ import ( func TestPIDOptions(t *testing.T) { mailbox := NewUnboundedMailbox() - strategy := NewSupervisorStrategy(PanicError{}, NewResumeDirective()) - strategies := newStrategiesMap() - strategies.Put(strategy) + supervisor := NewSupervisor(WithStrategy(OneForAllStrategy)) var ( atomicDuration atomic.Duration atomicInt atomic.Int32 @@ -75,9 +73,9 @@ func TestPIDOptions(t *testing.T) { expected: &PID{logger: log.DefaultLogger}, }, { - name: "WithSupervisorStrategies", - option: withSupervisorStrategies(strategy), - expected: &PID{supervisorStrategies: strategies}, + name: "WithSupervisor", + option: withSupervisor(supervisor), + expected: &PID{supervisor: supervisor}, }, { name: "WithPassivationDisabled", diff --git a/actor/pid_test.go b/actor/pid_test.go index 056bf22e..d60e9f29 100644 --- a/actor/pid_test.go +++ b/actor/pid_test.go @@ -39,6 +39,7 @@ import ( "github.com/tochemey/goakt/v3/address" "github.com/tochemey/goakt/v3/goaktpb" + "github.com/tochemey/goakt/v3/internal/syncmap" "github.com/tochemey/goakt/v3/internal/util" "github.com/tochemey/goakt/v3/log" "github.com/tochemey/goakt/v3/remote" @@ -572,7 +573,7 @@ func TestSupervisorStrategy(t *testing.T) { actorSystem, err := NewActorSystem("testSys", WithRemote(remote.NewConfig(host, ports[0])), - WithPassivation(passivateAfter), + WithPassivation(10*time.Minute), WithLogger(log.DiscardLogger)) require.NoError(t, err) @@ -584,22 +585,73 @@ func TestSupervisorStrategy(t *testing.T) { parent, err := actorSystem.Spawn(ctx, "test", newMockSupervisorActor()) require.NoError(t, err) - assert.NotNil(t, parent) + require.NotNil(t, parent) // create the child actor - stopStrategy := NewSupervisorStrategy(PanicError{}, NewStopDirective()) - child, err := parent.SpawnChild(ctx, "SpawnChild", newMockSupervisedActor(), WithSupervisorStrategies(stopStrategy)) + stopStrategy := NewSupervisor(WithDirective(PanicError{}, StopDirective)) + + child, err := parent.SpawnChild(ctx, "SpawnChild", newMockSupervisedActor(), WithSupervisor(stopStrategy)) + require.NoError(t, err) + require.NotNil(t, child) + require.Equal(t, parent.ID(), child.Parent().ID()) + + require.Len(t, parent.Children(), 1) + require.NoError(t, Tell(ctx, child, new(testpb.TestPanic))) + + util.Pause(time.Second) + require.Zero(t, parent.ChildrenCount()) + + //stop the actor + err = parent.Shutdown(ctx) assert.NoError(t, err) - assert.NotNil(t, child) - assert.Equal(t, parent.ID(), child.Parent().ID()) + assert.NoError(t, actorSystem.Stop(ctx)) + }) + t.Run("With stop as supervisor directive with ONE_FOR_ALL", func(t *testing.T) { + ctx := context.TODO() + host := "127.0.0.1" + ports := dynaport.Get(1) - assert.Len(t, parent.Children(), 1) - // let us send 10 public to the actors - count := 10 - for i := 0; i < count; i++ { - assert.NoError(t, Tell(ctx, parent, new(testpb.TestSend))) - assert.NoError(t, Tell(ctx, child, new(testpb.TestSend))) - } + actorSystem, err := NewActorSystem("testSys", + WithRemote(remote.NewConfig(host, ports[0])), + WithPassivation(10*time.Minute), + WithLogger(log.DiscardLogger)) + + require.NoError(t, err) + require.NotNil(t, actorSystem) + + require.NoError(t, actorSystem.Start(ctx)) + + util.Pause(time.Second) + + parent, err := actorSystem.Spawn(ctx, "test", newMockSupervisorActor()) + require.NoError(t, err) + require.NotNil(t, parent) + + // create the child actor + stopStrategy := NewSupervisor( + WithStrategy(OneForAllStrategy), + WithDirective(PanicError{}, StopDirective), + ) + + child, err := parent.SpawnChild(ctx, "SpawnChild", newMockSupervisedActor(), WithSupervisor(stopStrategy)) + require.NoError(t, err) + require.NotNil(t, child) + require.Equal(t, parent.ID(), child.Parent().ID()) + + util.Pause(500 * time.Millisecond) + + child2, err := parent.SpawnChild(ctx, "SpawnChild2", newMockSupervisedActor(), WithSupervisor(stopStrategy)) + require.NoError(t, err) + require.NotNil(t, child2) + require.Equal(t, parent.ID(), child2.Parent().ID()) + + util.Pause(500 * time.Millisecond) + + require.Len(t, parent.Children(), 2) + require.NoError(t, Tell(ctx, child, new(testpb.TestPanic))) + + util.Pause(time.Second) + require.Zero(t, parent.ChildrenCount()) //stop the actor err = parent.Shutdown(ctx) @@ -625,25 +677,25 @@ func TestSupervisorStrategy(t *testing.T) { parent, err := actorSystem.Spawn(ctx, "test", newMockSupervisorActor()) require.NoError(t, err) - assert.NotNil(t, parent) + require.NotNil(t, parent) // create the child actor child, err := parent.SpawnChild(ctx, "SpawnChild", newMockSupervisedActor()) - assert.NoError(t, err) - assert.NotNil(t, child) + require.NoError(t, err) + require.NotNil(t, child) util.Pause(time.Second) - assert.Len(t, parent.Children(), 1) + require.Len(t, parent.Children(), 1) // send a test panic message to the actor - assert.NoError(t, Tell(ctx, child, new(testpb.TestPanic))) + require.NoError(t, Tell(ctx, child, new(testpb.TestPanic))) // wait for the child to properly shutdown util.Pause(time.Second) // assert the actor state - assert.False(t, child.IsRunning()) - assert.Len(t, parent.Children(), 0) + require.False(t, child.IsRunning()) + require.Len(t, parent.Children(), 0) //stop the actor err = parent.Shutdown(ctx) @@ -672,8 +724,8 @@ func TestSupervisorStrategy(t *testing.T) { require.NotNil(t, parent) // create the child actor - fakeStrategy := NewSupervisorStrategy(PanicError{}, new(unhandledSupervisorDirective)) - child, err := parent.SpawnChild(ctx, "SpawnChild", newMockSupervisedActor(), WithSupervisorStrategies(fakeStrategy)) + fakeStrategy := NewSupervisor(WithDirective(PanicError{}, 4)) // undefined directive + child, err := parent.SpawnChild(ctx, "SpawnChild", newMockSupervisedActor(), WithSupervisor(fakeStrategy)) require.NoError(t, err) require.NotNil(t, child) @@ -727,8 +779,14 @@ func TestSupervisorStrategy(t *testing.T) { util.Pause(time.Second) - // just for the sake of the test we remove the default directive - child.supervisorStrategies = newStrategiesMap() + // bare supervisor + child.supervisor = &Supervisor{ + Mutex: sync.Mutex{}, + strategy: OneForOneStrategy, + maxRetries: 0, + timeout: 0, + directives: syncmap.New[string, Directive](), + } require.Len(t, parent.Children(), 1) // send a message to the actor which result in panic @@ -767,31 +825,31 @@ func TestSupervisorStrategy(t *testing.T) { parent, err := actorSystem.Spawn(ctx, "test", newMockSupervisorActor()) require.NoError(t, err) - assert.NotNil(t, parent) + require.NotNil(t, parent) // create the child actor - stopStrategy := NewSupervisorStrategy(PanicError{}, DefaultSupervisoryStrategy) - child, err := parent.SpawnChild(ctx, "SpawnChild", &mockPostStopActor{}, WithSupervisorStrategies(stopStrategy)) - assert.NoError(t, err) - assert.NotNil(t, child) + stopStrategy := NewSupervisor(WithDirective(PanicError{}, DefaultSupervisorDirective)) + child, err := parent.SpawnChild(ctx, "SpawnChild", &mockPostStopActor{}, WithSupervisor(stopStrategy)) + require.NoError(t, err) + require.NotNil(t, child) util.Pause(time.Second) - assert.Len(t, parent.Children(), 1) + require.Len(t, parent.Children(), 1) // send a test panic message to the actor - assert.NoError(t, Tell(ctx, child, new(testpb.TestPanic))) + require.NoError(t, Tell(ctx, child, new(testpb.TestPanic))) // wait for the child to properly shutdown util.Pause(time.Second) // assert the actor state - assert.False(t, child.IsRunning()) - assert.Len(t, parent.Children(), 0) + require.False(t, child.IsRunning()) + require.Len(t, parent.Children(), 0) //stop the actor err = parent.Shutdown(ctx) - assert.NoError(t, err) - assert.NoError(t, actorSystem.Stop(ctx)) + require.NoError(t, err) + require.NoError(t, actorSystem.Stop(ctx)) }) t.Run("With restart as supervisor strategy", func(t *testing.T) { ctx := context.TODO() @@ -815,8 +873,8 @@ func TestSupervisorStrategy(t *testing.T) { require.NotNil(t, parent) // create the child actor - restartStrategy := NewSupervisorStrategy(PanicError{}, NewRestartDirective()) - child, err := parent.SpawnChild(ctx, "SpawnChild", newMockSupervisedActor(), WithSupervisorStrategies(restartStrategy)) + restartStrategy := NewSupervisor(WithDirective(PanicError{}, RestartDirective)) + child, err := parent.SpawnChild(ctx, "SpawnChild", newMockSupervisedActor(), WithSupervisor(restartStrategy)) require.NoError(t, err) require.NotNil(t, child) @@ -840,6 +898,64 @@ func TestSupervisorStrategy(t *testing.T) { assert.NoError(t, err) assert.NoError(t, actorSystem.Stop(ctx)) }) + t.Run("With restart as supervisor strategy with ONE_FOR_ALL", func(t *testing.T) { + ctx := context.TODO() + host := "127.0.0.1" + ports := dynaport.Get(1) + + actorSystem, err := NewActorSystem("testSys", + WithRemote(remote.NewConfig(host, ports[0])), + WithPassivationDisabled(), + WithLogger(log.DiscardLogger)) + + require.NoError(t, err) + require.NotNil(t, actorSystem) + + require.NoError(t, actorSystem.Start(ctx)) + + util.Pause(time.Second) + + parent, err := actorSystem.Spawn(ctx, "test", newMockSupervisorActor()) + require.NoError(t, err) + require.NotNil(t, parent) + + // create the child actor + restartStrategy := NewSupervisor( + WithStrategy(OneForAllStrategy), + WithDirective(PanicError{}, RestartDirective)) + child, err := parent.SpawnChild(ctx, "SpawnChild", newMockSupervisedActor(), WithSupervisor(restartStrategy)) + require.NoError(t, err) + require.NotNil(t, child) + + util.Pause(time.Second) + + child2, err := parent.SpawnChild(ctx, "SpawnChild2", newMockSupervisedActor(), WithSupervisor(restartStrategy)) + require.NoError(t, err) + require.NotNil(t, child2) + require.Equal(t, parent.ID(), child2.Parent().ID()) + + util.Pause(500 * time.Millisecond) + + require.Len(t, parent.Children(), 2) + // send a test panic message to the actor + require.NoError(t, Tell(ctx, child, new(testpb.TestPanic))) + + // wait for the child to properly shutdown + util.Pause(time.Second) + + // assert the actor state + require.Len(t, parent.Children(), 2) + require.True(t, child.IsRunning()) + require.True(t, child2.IsRunning()) + + // TODO: fix the child relationship supervisor mode + // require.Len(t, parent.Children(), 1) + + //stop the actor + err = parent.Shutdown(ctx) + assert.NoError(t, err) + assert.NoError(t, actorSystem.Stop(ctx)) + }) t.Run("With no strategy set will default to a Shutdown", func(t *testing.T) { ctx := context.TODO() host := "127.0.0.1" @@ -905,8 +1021,8 @@ func TestSupervisorStrategy(t *testing.T) { assert.NotNil(t, parent) // create the child actor - resumeStrategy := NewSupervisorStrategy(PanicError{}, NewResumeDirective()) - child, err := parent.SpawnChild(ctx, "SpawnChild", newMockSupervisedActor(), WithSupervisorStrategies(resumeStrategy)) + resumeStrategy := NewSupervisor(WithDirective(PanicError{}, ResumeDirective)) + child, err := parent.SpawnChild(ctx, "SpawnChild", newMockSupervisedActor(), WithSupervisor(resumeStrategy)) assert.NoError(t, err) assert.NotNil(t, child) @@ -949,17 +1065,17 @@ func TestSupervisorStrategy(t *testing.T) { util.Pause(time.Second) - // create the directive - restart := NewRestartDirective() - restart.WithLimit(2, time.Minute) - parent, err := actorSystem.Spawn(ctx, "test", newMockSupervisorActor()) require.NoError(t, err) require.NotNil(t, parent) - // create the child actor - restartStrategy := NewSupervisorStrategy(PanicError{}, restart) - child, err := parent.SpawnChild(ctx, "SpawnChild", newMockSupervisedActor(), WithSupervisorStrategies(restartStrategy)) + restartStrategy := NewSupervisor( + WithStrategy(OneForOneStrategy), + WithDirective(PanicError{}, RestartDirective), + WithRetry(2, time.Minute), + ) + + child, err := parent.SpawnChild(ctx, "SpawnChild", newMockSupervisedActor(), WithSupervisor(restartStrategy)) require.NoError(t, err) require.NotNil(t, child) @@ -1012,8 +1128,13 @@ func TestSupervisorStrategy(t *testing.T) { util.Pause(time.Second) - // just for the sake of the test we remove the default directive - child.supervisorStrategies = newStrategiesMap() + child.supervisor = &Supervisor{ + Mutex: sync.Mutex{}, + strategy: OneForOneStrategy, + maxRetries: 0, + timeout: 0, + directives: syncmap.New[string, Directive](), + } require.Len(t, parent.Children(), 1) // send a message to the actor which result in panic diff --git a/actor/pid_tree.go b/actor/pid_tree.go index 5e6b765e..67748404 100644 --- a/actor/pid_tree.go +++ b/actor/pid_tree.go @@ -25,6 +25,7 @@ package actors import ( + "errors" "fmt" "sync" "sync/atomic" @@ -39,6 +40,7 @@ type pidTree struct { nodePool *sync.Pool valuePool *sync.Pool size atomic.Int64 + rootNode *pidNode } // newTree creates a new instance of the actors Tree @@ -64,49 +66,62 @@ func newTree() *pidTree { } // AddNode adds a new node to the tree under a given parent -func (t *pidTree) AddNode(parent, child *PID) error { +// The first node that is created without a parent becomes the defacto root node +func (x *pidTree) AddNode(parent, child *PID) error { var ( parentNode *pidNode ok bool ) + + // check whether the node to be added is a root node + if parent == nil && x.rootNode != nil { + return errors.New("root node already set") + } + // validate parent node existence - if !parent.Equals(NoSender) { - parentNode, ok = t.GetNode(parent.ID()) + if parent != nil && !parent.Equals(NoSender) { + parentNode, ok = x.GetNode(parent.ID()) if !ok || parentNode == nil { return fmt.Errorf("parent node=(%s) does not exist", parent.ID()) } } // create a new node from the pool - newNode := t.nodePool.Get().(*pidNode) - t.resetNode(newNode, child.ID()) + newNode := x.nodePool.Get().(*pidNode) + x.resetNode(newNode, child.ID()) // create a pidValue using the pool and set its data - val := t.valuePool.Get().(*pidValue) + val := x.valuePool.Get().(*pidValue) val.data = child // store the value atomically in the node newNode.SetValue(val) // store the node in the tree - t.nodes.Store(child.ID(), newNode) + x.nodes.Store(child.ID(), newNode) // when parentNode is defined if parentNode != nil { - t.addChild(parentNode, newNode) - t.updateAncestors(parent.ID(), child.ID()) + x.addChild(parentNode, newNode) + x.updateAncestors(parent.ID(), child.ID()) + } + + // only set the root node when parent is nil + if parentNode == nil { + // set the given node as root node + x.rootNode = newNode } - t.size.Add(1) + x.size.Add(1) return nil } // AddWatcher adds a watcher to the given node. Make sure to check the existence of both PID // before watching because this call will do nothing when the watcher and the watched node do not exist in // the tree -func (t *pidTree) AddWatcher(node, watcher *PID) { - currentNode, currentOk := t.GetNode(node.ID()) - watcherNode, watcherOk := t.GetNode(watcher.ID()) +func (x *pidTree) AddWatcher(node, watcher *PID) { + currentNode, currentOk := x.GetNode(node.ID()) + watcherNode, watcherOk := x.GetNode(watcher.ID()) if !currentOk || !watcherOk || currentNode == nil { return } @@ -116,15 +131,15 @@ func (t *pidTree) AddWatcher(node, watcher *PID) { } // Ancestors retrieves all ancestors nodes of a given node -func (t *pidTree) Ancestors(pid *PID) ([]*pidNode, bool) { - ancestorIDs, ok := t.ancestors(pid.ID()) +func (x *pidTree) Ancestors(pid *PID) ([]*pidNode, bool) { + ancestorIDs, ok := x.ancestors(pid.ID()) if !ok { return nil, false } var ancestors []*pidNode for _, ancestorID := range ancestorIDs { - if ancestor, ok := t.GetNode(ancestorID); ok { + if ancestor, ok := x.GetNode(ancestorID); ok { ancestors = append(ancestors, ancestor) } } @@ -132,13 +147,13 @@ func (t *pidTree) Ancestors(pid *PID) ([]*pidNode, bool) { } // ParentAt returns a given PID direct parent -func (t *pidTree) ParentAt(pid *PID, level int) (*pidNode, bool) { - return t.ancestorAt(pid, level) +func (x *pidTree) ParentAt(pid *PID, level int) (*pidNode, bool) { + return x.ancestorAt(pid, level) } // Descendants retrieves all descendants of the node with the given ID. -func (t *pidTree) Descendants(pid *PID) ([]*pidNode, bool) { - node, ok := t.GetNode(pid.ID()) +func (x *pidTree) Descendants(pid *PID) ([]*pidNode, bool) { + node, ok := x.GetNode(pid.ID()) if !ok { return nil, false } @@ -146,17 +161,43 @@ func (t *pidTree) Descendants(pid *PID) ([]*pidNode, bool) { return collectDescendants(node), true } +// Siblings returns a slice of pidNode that are the siblings of the given node. +// If the node is the root (i.e. has no parent), it returns an empty slice. +// It returns (siblings, true) on success, or (nil, false) if an error occur +func (x *pidTree) Siblings(pid *PID) ([]*pidNode, bool) { + // get the direct parent of the given node + parentNode, ok := x.ParentAt(pid, 0) + if !ok { + return nil, false + } + + // defensive programming + if parentNode == nil { + return nil, false + } + + // here we are only fetching the first level children + children := parentNode.Descendants.Items() + var siblings []*pidNode + for _, child := range children { + if !child.GetValue().Equals(pid) { + siblings = append(siblings, child) + } + } + return siblings, true +} + // DeleteNode deletes a node and all its descendants -func (t *pidTree) DeleteNode(pid *PID) { - node, ok := t.GetNode(pid.ID()) +func (x *pidTree) DeleteNode(pid *PID) { + node, ok := x.GetNode(pid.ID()) if !ok { return } // remove the node from its parent's Children slice - if ancestors, ok := t.parents.Load(pid.ID()); ok && len(ancestors.([]string)) > 0 { + if ancestors, ok := x.parents.Load(pid.ID()); ok && len(ancestors.([]string)) > 0 { parentID := ancestors.([]string)[0] - if parent, found := t.GetNode(parentID); found { + if parent, found := x.GetNode(parentID); found { children := filterOutChild(parent.Descendants, pid.ID()) parent.Descendants.Reset() parent.Descendants.AppendMany(children.Items()...) @@ -171,18 +212,18 @@ func (t *pidTree) DeleteNode(pid *PID) { deleteChildren(child) } // delete node from maps and pool - t.nodes.Delete(n.ID) - t.parents.Delete(n.ID) - t.nodePool.Put(n) - t.size.Add(-1) + x.nodes.Delete(n.ID) + x.parents.Delete(n.ID) + x.nodePool.Put(n) + x.size.Add(-1) } deleteChildren(node) } // GetNode retrieves a node by its ID -func (t *pidTree) GetNode(id string) (*pidNode, bool) { - value, ok := t.nodes.Load(id) +func (x *pidTree) GetNode(id string) (*pidNode, bool) { + value, ok := x.nodes.Load(id) if !ok { return nil, false } @@ -191,12 +232,12 @@ func (t *pidTree) GetNode(id string) (*pidNode, bool) { } // Nodes retrieves all nodes in the tree efficiently -func (t *pidTree) Nodes() []*pidNode { - if t.Size() == 0 { +func (x *pidTree) Nodes() []*pidNode { + if x.Size() == 0 { return nil } var nodes []*pidNode - t.nodes.Range(func(_, value any) { + x.nodes.Range(func(_, value any) { node := value.(*pidNode) nodes = append(nodes, node) }) @@ -204,44 +245,44 @@ func (t *pidTree) Nodes() []*pidNode { } // Size returns the current number of nodes in the tree -func (t *pidTree) Size() int64 { - return t.size.Load() +func (x *pidTree) Size() int64 { + return x.size.Load() } // Reset clears all nodes and parents, resetting the tree to an empty state -func (t *pidTree) Reset() { - t.nodes.Reset() // Reset nodes map - t.parents.Reset() // Reset parents map - t.size.Store(0) +func (x *pidTree) Reset() { + x.nodes.Reset() // Reset nodes map + x.parents.Reset() // Reset parents map + x.size.Store(0) } // ancestors returns the list of ancestor nodes -func (t *pidTree) ancestors(id string) ([]string, bool) { - if value, ok := t.parents.Load(id); ok { +func (x *pidTree) ancestors(id string) ([]string, bool) { + if value, ok := x.parents.Load(id); ok { return value.([]string), true } return nil, false } // addChild safely appends a child to a parent's Children slice using atomic operations. -func (t *pidTree) addChild(parent *pidNode, child *pidNode) { +func (x *pidTree) addChild(parent *pidNode, child *pidNode) { parent.Descendants.Append(child) parent.Watchees.Append(child) child.Watchers.Append(parent) } // updateAncestors updates the parent/ancestor relationships. -func (t *pidTree) updateAncestors(parentID, childID string) { - switch ancestors, ok := t.ancestors(parentID); { +func (x *pidTree) updateAncestors(parentID, childID string) { + switch ancestors, ok := x.ancestors(parentID); { case ok: - t.parents.Store(childID, append([]string{parentID}, ancestors...)) + x.parents.Store(childID, append([]string{parentID}, ancestors...)) default: - t.parents.Store(childID, []string{parentID}) + x.parents.Store(childID, []string{parentID}) } } // filterOutChild removes the node with the given ID from the Children slice. -func filterOutChild(children *slice.SyncSlice[*pidNode], childID string) *slice.SyncSlice[*pidNode] { +func filterOutChild(children *slice.Sync[*pidNode], childID string) *slice.Sync[*pidNode] { for i, child := range children.Items() { if child.ID == childID { children.Delete(i) @@ -268,15 +309,15 @@ func collectDescendants(node *pidNode) []*pidNode { } // ancestorAt retrieves the ancestor at the specified level (0 for parent, 1 for grandparent, etc.) -func (t *pidTree) ancestorAt(pid *PID, level int) (*pidNode, bool) { - ancestors, ok := t.ancestors(pid.ID()) +func (x *pidTree) ancestorAt(pid *PID, level int) (*pidNode, bool) { + ancestors, ok := x.ancestors(pid.ID()) if ok && len(ancestors) > level { - return t.GetNode(ancestors[level]) + return x.GetNode(ancestors[level]) } return nil, false } -func (t *pidTree) resetNode(node *pidNode, id string) { +func (x *pidTree) resetNode(node *pidNode, id string) { node.ID = id node.Descendants.Reset() node.Watchees.Reset() diff --git a/actor/pid_tree_test.go b/actor/pid_tree_test.go index fb34657e..5f53fc79 100644 --- a/actor/pid_tree_test.go +++ b/actor/pid_tree_test.go @@ -46,12 +46,14 @@ func TestTree(t *testing.T) { tree := newTree() + // create pid0 as the root node require.NoError(t, tree.AddNode(NoSender, pid0)) // pid0 has no parent - require.NoError(t, tree.AddNode(NoSender, pid1)) // pid1 has no parent - require.NoError(t, tree.AddNode(pid0, pid2)) // pid0 is parent of pid2 - require.NoError(t, tree.AddNode(pid1, pid3)) // pid1 is parent of pid3 - require.Error(t, tree.AddNode(pid4, pid5)) // this will error because pid4 does not exist on the tree - require.NoError(t, tree.AddNode(pid3, pid4)) // pid3 is parent of pid4 + + require.NoError(t, tree.AddNode(pid0, pid1)) // pid1 has pid0 as parent + require.NoError(t, tree.AddNode(pid0, pid2)) // pid0 is parent of pid2 + require.NoError(t, tree.AddNode(pid1, pid3)) // pid1 is parent of pid3 + require.Error(t, tree.AddNode(pid4, pid5)) // this will error because pid4 does not exist on the tree + require.NoError(t, tree.AddNode(pid3, pid4)) // pid3 is parent of pid4 tree.AddWatcher(pid3, pid0) tree.AddWatcher(pid4, pid5) @@ -59,7 +61,7 @@ func TestTree(t *testing.T) { pid4Ancestors, ok := tree.Ancestors(pid4) require.True(t, ok) require.NotEmpty(t, pid4Ancestors) - require.Len(t, pid4Ancestors, 2) + require.Len(t, pid4Ancestors, 3) pid4Parent, ok := tree.ParentAt(pid4, 0) require.True(t, ok) require.NotNil(t, pid4Parent) @@ -68,8 +70,16 @@ func TestTree(t *testing.T) { require.True(t, ok) require.True(t, pid4GParent.GetValue().Equals(pid1)) pid4GGParent, ok := tree.ParentAt(pid4, 2) - require.False(t, ok) - require.Nil(t, pid4GGParent) + require.True(t, ok) + require.NotNil(t, pid4GGParent) + + // grab the siblings of pid2 + siblings, ok := tree.Siblings(pid2) + require.True(t, ok) + require.NotEmpty(t, siblings) + require.Len(t, siblings, 1) + sibling := siblings[0] + require.True(t, sibling.GetValue().Equals(pid1)) require.EqualValues(t, 5, tree.Size()) diff --git a/actor/spawn_option.go b/actor/spawn_option.go index 588a1a26..c2a9c641 100644 --- a/actor/spawn_option.go +++ b/actor/spawn_option.go @@ -32,8 +32,8 @@ import ( type spawnConfig struct { // mailbox defines the mailbox to use when spawning the actor mailbox Mailbox - // defines the supervisor strategies to apply - supervisorStrategies []*SupervisorStrategy + // defines the supervisor + supervisor *Supervisor // specifies at what point in time to passivate the actor. // when the actor is passivated it is stopped which means it does not consume // any further resources like memory and cpu. The default value is 120 seconds @@ -74,11 +74,19 @@ func WithMailbox(mailbox Mailbox) SpawnOption { }) } -// WithSupervisorStrategies defines the supervisor strategies to apply when the given actor fails -// or panics during its messages processing -func WithSupervisorStrategies(supervisorStrategies ...*SupervisorStrategy) SpawnOption { +// WithSupervisor sets the supervisor to apply when the actor fails +// or panics during message processing. The specified supervisor behavior determines how failures +// are handled, such as restarting, stopping, or resuming the actor. +// +// Parameters: +// - supervisor: A pointer to a `Supervisor` that defines +// the failure-handling policy for the actor. +// +// Returns: +// - A `SpawnOption` that applies the given supervisor strategy when spawning an actor. +func WithSupervisor(supervisor *Supervisor) SpawnOption { return spawnOption(func(config *spawnConfig) { - config.supervisorStrategies = supervisorStrategies + config.supervisor = supervisor }) } diff --git a/actor/spawn_option_test.go b/actor/spawn_option_test.go index 5e94e2b3..a1a0ea1b 100644 --- a/actor/spawn_option_test.go +++ b/actor/spawn_option_test.go @@ -41,10 +41,10 @@ func TestSpawnOption(t *testing.T) { }) t.Run("spawn option with supervisor strategy", func(t *testing.T) { config := &spawnConfig{} - strategy := NewSupervisorStrategy(PanicError{}, NewStopDirective()) - option := WithSupervisorStrategies(strategy) + supervisor := NewSupervisor(WithStrategy(OneForOneStrategy)) + option := WithSupervisor(supervisor) option.Apply(config) - require.Equal(t, &spawnConfig{supervisorStrategies: []*SupervisorStrategy{strategy}}, config) + require.Equal(t, &spawnConfig{supervisor: supervisor}, config) }) t.Run("spawn option with passivation after", func(t *testing.T) { config := &spawnConfig{} diff --git a/actor/supervisor.go b/actor/supervisor.go new file mode 100644 index 00000000..782ae95d --- /dev/null +++ b/actor/supervisor.go @@ -0,0 +1,231 @@ +/* + * MIT License + * + * Copyright (c) 2022-2025 Arsene Tochemey Gandote + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package actors + +import ( + "reflect" + "runtime" + "sync" + "time" + + "github.com/tochemey/goakt/v3/internal/syncmap" +) + +// Strategy represents the type of supervision strategy used by an actor's supervisor. +// In an actor framework, supervisors manage child actors and define how failures should be handled. +// Different strategy types determine whether to restart, stop, resume, or escalate failures. +type Strategy int + +const ( + // OneForOneStrategy is a supervision strategy where if a child actor fails, only that specific child + // is affected by the supervisor's directive (e.g., restart, stop, resume, or escalate). + // Other sibling actors continue running unaffected. + OneForOneStrategy Strategy = iota + + // OneForAllStrategy is a supervision strategy that applies the supervisor’s directive to all + // sibling child actors if any one of them fails or panics during message processing. + // + // When using OneForAllStrategy, a failure in any child actor triggers a collective response: + // the same directive (e.g., restart, resume, or stop) is applied to every child under the supervisor. + // This strategy is particularly appropriate when the child actors are tightly coupled or + // interdependent—where the malfunction of one actor can adversely affect the overall functionality + // of the ensemble. + // + // Use this strategy in scenarios where maintaining a consistent state across all child actors is crucial, + // such as when they share common resources or work together on a composite task. + // For more granular control over failures, consider using the OneForOneStrategy instead. + OneForAllStrategy +) + +// Directive defines the supervisor directive +// +// It represents the action that a supervisor can take when a child actor fails or panics +// during message processing. Each directive corresponds to a specific recovery behavior: +// +// - StopDirective: Instructs the supervisor to stop the failing actor. +// - ResumeDirective: Instructs the supervisor to resume the failing actor without restarting it, +// allowing it to continue processing messages (typically used for recoverable errors). +// - RestartDirective: Instructs the supervisor to restart the failing actor, reinitializing its state. +type Directive int + +const ( + // StopDirective indicates that when an actor fails, the supervisor should immediately stop + // the actor. This directive is typically used when a failure is deemed irrecoverable + // or when the actor's state cannot be safely resumed. + StopDirective Directive = iota + // ResumeDirective indicates that when an actor fails, the supervisor should resume the actor's + // operation without restarting it. This directive is used when the failure is transient and the + // actor can continue processing messages without a state reset. + ResumeDirective + // RestartDirective indicates that when an actor fails, the supervisor should restart the actor. + // Restarting involves stopping the current instance and creating a new one, effectively resetting + // the actor's internal state. + RestartDirective +) + +// SupervisorOption defines the various options to apply to a given Supervisor +type SupervisorOption func(*Supervisor) + +// WithStrategy sets the supervisor strategy +func WithStrategy(strategy Strategy) SupervisorOption { + return func(s *Supervisor) { + s.Lock() + s.strategy = strategy + s.Unlock() + } +} + +// WithDirective sets the mapping between an error and a given directive +func WithDirective(err error, directive Directive) SupervisorOption { + return func(s *Supervisor) { + s.Lock() + s.directives.Set(errorType(err), directive) + s.Unlock() + } +} + +// WithRetry configures the retry behavior for an actor when using the RestartDirective. +// It sets the maximum number of retry attempts and the timeout period between retries. +// +// Parameters: +// - maxRetries: The maximum number of times an actor will be restarted after failure. +// Exceeding this count will trigger escalation according to the supervisor's policy. +// - timeout: The duration to wait before attempting a retry. +// This timeout defines the retry window and can help avoid immediate, repeated restarts. +// +// Use WithRetry to provide a controlled recovery mechanism for transient failures, +// ensuring that the actor is not endlessly restarted. +func WithRetry(maxRetries uint32, timeout time.Duration) SupervisorOption { + return func(s *Supervisor) { + s.Lock() + s.maxRetries = maxRetries + s.timeout = timeout + s.Unlock() + } +} + +// Supervisor defines the supervisor behavior rules applied to a faulty actor during message processing. +// +// A supervision behavior determines how a supervisor responds when a child actor encounters an error. +// The strategy dictates whether the faulty actor should be resumed, restarted, stopped, or if the error +// should be escalated to the supervisor’s parent. +// +// The default supervision strategy is OneForOneStrategy, meaning that failures are handled individually, +// affecting only the actor that encountered the error. +type Supervisor struct { + sync.Mutex + // Specifies the strategy + strategy Strategy + // Specifies the maximum number of retries + // When reaching this number the faulty actor is stopped + maxRetries uint32 + // Specifies the time range to restart the faulty actor + timeout time.Duration + + directives *syncmap.Map[string, Directive] +} + +// NewSupervisor creates a new instance of supervisor behavior for managing actor supervision. +// +// This function initializes a supervisor behavior with a set of error handlers and optional configuration options. +// The default strategy applied is OneForOneStrategy, meaning that when a child actor fails, only that actor is affected, +// and the supervisor applies the defined directive to it individually. +// +// Once the behavior instance is created, one need to add the various directive/error mappings required to handle +// faulty actor using the WithDirective method of the behavior. +// +// Returns: +// - A pointer to the initialized *Supervisor instance. +func NewSupervisor(opts ...SupervisorOption) *Supervisor { + // define an instance of Behavior and sets the default strategy type + // to OneForOneStrategy + s := &Supervisor{ + Mutex: sync.Mutex{}, + strategy: OneForOneStrategy, + directives: syncmap.New[string, Directive](), + maxRetries: 0, + timeout: -1, + } + + // set the default directives + s.directives.Set(errorType(PanicError{}), StopDirective) + s.directives.Set(errorType(&runtime.PanicNilError{}), RestartDirective) + + for _, opt := range opts { + opt(s) + } + + return s +} + +// Strategy returns the supervisor strategy +func (s *Supervisor) Strategy() Strategy { + s.Lock() + strategy := s.strategy + s.Unlock() + return strategy +} + +// Directive returns the directive associated to a given error +func (s *Supervisor) Directive(err error) (Directive, bool) { + s.Lock() + directive, ok := s.directives.Get(errorType(err)) + s.Unlock() + return directive, ok +} + +// MaxRetries returns the maximum number of times an actor will be restarted after failure. +func (s *Supervisor) MaxRetries() uint32 { + return s.maxRetries +} + +// Timeout returns the timeout +// This is the duration to wait before attempting a retry. +func (s *Supervisor) Timeout() time.Duration { + return s.timeout +} + +// Reset resets the strategy +func (s *Supervisor) Reset() { + s.Lock() + s.strategy = OneForAllStrategy + s.directives = syncmap.New[string, Directive]() + s.Unlock() +} + +// errorType returns the string representation of an error's type using reflection +func errorType(err error) string { + // Handle nil errors first + if err == nil { + return "nil" + } + + rtype := reflect.TypeOf(err) + if rtype.Kind() == reflect.Ptr { + rtype = rtype.Elem() + } + + return rtype.String() +} diff --git a/actor/supervisor_directive.go b/actor/supervisor_directive.go deleted file mode 100644 index 1f383007..00000000 --- a/actor/supervisor_directive.go +++ /dev/null @@ -1,90 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2022-2025 Arsene Tochemey Gandote - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package actors - -import "time" - -// SupervisorDirective defines the supervisorQA directive -type SupervisorDirective interface { - isSupervisorDirective() -} - -// StopDirective defines the supervisor stop directive -type StopDirective struct{} - -// NewStopDirective creates an instance of StopDirective -func NewStopDirective() *StopDirective { - return new(StopDirective) -} - -func (*StopDirective) isSupervisorDirective() {} - -// ResumeDirective defines the supervisor resume directive -// This ignores the failure and process the next message, instead -type ResumeDirective struct{} - -// NewResumeDirective creates an instance of ResumeDirective -func NewResumeDirective() *ResumeDirective { - return new(ResumeDirective) -} - -// implements the SupervisorDirective -func (*ResumeDirective) isSupervisorDirective() {} - -// RestartDirective defines the supervisor restart directive -type RestartDirective struct { - // Specifies the maximum number of retries - // When reaching this number the faulty actor is stopped - maxNumRetries uint32 - // Specifies the time range to restart the faulty actor - timeout time.Duration -} - -// MaxNumRetries returns the max num retries -func (x *RestartDirective) MaxNumRetries() uint32 { - return x.maxNumRetries -} - -// Timeout returns the timeout -func (x *RestartDirective) Timeout() time.Duration { - return x.timeout -} - -// NewRestartDirective creates an instance of RestartDirective -func NewRestartDirective() *RestartDirective { - return &RestartDirective{ - maxNumRetries: 0, - timeout: -1, - } -} - -// WithLimit sets the restart limit -func (x *RestartDirective) WithLimit(maxNumRetries uint32, timeout time.Duration) { - x.maxNumRetries = maxNumRetries - x.timeout = timeout -} - -// implements the SupervisorDirective -func (*RestartDirective) isSupervisorDirective() {} diff --git a/actor/supervisor_strategy.go b/actor/supervisor_strategy.go deleted file mode 100644 index db807c95..00000000 --- a/actor/supervisor_strategy.go +++ /dev/null @@ -1,118 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2022-2025 Arsene Tochemey Gandote - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package actors - -import ( - "reflect" - "runtime" - "sync" -) - -// DefaultSupervisorStrategies defines the default supervisor strategies -var DefaultSupervisorStrategies = []*SupervisorStrategy{ - NewSupervisorStrategy(PanicError{}, NewStopDirective()), - NewSupervisorStrategy(&runtime.PanicNilError{}, NewStopDirective()), -} - -// SupervisorStrategy defines the rules to apply to a faulty actor -// during message processing -type SupervisorStrategy struct { - // specifies the type of directive to apply - directive SupervisorDirective - // specifies the error type - err error -} - -// NewSupervisorStrategy creates an instance of SupervisorStrategy -func NewSupervisorStrategy(err error, directive SupervisorDirective) *SupervisorStrategy { - return &SupervisorStrategy{ - directive: directive, - err: err, - } -} - -// Directive returns the directive of the supervisor strategy -func (s *SupervisorStrategy) Directive() SupervisorDirective { - return s.directive -} - -// Kind returns the error type of the supervisor strategy -func (s *SupervisorStrategy) Error() error { - return s.err -} - -// strategiesMap defines the strategies map -// this will be use internally by actor to define their supervisor strategies -type strategiesMap struct { - rwMutex sync.RWMutex - data map[string]*SupervisorStrategy -} - -// newStrategiesMap creates an instance of strategiesMap -func newStrategiesMap() *strategiesMap { - return &strategiesMap{ - data: make(map[string]*SupervisorStrategy), - rwMutex: sync.RWMutex{}, - } -} - -// Put sets the supervisor strategy -func (m *strategiesMap) Put(strategy *SupervisorStrategy) { - m.rwMutex.Lock() - key := errorType(strategy.Error()) - m.data[key] = strategy - m.rwMutex.Unlock() -} - -// Get retrieves based upon the error the given strategy -func (m *strategiesMap) Get(err error) (val *SupervisorStrategy, ok bool) { - m.rwMutex.RLock() - key := errorType(err) - val, ok = m.data[key] - m.rwMutex.RUnlock() - return val, ok -} - -// Reset resets the strategiesMap -func (m *strategiesMap) Reset() { - m.rwMutex.Lock() - m.data = make(map[string]*SupervisorStrategy) - m.rwMutex.Unlock() -} - -// errorType returns the string representation of an error's type using reflection -func errorType(err error) string { - // Handle nil errors first - if err == nil { - return "nil" - } - - rtype := reflect.TypeOf(err) - if rtype.Kind() == reflect.Ptr { - rtype = rtype.Elem() - } - - return rtype.String() -} diff --git a/actor/supervisor_test.go b/actor/supervisor_test.go new file mode 100644 index 00000000..90776b80 --- /dev/null +++ b/actor/supervisor_test.go @@ -0,0 +1,59 @@ +/* + * MIT License + * + * Copyright (c) 2022-2025 Arsene Tochemey Gandote + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package actors + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestSupervisorOption(t *testing.T) { + testCases := []struct { + name string + option SupervisorOption + expected *Supervisor + }{ + { + name: "WithStrategy", + option: WithStrategy(OneForAllStrategy), + expected: &Supervisor{strategy: OneForAllStrategy}, + }, + { + name: "WithRetry", + option: WithRetry(2, time.Second), + expected: &Supervisor{timeout: time.Second, maxRetries: 2}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + supervisor := &Supervisor{} + tc.option(supervisor) + assert.Equal(t, tc.expected, supervisor) + }) + } +} diff --git a/actor/util.go b/actor/util.go index 20ce0942..251bfb8f 100644 --- a/actor/util.go +++ b/actor/util.go @@ -72,8 +72,8 @@ var ( longLived time.Duration = 1<<63 - 1 // NoSender means that there is no sender NoSender *PID - // DefaultSupervisoryStrategy defines the default supervisory strategy - DefaultSupervisoryStrategy = NewStopDirective() + // DefaultSupervisorDirective defines the default supervisory strategy directive + DefaultSupervisorDirective = StopDirective timers = timer.NewPool() systemNames = map[nameType]string{ diff --git a/internal/internalpb/supervision.pb.go b/internal/internalpb/supervision.pb.go index 124b9285..4308d774 100644 --- a/internal/internalpb/supervision.pb.go +++ b/internal/internalpb/supervision.pb.go @@ -21,6 +21,52 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type Strategy int32 + +const ( + Strategy_STRATEGY_ONE_FOR_ONE Strategy = 0 + Strategy_STRATEGY_ONE_FOR_ALL Strategy = 1 +) + +// Enum value maps for Strategy. +var ( + Strategy_name = map[int32]string{ + 0: "STRATEGY_ONE_FOR_ONE", + 1: "STRATEGY_ONE_FOR_ALL", + } + Strategy_value = map[string]int32{ + "STRATEGY_ONE_FOR_ONE": 0, + "STRATEGY_ONE_FOR_ALL": 1, + } +) + +func (x Strategy) Enum() *Strategy { + p := new(Strategy) + *p = x + return p +} + +func (x Strategy) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Strategy) Descriptor() protoreflect.EnumDescriptor { + return file_internal_supervision_proto_enumTypes[0].Descriptor() +} + +func (Strategy) Type() protoreflect.EnumType { + return &file_internal_supervision_proto_enumTypes[0] +} + +func (x Strategy) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Strategy.Descriptor instead. +func (Strategy) EnumDescriptor() ([]byte, []int) { + return file_internal_supervision_proto_rawDescGZIP(), []int{0} +} + // HandleFault message is sent by a child // actor to its parent when it is panicking or returning an error // while processing message @@ -38,7 +84,9 @@ type HandleFault struct { // *HandleFault_Resume // *HandleFault_Restart // *HandleFault_Escalate - Directive isHandleFault_Directive `protobuf_oneof:"directive"` + Directive isHandleFault_Directive `protobuf_oneof:"directive"` + // Specifies the strategy + Strategy Strategy `protobuf:"varint,7,opt,name=strategy,proto3,enum=internalpb.Strategy" json:"strategy,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -130,6 +178,13 @@ func (x *HandleFault) GetEscalate() *EscalateDirective { return nil } +func (x *HandleFault) GetStrategy() Strategy { + if x != nil { + return x.Strategy + } + return Strategy_STRATEGY_ONE_FOR_ONE +} + type isHandleFault_Directive interface { isHandleFault_Directive() } @@ -332,7 +387,7 @@ var File_internal_supervision_proto protoreflect.FileDescriptor var file_internal_supervision_proto_rawDesc = string([]byte{ 0x0a, 0x1a, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x73, 0x75, 0x70, 0x65, 0x72, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x69, 0x6e, - 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x22, 0xae, 0x02, 0x0a, 0x0b, 0x48, 0x61, 0x6e, + 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x22, 0xe0, 0x02, 0x0a, 0x0b, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x46, 0x61, 0x75, 0x6c, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, @@ -350,28 +405,35 @@ var file_internal_supervision_proto_rawDesc = string([]byte{ 0x3b, 0x0a, 0x08, 0x65, 0x73, 0x63, 0x61, 0x6c, 0x61, 0x74, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x45, 0x73, 0x63, 0x61, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, - 0x48, 0x00, 0x52, 0x08, 0x65, 0x73, 0x63, 0x61, 0x6c, 0x61, 0x74, 0x65, 0x42, 0x0b, 0x0a, 0x09, - 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x22, 0x0f, 0x0a, 0x0d, 0x53, 0x74, 0x6f, - 0x70, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x22, 0x11, 0x0a, 0x0f, 0x52, 0x65, - 0x73, 0x75, 0x6d, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x22, 0x13, 0x0a, - 0x11, 0x45, 0x73, 0x63, 0x61, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, - 0x76, 0x65, 0x22, 0x4d, 0x0a, 0x10, 0x52, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x44, 0x69, 0x72, - 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x61, 0x78, 0x5f, 0x72, 0x65, - 0x74, 0x72, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6d, 0x61, 0x78, - 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, - 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, - 0x74, 0x42, 0xa9, 0x01, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, - 0x61, 0x6c, 0x70, 0x62, 0x42, 0x10, 0x53, 0x75, 0x70, 0x65, 0x72, 0x76, 0x69, 0x73, 0x69, 0x6f, - 0x6e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x02, 0x50, 0x01, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x6f, 0x63, 0x68, 0x65, 0x6d, 0x65, 0x79, 0x2f, - 0x67, 0x6f, 0x61, 0x6b, 0x74, 0x2f, 0x76, 0x33, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, - 0x6c, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x3b, 0x69, 0x6e, 0x74, - 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x49, 0x58, 0x58, 0xaa, 0x02, 0x0a, - 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0xca, 0x02, 0x0a, 0x49, 0x6e, 0x74, - 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0xe2, 0x02, 0x16, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, - 0x61, 0x6c, 0x70, 0x62, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, - 0xea, 0x02, 0x0a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x48, 0x00, 0x52, 0x08, 0x65, 0x73, 0x63, 0x61, 0x6c, 0x61, 0x74, 0x65, 0x12, 0x30, 0x0a, 0x08, + 0x73, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, + 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x72, 0x61, + 0x74, 0x65, 0x67, 0x79, 0x52, 0x08, 0x73, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x42, 0x0b, + 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x22, 0x0f, 0x0a, 0x0d, 0x53, + 0x74, 0x6f, 0x70, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x22, 0x11, 0x0a, 0x0f, + 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x22, + 0x13, 0x0a, 0x11, 0x45, 0x73, 0x63, 0x61, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, + 0x74, 0x69, 0x76, 0x65, 0x22, 0x4d, 0x0a, 0x10, 0x52, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x44, + 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x61, 0x78, 0x5f, + 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6d, + 0x61, 0x78, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, + 0x65, 0x6f, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, + 0x6f, 0x75, 0x74, 0x2a, 0x3e, 0x0a, 0x08, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x12, + 0x18, 0x0a, 0x14, 0x53, 0x54, 0x52, 0x41, 0x54, 0x45, 0x47, 0x59, 0x5f, 0x4f, 0x4e, 0x45, 0x5f, + 0x46, 0x4f, 0x52, 0x5f, 0x4f, 0x4e, 0x45, 0x10, 0x00, 0x12, 0x18, 0x0a, 0x14, 0x53, 0x54, 0x52, + 0x41, 0x54, 0x45, 0x47, 0x59, 0x5f, 0x4f, 0x4e, 0x45, 0x5f, 0x46, 0x4f, 0x52, 0x5f, 0x41, 0x4c, + 0x4c, 0x10, 0x01, 0x42, 0xa9, 0x01, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x2e, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x42, 0x10, 0x53, 0x75, 0x70, 0x65, 0x72, 0x76, 0x69, 0x73, + 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x02, 0x50, 0x01, 0x5a, 0x3b, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x6f, 0x63, 0x68, 0x65, 0x6d, 0x65, + 0x79, 0x2f, 0x67, 0x6f, 0x61, 0x6b, 0x74, 0x2f, 0x76, 0x33, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x3b, 0x69, + 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x49, 0x58, 0x58, 0xaa, + 0x02, 0x0a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0xca, 0x02, 0x0a, 0x49, + 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0xe2, 0x02, 0x16, 0x49, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0xea, 0x02, 0x0a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, }) var ( @@ -386,24 +448,27 @@ func file_internal_supervision_proto_rawDescGZIP() []byte { return file_internal_supervision_proto_rawDescData } +var file_internal_supervision_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_internal_supervision_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_internal_supervision_proto_goTypes = []any{ - (*HandleFault)(nil), // 0: internalpb.HandleFault - (*StopDirective)(nil), // 1: internalpb.StopDirective - (*ResumeDirective)(nil), // 2: internalpb.ResumeDirective - (*EscalateDirective)(nil), // 3: internalpb.EscalateDirective - (*RestartDirective)(nil), // 4: internalpb.RestartDirective + (Strategy)(0), // 0: internalpb.Strategy + (*HandleFault)(nil), // 1: internalpb.HandleFault + (*StopDirective)(nil), // 2: internalpb.StopDirective + (*ResumeDirective)(nil), // 3: internalpb.ResumeDirective + (*EscalateDirective)(nil), // 4: internalpb.EscalateDirective + (*RestartDirective)(nil), // 5: internalpb.RestartDirective } var file_internal_supervision_proto_depIdxs = []int32{ - 1, // 0: internalpb.HandleFault.stop:type_name -> internalpb.StopDirective - 2, // 1: internalpb.HandleFault.resume:type_name -> internalpb.ResumeDirective - 4, // 2: internalpb.HandleFault.restart:type_name -> internalpb.RestartDirective - 3, // 3: internalpb.HandleFault.escalate:type_name -> internalpb.EscalateDirective - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 2, // 0: internalpb.HandleFault.stop:type_name -> internalpb.StopDirective + 3, // 1: internalpb.HandleFault.resume:type_name -> internalpb.ResumeDirective + 5, // 2: internalpb.HandleFault.restart:type_name -> internalpb.RestartDirective + 4, // 3: internalpb.HandleFault.escalate:type_name -> internalpb.EscalateDirective + 0, // 4: internalpb.HandleFault.strategy:type_name -> internalpb.Strategy + 5, // [5:5] is the sub-list for method output_type + 5, // [5:5] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name } func init() { file_internal_supervision_proto_init() } @@ -422,13 +487,14 @@ func file_internal_supervision_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_internal_supervision_proto_rawDesc), len(file_internal_supervision_proto_rawDesc)), - NumEnums: 0, + NumEnums: 1, NumMessages: 5, NumExtensions: 0, NumServices: 0, }, GoTypes: file_internal_supervision_proto_goTypes, DependencyIndexes: file_internal_supervision_proto_depIdxs, + EnumInfos: file_internal_supervision_proto_enumTypes, MessageInfos: file_internal_supervision_proto_msgTypes, }.Build() File_internal_supervision_proto = out.File diff --git a/internal/slice/sync.go b/internal/slice/sync.go index 47da30d9..b020fa88 100644 --- a/internal/slice/sync.go +++ b/internal/slice/sync.go @@ -28,19 +28,19 @@ import ( "sync" ) -// SyncSlice type that can be safely shared between goroutines. -type SyncSlice[T any] struct { +// Sync type that can be safely shared between goroutines. +type Sync[T any] struct { data []T mu sync.RWMutex } // NewSync creates a new lock-free thread-safe slice. -func NewSync[T any]() *SyncSlice[T] { - return &SyncSlice[T]{data: []T{}} +func NewSync[T any]() *Sync[T] { + return &Sync[T]{data: []T{}} } // Len returns the number of items -func (cs *SyncSlice[T]) Len() int { +func (cs *Sync[T]) Len() int { cs.mu.RLock() l := len(cs.data) cs.mu.RUnlock() @@ -48,21 +48,21 @@ func (cs *SyncSlice[T]) Len() int { } // Append adds an item to the concurrent slice. -func (cs *SyncSlice[T]) Append(item T) { +func (cs *Sync[T]) Append(item T) { cs.mu.Lock() cs.data = append(cs.data, item) cs.mu.Unlock() } // AppendMany adds many items to the concurrent slice -func (cs *SyncSlice[T]) AppendMany(item ...T) { +func (cs *Sync[T]) AppendMany(item ...T) { cs.mu.Lock() cs.data = append(cs.data, item...) cs.mu.Unlock() } // Get returns the slice item at the given index -func (cs *SyncSlice[T]) Get(index int) (item T) { +func (cs *Sync[T]) Get(index int) (item T) { cs.mu.RLock() if index < 0 || index >= len(cs.data) { var zero T @@ -74,7 +74,7 @@ func (cs *SyncSlice[T]) Get(index int) (item T) { } // Delete an item from the slice -func (cs *SyncSlice[T]) Delete(index int) { +func (cs *Sync[T]) Delete(index int) { cs.mu.Lock() if index < 0 || index >= len(cs.data) { cs.mu.Unlock() @@ -85,7 +85,7 @@ func (cs *SyncSlice[T]) Delete(index int) { } // Items returns the list of items -func (cs *SyncSlice[T]) Items() []T { +func (cs *Sync[T]) Items() []T { cs.mu.RLock() dataCopy := make([]T, len(cs.data)) copy(dataCopy, cs.data) @@ -94,7 +94,7 @@ func (cs *SyncSlice[T]) Items() []T { } // Reset resets the slice -func (cs *SyncSlice[T]) Reset() { +func (cs *Sync[T]) Reset() { cs.mu.Lock() cs.data = []T{} cs.mu.Unlock() diff --git a/internal/syncmap/syncmap.go b/internal/syncmap/syncmap.go index 029b8642..36feb11c 100644 --- a/internal/syncmap/syncmap.go +++ b/internal/syncmap/syncmap.go @@ -26,17 +26,17 @@ package syncmap import "sync" -// SyncMap is a generic, concurrency-safe map that allows storing key-value pairs +// Map is a generic, concurrency-safe map that allows storing key-value pairs // while ensuring thread safety using a read-write mutex. // // K represents the key type, which must be comparable. // V represents the value type, which can be any type. -type SyncMap[K comparable, V any] struct { +type Map[K comparable, V any] struct { mu sync.RWMutex data map[K]V } -// New creates and returns a new instance of SyncMap. +// New creates and returns a new instance of Map. // It initializes the internal map for storing key-value pairs. // // Example usage: @@ -44,58 +44,58 @@ type SyncMap[K comparable, V any] struct { // sm := New[string, int]() // sm.Set("foo", 42) // value, ok := sm.Get("foo") -func New[K comparable, V any]() *SyncMap[K, V] { - return &SyncMap[K, V]{ +func New[K comparable, V any]() *Map[K, V] { + return &Map[K, V]{ data: make(map[K]V), } } -// Set stores a key-value pair in the SyncMap. +// Set stores a key-value pair in the Map. // If the key already exists, its value is updated. // // This method acquires a write lock to ensure safe concurrent access. -func (s *SyncMap[K, V]) Set(k K, v V) { +func (s *Map[K, V]) Set(k K, v V) { s.mu.Lock() s.data[k] = v s.mu.Unlock() } -// Get retrieves the value associated with the given key from the SyncMap. +// Get retrieves the value associated with the given key from the Map. // The second return value indicates whether the key was found. // // This method acquires a read lock to ensure safe concurrent access. -func (s *SyncMap[K, V]) Get(k K) (V, bool) { +func (s *Map[K, V]) Get(k K) (V, bool) { s.mu.RLock() val, ok := s.data[k] s.mu.RUnlock() return val, ok } -// Delete removes the key-value pair associated with the given key from the SyncMap. +// Delete removes the key-value pair associated with the given key from the Map. // If the key does not exist, this operation has no effect. // // This method acquires a write lock to ensure safe concurrent access. -func (s *SyncMap[K, V]) Delete(k K) { +func (s *Map[K, V]) Delete(k K) { s.mu.Lock() delete(s.data, k) s.mu.Unlock() } -// Len returns the number of key-value pairs currently stored in the SyncMap. +// Len returns the number of key-value pairs currently stored in the Map. // // This method acquires a read lock to ensure safe concurrent access. -func (s *SyncMap[K, V]) Len() int { +func (s *Map[K, V]) Len() int { s.mu.RLock() l := len(s.data) s.mu.RUnlock() return l } -// Range iterates over all key-value pairs in the SyncMap and executes the given function `f` +// Range iterates over all key-value pairs in the Map and executes the given function `f` // for each pair. The iteration order is not guaranteed. // // This method acquires a read lock to ensure safe concurrent access. -func (s *SyncMap[K, V]) Range(f func(K, V)) { +func (s *Map[K, V]) Range(f func(K, V)) { s.mu.RLock() defer s.mu.RUnlock() for k, v := range s.data { diff --git a/protos/internal/supervision.proto b/protos/internal/supervision.proto index 706381b3..b181909d 100644 --- a/protos/internal/supervision.proto +++ b/protos/internal/supervision.proto @@ -4,6 +4,11 @@ package internalpb; option go_package = "github.com/tochemey/goakt/v3/internal/internalpb;internalpb"; +enum Strategy { + STRATEGY_ONE_FOR_ONE = 0; + STRATEGY_ONE_FOR_ALL = 1; +} + // HandleFault message is sent by a child // actor to its parent when it is panicking or returning an error // while processing message @@ -19,6 +24,8 @@ message HandleFault { RestartDirective restart = 5; EscalateDirective escalate = 6; } + // Specifies the strategy + Strategy strategy = 7; } // StopDirective defines the supervisor stop directive