Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add durable state actor
Browse files Browse the repository at this point in the history
Tochemey committed Dec 16, 2024
1 parent ed4da0f commit 7de6e19
Showing 6 changed files with 390 additions and 26 deletions.
2 changes: 2 additions & 0 deletions durable_state_actor.go
Original file line number Diff line number Diff line change
@@ -85,6 +85,8 @@ func (entity *durableStateActor) Receive(ctx *actors.ReceiveContext) {
switch command := ctx.Message().(type) {
case *goaktpb.PostStart:
entity.actorSystem = ctx.ActorSystem()
case *egopb.GetStateCommand:
entity.sendStateReply(ctx)
default:
entity.processCommand(ctx, command)
}
152 changes: 152 additions & 0 deletions durable_state_actor_test.go
Original file line number Diff line number Diff line change
@@ -39,7 +39,9 @@ import (
"github.com/tochemey/ego/v3/egopb"
"github.com/tochemey/ego/v3/eventstream"
"github.com/tochemey/ego/v3/internal/lib"
"github.com/tochemey/ego/v3/internal/postgres"
"github.com/tochemey/ego/v3/plugins/statestore/memory"
pgstore "github.com/tochemey/ego/v3/plugins/statestore/postgres"
testpb "github.com/tochemey/ego/v3/test/data/pb/v3"
)

@@ -229,4 +231,154 @@ func TestDurableStateBehavior(t *testing.T) {
lib.Pause(time.Second)
eventStream.Close()
})
t.Run("with state recovery from state store", func(t *testing.T) {
ctx := context.TODO()
actorSystem, err := actors.NewActorSystem("TestActorSystem",
actors.WithPassivationDisabled(),
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(3),
)
require.NoError(t, err)
assert.NotNil(t, actorSystem)

// start the actor system
err = actorSystem.Start(ctx)
require.NoError(t, err)

lib.Pause(time.Second)

var (
testDatabase = "testdb"
testUser = "testUser"
testDatabasePassword = "testPass"
)

testContainer := postgres.NewTestContainer(testDatabase, testUser, testDatabasePassword)
db := testContainer.GetTestDB()
require.NoError(t, db.Connect(ctx))
schemaUtils := pgstore.NewSchemaUtils(db)
require.NoError(t, schemaUtils.CreateTable(ctx))

config := &pgstore.Config{
DBHost: testContainer.Host(),
DBPort: testContainer.Port(),
DBName: testDatabase,
DBUser: testUser,
DBPassword: testDatabasePassword,
DBSchema: testContainer.Schema(),
}
durableStore := pgstore.NewStateStore(config)
require.NoError(t, durableStore.Connect(ctx))

lib.Pause(time.Second)

persistenceID := uuid.NewString()
behavior := NewAccountDurableStateBehavior(persistenceID)

err = durableStore.Connect(ctx)
require.NoError(t, err)

lib.Pause(time.Second)

eventStream := eventstream.New()

persistentActor := newDurableStateActor(behavior, durableStore, eventStream)
pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NoError(t, err)
require.NotNil(t, pid)

lib.Pause(time.Second)

var command proto.Message

command = &testpb.CreateAccount{AccountBalance: 500.00}

reply, err := actors.Ask(ctx, pid, command, time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)

commandReply := reply.(*egopb.CommandReply)
require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply())

state := commandReply.GetReply().(*egopb.CommandReply_StateReply)
assert.EqualValues(t, 1, state.StateReply.GetSequenceNumber())

// marshal the resulting state
resultingState := new(testpb.Account)
err = state.StateReply.GetState().UnmarshalTo(resultingState)
require.NoError(t, err)

expected := &testpb.Account{
AccountId: persistenceID,
AccountBalance: 500.00,
}
assert.True(t, proto.Equal(expected, resultingState))

// send another command to credit the balance
command = &testpb.CreditAccount{
AccountId: persistenceID,
Balance: 250,
}
reply, err = actors.Ask(ctx, pid, command, time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)

commandReply = reply.(*egopb.CommandReply)
require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply())

state = commandReply.GetReply().(*egopb.CommandReply_StateReply)
assert.EqualValues(t, 2, state.StateReply.GetSequenceNumber())

// marshal the resulting state
resultingState = new(testpb.Account)
err = state.StateReply.GetState().UnmarshalTo(resultingState)
require.NoError(t, err)

expected = &testpb.Account{
AccountId: persistenceID,
AccountBalance: 750.00,
}

assert.True(t, proto.Equal(expected, resultingState))
// wait a while
lib.Pause(time.Second)

// restart the actor
pid, err = actorSystem.ReSpawn(ctx, behavior.ID())
require.NoError(t, err)

lib.Pause(time.Second)

// fetch the current state
command = &egopb.GetStateCommand{}
reply, err = actors.Ask(ctx, pid, command, time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)

commandReply = reply.(*egopb.CommandReply)
require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply())

resultingState = new(testpb.Account)
err = state.StateReply.GetState().UnmarshalTo(resultingState)
require.NoError(t, err)
expected = &testpb.Account{
AccountId: persistenceID,
AccountBalance: 750.00,
}
assert.True(t, proto.Equal(expected, resultingState))

err = actorSystem.Stop(ctx)
assert.NoError(t, err)

lib.Pause(time.Second)

// free resources
assert.NoError(t, schemaUtils.DropTable(ctx))
assert.NoError(t, durableStore.Disconnect(ctx))
testContainer.Cleanup()
eventStream.Close()
})
}
2 changes: 1 addition & 1 deletion engine.go
Original file line number Diff line number Diff line change
@@ -303,7 +303,7 @@ func (engine *Engine) DurableStateEntity(ctx context.Context, behavior DurableSt

// SendCommand sends command to a given entity ref.
// This will return:
// 1. the resulting state after the command has been handled and the emitted event persisted
// 1. the resulting state after the command has been handled and the emitted event/durable state persisted
// 2. nil when there is no resulting state or no event persisted
// 3. an error in case of error
func (engine *Engine) SendCommand(ctx context.Context, entityID string, cmd Command, timeout time.Duration) (resultingState State, revision uint64, err error) {
254 changes: 233 additions & 21 deletions engine_test.go
Original file line number Diff line number Diff line change
@@ -47,11 +47,13 @@ import (
"github.com/tochemey/ego/v3/internal/lib"
offsetstore "github.com/tochemey/ego/v3/offsetstore/memory"
"github.com/tochemey/ego/v3/plugins/eventstore/memory"
memstore "github.com/tochemey/ego/v3/plugins/statestore/memory"
"github.com/tochemey/ego/v3/projection"
testpb "github.com/tochemey/ego/v3/test/data/pb/v3"
)

func TestEgo(t *testing.T) {
t.Run("With single node cluster enabled", func(t *testing.T) {
t.Run("EventSourced entity With single node cluster enabled", func(t *testing.T) {
ctx := context.TODO()
// create the event store
eventStore := memory.NewEventsStore()
@@ -112,7 +114,7 @@ func TestEgo(t *testing.T) {
// create a persistence id
entityID := uuid.NewString()
// create an entity behavior with a given id
behavior := NewAccountBehavior(entityID)
behavior := NewEventSourcedEntity(entityID)
// create an entity
err = engine.Entity(ctx, behavior)
require.NoError(t, err)
@@ -170,7 +172,7 @@ func TestEgo(t *testing.T) {
assert.NoError(t, offsetStore.Disconnect(ctx))
assert.NoError(t, engine.Stop(ctx))
})
t.Run("With no cluster enabled", func(t *testing.T) {
t.Run("EventSourced entity With no cluster enabled", func(t *testing.T) {
ctx := context.TODO()
// create the event store
eventStore := memory.NewEventsStore()
@@ -184,7 +186,7 @@ func TestEgo(t *testing.T) {
// create a persistence id
entityID := uuid.NewString()
// create an entity behavior with a given id
behavior := NewAccountBehavior(entityID)
behavior := NewEventSourcedEntity(entityID)
// create an entity
err = engine.Entity(ctx, behavior)
require.NoError(t, err)
@@ -223,7 +225,7 @@ func TestEgo(t *testing.T) {
assert.NoError(t, eventStore.Disconnect(ctx))
assert.NoError(t, engine.Stop(ctx))
})
t.Run("With SendCommand when not started", func(t *testing.T) {
t.Run("EventSourced entity With SendCommand when not started", func(t *testing.T) {
ctx := context.TODO()
// create the event store
eventStore := memory.NewEventsStore()
@@ -240,7 +242,7 @@ func TestEgo(t *testing.T) {

assert.NoError(t, eventStore.Disconnect(ctx))
})
t.Run("With SendCommand when entityID is not set", func(t *testing.T) {
t.Run("EventSourced entity With SendCommand when entityID is not set", func(t *testing.T) {
ctx := context.TODO()
// create the event store
eventStore := memory.NewEventsStore()
@@ -261,7 +263,7 @@ func TestEgo(t *testing.T) {
assert.NoError(t, eventStore.Disconnect(ctx))
assert.NoError(t, engine.Stop(ctx))
})
t.Run("With SendCommand when entity is not found", func(t *testing.T) {
t.Run("EventSourced entity With SendCommand when entity is not found", func(t *testing.T) {
ctx := context.TODO()
// create the event store
eventStore := memory.NewEventsStore()
@@ -282,7 +284,7 @@ func TestEgo(t *testing.T) {
assert.NoError(t, eventStore.Disconnect(ctx))
assert.NoError(t, engine.Stop(ctx))
})
t.Run("With IsProjectionRunning when not started", func(t *testing.T) {
t.Run("EventSourced entity With IsProjectionRunning when not started", func(t *testing.T) {
ctx := context.TODO()
// create the event store
eventStore := memory.NewEventsStore()
@@ -298,7 +300,7 @@ func TestEgo(t *testing.T) {

assert.NoError(t, eventStore.Disconnect(ctx))
})
t.Run("With RemoveProjection", func(t *testing.T) {
t.Run("EventSourced entity With RemoveProjection", func(t *testing.T) {
ctx := context.TODO()
// create the event store
eventStore := memory.NewEventsStore()
@@ -341,7 +343,7 @@ func TestEgo(t *testing.T) {
assert.NoError(t, eventStore.Disconnect(ctx))
assert.NoError(t, engine.Stop(ctx))
})
t.Run("With RemoveProjection when not started", func(t *testing.T) {
t.Run("EventSourced entity With RemoveProjection when not started", func(t *testing.T) {
ctx := context.TODO()
// create the event store
eventStore := memory.NewEventsStore()
@@ -356,33 +358,243 @@ func TestEgo(t *testing.T) {

assert.NoError(t, eventStore.Disconnect(ctx))
})

t.Run("DurableStore entity With single node cluster enabled", func(t *testing.T) {
ctx := context.TODO()
stateStore := memstore.NewStateStore()
require.NoError(t, stateStore.Connect(ctx))

nodePorts := dynaport.Get(3)
gossipPort := nodePorts[0]
clusterPort := nodePorts[1]
remotingPort := nodePorts[2]

host := "127.0.0.1"

// define discovered addresses
addrs := []string{
net.JoinHostPort(host, strconv.Itoa(gossipPort)),
}

// mock the discovery provider
provider := new(mockdisco.Provider)

provider.EXPECT().ID().Return("testDisco")
provider.EXPECT().Initialize().Return(nil)
provider.EXPECT().Register().Return(nil)
provider.EXPECT().Deregister().Return(nil)
provider.EXPECT().DiscoverPeers().Return(addrs, nil)
provider.EXPECT().Close().Return(nil)

// create the ego engine
engine := NewEngine("Sample", nil,
WithLogger(log.DiscardLogger),
WithStateStore(stateStore),
WithCluster(provider, 4, 1, host, remotingPort, gossipPort, clusterPort))

err := engine.Start(ctx)

// wait for the cluster to fully start
lib.Pause(time.Second)

// subscribe to events
subscriber, err := engine.Subscribe()
require.NoError(t, err)
require.NotNil(t, subscriber)

require.NoError(t, err)
// create a persistence id
entityID := uuid.NewString()

behavior := NewAccountDurableStateBehavior(entityID)
// create an entity
err = engine.DurableStateEntity(ctx, behavior)
require.NoError(t, err)
// send some commands to the pid
var command proto.Message
command = &testpb.CreateAccount{
AccountBalance: 500.00,
}

// wait for the cluster to fully start
lib.Pause(time.Second)

// send the command to the actor. Please don't ignore the error in production grid code
resultingState, revision, err := engine.SendCommand(ctx, entityID, command, time.Minute)
require.NoError(t, err)
account, ok := resultingState.(*testpb.Account)
require.True(t, ok)

assert.EqualValues(t, 500.00, account.GetAccountBalance())
assert.Equal(t, entityID, account.GetAccountId())
assert.EqualValues(t, 1, revision)

command = &testpb.CreditAccount{
AccountId: entityID,
Balance: 250,
}

newState, revision, err := engine.SendCommand(ctx, entityID, command, time.Minute)
require.NoError(t, err)
newAccount, ok := newState.(*testpb.Account)
require.True(t, ok)

assert.EqualValues(t, 750.00, newAccount.GetAccountBalance())
assert.Equal(t, entityID, newAccount.GetAccountId())
assert.EqualValues(t, 2, revision)

for message := range subscriber.Iterator() {
payload := message.Payload()
envelope, ok := payload.(*egopb.DurableState)
require.True(t, ok)
require.NotZero(t, envelope.GetVersionNumber())
}

// free resources
require.NoError(t, engine.Stop(ctx))
lib.Pause(time.Second)
require.NoError(t, stateStore.Disconnect(ctx))
})
t.Run("DurableStore entity With no cluster enabled", func(t *testing.T) {
ctx := context.TODO()
stateStore := memstore.NewStateStore()
require.NoError(t, stateStore.Connect(ctx))

// create the ego engine
engine := NewEngine("Sample", nil,
WithStateStore(stateStore),
WithLogger(log.DiscardLogger))

err := engine.Start(ctx)
require.NoError(t, err)

entityID := uuid.NewString()
behavior := NewAccountDurableStateBehavior(entityID)

err = engine.DurableStateEntity(ctx, behavior)
require.NoError(t, err)
var command proto.Message

command = &testpb.CreateAccount{
AccountBalance: 500.00,
}

resultingState, revision, err := engine.SendCommand(ctx, entityID, command, time.Minute)
require.NoError(t, err)
account, ok := resultingState.(*testpb.Account)
require.True(t, ok)

assert.EqualValues(t, 500.00, account.GetAccountBalance())
assert.Equal(t, entityID, account.GetAccountId())
assert.EqualValues(t, 1, revision)

// send another command to credit the balance
command = &testpb.CreditAccount{
AccountId: entityID,
Balance: 250,
}
newState, revision, err := engine.SendCommand(ctx, entityID, command, time.Minute)
require.NoError(t, err)
newAccount, ok := newState.(*testpb.Account)
require.True(t, ok)

assert.EqualValues(t, 750.00, newAccount.GetAccountBalance())
assert.Equal(t, entityID, newAccount.GetAccountId())
assert.EqualValues(t, 2, revision)

assert.NoError(t, engine.Stop(ctx))
lib.Pause(time.Second)
assert.NoError(t, stateStore.Disconnect(ctx))
})
t.Run("DurableStore entity With SendCommand when not started", func(t *testing.T) {
ctx := context.TODO()

stateStore := memstore.NewStateStore()
require.NoError(t, stateStore.Connect(ctx))

// create the ego engine
engine := NewEngine("Sample", nil,
WithStateStore(stateStore),
WithLogger(log.DiscardLogger))

entityID := uuid.NewString()

_, _, err := engine.SendCommand(ctx, entityID, new(testpb.CreateAccount), time.Minute)
require.Error(t, err)
assert.EqualError(t, err, ErrEngineNotStarted.Error())

assert.NoError(t, stateStore.Disconnect(ctx))
})
t.Run("DurableStore entity With SendCommand when entityID is not set", func(t *testing.T) {
ctx := context.TODO()
stateStore := memstore.NewStateStore()
require.NoError(t, stateStore.Connect(ctx))

// create the ego engine
engine := NewEngine("Sample", nil,
WithStateStore(stateStore),
WithLogger(log.DiscardLogger))
err := engine.Start(ctx)
require.NoError(t, err)

// create a persistence id
entityID := ""

_, _, err = engine.SendCommand(ctx, entityID, new(testpb.CreateAccount), time.Minute)
require.Error(t, err)
assert.EqualError(t, err, ErrUndefinedEntityID.Error())
assert.NoError(t, engine.Stop(ctx))
assert.NoError(t, stateStore.Disconnect(ctx))
})
t.Run("DurableStore entity With SendCommand when entity is not found", func(t *testing.T) {
ctx := context.TODO()

stateStore := memstore.NewStateStore()
require.NoError(t, stateStore.Connect(ctx))

// create the ego engine
engine := NewEngine("Sample", nil,
WithStateStore(stateStore),
WithLogger(log.DiscardLogger))
err := engine.Start(ctx)
require.NoError(t, err)

// create a persistence id
entityID := uuid.NewString()

_, _, err = engine.SendCommand(ctx, entityID, new(testpb.CreateAccount), time.Minute)
require.Error(t, err)
assert.EqualError(t, err, actors.ErrActorNotFound(entityID).Error())
assert.NoError(t, engine.Stop(ctx))
assert.NoError(t, stateStore.Disconnect(ctx))
})
}

// AccountBehavior implements persistence.Behavior
type AccountBehavior struct {
// EventSourcedEntity implements persistence.Behavior
type EventSourcedEntity struct {
id string
}

// make sure that AccountBehavior is a true persistence behavior
var _ EventSourcedBehavior = &AccountBehavior{}
// make sure that EventSourcedEntity is a true persistence behavior
var _ EventSourcedBehavior = &EventSourcedEntity{}

// NewAccountBehavior creates an instance of AccountBehavior
func NewAccountBehavior(id string) *AccountBehavior {
return &AccountBehavior{id: id}
// NewEventSourcedEntity creates an instance of EventSourcedEntity
func NewEventSourcedEntity(id string) *EventSourcedEntity {
return &EventSourcedEntity{id: id}
}

// ID returns the id
func (a *AccountBehavior) ID() string {
func (a *EventSourcedEntity) ID() string {
return a.id
}

// InitialState returns the initial state
func (a *AccountBehavior) InitialState() State {
func (a *EventSourcedEntity) InitialState() State {
return State(new(samplepb.Account))
}

// HandleCommand handles every command that is sent to the persistent behavior
func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ State) (events []Event, err error) {
func (a *EventSourcedEntity) HandleCommand(_ context.Context, command Command, _ State) (events []Event, err error) {
switch cmd := command.(type) {
case *samplepb.CreateAccount:
// TODO in production grid app validate the command using the prior state
@@ -408,7 +620,7 @@ func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ St
}

// HandleEvent handles every event emitted
func (a *AccountBehavior) HandleEvent(_ context.Context, event Event, priorState State) (state State, err error) {
func (a *EventSourcedEntity) HandleEvent(_ context.Context, event Event, priorState State) (state State, err error) {
switch evt := event.(type) {
case *samplepb.AccountCreated:
return &samplepb.Account{
2 changes: 0 additions & 2 deletions helper_test.go
Original file line number Diff line number Diff line change
@@ -55,7 +55,6 @@ func (t *AccountEventSourcedBehavior) InitialState() State {
func (t *AccountEventSourcedBehavior) HandleCommand(_ context.Context, command Command, _ State) (events []Event, err error) {
switch cmd := command.(type) {
case *testpb.CreateAccount:
// TODO in production grid app validate the command using the prior state
return []Event{
&testpb.AccountCreated{
AccountId: t.id,
@@ -95,7 +94,6 @@ func (t *AccountEventSourcedBehavior) HandleEvent(_ context.Context, event Event
}, nil

case *testpb.AccountCredited:
// we can safely cast the prior state to Account
account := priorState.(*testpb.Account)
bal := account.GetAccountBalance() + evt.GetAccountBalance()
return &testpb.Account{
4 changes: 2 additions & 2 deletions plugins/statestore/postgres/postgres.go
Original file line number Diff line number Diff line change
@@ -71,8 +71,8 @@ type DurableStore struct {
// enforce interface implementation
var _ persistence.StateStore = (*DurableStore)(nil)

// NewEventsStore creates a new instance of PostgresEventStore
func NewEventsStore(config *Config) *DurableStore {
// NewStateStore creates a new instance of StateStore
func NewStateStore(config *Config) *DurableStore {
// create the underlying db connection
db := postgres.New(postgres.NewConfig(config.DBHost, config.DBPort, config.DBUser, config.DBPassword, config.DBName))
return &DurableStore{

0 comments on commit 7de6e19

Please sign in to comment.