Skip to content

Commit

Permalink
feat: make command handler returns one or more events
Browse files Browse the repository at this point in the history
BREAKING CHANGE: HandleCommand return an array of events instead of a single event
  • Loading branch information
Tochemey committed Mar 17, 2024
1 parent 135bbb7 commit fb95c49
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 75 deletions.
92 changes: 32 additions & 60 deletions actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,82 +237,55 @@ func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, comman
return
}

envelopesChan := make(chan *egopb.Event, 1)
eg, goCtx := errgroup.WithContext(goCtx)
shardNumber := ctx.Self().ActorSystem().GetPartition(entity.ID())
topic := fmt.Sprintf(eventsTopic, shardNumber)

// 1-> process each event
// 2-> increment the events (processed) counter of the entity
// 3-> set the current state of the entity to the resulting state
// 4-> set the latCommandTime of the entity with the current timestamp
// 5-> marshal the resulting state and event to build the persistence envelope
// 6-> push the envelope to the envelope channel for downstream processing
var envelopes []*egopb.Event
// process all events
for _, event := range events {
resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState)
if err != nil {
entity.sendErrorReply(ctx, err)
return
}

entity.eventsCounter.Inc()
entity.currentState = resultingState
entity.lastCommandTime = timestamppb.Now().AsTime()

event, _ := anypb.New(event)
state, _ := anypb.New(resultingState)

envelope := &egopb.Event{
PersistenceId: entity.ID(),
SequenceNumber: entity.eventsCounter.Load(),
IsDeleted: false,
Event: event,
ResultingState: state,
Timestamp: entity.lastCommandTime.Unix(),
Shard: shardNumber,
}
envelopes = append(envelopes, envelope)
}

eg, goCtx := errgroup.WithContext(goCtx)
eg.Go(func() error {
defer close(envelopesChan)
for _, event := range events {
resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState)
if err != nil {
entity.sendErrorReply(ctx, err)
return err
}

entity.eventsCounter.Inc()
entity.currentState = resultingState
entity.lastCommandTime = timestamppb.Now().AsTime()

event, _ := anypb.New(event)
state, _ := anypb.New(resultingState)

envelope := &egopb.Event{
PersistenceId: entity.ID(),
SequenceNumber: entity.eventsCounter.Load(),
IsDeleted: false,
Event: event,
ResultingState: state,
Timestamp: entity.lastCommandTime.Unix(),
Shard: shardNumber,
}

select {
case envelopesChan <- envelope:
case <-goCtx.Done():
return goCtx.Err()
}
for _, envelope := range envelopes {
entity.eventsStream.Publish(topic, envelope)
}
return nil
})

// 1-> publish the processed event
// 2-> build the list of envelopes for persistence
// 3-> persist the batch of envelopes when the channel is closed and return
eg.Go(func() error {
var envelopes []*egopb.Event
for {
select {
case envelope, ok := <-envelopesChan:
// channel closed, persist the envelopes
if !ok {
return entity.eventsStore.WriteEvents(goCtx, envelopes)
}

entity.eventsStream.Publish(topic, envelope)
envelopes = append(envelopes, envelope)
case <-goCtx.Done():
return goCtx.Err()
}
}
return entity.eventsStore.WriteEvents(goCtx, envelopes)
})

// wait for all go routines to complete
if err := eg.Wait(); err != nil {
entity.sendErrorReply(ctx, err)
return
}

state, _ := anypb.New(entity.currentState)

// create the command reply to send
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_StateReply{
StateReply: &egopb.StateReply{
Expand All @@ -324,6 +297,5 @@ func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, comman
},
}

// send the response
ctx.Response(reply)
}
22 changes: 7 additions & 15 deletions actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ import (
"github.com/tochemey/goakt/actors"
"github.com/tochemey/goakt/log"
"github.com/tochemey/gopack/postgres"
"go.uber.org/goleak"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
)

func TestActor(t *testing.T) {
t.Run("with state reply", func(t *testing.T) {
defer goleak.VerifyNone(t)
ctx := context.TODO()
// create an actor system
actorSystem, err := actors.NewActorSystem("TestActorSystem",
Expand Down Expand Up @@ -145,7 +143,6 @@ func TestActor(t *testing.T) {
assert.NoError(t, err)
})
t.Run("with error reply", func(t *testing.T) {
defer goleak.VerifyNone(t)
ctx := context.TODO()

// create an actor system
Expand Down Expand Up @@ -231,7 +228,6 @@ func TestActor(t *testing.T) {
assert.NoError(t, err)
})
t.Run("with unhandled command", func(t *testing.T) {
defer goleak.VerifyNone(t)
ctx := context.TODO()

// create an actor system
Expand Down Expand Up @@ -556,7 +552,6 @@ func TestActor(t *testing.T) {
assert.NoError(t, err)
})
t.Run("with unhandled event", func(t *testing.T) {
defer goleak.VerifyNone(t)
ctx := context.TODO()
// create an actor system
actorSystem, err := actors.NewActorSystem("TestActorSystem",
Expand Down Expand Up @@ -624,16 +619,13 @@ func TestActor(t *testing.T) {
errorReply := commandReply.GetReply().(*egopb.CommandReply_ErrorReply)
assert.Equal(t, "unhandled event", errorReply.ErrorReply.GetMessage())

// disconnect the events store
err = eventStore.Disconnect(ctx)
require.NoError(t, err)
// disconnect from the event store
require.NoError(t, eventStore.Disconnect(ctx))

t.Cleanup(func() {
// close the stream
eventStream.Close()
// stop the actor system
err = actorSystem.Stop(ctx)
assert.NoError(t, err)
})
// close the stream
eventStream.Close()
// stop the actor system
err = actorSystem.Stop(ctx)
assert.NoError(t, err)
})
}

0 comments on commit fb95c49

Please sign in to comment.