diff --git a/actor.go b/actor.go index a18883e..895c2fc 100644 --- a/actor.go +++ b/actor.go @@ -237,82 +237,55 @@ func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, comman return } - envelopesChan := make(chan *egopb.Event, 1) - eg, goCtx := errgroup.WithContext(goCtx) shardNumber := ctx.Self().ActorSystem().GetPartition(entity.ID()) topic := fmt.Sprintf(eventsTopic, shardNumber) - // 1-> process each event - // 2-> increment the events (processed) counter of the entity - // 3-> set the current state of the entity to the resulting state - // 4-> set the latCommandTime of the entity with the current timestamp - // 5-> marshal the resulting state and event to build the persistence envelope - // 6-> push the envelope to the envelope channel for downstream processing + var envelopes []*egopb.Event + // process all events + for _, event := range events { + resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState) + if err != nil { + entity.sendErrorReply(ctx, err) + return + } + + entity.eventsCounter.Inc() + entity.currentState = resultingState + entity.lastCommandTime = timestamppb.Now().AsTime() + + event, _ := anypb.New(event) + state, _ := anypb.New(resultingState) + + envelope := &egopb.Event{ + PersistenceId: entity.ID(), + SequenceNumber: entity.eventsCounter.Load(), + IsDeleted: false, + Event: event, + ResultingState: state, + Timestamp: entity.lastCommandTime.Unix(), + Shard: shardNumber, + } + envelopes = append(envelopes, envelope) + } + + eg, goCtx := errgroup.WithContext(goCtx) eg.Go(func() error { - defer close(envelopesChan) - for _, event := range events { - resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState) - if err != nil { - entity.sendErrorReply(ctx, err) - return err - } - - entity.eventsCounter.Inc() - entity.currentState = resultingState - entity.lastCommandTime = timestamppb.Now().AsTime() - - event, _ := anypb.New(event) - state, _ := anypb.New(resultingState) - - envelope := &egopb.Event{ - PersistenceId: entity.ID(), - SequenceNumber: entity.eventsCounter.Load(), - IsDeleted: false, - Event: event, - ResultingState: state, - Timestamp: entity.lastCommandTime.Unix(), - Shard: shardNumber, - } - - select { - case envelopesChan <- envelope: - case <-goCtx.Done(): - return goCtx.Err() - } + for _, envelope := range envelopes { + entity.eventsStream.Publish(topic, envelope) } return nil }) - // 1-> publish the processed event - // 2-> build the list of envelopes for persistence - // 3-> persist the batch of envelopes when the channel is closed and return eg.Go(func() error { - var envelopes []*egopb.Event - for { - select { - case envelope, ok := <-envelopesChan: - // channel closed, persist the envelopes - if !ok { - return entity.eventsStore.WriteEvents(goCtx, envelopes) - } - - entity.eventsStream.Publish(topic, envelope) - envelopes = append(envelopes, envelope) - case <-goCtx.Done(): - return goCtx.Err() - } - } + return entity.eventsStore.WriteEvents(goCtx, envelopes) }) - // wait for all go routines to complete if err := eg.Wait(); err != nil { entity.sendErrorReply(ctx, err) return } state, _ := anypb.New(entity.currentState) - - // create the command reply to send reply := &egopb.CommandReply{ Reply: &egopb.CommandReply_StateReply{ StateReply: &egopb.StateReply{ @@ -324,6 +297,5 @@ func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, comman }, } - // send the response ctx.Response(reply) } diff --git a/actor_test.go b/actor_test.go index 9da3c5d..e4407c4 100644 --- a/actor_test.go +++ b/actor_test.go @@ -40,14 +40,12 @@ import ( "github.com/tochemey/goakt/actors" "github.com/tochemey/goakt/log" "github.com/tochemey/gopack/postgres" - "go.uber.org/goleak" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" ) func TestActor(t *testing.T) { t.Run("with state reply", func(t *testing.T) { - defer goleak.VerifyNone(t) ctx := context.TODO() // create an actor system actorSystem, err := actors.NewActorSystem("TestActorSystem", @@ -145,7 +143,6 @@ func TestActor(t *testing.T) { assert.NoError(t, err) }) t.Run("with error reply", func(t *testing.T) { - defer goleak.VerifyNone(t) ctx := context.TODO() // create an actor system @@ -231,7 +228,6 @@ func TestActor(t *testing.T) { assert.NoError(t, err) }) t.Run("with unhandled command", func(t *testing.T) { - defer goleak.VerifyNone(t) ctx := context.TODO() // create an actor system @@ -556,7 +552,6 @@ func TestActor(t *testing.T) { assert.NoError(t, err) }) t.Run("with unhandled event", func(t *testing.T) { - defer goleak.VerifyNone(t) ctx := context.TODO() // create an actor system actorSystem, err := actors.NewActorSystem("TestActorSystem", @@ -624,16 +619,13 @@ func TestActor(t *testing.T) { errorReply := commandReply.GetReply().(*egopb.CommandReply_ErrorReply) assert.Equal(t, "unhandled event", errorReply.ErrorReply.GetMessage()) - // disconnect the events store - err = eventStore.Disconnect(ctx) - require.NoError(t, err) + // disconnect from the event store + require.NoError(t, eventStore.Disconnect(ctx)) - t.Cleanup(func() { - // close the stream - eventStream.Close() - // stop the actor system - err = actorSystem.Stop(ctx) - assert.NoError(t, err) - }) + // close the stream + eventStream.Close() + // stop the actor system + err = actorSystem.Stop(ctx) + assert.NoError(t, err) }) }