Skip to content

Commit

Permalink
refactor: enhance code coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Feb 21, 2025
1 parent 8e20de3 commit f22aeab
Showing 1 changed file with 17 additions and 27 deletions.
44 changes: 17 additions & 27 deletions actor/rebalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ import (
"github.com/tochemey/goakt/v3/log"
)

type toRebalance struct {
name string
kind string
isSingleton bool
}

// rebalancer is a system actor that helps rebalance cluster
// when the cluster topology changes
type rebalancer struct {
Expand Down Expand Up @@ -104,7 +98,7 @@ func (r *rebalancer) Rebalance(ctx *ReceiveContext) {
eg.Go(func() error {
for _, actor := range leaderShares {
// never redistribute system actors
if !isReservedName(actor.name) {
if !isReservedName(actor.GetActorName()) {
if err := r.recreateLocally(egCtx, actor, true); err != nil {
return NewSpawnError(err)
}
Expand All @@ -128,7 +122,7 @@ func (r *rebalancer) Rebalance(ctx *ReceiveContext) {

for _, actor := range actors {
// never redistribute system actors and singleton actors
if !isReservedName(actor.name) && !actor.isSingleton {
if !isReservedName(actor.GetActorName()) && !actor.GetIsSingleton() {
if !peerFound {
err := r.recreateLocally(egCtx, actor, false)

Check warning on line 127 in actor/rebalancer.go

View check run for this annotation

Codecov / codecov/patch

actor/rebalancer.go#L127

Added line #L127 was not covered by tests
if err == nil {
Expand All @@ -140,8 +134,8 @@ func (r *rebalancer) Rebalance(ctx *ReceiveContext) {
if err := r.remoting.RemoteSpawn(egCtx,
peerState.GetHost(),
int(peerState.GetRemotingPort()),
actor.name,
actor.kind,
actor.GetActorName(),
actor.GetActorType(),
false); err != nil {
logger.Error(err)
return NewSpawnError(err)
Expand Down Expand Up @@ -178,28 +172,24 @@ func (r *rebalancer) PostStop(context.Context) error {
}

// computeRebalancing build the list of actors to create on the leader node and the peers in the cluster
func (r *rebalancer) computeRebalancing(totalPeers int, nodeLeftState *internalpb.PeerState) (leaderShares []*toRebalance, peersShares [][]*toRebalance) {
func (r *rebalancer) computeRebalancing(totalPeers int, nodeLeftState *internalpb.PeerState) (leaderShares []*internalpb.ActorProps, peersShares [][]*internalpb.ActorProps) {
actors := nodeLeftState.GetActors()
actorsCount := len(actors)

// Collect all actors to be rebalanced
toRebalances := make([]*toRebalance, 0, actorsCount)
for name, actorProps := range actors {
toRebalances = append(toRebalances, &toRebalance{
name: name,
kind: actorProps.GetActorType(),
isSingleton: actorProps.GetIsSingleton(),
})
toRebalances := make([]*internalpb.ActorProps, 0, actorsCount)
for _, actorProps := range actors {
toRebalances = append(toRebalances, actorProps)
}

// Separate singleton actors to be assigned to the leader
leaderShares = slice.Filter(toRebalances, func(actor *toRebalance) bool {
return actor.isSingleton
leaderShares = slice.Filter(toRebalances, func(actor *internalpb.ActorProps) bool {
return actor.GetIsSingleton()
})

// Remove singleton actors from the list
toRebalances = slice.Filter(toRebalances, func(actor *toRebalance) bool {
return !actor.isSingleton
toRebalances = slice.Filter(toRebalances, func(actor *internalpb.ActorProps) bool {
return !actor.GetIsSingleton()
})

// Distribute remaining actors among peers
Expand All @@ -222,17 +212,17 @@ func (r *rebalancer) computeRebalancing(totalPeers int, nodeLeftState *internalp
}

// recreateLocally recreates the actor
func (r *rebalancer) recreateLocally(ctx context.Context, actor *toRebalance, enforceSingleton bool) error {
iactor, err := r.reflection.ActorFrom(actor.kind)
func (r *rebalancer) recreateLocally(ctx context.Context, actor *internalpb.ActorProps, enforceSingleton bool) error {
iactor, err := r.reflection.ActorFrom(actor.GetActorType())
if err != nil {
return err
}

if enforceSingleton && actor.isSingleton {
if enforceSingleton && actor.GetIsSingleton() {
// spawn the singleton actor
return r.pid.ActorSystem().SpawnSingleton(ctx, actor.name, iactor)
return r.pid.ActorSystem().SpawnSingleton(ctx, actor.GetActorName(), iactor)
}

_, err = r.pid.ActorSystem().Spawn(ctx, actor.name, iactor)
_, err = r.pid.ActorSystem().Spawn(ctx, actor.GetActorName(), iactor)
return err
}

0 comments on commit f22aeab

Please sign in to comment.