Skip to content

Commit

Permalink
feat: make command handler returns one or more events
Browse files Browse the repository at this point in the history
BREAKING CHANGE: HandleCommand return an array of events instead of a single event
  • Loading branch information
Tochemey committed Mar 16, 2024
1 parent 262e336 commit eccfcc3
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 100 deletions.
124 changes: 65 additions & 59 deletions actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,22 +213,16 @@ func (entity *actor[T]) getStateAndReply(ctx actors.ReceiveContext) {

// processCommandAndReply processes the incoming command
func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, command Command) {
// set the go context
goCtx := ctx.Context()
// pass the received command to the command handler
event, err := entity.HandleCommand(goCtx, command, entity.currentState)
// handle the command handler error
events, err := entity.HandleCommand(goCtx, command, entity.currentState)
if err != nil {
// send an error reply
entity.sendErrorReply(ctx, err)
return
}

// if the event is nil nothing is persisted, and we return no reply
if event == nil {
// get the current state and marshal it
// no-op when there are no events to process
if len(events) == 0 {
resultingState, _ := anypb.New(entity.currentState)
// create the command reply to send out
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_StateReply{
StateReply: &egopb.StateReply{
Expand All @@ -239,80 +233,92 @@ func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, comman
},
},
}
// send the response
ctx.Response(reply)
return
}

// process the event by calling the event handler
resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState)
// handle the event handler error
if err != nil {
// send an error reply
entity.sendErrorReply(ctx, err)
return
}

// increment the event counter
entity.eventsCounter.Inc()

// set the current state for the next command
entity.currentState = resultingState

// marshal the event and the resulting state
marshaledEvent, _ := anypb.New(event)
marshaledState, _ := anypb.New(resultingState)

sequenceNumber := entity.eventsCounter.Load()
timestamp := timestamppb.Now()
entity.lastCommandTime = timestamp.AsTime()
envelopesChan := make(chan *egopb.Event, 1)
eg, goCtx := errgroup.WithContext(goCtx)
shardNumber := ctx.Self().ActorSystem().GetPartition(entity.ID())

// create the event
envelope := &egopb.Event{
PersistenceId: entity.ID(),
SequenceNumber: sequenceNumber,
IsDeleted: false,
Event: marshaledEvent,
ResultingState: marshaledState,
Timestamp: entity.lastCommandTime.Unix(),
Shard: shardNumber,
}

// create a journal list
journals := []*egopb.Event{envelope}

// define the topic for the given shard
topic := fmt.Sprintf(eventsTopic, shardNumber)

// publish to the event stream and persist the event to the events store
eg, goCtx := errgroup.WithContext(goCtx)

// publish the message to the topic
// 1-> process each event
// 2-> increment the events (processed) counter of the entity
// 3-> set the current state of the entity to the resulting state
// 4-> set the latCommandTime of the entity with the current timestamp
// 5-> marshal the resulting state and event to build the persistence envelope
// 6-> push the envelope to the envelope channel for downstream processing
eg.Go(func() error {
entity.eventsStream.Publish(topic, envelope)
defer close(envelopesChan)
for _, event := range events {
resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState)
if err != nil {
entity.sendErrorReply(ctx, err)
return err
}

entity.eventsCounter.Inc()
entity.currentState = resultingState
entity.lastCommandTime = timestamppb.Now().AsTime()

event, _ := anypb.New(event)
state, _ := anypb.New(resultingState)

envelope := &egopb.Event{
PersistenceId: entity.ID(),
SequenceNumber: entity.eventsCounter.Load(),
IsDeleted: false,
Event: event,
ResultingState: state,
Timestamp: entity.lastCommandTime.Unix(),
Shard: shardNumber,
}

select {
case envelopesChan <- envelope:
case <-goCtx.Done():
return goCtx.Err()
}
}
return nil
})

// persist the event to the events store
// 1-> publish the processed event
// 2-> build the list of envelopes for persistence
// 3-> persist the batch of envelopes when the channel is closed and return
eg.Go(func() error {
return entity.eventsStore.WriteEvents(goCtx, journals)
var envelopes []*egopb.Event
for {
select {
case envelope, ok := <-envelopesChan:
// channel closed, persist the envelopes
if !ok {
return entity.eventsStore.WriteEvents(goCtx, envelopes)
}

entity.eventsStream.Publish(topic, envelope)
envelopes = append(envelopes, envelope)
case <-goCtx.Done():
return goCtx.Err()
}
}
})

// handle the persistence error
// wait for all go routines to complete
if err := eg.Wait(); err != nil {
// send an error reply
entity.sendErrorReply(ctx, err)
return
}

state, _ := anypb.New(entity.currentState)

// create the command reply to send
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_StateReply{
StateReply: &egopb.StateReply{
PersistenceId: entity.ID(),
State: marshaledState,
SequenceNumber: sequenceNumber,
State: state,
SequenceNumber: entity.eventsCounter.Load(),
Timestamp: entity.lastCommandTime.Unix(),
},
},
Expand Down
5 changes: 4 additions & 1 deletion behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ type EntityBehavior[T State] interface {
// Any decision should be solely based on the data passed in the commands and the state of the Behavior.
// In case of successful validation, one or more events expressing the mutations are persisted.
// Once the events are persisted, they are applied to the state producing a new valid state.
HandleCommand(ctx context.Context, command Command, priorState T) (event Event, err error)
// Every event emitted are processed one after the other in the same order they were emitted to guarantee consistency.
// It is at the discretion of the application developer to know in which order a given command should return the list of events
// This is really powerful when a command needs to return two events. For instance, an OpenAccount command can result in two events: one is AccountOpened and the second is AccountCredited
HandleCommand(ctx context.Context, command Command, priorState T) (events []Event, err error)
// HandleEvent handle events emitted by the command handlers. The event handlers are used to mutate the state of the event sourced actor by applying the events to it.
// Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal.
HandleEvent(ctx context.Context, event Event, priorState T) (state T, err error)
Expand Down
18 changes: 11 additions & 7 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,20 +240,24 @@ func (a *AccountBehavior) InitialState() *samplepb.Account {
}

// HandleCommand handles every command that is sent to the persistent behavior
func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (event Event, err error) {
func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (events []Event, err error) {
switch cmd := command.(type) {
case *samplepb.CreateAccount:
// TODO in production grid app validate the command using the prior state
return &samplepb.AccountCreated{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetAccountBalance(),
return []Event{
&samplepb.AccountCreated{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetAccountBalance(),
},
}, nil

case *samplepb.CreditAccount:
// TODO in production grid app validate the command using the prior state
return &samplepb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
return []Event{
&samplepb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
},
}, nil

default:
Expand Down
18 changes: 11 additions & 7 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,24 @@ func (a *AccountBehavior) InitialState() *samplepb.Account {
}

// HandleCommand handles every command that is sent to the persistent behavior
func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (event ego.Event, err error) {
func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (events []ego.Event, err error) {
switch cmd := command.(type) {
case *samplepb.CreateAccount:
// TODO in production grid app validate the command using the prior state
return &samplepb.AccountCreated{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetAccountBalance(),
return []ego.Event{
&samplepb.AccountCreated{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetAccountBalance(),
},
}, nil

case *samplepb.CreditAccount:
// TODO in production grid app validate the command using the prior state
return &samplepb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
return []ego.Event{
&samplepb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
},
}, nil

default:
Expand Down
18 changes: 11 additions & 7 deletions helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,24 @@ func (t *AccountEntityBehavior) InitialState() *testpb.Account {
return new(testpb.Account)
}

func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command, _ *testpb.Account) (event Event, err error) {
func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command, _ *testpb.Account) (events []Event, err error) {
switch cmd := command.(type) {
case *testpb.CreateAccount:
// TODO in production grid app validate the command using the prior state
return &testpb.AccountCreated{
AccountId: t.id,
AccountBalance: cmd.GetAccountBalance(),
return []Event{
&testpb.AccountCreated{
AccountId: t.id,
AccountBalance: cmd.GetAccountBalance(),
},
}, nil

case *testpb.CreditAccount:
if cmd.GetAccountId() == t.id {
return &testpb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
return []Event{
&testpb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
},
}, nil
}

Expand Down
42 changes: 23 additions & 19 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,25 +158,29 @@ func (a *AccountBehavior) InitialState() *samplepb.Account {
}

// HandleCommand handles every command that is sent to the persistent behavior
func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (event ego.Event, err error) {
switch cmd := command.(type) {
case *samplepb.CreateAccount:
// TODO in production grid app validate the command using the prior state
return &samplepb.AccountCreated{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetAccountBalance(),
}, nil

case *samplepb.CreditAccount:
// TODO in production grid app validate the command using the prior state
return &samplepb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
}, nil

default:
return nil, errors.New("unhandled command")
}
func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (events []Event, err error) {
switch cmd := command.(type) {
case *samplepb.CreateAccount:
// TODO in production grid app validate the command using the prior state
return []Event{
&samplepb.AccountCreated{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetAccountBalance(),
},
}, nil

case *samplepb.CreditAccount:
// TODO in production grid app validate the command using the prior state
return []Event{
&samplepb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
},
}, nil

default:
return nil, errors.New("unhandled command")
}
}

// HandleEvent handles every event emitted
Expand Down

0 comments on commit eccfcc3

Please sign in to comment.