Skip to content

Commit

Permalink
feat: add spawn option to entities creation (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Feb 3, 2025
1 parent bcc2c66 commit b8baa65
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 47 deletions.
213 changes: 173 additions & 40 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ import (
"sync"
"time"

"github.com/tochemey/goakt/v2/address"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"

"github.com/tochemey/goakt/v2/actors"
"github.com/tochemey/goakt/v2/address"
"github.com/tochemey/goakt/v2/discovery"
"github.com/tochemey/goakt/v2/log"
"github.com/tochemey/goakt/v2/remote"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"

"github.com/tochemey/ego/v3/egopb"
"github.com/tochemey/ego/v3/eventstream"
Expand Down Expand Up @@ -79,7 +79,19 @@ type Engine struct {
remoting *actors.Remoting
}

// NewEngine creates an instance of Engine
// NewEngine creates and initializes a new instance of the eGo engine.
//
// This function constructs an engine with the specified name and event store, applying any additional
// configuration options. The engine serves as the core for managing event-sourced entities, durable
// state entities, and projections.
//
// Parameters:
// - name: A unique identifier for the engine instance.
// - eventsStore: The event store responsible for persisting and retrieving events.
// - opts: Optional configurations to customize engine behavior.
//
// Returns:
// - A pointer to the newly created Engine instance.
func NewEngine(name string, eventsStore persistence.EventsStore, opts ...Option) *Engine {
e := &Engine{
name: name,
Expand All @@ -99,7 +111,17 @@ func NewEngine(name string, eventsStore persistence.EventsStore, opts ...Option)
return e
}

// Start starts the ego engine
// Start initializes and starts eGo engine.
//
// This function launches the engine, enabling it to manage event-sourced entities, durable state entities,
// and projections. It ensures that all necessary components are initialized and ready to process commands
// and events.
//
// Parameters:
// - ctx: Execution context for managing startup behavior and handling cancellations.
//
// Returns:
// - An error if the engine fails to start due to misconfiguration or system issues; otherwise, nil.
func (engine *Engine) Start(ctx context.Context) error {
opts := []actors.Option{
actors.WithLogger(engine.logger),
Expand Down Expand Up @@ -132,7 +154,7 @@ func (engine *Engine) Start(ctx context.Context) error {

opts = append(opts,
actors.WithCluster(clusterConfig),
actors.WithRemoting(engine.hostName, int32(engine.remotingPort)))
actors.WithRemote(remote.NewConfig(engine.hostName, engine.remotingPort)))
}

var err error
Expand All @@ -150,7 +172,23 @@ func (engine *Engine) Start(ctx context.Context) error {
return nil
}

// AddProjection add a projection to the running eGo engine and starts it
// AddProjection registers a new projection with the eGo engine and starts its execution.
//
// The projection processes events from the events store applying the specified handler to manage state updates
// based on incoming events. The provided offset store ensures the projection maintains its processing position
// across restarts.
//
// Key behavior:
// - Projections once created, will persist for the entire lifespan of the running eGo system.
//
// Parameters:
// - ctx: Execution context used for cancellation and deadlines.
// - name: A unique identifier for the projection.
// - handler: The event handler responsible for processing events and updating the projection state.
// - offsetStore: The storage mechanism for tracking the last processed event, ensuring resumability.
// - opts: Optional configuration settings that modify projection behavior.
//
// Returns an error if the projection fails to start due to misconfiguration or underlying system issues.
func (engine *Engine) AddProjection(ctx context.Context, name string, handler projection.Handler, offsetStore offsetstore.OffsetStore, opts ...projection.Option) error {
if !engine.Started() {
return ErrEngineNotStarted
Expand All @@ -162,14 +200,25 @@ func (engine *Engine) AddProjection(ctx context.Context, name string, handler pr
actorSystem := engine.actorSystem
engine.mutex.Unlock()

if _, err := actorSystem.Spawn(ctx, name, actor); err != nil {
// projections are long-lived actors
if _, err := actorSystem.Spawn(ctx, name, actor, actors.WithLongLived()); err != nil {
return fmt.Errorf("failed to register the projection=(%s): %w", name, err)
}

return nil
}

// RemoveProjection stops and removes a given projection from the engine
// RemoveProjection stops and removes the specified projection from the engine.
//
// This function gracefully shuts down the projection identified by `name` and removes it from the system.
// Any in-progress processing will be stopped, and the projection will no longer receive events.
//
// Parameters:
// - ctx: Execution context for managing cancellation and timeouts.
// - name: The unique identifier of the projection to be removed.
//
// Returns:
// - An error if the projection fails to stop or does not exist; otherwise, nil.
func (engine *Engine) RemoveProjection(ctx context.Context, name string) error {
if !engine.Started() {
return ErrEngineNotStarted
Expand All @@ -182,8 +231,19 @@ func (engine *Engine) RemoveProjection(ctx context.Context, name string) error {
return actorSystem.Kill(ctx, name)
}

// IsProjectionRunning returns true when the projection is active and running
// One needs to check the error to see whether this function does not return a false negative
// IsProjectionRunning checks whether the specified projection is currently active and running.
//
// This function returns `true` if the projection identified by `name` is running. However, callers should
// always check the returned error to ensure the result is valid, as an error may indicate an inability to
// determine the projection's status.
//
// Parameters:
// - ctx: Execution context for managing timeouts and cancellations.
// - name: The unique identifier of the projection.
//
// Returns:
// - A boolean indicating whether the projection is running (`true` if active, `false` otherwise).
// - An error if the status check fails, which may result in a false negative.
func (engine *Engine) IsProjectionRunning(ctx context.Context, name string) (bool, error) {
if !engine.Started() {
return false, ErrEngineNotStarted
Expand All @@ -204,7 +264,17 @@ func (engine *Engine) IsProjectionRunning(ctx context.Context, name string) (boo
return addr.Equals(address.NoSender()), nil
}

// Stop stops the ego engine
// Stop gracefully shuts down the eGo engine.
//
// This function ensures that all running projections, entities, and processes managed by the engine
// are properly stopped before termination. It waits for ongoing operations to complete or time out
// based on the provided context.
//
// Parameters:
// - ctx: Execution context for managing cancellation and timeouts.
//
// Returns:
// - An error if the shutdown process encounters issues; otherwise, nil.
func (engine *Engine) Stop(ctx context.Context) error {
engine.started.Store(false)
engine.eventStream.Close()
Expand Down Expand Up @@ -237,17 +307,29 @@ func (engine *Engine) Subscribe() (eventstream.Subscriber, error) {
return subscriber, nil
}

// Entity creates an event sourced entity.
// Entity persists its full state into an events store that tracks the history based upon events that occurred.
// An event sourced entity receives a (non-persistent) command which is first validated if it can be applied to the current state.
// Here validation can mean anything, from simple inspection of a command message’s fields up to a conversation with several external services, for instance.
// If validation succeeds, events are generated from the command, representing the outcome of the command.
// These events are then persisted and, after successful persistence, used to change the actor’s state.
// When the event sourced entity needs to be recovered, only the persisted events are replayed of which we know that they can be successfully applied.
// In other words, events cannot fail when being replayed to a persistent actor, in contrast to commands.
// When there are no events to persist the event sourced entity will return the current state of the entity.
// One can use the SendCommand to send a command a durable state entity.
func (engine *Engine) Entity(ctx context.Context, behavior EventSourcedBehavior) error {
// Entity creates an event-sourced entity that persists its state by storing a history of events.
//
// This entity follows the event sourcing pattern, where changes to its state are driven by events rather than direct mutations.
// It processes incoming commands, validates them, generates corresponding events upon successful validation, and persists those
// events before applying them to update its state.
//
// Key Behavior:
// - Commands: The entity receives commands (non-persistent messages) that are validated before being processed.
// - Validation: This can range from simple field checks to interactions with external services.
// - Events: If validation succeeds, events are derived from the command, persisted in the event store, and then used to update the entity’s state.
// - Recovery: During recovery, only persisted events are replayed to rebuild the entity’s state, ensuring deterministic behavior.
// - Command vs. Event: Commands may be rejected if they are invalid, while events—once persisted—cannot fail during replay.
//
// If no new events are generated, the entity simply returns its current state.
// To interact with the entity, use SendCommand to send commands to a durable event-sourced entity.
//
// Parameters:
// - ctx: Execution context for controlling the lifecycle of the entity.
// - behavior: Defines the entity’s event-sourced behavior, including command handling and state transitions.
// - opts: Additional spawning options to configure entity behavior.
//
// Returns an error if the entity fails to initialize or encounters an issue during execution.
func (engine *Engine) Entity(ctx context.Context, behavior EventSourcedBehavior, opts ...SpawnOption) error {
if !engine.Started() {
return ErrEngineNotStarted
}
Expand All @@ -258,25 +340,49 @@ func (engine *Engine) Entity(ctx context.Context, behavior EventSourcedBehavior)
eventStream := engine.eventStream
engine.mutex.Unlock()

config := newSpawnConfig(opts...)
var sOptions []actors.SpawnOption

switch {
case config.passivateAfter > 0:
sOptions = append(sOptions, actors.WithPassivateAfter(config.passivateAfter))
default:
sOptions = append(sOptions, actors.WithLongLived())
}

_, err := actorSystem.Spawn(ctx,
behavior.ID(),
newEventSourcedActor(behavior, eventsStore, eventStream))
newEventSourcedActor(behavior, eventsStore, eventStream),
sOptions...)
if err != nil {
return err
}

return nil
}

// DurableStateEntity creates a durable state entity.
// A DurableStateEntity persists its full state into a durable store without any history of the state evolution.
// A durable state entity receives a (non-persistent) command which is first validated if it can be applied to the current state.
// Here validation can mean anything, from simple inspection of a command message’s fields up to a conversation with several external services, for instance.
// If validation succeeds, a new state is generated from the command, representing the outcome of the command.
// The new state is persisted and, after successful persistence, used to change the actor’s state.
// During a normal shutdown process, it will persist its current state to the durable store prior to shutting down.
// One can use the SendCommand to send a command a durable state entity.
func (engine *Engine) DurableStateEntity(ctx context.Context, behavior DurableStateBehavior) error {
// DurableStateEntity creates an entity that persists its full state in a durable store without maintaining historical event records.
//
// Unlike an event-sourced entity, a durable state entity does not track past state changes as a sequence of events. Instead, it
// directly stores and updates its current state in a durable store.
//
// Key Behavior:
// - Commands: The entity receives non-persistent commands that are validated before being processed.
// - Validation: This can range from simple field checks to complex interactions with external services.
// - State Updates: If validation succeeds, a new state is derived from the command, persisted, and then applied to update the entity’s state.
// - Persistence: The latest state is always stored, ensuring that only the most recent version is retained.
// - Recovery: Upon restart, the entity reloads its last persisted state, rather than replaying a sequence of past events.
// - Shutdown Handling: During a normal shutdown, the entity ensures that its current state is persisted before termination.
//
// To interact with the entity, use SendCommand to send commands and update the durable state.
//
// Parameters:
// - ctx: Execution context for controlling the entity’s lifecycle.
// - behavior: Defines the entity’s behavior, including command handling and state transitions.
// - opts: Additional spawning options to configure entity behavior.
//
// Returns an error if the entity fails to initialize or encounters an issue during execution.
func (engine *Engine) DurableStateEntity(ctx context.Context, behavior DurableStateBehavior, opts ...SpawnOption) error {
if !engine.Started() {
return ErrEngineNotStarted
}
Expand All @@ -291,21 +397,48 @@ func (engine *Engine) DurableStateEntity(ctx context.Context, behavior DurableSt
return ErrDurableStateStoreRequired
}

config := newSpawnConfig(opts...)
var sOptions []actors.SpawnOption

switch {
case config.passivateAfter > 0:
sOptions = append(sOptions, actors.WithPassivateAfter(config.passivateAfter))
default:
sOptions = append(sOptions, actors.WithLongLived())
}

_, err := actorSystem.Spawn(ctx,
behavior.ID(),
newDurableStateActor(behavior, durableStateStore, eventStream))
newDurableStateActor(behavior, durableStateStore, eventStream),
sOptions...)
if err != nil {
return err
}

return nil
}

// SendCommand sends command to a given entity ref.
// This will return:
// 1. the resulting state after the command has been handled and the emitted event/durable state persisted
// 2. nil when there is no resulting state or no event persisted
// 3. an error in case of error
// SendCommand sends a command to the specified entity and processes its response.
//
// This function dispatches a command to an entity identified by `entityID`. The entity validates the command, applies
// any necessary state changes, and persists the resulting state if applicable. The function returns the updated state,
// a revision number, or an error if the operation fails.
//
// Behavior:
// - If the command is successfully processed and results in a state update, the new state and its revision number are returned.
// - If no state update occurs (i.e., no event is persisted or the command does not trigger a change), `nil` is returned.
// - If an error occurs during processing, the function returns a non-nil error.
//
// Parameters:
// - ctx: Execution context for handling timeouts and cancellations.
// - entityID: The unique identifier of the target entity.
// - cmd: The command to be processed by the entity.
// - timeout: The duration within which the command must be processed before timing out.
//
// Returns:
// - resultingState: The updated state of the entity after handling the command, or `nil` if no state change occurred.
// - revision: A monotonically increasing revision number representing the persisted state version.
// - err: An error if the command processing fails.
func (engine *Engine) SendCommand(ctx context.Context, entityID string, cmd Command, timeout time.Duration) (resultingState State, revision uint64, err error) {
if !engine.Started() {
return nil, 0, ErrEngineNotStarted
Expand Down
6 changes: 2 additions & 4 deletions event_sourced_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,14 @@ func (entity *eventSourcedActor) PreStart(ctx context.Context) error {
return fmt.Errorf("failed to connect to the events store: %w", err)
}

return nil
return entity.recoverFromSnapshot(ctx)
}

// Receive processes any message dropped into the actor mailbox.
func (entity *eventSourcedActor) Receive(ctx *actors.ReceiveContext) {
switch command := ctx.Message().(type) {
case *goaktpb.PostStart:
if err := entity.recoverFromSnapshot(ctx.Context()); err != nil {
ctx.Err(fmt.Errorf("failed to recover from snapshot: %w", err))
}
// pass
case *egopb.GetStateCommand:
entity.getStateAndReply(ctx)
default:
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/flowchartsman/retry v1.2.0
github.com/google/uuid v1.6.0
github.com/stretchr/testify v1.10.0
github.com/tochemey/goakt/v2 v2.12.1
github.com/tochemey/goakt/v2 v2.13.0
github.com/travisjeffery/go-dynaport v1.0.0
go.uber.org/atomic v1.11.0
go.uber.org/goleak v1.3.0
Expand Down Expand Up @@ -49,6 +49,7 @@ require (
github.com/hashicorp/memberlist v0.5.3 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/mailru/easyjson v0.9.0 // indirect
github.com/miekg/dns v1.1.63 // indirect
Expand All @@ -61,6 +62,7 @@ require (
github.com/redis/go-redis/v9 v9.7.0 // indirect
github.com/reugn/go-quartz v0.13.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tidwall/btree v1.7.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand All @@ -239,8 +241,8 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT
github.com/tidwall/redcon v1.6.2 h1:5qfvrrybgtO85jnhSravmkZyC0D+7WstbfCs3MmPhow=
github.com/tidwall/redcon v1.6.2/go.mod h1:p5Wbsgeyi2VSTBWOcA5vRXrOb9arFTcU2+ZzFjqV75Y=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/tochemey/goakt/v2 v2.12.1 h1:d7FkdnMB0aQQTiKZPHoC7jpvr7xP+0YSxp7+XDv9h6M=
github.com/tochemey/goakt/v2 v2.12.1/go.mod h1:zGcDYeKJi5KIBzN1lvJxcjQf3ce6fT7OFxoMNkyDAj4=
github.com/tochemey/goakt/v2 v2.13.0 h1:yOiXVuqllHGe4YvvjkBmy/zyGE+o2m4rKw786XEi5XM=
github.com/tochemey/goakt/v2 v2.13.0/go.mod h1:oFq9MUV1w3dCB2gEpy1onsfS/9m8EJ06F2c290o2Nrk=
github.com/tochemey/olric v0.2.0-alpha h1:pj6haxwPhIuLcTGwhsSNEoryPdPXMcEIWBo0EJI8mMA=
github.com/tochemey/olric v0.2.0-alpha/go.mod h1:x0Q6soq7bk0rWSDFPzE4fA8a5m+MnWxHRHZzg/ywg0M=
github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw=
Expand Down
Loading

0 comments on commit b8baa65

Please sign in to comment.