Skip to content

Commit 8959214

Browse files
authored
feat: add durable state actor (#100)
1 parent 16af6b4 commit 8959214

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+2381
-276
lines changed

Earthfile

+3-2
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ local-test:
8282
FROM +vendor
8383

8484
WITH DOCKER --pull postgres:11
85-
RUN go-acc ./... -o coverage.out --ignore egopb,test,example,mocks -- -mod=vendor -race -v
85+
RUN go-acc ./... -o coverage.out --ignore egopb,test,example,mocks -- -mod=vendor -timeout 0 -race -v
8686
END
8787

8888
SAVE ARTIFACT coverage.out AS LOCAL coverage.out
@@ -92,7 +92,8 @@ mock:
9292
FROM +code
9393

9494
# generate the mocks
95-
RUN mockery --dir eventstore --name EventsStore --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/eventstore --case snake
95+
RUN mockery --dir persistence --all --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/persistence --case snake
9696
RUN mockery --dir offsetstore --name OffsetStore --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/offsetstore --case snake
9797

98+
9899
SAVE ARTIFACT ./mocks mocks AS LOCAL mocks

behavior.go

+32-3
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ type Command proto.Message
3434
type Event proto.Message
3535
type State proto.Message
3636

37-
// EntityBehavior defines an event sourced behavior when modeling a CQRS EntityBehavior.
38-
type EntityBehavior interface {
37+
// EventSourcedBehavior defines an event sourced behavior when modeling a CQRS EventSourcedBehavior.
38+
type EventSourcedBehavior interface {
3939
// ID defines the id that will be used in the event journal.
4040
// This helps track the entity in the events store.
4141
ID() string
@@ -46,7 +46,7 @@ type EntityBehavior interface {
4646
// which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can
4747
// be returned as a no-op. Command handlers are the meat of the event sourced actor.
4848
// They encode the business rules of your event sourced actor and act as a guardian of the event sourced actor consistency.
49-
// The command eventSourcedHandler must first validate that the incoming command can be applied to the current model state.
49+
// The command must first validate that the incoming command can be applied to the current model state.
5050
// Any decision should be solely based on the data passed in the commands and the state of the Behavior.
5151
// In case of successful validation, one or more events expressing the mutations are persisted.
5252
// Once the events are persisted, they are applied to the state producing a new valid state.
@@ -58,3 +58,32 @@ type EntityBehavior interface {
5858
// Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal.
5959
HandleEvent(ctx context.Context, event Event, priorState State) (state State, err error)
6060
}
61+
62+
// DurableStateBehavior represents a type of Actor that persists its full state after processing each command instead of using event sourcing.
63+
// This type of Actor keeps its current state in memory during command handling and based upon the command response
64+
// persists its full state into a durable store. The store can be a SQL or NoSQL database.
65+
// The whole concept is given the current state of the actor and a command produce a new state with a higher version as shown in this diagram: (State, Command) => State
66+
// DurableStateBehavior reacts to commands which result in a new version of the actor state. Only the latest version of the actor state is
67+
// persisted to the durable store. There is no concept of history regarding the actor state since this is not an event sourced actor.
68+
// However, one can rely on the version number of the actor state and exactly know how the actor state has evolved overtime.
69+
// State actor version number are numerically incremented by the command handler which means it is imperative that the newer version of the state is greater than the current version by one.
70+
//
71+
// DurableStateBehavior will attempt to recover its state whenever available from the durable state.
72+
// During a normal shutdown process, it will persist its current state to the durable store prior to shutting down.
73+
// This behavior help maintain some consistency across the actor state evolution.
74+
type DurableStateBehavior interface {
75+
// ID defines the id that will be used in the event journal.
76+
// This helps track the entity in the events store.
77+
ID() string
78+
// InitialState returns the durable state actor initial state.
79+
// This is set as the initial state when there are no snapshots found the entity
80+
InitialState() State
81+
// HandleCommand processes every command sent to the DurableStateBehavior. One needs to use the command, the priorVersion and the priorState sent to produce a newState and newVersion.
82+
// This defines how to handle each incoming command, which validations must be applied, and finally, whether a resulting state will be persisted depending upon the response.
83+
// They encode the business rules of your durable state actor and act as a guardian of the actor consistency.
84+
// The command handler must first validate that the incoming command can be applied to the current model state.
85+
// Any decision should be solely based on the data passed in the command, the priorVersion and the priorState.
86+
// In case of successful validation and processing , the new state will be stored in the durable store depending upon response.
87+
// The actor state will be updated with the newState only if the newVersion is 1 more than the already existing state.
88+
HandleCommand(ctx context.Context, command Command, priorVersion uint64, priorState State) (newState State, newVersion uint64, err error)
89+
}

durable_state_actor.go

+233
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2022-2024 Tochemey
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package ego
26+
27+
import (
28+
"context"
29+
"fmt"
30+
"math"
31+
"time"
32+
33+
"github.com/tochemey/goakt/v2/actors"
34+
"github.com/tochemey/goakt/v2/goaktpb"
35+
"golang.org/x/sync/errgroup"
36+
"google.golang.org/protobuf/proto"
37+
"google.golang.org/protobuf/types/known/anypb"
38+
"google.golang.org/protobuf/types/known/timestamppb"
39+
40+
"github.com/tochemey/ego/v3/egopb"
41+
"github.com/tochemey/ego/v3/eventstream"
42+
"github.com/tochemey/ego/v3/internal/errorschain"
43+
"github.com/tochemey/ego/v3/persistence"
44+
)
45+
46+
var (
47+
statesTopic = "topic.states.%d"
48+
)
49+
50+
// durableStateActor is a durable state based actor
51+
type durableStateActor struct {
52+
DurableStateBehavior
53+
stateStore persistence.StateStore
54+
currentState State
55+
currentVersion uint64
56+
lastCommandTime time.Time
57+
eventsStream eventstream.Stream
58+
actorSystem actors.ActorSystem
59+
}
60+
61+
// implements the actors.Actor interface
62+
var _ actors.Actor = (*durableStateActor)(nil)
63+
64+
// newDurableStateActor creates an instance of actor provided the DurableStateBehavior
65+
func newDurableStateActor(behavior DurableStateBehavior, stateStore persistence.StateStore, eventsStream eventstream.Stream) *durableStateActor {
66+
return &durableStateActor{
67+
stateStore: stateStore,
68+
eventsStream: eventsStream,
69+
DurableStateBehavior: behavior,
70+
}
71+
}
72+
73+
// PreStart pre-starts the actor
74+
func (entity *durableStateActor) PreStart(ctx context.Context) error {
75+
return errorschain.
76+
New(errorschain.ReturnFirst()).
77+
AddError(entity.durableStateRequired()).
78+
AddError(entity.stateStore.Ping(ctx)).
79+
AddError(entity.recoverFromStore(ctx)).
80+
Error()
81+
}
82+
83+
// Receive processes any message dropped into the actor mailbox.
84+
func (entity *durableStateActor) Receive(ctx *actors.ReceiveContext) {
85+
switch command := ctx.Message().(type) {
86+
case *goaktpb.PostStart:
87+
entity.actorSystem = ctx.ActorSystem()
88+
case *egopb.GetStateCommand:
89+
entity.sendStateReply(ctx)
90+
default:
91+
entity.processCommand(ctx, command)
92+
}
93+
}
94+
95+
// PostStop prepares the actor to gracefully shutdown
96+
func (entity *durableStateActor) PostStop(ctx context.Context) error {
97+
return errorschain.
98+
New(errorschain.ReturnFirst()).
99+
AddError(entity.stateStore.Ping(ctx)).
100+
AddError(entity.persistStateAndPublish(ctx)).
101+
Error()
102+
}
103+
104+
// recoverFromStore reset the persistent actor to the latest state in case there is one
105+
// this is vital when the entity actor is restarting.
106+
func (entity *durableStateActor) recoverFromStore(ctx context.Context) error {
107+
durableState, err := entity.stateStore.GetLatestState(ctx, entity.ID())
108+
if err != nil {
109+
return fmt.Errorf("failed unmarshal the latest state: %w", err)
110+
}
111+
112+
if durableState != nil && proto.Equal(durableState, new(egopb.DurableState)) {
113+
currentState := entity.InitialState()
114+
if err := durableState.GetResultingState().UnmarshalTo(currentState); err != nil {
115+
return fmt.Errorf("failed unmarshal the latest state: %w", err)
116+
}
117+
118+
entity.currentState = currentState
119+
entity.currentVersion = durableState.GetVersionNumber()
120+
return nil
121+
}
122+
123+
entity.currentState = entity.InitialState()
124+
return nil
125+
}
126+
127+
// processCommand processes the incoming command
128+
func (entity *durableStateActor) processCommand(receiveContext *actors.ReceiveContext, command Command) {
129+
ctx := receiveContext.Context()
130+
newState, newVersion, err := entity.HandleCommand(ctx, command, entity.currentVersion, entity.currentState)
131+
if err != nil {
132+
entity.sendErrorReply(receiveContext, err)
133+
return
134+
}
135+
136+
// check whether the pre-conditions have met
137+
if err := entity.checkPreconditions(newState, newVersion); err != nil {
138+
entity.sendErrorReply(receiveContext, err)
139+
return
140+
}
141+
142+
// set the current state with the newState
143+
entity.currentState = newState
144+
entity.lastCommandTime = timestamppb.Now().AsTime()
145+
entity.currentVersion = newVersion
146+
147+
if err := entity.persistStateAndPublish(ctx); err != nil {
148+
entity.sendErrorReply(receiveContext, err)
149+
return
150+
}
151+
152+
entity.sendStateReply(receiveContext)
153+
}
154+
155+
// sendStateReply sends a state reply message
156+
func (entity *durableStateActor) sendStateReply(ctx *actors.ReceiveContext) {
157+
state, _ := anypb.New(entity.currentState)
158+
ctx.Response(&egopb.CommandReply{
159+
Reply: &egopb.CommandReply_StateReply{
160+
StateReply: &egopb.StateReply{
161+
PersistenceId: entity.ID(),
162+
State: state,
163+
SequenceNumber: entity.currentVersion,
164+
Timestamp: entity.lastCommandTime.Unix(),
165+
},
166+
},
167+
})
168+
}
169+
170+
// sendErrorReply sends an error as a reply message
171+
func (entity *durableStateActor) sendErrorReply(ctx *actors.ReceiveContext, err error) {
172+
ctx.Response(&egopb.CommandReply{
173+
Reply: &egopb.CommandReply_ErrorReply{
174+
ErrorReply: &egopb.ErrorReply{
175+
Message: err.Error(),
176+
},
177+
},
178+
})
179+
}
180+
181+
// checkAndSetPreconditions validates the newState and the newVersion
182+
func (entity *durableStateActor) checkPreconditions(newState State, newVersion uint64) error {
183+
currentState := entity.currentState
184+
currentStateType := currentState.ProtoReflect().Descriptor().FullName()
185+
latestStateType := newState.ProtoReflect().Descriptor().FullName()
186+
if currentStateType != latestStateType {
187+
return fmt.Errorf("mismatch state types: %s != %s", currentStateType, latestStateType)
188+
}
189+
190+
proceed := int(math.Abs(float64(newVersion-entity.currentVersion))) == 1
191+
if !proceed {
192+
return fmt.Errorf("%s received version=(%d) while current version is (%d)",
193+
entity.ID(),
194+
newVersion,
195+
entity.currentVersion)
196+
}
197+
return nil
198+
}
199+
200+
// checks whether the durable state store is set or not
201+
func (entity *durableStateActor) durableStateRequired() error {
202+
if entity.stateStore == nil {
203+
return ErrDurableStateStoreRequired
204+
}
205+
return nil
206+
}
207+
208+
// persistState persists the actor state
209+
func (entity *durableStateActor) persistStateAndPublish(ctx context.Context) error {
210+
resultingState, _ := anypb.New(entity.currentState)
211+
shardNumber := entity.actorSystem.GetPartition(entity.ID())
212+
topic := fmt.Sprintf(statesTopic, shardNumber)
213+
214+
durableState := &egopb.DurableState{
215+
PersistenceId: entity.ID(),
216+
VersionNumber: entity.currentVersion,
217+
ResultingState: resultingState,
218+
Timestamp: entity.lastCommandTime.Unix(),
219+
Shard: shardNumber,
220+
}
221+
222+
eg, ctx := errgroup.WithContext(ctx)
223+
eg.Go(func() error {
224+
entity.eventsStream.Publish(topic, durableState)
225+
return nil
226+
})
227+
228+
eg.Go(func() error {
229+
return entity.stateStore.WriteState(ctx, durableState)
230+
})
231+
232+
return eg.Wait()
233+
}

0 commit comments

Comments
 (0)