From 7de6e19afc428b9e7d3c96d6ec4e400cc373d89e Mon Sep 17 00:00:00 2001 From: Tochemey Date: Mon, 16 Dec 2024 14:52:46 +0000 Subject: [PATCH] feat: add durable state actor --- durable_state_actor.go | 2 + durable_state_actor_test.go | 152 ++++++++++++++ engine.go | 2 +- engine_test.go | 254 ++++++++++++++++++++++-- helper_test.go | 2 - plugins/statestore/postgres/postgres.go | 4 +- 6 files changed, 390 insertions(+), 26 deletions(-) diff --git a/durable_state_actor.go b/durable_state_actor.go index 208d964..dd5a57e 100644 --- a/durable_state_actor.go +++ b/durable_state_actor.go @@ -85,6 +85,8 @@ func (entity *durableStateActor) Receive(ctx *actors.ReceiveContext) { switch command := ctx.Message().(type) { case *goaktpb.PostStart: entity.actorSystem = ctx.ActorSystem() + case *egopb.GetStateCommand: + entity.sendStateReply(ctx) default: entity.processCommand(ctx, command) } diff --git a/durable_state_actor_test.go b/durable_state_actor_test.go index a4bba81..c6030e2 100644 --- a/durable_state_actor_test.go +++ b/durable_state_actor_test.go @@ -39,7 +39,9 @@ import ( "github.com/tochemey/ego/v3/egopb" "github.com/tochemey/ego/v3/eventstream" "github.com/tochemey/ego/v3/internal/lib" + "github.com/tochemey/ego/v3/internal/postgres" "github.com/tochemey/ego/v3/plugins/statestore/memory" + pgstore "github.com/tochemey/ego/v3/plugins/statestore/postgres" testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) @@ -229,4 +231,154 @@ func TestDurableStateBehavior(t *testing.T) { lib.Pause(time.Second) eventStream.Close() }) + t.Run("with state recovery from state store", func(t *testing.T) { + ctx := context.TODO() + actorSystem, err := actors.NewActorSystem("TestActorSystem", + actors.WithPassivationDisabled(), + actors.WithLogger(log.DiscardLogger), + actors.WithActorInitMaxRetries(3), + ) + require.NoError(t, err) + assert.NotNil(t, actorSystem) + + // start the actor system + err = actorSystem.Start(ctx) + require.NoError(t, err) + + lib.Pause(time.Second) + + var ( + testDatabase = "testdb" + testUser = "testUser" + testDatabasePassword = "testPass" + ) + + testContainer := postgres.NewTestContainer(testDatabase, testUser, testDatabasePassword) + db := testContainer.GetTestDB() + require.NoError(t, db.Connect(ctx)) + schemaUtils := pgstore.NewSchemaUtils(db) + require.NoError(t, schemaUtils.CreateTable(ctx)) + + config := &pgstore.Config{ + DBHost: testContainer.Host(), + DBPort: testContainer.Port(), + DBName: testDatabase, + DBUser: testUser, + DBPassword: testDatabasePassword, + DBSchema: testContainer.Schema(), + } + durableStore := pgstore.NewStateStore(config) + require.NoError(t, durableStore.Connect(ctx)) + + lib.Pause(time.Second) + + persistenceID := uuid.NewString() + behavior := NewAccountDurableStateBehavior(persistenceID) + + err = durableStore.Connect(ctx) + require.NoError(t, err) + + lib.Pause(time.Second) + + eventStream := eventstream.New() + + persistentActor := newDurableStateActor(behavior, durableStore, eventStream) + pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor) + require.NoError(t, err) + require.NotNil(t, pid) + + lib.Pause(time.Second) + + var command proto.Message + + command = &testpb.CreateAccount{AccountBalance: 500.00} + + reply, err := actors.Ask(ctx, pid, command, time.Second) + require.NoError(t, err) + require.NotNil(t, reply) + require.IsType(t, new(egopb.CommandReply), reply) + + commandReply := reply.(*egopb.CommandReply) + require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply()) + + state := commandReply.GetReply().(*egopb.CommandReply_StateReply) + assert.EqualValues(t, 1, state.StateReply.GetSequenceNumber()) + + // marshal the resulting state + resultingState := new(testpb.Account) + err = state.StateReply.GetState().UnmarshalTo(resultingState) + require.NoError(t, err) + + expected := &testpb.Account{ + AccountId: persistenceID, + AccountBalance: 500.00, + } + assert.True(t, proto.Equal(expected, resultingState)) + + // send another command to credit the balance + command = &testpb.CreditAccount{ + AccountId: persistenceID, + Balance: 250, + } + reply, err = actors.Ask(ctx, pid, command, time.Second) + require.NoError(t, err) + require.NotNil(t, reply) + require.IsType(t, new(egopb.CommandReply), reply) + + commandReply = reply.(*egopb.CommandReply) + require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply()) + + state = commandReply.GetReply().(*egopb.CommandReply_StateReply) + assert.EqualValues(t, 2, state.StateReply.GetSequenceNumber()) + + // marshal the resulting state + resultingState = new(testpb.Account) + err = state.StateReply.GetState().UnmarshalTo(resultingState) + require.NoError(t, err) + + expected = &testpb.Account{ + AccountId: persistenceID, + AccountBalance: 750.00, + } + + assert.True(t, proto.Equal(expected, resultingState)) + // wait a while + lib.Pause(time.Second) + + // restart the actor + pid, err = actorSystem.ReSpawn(ctx, behavior.ID()) + require.NoError(t, err) + + lib.Pause(time.Second) + + // fetch the current state + command = &egopb.GetStateCommand{} + reply, err = actors.Ask(ctx, pid, command, time.Second) + require.NoError(t, err) + require.NotNil(t, reply) + require.IsType(t, new(egopb.CommandReply), reply) + + commandReply = reply.(*egopb.CommandReply) + require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply()) + + resultingState = new(testpb.Account) + err = state.StateReply.GetState().UnmarshalTo(resultingState) + require.NoError(t, err) + expected = &testpb.Account{ + AccountId: persistenceID, + AccountBalance: 750.00, + } + assert.True(t, proto.Equal(expected, resultingState)) + + err = actorSystem.Stop(ctx) + assert.NoError(t, err) + + lib.Pause(time.Second) + + // free resources + assert.NoError(t, schemaUtils.DropTable(ctx)) + assert.NoError(t, durableStore.Disconnect(ctx)) + testContainer.Cleanup() + eventStream.Close() + }) } diff --git a/engine.go b/engine.go index ddaf3a0..875f3b0 100644 --- a/engine.go +++ b/engine.go @@ -303,7 +303,7 @@ func (engine *Engine) DurableStateEntity(ctx context.Context, behavior DurableSt // SendCommand sends command to a given entity ref. // This will return: -// 1. the resulting state after the command has been handled and the emitted event persisted +// 1. the resulting state after the command has been handled and the emitted event/durable state persisted // 2. nil when there is no resulting state or no event persisted // 3. an error in case of error func (engine *Engine) SendCommand(ctx context.Context, entityID string, cmd Command, timeout time.Duration) (resultingState State, revision uint64, err error) { diff --git a/engine_test.go b/engine_test.go index 4e7a11a..3c0a279 100644 --- a/engine_test.go +++ b/engine_test.go @@ -47,11 +47,13 @@ import ( "github.com/tochemey/ego/v3/internal/lib" offsetstore "github.com/tochemey/ego/v3/offsetstore/memory" "github.com/tochemey/ego/v3/plugins/eventstore/memory" + memstore "github.com/tochemey/ego/v3/plugins/statestore/memory" "github.com/tochemey/ego/v3/projection" + testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) func TestEgo(t *testing.T) { - t.Run("With single node cluster enabled", func(t *testing.T) { + t.Run("EventSourced entity With single node cluster enabled", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -112,7 +114,7 @@ func TestEgo(t *testing.T) { // create a persistence id entityID := uuid.NewString() // create an entity behavior with a given id - behavior := NewAccountBehavior(entityID) + behavior := NewEventSourcedEntity(entityID) // create an entity err = engine.Entity(ctx, behavior) require.NoError(t, err) @@ -170,7 +172,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, offsetStore.Disconnect(ctx)) assert.NoError(t, engine.Stop(ctx)) }) - t.Run("With no cluster enabled", func(t *testing.T) { + t.Run("EventSourced entity With no cluster enabled", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -184,7 +186,7 @@ func TestEgo(t *testing.T) { // create a persistence id entityID := uuid.NewString() // create an entity behavior with a given id - behavior := NewAccountBehavior(entityID) + behavior := NewEventSourcedEntity(entityID) // create an entity err = engine.Entity(ctx, behavior) require.NoError(t, err) @@ -223,7 +225,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) assert.NoError(t, engine.Stop(ctx)) }) - t.Run("With SendCommand when not started", func(t *testing.T) { + t.Run("EventSourced entity With SendCommand when not started", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -240,7 +242,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) }) - t.Run("With SendCommand when entityID is not set", func(t *testing.T) { + t.Run("EventSourced entity With SendCommand when entityID is not set", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -261,7 +263,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) assert.NoError(t, engine.Stop(ctx)) }) - t.Run("With SendCommand when entity is not found", func(t *testing.T) { + t.Run("EventSourced entity With SendCommand when entity is not found", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -282,7 +284,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) assert.NoError(t, engine.Stop(ctx)) }) - t.Run("With IsProjectionRunning when not started", func(t *testing.T) { + t.Run("EventSourced entity With IsProjectionRunning when not started", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -298,7 +300,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) }) - t.Run("With RemoveProjection", func(t *testing.T) { + t.Run("EventSourced entity With RemoveProjection", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -341,7 +343,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) assert.NoError(t, engine.Stop(ctx)) }) - t.Run("With RemoveProjection when not started", func(t *testing.T) { + t.Run("EventSourced entity With RemoveProjection when not started", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -356,33 +358,243 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) }) + + t.Run("DurableStore entity With single node cluster enabled", func(t *testing.T) { + ctx := context.TODO() + stateStore := memstore.NewStateStore() + require.NoError(t, stateStore.Connect(ctx)) + + nodePorts := dynaport.Get(3) + gossipPort := nodePorts[0] + clusterPort := nodePorts[1] + remotingPort := nodePorts[2] + + host := "127.0.0.1" + + // define discovered addresses + addrs := []string{ + net.JoinHostPort(host, strconv.Itoa(gossipPort)), + } + + // mock the discovery provider + provider := new(mockdisco.Provider) + + provider.EXPECT().ID().Return("testDisco") + provider.EXPECT().Initialize().Return(nil) + provider.EXPECT().Register().Return(nil) + provider.EXPECT().Deregister().Return(nil) + provider.EXPECT().DiscoverPeers().Return(addrs, nil) + provider.EXPECT().Close().Return(nil) + + // create the ego engine + engine := NewEngine("Sample", nil, + WithLogger(log.DiscardLogger), + WithStateStore(stateStore), + WithCluster(provider, 4, 1, host, remotingPort, gossipPort, clusterPort)) + + err := engine.Start(ctx) + + // wait for the cluster to fully start + lib.Pause(time.Second) + + // subscribe to events + subscriber, err := engine.Subscribe() + require.NoError(t, err) + require.NotNil(t, subscriber) + + require.NoError(t, err) + // create a persistence id + entityID := uuid.NewString() + + behavior := NewAccountDurableStateBehavior(entityID) + // create an entity + err = engine.DurableStateEntity(ctx, behavior) + require.NoError(t, err) + // send some commands to the pid + var command proto.Message + command = &testpb.CreateAccount{ + AccountBalance: 500.00, + } + + // wait for the cluster to fully start + lib.Pause(time.Second) + + // send the command to the actor. Please don't ignore the error in production grid code + resultingState, revision, err := engine.SendCommand(ctx, entityID, command, time.Minute) + require.NoError(t, err) + account, ok := resultingState.(*testpb.Account) + require.True(t, ok) + + assert.EqualValues(t, 500.00, account.GetAccountBalance()) + assert.Equal(t, entityID, account.GetAccountId()) + assert.EqualValues(t, 1, revision) + + command = &testpb.CreditAccount{ + AccountId: entityID, + Balance: 250, + } + + newState, revision, err := engine.SendCommand(ctx, entityID, command, time.Minute) + require.NoError(t, err) + newAccount, ok := newState.(*testpb.Account) + require.True(t, ok) + + assert.EqualValues(t, 750.00, newAccount.GetAccountBalance()) + assert.Equal(t, entityID, newAccount.GetAccountId()) + assert.EqualValues(t, 2, revision) + + for message := range subscriber.Iterator() { + payload := message.Payload() + envelope, ok := payload.(*egopb.DurableState) + require.True(t, ok) + require.NotZero(t, envelope.GetVersionNumber()) + } + + // free resources + require.NoError(t, engine.Stop(ctx)) + lib.Pause(time.Second) + require.NoError(t, stateStore.Disconnect(ctx)) + }) + t.Run("DurableStore entity With no cluster enabled", func(t *testing.T) { + ctx := context.TODO() + stateStore := memstore.NewStateStore() + require.NoError(t, stateStore.Connect(ctx)) + + // create the ego engine + engine := NewEngine("Sample", nil, + WithStateStore(stateStore), + WithLogger(log.DiscardLogger)) + + err := engine.Start(ctx) + require.NoError(t, err) + + entityID := uuid.NewString() + behavior := NewAccountDurableStateBehavior(entityID) + + err = engine.DurableStateEntity(ctx, behavior) + require.NoError(t, err) + var command proto.Message + + command = &testpb.CreateAccount{ + AccountBalance: 500.00, + } + + resultingState, revision, err := engine.SendCommand(ctx, entityID, command, time.Minute) + require.NoError(t, err) + account, ok := resultingState.(*testpb.Account) + require.True(t, ok) + + assert.EqualValues(t, 500.00, account.GetAccountBalance()) + assert.Equal(t, entityID, account.GetAccountId()) + assert.EqualValues(t, 1, revision) + + // send another command to credit the balance + command = &testpb.CreditAccount{ + AccountId: entityID, + Balance: 250, + } + newState, revision, err := engine.SendCommand(ctx, entityID, command, time.Minute) + require.NoError(t, err) + newAccount, ok := newState.(*testpb.Account) + require.True(t, ok) + + assert.EqualValues(t, 750.00, newAccount.GetAccountBalance()) + assert.Equal(t, entityID, newAccount.GetAccountId()) + assert.EqualValues(t, 2, revision) + + assert.NoError(t, engine.Stop(ctx)) + lib.Pause(time.Second) + assert.NoError(t, stateStore.Disconnect(ctx)) + }) + t.Run("DurableStore entity With SendCommand when not started", func(t *testing.T) { + ctx := context.TODO() + + stateStore := memstore.NewStateStore() + require.NoError(t, stateStore.Connect(ctx)) + + // create the ego engine + engine := NewEngine("Sample", nil, + WithStateStore(stateStore), + WithLogger(log.DiscardLogger)) + + entityID := uuid.NewString() + + _, _, err := engine.SendCommand(ctx, entityID, new(testpb.CreateAccount), time.Minute) + require.Error(t, err) + assert.EqualError(t, err, ErrEngineNotStarted.Error()) + + assert.NoError(t, stateStore.Disconnect(ctx)) + }) + t.Run("DurableStore entity With SendCommand when entityID is not set", func(t *testing.T) { + ctx := context.TODO() + stateStore := memstore.NewStateStore() + require.NoError(t, stateStore.Connect(ctx)) + + // create the ego engine + engine := NewEngine("Sample", nil, + WithStateStore(stateStore), + WithLogger(log.DiscardLogger)) + err := engine.Start(ctx) + require.NoError(t, err) + + // create a persistence id + entityID := "" + + _, _, err = engine.SendCommand(ctx, entityID, new(testpb.CreateAccount), time.Minute) + require.Error(t, err) + assert.EqualError(t, err, ErrUndefinedEntityID.Error()) + assert.NoError(t, engine.Stop(ctx)) + assert.NoError(t, stateStore.Disconnect(ctx)) + }) + t.Run("DurableStore entity With SendCommand when entity is not found", func(t *testing.T) { + ctx := context.TODO() + + stateStore := memstore.NewStateStore() + require.NoError(t, stateStore.Connect(ctx)) + + // create the ego engine + engine := NewEngine("Sample", nil, + WithStateStore(stateStore), + WithLogger(log.DiscardLogger)) + err := engine.Start(ctx) + require.NoError(t, err) + + // create a persistence id + entityID := uuid.NewString() + + _, _, err = engine.SendCommand(ctx, entityID, new(testpb.CreateAccount), time.Minute) + require.Error(t, err) + assert.EqualError(t, err, actors.ErrActorNotFound(entityID).Error()) + assert.NoError(t, engine.Stop(ctx)) + assert.NoError(t, stateStore.Disconnect(ctx)) + }) } -// AccountBehavior implements persistence.Behavior -type AccountBehavior struct { +// EventSourcedEntity implements persistence.Behavior +type EventSourcedEntity struct { id string } -// make sure that AccountBehavior is a true persistence behavior -var _ EventSourcedBehavior = &AccountBehavior{} +// make sure that EventSourcedEntity is a true persistence behavior +var _ EventSourcedBehavior = &EventSourcedEntity{} -// NewAccountBehavior creates an instance of AccountBehavior -func NewAccountBehavior(id string) *AccountBehavior { - return &AccountBehavior{id: id} +// NewEventSourcedEntity creates an instance of EventSourcedEntity +func NewEventSourcedEntity(id string) *EventSourcedEntity { + return &EventSourcedEntity{id: id} } // ID returns the id -func (a *AccountBehavior) ID() string { +func (a *EventSourcedEntity) ID() string { return a.id } // InitialState returns the initial state -func (a *AccountBehavior) InitialState() State { +func (a *EventSourcedEntity) InitialState() State { return State(new(samplepb.Account)) } // HandleCommand handles every command that is sent to the persistent behavior -func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ State) (events []Event, err error) { +func (a *EventSourcedEntity) HandleCommand(_ context.Context, command Command, _ State) (events []Event, err error) { switch cmd := command.(type) { case *samplepb.CreateAccount: // TODO in production grid app validate the command using the prior state @@ -408,7 +620,7 @@ func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ St } // HandleEvent handles every event emitted -func (a *AccountBehavior) HandleEvent(_ context.Context, event Event, priorState State) (state State, err error) { +func (a *EventSourcedEntity) HandleEvent(_ context.Context, event Event, priorState State) (state State, err error) { switch evt := event.(type) { case *samplepb.AccountCreated: return &samplepb.Account{ diff --git a/helper_test.go b/helper_test.go index 5e7ccc8..882119c 100644 --- a/helper_test.go +++ b/helper_test.go @@ -55,7 +55,6 @@ func (t *AccountEventSourcedBehavior) InitialState() State { func (t *AccountEventSourcedBehavior) HandleCommand(_ context.Context, command Command, _ State) (events []Event, err error) { switch cmd := command.(type) { case *testpb.CreateAccount: - // TODO in production grid app validate the command using the prior state return []Event{ &testpb.AccountCreated{ AccountId: t.id, @@ -95,7 +94,6 @@ func (t *AccountEventSourcedBehavior) HandleEvent(_ context.Context, event Event }, nil case *testpb.AccountCredited: - // we can safely cast the prior state to Account account := priorState.(*testpb.Account) bal := account.GetAccountBalance() + evt.GetAccountBalance() return &testpb.Account{ diff --git a/plugins/statestore/postgres/postgres.go b/plugins/statestore/postgres/postgres.go index 7ebc627..ca1b741 100644 --- a/plugins/statestore/postgres/postgres.go +++ b/plugins/statestore/postgres/postgres.go @@ -71,8 +71,8 @@ type DurableStore struct { // enforce interface implementation var _ persistence.StateStore = (*DurableStore)(nil) -// NewEventsStore creates a new instance of PostgresEventStore -func NewEventsStore(config *Config) *DurableStore { +// NewStateStore creates a new instance of StateStore +func NewStateStore(config *Config) *DurableStore { // create the underlying db connection db := postgres.New(postgres.NewConfig(config.DBHost, config.DBPort, config.DBUser, config.DBPassword, config.DBName)) return &DurableStore{