Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add cluster singleton #628

Merged
merged 24 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 145 additions & 26 deletions actor/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ type ActorSystem interface {
// A router is a special type of actor that helps distribute messages of the same type over a set of actors, so that messages can be processed in parallel.
// A single actor will only process one message at a time.
SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (*PID, error)
// SpawnSingleton creates a singleton actor in the system.
//
// A singleton actor is instantiated when cluster mode is enabled.
// A singleton actor like any other actor is created only once within the system and in the cluster.
// A singleton actor is created with the default supervisor strategy and directive.
// A singleton actor once created lives throughout the lifetime of the given actor system.
//
// The cluster singleton is automatically started on the oldest node in the cluster.
// If the oldest node leaves the cluster, the singleton is restarted on the new oldest node.
// This is useful for managing shared resources or coordinating tasks that should be handled by a single actor.
SpawnSingleton(ctx context.Context, name string, actor Actor) error
// Kill stops a given actor in the system
Kill(ctx context.Context, name string) error
// ReSpawn recreates a given actor in the system
Expand Down Expand Up @@ -186,8 +197,8 @@ type ActorSystem interface {
handleRemoteAsk(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
// handleRemoteTell handles an asynchronous message to an actor
handleRemoteTell(ctx context.Context, to *PID, message proto.Message) error
// setActor sets actor in the actor system actors registry
broadcastActor(actor *PID)
// broadcastActor sets actor in the actor system actors registry
broadcastActor(actor *PID, singleton bool)
// getPeerStateFromStore returns the peer state from the cluster store
getPeerStateFromStore(address string) (*internalpb.PeerState, error)
// removePeerStateFromStore removes the peer state from the cluster store
Expand All @@ -205,6 +216,7 @@ type ActorSystem interface {
getUserGuardian() *PID
getDeathWatch() *PID
getDeadletter() *PID
getSingletonManager() *PID
}

// ActorSystem represent a collection of actors on a given node
Expand Down Expand Up @@ -278,12 +290,14 @@ type actorSystem struct {
rebalancingQueue chan *internalpb.PeerState
rebalancedNodes goset.Set[string]

rebalancer *PID
rootGuardian *PID
userGuardian *PID
systemGuardian *PID
deathWatch *PID
deadletters *PID
rebalancer *PID
rootGuardian *PID
userGuardian *PID
systemGuardian *PID
deathWatch *PID
deadletter *PID
singletonManager *PID

startedAt *atomic.Int64
rebalancing *atomic.Bool
rebalanceLocker *sync.Mutex
Expand Down Expand Up @@ -446,8 +460,9 @@ func (x *actorSystem) Start(ctx context.Context) error {
AddError(x.spawnSystemGuardian(ctx)).
AddError(x.spawnUserGuardian(ctx)).
AddError(x.spawnRebalancer(ctx)).
AddError(x.spawnJanitor(ctx)).
AddError(x.spawnDeathWatch(ctx)).
AddError(x.spawnDeadletter(ctx)).
AddError(x.spawnSingletonManager(ctx)).
Error(); err != nil {
return errorschain.
New(errorschain.ReturnAll()).
Expand Down Expand Up @@ -633,7 +648,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor, opts
}

// check some preconditions
if err := x.checkSpawnPreconditions(ctx, name, actor); err != nil {
if err := x.checkSpawnPreconditions(ctx, name, actor, false); err != nil {
return nil, err
}

Expand All @@ -655,7 +670,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor, opts
// add the given actor to the tree and supervise it
_ = x.actors.AddNode(x.userGuardian, pid)
x.actors.AddWatcher(pid, x.deathWatch)
x.broadcastActor(pid)
x.broadcastActor(pid, false)
return pid, nil
}

Expand All @@ -670,7 +685,7 @@ func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, recei
actor := newFuncActor(name, receiveFunc, config)

// check some preconditions
if err := x.checkSpawnPreconditions(ctx, name, actor); err != nil {
if err := x.checkSpawnPreconditions(ctx, name, actor, false); err != nil {
return nil, err
}

Expand All @@ -691,7 +706,7 @@ func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, recei
x.actorsCounter.Inc()
_ = x.actors.AddNode(x.userGuardian, pid)
x.actors.AddWatcher(pid, x.deathWatch)
x.broadcastActor(pid)
x.broadcastActor(pid, false)
return pid, nil
}

Expand All @@ -709,6 +724,74 @@ func (x *actorSystem) SpawnRouter(ctx context.Context, poolSize int, routeesKind
return x.Spawn(ctx, routerName, router)
}

// SpawnSingleton creates a singleton actor in the system.
//
// A singleton actor is instantiated when cluster mode is enabled.
// A singleton actor like any other actor is created only once within the system and in the cluster.
// A singleton actor is created with the default supervisor strategy and directive.
// A singleton actor once created lives throughout the lifetime of the given actor system.
//
// The cluster singleton is automatically started on the oldest node in the cluster.
// If the oldest node leaves the cluster, the singleton is restarted on the new oldest node.
// This is useful for managing shared resources or coordinating tasks that should be handled by a single actor.
func (x *actorSystem) SpawnSingleton(ctx context.Context, name string, actor Actor) error {
	if !x.started.Load() {
		return ErrActorSystemNotStarted
	}

	if !x.InCluster() {
		return ErrClusterDisabled
	}

	// the singleton manager is only spawned when cluster mode is enabled;
	// a nil manager therefore means the cluster machinery is not available
	singletonManager := x.getSingletonManager()
	if singletonManager == nil {
		return ErrClusterDisabled
	}

	cluster := x.getCluster()

	// the singleton must run on the oldest node (the leader); when this node
	// is not the leader, forward the spawn request to the leader instead
	if !cluster.IsLeader(ctx) {
		return x.spawnSingletonOnLeader(ctx, cluster, name, actor)
	}

	// check some preconditions: a singleton kind must be unique cluster-wide
	if err := x.checkSpawnPreconditions(ctx, name, actor, true); err != nil {
		return err
	}

	// idempotency: if the singleton is already running locally there is nothing to do
	actorAddress := x.actorAddress(name)
	pidNode, exist := x.actors.GetNode(actorAddress.String())
	if exist {
		pid := pidNode.GetValue()
		if pid.IsRunning() {
			return nil
		}
	}

	pid, err := x.configPID(ctx, name, actor,
		WithLongLived(),
		WithSupervisor(
			NewSupervisor(
				WithStrategy(OneForOneStrategy),
				WithDirective(PanicError{}, StopDirective),
				WithDirective(InternalError{}, StopDirective),
				WithDirective(&runtime.PanicNilError{}, StopDirective),
			),
		))
	if err != nil {
		return err
	}

	x.actorsCounter.Inc()
	// add the given actor to the tree as a child of the singleton manager and supervise it.
	// NOTE: use the lock-protected snapshot fetched above instead of reading
	// x.singletonManager directly, to stay consistent with getSingletonManager()
	// and avoid an unsynchronized read of the field.
	_ = x.actors.AddNode(singletonManager, pid)
	x.actors.AddWatcher(pid, x.deathWatch)
	x.broadcastActor(pid, true)
	return nil
}

// Kill stops a given actor in the system
func (x *actorSystem) Kill(ctx context.Context, name string) error {
if !x.started.Load() {
Expand Down Expand Up @@ -1190,6 +1273,13 @@ func (x *actorSystem) RemoteSpawn(ctx context.Context, request *connect.Request[
return nil, connect.NewError(connect.CodeInternal, err)
}

if msg.GetIsSingleton() {
if err := x.SpawnSingleton(ctx, msg.GetActorName(), actor); err != nil {
logger.Errorf("failed to create actor=(%s) on [host=%s, port=%d]: reason: (%v)", msg.GetActorName(), msg.GetHost(), msg.GetPort(), err)
return nil, connect.NewError(connect.CodeInternal, err)
}
}

if _, err = x.Spawn(ctx, msg.GetActorName(), actor); err != nil {
logger.Errorf("failed to create actor=(%s) on [host=%s, port=%d]: reason: (%v)", msg.GetActorName(), msg.GetHost(), msg.GetPort(), err)
return nil, connect.NewError(connect.CodeInternal, err)
Expand Down Expand Up @@ -1286,14 +1376,22 @@ func (x *actorSystem) getDeathWatch() *PID {
return janitor
}

// getDeadletter returns the system deadletter actor.
// The read is guarded by the system locker because the field is set
// during startup and may be read concurrently afterwards.
func (x *actorSystem) getDeadletter() *PID {
	x.locker.Lock()
	deadletter := x.deadletter
	x.locker.Unlock()
	return deadletter
}

// getSingletonManager returns the system singleton manager actor.
// The field is read under the system locker since it is written at startup
// and may be accessed concurrently afterwards.
func (x *actorSystem) getSingletonManager() *PID {
	x.locker.Lock()
	defer x.locker.Unlock()
	return x.singletonManager
}

// completeRebalancing marks the in-flight cluster rebalancing round as
// finished by clearing the rebalancing flag.
func (x *actorSystem) completeRebalancing() {
x.rebalancing.Store(false)
}
Expand Down Expand Up @@ -1322,11 +1420,12 @@ func (x *actorSystem) getPeerStateFromStore(address string) (*internalpb.PeerSta
}

// broadcastActor broadcast the newly (re)spawned actor into the cluster
func (x *actorSystem) broadcastActor(actor *PID) {
func (x *actorSystem) broadcastActor(actor *PID, singleton bool) {
if x.clusterEnabled.Load() {
x.wireActorsQueue <- &internalpb.ActorRef{
ActorAddress: actor.Address().Address,
ActorType: types.TypeName(actor.Actor()),
IsSingleton: singleton,
}
}
}
Expand Down Expand Up @@ -1731,7 +1830,10 @@ func (x *actorSystem) processPeerState(ctx context.Context, peer *cluster.Peer)
}

x.logger.Debugf("peer (%s) actors count (%d)", peerAddress, len(peerState.GetActors()))
x.clusterStore.set(peerState)
if err := x.clusterStore.set(peerState); err != nil {
x.logger.Error(err)
return err
}
x.logger.Infof("peer sync(%s) successfully processed", peerAddress)
return nil
}
Expand Down Expand Up @@ -1915,8 +2017,8 @@ func (x *actorSystem) spawnUserGuardian(ctx context.Context) error {
return nil
}

// spawnRebalancer creates the cluster rebalancer
func (x *actorSystem) spawnJanitor(ctx context.Context) error {
// spawnDeathWatch creates the deathWatch actor
func (x *actorSystem) spawnDeathWatch(ctx context.Context) error {
var err error
actorName := x.reservedName(deathWatchType)

Expand Down Expand Up @@ -1972,11 +2074,11 @@ func (x *actorSystem) spawnRebalancer(ctx context.Context) error {
return nil
}

// spawnDeadletter creates the deadletters synthetic actor
// spawnDeadletter creates the deadletter synthetic actor
func (x *actorSystem) spawnDeadletter(ctx context.Context) error {
var err error
actorName := x.reservedName(deadletterType)
x.deadletters, err = x.configPID(ctx,
x.deadletter, err = x.configPID(ctx,
actorName,
newDeadLetter(),
WithSupervisor(
Expand All @@ -1988,18 +2090,35 @@ func (x *actorSystem) spawnDeadletter(ctx context.Context) error {
),
)
if err != nil {
return fmt.Errorf("actor=%s failed to start deadletters: %w", actorName, err)
return fmt.Errorf("actor=%s failed to start deadletter: %w", actorName, err)
}

// the deadletters is a child actor of the system guardian
_ = x.actors.AddNode(x.systemGuardian, x.deadletters)
// the deadletter is a child actor of the system guardian
_ = x.actors.AddNode(x.systemGuardian, x.deadletter)
return nil
}

// checkSpawnPreconditions make sure before an actor is created some pre-conditions are checks
func (x *actorSystem) checkSpawnPreconditions(ctx context.Context, actorName string, kind Actor) error {
func (x *actorSystem) checkSpawnPreconditions(ctx context.Context, actorName string, kind Actor, singleton bool) error {
// check the existence of the actor given the kind prior to creating it
if x.clusterEnabled.Load() {
// a singleton actor must only have one instance at a given time of its kind
// in the whole cluster
if singleton {
id, err := x.cluster.LookupKind(ctx, types.TypeName(kind))
if err != nil {
return err
}

if id != "" {
return ErrSingletonAlreadyExists
}

return nil
}

// here we make sure in cluster mode that the given actor is uniquely created
// by checking both its kind and identifier
existed, err := x.cluster.GetActor(ctx, actorName)
if err != nil {
if errors.Is(err, cluster.ErrActorNotFound) {
Expand Down Expand Up @@ -2045,7 +2164,7 @@ func (x *actorSystem) getSetDeadlettersCount(ctx context.Context) {
// using the default ask timeout
// note: no need to check for error because this call is internal
message, _ := from.Ask(ctx, to, message, DefaultAskTimeout)
// cast the response received from the deadletters
// cast the response received from the deadletter
deadlettersCount := message.(*internalpb.DeadlettersCount)
// set the counter
x.deadlettersCounter.Store(uint64(deadlettersCount.GetTotalCount()))
Expand Down
16 changes: 8 additions & 8 deletions actor/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func TestActorSystem(t *testing.T) {
var items []*goaktpb.ActorRestarted
for message := range consumer.Iterator() {
payload := message.Payload()
// only listening to deadletters
// only listening to deadletter
restarted, ok := payload.(*goaktpb.ActorRestarted)
if ok {
items = append(items, restarted)
Expand Down Expand Up @@ -908,7 +908,7 @@ func TestActorSystem(t *testing.T) {
},
)
})
t.Run("With deadletters subscription ", func(t *testing.T) {
t.Run("With deadletter subscription ", func(t *testing.T) {
ctx := context.TODO()
sys, _ := NewActorSystem("testSys", WithLogger(log.DiscardLogger))

Expand All @@ -933,7 +933,7 @@ func TestActorSystem(t *testing.T) {
// wait a while
util.Pause(time.Second)

// every message sent to the actor will result in deadletters
// every message sent to the actor will result in deadletter
for i := 0; i < 5; i++ {
require.NoError(t, Tell(ctx, actorRef, new(testpb.TestSend)))
}
Expand All @@ -943,7 +943,7 @@ func TestActorSystem(t *testing.T) {
var items []*goaktpb.Deadletter
for message := range consumer.Iterator() {
payload := message.Payload()
// only listening to deadletters
// only listening to deadletter
deadletter, ok := payload.(*goaktpb.Deadletter)
if ok {
items = append(items, deadletter)
Expand All @@ -965,15 +965,15 @@ func TestActorSystem(t *testing.T) {
err = sys.Stop(ctx)
assert.NoError(t, err)
})
t.Run("With deadletters subscription when not started", func(t *testing.T) {
t.Run("With deadletter subscription when not started", func(t *testing.T) {
sys, _ := NewActorSystem("testSys", WithLogger(log.DiscardLogger))

// create a deadletter subscriber
consumer, err := sys.Subscribe()
require.Error(t, err)
require.Nil(t, consumer)
})
t.Run("With deadletters unsubscription when not started", func(t *testing.T) {
t.Run("With deadletter unsubscription when not started", func(t *testing.T) {
ctx := context.TODO()
sys, _ := NewActorSystem("testSys", WithLogger(log.DiscardLogger))

Expand Down Expand Up @@ -1529,7 +1529,7 @@ func TestActorSystem(t *testing.T) {
require.Nil(t, addr)

// spawn the remote actor
err = remoting.RemoteSpawn(ctx, host, remotingPort, actorName, "actors.exchanger")
err = remoting.RemoteSpawn(ctx, host, remotingPort, actorName, "actors.exchanger", false)
require.NoError(t, err)

// re-fetching the address of the actor should return not nil address after start
Expand Down Expand Up @@ -1581,7 +1581,7 @@ func TestActorSystem(t *testing.T) {
assert.EqualValues(t, 1, pid.ProcessedCount())
require.True(t, pid.IsRunning())

// every message sent to the actor will result in deadletters
// every message sent to the actor will result in deadletter
counter := 0
for i := 1; i <= 5; i++ {
require.NoError(t, Tell(ctx, pid, new(testpb.TestSend)))
Expand Down
Loading