Skip to content

Commit

Permalink
refactor: enhance code
Browse files Browse the repository at this point in the history
  • Loading branch information
Arsene Gandote authored and Arsene Gandote committed Feb 20, 2025
1 parent 70badf2 commit 1f23a16
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 51 deletions.
48 changes: 34 additions & 14 deletions actor/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ type actorSystem struct {
userGuardian *PID
systemGuardian *PID
deathWatch *PID
deadletters *PID
deadletter *PID
singletonManager *PID

startedAt *atomic.Int64
Expand Down Expand Up @@ -648,7 +648,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor, opts
}

// check some preconditions
if err := x.checkSpawnPreconditions(ctx, name, actor); err != nil {
if err := x.checkSpawnPreconditions(ctx, name, actor, false); err != nil {
return nil, err
}

Expand Down Expand Up @@ -685,7 +685,7 @@ func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, recei
actor := newFuncActor(name, receiveFunc, config)

// check some preconditions
if err := x.checkSpawnPreconditions(ctx, name, actor); err != nil {
if err := x.checkSpawnPreconditions(ctx, name, actor, false); err != nil {
return nil, err
}

Expand Down Expand Up @@ -757,7 +757,7 @@ func (x *actorSystem) SpawnSingleton(ctx context.Context, name string, actor Act
}

// check some preconditions
if err := x.checkSpawnPreconditions(ctx, name, actor); err != nil {
if err := x.checkSpawnPreconditions(ctx, name, actor, true); err != nil {
return err
}

Expand Down Expand Up @@ -1376,10 +1376,10 @@ func (x *actorSystem) getDeathWatch() *PID {
return janitor
}

// getDeadletters returns the system deadletters actor
// getDeadletters returns the system deadletter actor
func (x *actorSystem) getDeadletter() *PID {
x.locker.Lock()
deadletters := x.deadletters
deadletters := x.deadletter
x.locker.Unlock()
return deadletters
}
Expand Down Expand Up @@ -1830,7 +1830,10 @@ func (x *actorSystem) processPeerState(ctx context.Context, peer *cluster.Peer)
}

x.logger.Debugf("peer (%s) actors count (%d)", peerAddress, len(peerState.GetActors()))
x.clusterStore.set(peerState)
if err := x.clusterStore.set(peerState); err != nil {
x.logger.Error(err)
return err
}
x.logger.Infof("peer sync(%s) successfully processed", peerAddress)
return nil
}
Expand Down Expand Up @@ -2071,11 +2074,11 @@ func (x *actorSystem) spawnRebalancer(ctx context.Context) error {
return nil
}

// spawnDeadletter creates the deadletters synthetic actor
// spawnDeadletter creates the deadletter synthetic actor
func (x *actorSystem) spawnDeadletter(ctx context.Context) error {
var err error
actorName := x.reservedName(deadletterType)
x.deadletters, err = x.configPID(ctx,
x.deadletter, err = x.configPID(ctx,
actorName,
newDeadLetter(),
WithSupervisor(
Expand All @@ -2087,18 +2090,35 @@ func (x *actorSystem) spawnDeadletter(ctx context.Context) error {
),
)
if err != nil {
return fmt.Errorf("actor=%s failed to start deadletters: %w", actorName, err)
return fmt.Errorf("actor=%s failed to start deadletter: %w", actorName, err)
}

// the deadletters is a child actor of the system guardian
_ = x.actors.AddNode(x.systemGuardian, x.deadletters)
// the deadletter is a child actor of the system guardian
_ = x.actors.AddNode(x.systemGuardian, x.deadletter)
return nil
}

// checkSpawnPreconditions make sure before an actor is created some pre-conditions are checks
func (x *actorSystem) checkSpawnPreconditions(ctx context.Context, actorName string, kind Actor) error {
func (x *actorSystem) checkSpawnPreconditions(ctx context.Context, actorName string, kind Actor, singleton bool) error {
// check the existence of the actor given the kind prior to creating it
if x.clusterEnabled.Load() {
// a singleton actor must only have one instance at a given time of its kind
// in the whole cluster
if singleton {
id, err := x.cluster.LookupKind(ctx, types.TypeName(kind))
if err != nil {
return err
}

if id != "" {
return ErrSingletonAlreadyExists
}

return nil
}

// here we make sure in cluster mode that the given actor is uniquely created
// by checking both its kind and identifier
existed, err := x.cluster.GetActor(ctx, actorName)
if err != nil {
if errors.Is(err, cluster.ErrActorNotFound) {
Expand Down Expand Up @@ -2144,7 +2164,7 @@ func (x *actorSystem) getSetDeadlettersCount(ctx context.Context) {
// using the default ask timeout
// note: no need to check for error because this call is internal
message, _ := from.Ask(ctx, to, message, DefaultAskTimeout)
// cast the response received from the deadletters
// cast the response received from the deadletter
deadlettersCount := message.(*internalpb.DeadlettersCount)
// set the counter
x.deadlettersCounter.Store(uint64(deadlettersCount.GetTotalCount()))
Expand Down
14 changes: 7 additions & 7 deletions actor/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func TestActorSystem(t *testing.T) {
var items []*goaktpb.ActorRestarted
for message := range consumer.Iterator() {
payload := message.Payload()
// only listening to deadletters
// only listening to deadletter
restarted, ok := payload.(*goaktpb.ActorRestarted)
if ok {
items = append(items, restarted)
Expand Down Expand Up @@ -908,7 +908,7 @@ func TestActorSystem(t *testing.T) {
},
)
})
t.Run("With deadletters subscription ", func(t *testing.T) {
t.Run("With deadletter subscription ", func(t *testing.T) {
ctx := context.TODO()
sys, _ := NewActorSystem("testSys", WithLogger(log.DiscardLogger))

Expand All @@ -933,7 +933,7 @@ func TestActorSystem(t *testing.T) {
// wait a while
util.Pause(time.Second)

// every message sent to the actor will result in deadletters
// every message sent to the actor will result in deadletter
for i := 0; i < 5; i++ {
require.NoError(t, Tell(ctx, actorRef, new(testpb.TestSend)))
}
Expand All @@ -943,7 +943,7 @@ func TestActorSystem(t *testing.T) {
var items []*goaktpb.Deadletter
for message := range consumer.Iterator() {
payload := message.Payload()
// only listening to deadletters
// only listening to deadletter
deadletter, ok := payload.(*goaktpb.Deadletter)
if ok {
items = append(items, deadletter)
Expand All @@ -965,15 +965,15 @@ func TestActorSystem(t *testing.T) {
err = sys.Stop(ctx)
assert.NoError(t, err)
})
t.Run("With deadletters subscription when not started", func(t *testing.T) {
t.Run("With deadletter subscription when not started", func(t *testing.T) {
sys, _ := NewActorSystem("testSys", WithLogger(log.DiscardLogger))

// create a deadletter subscriber
consumer, err := sys.Subscribe()
require.Error(t, err)
require.Nil(t, consumer)
})
t.Run("With deadletters unsubscription when not started", func(t *testing.T) {
t.Run("With deadletter unsubscription when not started", func(t *testing.T) {
ctx := context.TODO()
sys, _ := NewActorSystem("testSys", WithLogger(log.DiscardLogger))

Expand Down Expand Up @@ -1581,7 +1581,7 @@ func TestActorSystem(t *testing.T) {
assert.EqualValues(t, 1, pid.ProcessedCount())
require.True(t, pid.IsRunning())

// every message sent to the actor will result in deadletters
// every message sent to the actor will result in deadletter
counter := 0
for i := 1; i <= 5; i++ {
require.NoError(t, Tell(ctx, pid, new(testpb.TestSend)))
Expand Down
2 changes: 1 addition & 1 deletion actor/dead_letter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/tochemey/goakt/v3/log"
)

// deadletters is a synthetic actor that houses all deadletter
// deadletter is a synthetic actor that houses all deadletter
// in GoAkt deadletter are messages that have not been handled
type deadLetter struct {
eventsStream *eventstream.EventsStream
Expand Down
10 changes: 5 additions & 5 deletions actor/dead_letter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestDeadletter(t *testing.T) {
// wait a while
util.Pause(time.Second)

// every message sent to the actor will result in deadletters
// every message sent to the actor will result in deadletter
for i := 0; i < 5; i++ {
require.NoError(t, Tell(ctx, actorRef, new(testpb.TestSend)))
}
Expand All @@ -75,7 +75,7 @@ func TestDeadletter(t *testing.T) {
var items []*goaktpb.Deadletter
for message := range consumer.Iterator() {
payload := message.Payload()
// only listening to deadletters
// only listening to deadletter
deadletter, ok := payload.(*goaktpb.Deadletter)
if ok {
items = append(items, deadletter)
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestDeadletter(t *testing.T) {
// wait a while
util.Pause(time.Second)

// every message sent to the actor will result in deadletters
// every message sent to the actor will result in deadletter
for i := 0; i < 5; i++ {
require.NoError(t, Tell(ctx, actorRef, new(testpb.TestSend)))
}
Expand All @@ -136,7 +136,7 @@ func TestDeadletter(t *testing.T) {
var items []*goaktpb.Deadletter
for message := range consumer.Iterator() {
payload := message.Payload()
// only listening to deadletters
// only listening to deadletter
deadletter, ok := payload.(*goaktpb.Deadletter)
if ok {
items = append(items, deadletter)
Expand All @@ -149,7 +149,7 @@ func TestDeadletter(t *testing.T) {

for message := range consumer.Iterator() {
payload := message.Payload()
// only listening to deadletters
// only listening to deadletter
deadletter, ok := payload.(*goaktpb.Deadletter)
if ok {
items = append(items, deadletter)
Expand Down
4 changes: 3 additions & 1 deletion actor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ var (
ErrPriorityMessageRequired = errors.New("priority message type is required")
// ErrActorAlreadyExists is returned when trying to create the same actor more than once
ErrActorAlreadyExists = func(actorName string) error { return fmt.Errorf("actor=(%s) already exists", actorName) }
// ErrInvalidTLSConfiguration is returned whent the TLS configuration is not properly set
// ErrInvalidTLSConfiguration is returned when the TLS configuration is not properly set
ErrInvalidTLSConfiguration = errors.New("TLS configuration is invalid")
// ErrSingletonAlreadyExists is returned when a given singleton actor type already exists
ErrSingletonAlreadyExists = errors.New("singleton already exists")
)

// eof returns true if the given error is an EOF error
Expand Down
8 changes: 4 additions & 4 deletions actor/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ import "time"

// Metric defines the actor system metric
type Metric struct {
// DeadlettersCount returns the total number of deadletters
// DeadlettersCount returns the total number of deadletter
deadlettersCount int64
// ActorsCount returns the total number of actors in the system
actorsCount int64
// Uptime returns the number of seconds since the actor/system started
uptime int64
}

// DeadlettersCount returns the total number of deadletters
// DeadlettersCount returns the total number of deadletter
func (m Metric) DeadlettersCount() int64 {
return m.deadlettersCount
}
Expand All @@ -54,7 +54,7 @@ func (m Metric) Uptime() int64 {

// ActorMetric defines actor specific metrics
type ActorMetric struct {
// DeadlettersCount returns the total number of deadletters
// DeadlettersCount returns the total number of deadletter
deadlettersCount uint64
// ActorsCount returns the total number of child actor given a specific PID
childrenCount uint64
Expand All @@ -80,7 +80,7 @@ func (x ActorMetric) RestartCount() uint64 {
return x.restartCount
}

// DeadlettersCount returns the total number of deadletters
// DeadlettersCount returns the total number of deadletter
func (x ActorMetric) DeadlettersCount() uint64 {
return x.deadlettersCount
}
Expand Down
12 changes: 6 additions & 6 deletions actor/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -1645,7 +1645,7 @@ func (pid *PID) notifyParent(err error) {
}
}

// toDeadletters sends message to deadletters synthetic actor
// toDeadletters sends message to deadletter synthetic actor
func (pid *PID) toDeadletters(receiveCtx *ReceiveContext, err error) {
// the message is lost
if pid.eventsStream == nil {
Expand All @@ -1666,11 +1666,11 @@ func (pid *PID) toDeadletters(receiveCtx *ReceiveContext, err error) {
sender = receiveCtx.Sender().Address().Address
}

// get the deadletters synthetic actor and send a message to it
// get the deadletter synthetic actor and send a message to it
receiver := pid.Address().Address
deadletters := pid.ActorSystem().getDeadletter()
deadletter := pid.ActorSystem().getDeadletter()
_ = pid.Tell(context.Background(),
deadletters,
deadletter,
&internalpb.EmitDeadletter{
Deadletter: &goaktpb.Deadletter{
Sender: sender,
Expand Down Expand Up @@ -1854,7 +1854,7 @@ func (pid *PID) suspend(reason string) {
})
}

// getDeadlettersCount gets deadletters
// getDeadlettersCount gets deadletter
func (pid *PID) getDeadlettersCount(ctx context.Context) int64 {
var (
name = pid.Name()
Expand All @@ -1869,7 +1869,7 @@ func (pid *PID) getDeadlettersCount(ctx context.Context) int64 {
// using the default ask timeout
// note: no need to check for error because this call is internal
message, _ := from.Ask(ctx, to, message, DefaultAskTimeout)
// cast the response received from the deadletters
// cast the response received from the deadletter
deadlettersCount := message.(*internalpb.DeadlettersCount)
return deadlettersCount.GetTotalCount()
}
Expand Down
6 changes: 3 additions & 3 deletions actor/receive_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,7 @@ func TestReceiveContext(t *testing.T) {
self: pid1,
}

// calling unhandled will push the current message to deadletters
// calling unhandled will push the current message to deadletter
context.Unhandled()

// wait for messages to be published
Expand Down Expand Up @@ -1295,7 +1295,7 @@ func TestReceiveContext(t *testing.T) {
self: pid1,
}

// calling unhandled will push the current message to deadletters
// calling unhandled will push the current message to deadletter
context.Unhandled()

// wait for messages to be published
Expand Down Expand Up @@ -1363,7 +1363,7 @@ func TestReceiveContext(t *testing.T) {
self: pid1,
}

// calling unhandled will push the current message to deadletters
// calling unhandled will push the current message to deadletter
context.Unhandled()

// wait for messages to be published
Expand Down
2 changes: 1 addition & 1 deletion actor/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (x *router) broadcast(ctx *ReceiveContext) {
routees, proceed := x.availableRoutees()
if !proceed {
x.logger.Warn("no routees available. stopping.... Bye")
// push message to deadletters
// push message to deadletter
ctx.Unhandled()
// shutdown
ctx.Shutdown()
Expand Down
2 changes: 1 addition & 1 deletion actor/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func TestRouter(t *testing.T) {
var items []*goaktpb.Deadletter
for message := range consumer.Iterator() {
payload := message.Payload()
// only listening to deadletters
// only listening to deadletter
deadletter, ok := payload.(*goaktpb.Deadletter)
if ok {
items = append(items, deadletter)
Expand Down
Loading

0 comments on commit 1f23a16

Please sign in to comment.