Skip to content

Commit

Permalink
feat: add relocation flag toggling (#632)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Feb 21, 2025
1 parent a195e15 commit 512be4c
Show file tree
Hide file tree
Showing 24 changed files with 613 additions and 109 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ The complete Documentation can be found [here](https://tochemey.gitbook.io/goakt

Kindly check out the [examples](https://github.com/Tochemey/goakt-examples)' repository.

## 💪 Support

GoAkt is free and open source. If you need priority support on complex topics or request new features, please consider [sponsorship](https://github.com/sponsors/Tochemey).

## 🌍 Community

You can join these groups and chat to discuss and ask GoAkt related questions on:
Expand Down
12 changes: 12 additions & 0 deletions actor/actor_ref.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ActorRef struct {
address *address.Address
// isSingleton defines if the actor is a singleton
isSingleton bool
relocatable bool
}

// Name represents the actor given name
Expand All @@ -77,6 +78,15 @@ func (x ActorRef) IsSingleton() bool {
return x.isSingleton
}

// IsRelocatable determines whether the actor can be relocated to another node if its host node shuts down unexpectedly.
// By default, actors are relocatable to ensure system resilience and high availability.
// However, this behavior can be disabled during the actor's creation using the WithRelocationDisabled option.
//
// Returns true if relocation is allowed, and false if relocation is disabled.
func (x ActorRef) IsRelocatable() bool {
return x.relocatable
}

// Equals is a convenient method to compare two ActorRef
func (x ActorRef) Equals(actor ActorRef) bool {
return x.address.Equals(actor.address)
Expand All @@ -88,6 +98,7 @@ func fromActorRef(actorRef *internalpb.ActorRef) ActorRef {
kind: actorRef.GetActorType(),
address: address.From(actorRef.GetActorAddress()),
isSingleton: actorRef.GetIsSingleton(),
relocatable: actorRef.GetRelocatable(),
}
}

Expand All @@ -97,5 +108,6 @@ func fromPID(pid *PID) ActorRef {
kind: types.Name(pid.Actor()),
address: pid.Address(),
isSingleton: pid.IsSingleton(),
relocatable: pid.IsRelocatable(),
}
}
27 changes: 19 additions & 8 deletions actor/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ type ActorSystem interface {
// handleRemoteTell handles an asynchronous message to an actor
handleRemoteTell(ctx context.Context, to *PID, message proto.Message) error
// broadcastActor sets actor in the actor system actors registry
broadcastActor(actor *PID, singleton bool)
broadcastActor(actor *PID)
// getPeerStateFromStore returns the peer state from the cluster store
getPeerStateFromStore(address string) (*internalpb.PeerState, error)
// removePeerStateFromStore removes the peer state from the cluster store
Expand Down Expand Up @@ -671,7 +671,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor, opts
guardian := x.getUserGuardian()
_ = x.actors.AddNode(guardian, pid)
x.actors.AddWatcher(pid, x.deathWatch)
x.broadcastActor(pid, false)
x.broadcastActor(pid)
return pid, nil
}

Expand Down Expand Up @@ -707,7 +707,7 @@ func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, recei
x.actorsCounter.Inc()
_ = x.actors.AddNode(x.userGuardian, pid)
x.actors.AddWatcher(pid, x.deathWatch)
x.broadcastActor(pid, false)
x.broadcastActor(pid)
return pid, nil
}

Expand All @@ -731,9 +731,10 @@ func (x *actorSystem) SpawnRouter(ctx context.Context, poolSize int, routeesKind
// A singleton actor like any other actor is created only once within the system and in the cluster.
// A singleton actor is created with the default supervisor strategy and directive.
// A singleton actor once created lives throughout the lifetime of the given actor system.
// One cannot create a child actor for a singleton actor.
//
// The cluster singleton is automatically started on the oldest node in the cluster.
// If the oldest node leaves the cluster, the singleton is restarted on the new oldest node.
// When the oldest node leaves the cluster unexpectedly, the singleton is restarted on the new oldest node.
// This is useful for managing shared resources or coordinating tasks that should be handled by a single actor.
func (x *actorSystem) SpawnSingleton(ctx context.Context, name string, actor Actor) error {
if !x.started.Load() {
Expand Down Expand Up @@ -775,7 +776,7 @@ func (x *actorSystem) SpawnSingleton(ctx context.Context, name string, actor Act
// add the given actor to the tree and supervise it
_ = x.actors.AddNode(x.singletonManager, pid)
x.actors.AddWatcher(pid, x.deathWatch)
x.broadcastActor(pid, true)
x.broadcastActor(pid)
return nil
}

Expand Down Expand Up @@ -1267,7 +1268,12 @@ func (x *actorSystem) RemoteSpawn(ctx context.Context, request *connect.Request[
}
}

if _, err = x.Spawn(ctx, msg.GetActorName(), actor); err != nil {
var opts []SpawnOption
if !msg.GetRelocatable() {
opts = append(opts, WithRelocationDisabled())
}

if _, err = x.Spawn(ctx, msg.GetActorName(), actor, opts...); err != nil {
logger.Errorf("failed to create actor=(%s) on [host=%s, port=%d]: reason: (%v)", msg.GetActorName(), msg.GetHost(), msg.GetPort(), err)
return nil, connect.NewError(connect.CodeInternal, err)
}
Expand Down Expand Up @@ -1407,12 +1413,13 @@ func (x *actorSystem) getPeerStateFromStore(address string) (*internalpb.PeerSta
}

// broadcastActor broadcast the newly (re)spawned actor into the cluster
func (x *actorSystem) broadcastActor(actor *PID, singleton bool) {
func (x *actorSystem) broadcastActor(actor *PID) {
if x.clusterEnabled.Load() {
x.wireActorsQueue <- &internalpb.ActorRef{
ActorAddress: actor.Address().Address,
ActorType: types.Name(actor.Actor()),
IsSingleton: singleton,
IsSingleton: actor.IsSingleton(),
Relocatable: actor.IsRelocatable(),
}
}
}
Expand Down Expand Up @@ -1860,6 +1867,10 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o
pidOpts = append(pidOpts, asSingleton())
}

if !spawnConfig.relocatable {
pidOpts = append(pidOpts, withRelocationDisabled())
}

// enable stash
if x.stashEnabled {
pidOpts = append(pidOpts, withStash())
Expand Down
8 changes: 7 additions & 1 deletion actor/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,7 +1522,13 @@ func TestActorSystem(t *testing.T) {
require.Nil(t, addr)

// spawn the remote actor
err = remoting.RemoteSpawn(ctx, host, remotingPort, actorName, "actors.exchanger", false)
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actors.exchanger",
Singleton: false,
Relocatable: true,
}
err = remoting.RemoteSpawn(ctx, host, remotingPort, request)
require.NoError(t, err)

// re-fetching the address of the actor should return not nil address after start
Expand Down
7 changes: 6 additions & 1 deletion actor/cluster_singleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/tochemey/goakt/v3/internal/internalpb"
"github.com/tochemey/goakt/v3/internal/types"
"github.com/tochemey/goakt/v3/log"
"github.com/tochemey/goakt/v3/remote"
)

// clusterSingletonManager is a system actor that manages the lifecycle of singleton actors
Expand Down Expand Up @@ -162,5 +163,9 @@ func (x *actorSystem) spawnSingletonOnLeader(ctx context.Context, cl cluster.Int
port = int(peerState.GetRemotingPort())
)

return x.remoting.RemoteSpawn(ctx, host, port, name, actorType, true)
return x.remoting.RemoteSpawn(ctx, host, port, &remote.SpawnRequest{
Name: name,
Kind: actorType,
Singleton: true,
})
}
30 changes: 28 additions & 2 deletions actor/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type PID struct {
goScheduler *goScheduler
startedAt *atomic.Int64
isSingleton atomic.Bool
relocatable atomic.Bool
}

// newPID creates a new pid
Expand Down Expand Up @@ -192,6 +193,7 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
pid.passivateAfter.Store(DefaultPassivationTimeout)
pid.initTimeout.Store(DefaultInitTimeout)
pid.processing.Store(int32(idle))
pid.relocatable.Store(true)

for _, opt := range opts {
opt(pid)
Expand Down Expand Up @@ -376,11 +378,29 @@ func (pid *PID) IsSuspended() bool {
return pid.suspended.Load()
}

// IsSingleton returns true when the actor is a singleton
// IsSingleton returns true when the actor is a singleton.
//
// A singleton actor is instantiated when cluster mode is enabled.
// A singleton actor like any other actor is created only once within the system and in the cluster.
// A singleton actor is created with the default supervisor strategy and directive.
// A singleton actor once created lives throughout the lifetime of the given actor system.
//
// The singleton actor is created on the oldest node in the cluster.
// When the oldest node leaves the cluster unexpectedly, the singleton is restarted on the new oldest node.
// This is useful for managing shared resources or coordinating tasks that should be handled by a single actor.
func (pid *PID) IsSingleton() bool {
return pid.isSingleton.Load()
}

// IsRelocatable determines whether the actor can be relocated to another node when its host node shuts down unexpectedly.
// By default, actors are relocatable to ensure system resilience and high availability.
// However, this behavior can be disabled during the actor's creation using the WithRelocationDisabled option.
//
// Returns true if relocation is allowed, and false if relocation is disabled.
func (pid *PID) IsRelocatable() bool {
return pid.relocatable.Load()
}

// ActorSystem returns the actor system
func (pid *PID) ActorSystem() ActorSystem {
pid.fieldsLocker.RLock()
Expand Down Expand Up @@ -564,6 +584,11 @@ func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts .
pidOptions = append(pidOptions, withSupervisor(spawnConfig.supervisor))
}

// set the relocation flag
if spawnConfig.relocatable {
pidOptions = append(pidOptions, withRelocationDisabled())
}

// disable passivation for system actor
switch {
case isReservedName(name):
Expand Down Expand Up @@ -611,7 +636,7 @@ func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts .
}

// set the actor in the given actor system registry
pid.ActorSystem().broadcastActor(cid, false)
pid.ActorSystem().broadcastActor(cid)
return cid, nil
}

Expand Down Expand Up @@ -1332,6 +1357,7 @@ func (pid *PID) reset() {
pid.supervisor.Reset()
pid.mailbox.Dispose()
pid.isSingleton.Store(false)
pid.relocatable.Store(true)
}

// freeWatchers releases all the actors watching this actor
Expand Down
7 changes: 7 additions & 0 deletions actor/pid_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,10 @@ func asSingleton() pidOption {
pid.isSingleton.Store(true)
}
}

// withRelocationDisabled disables the actor relocation
func withRelocationDisabled() pidOption {
return func(pid *PID) {
pid.relocatable.Store(false)
}
}
7 changes: 7 additions & 0 deletions actor/pid_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ func TestPIDOptions(t *testing.T) {
negativeDuration atomic.Duration
atomicUint64 atomic.Uint64
atomicTrue atomic.Bool
atomicFalse atomic.Bool
)
negativeDuration.Store(-1)
atomicInt.Store(5)
atomicDuration.Store(time.Second)
atomicUint64.Store(10)
eventsStream := eventstream.New()
atomicTrue.Store(true)
atomicFalse.Store(false)

testCases := []struct {
name string
Expand Down Expand Up @@ -104,6 +106,11 @@ func TestPIDOptions(t *testing.T) {
isSingleton: atomicTrue,
},
},
{
name: "withRelocationDisabled",
option: withRelocationDisabled(),
expected: &PID{relocatable: atomicFalse},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
28 changes: 20 additions & 8 deletions actor/rebalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tochemey/goakt/v3/internal/collection/slice"
"github.com/tochemey/goakt/v3/internal/internalpb"
"github.com/tochemey/goakt/v3/log"
"github.com/tochemey/goakt/v3/remote"
)

// rebalancer is a system actor that helps rebalance cluster
Expand Down Expand Up @@ -131,14 +132,21 @@ func (r *rebalancer) Rebalance(ctx *ReceiveContext) {
return NewSpawnError(err)
}

if err := r.remoting.RemoteSpawn(egCtx,
peerState.GetHost(),
int(peerState.GetRemotingPort()),
actor.GetActorName(),
actor.GetActorType(),
false); err != nil {
logger.Error(err)
return NewSpawnError(err)
if actor.GetRelocatable() {
host := peerState.GetHost()
port := int(peerState.GetRemotingPort())

spawnRequest := &remote.SpawnRequest{
Name: actor.GetActorName(),
Kind: actor.GetActorType(),
Singleton: false,
Relocatable: true,
}

if err := r.remoting.RemoteSpawn(egCtx, host, port, spawnRequest); err != nil {
logger.Error(err)
return NewSpawnError(err)
}
}
}
}
Expand Down Expand Up @@ -223,6 +231,10 @@ func (r *rebalancer) recreateLocally(ctx context.Context, actor *internalpb.Acto
return r.pid.ActorSystem().SpawnSingleton(ctx, actor.GetActorName(), iactor)
}

if !actor.GetRelocatable() {
return nil
}

_, err = r.pid.ActorSystem().Spawn(ctx, actor.GetActorName(), iactor)
return err
}
Loading

0 comments on commit 512be4c

Please sign in to comment.