From eccfcc3895c70c75bd5d19fe8353c15cca2debb6 Mon Sep 17 00:00:00 2001
From: Tochemey <tochemey@hotmail.com>
Date: Sat, 16 Mar 2024 12:07:11 +0000
Subject: [PATCH 1/5] feat: make command handler returns one or more events

BREAKING CHANGE: HandleCommand return an array of events instead of a single event
---
 actor.go        | 124 +++++++++++++++++++++++++-----------------------
 behavior.go     |   5 +-
 engine_test.go  |  18 ++++---
 example/main.go |  18 ++++---
 helper_test.go  |  18 ++++---
 readme.md       |  42 ++++++++--------
 6 files changed, 125 insertions(+), 100 deletions(-)

diff --git a/actor.go b/actor.go
index 70b6e13..a18883e 100644
--- a/actor.go
+++ b/actor.go
@@ -213,22 +213,16 @@ func (entity *actor[T]) getStateAndReply(ctx actors.ReceiveContext) {
 
 // processCommandAndReply processes the incoming command
 func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, command Command) {
-	// set the go context
 	goCtx := ctx.Context()
-	// pass the received command to the command handler
-	event, err := entity.HandleCommand(goCtx, command, entity.currentState)
-	// handle the command handler error
+	events, err := entity.HandleCommand(goCtx, command, entity.currentState)
 	if err != nil {
-		// send an error reply
 		entity.sendErrorReply(ctx, err)
 		return
 	}
 
-	// if the event is nil nothing is persisted, and we return no reply
-	if event == nil {
-		// get the current state and marshal it
+	// no-op when there are no events to process
+	if len(events) == 0 {
 		resultingState, _ := anypb.New(entity.currentState)
-		// create the command reply to send out
 		reply := &egopb.CommandReply{
 			Reply: &egopb.CommandReply_StateReply{
 				StateReply: &egopb.StateReply{
@@ -239,80 +233,92 @@ func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, comman
 				},
 			},
 		}
-		// send the response
 		ctx.Response(reply)
 		return
 	}
 
-	// process the event by calling the event handler
-	resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState)
-	// handle the event handler error
-	if err != nil {
-		// send an error reply
-		entity.sendErrorReply(ctx, err)
-		return
-	}
-
-	// increment the event counter
-	entity.eventsCounter.Inc()
-
-	// set the current state for the next command
-	entity.currentState = resultingState
-
-	// marshal the event and the resulting state
-	marshaledEvent, _ := anypb.New(event)
-	marshaledState, _ := anypb.New(resultingState)
-
-	sequenceNumber := entity.eventsCounter.Load()
-	timestamp := timestamppb.Now()
-	entity.lastCommandTime = timestamp.AsTime()
+	envelopesChan := make(chan *egopb.Event, 1)
+	eg, goCtx := errgroup.WithContext(goCtx)
 	shardNumber := ctx.Self().ActorSystem().GetPartition(entity.ID())
-
-	// create the event
-	envelope := &egopb.Event{
-		PersistenceId:  entity.ID(),
-		SequenceNumber: sequenceNumber,
-		IsDeleted:      false,
-		Event:          marshaledEvent,
-		ResultingState: marshaledState,
-		Timestamp:      entity.lastCommandTime.Unix(),
-		Shard:          shardNumber,
-	}
-
-	// create a journal list
-	journals := []*egopb.Event{envelope}
-
-	// define the topic for the given shard
 	topic := fmt.Sprintf(eventsTopic, shardNumber)
 
-	// publish to the event stream and persist the event to the events store
-	eg, goCtx := errgroup.WithContext(goCtx)
-
-	// publish the message to the topic
+	// 1-> process each event
+	// 2-> increment the events (processed) counter of the entity
+	// 3-> set the current state of the entity to the resulting state
+	// 4-> set the latCommandTime of the entity with the current timestamp
+	// 5-> marshal the resulting state and event to build the persistence envelope
+	// 6-> push the envelope to the envelope channel for downstream processing
 	eg.Go(func() error {
-		entity.eventsStream.Publish(topic, envelope)
+		defer close(envelopesChan)
+		for _, event := range events {
+			resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState)
+			if err != nil {
+				entity.sendErrorReply(ctx, err)
+				return err
+			}
+
+			entity.eventsCounter.Inc()
+			entity.currentState = resultingState
+			entity.lastCommandTime = timestamppb.Now().AsTime()
+
+			event, _ := anypb.New(event)
+			state, _ := anypb.New(resultingState)
+
+			envelope := &egopb.Event{
+				PersistenceId:  entity.ID(),
+				SequenceNumber: entity.eventsCounter.Load(),
+				IsDeleted:      false,
+				Event:          event,
+				ResultingState: state,
+				Timestamp:      entity.lastCommandTime.Unix(),
+				Shard:          shardNumber,
+			}
+
+			select {
+			case envelopesChan <- envelope:
+			case <-goCtx.Done():
+				return goCtx.Err()
+			}
+		}
 		return nil
 	})
 
-	// persist the event to the events store
+	// 1-> publish the processed event
+	// 2-> build the list of envelopes for persistence
+	// 3-> persist the batch of envelopes when the channel is closed and return
 	eg.Go(func() error {
-		return entity.eventsStore.WriteEvents(goCtx, journals)
+		var envelopes []*egopb.Event
+		for {
+			select {
+			case envelope, ok := <-envelopesChan:
+				// channel closed, persist the envelopes
+				if !ok {
+					return entity.eventsStore.WriteEvents(goCtx, envelopes)
+				}
+
+				entity.eventsStream.Publish(topic, envelope)
+				envelopes = append(envelopes, envelope)
+			case <-goCtx.Done():
+				return goCtx.Err()
+			}
+		}
 	})
 
-	// handle the persistence error
+	// wait for all go routines to complete
 	if err := eg.Wait(); err != nil {
-		// send an error reply
 		entity.sendErrorReply(ctx, err)
 		return
 	}
 
+	state, _ := anypb.New(entity.currentState)
+
 	// create the command reply to send
 	reply := &egopb.CommandReply{
 		Reply: &egopb.CommandReply_StateReply{
 			StateReply: &egopb.StateReply{
 				PersistenceId:  entity.ID(),
-				State:          marshaledState,
-				SequenceNumber: sequenceNumber,
+				State:          state,
+				SequenceNumber: entity.eventsCounter.Load(),
 				Timestamp:      entity.lastCommandTime.Unix(),
 			},
 		},
diff --git a/behavior.go b/behavior.go
index 18fa850..127e777 100644
--- a/behavior.go
+++ b/behavior.go
@@ -50,7 +50,10 @@ type EntityBehavior[T State] interface {
 	//  Any decision should be solely based on the data passed in the commands and the state of the Behavior.
 	// In case of successful validation, one or more events expressing the mutations are persisted.
 	// Once the events are persisted, they are applied to the state producing a new valid state.
-	HandleCommand(ctx context.Context, command Command, priorState T) (event Event, err error)
+	// Every event emitted are processed one after the other in the same order they were emitted to guarantee consistency.
+	// It is at the discretion of the application developer to know in which order a given command should return the list of events
+	// This is really powerful when a command needs to return two events. For instance, an OpenAccount command can result in two events: one is AccountOpened and the second is AccountCredited
+	HandleCommand(ctx context.Context, command Command, priorState T) (events []Event, err error)
 	// HandleEvent handle events emitted by the command handlers. The event handlers are used to mutate the state of the event sourced actor by applying the events to it.
 	// Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal.
 	HandleEvent(ctx context.Context, event Event, priorState T) (state T, err error)
diff --git a/engine_test.go b/engine_test.go
index 58ff887..594b7b1 100644
--- a/engine_test.go
+++ b/engine_test.go
@@ -240,20 +240,24 @@ func (a *AccountBehavior) InitialState() *samplepb.Account {
 }
 
 // HandleCommand handles every command that is sent to the persistent behavior
-func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (event Event, err error) {
+func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (events []Event, err error) {
 	switch cmd := command.(type) {
 	case *samplepb.CreateAccount:
 		// TODO in production grid app validate the command using the prior state
-		return &samplepb.AccountCreated{
-			AccountId:      cmd.GetAccountId(),
-			AccountBalance: cmd.GetAccountBalance(),
+		return []Event{
+			&samplepb.AccountCreated{
+				AccountId:      cmd.GetAccountId(),
+				AccountBalance: cmd.GetAccountBalance(),
+			},
 		}, nil
 
 	case *samplepb.CreditAccount:
 		// TODO in production grid app validate the command using the prior state
-		return &samplepb.AccountCredited{
-			AccountId:      cmd.GetAccountId(),
-			AccountBalance: cmd.GetBalance(),
+		return []Event{
+			&samplepb.AccountCredited{
+				AccountId:      cmd.GetAccountId(),
+				AccountBalance: cmd.GetBalance(),
+			},
 		}, nil
 
 	default:
diff --git a/example/main.go b/example/main.go
index f51a3e6..92de04a 100644
--- a/example/main.go
+++ b/example/main.go
@@ -113,20 +113,24 @@ func (a *AccountBehavior) InitialState() *samplepb.Account {
 }
 
 // HandleCommand handles every command that is sent to the persistent behavior
-func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (event ego.Event, err error) {
+func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (events []ego.Event, err error) {
 	switch cmd := command.(type) {
 	case *samplepb.CreateAccount:
 		// TODO in production grid app validate the command using the prior state
-		return &samplepb.AccountCreated{
-			AccountId:      cmd.GetAccountId(),
-			AccountBalance: cmd.GetAccountBalance(),
+		return []ego.Event{
+			&samplepb.AccountCreated{
+				AccountId:      cmd.GetAccountId(),
+				AccountBalance: cmd.GetAccountBalance(),
+			},
 		}, nil
 
 	case *samplepb.CreditAccount:
 		// TODO in production grid app validate the command using the prior state
-		return &samplepb.AccountCredited{
-			AccountId:      cmd.GetAccountId(),
-			AccountBalance: cmd.GetBalance(),
+		return []ego.Event{
+			&samplepb.AccountCredited{
+				AccountId:      cmd.GetAccountId(),
+				AccountBalance: cmd.GetBalance(),
+			},
 		}, nil
 
 	default:
diff --git a/helper_test.go b/helper_test.go
index b1e510b..50f5351 100644
--- a/helper_test.go
+++ b/helper_test.go
@@ -51,20 +51,24 @@ func (t *AccountEntityBehavior) InitialState() *testpb.Account {
 	return new(testpb.Account)
 }
 
-func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command, _ *testpb.Account) (event Event, err error) {
+func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command, _ *testpb.Account) (events []Event, err error) {
 	switch cmd := command.(type) {
 	case *testpb.CreateAccount:
 		// TODO in production grid app validate the command using the prior state
-		return &testpb.AccountCreated{
-			AccountId:      t.id,
-			AccountBalance: cmd.GetAccountBalance(),
+		return []Event{
+			&testpb.AccountCreated{
+				AccountId:      t.id,
+				AccountBalance: cmd.GetAccountBalance(),
+			},
 		}, nil
 
 	case *testpb.CreditAccount:
 		if cmd.GetAccountId() == t.id {
-			return &testpb.AccountCredited{
-				AccountId:      cmd.GetAccountId(),
-				AccountBalance: cmd.GetBalance(),
+			return []Event{
+				&testpb.AccountCredited{
+					AccountId:      cmd.GetAccountId(),
+					AccountBalance: cmd.GetBalance(),
+				},
 			}, nil
 		}
 
diff --git a/readme.md b/readme.md
index 7e882aa..308a748 100644
--- a/readme.md
+++ b/readme.md
@@ -158,25 +158,29 @@ func (a *AccountBehavior) InitialState() *samplepb.Account {
 }
 
 // HandleCommand handles every command that is sent to the persistent behavior
-func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (event ego.Event, err error) {
-	switch cmd := command.(type) {
-	case *samplepb.CreateAccount:
-		// TODO in production grid app validate the command using the prior state
-		return &samplepb.AccountCreated{
-			AccountId:      cmd.GetAccountId(),
-			AccountBalance: cmd.GetAccountBalance(),
-		}, nil
-
-	case *samplepb.CreditAccount:
-		// TODO in production grid app validate the command using the prior state
-		return &samplepb.AccountCredited{
-			AccountId:      cmd.GetAccountId(),
-			AccountBalance: cmd.GetBalance(),
-		}, nil
-
-	default:
-		return nil, errors.New("unhandled command")
-	}
+func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (events []Event, err error) {
+  switch cmd := command.(type) {
+  case *samplepb.CreateAccount:
+    // TODO in production grid app validate the command using the prior state
+    return []Event{
+      &samplepb.AccountCreated{
+        AccountId:      cmd.GetAccountId(),
+        AccountBalance: cmd.GetAccountBalance(),
+      },
+    }, nil
+
+  case *samplepb.CreditAccount:
+    // TODO in production grid app validate the command using the prior state
+    return []Event{
+      &samplepb.AccountCredited{
+        AccountId:      cmd.GetAccountId(),
+        AccountBalance: cmd.GetBalance(),
+      },
+    }, nil
+
+  default:
+    return nil, errors.New("unhandled command")
+  }
 }
 
 // HandleEvent handles every event emitted

From 17a4cd70f96e93c800b253ec5ec869a47d815475 Mon Sep 17 00:00:00 2001
From: Tochemey <tochemey@hotmail.com>
Date: Sat, 16 Mar 2024 14:11:01 +0000
Subject: [PATCH 2/5] feat: make command handler returns one or more events

BREAKING CHANGE: HandleCommand return an array of events instead of a single event
---
 actor_test.go  | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++
 helper_test.go |  4 +++
 2 files changed, 84 insertions(+)

diff --git a/actor_test.go b/actor_test.go
index 49a656b..e3eea34 100644
--- a/actor_test.go
+++ b/actor_test.go
@@ -42,6 +42,7 @@ import (
 	"github.com/tochemey/gopack/postgres"
 	"go.uber.org/goleak"
 	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/types/known/emptypb"
 )
 
 func TestActor(t *testing.T) {
@@ -554,4 +555,83 @@ func TestActor(t *testing.T) {
 		err = actorSystem.Stop(ctx)
 		assert.NoError(t, err)
 	})
+	t.Run("with unhandled event", func(t *testing.T) {
+		defer goleak.VerifyNone(t)
+		ctx := context.TODO()
+		// create an actor system
+		actorSystem, err := actors.NewActorSystem("TestActorSystem",
+			actors.WithPassivationDisabled(),
+			actors.WithLogger(log.DiscardLogger),
+			actors.WithActorInitMaxRetries(3))
+		require.NoError(t, err)
+		assert.NotNil(t, actorSystem)
+
+		// start the actor system
+		err = actorSystem.Start(ctx)
+		require.NoError(t, err)
+
+		// create the event store
+		eventStore := memory.NewEventsStore()
+		// create a persistence id
+		persistenceID := uuid.NewString()
+		// create the persistence behavior
+		behavior := NewAccountEntityBehavior(persistenceID)
+
+		// connect the event store
+		err = eventStore.Connect(ctx)
+		require.NoError(t, err)
+
+		// create an instance of events stream
+		eventStream := eventstream.New()
+
+		// create the persistence actor using the behavior previously created
+		actor := newActor[*testpb.Account](behavior, eventStore, eventStream)
+		// spawn the actor
+		pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor)
+		require.NotNil(t, pid)
+
+		// send the command to the actor
+		reply, err := actors.Ask(ctx, pid, &testpb.CreateAccount{AccountBalance: 500.00}, 5*time.Second)
+		require.NoError(t, err)
+		require.NotNil(t, reply)
+		require.IsType(t, new(egopb.CommandReply), reply)
+
+		commandReply := reply.(*egopb.CommandReply)
+		require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply())
+
+		state := commandReply.GetReply().(*egopb.CommandReply_StateReply)
+		assert.EqualValues(t, 1, state.StateReply.GetSequenceNumber())
+
+		// marshal the resulting state
+		resultingState := new(testpb.Account)
+		err = state.StateReply.GetState().UnmarshalTo(resultingState)
+		require.NoError(t, err)
+
+		expected := &testpb.Account{
+			AccountId:      persistenceID,
+			AccountBalance: 500.00,
+		}
+		assert.True(t, proto.Equal(expected, resultingState))
+
+		reply, err = actors.Ask(ctx, pid, new(emptypb.Empty), 5*time.Second)
+		require.NoError(t, err)
+		require.NotNil(t, reply)
+		require.IsType(t, new(egopb.CommandReply), reply)
+
+		commandReply = reply.(*egopb.CommandReply)
+		require.IsType(t, new(egopb.CommandReply_ErrorReply), commandReply.GetReply())
+
+		errorReply := commandReply.GetReply().(*egopb.CommandReply_ErrorReply)
+		assert.Equal(t, "unhandled event", errorReply.ErrorReply.GetMessage())
+
+		// disconnect the events store
+		err = eventStore.Disconnect(ctx)
+		require.NoError(t, err)
+
+		// close the stream
+		eventStream.Close()
+		// stop the actor system
+		err = actorSystem.Stop(ctx)
+		assert.NoError(t, err)
+	})
 }
diff --git a/helper_test.go b/helper_test.go
index 50f5351..e4b98ef 100644
--- a/helper_test.go
+++ b/helper_test.go
@@ -29,6 +29,7 @@ import (
 
 	"github.com/pkg/errors"
 	testpb "github.com/tochemey/ego/test/data/pb/v1"
+	"google.golang.org/protobuf/types/known/emptypb"
 )
 
 // AccountEntityBehavior implement EntityBehavior
@@ -77,6 +78,9 @@ func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command
 	case *testpb.TestNoEvent:
 		return nil, nil
 
+	case *emptypb.Empty:
+		return []Event{new(emptypb.Empty)}, nil
+
 	default:
 		return nil, errors.New("unhandled command")
 	}

From 19778f7a9f0f3621935eeaf20fde1faa1c123e50 Mon Sep 17 00:00:00 2001
From: Tochemey <tochemey@hotmail.com>
Date: Sat, 16 Mar 2024 14:27:51 +0000
Subject: [PATCH 3/5] feat: make command handler returns one or more events

BREAKING CHANGE: HandleCommand return an array of events instead of a single event
---
 actor_test.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/actor_test.go b/actor_test.go
index e3eea34..d2e9b21 100644
--- a/actor_test.go
+++ b/actor_test.go
@@ -556,7 +556,6 @@ func TestActor(t *testing.T) {
 		assert.NoError(t, err)
 	})
 	t.Run("with unhandled event", func(t *testing.T) {
-		defer goleak.VerifyNone(t)
 		ctx := context.TODO()
 		// create an actor system
 		actorSystem, err := actors.NewActorSystem("TestActorSystem",

From 135bbb70775f73e93689164de7b557ca8ef66ff7 Mon Sep 17 00:00:00 2001
From: Tochemey <tochemey@hotmail.com>
Date: Sat, 16 Mar 2024 14:29:11 +0000
Subject: [PATCH 4/5] feat: make command handler returns one or more events

BREAKING CHANGE: HandleCommand return an array of events instead of a single event
---
 actor_test.go | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/actor_test.go b/actor_test.go
index d2e9b21..9da3c5d 100644
--- a/actor_test.go
+++ b/actor_test.go
@@ -556,6 +556,7 @@ func TestActor(t *testing.T) {
 		assert.NoError(t, err)
 	})
 	t.Run("with unhandled event", func(t *testing.T) {
+		defer goleak.VerifyNone(t)
 		ctx := context.TODO()
 		// create an actor system
 		actorSystem, err := actors.NewActorSystem("TestActorSystem",
@@ -627,10 +628,12 @@ func TestActor(t *testing.T) {
 		err = eventStore.Disconnect(ctx)
 		require.NoError(t, err)
 
-		// close the stream
-		eventStream.Close()
-		// stop the actor system
-		err = actorSystem.Stop(ctx)
-		assert.NoError(t, err)
+		t.Cleanup(func() {
+			// close the stream
+			eventStream.Close()
+			// stop the actor system
+			err = actorSystem.Stop(ctx)
+			assert.NoError(t, err)
+		})
 	})
 }

From fb95c498f7ac49ffbd72f9556d01a0fce4462aa1 Mon Sep 17 00:00:00 2001
From: Tochemey <tochemey@hotmail.com>
Date: Sun, 17 Mar 2024 18:06:17 +0000
Subject: [PATCH 5/5] feat: make command handler returns one or more events

BREAKING CHANGE: HandleCommand return an array of events instead of a single event
---
 actor.go      | 92 ++++++++++++++++++---------------------------------
 actor_test.go | 22 ++++--------
 2 files changed, 39 insertions(+), 75 deletions(-)

diff --git a/actor.go b/actor.go
index a18883e..895c2fc 100644
--- a/actor.go
+++ b/actor.go
@@ -237,82 +237,55 @@ func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, comman
 		return
 	}
 
-	envelopesChan := make(chan *egopb.Event, 1)
-	eg, goCtx := errgroup.WithContext(goCtx)
 	shardNumber := ctx.Self().ActorSystem().GetPartition(entity.ID())
 	topic := fmt.Sprintf(eventsTopic, shardNumber)
 
-	// 1-> process each event
-	// 2-> increment the events (processed) counter of the entity
-	// 3-> set the current state of the entity to the resulting state
-	// 4-> set the latCommandTime of the entity with the current timestamp
-	// 5-> marshal the resulting state and event to build the persistence envelope
-	// 6-> push the envelope to the envelope channel for downstream processing
+	var envelopes []*egopb.Event
+	// process all events
+	for _, event := range events {
+		resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState)
+		if err != nil {
+			entity.sendErrorReply(ctx, err)
+			return
+		}
+
+		entity.eventsCounter.Inc()
+		entity.currentState = resultingState
+		entity.lastCommandTime = timestamppb.Now().AsTime()
+
+		event, _ := anypb.New(event)
+		state, _ := anypb.New(resultingState)
+
+		envelope := &egopb.Event{
+			PersistenceId:  entity.ID(),
+			SequenceNumber: entity.eventsCounter.Load(),
+			IsDeleted:      false,
+			Event:          event,
+			ResultingState: state,
+			Timestamp:      entity.lastCommandTime.Unix(),
+			Shard:          shardNumber,
+		}
+		envelopes = append(envelopes, envelope)
+	}
+
+	eg, goCtx := errgroup.WithContext(goCtx)
 	eg.Go(func() error {
-		defer close(envelopesChan)
-		for _, event := range events {
-			resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState)
-			if err != nil {
-				entity.sendErrorReply(ctx, err)
-				return err
-			}
-
-			entity.eventsCounter.Inc()
-			entity.currentState = resultingState
-			entity.lastCommandTime = timestamppb.Now().AsTime()
-
-			event, _ := anypb.New(event)
-			state, _ := anypb.New(resultingState)
-
-			envelope := &egopb.Event{
-				PersistenceId:  entity.ID(),
-				SequenceNumber: entity.eventsCounter.Load(),
-				IsDeleted:      false,
-				Event:          event,
-				ResultingState: state,
-				Timestamp:      entity.lastCommandTime.Unix(),
-				Shard:          shardNumber,
-			}
-
-			select {
-			case envelopesChan <- envelope:
-			case <-goCtx.Done():
-				return goCtx.Err()
-			}
+		for _, envelope := range envelopes {
+			entity.eventsStream.Publish(topic, envelope)
 		}
 		return nil
 	})
 
-	// 1-> publish the processed event
-	// 2-> build the list of envelopes for persistence
-	// 3-> persist the batch of envelopes when the channel is closed and return
 	eg.Go(func() error {
-		var envelopes []*egopb.Event
-		for {
-			select {
-			case envelope, ok := <-envelopesChan:
-				// channel closed, persist the envelopes
-				if !ok {
-					return entity.eventsStore.WriteEvents(goCtx, envelopes)
-				}
-
-				entity.eventsStream.Publish(topic, envelope)
-				envelopes = append(envelopes, envelope)
-			case <-goCtx.Done():
-				return goCtx.Err()
-			}
-		}
+		return entity.eventsStore.WriteEvents(goCtx, envelopes)
 	})
 
-	// wait for all go routines to complete
 	if err := eg.Wait(); err != nil {
 		entity.sendErrorReply(ctx, err)
 		return
 	}
 
 	state, _ := anypb.New(entity.currentState)
-
-	// create the command reply to send
 	reply := &egopb.CommandReply{
 		Reply: &egopb.CommandReply_StateReply{
 			StateReply: &egopb.StateReply{
@@ -324,6 +297,5 @@ func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, comman
 		},
 	}
 
-	// send the response
 	ctx.Response(reply)
 }
diff --git a/actor_test.go b/actor_test.go
index 9da3c5d..e4407c4 100644
--- a/actor_test.go
+++ b/actor_test.go
@@ -40,14 +40,12 @@ import (
 	"github.com/tochemey/goakt/actors"
 	"github.com/tochemey/goakt/log"
 	"github.com/tochemey/gopack/postgres"
-	"go.uber.org/goleak"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/emptypb"
 )
 
 func TestActor(t *testing.T) {
 	t.Run("with state reply", func(t *testing.T) {
-		defer goleak.VerifyNone(t)
 		ctx := context.TODO()
 		// create an actor system
 		actorSystem, err := actors.NewActorSystem("TestActorSystem",
@@ -145,7 +143,6 @@ func TestActor(t *testing.T) {
 		assert.NoError(t, err)
 	})
 	t.Run("with error reply", func(t *testing.T) {
-		defer goleak.VerifyNone(t)
 		ctx := context.TODO()
 
 		// create an actor system
@@ -231,7 +228,6 @@ func TestActor(t *testing.T) {
 		assert.NoError(t, err)
 	})
 	t.Run("with unhandled command", func(t *testing.T) {
-		defer goleak.VerifyNone(t)
 		ctx := context.TODO()
 
 		// create an actor system
@@ -556,7 +552,6 @@ func TestActor(t *testing.T) {
 		assert.NoError(t, err)
 	})
 	t.Run("with unhandled event", func(t *testing.T) {
-		defer goleak.VerifyNone(t)
 		ctx := context.TODO()
 		// create an actor system
 		actorSystem, err := actors.NewActorSystem("TestActorSystem",
@@ -624,16 +619,13 @@ func TestActor(t *testing.T) {
 		errorReply := commandReply.GetReply().(*egopb.CommandReply_ErrorReply)
 		assert.Equal(t, "unhandled event", errorReply.ErrorReply.GetMessage())
 
-		// disconnect the events store
-		err = eventStore.Disconnect(ctx)
-		require.NoError(t, err)
+		// disconnect from the event store
+		require.NoError(t, eventStore.Disconnect(ctx))
 
-		t.Cleanup(func() {
-			// close the stream
-			eventStream.Close()
-			// stop the actor system
-			err = actorSystem.Stop(ctx)
-			assert.NoError(t, err)
-		})
+		// close the stream
+		eventStream.Close()
+		// stop the actor system
+		err = actorSystem.Stop(ctx)
+		assert.NoError(t, err)
 	})
 }