Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: cleanup the code to allow easy extensibility #505

Merged
merged 14 commits into from
Oct 27, 2024
Merged
260 changes: 173 additions & 87 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"errors"
"fmt"
"net"
stdhttp "net/http"
nethttp "net/http"
"os"
"regexp"
"strconv"
Expand All @@ -40,6 +40,8 @@ import (
"connectrpc.com/connect"
"github.com/google/uuid"
"go.uber.org/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
Expand All @@ -49,7 +51,6 @@ import (
"github.com/tochemey/goakt/v2/hash"
"github.com/tochemey/goakt/v2/internal/cluster"
"github.com/tochemey/goakt/v2/internal/eventstream"
"github.com/tochemey/goakt/v2/internal/http"
"github.com/tochemey/goakt/v2/internal/internalpb"
"github.com/tochemey/goakt/v2/internal/internalpb/internalpbconnect"
"github.com/tochemey/goakt/v2/internal/tcp"
Expand Down Expand Up @@ -182,12 +183,14 @@ type actorSystem struct {
// Specifies whether remoting is enabled.
// This allows to handle remote messaging
remotingEnabled atomic.Bool
remoting *Remoting
// Specifies the remoting port
port int32
// Specifies the remoting host
host string
// Specifies the remoting server
remotingServer *stdhttp.Server
server *nethttp.Server
listener net.Listener

// cluster settings
clusterEnabled atomic.Bool
Expand Down Expand Up @@ -282,7 +285,7 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
}
}

system.scheduler = newScheduler(system.logger, system.shutdownTimeout, withSchedulerCluster(system.cluster))
system.scheduler = newScheduler(system.logger, system.shutdownTimeout, withSchedulerCluster(system.cluster), withSchedulerRemoting(NewRemoting()))
return system, nil
}

Expand Down Expand Up @@ -681,12 +684,14 @@ func (x *actorSystem) Stop(ctx context.Context) error {
defer cancel()

if x.remotingEnabled.Load() {
if err := x.remotingServer.Shutdown(ctx); err != nil {
x.remoting.Close()
if err := x.shutdownHTTPServer(ctx); err != nil {
return err
}

x.remotingEnabled.Store(false)
x.remotingServer = nil
x.server = nil
x.listener = nil
}

if x.clusterEnabled.Load() {
Expand Down Expand Up @@ -833,42 +838,46 @@ func (x *actorSystem) RemoteTell(ctx context.Context, stream *connect.ClientStre
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(2)

eg.Go(func() error {
defer close(requestc)
for stream.Receive() {
select {
case requestc <- stream.Msg():
case <-ctx.Done():
logger.Error(ctx.Err())
return connect.NewError(connect.CodeCanceled, ctx.Err())
eg.Go(
func() error {
defer close(requestc)
for stream.Receive() {
select {
case requestc <- stream.Msg():
case <-ctx.Done():
logger.Error(ctx.Err())
return connect.NewError(connect.CodeCanceled, ctx.Err())
}
}
}

if err := stream.Err(); err != nil {
logger.Error(err)
return connect.NewError(connect.CodeUnknown, err)
}

return nil
})

eg.Go(func() error {
for request := range requestc {
receiver := request.GetRemoteMessage().GetReceiver()
addr := address.New(receiver.GetName(), x.Name(), receiver.GetHost(), int(receiver.GetPort()))
pid, exist := x.actors.Get(addr)
if !exist {
logger.Error(ErrAddressNotFound(addr.String()).Error())
return ErrAddressNotFound(addr.String())
if err := stream.Err(); err != nil {
logger.Error(err)
return connect.NewError(connect.CodeUnknown, err)
}

if err := x.handleRemoteTell(ctx, pid, request.GetRemoteMessage()); err != nil {
logger.Error(ErrRemoteSendFailure(err))
return ErrRemoteSendFailure(err)
return nil
},
)

eg.Go(
func() error {
for request := range requestc {
receiver := request.GetRemoteMessage().GetReceiver()
addr := address.New(receiver.GetName(), x.Name(), receiver.GetHost(), int(receiver.GetPort()))
pid, exist := x.actors.Get(addr)
if !exist {
logger.Error(ErrAddressNotFound(addr.String()).Error())
return ErrAddressNotFound(addr.String())
}

if err := x.handleRemoteTell(ctx, pid, request.GetRemoteMessage()); err != nil {
logger.Error(ErrRemoteSendFailure(err))
return ErrRemoteSendFailure(err)
}
}
}
return nil
})
return nil
},
)

if err := eg.Wait(); err != nil {
return nil, err
Expand Down Expand Up @@ -953,8 +962,10 @@ func (x *actorSystem) RemoteSpawn(ctx context.Context, request *connect.Request[

actor, err := x.reflection.ActorFrom(msg.GetActorType())
if err != nil {
logger.Errorf("failed to create actor=[(%s) of type (%s)] on [host=%s, port=%d]: reason: (%v)",
msg.GetActorName(), msg.GetActorType(), msg.GetHost(), msg.GetPort(), err)
logger.Errorf(
"failed to create actor=[(%s) of type (%s)] on [host=%s, port=%d]: reason: (%v)",
msg.GetActorName(), msg.GetActorType(), msg.GetHost(), msg.GetPort(), err,
)

if errors.Is(err, ErrTypeNotRegistered) {
return nil, connect.NewError(connect.CodeFailedPrecondition, ErrTypeNotRegistered)
Expand Down Expand Up @@ -986,10 +997,12 @@ func (x *actorSystem) GetNodeMetric(_ context.Context, request *connect.Request[
}

actorCount := x.actors.Size()
return connect.NewResponse(&internalpb.GetNodeMetricResponse{
NodeRemoteAddress: remoteAddr,
ActorsCount: uint64(actorCount),
}), nil
return connect.NewResponse(
&internalpb.GetNodeMetricResponse{
NodeRemoteAddress: remoteAddr,
ActorsCount: uint64(actorCount),
},
), nil
}

// GetKinds returns the cluster kinds
Expand Down Expand Up @@ -1084,9 +1097,11 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
}

bootstrapChan := make(chan struct{}, 1)
timer := time.AfterFunc(time.Second, func() {
bootstrapChan <- struct{}{}
})
timer := time.AfterFunc(
time.Second, func() {
bootstrapChan <- struct{}{}
},
)
<-bootstrapChan
timer.Stop()

Expand Down Expand Up @@ -1126,23 +1141,28 @@ func (x *actorSystem) enableRemoting(ctx context.Context) {
remotingServicePath, remotingServiceHandler := internalpbconnect.NewRemotingServiceHandler(x)
clusterServicePath, clusterServiceHandler := internalpbconnect.NewClusterServiceHandler(x)

mux := stdhttp.NewServeMux()
mux := nethttp.NewServeMux()
mux.Handle(remotingServicePath, remotingServiceHandler)
mux.Handle(clusterServicePath, clusterServiceHandler)
server := http.NewServer(ctx, x.host, remotingPort, mux)

x.locker.Lock()
// configure the appropriate server
if err := x.configureServer(ctx, mux); err != nil {
x.locker.Unlock()
x.logger.Panic(fmt.Errorf("failed enable remoting: %w", err))
return
}
x.locker.Unlock()

go func() {
if err := server.ListenAndServe(); err != nil {
if !errors.Is(err, stdhttp.ErrServerClosed) {
if err := x.startHTTPServer(); err != nil {
if !errors.Is(err, nethttp.ErrServerClosed) {
x.logger.Panic(fmt.Errorf("failed to start remoting service: %w", err))
}
}
}()

x.locker.Lock()
x.remotingServer = server
x.locker.Unlock()

x.remoting = NewRemoting()
x.logger.Info("remoting enabled...:)")
}

Expand Down Expand Up @@ -1235,37 +1255,41 @@ func (x *actorSystem) peersStateLoop() {

peersChan := make(chan *cluster.Peer)

eg.Go(func() error {
defer close(peersChan)
peers, err := x.cluster.Peers(ctx)
if err != nil {
return err
}

for _, peer := range peers {
select {
case peersChan <- peer:
case <-ctx.Done():
return ctx.Err()
eg.Go(
func() error {
defer close(peersChan)
peers, err := x.cluster.Peers(ctx)
if err != nil {
return err
}
}
return nil
})

eg.Go(func() error {
for peer := range peersChan {
if err := x.processPeerState(ctx, peer); err != nil {
return err
for _, peer := range peers {
select {
case peersChan <- peer:
case <-ctx.Done():
return ctx.Err()
}
}
select {
case <-ctx.Done():
return ctx.Err()
default:
// pass
return nil
},
)

eg.Go(
func() error {
for peer := range peersChan {
if err := x.processPeerState(ctx, peer); err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
default:
// pass
}
}
}
return nil
})
return nil
},
)

if err := eg.Wait(); err != nil {
x.logger.Error(err)
Expand Down Expand Up @@ -1365,10 +1389,12 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o
pidOpts = append(pidOpts, withPassivationAfter(x.expireActorAfter))
}

pid, err := newPID(ctx,
pid, err := newPID(
ctx,
addr,
actor,
pidOpts...)
pidOpts...,
)

if err != nil {
return nil, err
Expand All @@ -1379,17 +1405,21 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o
// getSystemActorName returns the system supervisor name
func (x *actorSystem) getSystemActorName(nameType nameType) string {
if x.remotingEnabled.Load() {
return fmt.Sprintf("%s%s%s-%d-%d",
return fmt.Sprintf(
"%s%s%s-%d-%d",
systemNames[nameType],
strings.ToTitle(x.name),
x.host,
x.port,
time.Now().UnixNano())
time.Now().UnixNano(),
)
}
return fmt.Sprintf("%s%s-%d",
return fmt.Sprintf(
"%s%s-%d",
systemNames[nameType],
strings.ToTitle(x.name),
time.Now().UnixNano())
time.Now().UnixNano(),
)
}

func isSystemName(name string) bool {
Expand All @@ -1400,3 +1430,59 @@ func isSystemName(name string) bool {
func (x *actorSystem) actorAddress(name string) *address.Address {
return address.New(name, x.name, x.host, int(x.port))
}

// startHTTPServer starts the appropriate http server
func (x *actorSystem) startHTTPServer() error {
return x.server.Serve(x.listener)
}

// shutdownHTTPServer stops the appropriate http server
func (x *actorSystem) shutdownHTTPServer(ctx context.Context) error {
return x.server.Shutdown(ctx)
}

// configureServer configure the various http server and listeners based upon the various settings
func (x *actorSystem) configureServer(ctx context.Context, mux *nethttp.ServeMux) error {
hostPort := net.JoinHostPort(x.host, strconv.Itoa(int(x.port)))
httpServer := getServer(ctx, hostPort)
// create a tcp listener
lnr, err := net.Listen("tcp", hostPort)
if err != nil {
return err
}

// set the http server
x.server = httpServer
// For gRPC clients, it's convenient to support HTTP/2 without TLS.
x.server.Handler = h2c.NewHandler(
mux, &http2.Server{
IdleTimeout: 1200 * time.Second,
},
)
// set the non-secure http server
x.listener = lnr
return nil
}

// getServer creates an instance of http server
func getServer(ctx context.Context, address string) *nethttp.Server {
return &nethttp.Server{
Addr: address,
// The maximum duration for reading the entire request, including the body.
// It’s implemented in net/http by calling SetReadDeadline immediately after Accept
// ReadTimeout := handler_timeout + ReadHeaderTimeout + wiggle_room
ReadTimeout: 3 * time.Second,
// ReadHeaderTimeout is the amount of time allowed to read request headers
ReadHeaderTimeout: time.Second,
// WriteTimeout is the maximum duration before timing out writes of the response.
// It is reset whenever a new request’s header is read.
// This effectively covers the lifetime of the ServeHTTP handler stack
WriteTimeout: time.Second,
// IdleTimeout is the maximum amount of time to wait for the next request when keep-alive are enabled.
// If IdleTimeout is zero, the value of ReadTimeout is used. Not relevant to request timeouts
IdleTimeout: 1200 * time.Second,
BaseContext: func(_ net.Listener) context.Context {
return ctx
},
}
}
Loading