refactor: cleanup the code to allow easy extensibility (#505)
Tochemey authored Oct 27, 2024
1 parent 7209b33 commit 7296141
Showing 33 changed files with 6,056 additions and 5,509 deletions.
260 changes: 173 additions & 87 deletions actors/actor_system.go
@@ -29,7 +29,7 @@ import (
"errors"
"fmt"
"net"
stdhttp "net/http"
nethttp "net/http"
"os"
"regexp"
"strconv"
@@ -40,6 +40,8 @@ import (
"connectrpc.com/connect"
"github.com/google/uuid"
"go.uber.org/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
@@ -49,7 +51,6 @@
"github.com/tochemey/goakt/v2/hash"
"github.com/tochemey/goakt/v2/internal/cluster"
"github.com/tochemey/goakt/v2/internal/eventstream"
"github.com/tochemey/goakt/v2/internal/http"
"github.com/tochemey/goakt/v2/internal/internalpb"
"github.com/tochemey/goakt/v2/internal/internalpb/internalpbconnect"
"github.com/tochemey/goakt/v2/internal/tcp"
@@ -182,12 +183,14 @@ type actorSystem struct {
// Specifies whether remoting is enabled.
// This allows handling of remote messaging
remotingEnabled atomic.Bool
remoting *Remoting
// Specifies the remoting port
port int32
// Specifies the remoting host
host string
// Specifies the remoting server
remotingServer *stdhttp.Server
server *nethttp.Server
listener net.Listener

// cluster settings
clusterEnabled atomic.Bool
@@ -282,7 +285,7 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
}
}

system.scheduler = newScheduler(system.logger, system.shutdownTimeout, withSchedulerCluster(system.cluster))
system.scheduler = newScheduler(system.logger, system.shutdownTimeout, withSchedulerCluster(system.cluster), withSchedulerRemoting(NewRemoting()))
return system, nil
}

@@ -681,12 +684,14 @@ func (x *actorSystem) Stop(ctx context.Context) error {
defer cancel()

if x.remotingEnabled.Load() {
if err := x.remotingServer.Shutdown(ctx); err != nil {
x.remoting.Close()
if err := x.shutdownHTTPServer(ctx); err != nil {
return err
}

x.remotingEnabled.Store(false)
x.remotingServer = nil
x.server = nil
x.listener = nil
}

if x.clusterEnabled.Load() {
@@ -833,42 +838,46 @@ func (x *actorSystem) RemoteTell(ctx context.Context, stream *connect.ClientStre
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(2)

eg.Go(func() error {
defer close(requestc)
for stream.Receive() {
select {
case requestc <- stream.Msg():
case <-ctx.Done():
logger.Error(ctx.Err())
return connect.NewError(connect.CodeCanceled, ctx.Err())
eg.Go(
func() error {
defer close(requestc)
for stream.Receive() {
select {
case requestc <- stream.Msg():
case <-ctx.Done():
logger.Error(ctx.Err())
return connect.NewError(connect.CodeCanceled, ctx.Err())
}
}
}

if err := stream.Err(); err != nil {
logger.Error(err)
return connect.NewError(connect.CodeUnknown, err)
}

return nil
})

eg.Go(func() error {
for request := range requestc {
receiver := request.GetRemoteMessage().GetReceiver()
addr := address.New(receiver.GetName(), x.Name(), receiver.GetHost(), int(receiver.GetPort()))
pid, exist := x.actors.Get(addr)
if !exist {
logger.Error(ErrAddressNotFound(addr.String()).Error())
return ErrAddressNotFound(addr.String())
if err := stream.Err(); err != nil {
logger.Error(err)
return connect.NewError(connect.CodeUnknown, err)
}

if err := x.handleRemoteTell(ctx, pid, request.GetRemoteMessage()); err != nil {
logger.Error(ErrRemoteSendFailure(err))
return ErrRemoteSendFailure(err)
return nil
},
)

eg.Go(
func() error {
for request := range requestc {
receiver := request.GetRemoteMessage().GetReceiver()
addr := address.New(receiver.GetName(), x.Name(), receiver.GetHost(), int(receiver.GetPort()))
pid, exist := x.actors.Get(addr)
if !exist {
logger.Error(ErrAddressNotFound(addr.String()).Error())
return ErrAddressNotFound(addr.String())
}

if err := x.handleRemoteTell(ctx, pid, request.GetRemoteMessage()); err != nil {
logger.Error(ErrRemoteSendFailure(err))
return ErrRemoteSendFailure(err)
}
}
}
return nil
})
return nil
},
)

if err := eg.Wait(); err != nil {
return nil, err
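
The RemoteTell rewrite above keeps the same shape while reformatting the closures: a two-goroutine errgroup pipeline in which one goroutine drains the client stream into a channel and the other consumes it (peersStateLoop further down uses the identical pattern). Below is a minimal, standalone sketch of that producer/consumer pattern; a plain string slice stands in for the Connect stream, and the handling logic is a placeholder, not the goakt implementation.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	requests := []string{"a", "b", "c"} // stand-in for stream.Receive()/stream.Msg()
	requestc := make(chan string)

	eg, ctx := errgroup.WithContext(context.Background())
	eg.SetLimit(2)

	// Producer: push every request into the channel, bail out on cancellation.
	eg.Go(func() error {
		defer close(requestc)
		for _, req := range requests {
			select {
			case requestc <- req:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Consumer: process until the producer closes the channel.
	eg.Go(func() error {
		for req := range requestc {
			fmt.Println("handled", req)
		}
		return nil
	})

	// Wait surfaces the first error from either goroutine.
	if err := eg.Wait(); err != nil {
		fmt.Println("pipeline failed:", err)
	}
}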
@@ -953,8 +962,10 @@ func (x *actorSystem) RemoteSpawn(ctx context.Context, request *connect.Request[

actor, err := x.reflection.ActorFrom(msg.GetActorType())
if err != nil {
logger.Errorf("failed to create actor=[(%s) of type (%s)] on [host=%s, port=%d]: reason: (%v)",
msg.GetActorName(), msg.GetActorType(), msg.GetHost(), msg.GetPort(), err)
logger.Errorf(
"failed to create actor=[(%s) of type (%s)] on [host=%s, port=%d]: reason: (%v)",
msg.GetActorName(), msg.GetActorType(), msg.GetHost(), msg.GetPort(), err,
)

if errors.Is(err, ErrTypeNotRegistered) {
return nil, connect.NewError(connect.CodeFailedPrecondition, ErrTypeNotRegistered)
@@ -986,10 +997,12 @@ func (x *actorSystem) GetNodeMetric(_ context.Context, request *connect.Request[
}

actorCount := x.actors.Size()
return connect.NewResponse(&internalpb.GetNodeMetricResponse{
NodeRemoteAddress: remoteAddr,
ActorsCount: uint64(actorCount),
}), nil
return connect.NewResponse(
&internalpb.GetNodeMetricResponse{
NodeRemoteAddress: remoteAddr,
ActorsCount: uint64(actorCount),
},
), nil
}

// GetKinds returns the cluster kinds
@@ -1084,9 +1097,11 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
}

bootstrapChan := make(chan struct{}, 1)
timer := time.AfterFunc(time.Second, func() {
bootstrapChan <- struct{}{}
})
timer := time.AfterFunc(
time.Second, func() {
bootstrapChan <- struct{}{}
},
)
<-bootstrapChan
timer.Stop()

@@ -1126,23 +1141,28 @@ func (x *actorSystem) enableRemoting(ctx context.Context) {
remotingServicePath, remotingServiceHandler := internalpbconnect.NewRemotingServiceHandler(x)
clusterServicePath, clusterServiceHandler := internalpbconnect.NewClusterServiceHandler(x)

mux := stdhttp.NewServeMux()
mux := nethttp.NewServeMux()
mux.Handle(remotingServicePath, remotingServiceHandler)
mux.Handle(clusterServicePath, clusterServiceHandler)
server := http.NewServer(ctx, x.host, remotingPort, mux)

x.locker.Lock()
// configure the appropriate server
if err := x.configureServer(ctx, mux); err != nil {
x.locker.Unlock()
x.logger.Panic(fmt.Errorf("failed enable remoting: %w", err))
return
}
x.locker.Unlock()

go func() {
if err := server.ListenAndServe(); err != nil {
if !errors.Is(err, stdhttp.ErrServerClosed) {
if err := x.startHTTPServer(); err != nil {
if !errors.Is(err, nethttp.ErrServerClosed) {
x.logger.Panic(fmt.Errorf("failed to start remoting service: %w", err))
}
}
}()

x.locker.Lock()
x.remotingServer = server
x.locker.Unlock()

x.remoting = NewRemoting()
x.logger.Info("remoting enabled...:)")
}
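
enableRemoting now delegates to configureServer, startHTTPServer, and shutdownHTTPServer (all shown near the bottom of this diff). The following is a hedged sketch of that lifecycle in isolation: serve on a pre-built listener from a goroutine, treat http.ErrServerClosed as a clean stop, and drain in-flight requests via Shutdown. The addresses, timings, and handler here are illustrative, not the goakt API.

package main

import (
	"context"
	"errors"
	"log"
	"net"
	"net/http"
	"time"
)

func main() {
	// Bind the listener up front so startup errors surface synchronously.
	lnr, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}

	server := &http.Server{Handler: http.NewServeMux()}

	go func() {
		// Serve blocks until Shutdown/Close; ErrServerClosed signals a clean stop.
		if err := server.Serve(lnr); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Fatalf("remoting service failed: %v", err)
		}
	}()

	time.Sleep(100 * time.Millisecond) // placeholder for serving traffic

	// Bounded graceful shutdown, mirroring shutdownHTTPServer.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		log.Fatal(err)
	}
}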

@@ -1235,37 +1255,41 @@ func (x *actorSystem) peersStateLoop() {

peersChan := make(chan *cluster.Peer)

eg.Go(func() error {
defer close(peersChan)
peers, err := x.cluster.Peers(ctx)
if err != nil {
return err
}

for _, peer := range peers {
select {
case peersChan <- peer:
case <-ctx.Done():
return ctx.Err()
eg.Go(
func() error {
defer close(peersChan)
peers, err := x.cluster.Peers(ctx)
if err != nil {
return err
}
}
return nil
})

eg.Go(func() error {
for peer := range peersChan {
if err := x.processPeerState(ctx, peer); err != nil {
return err
for _, peer := range peers {
select {
case peersChan <- peer:
case <-ctx.Done():
return ctx.Err()
}
}
select {
case <-ctx.Done():
return ctx.Err()
default:
// pass
return nil
},
)

eg.Go(
func() error {
for peer := range peersChan {
if err := x.processPeerState(ctx, peer); err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
default:
// pass
}
}
}
return nil
})
return nil
},
)

if err := eg.Wait(); err != nil {
x.logger.Error(err)
@@ -1365,10 +1389,12 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o
pidOpts = append(pidOpts, withPassivationAfter(x.expireActorAfter))
}

pid, err := newPID(ctx,
pid, err := newPID(
ctx,
addr,
actor,
pidOpts...)
pidOpts...,
)

if err != nil {
return nil, err
@@ -1379,17 +1405,21 @@
// getSystemActorName returns the system supervisor name
func (x *actorSystem) getSystemActorName(nameType nameType) string {
if x.remotingEnabled.Load() {
return fmt.Sprintf("%s%s%s-%d-%d",
return fmt.Sprintf(
"%s%s%s-%d-%d",
systemNames[nameType],
strings.ToTitle(x.name),
x.host,
x.port,
time.Now().UnixNano())
time.Now().UnixNano(),
)
}
return fmt.Sprintf("%s%s-%d",
return fmt.Sprintf(
"%s%s-%d",
systemNames[nameType],
strings.ToTitle(x.name),
time.Now().UnixNano())
time.Now().UnixNano(),
)
}

func isSystemName(name string) bool {
@@ -1400,3 +1430,59 @@
func (x *actorSystem) actorAddress(name string) *address.Address {
return address.New(name, x.name, x.host, int(x.port))
}

// startHTTPServer starts the appropriate http server
func (x *actorSystem) startHTTPServer() error {
return x.server.Serve(x.listener)
}

// shutdownHTTPServer stops the appropriate http server
func (x *actorSystem) shutdownHTTPServer(ctx context.Context) error {
return x.server.Shutdown(ctx)
}

// configureServer configures the HTTP server and listener based on the system settings
func (x *actorSystem) configureServer(ctx context.Context, mux *nethttp.ServeMux) error {
hostPort := net.JoinHostPort(x.host, strconv.Itoa(int(x.port)))
httpServer := getServer(ctx, hostPort)
// create a tcp listener
lnr, err := net.Listen("tcp", hostPort)
if err != nil {
return err
}

// set the http server
x.server = httpServer
// For gRPC clients, it's convenient to support HTTP/2 without TLS.
x.server.Handler = h2c.NewHandler(
mux, &http2.Server{
IdleTimeout: 1200 * time.Second,
},
)
// set the tcp listener
x.listener = lnr
return nil
}

// getServer creates an instance of http server
func getServer(ctx context.Context, address string) *nethttp.Server {
return &nethttp.Server{
Addr: address,
// The maximum duration for reading the entire request, including the body.
// It’s implemented in net/http by calling SetReadDeadline immediately after Accept
// ReadTimeout := handler_timeout + ReadHeaderTimeout + wiggle_room
ReadTimeout: 3 * time.Second,
// ReadHeaderTimeout is the amount of time allowed to read request headers
ReadHeaderTimeout: time.Second,
// WriteTimeout is the maximum duration before timing out writes of the response.
// It is reset whenever a new request’s header is read.
// This effectively covers the lifetime of the ServeHTTP handler stack
WriteTimeout: time.Second,
// IdleTimeout is the maximum amount of time to wait for the next request when keep-alives are enabled.
// If IdleTimeout is zero, the value of ReadTimeout is used. Not relevant to request timeouts
IdleTimeout: 1200 * time.Second,
BaseContext: func(_ net.Listener) context.Context {
return ctx
},
}
}
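
The h2c wrapper in configureServer lets gRPC/Connect clients speak HTTP/2 over cleartext, which Go's net/http does not do on its own. Below is a minimal sketch of the same pattern, with the timeout values copied from getServer above; the address and handler are placeholders.

package main

import (
	"log"
	"net/http"
	"time"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/h2c"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
		_, _ = w.Write([]byte("ok"))
	})

	server := &http.Server{
		Addr:              "127.0.0.1:8080", // placeholder address
		ReadTimeout:       3 * time.Second,
		ReadHeaderTimeout: time.Second,
		WriteTimeout:      time.Second,
		IdleTimeout:       1200 * time.Second,
		// h2c upgrades cleartext connections to HTTP/2, so gRPC/Connect
		// clients can use HTTP/2 without TLS.
		Handler: h2c.NewHandler(mux, &http2.Server{IdleTimeout: 1200 * time.Second}),
	}

	log.Fatal(server.ListenAndServe())
}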