Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: make command handler returns one or more events #66

Merged
merged 5 commits into from
Mar 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 37 additions & 59 deletions actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,22 +213,16 @@ func (entity *actor[T]) getStateAndReply(ctx actors.ReceiveContext) {

// processCommandAndReply processes the incoming command
func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, command Command) {
// set the go context
goCtx := ctx.Context()
// pass the received command to the command handler
event, err := entity.HandleCommand(goCtx, command, entity.currentState)
// handle the command handler error
events, err := entity.HandleCommand(goCtx, command, entity.currentState)
if err != nil {
// send an error reply
entity.sendErrorReply(ctx, err)
return
}

// if the event is nil nothing is persisted, and we return no reply
if event == nil {
// get the current state and marshal it
// no-op when there are no events to process
if len(events) == 0 {
resultingState, _ := anypb.New(entity.currentState)
// create the command reply to send out
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_StateReply{
StateReply: &egopb.StateReply{
Expand All @@ -239,85 +233,69 @@ func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, comman
},
},
}
// send the response
ctx.Response(reply)
return
}

// process the event by calling the event handler
resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState)
// handle the event handler error
if err != nil {
// send an error reply
entity.sendErrorReply(ctx, err)
return
}

// increment the event counter
entity.eventsCounter.Inc()

// set the current state for the next command
entity.currentState = resultingState

// marshal the event and the resulting state
marshaledEvent, _ := anypb.New(event)
marshaledState, _ := anypb.New(resultingState)

sequenceNumber := entity.eventsCounter.Load()
timestamp := timestamppb.Now()
entity.lastCommandTime = timestamp.AsTime()
shardNumber := ctx.Self().ActorSystem().GetPartition(entity.ID())
topic := fmt.Sprintf(eventsTopic, shardNumber)

// create the event
envelope := &egopb.Event{
PersistenceId: entity.ID(),
SequenceNumber: sequenceNumber,
IsDeleted: false,
Event: marshaledEvent,
ResultingState: marshaledState,
Timestamp: entity.lastCommandTime.Unix(),
Shard: shardNumber,
}

// create a journal list
journals := []*egopb.Event{envelope}
var envelopes []*egopb.Event
// process all events
for _, event := range events {
resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState)
if err != nil {
entity.sendErrorReply(ctx, err)
return
}

// define the topic for the given shard
topic := fmt.Sprintf(eventsTopic, shardNumber)
entity.eventsCounter.Inc()
entity.currentState = resultingState
entity.lastCommandTime = timestamppb.Now().AsTime()

event, _ := anypb.New(event)
state, _ := anypb.New(resultingState)

envelope := &egopb.Event{
PersistenceId: entity.ID(),
SequenceNumber: entity.eventsCounter.Load(),
IsDeleted: false,
Event: event,
ResultingState: state,
Timestamp: entity.lastCommandTime.Unix(),
Shard: shardNumber,
}
envelopes = append(envelopes, envelope)
}

// publish to the event stream and persist the event to the events store
eg, goCtx := errgroup.WithContext(goCtx)

// publish the message to the topic
eg.Go(func() error {
entity.eventsStream.Publish(topic, envelope)
for _, envelope := range envelopes {
entity.eventsStream.Publish(topic, envelope)
}
return nil
})

// persist the event to the events store
eg.Go(func() error {
return entity.eventsStore.WriteEvents(goCtx, journals)
return entity.eventsStore.WriteEvents(goCtx, envelopes)
})

// handle the persistence error
if err := eg.Wait(); err != nil {
// send an error reply
entity.sendErrorReply(ctx, err)
return
}

// create the command reply to send
state, _ := anypb.New(entity.currentState)
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_StateReply{
StateReply: &egopb.StateReply{
PersistenceId: entity.ID(),
State: marshaledState,
SequenceNumber: sequenceNumber,
State: state,
SequenceNumber: entity.eventsCounter.Load(),
Timestamp: entity.lastCommandTime.Unix(),
},
},
}

// send the response
ctx.Response(reply)
}
82 changes: 78 additions & 4 deletions actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ import (
"github.com/tochemey/goakt/actors"
"github.com/tochemey/goakt/log"
"github.com/tochemey/gopack/postgres"
"go.uber.org/goleak"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
)

func TestActor(t *testing.T) {
t.Run("with state reply", func(t *testing.T) {
defer goleak.VerifyNone(t)
ctx := context.TODO()
// create an actor system
actorSystem, err := actors.NewActorSystem("TestActorSystem",
Expand Down Expand Up @@ -144,7 +143,6 @@ func TestActor(t *testing.T) {
assert.NoError(t, err)
})
t.Run("with error reply", func(t *testing.T) {
defer goleak.VerifyNone(t)
ctx := context.TODO()

// create an actor system
Expand Down Expand Up @@ -230,7 +228,6 @@ func TestActor(t *testing.T) {
assert.NoError(t, err)
})
t.Run("with unhandled command", func(t *testing.T) {
defer goleak.VerifyNone(t)
ctx := context.TODO()

// create an actor system
Expand Down Expand Up @@ -554,4 +551,81 @@ func TestActor(t *testing.T) {
err = actorSystem.Stop(ctx)
assert.NoError(t, err)
})
t.Run("with unhandled event", func(t *testing.T) {
ctx := context.TODO()
// create an actor system
actorSystem, err := actors.NewActorSystem("TestActorSystem",
actors.WithPassivationDisabled(),
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(3))
require.NoError(t, err)
assert.NotNil(t, actorSystem)

// start the actor system
err = actorSystem.Start(ctx)
require.NoError(t, err)

// create the event store
eventStore := memory.NewEventsStore()
// create a persistence id
persistenceID := uuid.NewString()
// create the persistence behavior
behavior := NewAccountEntityBehavior(persistenceID)

// connect the event store
err = eventStore.Connect(ctx)
require.NoError(t, err)

// create an instance of events stream
eventStream := eventstream.New()

// create the persistence actor using the behavior previously created
actor := newActor[*testpb.Account](behavior, eventStore, eventStream)
// spawn the actor
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor)
require.NotNil(t, pid)

// send the command to the actor
reply, err := actors.Ask(ctx, pid, &testpb.CreateAccount{AccountBalance: 500.00}, 5*time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)

commandReply := reply.(*egopb.CommandReply)
require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply())

state := commandReply.GetReply().(*egopb.CommandReply_StateReply)
assert.EqualValues(t, 1, state.StateReply.GetSequenceNumber())

// marshal the resulting state
resultingState := new(testpb.Account)
err = state.StateReply.GetState().UnmarshalTo(resultingState)
require.NoError(t, err)

expected := &testpb.Account{
AccountId: persistenceID,
AccountBalance: 500.00,
}
assert.True(t, proto.Equal(expected, resultingState))

reply, err = actors.Ask(ctx, pid, new(emptypb.Empty), 5*time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)

commandReply = reply.(*egopb.CommandReply)
require.IsType(t, new(egopb.CommandReply_ErrorReply), commandReply.GetReply())

errorReply := commandReply.GetReply().(*egopb.CommandReply_ErrorReply)
assert.Equal(t, "unhandled event", errorReply.ErrorReply.GetMessage())

// disconnect from the event store
require.NoError(t, eventStore.Disconnect(ctx))

// close the stream
eventStream.Close()
// stop the actor system
err = actorSystem.Stop(ctx)
assert.NoError(t, err)
})
}
5 changes: 4 additions & 1 deletion behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ type EntityBehavior[T State] interface {
// Any decision should be solely based on the data passed in the commands and the state of the Behavior.
// In case of successful validation, one or more events expressing the mutations are persisted.
// Once the events are persisted, they are applied to the state producing a new valid state.
HandleCommand(ctx context.Context, command Command, priorState T) (event Event, err error)
// Every event emitted are processed one after the other in the same order they were emitted to guarantee consistency.
// It is at the discretion of the application developer to know in which order a given command should return the list of events
// This is really powerful when a command needs to return two events. For instance, an OpenAccount command can result in two events: one is AccountOpened and the second is AccountCredited
HandleCommand(ctx context.Context, command Command, priorState T) (events []Event, err error)
// HandleEvent handle events emitted by the command handlers. The event handlers are used to mutate the state of the event sourced actor by applying the events to it.
// Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal.
HandleEvent(ctx context.Context, event Event, priorState T) (state T, err error)
Expand Down
18 changes: 11 additions & 7 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,20 +240,24 @@ func (a *AccountBehavior) InitialState() *samplepb.Account {
}

// HandleCommand handles every command that is sent to the persistent behavior
func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (event Event, err error) {
func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ *samplepb.Account) (events []Event, err error) {
switch cmd := command.(type) {
case *samplepb.CreateAccount:
// TODO in production grid app validate the command using the prior state
return &samplepb.AccountCreated{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetAccountBalance(),
return []Event{
&samplepb.AccountCreated{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetAccountBalance(),
},
}, nil

case *samplepb.CreditAccount:
// TODO in production grid app validate the command using the prior state
return &samplepb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
return []Event{
&samplepb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
},
}, nil

default:
Expand Down
18 changes: 11 additions & 7 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,24 @@ func (a *AccountBehavior) InitialState() *samplepb.Account {
}

// HandleCommand handles every command that is sent to the persistent behavior
func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (event ego.Event, err error) {
func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ *samplepb.Account) (events []ego.Event, err error) {
switch cmd := command.(type) {
case *samplepb.CreateAccount:
// TODO in production grid app validate the command using the prior state
return &samplepb.AccountCreated{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetAccountBalance(),
return []ego.Event{
&samplepb.AccountCreated{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetAccountBalance(),
},
}, nil

case *samplepb.CreditAccount:
// TODO in production grid app validate the command using the prior state
return &samplepb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
return []ego.Event{
&samplepb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
},
}, nil

default:
Expand Down
22 changes: 15 additions & 7 deletions helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/pkg/errors"
testpb "github.com/tochemey/ego/test/data/pb/v1"
"google.golang.org/protobuf/types/known/emptypb"
)

// AccountEntityBehavior implement EntityBehavior
Expand All @@ -51,20 +52,24 @@ func (t *AccountEntityBehavior) InitialState() *testpb.Account {
return new(testpb.Account)
}

func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command, _ *testpb.Account) (event Event, err error) {
func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command, _ *testpb.Account) (events []Event, err error) {
switch cmd := command.(type) {
case *testpb.CreateAccount:
// TODO in production grid app validate the command using the prior state
return &testpb.AccountCreated{
AccountId: t.id,
AccountBalance: cmd.GetAccountBalance(),
return []Event{
&testpb.AccountCreated{
AccountId: t.id,
AccountBalance: cmd.GetAccountBalance(),
},
}, nil

case *testpb.CreditAccount:
if cmd.GetAccountId() == t.id {
return &testpb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
return []Event{
&testpb.AccountCredited{
AccountId: cmd.GetAccountId(),
AccountBalance: cmd.GetBalance(),
},
}, nil
}

Expand All @@ -73,6 +78,9 @@ func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command
case *testpb.TestNoEvent:
return nil, nil

case *emptypb.Empty:
return []Event{new(emptypb.Empty)}, nil

default:
return nil, errors.New("unhandled command")
}
Expand Down
Loading
Loading