Skip to content

Commit

Permalink
refactor: refactor PID to a concrete struct instead of interface (#436)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Aug 23, 2024
1 parent 6fd3e3d commit ba541fb
Show file tree
Hide file tree
Showing 20 changed files with 192 additions and 308 deletions.
62 changes: 31 additions & 31 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,32 +65,32 @@ type ActorSystem interface {
// Name returns the actor system name
Name() string
// Actors returns the list of Actors that are alive in the actor system
Actors() []PID
Actors() []*PID
// Start starts the actor system
Start(ctx context.Context) error
// Stop stops the actor system
Stop(ctx context.Context) error
// Spawn creates an actor in the system and starts it
Spawn(ctx context.Context, name string, actor Actor) (PID, error)
Spawn(ctx context.Context, name string, actor Actor) (*PID, error)
// SpawnFromFunc creates an actor with the given receive function. One can set the PreStart and PostStop lifecycle hooks
// in the given optional options
SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error)
SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error)
// SpawnNamedFromFunc creates an actor with the given receive function and provided name. One can set the PreStart and PostStop lifecycle hooks
// in the given optional options
SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error)
SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error)
// SpawnRouter creates a new router. One can additionally set the router options.
// A router is a special type of actor that helps distribute messages of the same type over a set of actors, so that messages can be processed in parallel.
// A single actor will only process one message at a time.
SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (PID, error)
SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (*PID, error)
// Kill stops a given actor in the system
Kill(ctx context.Context, name string) error
// ReSpawn recreates a given actor in the system
ReSpawn(ctx context.Context, name string) (PID, error)
ReSpawn(ctx context.Context, name string) (*PID, error)
// NumActors returns the total number of active actors in the system
NumActors() uint64
// LocalActor returns the reference of a local actor.
// A local actor is an actor that reside on the same node where the given actor system is running
LocalActor(actorName string) (PID, error)
LocalActor(actorName string) (*PID, error)
// RemoteActor returns the address of a remote actor when cluster is enabled
// When the cluster mode is not enabled an actor not found error will be returned
// One can always check whether cluster is enabled before calling this method or just use the ActorOf method.
Expand All @@ -99,7 +99,7 @@ type ActorSystem interface {
// When cluster mode is activated, the PID will be nil.
// When remoting is enabled this method will return and error
// An actor not found error is return when the actor is not found.
ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid PID, err error)
ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid *PID, err error)
// InCluster states whether the actor system is running within a cluster of nodes
InCluster() bool
// GetPartition returns the partition where a given actor is located
Expand All @@ -111,14 +111,14 @@ type ActorSystem interface {
// ScheduleOnce schedules a message that will be delivered to the receiver actor
// This will send the given message to the actor after the given interval specified.
// The message will be sent once
ScheduleOnce(ctx context.Context, message proto.Message, pid PID, interval time.Duration) error
ScheduleOnce(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error
// RemoteScheduleOnce schedules a message to be sent to a remote actor in the future.
// This requires remoting to be enabled on the actor system.
// This will send the given message to the actor after the given interval specified
// The message will be sent once
RemoteScheduleOnce(ctx context.Context, message proto.Message, address *goaktpb.Address, interval time.Duration) error
// ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
ScheduleWithCron(ctx context.Context, message proto.Message, pid PID, cronExpression string) error
ScheduleWithCron(ctx context.Context, message proto.Message, pid *PID, cronExpression string) error
// RemoteScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
RemoteScheduleWithCron(ctx context.Context, message proto.Message, address *goaktpb.Address, cronExpression string) error
// PeerAddress returns the actor system address known in the cluster. That address is used by other nodes to communicate with the actor system.
Expand All @@ -132,13 +132,13 @@ type ActorSystem interface {
Logger() log.Logger
// handleRemoteAsk handles a synchronous message to another actor and expect a response.
// This block until a response is received or timed out.
handleRemoteAsk(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
handleRemoteAsk(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
// handleRemoteTell handles an asynchronous message to an actor
handleRemoteTell(ctx context.Context, to PID, message proto.Message) error
handleRemoteTell(ctx context.Context, to *PID, message proto.Message) error
// setActor sets actor in the actor system actors registry
setActor(actor PID)
setActor(actor *PID)
// supervisor return the system supervisor
getSupervisor() PID
getSupervisor() *PID
}

// ActorSystem represent a collection of actors on a given node
Expand Down Expand Up @@ -221,7 +221,7 @@ type actorSystem struct {
clusterConfig *ClusterConfig
redistributionChan chan *cluster.Event

supervisor PID
supervisor *PID
}

// enforce compilation error when all methods of the ActorSystem interface are not implemented
Expand Down Expand Up @@ -330,7 +330,7 @@ func (x *actorSystem) Register(_ context.Context, actor Actor) error {
// ScheduleOnce schedules a message that will be delivered to the receiver actor
// This will send the given message to the actor after the given interval specified.
// The message will be sent once
func (x *actorSystem) ScheduleOnce(ctx context.Context, message proto.Message, pid PID, interval time.Duration) error {
func (x *actorSystem) ScheduleOnce(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error {
return x.scheduler.ScheduleOnce(ctx, message, pid, interval)
}

Expand All @@ -343,7 +343,7 @@ func (x *actorSystem) RemoteScheduleOnce(ctx context.Context, message proto.Mess
}

// ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
func (x *actorSystem) ScheduleWithCron(ctx context.Context, message proto.Message, pid PID, cronExpression string) error {
func (x *actorSystem) ScheduleWithCron(ctx context.Context, message proto.Message, pid *PID, cronExpression string) error {
return x.scheduler.ScheduleWithCron(ctx, message, pid, cronExpression)
}

Expand Down Expand Up @@ -392,7 +392,7 @@ func (x *actorSystem) NumActors() uint64 {
}

// Spawn creates or returns the instance of a given actor in the system
func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID, error) {
func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (*PID, error) {
if !x.started.Load() {
return nil, ErrActorSystemNotStarted
}
Expand Down Expand Up @@ -423,7 +423,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID,

// SpawnNamedFromFunc creates an actor with the given receive function and provided name. One can set the PreStart and PostStop lifecycle hooks
// in the given optional options
func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error) {
func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error) {
if !x.started.Load() {
return nil, ErrActorSystemNotStarted
}
Expand All @@ -440,14 +440,14 @@ func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, recei
}

// SpawnFromFunc creates an actor with the given receive function.
func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error) {
func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error) {
return x.SpawnNamedFromFunc(ctx, uuid.NewString(), receiveFunc, opts...)
}

// SpawnRouter creates a new router. One can additionally set the router options.
// A router is a special type of actor that helps distribute messages of the same type over a set of actors, so that messages can be processed in parallel.
// A single actor will only process one message at a time.
func (x *actorSystem) SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (PID, error) {
func (x *actorSystem) SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (*PID, error) {
router := newRouter(poolSize, routeesKind, x.logger, opts...)
routerName := x.getSystemActorName(routerType)
return x.Spawn(ctx, routerName, router)
Expand Down Expand Up @@ -476,7 +476,7 @@ func (x *actorSystem) Kill(ctx context.Context, name string) error {
}

// ReSpawn recreates a given actor in the system
func (x *actorSystem) ReSpawn(ctx context.Context, name string) (PID, error) {
func (x *actorSystem) ReSpawn(ctx context.Context, name string) (*PID, error) {
if !x.started.Load() {
return nil, ErrActorSystemNotStarted
}
Expand Down Expand Up @@ -509,11 +509,11 @@ func (x *actorSystem) Name() string {
}

// Actors returns the list of Actors that are alive in the actor system
func (x *actorSystem) Actors() []PID {
func (x *actorSystem) Actors() []*PID {
x.locker.Lock()
pids := x.actors.pids()
x.locker.Unlock()
actors := make([]PID, 0, len(pids))
actors := make([]*PID, 0, len(pids))
for _, pid := range pids {
if !isSystemName(pid.Name()) {
actors = append(actors, pid)
Expand All @@ -537,7 +537,7 @@ func (x *actorSystem) PeerAddress() string {
// When cluster mode is activated, the PID will be nil.
// When remoting is enabled this method will return and error
// An actor not found error is return when the actor is not found.
func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid PID, err error) {
func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid *PID, err error) {
x.locker.Lock()
defer x.locker.Unlock()

Expand Down Expand Up @@ -579,7 +579,7 @@ func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goak

// LocalActor returns the reference of a local actor.
// A local actor is an actor that reside on the same node where the given actor system is running
func (x *actorSystem) LocalActor(actorName string) (PID, error) {
func (x *actorSystem) LocalActor(actorName string) (*PID, error) {
x.locker.Lock()
defer x.locker.Unlock()

Expand Down Expand Up @@ -1022,25 +1022,25 @@ func (x *actorSystem) GetKinds(_ context.Context, request *connect.Request[inter

// handleRemoteAsk handles a synchronous message to another actor and expect a response.
// This block until a response is received or timed out.
func (x *actorSystem) handleRemoteAsk(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
func (x *actorSystem) handleRemoteAsk(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
return Ask(ctx, to, message, timeout)
}

// handleRemoteTell handles an asynchronous message to an actor
func (x *actorSystem) handleRemoteTell(ctx context.Context, to PID, message proto.Message) error {
func (x *actorSystem) handleRemoteTell(ctx context.Context, to *PID, message proto.Message) error {
return Tell(ctx, to, message)
}

// getSupervisor return the system supervisor
func (x *actorSystem) getSupervisor() PID {
func (x *actorSystem) getSupervisor() *PID {
x.locker.Lock()
supervisor := x.supervisor
x.locker.Unlock()
return supervisor
}

// setActor implements ActorSystem.
func (x *actorSystem) setActor(actor PID) {
func (x *actorSystem) setActor(actor *PID) {
x.actors.set(actor)
if x.clusterEnabled.Load() {
x.actorsChan <- &internalpb.WireActor{
Expand Down Expand Up @@ -1365,7 +1365,7 @@ func (x *actorSystem) processPeerState(ctx context.Context, peer *cluster.Peer)

// configPID constructs a PID provided the actor name and the actor kind
// this is a utility function used when spawning actors
func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor) (PID, error) {
func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor) (*PID, error) {
actorPath := NewPath(name, NewAddress(x.name, "", -1))
if x.remotingEnabled.Load() {
actorPath = NewPath(name, NewAddress(x.name, x.remotingHost, int(x.remotingPort)))
Expand Down
2 changes: 1 addition & 1 deletion actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (x *testRestart) PostStop(context.Context) error {
var _ Actor = &testRestart{}

type forwarder struct {
actorRef PID
actorRef *PID
}

func (x *forwarder) PreStart(context.Context) error {
Expand Down
8 changes: 4 additions & 4 deletions actors/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (

// Ask sends a synchronous message to another actor and expect a response.
// This block until a response is received or timed out.
func Ask(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
func Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
if !to.IsRunning() {
return nil, ErrDead
}
Expand Down Expand Up @@ -77,7 +77,7 @@ func Ask(ctx context.Context, to PID, message proto.Message, timeout time.Durati
}

// Tell sends an asynchronous message to an actor
func Tell(ctx context.Context, to PID, message proto.Message) error {
func Tell(ctx context.Context, to *PID, message proto.Message) error {
if !to.IsRunning() {
return ErrDead
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func Tell(ctx context.Context, to PID, message proto.Message) error {
}

// BatchTell sends bulk asynchronous messages to an actor
func BatchTell(ctx context.Context, to PID, messages ...proto.Message) error {
func BatchTell(ctx context.Context, to *PID, messages ...proto.Message) error {
if !to.IsRunning() {
return ErrDead
}
Expand All @@ -122,7 +122,7 @@ func BatchTell(ctx context.Context, to PID, messages ...proto.Message) error {
// BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages.
// The messages will be processed one after the other in the order they are sent
// This is a design choice to follow the simple principle of one message at a time processing by actors.
func BatchAsk(ctx context.Context, to PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error) {
func BatchAsk(ctx context.Context, to *PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error) {
if !to.IsRunning() {
return nil, ErrDead
}
Expand Down
4 changes: 2 additions & 2 deletions actors/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1596,7 +1596,7 @@ func TestAPIRemoteReSpawn(t *testing.T) {
assert.NotNil(t, actorRef)

// assert the actor restart count
pid := actorRef.(*pid)
pid := actorRef
assert.Zero(t, pid.restartCount.Load())

// get the address of the actor
Expand Down Expand Up @@ -1687,7 +1687,7 @@ func TestAPIRemoteStop(t *testing.T) {
assert.NotNil(t, actorRef)

// assert the actor restart count
pid := actorRef.(*pid)
pid := actorRef
assert.Zero(t, pid.restartCount.Load())

// get the address of the actor
Expand Down
2 changes: 1 addition & 1 deletion actors/func_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func WithPostStop(fn PostStopFunc) FuncOption {

// funcActor is an actor that only handles messages
type funcActor struct {
pid PID
pid *PID
id string
receiveFunc ReceiveFunc
preStart PreStartFunc
Expand Down
Loading

0 comments on commit ba541fb

Please sign in to comment.