Skip to content

Commit

Permalink
feat: add relocation flag toggling
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Feb 21, 2025
1 parent c41a9ea commit 16ed9a1
Show file tree
Hide file tree
Showing 18 changed files with 304 additions and 88 deletions.
9 changes: 7 additions & 2 deletions actor/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,12 @@ func (x *actorSystem) RemoteSpawn(ctx context.Context, request *connect.Request[
}
}

if _, err = x.Spawn(ctx, msg.GetActorName(), actor); err != nil {
var opts []SpawnOption
if !msg.GetRelocatable() {
opts = append(opts, WithRelocationDisabled())
}

if _, err = x.Spawn(ctx, msg.GetActorName(), actor, opts...); err != nil {
logger.Errorf("failed to create actor=(%s) on [host=%s, port=%d]: reason: (%v)", msg.GetActorName(), msg.GetHost(), msg.GetPort(), err)
return nil, connect.NewError(connect.CodeInternal, err)
}
Expand Down Expand Up @@ -1862,7 +1867,7 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o
pidOpts = append(pidOpts, asSingleton())
}

if spawnConfig.disableRelocation {
if !spawnConfig.relocatable {
pidOpts = append(pidOpts, withRelocationDisabled())
}

Expand Down
8 changes: 7 additions & 1 deletion actor/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,7 +1522,13 @@ func TestActorSystem(t *testing.T) {
require.Nil(t, addr)

// spawn the remote actor
err = remoting.RemoteSpawn(ctx, host, remotingPort, actorName, "actors.exchanger", false)
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actors.exchanger",
Singleton: false,
Relocatable: true,
}
err = remoting.RemoteSpawn(ctx, host, remotingPort, request)
require.NoError(t, err)

// re-fetching the address of the actor should return not nil address after start
Expand Down
7 changes: 6 additions & 1 deletion actor/cluster_singleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/tochemey/goakt/v3/internal/internalpb"
"github.com/tochemey/goakt/v3/internal/types"
"github.com/tochemey/goakt/v3/log"
"github.com/tochemey/goakt/v3/remote"
)

// clusterSingletonManager is a system actor that manages the lifecycle of singleton actors
Expand Down Expand Up @@ -162,5 +163,9 @@ func (x *actorSystem) spawnSingletonOnLeader(ctx context.Context, cl cluster.Int
port = int(peerState.GetRemotingPort())
)

return x.remoting.RemoteSpawn(ctx, host, port, name, actorType, true)
return x.remoting.RemoteSpawn(ctx, host, port, &remote.SpawnRequest{
Name: name,
Kind: actorType,
Singleton: true,
})
}
18 changes: 9 additions & 9 deletions actor/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ type PID struct {

remoting *Remoting

goScheduler *goScheduler
startedAt *atomic.Int64
isSingleton atomic.Bool
disableRelocation atomic.Bool
goScheduler *goScheduler
startedAt *atomic.Int64
isSingleton atomic.Bool
relocatable atomic.Bool
}

// newPID creates a new pid
Expand Down Expand Up @@ -193,7 +193,7 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
pid.passivateAfter.Store(DefaultPassivationTimeout)
pid.initTimeout.Store(DefaultInitTimeout)
pid.processing.Store(int32(idle))
pid.disableRelocation.Store(false)
pid.relocatable.Store(true)

for _, opt := range opts {
opt(pid)
Expand Down Expand Up @@ -392,13 +392,13 @@ func (pid *PID) IsSingleton() bool {
return pid.isSingleton.Load()
}

// IsRelocatable determines whether the actor can be relocated to another node if its host node shuts down unexpectedly.
// IsRelocatable determines whether the actor can be relocated to another node when its host node shuts down unexpectedly.
// By default, actors are relocatable to ensure system resilience and high availability.
// However, this behavior can be disabled during the actor's creation using the WithRelocationDisabled option.
//
// Returns true if relocation is allowed, and false if relocation is disabled.
func (pid *PID) IsRelocatable() bool {
return !pid.disableRelocation.Load()
return pid.relocatable.Load()
}

// ActorSystem returns the actor system
Expand Down Expand Up @@ -585,7 +585,7 @@ func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts .
}

// set the relocation flag
if spawnConfig.disableRelocation {
if spawnConfig.relocatable {
pidOptions = append(pidOptions, withRelocationDisabled())
}

Expand Down Expand Up @@ -1357,7 +1357,7 @@ func (pid *PID) reset() {
pid.supervisor.Reset()
pid.mailbox.Dispose()
pid.isSingleton.Store(false)
pid.disableRelocation.Store(false)
pid.relocatable.Store(true)
}

// freeWatchers releases all the actors watching this actor
Expand Down
2 changes: 1 addition & 1 deletion actor/pid_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,6 @@ func asSingleton() pidOption {
// withRelocationDisabled disables the actor relocation
func withRelocationDisabled() pidOption {
return func(pid *PID) {
pid.disableRelocation.Store(true)
pid.relocatable.Store(false)
}
}
4 changes: 3 additions & 1 deletion actor/pid_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ func TestPIDOptions(t *testing.T) {
negativeDuration atomic.Duration
atomicUint64 atomic.Uint64
atomicTrue atomic.Bool
atomicFalse atomic.Bool
)
negativeDuration.Store(-1)
atomicInt.Store(5)
atomicDuration.Store(time.Second)
atomicUint64.Store(10)
eventsStream := eventstream.New()
atomicTrue.Store(true)
atomicFalse.Store(false)

testCases := []struct {
name string
Expand Down Expand Up @@ -107,7 +109,7 @@ func TestPIDOptions(t *testing.T) {
{
name: "withRelocationDisabled",
option: withRelocationDisabled(),
expected: &PID{disableRelocation: atomicTrue},
expected: &PID{relocatable: atomicFalse},
},
}
for _, tc := range testCases {
Expand Down
12 changes: 9 additions & 3 deletions actor/rebalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tochemey/goakt/v3/internal/collection/slice"
"github.com/tochemey/goakt/v3/internal/internalpb"
"github.com/tochemey/goakt/v3/log"
"github.com/tochemey/goakt/v3/remote"
)

// rebalancer is a system actor that helps rebalance cluster
Expand Down Expand Up @@ -134,10 +135,15 @@ func (r *rebalancer) Rebalance(ctx *ReceiveContext) {
if actor.GetRelocatable() {
host := peerState.GetHost()
port := int(peerState.GetRemotingPort())
actorName := actor.GetActorName()
actorType := actor.GetActorType()

if err := r.remoting.RemoteSpawn(egCtx, host, port, actorName, actorType, false); err != nil {
spawnRequest := &remote.SpawnRequest{
Name: actor.GetActorName(),
Kind: actor.GetActorType(),
Singleton: false,
Relocatable: true,
}

if err := r.remoting.RemoteSpawn(egCtx, host, port, spawnRequest); err != nil {
logger.Error(err)
return NewSpawnError(err)
}
Expand Down
16 changes: 12 additions & 4 deletions actor/remoting.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
nethttp "net/http"
"strings"
"time"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/tochemey/goakt/v3/internal/http"
"github.com/tochemey/goakt/v3/internal/internalpb"
"github.com/tochemey/goakt/v3/internal/internalpb/internalpbconnect"
"github.com/tochemey/goakt/v3/remote"
)

// RemotingOption sets the remoting option
Expand Down Expand Up @@ -315,15 +317,21 @@ func (r *Remoting) RemoteBatchAsk(ctx context.Context, from, to *address.Address
}

// RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem
func (r *Remoting) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string, singleton bool) error {
func (r *Remoting) RemoteSpawn(ctx context.Context, host string, port int, spawnRequest *remote.SpawnRequest) error {
if err := spawnRequest.Validate(); err != nil {
return fmt.Errorf("invalid spawn option: %w", err)
}

spawnRequest.Sanitize()
remoteClient := r.serviceClient(host, port)
request := connect.NewRequest(
&internalpb.RemoteSpawnRequest{
Host: host,
Port: int32(port),
ActorName: name,
ActorType: actorType,
IsSingleton: singleton,
ActorName: spawnRequest.Name,
ActorType: spawnRequest.Kind,
IsSingleton: spawnRequest.Singleton,
Relocatable: spawnRequest.Relocatable,
},
)

Expand Down
84 changes: 80 additions & 4 deletions actor/remoting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,13 @@ func TestRemotingSpawn(t *testing.T) {
require.NoError(t, err)

// spawn the remote actor
err = remoting.RemoteSpawn(ctx, host, remotingPort, actorName, "actors.exchanger", false)
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actors.exchanger",
Singleton: false,
Relocatable: false,
}
err = remoting.RemoteSpawn(ctx, host, remotingPort, request)
require.NoError(t, err)

// re-fetching the address of the actor should return not nil address after start
Expand Down Expand Up @@ -1761,7 +1767,13 @@ func TestRemotingSpawn(t *testing.T) {
require.Nil(t, addr)

// spawn the remote actor
err = remoting.RemoteSpawn(ctx, sys.Host(), int(sys.Port()), actorName, "actors.exchanger", false)
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actors.exchanger",
Singleton: false,
Relocatable: false,
}
err = remoting.RemoteSpawn(ctx, sys.Host(), int(sys.Port()), request)
require.Error(t, err)
assert.EqualError(t, err, ErrTypeNotRegistered.Error())

Expand Down Expand Up @@ -1799,7 +1811,13 @@ func TestRemotingSpawn(t *testing.T) {
actorName := uuid.NewString()
remoting := NewRemoting()
// spawn the remote actor
err = remoting.RemoteSpawn(ctx, host, remotingPort, actorName, "actors.exchanger", false)
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actors.exchanger",
Singleton: false,
Relocatable: false,
}
err = remoting.RemoteSpawn(ctx, host, remotingPort, request)
require.Error(t, err)

t.Cleanup(
Expand Down Expand Up @@ -1862,7 +1880,13 @@ func TestRemotingSpawn(t *testing.T) {
require.NoError(t, err)

// spawn the remote actor
err = remoting.RemoteSpawn(ctx, host, remotingPort, actorName, "actors.exchanger", false)
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actors.exchanger",
Singleton: false,
Relocatable: false,
}
err = remoting.RemoteSpawn(ctx, host, remotingPort, request)
require.NoError(t, err)

// re-fetching the address of the actor should return not nil address after start
Expand Down Expand Up @@ -1893,4 +1917,56 @@ func TestRemotingSpawn(t *testing.T) {
},
)
})
t.Run("When request is invalid", func(t *testing.T) {
// create the context
ctx := context.TODO()
// define the logger to use
logger := log.DiscardLogger
// generate the remoting port
ports := dynaport.Get(1)
remotingPort := ports[0]
host := "127.0.0.1"

// create the actor system
sys, err := NewActorSystem(
"test",
WithLogger(logger),
WithPassivationDisabled(),
WithRemote(remote.NewConfig(host, remotingPort)),
)
// assert there are no error
require.NoError(t, err)

// start the actor system
err = sys.Start(ctx)
assert.NoError(t, err)

// create an actor implementation and register it
actor := &exchanger{}
actorName := uuid.NewString()

remoting := NewRemoting()
// fetching the address of the that actor should return nil address
addr, err := remoting.RemoteLookup(ctx, sys.Host(), int(sys.Port()), actorName)
require.NoError(t, err)
require.Nil(t, addr)

// register the actor
err = sys.Register(ctx, actor)
require.NoError(t, err)

// spawn the remote actor
request := &remote.SpawnRequest{
Name: "",
Kind: "actors.exchanger",
Singleton: false,
Relocatable: false,
}
err = remoting.RemoteSpawn(ctx, host, remotingPort, request)
require.Error(t, err)

remoting.Close()
err = sys.Stop(ctx)
assert.NoError(t, err)
})
}
12 changes: 8 additions & 4 deletions actor/spawn_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,17 @@ type spawnConfig struct {
passivateAfter *time.Duration
// specifies if the actor is a singleton
asSingleton bool
// specifies if the actor should be rebalanced
disableRelocation bool
// specifies if the actor should be relocated
relocatable bool
}

// newSpawnConfig creates an instance of spawnConfig
func newSpawnConfig(opts ...SpawnOption) *spawnConfig {
config := new(spawnConfig)
config := &spawnConfig{
relocatable: true,
asSingleton: false,
}

for _, opt := range opts {
opt.Apply(config)
}
Expand Down Expand Up @@ -124,7 +128,7 @@ func WithLongLived() SpawnOption {
// cannot be easily replicated.
func WithRelocationDisabled() SpawnOption {
return spawnOption(func(config *spawnConfig) {
config.disableRelocation = true
config.relocatable = false
})
}

Expand Down
2 changes: 1 addition & 1 deletion actor/spawn_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestSpawnOption(t *testing.T) {
config := &spawnConfig{}
option := WithRelocationDisabled()
option.Apply(config)
require.Equal(t, &spawnConfig{disableRelocation: true}, config)
require.Equal(t, &spawnConfig{relocatable: true}, config)
})
}

Expand Down
1 change: 1 addition & 0 deletions client/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ package client
type Actor struct {
name string // Name defines the actor name. This will be unique in the Client
kind string // Kind specifies the actor kind.

}

// NewActor creates an instance of Actor
Expand Down
Loading

0 comments on commit 16ed9a1

Please sign in to comment.