Skip to content

Commit ff03386

Browse files
committed
refactor: maintenance refactor
1 parent 690aceb commit ff03386

11 files changed

+540
-94
lines changed

durable_state_actor.go

+16-11
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,12 @@ func newDurableStateActor(behavior DurableStateBehavior, stateStore persistence.
7272

7373
// PreStart pre-starts the actor
7474
func (entity *durableStateActor) PreStart(ctx context.Context) error {
75+
if err := entity.durableStateRequired(); err != nil {
76+
return err
77+
}
78+
7579
return errorschain.
7680
New(errorschain.ReturnFirst()).
77-
AddError(entity.durableStateRequired()).
7881
AddError(entity.stateStore.Ping(ctx)).
7982
AddError(entity.recoverFromStore(ctx)).
8083
Error()
@@ -106,21 +109,23 @@ func (entity *durableStateActor) PostStop(ctx context.Context) error {
106109
func (entity *durableStateActor) recoverFromStore(ctx context.Context) error {
107110
durableState, err := entity.stateStore.GetLatestState(ctx, entity.ID())
108111
if err != nil {
109-
return fmt.Errorf("failed unmarshal the latest state: %w", err)
112+
return fmt.Errorf("failed to get the latest state: %w", err)
110113
}
111114

112-
if durableState != nil && proto.Equal(durableState, new(egopb.DurableState)) {
113-
currentState := entity.InitialState()
114-
if err := durableState.GetResultingState().UnmarshalTo(currentState); err != nil {
115-
return fmt.Errorf("failed unmarshal the latest state: %w", err)
116-
}
117-
118-
entity.currentState = currentState
119-
entity.currentVersion = durableState.GetVersionNumber()
115+
if durableState == nil || proto.Equal(durableState, new(egopb.DurableState)) {
116+
entity.currentState = entity.InitialState()
120117
return nil
121118
}
122119

123-
entity.currentState = entity.InitialState()
120+
currentState := entity.InitialState()
121+
if resultingState := durableState.GetResultingState(); resultingState != nil {
122+
if err := resultingState.UnmarshalTo(currentState); err != nil {
123+
return fmt.Errorf("failed to unmarshal the latest state: %w", err)
124+
}
125+
}
126+
127+
entity.currentState = currentState
128+
entity.currentVersion = durableState.GetVersionNumber()
124129
return nil
125130
}
126131

durable_state_actor_test.go

+170
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,18 @@ import (
3131

3232
"github.com/google/uuid"
3333
"github.com/stretchr/testify/assert"
34+
"github.com/stretchr/testify/mock"
3435
"github.com/stretchr/testify/require"
3536
goakt "github.com/tochemey/goakt/v3/actor"
3637
"github.com/tochemey/goakt/v3/log"
38+
"go.uber.org/goleak"
3739
"google.golang.org/protobuf/proto"
40+
"google.golang.org/protobuf/types/known/anypb"
3841

3942
"github.com/tochemey/ego/v3/egopb"
4043
"github.com/tochemey/ego/v3/eventstream"
4144
"github.com/tochemey/ego/v3/internal/lib"
45+
mocks "github.com/tochemey/ego/v3/mocks/persistence"
4246
testpb "github.com/tochemey/ego/v3/test/data/pb/v3"
4347
"github.com/tochemey/ego/v3/testkit"
4448
)
@@ -357,4 +361,170 @@ func TestDurableStateBehavior(t *testing.T) {
357361
assert.NoError(t, durableStore.Disconnect(ctx))
358362
eventStream.Close()
359363
})
364+
t.Run("with state recovery from state store with no latest state", func(t *testing.T) {
365+
defer goleak.VerifyNone(t)
366+
ctx := context.TODO()
367+
actorSystem, err := goakt.NewActorSystem("TestActorSystem",
368+
goakt.WithPassivationDisabled(),
369+
goakt.WithLogger(log.DiscardLogger),
370+
goakt.WithActorInitMaxRetries(3),
371+
)
372+
require.NoError(t, err)
373+
assert.NotNil(t, actorSystem)
374+
375+
// start the actor system
376+
err = actorSystem.Start(ctx)
377+
require.NoError(t, err)
378+
379+
lib.Pause(time.Second)
380+
381+
persistenceID := uuid.NewString()
382+
behavior := NewAccountDurableStateBehavior(persistenceID)
383+
384+
eventStream := eventstream.New()
385+
386+
durableStore := new(mocks.StateStore)
387+
durableStore.EXPECT().Ping(mock.Anything).Return(nil)
388+
durableStore.EXPECT().GetLatestState(mock.Anything, behavior.ID()).Return(new(egopb.DurableState), nil)
389+
durableStore.EXPECT().WriteState(mock.Anything, mock.AnythingOfType("*egopb.DurableState")).Return(nil)
390+
391+
persistentActor := newDurableStateActor(behavior, durableStore, eventStream)
392+
pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
393+
require.NoError(t, err)
394+
require.NotNil(t, pid)
395+
396+
lib.Pause(time.Second)
397+
398+
err = actorSystem.Stop(ctx)
399+
require.NoError(t, err)
400+
401+
lib.Pause(time.Second)
402+
403+
eventStream.Close()
404+
durableStore.AssertExpectations(t)
405+
})
406+
t.Run("with state recovery from state store failure", func(t *testing.T) {
407+
defer goleak.VerifyNone(t)
408+
ctx := context.TODO()
409+
actorSystem, err := goakt.NewActorSystem("TestActorSystem",
410+
goakt.WithPassivationDisabled(),
411+
goakt.WithLogger(log.DiscardLogger),
412+
goakt.WithActorInitMaxRetries(3),
413+
)
414+
require.NoError(t, err)
415+
assert.NotNil(t, actorSystem)
416+
417+
// start the actor system
418+
err = actorSystem.Start(ctx)
419+
require.NoError(t, err)
420+
421+
lib.Pause(time.Second)
422+
423+
persistenceID := uuid.NewString()
424+
behavior := NewAccountDurableStateBehavior(persistenceID)
425+
426+
eventStream := eventstream.New()
427+
428+
durableStore := new(mocks.StateStore)
429+
durableStore.EXPECT().Ping(mock.Anything).Return(nil)
430+
durableStore.EXPECT().GetLatestState(mock.Anything, behavior.ID()).Return(nil, assert.AnError)
431+
432+
persistentActor := newDurableStateActor(behavior, durableStore, eventStream)
433+
pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
434+
require.Error(t, err)
435+
require.Nil(t, pid)
436+
437+
lib.Pause(time.Second)
438+
439+
err = actorSystem.Stop(ctx)
440+
assert.NoError(t, err)
441+
442+
lib.Pause(time.Second)
443+
eventStream.Close()
444+
durableStore.AssertExpectations(t)
445+
})
446+
t.Run("with state recovery from state store with initial parsing failure", func(t *testing.T) {
447+
defer goleak.VerifyNone(t)
448+
ctx := context.TODO()
449+
actorSystem, err := goakt.NewActorSystem("TestActorSystem",
450+
goakt.WithPassivationDisabled(),
451+
goakt.WithLogger(log.DiscardLogger),
452+
goakt.WithActorInitMaxRetries(3),
453+
)
454+
require.NoError(t, err)
455+
assert.NotNil(t, actorSystem)
456+
457+
// start the actor system
458+
err = actorSystem.Start(ctx)
459+
require.NoError(t, err)
460+
461+
lib.Pause(time.Second)
462+
463+
persistenceID := uuid.NewString()
464+
behavior := NewAccountDurableStateBehavior(persistenceID)
465+
466+
eventStream := eventstream.New()
467+
468+
latestState := &egopb.DurableState{
469+
ResultingState: &anypb.Any{
470+
TypeUrl: "invalid-type-url",
471+
Value: []byte("invalid-value"),
472+
},
473+
}
474+
durableStore := new(mocks.StateStore)
475+
durableStore.EXPECT().Ping(mock.Anything).Return(nil)
476+
durableStore.EXPECT().GetLatestState(mock.Anything, behavior.ID()).Return(latestState, nil)
477+
478+
persistentActor := newDurableStateActor(behavior, durableStore, eventStream)
479+
pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
480+
require.Error(t, err)
481+
require.Nil(t, pid)
482+
483+
lib.Pause(time.Second)
484+
485+
err = actorSystem.Stop(ctx)
486+
assert.NoError(t, err)
487+
488+
lib.Pause(time.Second)
489+
490+
eventStream.Close()
491+
durableStore.AssertExpectations(t)
492+
})
493+
494+
t.Run("with no durable state store", func(t *testing.T) {
495+
defer goleak.VerifyNone(t)
496+
ctx := context.TODO()
497+
actorSystem, err := goakt.NewActorSystem("TestActorSystem",
498+
goakt.WithPassivationDisabled(),
499+
goakt.WithLogger(log.DiscardLogger),
500+
goakt.WithActorInitMaxRetries(3),
501+
)
502+
require.NoError(t, err)
503+
assert.NotNil(t, actorSystem)
504+
505+
// start the actor system
506+
err = actorSystem.Start(ctx)
507+
require.NoError(t, err)
508+
509+
lib.Pause(time.Second)
510+
511+
persistenceID := uuid.NewString()
512+
behavior := NewAccountDurableStateBehavior(persistenceID)
513+
514+
eventStream := eventstream.New()
515+
516+
persistentActor := newDurableStateActor(behavior, nil, eventStream)
517+
pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
518+
require.Error(t, err)
519+
require.Nil(t, pid)
520+
521+
lib.Pause(time.Second)
522+
523+
err = actorSystem.Stop(ctx)
524+
assert.NoError(t, err)
525+
526+
lib.Pause(time.Second)
527+
528+
eventStream.Close()
529+
})
360530
}

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ require (
88
github.com/google/uuid v1.6.0
99
github.com/kapetan-io/tackle v0.11.0
1010
github.com/stretchr/testify v1.10.0
11-
github.com/tochemey/goakt/v3 v3.1.0
11+
github.com/tochemey/goakt/v3 v3.1.1-0.20250223115635-0da908109d68
1212
github.com/travisjeffery/go-dynaport v1.0.0
1313
go.uber.org/atomic v1.11.0
1414
go.uber.org/goleak v1.3.0

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -243,8 +243,8 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT
243243
github.com/tidwall/redcon v1.6.2 h1:5qfvrrybgtO85jnhSravmkZyC0D+7WstbfCs3MmPhow=
244244
github.com/tidwall/redcon v1.6.2/go.mod h1:p5Wbsgeyi2VSTBWOcA5vRXrOb9arFTcU2+ZzFjqV75Y=
245245
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
246-
github.com/tochemey/goakt/v3 v3.1.0 h1:ZhQZr631bvOsdpi1aPdAjoo+pOveDNBzOjOs4jRzRy8=
247-
github.com/tochemey/goakt/v3 v3.1.0/go.mod h1:iaLGbOHWtGiViJaJ1IrIMyvlafGuNIViUIs8/K060w8=
246+
github.com/tochemey/goakt/v3 v3.1.1-0.20250223115635-0da908109d68 h1:tAsHrw+gghsgDGZ778PD7tKybjDIcCcH/SfbiUAKu74=
247+
github.com/tochemey/goakt/v3 v3.1.1-0.20250223115635-0da908109d68/go.mod h1:iaLGbOHWtGiViJaJ1IrIMyvlafGuNIViUIs8/K060w8=
248248
github.com/tochemey/olric v0.2.1 h1:6RhHx5lBNgRj5IXatuu5nAaHviOmgukxpZDFtUfL52A=
249249
github.com/tochemey/olric v0.2.1/go.mod h1:3V8kqOJwN4eQj6M0BbLCMTDSdDhZJitVY4aUuOJUA28=
250250
github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw=

internal/errorschain/errorschain.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,18 @@ func (c *Chain) AddErrors(errs ...error) *Chain {
6363

6464
// Error returns the error
6565
func (c *Chain) Error() error {
66+
if c.returnFirst {
67+
for _, v := range c.errs {
68+
if v != nil {
69+
return v
70+
}
71+
}
72+
return nil
73+
}
74+
6675
var err error
6776
for _, v := range c.errs {
6877
if v != nil {
69-
if c.returnFirst {
70-
// just return the error
71-
return v
72-
}
73-
// append error to the violations
7478
err = multierr.Append(err, v)
7579
}
7680
}

projection/handler.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,9 @@ package projection
2727
import (
2828
"context"
2929

30+
"github.com/tochemey/goakt/v3/log"
3031
"go.uber.org/atomic"
3132
"google.golang.org/protobuf/types/known/anypb"
32-
33-
"github.com/tochemey/goakt/v3/log"
3433
)
3534

3635
// Handler is used to handle event and state consumed from the event store

projection/option.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,11 @@ func (f OptionFunc) Apply(runner *runner) {
4646
f(runner)
4747
}
4848

49-
// WithRefreshInterval sets the refresh interval
50-
func WithRefreshInterval(interval time.Duration) Option {
49+
// WithPullInterval sets the events pull interval
50+
// This defines how often the projection will fetch events
51+
func WithPullInterval(interval time.Duration) Option {
5152
return OptionFunc(func(runner *runner) {
52-
runner.refreshInterval = interval
53+
runner.pullInterval = interval
5354
})
5455
}
5556

projection/option_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"time"
3030

3131
"github.com/stretchr/testify/assert"
32-
3332
"github.com/tochemey/goakt/v3/log"
3433
)
3534

@@ -45,8 +44,8 @@ func TestOption(t *testing.T) {
4544
}{
4645
{
4746
name: "WithRefreshInterval",
48-
option: WithRefreshInterval(ts),
49-
expected: runner{refreshInterval: ts},
47+
option: WithPullInterval(ts),
48+
expected: runner{pullInterval: ts},
5049
},
5150
{
5251
name: "WithMaxBufferSize",

0 commit comments

Comments
 (0)