From baae7d7873d33006f0f94779a6572edd39df77f5 Mon Sep 17 00:00:00 2001 From: Tochemey Date: Thu, 22 Aug 2024 22:12:27 +0100 Subject: [PATCH] refactor: refactor PID to a concrete struct instead of interface --- actors/actor_system.go | 62 ++++----- actors/actor_test.go | 2 +- actors/api.go | 8 +- actors/api_test.go | 4 +- actors/func_actor.go | 2 +- actors/pid.go | 271 +++++++++++--------------------------- actors/pid_map.go | 14 +- actors/pid_map_test.go | 4 +- actors/pid_option.go | 28 ++-- actors/pid_option_test.go | 24 ++-- actors/pid_test.go | 2 +- actors/receive_context.go | 30 ++--- actors/router.go | 10 +- actors/scheduler.go | 4 +- actors/stash.go | 6 +- actors/types.go | 2 +- bench/benchmark.go | 4 +- bench/benchmark_test.go | 1 + testkit/probe.go | 20 +-- testkit/testkit.go | 2 +- 20 files changed, 192 insertions(+), 308 deletions(-) diff --git a/actors/actor_system.go b/actors/actor_system.go index 9bef5445..49f02bdc 100644 --- a/actors/actor_system.go +++ b/actors/actor_system.go @@ -65,32 +65,32 @@ type ActorSystem interface { // Name returns the actor system name Name() string // Actors returns the list of Actors that are alive in the actor system - Actors() []PID + Actors() []*PID // Start starts the actor system Start(ctx context.Context) error // Stop stops the actor system Stop(ctx context.Context) error // Spawn creates an actor in the system and starts it - Spawn(ctx context.Context, name string, actor Actor) (PID, error) + Spawn(ctx context.Context, name string, actor Actor) (*PID, error) // SpawnFromFunc creates an actor with the given receive function. One can set the PreStart and PostStop lifecycle hooks // in the given optional options - SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error) + SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error) // SpawnNamedFromFunc creates an actor with the given receive function and provided name. One can set the PreStart and PostStop lifecycle hooks // in the given optional options - SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error) + SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error) // SpawnRouter creates a new router. One can additionally set the router options. // A router is a special type of actor that helps distribute messages of the same type over a set of actors, so that messages can be processed in parallel. // A single actor will only process one message at a time. - SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (PID, error) + SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (*PID, error) // Kill stops a given actor in the system Kill(ctx context.Context, name string) error // ReSpawn recreates a given actor in the system - ReSpawn(ctx context.Context, name string) (PID, error) + ReSpawn(ctx context.Context, name string) (*PID, error) // NumActors returns the total number of active actors in the system NumActors() uint64 // LocalActor returns the reference of a local actor. // A local actor is an actor that reside on the same node where the given actor system is running - LocalActor(actorName string) (PID, error) + LocalActor(actorName string) (*PID, error) // RemoteActor returns the address of a remote actor when cluster is enabled // When the cluster mode is not enabled an actor not found error will be returned // One can always check whether cluster is enabled before calling this method or just use the ActorOf method. @@ -99,7 +99,7 @@ type ActorSystem interface { // When cluster mode is activated, the PID will be nil. // When remoting is enabled this method will return and error // An actor not found error is return when the actor is not found. - ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid PID, err error) + ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid *PID, err error) // InCluster states whether the actor system is running within a cluster of nodes InCluster() bool // GetPartition returns the partition where a given actor is located @@ -111,14 +111,14 @@ type ActorSystem interface { // ScheduleOnce schedules a message that will be delivered to the receiver actor // This will send the given message to the actor after the given interval specified. // The message will be sent once - ScheduleOnce(ctx context.Context, message proto.Message, pid PID, interval time.Duration) error + ScheduleOnce(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error // RemoteScheduleOnce schedules a message to be sent to a remote actor in the future. // This requires remoting to be enabled on the actor system. // This will send the given message to the actor after the given interval specified // The message will be sent once RemoteScheduleOnce(ctx context.Context, message proto.Message, address *goaktpb.Address, interval time.Duration) error // ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression. - ScheduleWithCron(ctx context.Context, message proto.Message, pid PID, cronExpression string) error + ScheduleWithCron(ctx context.Context, message proto.Message, pid *PID, cronExpression string) error // RemoteScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression. RemoteScheduleWithCron(ctx context.Context, message proto.Message, address *goaktpb.Address, cronExpression string) error // PeerAddress returns the actor system address known in the cluster. That address is used by other nodes to communicate with the actor system. @@ -132,13 +132,13 @@ type ActorSystem interface { Logger() log.Logger // handleRemoteAsk handles a synchronous message to another actor and expect a response. // This block until a response is received or timed out. - handleRemoteAsk(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) + handleRemoteAsk(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) // handleRemoteTell handles an asynchronous message to an actor - handleRemoteTell(ctx context.Context, to PID, message proto.Message) error + handleRemoteTell(ctx context.Context, to *PID, message proto.Message) error // setActor sets actor in the actor system actors registry - setActor(actor PID) + setActor(actor *PID) // supervisor return the system supervisor - getSupervisor() PID + getSupervisor() *PID } // ActorSystem represent a collection of actors on a given node @@ -221,7 +221,7 @@ type actorSystem struct { clusterConfig *ClusterConfig redistributionChan chan *cluster.Event - supervisor PID + supervisor *PID } // enforce compilation error when all methods of the ActorSystem interface are not implemented @@ -330,7 +330,7 @@ func (x *actorSystem) Register(_ context.Context, actor Actor) error { // ScheduleOnce schedules a message that will be delivered to the receiver actor // This will send the given message to the actor after the given interval specified. // The message will be sent once -func (x *actorSystem) ScheduleOnce(ctx context.Context, message proto.Message, pid PID, interval time.Duration) error { +func (x *actorSystem) ScheduleOnce(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error { return x.scheduler.ScheduleOnce(ctx, message, pid, interval) } @@ -343,7 +343,7 @@ func (x *actorSystem) RemoteScheduleOnce(ctx context.Context, message proto.Mess } // ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression. -func (x *actorSystem) ScheduleWithCron(ctx context.Context, message proto.Message, pid PID, cronExpression string) error { +func (x *actorSystem) ScheduleWithCron(ctx context.Context, message proto.Message, pid *PID, cronExpression string) error { return x.scheduler.ScheduleWithCron(ctx, message, pid, cronExpression) } @@ -392,7 +392,7 @@ func (x *actorSystem) NumActors() uint64 { } // Spawn creates or returns the instance of a given actor in the system -func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID, error) { +func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (*PID, error) { if !x.started.Load() { return nil, ErrActorSystemNotStarted } @@ -423,7 +423,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID, // SpawnNamedFromFunc creates an actor with the given receive function and provided name. One can set the PreStart and PostStop lifecycle hooks // in the given optional options -func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error) { +func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error) { if !x.started.Load() { return nil, ErrActorSystemNotStarted } @@ -440,14 +440,14 @@ func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, recei } // SpawnFromFunc creates an actor with the given receive function. -func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error) { +func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error) { return x.SpawnNamedFromFunc(ctx, uuid.NewString(), receiveFunc, opts...) } // SpawnRouter creates a new router. One can additionally set the router options. // A router is a special type of actor that helps distribute messages of the same type over a set of actors, so that messages can be processed in parallel. // A single actor will only process one message at a time. -func (x *actorSystem) SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (PID, error) { +func (x *actorSystem) SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (*PID, error) { router := newRouter(poolSize, routeesKind, x.logger, opts...) routerName := x.getSystemActorName(routerType) return x.Spawn(ctx, routerName, router) @@ -476,7 +476,7 @@ func (x *actorSystem) Kill(ctx context.Context, name string) error { } // ReSpawn recreates a given actor in the system -func (x *actorSystem) ReSpawn(ctx context.Context, name string) (PID, error) { +func (x *actorSystem) ReSpawn(ctx context.Context, name string) (*PID, error) { if !x.started.Load() { return nil, ErrActorSystemNotStarted } @@ -509,11 +509,11 @@ func (x *actorSystem) Name() string { } // Actors returns the list of Actors that are alive in the actor system -func (x *actorSystem) Actors() []PID { +func (x *actorSystem) Actors() []*PID { x.locker.Lock() pids := x.actors.pids() x.locker.Unlock() - actors := make([]PID, 0, len(pids)) + actors := make([]*PID, 0, len(pids)) for _, pid := range pids { if !isSystemName(pid.Name()) { actors = append(actors, pid) @@ -537,7 +537,7 @@ func (x *actorSystem) PeerAddress() string { // When cluster mode is activated, the PID will be nil. // When remoting is enabled this method will return and error // An actor not found error is return when the actor is not found. -func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid PID, err error) { +func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid *PID, err error) { x.locker.Lock() defer x.locker.Unlock() @@ -579,7 +579,7 @@ func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goak // LocalActor returns the reference of a local actor. // A local actor is an actor that reside on the same node where the given actor system is running -func (x *actorSystem) LocalActor(actorName string) (PID, error) { +func (x *actorSystem) LocalActor(actorName string) (*PID, error) { x.locker.Lock() defer x.locker.Unlock() @@ -1022,17 +1022,17 @@ func (x *actorSystem) GetKinds(_ context.Context, request *connect.Request[inter // handleRemoteAsk handles a synchronous message to another actor and expect a response. // This block until a response is received or timed out. -func (x *actorSystem) handleRemoteAsk(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) { +func (x *actorSystem) handleRemoteAsk(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) { return Ask(ctx, to, message, timeout) } // handleRemoteTell handles an asynchronous message to an actor -func (x *actorSystem) handleRemoteTell(ctx context.Context, to PID, message proto.Message) error { +func (x *actorSystem) handleRemoteTell(ctx context.Context, to *PID, message proto.Message) error { return Tell(ctx, to, message) } // getSupervisor return the system supervisor -func (x *actorSystem) getSupervisor() PID { +func (x *actorSystem) getSupervisor() *PID { x.locker.Lock() supervisor := x.supervisor x.locker.Unlock() @@ -1040,7 +1040,7 @@ func (x *actorSystem) getSupervisor() PID { } // setActor implements ActorSystem. -func (x *actorSystem) setActor(actor PID) { +func (x *actorSystem) setActor(actor *PID) { x.actors.set(actor) if x.clusterEnabled.Load() { x.actorsChan <- &internalpb.WireActor{ @@ -1365,7 +1365,7 @@ func (x *actorSystem) processPeerState(ctx context.Context, peer *cluster.Peer) // configPID constructs a PID provided the actor name and the actor kind // this is a utility function used when spawning actors -func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor) (PID, error) { +func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor) (*PID, error) { actorPath := NewPath(name, NewAddress(x.name, "", -1)) if x.remotingEnabled.Load() { actorPath = NewPath(name, NewAddress(x.name, x.remotingHost, int(x.remotingPort))) diff --git a/actors/actor_test.go b/actors/actor_test.go index e9c5bc01..30ade34a 100644 --- a/actors/actor_test.go +++ b/actors/actor_test.go @@ -372,7 +372,7 @@ func (x *testRestart) PostStop(context.Context) error { var _ Actor = &testRestart{} type forwarder struct { - actorRef PID + actorRef *PID } func (x *forwarder) PreStart(context.Context) error { diff --git a/actors/api.go b/actors/api.go index 863ce46a..1ea5ae13 100644 --- a/actors/api.go +++ b/actors/api.go @@ -43,7 +43,7 @@ import ( // Ask sends a synchronous message to another actor and expect a response. // This block until a response is received or timed out. -func Ask(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) { +func Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) { if !to.IsRunning() { return nil, ErrDead } @@ -77,7 +77,7 @@ func Ask(ctx context.Context, to PID, message proto.Message, timeout time.Durati } // Tell sends an asynchronous message to an actor -func Tell(ctx context.Context, to PID, message proto.Message) error { +func Tell(ctx context.Context, to *PID, message proto.Message) error { if !to.IsRunning() { return ErrDead } @@ -105,7 +105,7 @@ func Tell(ctx context.Context, to PID, message proto.Message) error { } // BatchTell sends bulk asynchronous messages to an actor -func BatchTell(ctx context.Context, to PID, messages ...proto.Message) error { +func BatchTell(ctx context.Context, to *PID, messages ...proto.Message) error { if !to.IsRunning() { return ErrDead } @@ -122,7 +122,7 @@ func BatchTell(ctx context.Context, to PID, messages ...proto.Message) error { // BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages. // The messages will be processed one after the other in the order they are sent // This is a design choice to follow the simple principle of one message at a time processing by actors. -func BatchAsk(ctx context.Context, to PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error) { +func BatchAsk(ctx context.Context, to *PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error) { if !to.IsRunning() { return nil, ErrDead } diff --git a/actors/api_test.go b/actors/api_test.go index aa9634ae..e890b49f 100644 --- a/actors/api_test.go +++ b/actors/api_test.go @@ -1596,7 +1596,7 @@ func TestAPIRemoteReSpawn(t *testing.T) { assert.NotNil(t, actorRef) // assert the actor restart count - pid := actorRef.(*pid) + pid := actorRef assert.Zero(t, pid.restartCount.Load()) // get the address of the actor @@ -1687,7 +1687,7 @@ func TestAPIRemoteStop(t *testing.T) { assert.NotNil(t, actorRef) // assert the actor restart count - pid := actorRef.(*pid) + pid := actorRef assert.Zero(t, pid.restartCount.Load()) // get the address of the actor diff --git a/actors/func_actor.go b/actors/func_actor.go index 37bd8977..df1901d2 100644 --- a/actors/func_actor.go +++ b/actors/func_actor.go @@ -73,7 +73,7 @@ func WithPostStop(fn PostStopFunc) FuncOption { // funcActor is an actor that only handles messages type funcActor struct { - pid PID + pid *PID id string receiveFunc ReceiveFunc preStart PreStartFunc diff --git a/actors/pid.go b/actors/pid.go index 633152ca..0b7a0dda 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -58,7 +58,7 @@ import ( // watcher is used to handle parent child relationship. // This helps handle error propagation from a child actor using any of supervisory strategies type watcher struct { - WatcherID PID // the WatcherID of the actor watching + WatcherID *PID // the WatcherID of the actor watching ErrChan chan error // ErrChan the channel where to pass error message Done chan types.Unit // Done when watching is completed } @@ -66,129 +66,15 @@ type watcher struct { // taskCompletion is used to track completions' taskCompletion // to pipe the result to the appropriate PID type taskCompletion struct { - Receiver PID + Receiver *PID Task future.Task } -// PID defines the various actions one can perform on a given actor -type PID interface { - // ID is a convenient method that returns the actor unique identifier - // An actor unique identifier is its address in the actor system. - ID() string - // Name returns the actor given name - Name() string - // Shutdown gracefully shuts down the given actor - // All current messages in the mailbox will be processed before the actor shutdown after a period of time - // that can be configured. All child actors will be gracefully shutdown. - Shutdown(ctx context.Context) error - // IsRunning returns true when the actor is running ready to process public and false - // when the actor is stopped or not started at all - IsRunning() bool - // SpawnChild creates a child actor - // When the given child actor already exists its PID will only be returned - SpawnChild(ctx context.Context, name string, actor Actor) (PID, error) - // Restart restarts the actor - Restart(ctx context.Context) error - // Watch an actor - Watch(pid PID) - // UnWatch stops watching a given actor - UnWatch(pid PID) - // ActorSystem returns the underlying actor system - ActorSystem() ActorSystem - // ActorPath returns the path of the actor - ActorPath() *Path - // ActorHandle returns the underlying actor - ActorHandle() Actor - // Tell sends an asynchronous message to another PID - Tell(ctx context.Context, to PID, message proto.Message) error - // BatchTell sends an asynchronous bunch of messages to the given PID - // The messages will be processed one after the other in the order they are sent - // This is a design choice to follow the simple principle of one message at a time processing by actors. - // When BatchTell encounter a single message it will fall back to a Tell call. - BatchTell(ctx context.Context, to PID, messages ...proto.Message) error - // Ask sends a synchronous message to another actor and expect a response. - Ask(ctx context.Context, to PID, message proto.Message) (response proto.Message, err error) - // BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages. - // The messages will be processed one after the other in the order they are sent - // This is a design choice to follow the simple principle of one message at a time processing by actors. - // This can hinder performance if it is not properly used. - BatchAsk(ctx context.Context, to PID, messages ...proto.Message) (responses chan proto.Message, err error) - // RemoteTell sends a message to an actor remotely without expecting any reply - RemoteTell(ctx context.Context, to *goaktpb.Address, message proto.Message) error - // RemoteBatchTell sends a batch of messages to a remote actor in a way fire-and-forget manner - // Messages are processed one after the other in the order they are sent. - RemoteBatchTell(ctx context.Context, to *goaktpb.Address, messages ...proto.Message) error - // RemoteAsk is used to send a message to an actor remotely and expect a response - // immediately. With this type of message the receiver cannot communicate back to Sender - // except reply the message with a response. This one-way communication. - RemoteAsk(ctx context.Context, to *goaktpb.Address, message proto.Message) (response *anypb.Any, err error) - // RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expect responses in the same order as the messages. - // Messages are processed one after the other in the order they are sent. - // This can hinder performance if it is not properly used. - RemoteBatchAsk(ctx context.Context, to *goaktpb.Address, messages ...proto.Message) (responses []*anypb.Any, err error) - // RemoteLookup look for an actor address on a remote node. - RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error) - // RemoteReSpawn restarts an actor on a remote node. - RemoteReSpawn(ctx context.Context, host string, port int, name string) error - // RemoteStop stops an actor on a remote node - RemoteStop(ctx context.Context, host string, port int, name string) error - // RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem - RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error - // Children returns the list of all the direct children of the given actor - // Only alive actors are included in the list or an empty list is returned - Children() []PID - // Parents returns the list of all direct parents of a given actor. - // Only alive actors are included in the list or an empty list is returned - Parents() []PID - // Child returns the named child actor if it is alive - Child(name string) (PID, error) - // Stop forces the child Actor under the given name to terminate after it finishes processing its current message. - // Nothing happens if child is already stopped. - Stop(ctx context.Context, pid PID) error - // StashSize returns the stash buffer size - StashSize() uint64 - // Equals is a convenient method to compare two PIDs - Equals(to PID) bool - // PipeTo processes a long-running task and pipes the result to the provided actor. - // The successful result of the task will be put onto the provided actor mailbox. - // This is useful when interacting with external services. - // It’s common that you would like to use the value of the response in the actor when the long-running task is completed - PipeTo(ctx context.Context, to PID, task future.Task) error - // push a message to the actor's receiveContextBuffer - doReceive(ctx *ReceiveContext) - // watchers returns the list of actors watching this actor - watchers() *slice.Slice[*watcher] - // watchees returns the list of actors watched by this actor - watchees() *pidMap - // setBehavior is a utility function that helps set the actor behavior - setBehavior(behavior Behavior) - // setBehaviorStacked adds a behavior to the actor's behaviors - setBehaviorStacked(behavior Behavior) - // unsetBehaviorStacked sets the actor's behavior to the previous behavior - // prior to setBehaviorStacked is called - unsetBehaviorStacked() - // resetBehavior is a utility function resets the actor behavior - resetBehavior() - // stash adds the current message to the stash buffer - stash(ctx *ReceiveContext) error - // unstashAll unstashes all messages from the stash buffer and prepends in the mailbox - // (it keeps the messages in the same order as received, unstashing older messages before newer) - unstashAll() error - // unstash unstashes the oldest message in the stash and prepends to the mailbox - unstash() error - // toDeadletterQueue add the given message to the deadletters queue - toDeadletterQueue(receiveCtx *ReceiveContext, err error) - // removeChild is a utility function to remove child actor - removeChild(pid PID) - - getLastProcessingTime() time.Time - setLastProcessingDuration(d time.Duration) -} - -// pid specifies an actor unique process -// With the pid one can send a ReceiveContext to the actor -type pid struct { - Actor +// PID specifies an actor unique process +// With the PID one can send a ReceiveContext to the actor +type PID struct { + // specifies the message processor + actor Actor // specifies the actor path actorPath *Path @@ -276,11 +162,8 @@ type pid struct { metrics *metric.ActorMetric } -// enforce compilation error -var _ PID = (*pid)(nil) - // newPID creates a new pid -func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption) (*pid, error) { +func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption) (*PID, error) { // actor path is required if actorPath == nil { return nil, errors.New("actorPath is required") @@ -291,8 +174,8 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption return nil, err } - p := &pid{ - Actor: actor, + p := &PID{ + actor: actor, lastProcessingTime: atomic.Time{}, haltPassivationLnr: make(chan types.Unit, 1), logger: log.DefaultLogger, @@ -329,7 +212,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption } behaviorStack := newBehaviorStack() - behaviorStack.Push(p.Receive) + behaviorStack.Push(p.actor.Receive) p.behaviorStack = behaviorStack if err := p.init(ctx); err != nil { @@ -361,27 +244,27 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption // ID is a convenient method that returns the actor unique identifier // An actor unique identifier is its address in the actor system. -func (x *pid) ID() string { +func (x *PID) ID() string { return x.ActorPath().String() } // Name returns the actor given name -func (x *pid) Name() string { +func (x *PID) Name() string { return x.ActorPath().Name() } // Equals is a convenient method to compare two PIDs -func (x *pid) Equals(to PID) bool { +func (x *PID) Equals(to *PID) bool { return strings.EqualFold(x.ID(), to.ID()) } // ActorHandle returns the underlying Actor -func (x *pid) ActorHandle() Actor { - return x.Actor +func (x *PID) ActorHandle() Actor { + return x.actor } // Child returns the named child actor if it is alive -func (x *pid) Child(name string) (PID, error) { +func (x *PID) Child(name string) (*PID, error) { if !x.IsRunning() { return nil, ErrDead } @@ -395,11 +278,11 @@ func (x *pid) Child(name string) (PID, error) { // Parents returns the list of all direct parents of a given actor. // Only alive actors are included in the list or an empty list is returned -func (x *pid) Parents() []PID { +func (x *PID) Parents() []*PID { x.fieldsLocker.Lock() watchers := x.watchersList x.fieldsLocker.Unlock() - var parents []PID + var parents []*PID if watchers.Len() > 0 { for _, item := range watchers.Items() { watcher := item.Value @@ -416,12 +299,12 @@ func (x *pid) Parents() []PID { // Children returns the list of all the direct children of the given actor // Only alive actors are included in the list or an empty list is returned -func (x *pid) Children() []PID { +func (x *PID) Children() []*PID { x.fieldsLocker.RLock() children := x.children.pids() x.fieldsLocker.RUnlock() - cids := make([]PID, 0, len(children)) + cids := make([]*PID, 0, len(children)) for _, child := range children { if child.IsRunning() { cids = append(cids, child) @@ -433,7 +316,7 @@ func (x *pid) Children() []PID { // Stop forces the child Actor under the given name to terminate after it finishes processing its current message. // Nothing happens if child is already stopped. -func (x *pid) Stop(ctx context.Context, cid PID) error { +func (x *PID) Stop(ctx context.Context, cid *PID) error { if !x.IsRunning() { return ErrDead } @@ -459,12 +342,12 @@ func (x *pid) Stop(ctx context.Context, cid PID) error { // IsRunning returns true when the actor is alive ready to process messages and false // when the actor is stopped or not started at all -func (x *pid) IsRunning() bool { +func (x *PID) IsRunning() bool { return x != nil && x != NoSender && x.running.Load() } // ActorSystem returns the actor system -func (x *pid) ActorSystem() ActorSystem { +func (x *PID) ActorSystem() ActorSystem { x.fieldsLocker.RLock() sys := x.system x.fieldsLocker.RUnlock() @@ -472,7 +355,7 @@ func (x *pid) ActorSystem() ActorSystem { } // ActorPath returns the path of the actor -func (x *pid) ActorPath() *Path { +func (x *PID) ActorPath() *Path { x.fieldsLocker.RLock() path := x.actorPath x.fieldsLocker.RUnlock() @@ -481,7 +364,7 @@ func (x *pid) ActorPath() *Path { // Restart restarts the actor. // During restart all messages that are in the mailbox and not yet processed will be ignored -func (x *pid) Restart(ctx context.Context) error { +func (x *PID) Restart(ctx context.Context) error { if x == nil || x.ActorPath() == nil { return ErrUndefinedActor } @@ -529,7 +412,7 @@ func (x *pid) Restart(ctx context.Context) error { // SpawnChild creates a child actor and start watching it for error // When the given child actor already exists its PID will only be returned -func (x *pid) SpawnChild(ctx context.Context, name string, actor Actor) (PID, error) { +func (x *PID) SpawnChild(ctx context.Context, name string, actor Actor) (*PID, error) { if !x.IsRunning() { return nil, ErrDead } @@ -598,7 +481,7 @@ func (x *pid) SpawnChild(ctx context.Context, name string, actor Actor) (PID, er } // StashSize returns the stash buffer size -func (x *pid) StashSize() uint64 { +func (x *PID) StashSize() uint64 { if x.stashBuffer == nil { return 0 } @@ -609,7 +492,7 @@ func (x *pid) StashSize() uint64 { // The successful result of the task will be put onto the provided actor mailbox. // This is useful when interacting with external services. // It’s common that you would like to use the value of the response in the actor when the long-running task is completed -func (x *pid) PipeTo(ctx context.Context, to PID, task future.Task) error { +func (x *PID) PipeTo(ctx context.Context, to *PID, task future.Task) error { if task == nil { return ErrUndefinedTask } @@ -628,7 +511,7 @@ func (x *pid) PipeTo(ctx context.Context, to PID, task future.Task) error { // Ask sends a synchronous message to another actor and expect a response. // This block until a response is received or timed out. -func (x *pid) Ask(ctx context.Context, to PID, message proto.Message) (response proto.Message, err error) { +func (x *PID) Ask(ctx context.Context, to *PID, message proto.Message) (response proto.Message, err error) { if !to.IsRunning() { return nil, ErrDead } @@ -650,7 +533,7 @@ func (x *pid) Ask(ctx context.Context, to PID, message proto.Message) (response } // Tell sends an asynchronous message to another PID -func (x *pid) Tell(ctx context.Context, to PID, message proto.Message) error { +func (x *PID) Tell(ctx context.Context, to *PID, message proto.Message) error { if !to.IsRunning() { return ErrDead } @@ -664,7 +547,7 @@ func (x *pid) Tell(ctx context.Context, to PID, message proto.Message) error { // The messages will be processed one after the other in the order they are sent. // This is a design choice to follow the simple principle of one message at a time processing by actors. // When BatchTell encounter a single message it will fall back to a Tell call. -func (x *pid) BatchTell(ctx context.Context, to PID, messages ...proto.Message) error { +func (x *PID) BatchTell(ctx context.Context, to *PID, messages ...proto.Message) error { if !to.IsRunning() { return ErrDead } @@ -684,7 +567,7 @@ func (x *pid) BatchTell(ctx context.Context, to PID, messages ...proto.Message) // BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages. // The messages will be processed one after the other in the order they are sent. // This is a design choice to follow the simple principle of one message at a time processing by actors. -func (x *pid) BatchAsk(ctx context.Context, to PID, messages ...proto.Message) (responses chan proto.Message, err error) { +func (x *PID) BatchAsk(ctx context.Context, to *PID, messages ...proto.Message) (responses chan proto.Message, err error) { if !to.IsRunning() { return nil, ErrDead } @@ -703,7 +586,7 @@ func (x *pid) BatchAsk(ctx context.Context, to PID, messages ...proto.Message) ( } // RemoteLookup look for an actor address on a remote node. -func (x *pid) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error) { +func (x *PID) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error) { remoteClient, err := x.remotingClient(host, port) if err != nil { return nil, err @@ -728,7 +611,7 @@ func (x *pid) RemoteLookup(ctx context.Context, host string, port int, name stri } // RemoteTell sends a message to an actor remotely without expecting any reply -func (x *pid) RemoteTell(ctx context.Context, to *goaktpb.Address, message proto.Message) error { +func (x *PID) RemoteTell(ctx context.Context, to *goaktpb.Address, message proto.Message) error { marshaled, err := anypb.New(message) if err != nil { return err @@ -779,7 +662,7 @@ func (x *pid) RemoteTell(ctx context.Context, to *goaktpb.Address, message proto } // RemoteAsk sends a synchronous message to another actor remotely and expect a response. -func (x *pid) RemoteAsk(ctx context.Context, to *goaktpb.Address, message proto.Message) (response *anypb.Any, err error) { +func (x *PID) RemoteAsk(ctx context.Context, to *goaktpb.Address, message proto.Message) (response *anypb.Any, err error) { marshaled, err := anypb.New(message) if err != nil { return nil, err @@ -847,7 +730,7 @@ func (x *pid) RemoteAsk(ctx context.Context, to *goaktpb.Address, message proto. // RemoteBatchTell sends a batch of messages to a remote actor in a way fire-and-forget manner // Messages are processed one after the other in the order they are sent. -func (x *pid) RemoteBatchTell(ctx context.Context, to *goaktpb.Address, messages ...proto.Message) error { +func (x *PID) RemoteBatchTell(ctx context.Context, to *goaktpb.Address, messages ...proto.Message) error { if len(messages) == 1 { return x.RemoteTell(ctx, to, messages[0]) } @@ -907,7 +790,7 @@ func (x *pid) RemoteBatchTell(ctx context.Context, to *goaktpb.Address, messages // RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expect responses in the same order as the messages. // Messages are processed one after the other in the order they are sent. // This can hinder performance if it is not properly used. -func (x *pid) RemoteBatchAsk(ctx context.Context, to *goaktpb.Address, messages ...proto.Message) (responses []*anypb.Any, err error) { +func (x *PID) RemoteBatchAsk(ctx context.Context, to *goaktpb.Address, messages ...proto.Message) (responses []*anypb.Any, err error) { senderPath := x.ActorPath() senderAddress := senderPath.Address() @@ -979,7 +862,7 @@ func (x *pid) RemoteBatchAsk(ctx context.Context, to *goaktpb.Address, messages } // RemoteStop stops an actor on a remote node -func (x *pid) RemoteStop(ctx context.Context, host string, port int, name string) error { +func (x *PID) RemoteStop(ctx context.Context, host string, port int, name string) error { remoteService, err := x.remotingClient(host, port) if err != nil { return err @@ -1003,7 +886,7 @@ func (x *pid) RemoteStop(ctx context.Context, host string, port int, name string } // RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem -func (x *pid) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error { +func (x *PID) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error { remoteService, err := x.remotingClient(host, port) if err != nil { return err @@ -1033,7 +916,7 @@ func (x *pid) RemoteSpawn(ctx context.Context, host string, port int, name, acto } // RemoteReSpawn restarts an actor on a remote node. -func (x *pid) RemoteReSpawn(ctx context.Context, host string, port int, name string) error { +func (x *PID) RemoteReSpawn(ctx context.Context, host string, port int, name string) error { remoteService, err := x.remotingClient(host, port) if err != nil { return err @@ -1059,7 +942,7 @@ func (x *pid) RemoteReSpawn(ctx context.Context, host string, port int, name str // Shutdown gracefully shuts down the given actor // All current messages in the mailbox will be processed before the actor shutdown after a period of time // that can be configured. All child actors will be gracefully shutdown. -func (x *pid) Shutdown(ctx context.Context) error { +func (x *PID) Shutdown(ctx context.Context) error { x.stopLocker.Lock() defer x.stopLocker.Unlock() @@ -1092,7 +975,7 @@ func (x *pid) Shutdown(ctx context.Context) error { } // Watch a pid for errors, and send on the returned channel if an error occurred -func (x *pid) Watch(cid PID) { +func (x *PID) Watch(cid *PID) { w := &watcher{ WatcherID: x, ErrChan: make(chan error, 1), @@ -1104,7 +987,7 @@ func (x *pid) Watch(cid PID) { } // UnWatch stops watching a given actor -func (x *pid) UnWatch(pid PID) { +func (x *PID) UnWatch(pid *PID) { for _, item := range pid.watchers().Items() { w := item.Value if w.WatcherID.Equals(x) { @@ -1117,24 +1000,24 @@ func (x *pid) UnWatch(pid PID) { } // watchers return the list of watchersList -func (x *pid) watchers() *slice.Slice[*watcher] { +func (x *PID) watchers() *slice.Slice[*watcher] { return x.watchersList } // watchees returns the list of actors watched by this actor -func (x *pid) watchees() *pidMap { +func (x *PID) watchees() *pidMap { return x.watchedList } // doReceive pushes a given message to the actor receiveContextBuffer -func (x *pid) doReceive(receiveCtx *ReceiveContext) { +func (x *PID) doReceive(receiveCtx *ReceiveContext) { x.lastProcessingTime.Store(time.Now()) x.mailbox.Push(receiveCtx) } // init initializes the given actor and init processing messages // when the initialization failed the actor will not be started -func (x *pid) init(ctx context.Context) error { +func (x *PID) init(ctx context.Context) error { x.logger.Info("Initialization process has started...") cancelCtx, cancel := context.WithTimeout(ctx, x.initTimeout.Load()) @@ -1143,7 +1026,7 @@ func (x *pid) init(ctx context.Context) error { // create a new retrier that will try a maximum of `initMaxRetries` times, with // an initial delay of 100 ms and a maximum delay of 1 second retrier := retry.NewRetrier(int(x.initMaxRetries.Load()), 100*time.Millisecond, time.Second) - if err := retrier.RunContext(cancelCtx, x.Actor.PreStart); err != nil { + if err := retrier.RunContext(cancelCtx, x.actor.PreStart); err != nil { e := ErrInitFailure(err) return e } @@ -1165,7 +1048,7 @@ func (x *pid) init(ctx context.Context) error { } // reset re-initializes the actor PID -func (x *pid) reset() { +func (x *PID) reset() { x.lastProcessingTime.Store(time.Time{}) x.passivateAfter.Store(DefaultPassivationTimeout) x.askTimeout.Store(DefaultAskTimeout) @@ -1186,7 +1069,7 @@ func (x *pid) reset() { x.processedCount.Store(0) } -func (x *pid) freeWatchers(ctx context.Context) { +func (x *PID) freeWatchers(ctx context.Context) { x.logger.Debug("freeing all watcher actors...") watchers := x.watchers() if watchers.Len() > 0 { @@ -1207,7 +1090,7 @@ func (x *pid) freeWatchers(ctx context.Context) { } } -func (x *pid) freeWatchees(ctx context.Context) { +func (x *PID) freeWatchees(ctx context.Context) { x.logger.Debug("freeing all watched actors...") for _, watched := range x.watchedList.pids() { x.logger.Debugf("watcher=(%s) unwatching actor=(%s)", x.ID(), watched.ID()) @@ -1221,7 +1104,7 @@ func (x *pid) freeWatchees(ctx context.Context) { } } -func (x *pid) freeChildren(ctx context.Context) { +func (x *PID) freeChildren(ctx context.Context) { x.logger.Debug("freeing all child actors...") for _, child := range x.Children() { x.logger.Debugf("parent=(%s) disowning child=(%s)", x.ID(), child.ID()) @@ -1238,7 +1121,7 @@ func (x *pid) freeChildren(ctx context.Context) { } // receive extracts every message from the actor mailbox -func (x *pid) receive() { +func (x *PID) receive() { for x.running.Load() { received := x.mailbox.Pop() if received == nil { @@ -1257,7 +1140,7 @@ func (x *pid) receive() { } // handleReceived picks the right behavior and processes the message -func (x *pid) handleReceived(received *ReceiveContext) { +func (x *PID) handleReceived(received *ReceiveContext) { // handle panic or any error during processing defer x.recovery(received) // pick the current behvior from the stack @@ -1271,7 +1154,7 @@ func (x *pid) handleReceived(received *ReceiveContext) { } // recovery is called upon after message is processed -func (x *pid) recovery(received *ReceiveContext) { +func (x *PID) recovery(received *ReceiveContext) { if r := recover(); r != nil { x.notifyWatchers(fmt.Errorf("%s", r)) return @@ -1282,7 +1165,7 @@ func (x *pid) recovery(received *ReceiveContext) { // passivationLoop checks whether the actor is processing public or not. // when the actor is idle, it automatically shuts down to free resources -func (x *pid) passivationLoop() { +func (x *PID) passivationLoop() { x.logger.Info("start the passivation listener...") x.logger.Infof("passivation timeout is (%s)", x.passivateAfter.Load().String()) ticker := time.NewTicker(x.passivateAfter.Load()) @@ -1334,7 +1217,7 @@ func (x *pid) passivationLoop() { } // setBehavior is a utility function that helps set the actor behavior -func (x *pid) setBehavior(behavior Behavior) { +func (x *PID) setBehavior(behavior Behavior) { x.fieldsLocker.Lock() x.behaviorStack.Reset() x.behaviorStack.Push(behavior) @@ -1342,14 +1225,14 @@ func (x *pid) setBehavior(behavior Behavior) { } // resetBehavior is a utility function resets the actor behavior -func (x *pid) resetBehavior() { +func (x *PID) resetBehavior() { x.fieldsLocker.Lock() - x.behaviorStack.Push(x.Receive) + x.behaviorStack.Push(x.actor.Receive) x.fieldsLocker.Unlock() } // setBehaviorStacked adds a behavior to the actor's behaviorStack -func (x *pid) setBehaviorStacked(behavior Behavior) { +func (x *PID) setBehaviorStacked(behavior Behavior) { x.fieldsLocker.Lock() x.behaviorStack.Push(behavior) x.fieldsLocker.Unlock() @@ -1357,14 +1240,14 @@ func (x *pid) setBehaviorStacked(behavior Behavior) { // unsetBehaviorStacked sets the actor's behavior to the previous behavior // prior to setBehaviorStacked is called -func (x *pid) unsetBehaviorStacked() { +func (x *PID) unsetBehaviorStacked() { x.fieldsLocker.Lock() x.behaviorStack.Pop() x.fieldsLocker.Unlock() } // doStop stops the actor -func (x *pid) doStop(ctx context.Context) error { +func (x *PID) doStop(ctx context.Context) error { x.running.Store(false) // TODO: just signal stash processing done and ignore the messages or process them @@ -1406,11 +1289,11 @@ func (x *pid) doStop(ctx context.Context) error { x.logger.Infof("Shutdown process is on going for actor=%s...", x.ActorPath().String()) x.reset() - return x.Actor.PostStop(ctx) + return x.actor.PostStop(ctx) } // removeChild helps remove child actor -func (x *pid) removeChild(pid PID) { +func (x *PID) removeChild(pid *PID) { if !x.IsRunning() { return } @@ -1429,7 +1312,7 @@ func (x *pid) removeChild(pid PID) { } // notifyWatchers send error to watchers -func (x *pid) notifyWatchers(err error) { +func (x *PID) notifyWatchers(err error) { if err != nil { for _, item := range x.watchers().Items() { item.Value.ErrChan <- err @@ -1438,7 +1321,7 @@ func (x *pid) notifyWatchers(err error) { } // toDeadletterQueue sends message to deadletter queue -func (x *pid) toDeadletterQueue(receiveCtx *ReceiveContext, err error) { +func (x *PID) toDeadletterQueue(receiveCtx *ReceiveContext, err error) { // the message is lost if x.eventsStream == nil { return @@ -1468,7 +1351,7 @@ func (x *pid) toDeadletterQueue(receiveCtx *ReceiveContext, err error) { } // registerMetrics register the PID metrics with OTel instrumentation. -func (x *pid) registerMetrics() error { +func (x *PID) registerMetrics() error { meter := x.telemetry.Meter metrics := x.metrics _, err := meter.RegisterCallback(func(_ context.Context, observer otelmetric.Observer) error { @@ -1486,7 +1369,7 @@ func (x *pid) registerMetrics() error { } // clientOptions returns the gRPC client connections options -func (x *pid) clientOptions() ([]connect.ClientOption, error) { +func (x *PID) clientOptions() ([]connect.ClientOption, error) { var interceptor *otelconnect.Interceptor var err error if x.metricEnabled.Load() { @@ -1506,7 +1389,7 @@ func (x *pid) clientOptions() ([]connect.ClientOption, error) { } // remotingClient returns an instance of the Remote Service client -func (x *pid) remotingClient(host string, port int) (internalpbconnect.RemotingServiceClient, error) { +func (x *PID) remotingClient(host string, port int) (internalpbconnect.RemotingServiceClient, error) { clientOptions, err := x.clientOptions() if err != nil { return nil, err @@ -1520,7 +1403,7 @@ func (x *pid) remotingClient(host string, port int) (internalpbconnect.RemotingS } // getLastProcessingTime returns the last processing time -func (x *pid) getLastProcessingTime() time.Time { +func (x *PID) getLastProcessingTime() time.Time { x.processingTimeLocker.Lock() processingTime := x.lastProcessingTime.Load() x.processingTimeLocker.Unlock() @@ -1528,7 +1411,7 @@ func (x *pid) getLastProcessingTime() time.Time { } // setLastProcessingDuration sets the last processing duration -func (x *pid) setLastProcessingDuration(d time.Duration) { +func (x *PID) setLastProcessingDuration(d time.Duration) { x.processingTimeLocker.Lock() x.lastReceivedDuration.Store(d) x.processingTimeLocker.Unlock() @@ -1536,7 +1419,7 @@ func (x *pid) setLastProcessingDuration(d time.Duration) { // handleCompletion processes a long-running task and pipe the result to // the completion receiver -func (x *pid) handleCompletion(ctx context.Context, completion *taskCompletion) { +func (x *PID) handleCompletion(ctx context.Context, completion *taskCompletion) { // defensive programming if completion == nil || completion.Receiver == nil || @@ -1577,7 +1460,7 @@ func (x *pid) handleCompletion(ctx context.Context, completion *taskCompletion) } // supervise watches for child actor's failure and act based upon the supervisory strategy -func (x *pid) supervise(cid PID, watcher *watcher) { +func (x *PID) supervise(cid *PID, watcher *watcher) { for { select { case <-watcher.Done: @@ -1600,7 +1483,7 @@ func (x *pid) supervise(cid PID, watcher *watcher) { } // handleStopDirective handles the testSupervisor stop directive -func (x *pid) handleStopDirective(cid PID) { +func (x *PID) handleStopDirective(cid *PID) { x.UnWatch(cid) x.children.delete(cid.ActorPath()) if err := cid.Shutdown(context.Background()); err != nil { @@ -1612,7 +1495,7 @@ func (x *pid) handleStopDirective(cid PID) { } // handleRestartDirective handles the testSupervisor restart directive -func (x *pid) handleRestartDirective(cid PID, maxRetries uint32, timeout time.Duration) { +func (x *PID) handleRestartDirective(cid *PID, maxRetries uint32, timeout time.Duration) { x.UnWatch(cid) ctx := context.Background() var err error @@ -1636,7 +1519,7 @@ func (x *pid) handleRestartDirective(cid PID, maxRetries uint32, timeout time.Du x.Watch(cid) } -func (x *pid) recordLastReceivedDurationMetric(ctx context.Context) { +func (x *PID) recordLastReceivedDurationMetric(ctx context.Context) { x.lastReceivedDuration.Store(time.Since(x.lastProcessingTime.Load())) if x.metricEnabled.Load() { x.metrics.LastReceivedDuration().Record(ctx, x.lastReceivedDuration.Load().Milliseconds()) diff --git a/actors/pid_map.go b/actors/pid_map.go index 0c55a08a..55b76bf8 100644 --- a/actors/pid_map.go +++ b/actors/pid_map.go @@ -33,12 +33,12 @@ import ( type pidMap struct { mu *sync.RWMutex size atomic.Int32 - mappings map[string]PID + mappings map[string]*PID } func newPIDMap(cap int) *pidMap { return &pidMap{ - mappings: make(map[string]PID, cap), + mappings: make(map[string]*PID, cap), mu: &sync.RWMutex{}, } } @@ -49,7 +49,7 @@ func (m *pidMap) len() int { } // get retrieves a pid by its address -func (m *pidMap) get(path *Path) (pid PID, ok bool) { +func (m *pidMap) get(path *Path) (pid *PID, ok bool) { m.mu.RLock() pid, ok = m.mappings[path.String()] m.mu.RUnlock() @@ -57,7 +57,7 @@ func (m *pidMap) get(path *Path) (pid PID, ok bool) { } // set sets a pid in the map -func (m *pidMap) set(pid PID) { +func (m *pidMap) set(pid *PID) { m.mu.Lock() defer m.mu.Unlock() if pid != nil { @@ -75,9 +75,9 @@ func (m *pidMap) delete(addr *Path) { } // pids returns all actors as a slice -func (m *pidMap) pids() []PID { +func (m *pidMap) pids() []*PID { m.mu.Lock() - var out []PID + var out []*PID for _, prop := range m.mappings { out = append(out, prop) } @@ -87,6 +87,6 @@ func (m *pidMap) pids() []PID { func (m *pidMap) reset() { m.mu.Lock() - m.mappings = make(map[string]PID) + m.mappings = make(map[string]*PID) m.mu.Unlock() } diff --git a/actors/pid_map_test.go b/actors/pid_map_test.go index 68b9b01c..0b37e359 100644 --- a/actors/pid_map_test.go +++ b/actors/pid_map_test.go @@ -35,7 +35,7 @@ func TestPIDMap(t *testing.T) { // create the actor path actorPath := NewPath("Test", NewAddress("TestSys", "host", 444)) // create the PID - actorRef := &pid{actorPath: actorPath, fieldsLocker: &sync.RWMutex{}, stopLocker: &sync.Mutex{}} + actorRef := &PID{actorPath: actorPath, fieldsLocker: &sync.RWMutex{}, stopLocker: &sync.Mutex{}} // create a new PID map pidMap := newPIDMap(5) // add to the map @@ -49,7 +49,7 @@ func TestPIDMap(t *testing.T) { actual, ok := pidMap.get(actorPath) assert.True(t, ok) assert.NotNil(t, actual) - assert.IsType(t, new(pid), actual) + assert.IsType(t, new(PID), actual) // remove the pid from the map pidMap.delete(actorPath) // list the map diff --git a/actors/pid_option.go b/actors/pid_option.go index 4dd951b9..1993806d 100644 --- a/actors/pid_option.go +++ b/actors/pid_option.go @@ -33,11 +33,11 @@ import ( ) // pidOption represents the pid -type pidOption func(pid *pid) +type pidOption func(pid *PID) // withPassivationAfter sets the actor passivation time func withPassivationAfter(duration time.Duration) pidOption { - return func(pid *pid) { + return func(pid *PID) { pid.passivateAfter.Store(duration) } } @@ -45,28 +45,28 @@ func withPassivationAfter(duration time.Duration) pidOption { // withAskTimeout sets how long in seconds an actor should reply a command // in a receive-reply pattern func withAskTimeout(timeout time.Duration) pidOption { - return func(pid *pid) { + return func(pid *PID) { pid.askTimeout.Store(timeout) } } // withInitMaxRetries sets the number of times to retry an actor init process func withInitMaxRetries(max int) pidOption { - return func(pid *pid) { + return func(pid *PID) { pid.initMaxRetries.Store(int32(max)) } } // withCustomLogger sets the log func withCustomLogger(logger log.Logger) pidOption { - return func(pid *pid) { + return func(pid *PID) { pid.logger = logger } } // withActorSystem set the actor system of the pid func withActorSystem(sys ActorSystem) pidOption { - return func(pid *pid) { + return func(pid *PID) { pid.system = sys } } @@ -74,55 +74,55 @@ func withActorSystem(sys ActorSystem) pidOption { // withSupervisorDirective sets the supervisor strategy to used when dealing // with child actors func withSupervisorDirective(directive SupervisorDirective) pidOption { - return func(pid *pid) { + return func(pid *PID) { pid.supervisorDirective = directive } } // withShutdownTimeout sets the shutdown timeout func withShutdownTimeout(duration time.Duration) pidOption { - return func(pid *pid) { + return func(pid *PID) { pid.shutdownTimeout.Store(duration) } } // withNoPassivation disable passivation func withPassivationDisabled() pidOption { - return func(pid *pid) { + return func(pid *PID) { pid.passivateAfter.Store(-1) } } // withTelemetry sets the custom telemetry func withTelemetry(telemetry *telemetry.Telemetry) pidOption { - return func(pid *pid) { + return func(pid *PID) { pid.telemetry = telemetry } } // withStash sets the actor's stash buffer func withStash() pidOption { - return func(pid *pid) { + return func(pid *PID) { pid.stashBuffer = newMailbox() } } // withEventsStream set the events stream func withEventsStream(stream *eventstream.EventsStream) pidOption { - return func(pid *pid) { + return func(pid *PID) { pid.eventsStream = stream } } // withInitTimeout sets the init timeout func withInitTimeout(duration time.Duration) pidOption { - return func(pid *pid) { + return func(pid *PID) { pid.initTimeout.Store(duration) } } func withMetric() pidOption { - return func(pid *pid) { + return func(pid *PID) { pid.metricEnabled.Store(true) } } diff --git a/actors/pid_option_test.go b/actors/pid_option_test.go index 973ddd1c..f185cf01 100644 --- a/actors/pid_option_test.go +++ b/actors/pid_option_test.go @@ -54,62 +54,62 @@ func TestPIDOptions(t *testing.T) { testCases := []struct { name string option pidOption - expected *pid + expected *PID }{ { name: "WithPassivationAfter", option: withPassivationAfter(time.Second), - expected: &pid{passivateAfter: atomicDuration}, + expected: &PID{passivateAfter: atomicDuration}, }, { name: "WithAskTimeout", option: withAskTimeout(time.Second), - expected: &pid{askTimeout: atomicDuration}, + expected: &PID{askTimeout: atomicDuration}, }, { name: "WithInitMaxRetries", option: withInitMaxRetries(5), - expected: &pid{initMaxRetries: atomicInt}, + expected: &PID{initMaxRetries: atomicInt}, }, { name: "WithLogger", option: withCustomLogger(log.DefaultLogger), - expected: &pid{logger: log.DefaultLogger}, + expected: &PID{logger: log.DefaultLogger}, }, { name: "WithSupervisorStrategy", option: withSupervisorDirective(resumeDirective), - expected: &pid{supervisorDirective: resumeDirective}, + expected: &PID{supervisorDirective: resumeDirective}, }, { name: "WithShutdownTimeout", option: withShutdownTimeout(time.Second), - expected: &pid{shutdownTimeout: atomicDuration}, + expected: &PID{shutdownTimeout: atomicDuration}, }, { name: "WithPassivationDisabled", option: withPassivationDisabled(), - expected: &pid{passivateAfter: negativeDuration}, + expected: &PID{passivateAfter: negativeDuration}, }, { name: "withEventsStream", option: withEventsStream(eventsStream), - expected: &pid{eventsStream: eventsStream}, + expected: &PID{eventsStream: eventsStream}, }, { name: "withInitTimeout", option: withInitTimeout(time.Second), - expected: &pid{initTimeout: atomicDuration}, + expected: &PID{initTimeout: atomicDuration}, }, { name: "withMetric", option: withMetric(), - expected: &pid{metricEnabled: atomicTrue}, + expected: &PID{metricEnabled: atomicTrue}, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - pid := &pid{} + pid := &PID{} tc.option(pid) assert.Equal(t, tc.expected, pid) }) diff --git a/actors/pid_test.go b/actors/pid_test.go index 727ae969..1c0b7464 100644 --- a/actors/pid_test.go +++ b/actors/pid_test.go @@ -337,7 +337,7 @@ func TestRestart(t *testing.T) { assert.NoError(t, err) }) t.Run("noSender cannot be restarted", func(t *testing.T) { - pid := &pid{ + pid := &PID{ fieldsLocker: &sync.RWMutex{}, } err := pid.Restart(context.TODO()) diff --git a/actors/receive_context.go b/actors/receive_context.go index 3ce23df4..5def7a99 100644 --- a/actors/receive_context.go +++ b/actors/receive_context.go @@ -38,15 +38,15 @@ import ( type ReceiveContext struct { ctx context.Context message proto.Message - sender PID + sender *PID remoteSender *goaktpb.Address response chan proto.Message - recipient PID + recipient *PID err error } // newReceiveContext creates an instance of ReceiveContext -func newReceiveContext(ctx context.Context, from, to PID, message proto.Message) *ReceiveContext { +func newReceiveContext(ctx context.Context, from, to *PID, message proto.Message) *ReceiveContext { // create a message receiveContext return &ReceiveContext{ ctx: ctx, @@ -64,7 +64,7 @@ func (c *ReceiveContext) WithRemoteSender(remoteSender *goaktpb.Address) *Receiv } // Self returns the receiver PID of the message -func (c *ReceiveContext) Self() PID { +func (c *ReceiveContext) Self() *PID { return c.recipient } @@ -85,7 +85,7 @@ func (c *ReceiveContext) Context() context.Context { } // Sender of the message -func (c *ReceiveContext) Sender() PID { +func (c *ReceiveContext) Sender() *PID { return c.sender } @@ -150,7 +150,7 @@ func (c *ReceiveContext) UnstashAll() { } // Tell sends an asynchronous message to another PID -func (c *ReceiveContext) Tell(to PID, message proto.Message) { +func (c *ReceiveContext) Tell(to *PID, message proto.Message) { recipient := c.recipient ctx := context.WithoutCancel(c.ctx) if err := recipient.Tell(ctx, to, message); err != nil { @@ -162,7 +162,7 @@ func (c *ReceiveContext) Tell(to PID, message proto.Message) { // The messages will be processed one after the other in the order they are sent // This is a design choice to follow the simple principle of one message at a time processing by actors. // When BatchTell encounter a single message it will fall back to a Tell call. -func (c *ReceiveContext) BatchTell(to PID, messages ...proto.Message) { +func (c *ReceiveContext) BatchTell(to *PID, messages ...proto.Message) { recipient := c.recipient ctx := context.WithoutCancel(c.ctx) if err := recipient.BatchTell(ctx, to, messages...); err != nil { @@ -173,7 +173,7 @@ func (c *ReceiveContext) BatchTell(to PID, messages ...proto.Message) { // Ask sends a synchronous message to another actor and expect a response. This method is good when interacting with a child actor. // Ask has a timeout which can cause the sender to set the context error. When ask times out, the receiving actor does not know and may still process the message. // It is recommended to set a good timeout to quickly receive response and try to avoid false positives -func (c *ReceiveContext) Ask(to PID, message proto.Message) (response proto.Message) { +func (c *ReceiveContext) Ask(to *PID, message proto.Message) (response proto.Message) { recipient := c.recipient ctx := context.WithoutCancel(c.ctx) reply, err := recipient.Ask(ctx, to, message) @@ -186,7 +186,7 @@ func (c *ReceiveContext) Ask(to PID, message proto.Message) (response proto.Mess // BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages. // The messages will be processed one after the other in the order they are sent // This is a design choice to follow the simple principle of one message at a time processing by actors. -func (c *ReceiveContext) BatchAsk(to PID, messages ...proto.Message) (responses chan proto.Message) { +func (c *ReceiveContext) BatchAsk(to *PID, messages ...proto.Message) (responses chan proto.Message) { recipient := c.recipient ctx := context.WithoutCancel(c.ctx) reply, err := recipient.BatchAsk(ctx, to, messages...) @@ -264,7 +264,7 @@ func (c *ReceiveContext) Shutdown() { } // Spawn creates a child actor or return error -func (c *ReceiveContext) Spawn(name string, actor Actor) PID { +func (c *ReceiveContext) Spawn(name string, actor Actor) *PID { recipient := c.recipient ctx := context.WithoutCancel(c.ctx) pid, err := recipient.SpawnChild(ctx, name, actor) @@ -275,12 +275,12 @@ func (c *ReceiveContext) Spawn(name string, actor Actor) PID { } // Children returns the list of all the children of the given actor -func (c *ReceiveContext) Children() []PID { +func (c *ReceiveContext) Children() []*PID { return c.recipient.Children() } // Child returns the named child actor if it is alive -func (c *ReceiveContext) Child(name string) PID { +func (c *ReceiveContext) Child(name string) *PID { recipient := c.recipient pid, err := recipient.Child(name) if err != nil { @@ -291,7 +291,7 @@ func (c *ReceiveContext) Child(name string) PID { // Stop forces the child Actor under the given name to terminate after it finishes processing its current message. // Nothing happens if child is already stopped. However, it returns an error when the child cannot be stopped. -func (c *ReceiveContext) Stop(child PID) { +func (c *ReceiveContext) Stop(child *PID) { recipient := c.recipient ctx := context.WithoutCancel(c.ctx) if err := recipient.Stop(ctx, child); err != nil { @@ -303,7 +303,7 @@ func (c *ReceiveContext) Stop(child PID) { // As a result, the actor receiving the forwarded messages knows who the actual sender of the message is. // The message that is forwarded is the current message received by the received context. // This operation does nothing when the receiving actor is not running -func (c *ReceiveContext) Forward(to PID) { +func (c *ReceiveContext) Forward(to *PID) { message := c.Message() sender := c.Sender() @@ -333,7 +333,7 @@ func (c *ReceiveContext) RemoteReSpawn(host string, port int, name string) { // The successful result of the task will be put onto the provided actor mailbox. // This is useful when interacting with external services. // It’s common that you would like to use the value of the response in the actor when the long-running task is completed -func (c *ReceiveContext) PipeTo(to PID, task future.Task) { +func (c *ReceiveContext) PipeTo(to *PID, task future.Task) { recipient := c.recipient ctx := context.WithoutCancel(c.ctx) if err := recipient.PipeTo(ctx, to, task); err != nil { diff --git a/actors/router.go b/actors/router.go index 5e147a25..ff636bf6 100644 --- a/actors/router.go +++ b/actors/router.go @@ -74,7 +74,7 @@ type router struct { strategy RoutingStrategy poolSize int // list of routees - routeesMap map[string]PID + routeesMap map[string]*PID next uint32 routeesKind reflect.Type logger log.Logger @@ -88,7 +88,7 @@ func newRouter(poolSize int, routeesKind Actor, loggger log.Logger, opts ...Rout router := &router{ strategy: FanOutRouting, poolSize: poolSize, - routeesMap: make(map[string]PID, poolSize), + routeesMap: make(map[string]*PID, poolSize), routeesKind: reflect.TypeOf(routeesKind).Elem(), logger: loggger, } @@ -176,7 +176,7 @@ func (x *router) broadcast(ctx *ReceiveContext) { default: for _, routee := range routees { routee := routee - go func(pid PID) { + go func(pid *PID) { ctx.Tell(pid, msg) }(routee) } @@ -188,8 +188,8 @@ func routeeName(routerName string, routeeIndex int) string { return fmt.Sprintf("%s-%s-%d", routeeNamePrefix, routerName, routeeIndex) } -func (x *router) availableRoutees() ([]PID, bool) { - routees := make([]PID, 0, x.poolSize) +func (x *router) availableRoutees() ([]*PID, bool) { + routees := make([]*PID, 0, x.poolSize) for _, routee := range x.routeesMap { if !routee.IsRunning() { delete(x.routeesMap, routee.ID()) diff --git a/actors/scheduler.go b/actors/scheduler.go index cc94fffa..e8f76a16 100644 --- a/actors/scheduler.go +++ b/actors/scheduler.go @@ -124,7 +124,7 @@ func (x *scheduler) Stop(ctx context.Context) { // ScheduleOnce schedules a message that will be delivered to the receiver actor // This will send the given message to the actor after the given interval specified. // The message will be sent once -func (x *scheduler) ScheduleOnce(ctx context.Context, message proto.Message, pid PID, interval time.Duration) error { +func (x *scheduler) ScheduleOnce(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error { x.mu.Lock() defer x.mu.Unlock() @@ -183,7 +183,7 @@ func (x *scheduler) RemoteScheduleOnce(ctx context.Context, message proto.Messag } // ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression. -func (x *scheduler) ScheduleWithCron(ctx context.Context, message proto.Message, pid PID, cronExpression string) error { +func (x *scheduler) ScheduleWithCron(ctx context.Context, message proto.Message, pid *PID, cronExpression string) error { x.mu.Lock() defer x.mu.Unlock() if !x.started.Load() { diff --git a/actors/stash.go b/actors/stash.go index 48a8e984..a95e2510 100644 --- a/actors/stash.go +++ b/actors/stash.go @@ -27,7 +27,7 @@ package actors import "errors" // stash adds the current message to the stash buffer -func (x *pid) stash(ctx *ReceiveContext) error { +func (x *PID) stash(ctx *ReceiveContext) error { if x.stashBuffer == nil { return ErrStashBufferNotSet } @@ -36,7 +36,7 @@ func (x *pid) stash(ctx *ReceiveContext) error { } // unstash unstashes the oldest message in the stash and prepends to the mailbox -func (x *pid) unstash() error { +func (x *PID) unstash() error { if x.stashBuffer == nil { return ErrStashBufferNotSet } @@ -51,7 +51,7 @@ func (x *pid) unstash() error { // unstashAll unstashes all messages from the stash buffer and prepends in the mailbox // (it keeps the messages in the same order as received, unstashing older messages before newer). -func (x *pid) unstashAll() error { +func (x *PID) unstashAll() error { if x.stashBuffer == nil { return ErrStashBufferNotSet } diff --git a/actors/types.go b/actors/types.go index c9dcc4df..c3c692a7 100644 --- a/actors/types.go +++ b/actors/types.go @@ -66,7 +66,7 @@ const ( var ( // NoSender means that there is no sender - NoSender PID + NoSender *PID // DefaultSupervisoryStrategy defines the default supervisory strategy DefaultSupervisoryStrategy = NewStopDirective() // RemoteNoSender means that there is no sender diff --git a/bench/benchmark.go b/bench/benchmark.go index c82c9b4f..2438dbc2 100644 --- a/bench/benchmark.go +++ b/bench/benchmark.go @@ -79,7 +79,7 @@ type Benchmark struct { workersCount int // duration specifies how long the load testing will run duration time.Duration - pids []actors.PID + pids []*actors.PID system actors.ActorSystem } @@ -89,7 +89,7 @@ func NewBenchmark(actorsCount, workersCount int, duration time.Duration) *Benchm actorsCount: actorsCount, workersCount: workersCount, duration: duration, - pids: make([]actors.PID, 0, actorsCount), + pids: make([]*actors.PID, 0, actorsCount), } } diff --git a/bench/benchmark_test.go b/bench/benchmark_test.go index 70560227..9519c709 100644 --- a/bench/benchmark_test.go +++ b/bench/benchmark_test.go @@ -74,6 +74,7 @@ func BenchmarkActor(b *testing.B) { _ = actorSystem.Stop(ctx) }) b.Run("ask", func(b *testing.B) { + b.Skip("") ctx := context.TODO() // create the actor system actorSystem, _ := actors.NewActorSystem("bench", diff --git a/testkit/probe.go b/testkit/probe.go index e1a988af..60a9ecf5 100644 --- a/testkit/probe.go +++ b/testkit/probe.go @@ -38,17 +38,17 @@ type Probe interface { // ExpectMessageOfTypeWithin asserts the expectation of a given message type within a time duration ExpectMessageOfTypeWithin(duration time.Duration, messageType protoreflect.MessageType) // Send sends a message to an actor and also provides probe's test actor PID as sender. - Send(to actors.PID, message proto.Message) + Send(to *actors.PID, message proto.Message) // Sender returns the sender of last received message. - Sender() actors.PID + Sender() *actors.PID // PID returns the pid of the test actor - PID() actors.PID + PID() *actors.PID // Stop stops the test probe Stop() } type message struct { - sender actors.PID + sender *actors.PID payload proto.Message } @@ -91,15 +91,15 @@ type probe struct { pt *testing.T testCtx context.Context - pid actors.PID + pid *actors.PID lastMessage proto.Message - lastSender actors.PID + lastSender *actors.PID messageQueue chan message defaultTimeout time.Duration } // ensure that probe implements Probe -var _ Probe = &probe{} +var _ Probe = (*probe)(nil) // newProbe creates an instance of probe func newProbe(ctx context.Context, actorSystem actors.ActorSystem, t *testing.T) (*probe, error) { @@ -158,18 +158,18 @@ func (x *probe) ExpectAnyMessageWithin(duration time.Duration) proto.Message { } // Send sends a message to the given actor -func (x *probe) Send(to actors.PID, message proto.Message) { +func (x *probe) Send(to *actors.PID, message proto.Message) { err := x.pid.Tell(x.testCtx, to, message) require.NoError(x.pt, err) } // Sender returns the last sender -func (x *probe) Sender() actors.PID { +func (x *probe) Sender() *actors.PID { return x.lastSender } // PID returns the pid of the test actor -func (x *probe) PID() actors.PID { +func (x *probe) PID() *actors.PID { return x.pid } diff --git a/testkit/testkit.go b/testkit/testkit.go index 4a98d207..5bb17698 100644 --- a/testkit/testkit.go +++ b/testkit/testkit.go @@ -41,7 +41,7 @@ func New(ctx context.Context, t *testing.T) *TestKit { } // Spawn create an actor -func (k *TestKit) Spawn(ctx context.Context, name string, actor actors.Actor) actors.PID { +func (k *TestKit) Spawn(ctx context.Context, name string, actor actors.Actor) *actors.PID { // create and instance of actor pid, err := k.actorSystem.Spawn(ctx, name, actor) // handle the error