Skip to content

Commit

Permalink
refactor: enhance code coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Feb 21, 2025
1 parent f49dda7 commit aadf824
Showing 1 changed file with 180 additions and 182 deletions.
362 changes: 180 additions & 182 deletions actor/rebalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,199 +40,197 @@ import (
)

func TestRebalancing(t *testing.T) {
t.Run("With successful actors redeployment", func(t *testing.T) {
// create a context
ctx := context.TODO()
// start the NATS server
srv := startNatsServer(t)

// create and start system cluster
node1, sd1 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node1)
require.NotNil(t, sd1)

// create and start system cluster
node2, sd2 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node2)
require.NotNil(t, sd2)

// create and start system cluster
node3, sd3 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node3)
require.NotNil(t, sd3)

// let us create 4 actors on each node
for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node1-Actor-%d", j)
pid, err := node1.Spawn(ctx, actorName, newMockActor())
require.NoError(t, err)
require.NotNil(t, pid)
}

util.Pause(time.Second)

for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node2-Actor-%d", j)
pid, err := node2.Spawn(ctx, actorName, newMockActor())
require.NoError(t, err)
require.NotNil(t, pid)
}

util.Pause(time.Second)

for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node3-Actor-%d", j)
pid, err := node3.Spawn(ctx, actorName, newMockActor())
require.NoError(t, err)
require.NotNil(t, pid)
}

util.Pause(time.Second)

// take down node2
require.NoError(t, node2.Stop(ctx))
require.NoError(t, sd2.Close())

// Wait for cluster rebalancing
util.Pause(time.Minute)

sender, err := node1.LocalActor("Node1-Actor-1")
// create a context
ctx := context.TODO()
// start the NATS server
srv := startNatsServer(t)

// create and start system cluster
node1, sd1 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node1)
require.NotNil(t, sd1)

// create and start system cluster
node2, sd2 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node2)
require.NotNil(t, sd2)

// create and start system cluster
node3, sd3 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node3)
require.NotNil(t, sd3)

// let us create 4 actors on each node
for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node1-Actor-%d", j)
pid, err := node1.Spawn(ctx, actorName, newMockActor())
require.NoError(t, err)
require.NotNil(t, sender)
require.NotNil(t, pid)
}

// let us access some of the node2 actors from node 1 and node 3
actorName := "Node2-Actor-1"
err = sender.SendAsync(ctx, actorName, new(testpb.TestSend))
util.Pause(time.Second)

for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node2-Actor-%d", j)
pid, err := node2.Spawn(ctx, actorName, newMockActor())
require.NoError(t, err)
require.NotNil(t, pid)
}

util.Pause(time.Second)

assert.NoError(t, node1.Stop(ctx))
assert.NoError(t, node3.Stop(ctx))
assert.NoError(t, sd1.Close())
assert.NoError(t, sd3.Close())
srv.Shutdown()
})
t.Run("With successful actors redeployment when TLS enabled", func(t *testing.T) {
t.SkipNow()
// create a context
ctx := context.TODO()
// start the NATS server
srv := startNatsServer(t)

// AutoGenerate TLS certs
conf := autotls.Config{
AutoTLS: true,
ClientAuth: tls.RequireAndVerifyClientCert,
InsecureSkipVerify: true,
}
require.NoError(t, autotls.Setup(&conf))

// create and start system cluster
node1, sd1 := startClusterSystem(t, srv.Addr().String(), withTSL(conf))
require.NotNil(t, node1)
require.NotNil(t, sd1)

// create and start system cluster
node2, sd2 := startClusterSystem(t, srv.Addr().String(), withTSL(conf))
require.NotNil(t, node2)
require.NotNil(t, sd2)

// create and start system cluster
node3, sd3 := startClusterSystem(t, srv.Addr().String(), withTSL(conf))
require.NotNil(t, node3)
require.NotNil(t, sd3)

// let us create 4 actors on each node
for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node1-Actor-%d", j)
pid, err := node1.Spawn(ctx, actorName, newMockActor())
require.NoError(t, err)
require.NotNil(t, pid)
}

util.Pause(time.Second)

for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node2-Actor-%d", j)
pid, err := node2.Spawn(ctx, actorName, newMockActor())
require.NoError(t, err)
require.NotNil(t, pid)
}

util.Pause(time.Second)

for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node3-Actor-%d", j)
pid, err := node3.Spawn(ctx, actorName, newMockActor())
require.NoError(t, err)
require.NotNil(t, pid)
}

util.Pause(time.Second)

// take down node2
require.NoError(t, node2.Stop(ctx))
require.NoError(t, sd2.Close())

// Wait for cluster rebalancing
util.Pause(time.Minute)

sender, err := node1.LocalActor("Node1-Actor-1")
for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node3-Actor-%d", j)
pid, err := node3.Spawn(ctx, actorName, newMockActor())
require.NoError(t, err)
require.NotNil(t, sender)
require.NotNil(t, pid)
}

util.Pause(time.Second)

// take down node2
require.NoError(t, node2.Stop(ctx))
require.NoError(t, sd2.Close())

// Wait for cluster rebalancing
util.Pause(time.Minute)

sender, err := node1.LocalActor("Node1-Actor-1")
require.NoError(t, err)
require.NotNil(t, sender)

// let us access some of the node2 actors from node 1 and node 3
actorName := "Node2-Actor-1"
err = sender.SendAsync(ctx, actorName, new(testpb.TestSend))
require.NoError(t, err)

assert.NoError(t, node1.Stop(ctx))
assert.NoError(t, node3.Stop(ctx))
assert.NoError(t, sd1.Close())
assert.NoError(t, sd3.Close())
srv.Shutdown()
}

// let us access some of the node2 actors from node 1 and node 3
actorName := "Node2-Actor-1"
err = sender.SendAsync(ctx, actorName, new(testpb.TestSend))
func TestRebalancingWithTLSEnabled(t *testing.T) {
t.SkipNow()
// create a context
ctx := context.TODO()
// start the NATS server
srv := startNatsServer(t)

// AutoGenerate TLS certs
conf := autotls.Config{
AutoTLS: true,
ClientAuth: tls.RequireAndVerifyClientCert,
InsecureSkipVerify: true,
}
require.NoError(t, autotls.Setup(&conf))

// create and start system cluster
node1, sd1 := startClusterSystem(t, srv.Addr().String(), withTSL(conf))
require.NotNil(t, node1)
require.NotNil(t, sd1)

// create and start system cluster
node2, sd2 := startClusterSystem(t, srv.Addr().String(), withTSL(conf))
require.NotNil(t, node2)
require.NotNil(t, sd2)

// create and start system cluster
node3, sd3 := startClusterSystem(t, srv.Addr().String(), withTSL(conf))
require.NotNil(t, node3)
require.NotNil(t, sd3)

// let us create 4 actors on each node
for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node1-Actor-%d", j)
pid, err := node1.Spawn(ctx, actorName, newMockActor())
require.NoError(t, err)
require.NotNil(t, pid)
}

t.Cleanup(func() {
assert.NoError(t, node1.Stop(ctx))
assert.NoError(t, node3.Stop(ctx))
assert.NoError(t, sd1.Close())
assert.NoError(t, sd3.Close())
srv.Shutdown()
})
})
t.Run("With singleton actor redeployment", func(t *testing.T) {
// create a context
ctx := context.TODO()
// start the NATS server
srv := startNatsServer(t)

// create and start system cluster
node1, sd1 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node1)
require.NotNil(t, sd1)

// create and start system cluster
node2, sd2 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node2)
require.NotNil(t, sd2)

// create and start system cluster
node3, sd3 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node3)
require.NotNil(t, sd3)

// create a singleton actor
err := node1.SpawnSingleton(ctx, "actorName", newMockActor())
util.Pause(time.Second)

for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node2-Actor-%d", j)
pid, err := node2.Spawn(ctx, actorName, newMockActor())
require.NoError(t, err)
require.NotNil(t, pid)
}

util.Pause(time.Second)

for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node3-Actor-%d", j)
pid, err := node3.Spawn(ctx, actorName, newMockActor())
require.NoError(t, err)
require.NotNil(t, pid)
}

util.Pause(time.Second)
util.Pause(time.Second)

// take down node1 since it is the first node created in the cluster
require.NoError(t, node1.Stop(ctx))
require.NoError(t, sd1.Close())
// take down node2
require.NoError(t, node2.Stop(ctx))
require.NoError(t, sd2.Close())

require.Eventually(t, func() bool {
_, _, err := node2.ActorOf(ctx, "actorName")
return err != nil
}, 2*time.Minute, time.Second)
// Wait for cluster rebalancing
util.Pause(time.Minute)

sender, err := node1.LocalActor("Node1-Actor-1")
require.NoError(t, err)
require.NotNil(t, sender)

// let us access some of the node2 actors from node 1 and node 3
actorName := "Node2-Actor-1"
err = sender.SendAsync(ctx, actorName, new(testpb.TestSend))
require.NoError(t, err)

assert.NoError(t, node1.Stop(ctx))
assert.NoError(t, node3.Stop(ctx))
assert.NoError(t, sd1.Close())
assert.NoError(t, sd3.Close())
srv.Shutdown()
}

assert.NoError(t, node2.Stop(ctx))
assert.NoError(t, node3.Stop(ctx))
assert.NoError(t, sd2.Close())
assert.NoError(t, sd3.Close())
srv.Shutdown()
})
func TestRebalancingWithSingletonActor(t *testing.T) {
// create a context
ctx := context.TODO()
// start the NATS server
srv := startNatsServer(t)

// create and start system cluster
node1, sd1 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node1)
require.NotNil(t, sd1)

// create and start system cluster
node2, sd2 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node2)
require.NotNil(t, sd2)

// create and start system cluster
node3, sd3 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node3)
require.NotNil(t, sd3)

// create a singleton actor
err := node1.SpawnSingleton(ctx, "actorName", newMockActor())
require.NoError(t, err)

util.Pause(time.Second)

// take down node1 since it is the first node created in the cluster
require.NoError(t, node1.Stop(ctx))
require.NoError(t, sd1.Close())

require.Eventually(t, func() bool {
_, _, err := node2.ActorOf(ctx, "actorName")
return err != nil
}, 2*time.Minute, time.Second)

assert.NoError(t, node2.Stop(ctx))
assert.NoError(t, node3.Stop(ctx))
assert.NoError(t, sd2.Close())
assert.NoError(t, sd3.Close())
srv.Shutdown()
}

0 comments on commit aadf824

Please sign in to comment.