Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor PID to a concrete struct instead of interface #436

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,32 +65,32 @@ type ActorSystem interface {
// Name returns the actor system name
Name() string
// Actors returns the list of Actors that are alive in the actor system
Actors() []PID
Actors() []*PID
// Start starts the actor system
Start(ctx context.Context) error
// Stop stops the actor system
Stop(ctx context.Context) error
// Spawn creates an actor in the system and starts it
Spawn(ctx context.Context, name string, actor Actor) (PID, error)
Spawn(ctx context.Context, name string, actor Actor) (*PID, error)
// SpawnFromFunc creates an actor with the given receive function. One can set the PreStart and PostStop lifecycle hooks
// in the given optional options
SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error)
SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error)
// SpawnNamedFromFunc creates an actor with the given receive function and provided name. One can set the PreStart and PostStop lifecycle hooks
// in the given optional options
SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error)
SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error)
// SpawnRouter creates a new router. One can additionally set the router options.
// A router is a special type of actor that helps distribute messages of the same type over a set of actors, so that messages can be processed in parallel.
// A single actor will only process one message at a time.
SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (PID, error)
SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (*PID, error)
// Kill stops a given actor in the system
Kill(ctx context.Context, name string) error
// ReSpawn recreates a given actor in the system
ReSpawn(ctx context.Context, name string) (PID, error)
ReSpawn(ctx context.Context, name string) (*PID, error)
// NumActors returns the total number of active actors in the system
NumActors() uint64
// LocalActor returns the reference of a local actor.
// A local actor is an actor that reside on the same node where the given actor system is running
LocalActor(actorName string) (PID, error)
LocalActor(actorName string) (*PID, error)
// RemoteActor returns the address of a remote actor when cluster is enabled
// When the cluster mode is not enabled an actor not found error will be returned
// One can always check whether cluster is enabled before calling this method or just use the ActorOf method.
Expand All @@ -99,7 +99,7 @@ type ActorSystem interface {
// When cluster mode is activated, the PID will be nil.
// When remoting is enabled this method will return and error
// An actor not found error is return when the actor is not found.
ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid PID, err error)
ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid *PID, err error)
// InCluster states whether the actor system is running within a cluster of nodes
InCluster() bool
// GetPartition returns the partition where a given actor is located
Expand All @@ -111,14 +111,14 @@ type ActorSystem interface {
// ScheduleOnce schedules a message that will be delivered to the receiver actor
// This will send the given message to the actor after the given interval specified.
// The message will be sent once
ScheduleOnce(ctx context.Context, message proto.Message, pid PID, interval time.Duration) error
ScheduleOnce(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error
// RemoteScheduleOnce schedules a message to be sent to a remote actor in the future.
// This requires remoting to be enabled on the actor system.
// This will send the given message to the actor after the given interval specified
// The message will be sent once
RemoteScheduleOnce(ctx context.Context, message proto.Message, address *goaktpb.Address, interval time.Duration) error
// ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
ScheduleWithCron(ctx context.Context, message proto.Message, pid PID, cronExpression string) error
ScheduleWithCron(ctx context.Context, message proto.Message, pid *PID, cronExpression string) error
// RemoteScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
RemoteScheduleWithCron(ctx context.Context, message proto.Message, address *goaktpb.Address, cronExpression string) error
// PeerAddress returns the actor system address known in the cluster. That address is used by other nodes to communicate with the actor system.
Expand All @@ -132,13 +132,13 @@ type ActorSystem interface {
Logger() log.Logger
// handleRemoteAsk handles a synchronous message to another actor and expect a response.
// This block until a response is received or timed out.
handleRemoteAsk(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
handleRemoteAsk(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
// handleRemoteTell handles an asynchronous message to an actor
handleRemoteTell(ctx context.Context, to PID, message proto.Message) error
handleRemoteTell(ctx context.Context, to *PID, message proto.Message) error
// setActor sets actor in the actor system actors registry
setActor(actor PID)
setActor(actor *PID)
// supervisor return the system supervisor
getSupervisor() PID
getSupervisor() *PID
}

// ActorSystem represent a collection of actors on a given node
Expand Down Expand Up @@ -221,7 +221,7 @@ type actorSystem struct {
clusterConfig *ClusterConfig
redistributionChan chan *cluster.Event

supervisor PID
supervisor *PID
}

// enforce compilation error when all methods of the ActorSystem interface are not implemented
Expand Down Expand Up @@ -330,7 +330,7 @@ func (x *actorSystem) Register(_ context.Context, actor Actor) error {
// ScheduleOnce schedules a message that will be delivered to the receiver actor
// This will send the given message to the actor after the given interval specified.
// The message will be sent once
func (x *actorSystem) ScheduleOnce(ctx context.Context, message proto.Message, pid PID, interval time.Duration) error {
func (x *actorSystem) ScheduleOnce(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error {
return x.scheduler.ScheduleOnce(ctx, message, pid, interval)
}

Expand All @@ -343,7 +343,7 @@ func (x *actorSystem) RemoteScheduleOnce(ctx context.Context, message proto.Mess
}

// ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
func (x *actorSystem) ScheduleWithCron(ctx context.Context, message proto.Message, pid PID, cronExpression string) error {
func (x *actorSystem) ScheduleWithCron(ctx context.Context, message proto.Message, pid *PID, cronExpression string) error {
return x.scheduler.ScheduleWithCron(ctx, message, pid, cronExpression)
}

Expand Down Expand Up @@ -392,7 +392,7 @@ func (x *actorSystem) NumActors() uint64 {
}

// Spawn creates or returns the instance of a given actor in the system
func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID, error) {
func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (*PID, error) {
if !x.started.Load() {
return nil, ErrActorSystemNotStarted
}
Expand Down Expand Up @@ -423,7 +423,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID,

// SpawnNamedFromFunc creates an actor with the given receive function and provided name. One can set the PreStart and PostStop lifecycle hooks
// in the given optional options
func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error) {
func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error) {
if !x.started.Load() {
return nil, ErrActorSystemNotStarted
}
Expand All @@ -440,14 +440,14 @@ func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, recei
}

// SpawnFromFunc creates an actor with the given receive function.
func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error) {
func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error) {
return x.SpawnNamedFromFunc(ctx, uuid.NewString(), receiveFunc, opts...)
}

// SpawnRouter creates a new router. One can additionally set the router options.
// A router is a special type of actor that helps distribute messages of the same type over a set of actors, so that messages can be processed in parallel.
// A single actor will only process one message at a time.
func (x *actorSystem) SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (PID, error) {
func (x *actorSystem) SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (*PID, error) {
router := newRouter(poolSize, routeesKind, x.logger, opts...)
routerName := x.getSystemActorName(routerType)
return x.Spawn(ctx, routerName, router)
Expand Down Expand Up @@ -476,7 +476,7 @@ func (x *actorSystem) Kill(ctx context.Context, name string) error {
}

// ReSpawn recreates a given actor in the system
func (x *actorSystem) ReSpawn(ctx context.Context, name string) (PID, error) {
func (x *actorSystem) ReSpawn(ctx context.Context, name string) (*PID, error) {
if !x.started.Load() {
return nil, ErrActorSystemNotStarted
}
Expand Down Expand Up @@ -509,11 +509,11 @@ func (x *actorSystem) Name() string {
}

// Actors returns the list of Actors that are alive in the actor system
func (x *actorSystem) Actors() []PID {
func (x *actorSystem) Actors() []*PID {
x.locker.Lock()
pids := x.actors.pids()
x.locker.Unlock()
actors := make([]PID, 0, len(pids))
actors := make([]*PID, 0, len(pids))
for _, pid := range pids {
if !isSystemName(pid.Name()) {
actors = append(actors, pid)
Expand All @@ -537,7 +537,7 @@ func (x *actorSystem) PeerAddress() string {
// When cluster mode is activated, the PID will be nil.
// When remoting is enabled this method will return and error
// An actor not found error is return when the actor is not found.
func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid PID, err error) {
func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid *PID, err error) {
x.locker.Lock()
defer x.locker.Unlock()

Expand Down Expand Up @@ -579,7 +579,7 @@ func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goak

// LocalActor returns the reference of a local actor.
// A local actor is an actor that reside on the same node where the given actor system is running
func (x *actorSystem) LocalActor(actorName string) (PID, error) {
func (x *actorSystem) LocalActor(actorName string) (*PID, error) {
x.locker.Lock()
defer x.locker.Unlock()

Expand Down Expand Up @@ -1022,25 +1022,25 @@ func (x *actorSystem) GetKinds(_ context.Context, request *connect.Request[inter

// handleRemoteAsk handles a synchronous message to another actor and expect a response.
// This block until a response is received or timed out.
func (x *actorSystem) handleRemoteAsk(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
func (x *actorSystem) handleRemoteAsk(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
return Ask(ctx, to, message, timeout)
}

// handleRemoteTell handles an asynchronous message to an actor
func (x *actorSystem) handleRemoteTell(ctx context.Context, to PID, message proto.Message) error {
func (x *actorSystem) handleRemoteTell(ctx context.Context, to *PID, message proto.Message) error {
return Tell(ctx, to, message)
}

// getSupervisor return the system supervisor
func (x *actorSystem) getSupervisor() PID {
func (x *actorSystem) getSupervisor() *PID {
x.locker.Lock()
supervisor := x.supervisor
x.locker.Unlock()
return supervisor
}

// setActor implements ActorSystem.
func (x *actorSystem) setActor(actor PID) {
func (x *actorSystem) setActor(actor *PID) {
x.actors.set(actor)
if x.clusterEnabled.Load() {
x.actorsChan <- &internalpb.WireActor{
Expand Down Expand Up @@ -1365,7 +1365,7 @@ func (x *actorSystem) processPeerState(ctx context.Context, peer *cluster.Peer)

// configPID constructs a PID provided the actor name and the actor kind
// this is a utility function used when spawning actors
func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor) (PID, error) {
func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor) (*PID, error) {
actorPath := NewPath(name, NewAddress(x.name, "", -1))
if x.remotingEnabled.Load() {
actorPath = NewPath(name, NewAddress(x.name, x.remotingHost, int(x.remotingPort)))
Expand Down
2 changes: 1 addition & 1 deletion actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (x *testRestart) PostStop(context.Context) error {
var _ Actor = &testRestart{}

type forwarder struct {
actorRef PID
actorRef *PID
}

func (x *forwarder) PreStart(context.Context) error {
Expand Down
8 changes: 4 additions & 4 deletions actors/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (

// Ask sends a synchronous message to another actor and expect a response.
// This block until a response is received or timed out.
func Ask(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
func Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
if !to.IsRunning() {
return nil, ErrDead
}
Expand Down Expand Up @@ -77,7 +77,7 @@ func Ask(ctx context.Context, to PID, message proto.Message, timeout time.Durati
}

// Tell sends an asynchronous message to an actor
func Tell(ctx context.Context, to PID, message proto.Message) error {
func Tell(ctx context.Context, to *PID, message proto.Message) error {
if !to.IsRunning() {
return ErrDead
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func Tell(ctx context.Context, to PID, message proto.Message) error {
}

// BatchTell sends bulk asynchronous messages to an actor
func BatchTell(ctx context.Context, to PID, messages ...proto.Message) error {
func BatchTell(ctx context.Context, to *PID, messages ...proto.Message) error {
if !to.IsRunning() {
return ErrDead
}
Expand All @@ -122,7 +122,7 @@ func BatchTell(ctx context.Context, to PID, messages ...proto.Message) error {
// BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages.
// The messages will be processed one after the other in the order they are sent
// This is a design choice to follow the simple principle of one message at a time processing by actors.
func BatchAsk(ctx context.Context, to PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error) {
func BatchAsk(ctx context.Context, to *PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error) {
if !to.IsRunning() {
return nil, ErrDead
}
Expand Down
4 changes: 2 additions & 2 deletions actors/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1596,7 +1596,7 @@ func TestAPIRemoteReSpawn(t *testing.T) {
assert.NotNil(t, actorRef)

// assert the actor restart count
pid := actorRef.(*pid)
pid := actorRef
assert.Zero(t, pid.restartCount.Load())

// get the address of the actor
Expand Down Expand Up @@ -1687,7 +1687,7 @@ func TestAPIRemoteStop(t *testing.T) {
assert.NotNil(t, actorRef)

// assert the actor restart count
pid := actorRef.(*pid)
pid := actorRef
assert.Zero(t, pid.restartCount.Load())

// get the address of the actor
Expand Down
2 changes: 1 addition & 1 deletion actors/func_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func WithPostStop(fn PostStopFunc) FuncOption {

// funcActor is an actor that only handles messages
type funcActor struct {
pid PID
pid *PID
id string
receiveFunc ReceiveFunc
preStart PreStartFunc
Expand Down
Loading
Loading