diff --git a/actor.go b/actor.go index 70b6e13..a18883e 100644 --- a/actor.go +++ b/actor.go @@ -213,22 +213,16 @@ func (entity *actor[T]) getStateAndReply(ctx actors.ReceiveContext) { // processCommandAndReply processes the incoming command func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, command Command) { - // set the go context goCtx := ctx.Context() - // pass the received command to the command handler - event, err := entity.HandleCommand(goCtx, command, entity.currentState) - // handle the command handler error + events, err := entity.HandleCommand(goCtx, command, entity.currentState) if err != nil { - // send an error reply entity.sendErrorReply(ctx, err) return } - // if the event is nil nothing is persisted, and we return no reply - if event == nil { - // get the current state and marshal it + // no-op when there are no events to process + if len(events) == 0 { resultingState, _ := anypb.New(entity.currentState) - // create the command reply to send out reply := &egopb.CommandReply{ Reply: &egopb.CommandReply_StateReply{ StateReply: &egopb.StateReply{ @@ -239,80 +233,92 @@ func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, comman }, }, } - // send the response ctx.Response(reply) return } - // process the event by calling the event handler - resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState) - // handle the event handler error - if err != nil { - // send an error reply - entity.sendErrorReply(ctx, err) - return - } - - // increment the event counter - entity.eventsCounter.Inc() - - // set the current state for the next command - entity.currentState = resultingState - - // marshal the event and the resulting state - marshaledEvent, _ := anypb.New(event) - marshaledState, _ := anypb.New(resultingState) - - sequenceNumber := entity.eventsCounter.Load() - timestamp := timestamppb.Now() - entity.lastCommandTime = timestamp.AsTime() + envelopesChan := make(chan *egopb.Event, 1) + eg, goCtx := errgroup.WithContext(goCtx) shardNumber := ctx.Self().ActorSystem().GetPartition(entity.ID()) - - // create the event - envelope := &egopb.Event{ - PersistenceId: entity.ID(), - SequenceNumber: sequenceNumber, - IsDeleted: false, - Event: marshaledEvent, - ResultingState: marshaledState, - Timestamp: entity.lastCommandTime.Unix(), - Shard: shardNumber, - } - - // create a journal list - journals := []*egopb.Event{envelope} - - // define the topic for the given shard topic := fmt.Sprintf(eventsTopic, shardNumber) - // publish to the event stream and persist the event to the events store - eg, goCtx := errgroup.WithContext(goCtx) - - // publish the message to the topic + // 1-> process each event + // 2-> increment the events (processed) counter of the entity + // 3-> set the current state of the entity to the resulting state + // 4-> set the latCommandTime of the entity with the current timestamp + // 5-> marshal the resulting state and event to build the persistence envelope + // 6-> push the envelope to the envelope channel for downstream processing eg.Go(func() error { - entity.eventsStream.Publish(topic, envelope) + defer close(envelopesChan) + for _, event := range events { + resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState) + if err != nil { + entity.sendErrorReply(ctx, err) + return err + } + + entity.eventsCounter.Inc() + entity.currentState = resultingState + entity.lastCommandTime = timestamppb.Now().AsTime() + + event, _ := anypb.New(event) + state, _ := anypb.New(resultingState) + + envelope := &egopb.Event{ + PersistenceId: entity.ID(), + SequenceNumber: entity.eventsCounter.Load(), + IsDeleted: false, + Event: event, + ResultingState: state, + Timestamp: entity.lastCommandTime.Unix(), + Shard: shardNumber, + } + + select { + case envelopesChan <- envelope: + case <-goCtx.Done(): + return goCtx.Err() + } + } return nil }) - // persist the event to the events store + // 1-> publish the processed event + // 2-> build the list of envelopes for persistence + // 3-> persist the batch of envelopes when the channel is closed and return eg.Go(func() error { - return entity.eventsStore.WriteEvents(goCtx, journals) + var envelopes []*egopb.Event + for { + select { + case envelope, ok := <-envelopesChan: + // channel closed, persist the envelopes + if !ok { + return entity.eventsStore.WriteEvents(goCtx, envelopes) + } + + entity.eventsStream.Publish(topic, envelope) + envelopes = append(envelopes, envelope) + case <-goCtx.Done(): + return goCtx.Err() + } + } }) - // handle the persistence error + // wait for all go routines to complete if err := eg.Wait(); err != nil { - // send an error reply entity.sendErrorReply(ctx, err) return } + state, _ := anypb.New(entity.currentState) + // create the command reply to send reply := &egopb.CommandReply{ Reply: &egopb.CommandReply_StateReply{ StateReply: &egopb.StateReply{ PersistenceId: entity.ID(), - State: marshaledState, - SequenceNumber: sequenceNumber, + State: state, + SequenceNumber: entity.eventsCounter.Load(), Timestamp: entity.lastCommandTime.Unix(), }, }, diff --git a/behavior.go b/behavior.go index 18fa850..127e777 100644 --- a/behavior.go +++ b/behavior.go @@ -50,7 +50,10 @@ type EntityBehavior[T State] interface { // Any decision should be solely based on the data passed in the commands and the state of the Behavior. // In case of successful validation, one or more events expressing the mutations are persisted. // Once the events are persisted, they are applied to the state producing a new valid state. - HandleCommand(ctx context.Context, command Command, priorState T) (event Event, err error) + // Every event emitted are processed one after the other in the same order they were emitted to guarantee consistency. + // It is at the discretion of the application developer to know in which order a given command should return the list of events + // This is really powerful when a command needs to return two events. For instance, an OpenAccount command can result in two events: one is AccountOpened and the second is AccountCredited + HandleCommand(ctx context.Context, command Command, priorState T) (events []Event, err error) // HandleEvent handle events emitted by the command handlers. The event handlers are used to mutate the state of the event sourced actor by applying the events to it. // Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal. HandleEvent(ctx context.Context, event Event, priorState T) (state T, err error) diff --git a/engine_test.go b/engine_test.go index 58ff887..594b7b1 100644 --- a/engine_test.go +++ b/engine_test.go @@ -240,20 +240,24 @@ func (a *AccountBehavior) InitialState() *samplepb.Account { } // HandleCommand handles every command that is sent to the persistent behavior -func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (event Event, err error) { +func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (events []Event, err error) { switch cmd := command.(type) { case *samplepb.CreateAccount: // TODO in production grid app validate the command using the prior state - return &samplepb.AccountCreated{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetAccountBalance(), + return []Event{ + &samplepb.AccountCreated{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetAccountBalance(), + }, }, nil case *samplepb.CreditAccount: // TODO in production grid app validate the command using the prior state - return &samplepb.AccountCredited{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetBalance(), + return []Event{ + &samplepb.AccountCredited{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetBalance(), + }, }, nil default: diff --git a/example/main.go b/example/main.go index f51a3e6..92de04a 100644 --- a/example/main.go +++ b/example/main.go @@ -113,20 +113,24 @@ func (a *AccountBehavior) InitialState() *samplepb.Account { } // HandleCommand handles every command that is sent to the persistent behavior -func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (event ego.Event, err error) { +func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (events []ego.Event, err error) { switch cmd := command.(type) { case *samplepb.CreateAccount: // TODO in production grid app validate the command using the prior state - return &samplepb.AccountCreated{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetAccountBalance(), + return []ego.Event{ + &samplepb.AccountCreated{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetAccountBalance(), + }, }, nil case *samplepb.CreditAccount: // TODO in production grid app validate the command using the prior state - return &samplepb.AccountCredited{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetBalance(), + return []ego.Event{ + &samplepb.AccountCredited{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetBalance(), + }, }, nil default: diff --git a/helper_test.go b/helper_test.go index b1e510b..50f5351 100644 --- a/helper_test.go +++ b/helper_test.go @@ -51,20 +51,24 @@ func (t *AccountEntityBehavior) InitialState() *testpb.Account { return new(testpb.Account) } -func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command, _ *testpb.Account) (event Event, err error) { +func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command, _ *testpb.Account) (events []Event, err error) { switch cmd := command.(type) { case *testpb.CreateAccount: // TODO in production grid app validate the command using the prior state - return &testpb.AccountCreated{ - AccountId: t.id, - AccountBalance: cmd.GetAccountBalance(), + return []Event{ + &testpb.AccountCreated{ + AccountId: t.id, + AccountBalance: cmd.GetAccountBalance(), + }, }, nil case *testpb.CreditAccount: if cmd.GetAccountId() == t.id { - return &testpb.AccountCredited{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetBalance(), + return []Event{ + &testpb.AccountCredited{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetBalance(), + }, }, nil } diff --git a/readme.md b/readme.md index 7e882aa..308a748 100644 --- a/readme.md +++ b/readme.md @@ -158,25 +158,29 @@ func (a *AccountBehavior) InitialState() *samplepb.Account { } // HandleCommand handles every command that is sent to the persistent behavior -func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (event ego.Event, err error) { - switch cmd := command.(type) { - case *samplepb.CreateAccount: - // TODO in production grid app validate the command using the prior state - return &samplepb.AccountCreated{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetAccountBalance(), - }, nil - - case *samplepb.CreditAccount: - // TODO in production grid app validate the command using the prior state - return &samplepb.AccountCredited{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetBalance(), - }, nil - - default: - return nil, errors.New("unhandled command") - } +func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (events []Event, err error) { + switch cmd := command.(type) { + case *samplepb.CreateAccount: + // TODO in production grid app validate the command using the prior state + return []Event{ + &samplepb.AccountCreated{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetAccountBalance(), + }, + }, nil + + case *samplepb.CreditAccount: + // TODO in production grid app validate the command using the prior state + return []Event{ + &samplepb.AccountCredited{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetBalance(), + }, + }, nil + + default: + return nil, errors.New("unhandled command") + } } // HandleEvent handles every event emitted