feat: add cluster events (#213)
Tochemey authored Jan 20, 2024
1 parent ca9b73a commit 1ab0d9a
Showing 15 changed files with 1,396 additions and 636 deletions.
53 changes: 50 additions & 3 deletions actors/actor_system.go
@@ -109,6 +109,9 @@ type ActorSystem interface {
ScheduleWithCron(ctx context.Context, message proto.Message, pid PID, cronExpression string) error
// RemoteScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
RemoteScheduleWithCron(ctx context.Context, message proto.Message, address *addresspb.Address, cronExpression string) error
// PeerAddress returns the actor system address known in the cluster. That address is used by other nodes to communicate with the actor system.
// This address is empty when cluster mode is not activated
PeerAddress() string
// handleRemoteAsk handles a synchronous message to another actor and expects a response.
// This blocks until a response is received or the request times out.
handleRemoteAsk(ctx context.Context, to PID, message proto.Message) (response proto.Message, err error)
@@ -191,6 +194,7 @@ type actorSystem struct {
tracer trace.Tracer
// specifies whether metric is enabled
metricEnabled atomic.Bool
eventsChan <-chan *cluster.Event
}

// enforce compilation error when all methods of the ActorSystem interface are not implemented
@@ -227,6 +231,7 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
partitionHasher: hash.DefaultHasher(),
actorInitTimeout: DefaultInitTimeout,
tracer: noop.NewTracerProvider().Tracer(name),
eventsChan: make(chan *cluster.Event, 1),
}
// set the atomic settings
system.hasStarted.Store(false)
@@ -548,6 +553,21 @@ func (x *actorSystem) Actors() []PID {
return x.actors.List()
}

// PeerAddress returns the actor system address known in the cluster. That address is used by other nodes to communicate with the actor system.
// This address is empty when cluster mode is not activated
func (x *actorSystem) PeerAddress() string {
// acquire the lock
x.sem.Lock()
// release the lock
defer x.sem.Unlock()
// check whether cluster mode is enabled
if x.clusterEnabled.Load() {
// return the cluster node advertised address
return x.cluster.AdvertisedAddress()
}
return ""
}
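
For orientation, here is a brief caller-side sketch (not part of this commit) of how the new accessor might be used. It assumes `system` is an ActorSystem already started with clustering enabled, for example via the startClusterSystem test helper shown further down, and a `fmt` import:

```go
// printPeerAddress is a hypothetical helper illustrating the new accessor.
// `system` is assumed to be a started, cluster-enabled ActorSystem.
func printPeerAddress(system ActorSystem) {
	if addr := system.PeerAddress(); addr != "" {
		// other nodes in the cluster use this address to reach the actor system
		fmt.Println("reachable by peers at:", addr)
		return
	}
	// PeerAddress returns an empty string when cluster mode is not enabled
	fmt.Println("clustering is disabled on this node")
}
```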

// ActorOf returns an existing actor in the local system or in the cluster when clustering is enabled
// When cluster mode is activated, the PID will be nil.
// When remoting is enabled this method will return an error
@@ -810,10 +830,8 @@ func (x *actorSystem) Stop(ctx context.Context) error {
}
// stop broadcasting cluster messages
close(x.clusterChan)

// unset the remoting settings
x.clusterEnabled.Store(false)
x.cluster = nil
}

// short-circuit the shutdown process when there are no online actors
@@ -1106,7 +1124,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) {
x.logger.Info("enabling clustering...")

// create an instance of the cluster service and start it
cluster, err := cluster.New(x.Name(),
cluster, err := cluster.NewNode(x.Name(),
x.serviceDiscovery,
cluster.WithLogger(x.logger),
cluster.WithPartitionsCount(x.partitionsCount),
@@ -1139,11 +1157,15 @@ func (x *actorSystem) enableClustering(ctx context.Context) {
x.sem.Lock()
// set the cluster field
x.cluster = cluster
// set the cluster events channel
x.eventsChan = cluster.Events()
// set the remoting host and port
x.remotingHost = cluster.NodeHost()
x.remotingPort = int32(cluster.NodeRemotingPort())
// release the lock
x.sem.Unlock()
// start listening to cluster events
go x.broadcastClusterEvents()
// start broadcasting cluster messages
go x.broadcast(ctx)
// add some logging
@@ -1241,6 +1263,7 @@ func (x *actorSystem) reset() {
x.telemetry = nil
x.actors = newPIDMap(10)
x.name = ""
x.cluster = nil
}

// broadcast publishes newly created actor into the cluster when cluster is enabled
@@ -1323,3 +1346,27 @@ func (x *actorSystem) registerMetrics() error {

return err
}

// broadcastClusterEvents listens to cluster events and publishes them to the event stream
func (x *actorSystem) broadcastClusterEvents() {
// read from the channel
for event := range x.eventsChan {
// when cluster is enabled
if x.clusterEnabled.Load() {
// only push cluster event when defined
if event != nil && event.Type != nil {
// unpack the event
message, _ := event.Type.UnmarshalNew()
// push the cluster event when event stream is set
if x.eventsStream != nil {
// add some debug log
x.logger.Debugf("node=(%s) publishing cluster event=(%s)....", x.name, event.Type.GetTypeUrl())
// send the event to the event streams
x.eventsStream.Publish(eventsTopic, message)
// add some debug log
x.logger.Debugf("cluster event=(%s) successfully published by node=(%s)", event.Type.GetTypeUrl(), x.name)
}
}
}
}
}
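
To show how these broadcast events might be consumed, here is a hedged sketch (not part of the commit) that subscribes to the actor system's event stream and reacts to the membership events published above. It assumes a started, cluster-enabled ActorSystem, a `fmt` import, and the `eventspb` protobuf package alias used by the tests below:

```go
// watchClusterEvents is a hypothetical consumer of the cluster events that
// broadcastClusterEvents publishes to the event stream.
func watchClusterEvents(system ActorSystem) error {
	// subscribe to the actor system event stream
	consumer, err := system.Subscribe()
	if err != nil {
		return err
	}
	// release the subscription when we are done
	defer func() { _ = system.Unsubscribe(consumer) }()

	// the iterator yields the events published to the stream
	for event := range consumer.Iterator() {
		switch msg := event.Payload().(type) {
		case *eventspb.NodeJoined:
			fmt.Println("node joined the cluster:", msg.GetAddress())
		case *eventspb.NodeLeft:
			fmt.Println("node left the cluster:", msg.GetAddress())
		default:
			// other stream events (deadletters, actor restarts, ...) are ignored here
		}
	}
	return nil
}
```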
101 changes: 99 additions & 2 deletions actors/actor_system_test.go
@@ -405,6 +405,11 @@ func TestActorSystem(t *testing.T) {
err := sys.Start(ctx)
assert.NoError(t, err)

// create an event stream subscriber
consumer, err := sys.Subscribe()
require.NoError(t, err)
require.NotNil(t, consumer)

actorName := "Exchanger"
actorRef, err := sys.Spawn(ctx, actorName, &Exchanger{})
assert.NoError(t, err)
@@ -429,6 +434,18 @@
time.Sleep(time.Second)
require.True(t, actorRef.IsRunning())

var items []*eventspb.ActorRestarted
for message := range consumer.Iterator() {
payload := message.Payload()
// only keep actor restarted events
restarted, ok := payload.(*eventspb.ActorRestarted)
if ok {
items = append(items, restarted)
}
}

require.Len(t, items, 1)

t.Cleanup(func() {
err = sys.Stop(ctx)
assert.NoError(t, err)
@@ -879,8 +896,8 @@ func TestActorSystem(t *testing.T) {
require.NotNil(t, consumer)

// create the black hole actor
actor := &BlackHole{}
actorRef, err := sys.Spawn(ctx, "BlackHole ", actor)
actor := &Discarder{}
actorRef, err := sys.Spawn(ctx, "Discarder ", actor)
assert.NoError(t, err)
assert.NotNil(t, actorRef)

@@ -1091,4 +1108,84 @@ func TestActorSystem(t *testing.T) {
assert.NoError(t, err)
})
})
t.Run("With cluster events subscription", func(t *testing.T) {
// create a context
ctx := context.TODO()
// start the NATS server
srv := startNatsServer(t)

// create and start a cluster-enabled actor system
cl1, sd1 := startClusterSystem(t, "Node1", srv.Addr().String())
peerAddress1 := cl1.PeerAddress()
require.NotEmpty(t, peerAddress1)

// create a subscriber to node 1
subscriber1, err := cl1.Subscribe()
require.NoError(t, err)
require.NotNil(t, subscriber1)

// create and start a cluster-enabled actor system
cl2, sd2 := startClusterSystem(t, "Node2", srv.Addr().String())
peerAddress2 := cl2.PeerAddress()
require.NotEmpty(t, peerAddress2)

// create a subscriber to node 2
subscriber2, err := cl2.Subscribe()
require.NoError(t, err)
require.NotNil(t, subscriber2)

// wait for some time
time.Sleep(time.Second)

// capture the joins
var joins []*eventspb.NodeJoined
for event := range subscriber1.Iterator() {
// get the event payload
payload := event.Payload()
// only listening to cluster events
nodeJoined, ok := payload.(*eventspb.NodeJoined)
require.True(t, ok)
joins = append(joins, nodeJoined)
}

// assert the joins list
require.NotEmpty(t, joins)
require.Len(t, joins, 1)
require.Equal(t, peerAddress2, joins[0].GetAddress())

// wait for some time
time.Sleep(time.Second)

// stop the node
require.NoError(t, cl1.Unsubscribe(subscriber1))
assert.NoError(t, cl1.Stop(ctx))
assert.NoError(t, sd1.Close())

// wait for some time
time.Sleep(time.Second)

var lefts []*eventspb.NodeLeft
for event := range subscriber2.Iterator() {
payload := event.Payload()

// only listening to cluster events
nodeLeft, ok := payload.(*eventspb.NodeLeft)
require.True(t, ok)
lefts = append(lefts, nodeLeft)
}

require.NotEmpty(t, lefts)
require.Len(t, lefts, 1)
require.Equal(t, peerAddress1, lefts[0].GetAddress())

require.NoError(t, cl2.Unsubscribe(subscriber2))

t.Cleanup(func() {
assert.NoError(t, cl2.Stop(ctx))
// stop the discovery engines
assert.NoError(t, sd2.Close())
// shutdown the nats server gracefully
srv.Shutdown()
})
})
}
107 changes: 102 additions & 5 deletions actors/actor_test.go
@@ -26,12 +26,20 @@ package actors

import (
"context"
"os"
"strconv"
"sync"
"testing"
"time"

natsserver "github.com/nats-io/nats-server/v2/server"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/tochemey/goakt/discovery"
"github.com/tochemey/goakt/discovery/nats"
"github.com/tochemey/goakt/log"
testspb "github.com/tochemey/goakt/test/data/pb/v1"
"github.com/travisjeffery/go-dynaport"
"go.uber.org/atomic"
"go.uber.org/goleak"
)
@@ -40,6 +48,8 @@ func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("github.com/go-redis/redis/v8/internal/pool.(*ConnPool).reaper"),
goleak.IgnoreTopFunction("golang.org/x/net/http2.(*serverConn).serve"),
goleak.IgnoreTopFunction("github.com/nats-io/nats%2ego.(*Conn).doReconnect"),
goleak.IgnoreTopFunction("sync.runtime_notifyListWait"),
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"))
}

@@ -349,21 +359,108 @@ func (x *Forwarder) PostStop(context.Context) error {

var _ Actor = &Forwarder{}

type BlackHole struct{}
type Discarder struct{}

func (d *BlackHole) PreStart(context.Context) error {
var _ Actor = &Discarder{}

func (d *Discarder) PreStart(context.Context) error {
return nil
}

func (d *BlackHole) Receive(ctx ReceiveContext) {
func (d *Discarder) Receive(ctx ReceiveContext) {
switch ctx.Message().(type) {
default:
ctx.Unhandled()
}
}

func (d *BlackHole) PostStop(context.Context) error {
func (d *Discarder) PostStop(context.Context) error {
return nil
}

var _ Actor = &BlackHole{}
func startNatsServer(t *testing.T) *natsserver.Server {
t.Helper()
serv, err := natsserver.NewServer(&natsserver.Options{
Host: "127.0.0.1",
Port: -1,
})

require.NoError(t, err)

ready := make(chan bool)
go func() {
ready <- true
serv.Start()
}()
<-ready

if !serv.ReadyForConnections(2 * time.Second) {
t.Fatalf("nats-io server failed to start")
}

return serv
}

func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem, discovery.Provider) {
ctx := context.TODO()
logger := log.New(log.DebugLevel, os.Stdout)

// generate the ports for the single node
nodePorts := dynaport.Get(3)
gossipPort := nodePorts[0]
clusterPort := nodePorts[1]
remotingPort := nodePorts[2]

// create a cluster node
host := "127.0.0.1"
// set the environment variables
require.NoError(t, os.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort)))
require.NoError(t, os.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort)))
require.NoError(t, os.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort)))
require.NoError(t, os.Setenv("NODE_NAME", nodeName))
require.NoError(t, os.Setenv("NODE_IP", host))

// create the various config option
applicationName := "accounts"
actorSystemName := "testSystem"
natsSubject := "some-subject"
// create the instance of provider
provider := nats.NewDiscovery()

// create the config
config := discovery.Config{
nats.ApplicationName: applicationName,
nats.ActorSystemName: actorSystemName,
nats.NatsServer: serverAddr,
nats.NatsSubject: natsSubject,
}

// create the sd
sd := discovery.NewServiceDiscovery(provider, config)

// create the actor system
system, err := NewActorSystem(
nodeName,
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithClustering(sd, 10))

require.NotNil(t, system)
require.NoError(t, err)

// start the node
require.NoError(t, system.Start(ctx))

// clear the env vars
require.NoError(t, os.Unsetenv("GOSSIP_PORT"))
require.NoError(t, os.Unsetenv("CLUSTER_PORT"))
require.NoError(t, os.Unsetenv("REMOTING_PORT"))
require.NoError(t, os.Unsetenv("NODE_NAME"))
require.NoError(t, os.Unsetenv("NODE_IP"))

time.Sleep(2 * time.Second)

// return the cluster node
return system, provider
}