diff --git a/actor.go b/actor.go index 70b6e13..895c2fc 100644 --- a/actor.go +++ b/actor.go @@ -213,22 +213,16 @@ func (entity *actor[T]) getStateAndReply(ctx actors.ReceiveContext) { // processCommandAndReply processes the incoming command func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, command Command) { - // set the go context goCtx := ctx.Context() - // pass the received command to the command handler - event, err := entity.HandleCommand(goCtx, command, entity.currentState) - // handle the command handler error + events, err := entity.HandleCommand(goCtx, command, entity.currentState) if err != nil { - // send an error reply entity.sendErrorReply(ctx, err) return } - // if the event is nil nothing is persisted, and we return no reply - if event == nil { - // get the current state and marshal it + // no-op when there are no events to process + if len(events) == 0 { resultingState, _ := anypb.New(entity.currentState) - // create the command reply to send out reply := &egopb.CommandReply{ Reply: &egopb.CommandReply_StateReply{ StateReply: &egopb.StateReply{ @@ -239,85 +233,69 @@ func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, comman }, }, } - // send the response ctx.Response(reply) return } - // process the event by calling the event handler - resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState) - // handle the event handler error - if err != nil { - // send an error reply - entity.sendErrorReply(ctx, err) - return - } - - // increment the event counter - entity.eventsCounter.Inc() - - // set the current state for the next command - entity.currentState = resultingState - - // marshal the event and the resulting state - marshaledEvent, _ := anypb.New(event) - marshaledState, _ := anypb.New(resultingState) - - sequenceNumber := entity.eventsCounter.Load() - timestamp := timestamppb.Now() - entity.lastCommandTime = timestamp.AsTime() shardNumber := ctx.Self().ActorSystem().GetPartition(entity.ID()) + topic := fmt.Sprintf(eventsTopic, shardNumber) - // create the event - envelope := &egopb.Event{ - PersistenceId: entity.ID(), - SequenceNumber: sequenceNumber, - IsDeleted: false, - Event: marshaledEvent, - ResultingState: marshaledState, - Timestamp: entity.lastCommandTime.Unix(), - Shard: shardNumber, - } - - // create a journal list - journals := []*egopb.Event{envelope} + var envelopes []*egopb.Event + // process all events + for _, event := range events { + resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState) + if err != nil { + entity.sendErrorReply(ctx, err) + return + } - // define the topic for the given shard - topic := fmt.Sprintf(eventsTopic, shardNumber) + entity.eventsCounter.Inc() + entity.currentState = resultingState + entity.lastCommandTime = timestamppb.Now().AsTime() + + event, _ := anypb.New(event) + state, _ := anypb.New(resultingState) + + envelope := &egopb.Event{ + PersistenceId: entity.ID(), + SequenceNumber: entity.eventsCounter.Load(), + IsDeleted: false, + Event: event, + ResultingState: state, + Timestamp: entity.lastCommandTime.Unix(), + Shard: shardNumber, + } + envelopes = append(envelopes, envelope) + } - // publish to the event stream and persist the event to the events store eg, goCtx := errgroup.WithContext(goCtx) - - // publish the message to the topic eg.Go(func() error { - entity.eventsStream.Publish(topic, envelope) + for _, envelope := range envelopes { + entity.eventsStream.Publish(topic, envelope) + } return nil }) - // persist the event to the events store eg.Go(func() error { - return entity.eventsStore.WriteEvents(goCtx, journals) + return entity.eventsStore.WriteEvents(goCtx, envelopes) }) - // handle the persistence error if err := eg.Wait(); err != nil { - // send an error reply entity.sendErrorReply(ctx, err) return } - // create the command reply to send + state, _ := anypb.New(entity.currentState) reply := &egopb.CommandReply{ Reply: &egopb.CommandReply_StateReply{ StateReply: &egopb.StateReply{ PersistenceId: entity.ID(), - State: marshaledState, - SequenceNumber: sequenceNumber, + State: state, + SequenceNumber: entity.eventsCounter.Load(), Timestamp: entity.lastCommandTime.Unix(), }, }, } - // send the response ctx.Response(reply) } diff --git a/actor_test.go b/actor_test.go index 49a656b..e4407c4 100644 --- a/actor_test.go +++ b/actor_test.go @@ -40,13 +40,12 @@ import ( "github.com/tochemey/goakt/actors" "github.com/tochemey/goakt/log" "github.com/tochemey/gopack/postgres" - "go.uber.org/goleak" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/emptypb" ) func TestActor(t *testing.T) { t.Run("with state reply", func(t *testing.T) { - defer goleak.VerifyNone(t) ctx := context.TODO() // create an actor system actorSystem, err := actors.NewActorSystem("TestActorSystem", @@ -144,7 +143,6 @@ func TestActor(t *testing.T) { assert.NoError(t, err) }) t.Run("with error reply", func(t *testing.T) { - defer goleak.VerifyNone(t) ctx := context.TODO() // create an actor system @@ -230,7 +228,6 @@ func TestActor(t *testing.T) { assert.NoError(t, err) }) t.Run("with unhandled command", func(t *testing.T) { - defer goleak.VerifyNone(t) ctx := context.TODO() // create an actor system @@ -554,4 +551,81 @@ func TestActor(t *testing.T) { err = actorSystem.Stop(ctx) assert.NoError(t, err) }) + t.Run("with unhandled event", func(t *testing.T) { + ctx := context.TODO() + // create an actor system + actorSystem, err := actors.NewActorSystem("TestActorSystem", + actors.WithPassivationDisabled(), + actors.WithLogger(log.DiscardLogger), + actors.WithActorInitMaxRetries(3)) + require.NoError(t, err) + assert.NotNil(t, actorSystem) + + // start the actor system + err = actorSystem.Start(ctx) + require.NoError(t, err) + + // create the event store + eventStore := memory.NewEventsStore() + // create a persistence id + persistenceID := uuid.NewString() + // create the persistence behavior + behavior := NewAccountEntityBehavior(persistenceID) + + // connect the event store + err = eventStore.Connect(ctx) + require.NoError(t, err) + + // create an instance of events stream + eventStream := eventstream.New() + + // create the persistence actor using the behavior previously created + actor := newActor[*testpb.Account](behavior, eventStore, eventStream) + // spawn the actor + pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor) + require.NotNil(t, pid) + + // send the command to the actor + reply, err := actors.Ask(ctx, pid, &testpb.CreateAccount{AccountBalance: 500.00}, 5*time.Second) + require.NoError(t, err) + require.NotNil(t, reply) + require.IsType(t, new(egopb.CommandReply), reply) + + commandReply := reply.(*egopb.CommandReply) + require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply()) + + state := commandReply.GetReply().(*egopb.CommandReply_StateReply) + assert.EqualValues(t, 1, state.StateReply.GetSequenceNumber()) + + // marshal the resulting state + resultingState := new(testpb.Account) + err = state.StateReply.GetState().UnmarshalTo(resultingState) + require.NoError(t, err) + + expected := &testpb.Account{ + AccountId: persistenceID, + AccountBalance: 500.00, + } + assert.True(t, proto.Equal(expected, resultingState)) + + reply, err = actors.Ask(ctx, pid, new(emptypb.Empty), 5*time.Second) + require.NoError(t, err) + require.NotNil(t, reply) + require.IsType(t, new(egopb.CommandReply), reply) + + commandReply = reply.(*egopb.CommandReply) + require.IsType(t, new(egopb.CommandReply_ErrorReply), commandReply.GetReply()) + + errorReply := commandReply.GetReply().(*egopb.CommandReply_ErrorReply) + assert.Equal(t, "unhandled event", errorReply.ErrorReply.GetMessage()) + + // disconnect from the event store + require.NoError(t, eventStore.Disconnect(ctx)) + + // close the stream + eventStream.Close() + // stop the actor system + err = actorSystem.Stop(ctx) + assert.NoError(t, err) + }) } diff --git a/behavior.go b/behavior.go index 18fa850..127e777 100644 --- a/behavior.go +++ b/behavior.go @@ -50,7 +50,10 @@ type EntityBehavior[T State] interface { // Any decision should be solely based on the data passed in the commands and the state of the Behavior. // In case of successful validation, one or more events expressing the mutations are persisted. // Once the events are persisted, they are applied to the state producing a new valid state. - HandleCommand(ctx context.Context, command Command, priorState T) (event Event, err error) + // Every event emitted are processed one after the other in the same order they were emitted to guarantee consistency. + // It is at the discretion of the application developer to know in which order a given command should return the list of events + // This is really powerful when a command needs to return two events. For instance, an OpenAccount command can result in two events: one is AccountOpened and the second is AccountCredited + HandleCommand(ctx context.Context, command Command, priorState T) (events []Event, err error) // HandleEvent handle events emitted by the command handlers. The event handlers are used to mutate the state of the event sourced actor by applying the events to it. // Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal. HandleEvent(ctx context.Context, event Event, priorState T) (state T, err error) diff --git a/engine_test.go b/engine_test.go index 58ff887..594b7b1 100644 --- a/engine_test.go +++ b/engine_test.go @@ -240,20 +240,24 @@ func (a *AccountBehavior) InitialState() *samplepb.Account { } // HandleCommand handles every command that is sent to the persistent behavior -func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (event Event, err error) { +func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (events []Event, err error) { switch cmd := command.(type) { case *samplepb.CreateAccount: // TODO in production grid app validate the command using the prior state - return &samplepb.AccountCreated{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetAccountBalance(), + return []Event{ + &samplepb.AccountCreated{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetAccountBalance(), + }, }, nil case *samplepb.CreditAccount: // TODO in production grid app validate the command using the prior state - return &samplepb.AccountCredited{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetBalance(), + return []Event{ + &samplepb.AccountCredited{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetBalance(), + }, }, nil default: diff --git a/example/main.go b/example/main.go index f51a3e6..92de04a 100644 --- a/example/main.go +++ b/example/main.go @@ -113,20 +113,24 @@ func (a *AccountBehavior) InitialState() *samplepb.Account { } // HandleCommand handles every command that is sent to the persistent behavior -func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (event ego.Event, err error) { +func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (events []ego.Event, err error) { switch cmd := command.(type) { case *samplepb.CreateAccount: // TODO in production grid app validate the command using the prior state - return &samplepb.AccountCreated{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetAccountBalance(), + return []ego.Event{ + &samplepb.AccountCreated{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetAccountBalance(), + }, }, nil case *samplepb.CreditAccount: // TODO in production grid app validate the command using the prior state - return &samplepb.AccountCredited{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetBalance(), + return []ego.Event{ + &samplepb.AccountCredited{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetBalance(), + }, }, nil default: diff --git a/helper_test.go b/helper_test.go index b1e510b..e4b98ef 100644 --- a/helper_test.go +++ b/helper_test.go @@ -29,6 +29,7 @@ import ( "github.com/pkg/errors" testpb "github.com/tochemey/ego/test/data/pb/v1" + "google.golang.org/protobuf/types/known/emptypb" ) // AccountEntityBehavior implement EntityBehavior @@ -51,20 +52,24 @@ func (t *AccountEntityBehavior) InitialState() *testpb.Account { return new(testpb.Account) } -func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command, _ *testpb.Account) (event Event, err error) { +func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command, _ *testpb.Account) (events []Event, err error) { switch cmd := command.(type) { case *testpb.CreateAccount: // TODO in production grid app validate the command using the prior state - return &testpb.AccountCreated{ - AccountId: t.id, - AccountBalance: cmd.GetAccountBalance(), + return []Event{ + &testpb.AccountCreated{ + AccountId: t.id, + AccountBalance: cmd.GetAccountBalance(), + }, }, nil case *testpb.CreditAccount: if cmd.GetAccountId() == t.id { - return &testpb.AccountCredited{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetBalance(), + return []Event{ + &testpb.AccountCredited{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetBalance(), + }, }, nil } @@ -73,6 +78,9 @@ func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command case *testpb.TestNoEvent: return nil, nil + case *emptypb.Empty: + return []Event{new(emptypb.Empty)}, nil + default: return nil, errors.New("unhandled command") } diff --git a/readme.md b/readme.md index 7e882aa..308a748 100644 --- a/readme.md +++ b/readme.md @@ -158,25 +158,29 @@ func (a *AccountBehavior) InitialState() *samplepb.Account { } // HandleCommand handles every command that is sent to the persistent behavior -func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (event ego.Event, err error) { - switch cmd := command.(type) { - case *samplepb.CreateAccount: - // TODO in production grid app validate the command using the prior state - return &samplepb.AccountCreated{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetAccountBalance(), - }, nil - - case *samplepb.CreditAccount: - // TODO in production grid app validate the command using the prior state - return &samplepb.AccountCredited{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetBalance(), - }, nil - - default: - return nil, errors.New("unhandled command") - } +func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (events []Event, err error) { + switch cmd := command.(type) { + case *samplepb.CreateAccount: + // TODO in production grid app validate the command using the prior state + return []Event{ + &samplepb.AccountCreated{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetAccountBalance(), + }, + }, nil + + case *samplepb.CreditAccount: + // TODO in production grid app validate the command using the prior state + return []Event{ + &samplepb.AccountCredited{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetBalance(), + }, + }, nil + + default: + return nil, errors.New("unhandled command") + } } // HandleEvent handles every event emitted