Skip to content

Commit 352c755

Browse files
committed
chore: maintenance
1 parent 2cbcc70 commit 352c755

File tree

2 files changed

+157
-29
lines changed

2 files changed

+157
-29
lines changed

engine.go

+4
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,10 @@ func (engine *Engine) IsProjectionRunning(ctx context.Context, name string) (boo
310310
// Returns:
311311
// - An error if the shutdown process encounters issues; otherwise, nil.
312312
func (engine *Engine) Stop(ctx context.Context) error {
313+
if !engine.Started() {
314+
return nil
315+
}
316+
313317
engine.started.Store(false)
314318

315319
// shutdown all event eventsStreams

engine_test.go

+153-29
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"errors"
3131
"net"
3232
"strconv"
33+
"strings"
3334
"testing"
3435
"time"
3536

@@ -50,17 +51,17 @@ import (
5051
egomock "github.com/tochemey/ego/v3/mocks/ego"
5152
"github.com/tochemey/ego/v3/projection"
5253
testpb "github.com/tochemey/ego/v3/test/data/pb/v3"
53-
testkit2 "github.com/tochemey/ego/v3/testkit"
54+
testkit "github.com/tochemey/ego/v3/testkit"
5455
)
5556

5657
// nolint
5758
func TestEngine(t *testing.T) {
5859
t.Run("EventSourced entity With single node cluster enabled", func(t *testing.T) {
5960
ctx := context.TODO()
6061
// create the event store
61-
eventStore := testkit2.NewEventsStore()
62+
eventStore := testkit.NewEventsStore()
6263
require.NoError(t, eventStore.Connect(ctx))
63-
offsetStore := testkit2.NewOffsetStore()
64+
offsetStore := testkit.NewOffsetStore()
6465
require.NoError(t, offsetStore.Connect(ctx))
6566

6667
nodePorts := dynaport.Get(3)
@@ -130,7 +131,7 @@ func TestEngine(t *testing.T) {
130131
// create an entity behavior with a given id
131132
behavior := NewEventSourcedEntity(entityID)
132133
// create an entity
133-
err = engine.Entity(ctx, behavior)
134+
err = engine.Entity(ctx, behavior, WithPassivateAfter(time.Hour))
134135
require.NoError(t, err)
135136
// send some commands to the pid
136137
var command proto.Message
@@ -190,7 +191,7 @@ func TestEngine(t *testing.T) {
190191
t.Run("EventSourced entity With no cluster enabled", func(t *testing.T) {
191192
ctx := context.TODO()
192193
// create the event store
193-
eventStore := testkit2.NewEventsStore()
194+
eventStore := testkit.NewEventsStore()
194195
// connect to the event store
195196
require.NoError(t, eventStore.Connect(ctx))
196197

@@ -263,7 +264,7 @@ func TestEngine(t *testing.T) {
263264
t.Run("EventSourced entity With SendCommand when not started", func(t *testing.T) {
264265
ctx := context.TODO()
265266
// create the event store
266-
eventStore := testkit2.NewEventsStore()
267+
eventStore := testkit.NewEventsStore()
267268
require.NoError(t, eventStore.Connect(ctx))
268269

269270
// create the ego engine
@@ -273,14 +274,14 @@ func TestEngine(t *testing.T) {
273274

274275
_, _, err := engine.SendCommand(ctx, entityID, new(samplepb.CreateAccount), time.Minute)
275276
require.Error(t, err)
276-
assert.EqualError(t, err, ErrEngineNotStarted.Error())
277+
require.EqualError(t, err, ErrEngineNotStarted.Error())
277278

278-
assert.NoError(t, eventStore.Disconnect(ctx))
279+
require.NoError(t, eventStore.Disconnect(ctx))
279280
})
280281
t.Run("EventSourced entity With SendCommand when entityID is not set", func(t *testing.T) {
281282
ctx := context.TODO()
282283
// create the event store
283-
eventStore := testkit2.NewEventsStore()
284+
eventStore := testkit.NewEventsStore()
284285
require.NoError(t, eventStore.Connect(ctx))
285286

286287
// create the ego engine
@@ -301,7 +302,7 @@ func TestEngine(t *testing.T) {
301302
t.Run("EventSourced entity With SendCommand when entity is not found", func(t *testing.T) {
302303
ctx := context.TODO()
303304
// create the event store
304-
eventStore := testkit2.NewEventsStore()
305+
eventStore := testkit.NewEventsStore()
305306
require.NoError(t, eventStore.Connect(ctx))
306307

307308
// create the ego engine
@@ -322,7 +323,7 @@ func TestEngine(t *testing.T) {
322323
t.Run("EventSourced entity With IsProjectionRunning when not started", func(t *testing.T) {
323324
ctx := context.TODO()
324325
// create the event store
325-
eventStore := testkit2.NewEventsStore()
326+
eventStore := testkit.NewEventsStore()
326327
require.NoError(t, eventStore.Connect(ctx))
327328

328329
// create the ego engine
@@ -338,11 +339,11 @@ func TestEngine(t *testing.T) {
338339
t.Run("EventSourced entity With RemoveProjection", func(t *testing.T) {
339340
ctx := context.TODO()
340341
// create the event store
341-
eventStore := testkit2.NewEventsStore()
342+
eventStore := testkit.NewEventsStore()
342343
// connect to the event store
343344
require.NoError(t, eventStore.Connect(ctx))
344345

345-
offsetStore := testkit2.NewOffsetStore()
346+
offsetStore := testkit.NewOffsetStore()
346347
require.NoError(t, offsetStore.Connect(ctx))
347348

348349
// create the ego engine
@@ -381,7 +382,7 @@ func TestEngine(t *testing.T) {
381382
t.Run("EventSourced entity With RemoveProjection when not started", func(t *testing.T) {
382383
ctx := context.TODO()
383384
// create the event store
384-
eventStore := testkit2.NewEventsStore()
385+
eventStore := testkit.NewEventsStore()
385386
require.NoError(t, eventStore.Connect(ctx))
386387

387388
// create the ego engine
@@ -395,7 +396,7 @@ func TestEngine(t *testing.T) {
395396
})
396397
t.Run("DurableStore entity With single node cluster enabled", func(t *testing.T) {
397398
ctx := context.TODO()
398-
stateStore := testkit2.NewDurableStore()
399+
stateStore := testkit.NewDurableStore()
399400
require.NoError(t, stateStore.Connect(ctx))
400401

401402
nodePorts := dynaport.Get(3)
@@ -491,7 +492,7 @@ func TestEngine(t *testing.T) {
491492
})
492493
t.Run("DurableStore entity With no cluster enabled", func(t *testing.T) {
493494
ctx := context.TODO()
494-
stateStore := testkit2.NewDurableStore()
495+
stateStore := testkit.NewDurableStore()
495496
require.NoError(t, stateStore.Connect(ctx))
496497

497498
// create the ego engine
@@ -512,7 +513,7 @@ func TestEngine(t *testing.T) {
512513
entityID := uuid.NewString()
513514
behavior := NewAccountDurableStateBehavior(entityID)
514515

515-
err = engine.DurableStateEntity(ctx, behavior)
516+
err = engine.DurableStateEntity(ctx, behavior, WithPassivateAfter(time.Hour))
516517
require.NoError(t, err)
517518
var command proto.Message
518519

@@ -557,7 +558,7 @@ func TestEngine(t *testing.T) {
557558
t.Run("DurableStore entity With SendCommand when not started", func(t *testing.T) {
558559
ctx := context.TODO()
559560

560-
stateStore := testkit2.NewDurableStore()
561+
stateStore := testkit.NewDurableStore()
561562
require.NoError(t, stateStore.Connect(ctx))
562563

563564
// create the ego engine
@@ -575,7 +576,7 @@ func TestEngine(t *testing.T) {
575576
})
576577
t.Run("DurableStore entity With SendCommand when entityID is not set", func(t *testing.T) {
577578
ctx := context.TODO()
578-
stateStore := testkit2.NewDurableStore()
579+
stateStore := testkit.NewDurableStore()
579580
require.NoError(t, stateStore.Connect(ctx))
580581

581582
// create the ego engine
@@ -597,7 +598,7 @@ func TestEngine(t *testing.T) {
597598
t.Run("DurableStore entity With SendCommand when entity is not found", func(t *testing.T) {
598599
ctx := context.TODO()
599600

600-
stateStore := testkit2.NewDurableStore()
601+
stateStore := testkit.NewDurableStore()
601602
require.NoError(t, stateStore.Connect(ctx))
602603

603604
// create the ego engine
@@ -619,9 +620,9 @@ func TestEngine(t *testing.T) {
619620
t.Run("With Events Publisher with cluster enabled", func(t *testing.T) {
620621
ctx := context.TODO()
621622
// create the event store
622-
eventStore := testkit2.NewEventsStore()
623+
eventStore := testkit.NewEventsStore()
623624
require.NoError(t, eventStore.Connect(ctx))
624-
offsetStore := testkit2.NewOffsetStore()
625+
offsetStore := testkit.NewOffsetStore()
625626
require.NoError(t, offsetStore.Connect(ctx))
626627

627628
nodePorts := dynaport.Get(3)
@@ -768,7 +769,7 @@ func TestEngine(t *testing.T) {
768769
})
769770
t.Run("With DurableState Publisher with no cluster enabled", func(t *testing.T) {
770771
ctx := context.TODO()
771-
stateStore := testkit2.NewDurableStore()
772+
stateStore := testkit.NewDurableStore()
772773
require.NoError(t, stateStore.Connect(ctx))
773774

774775
// mock the state publisher
@@ -837,7 +838,7 @@ func TestEngine(t *testing.T) {
837838
})
838839
t.Run("With DurableState Publisher with cluster enabled", func(t *testing.T) {
839840
ctx := context.TODO()
840-
stateStore := testkit2.NewDurableStore()
841+
stateStore := testkit.NewDurableStore()
841842
require.NoError(t, stateStore.Connect(ctx))
842843

843844
nodePorts := dynaport.Get(3)
@@ -949,7 +950,7 @@ func TestEngine(t *testing.T) {
949950
t.Run("With DurableState Publisher when not started", func(t *testing.T) {
950951
ctx := context.TODO()
951952
// create the event store
952-
eventStore := testkit2.NewEventsStore()
953+
eventStore := testkit.NewEventsStore()
953954
require.NoError(t, eventStore.Connect(ctx))
954955

955956
publisher := new(egomock.StatePublisher)
@@ -965,7 +966,7 @@ func TestEngine(t *testing.T) {
965966
t.Run("With EventPublisher when not started", func(t *testing.T) {
966967
ctx := context.TODO()
967968
// create the event store
968-
eventStore := testkit2.NewEventsStore()
969+
eventStore := testkit.NewEventsStore()
969970
require.NoError(t, eventStore.Connect(ctx))
970971

971972
publisher := new(egomock.EventPublisher)
@@ -980,7 +981,7 @@ func TestEngine(t *testing.T) {
980981
})
981982
t.Run("With Engine Stop failure when EventPublisher close fails", func(t *testing.T) {
982983
ctx := context.TODO()
983-
stateStore := testkit2.NewDurableStore()
984+
stateStore := testkit.NewDurableStore()
984985
require.NoError(t, stateStore.Connect(ctx))
985986

986987
// mock the state publisher
@@ -1008,10 +1009,9 @@ func TestEngine(t *testing.T) {
10081009
lib.Pause(time.Second)
10091010
publisher.AssertExpectations(t)
10101011
})
1011-
10121012
t.Run("With Engine Stop failure when DurableState Publisher close fails", func(t *testing.T) {
10131013
ctx := context.TODO()
1014-
stateStore := testkit2.NewDurableStore()
1014+
stateStore := testkit.NewDurableStore()
10151015
require.NoError(t, stateStore.Connect(ctx))
10161016

10171017
// mock the state publisher
@@ -1039,6 +1039,130 @@ func TestEngine(t *testing.T) {
10391039
lib.Pause(time.Second)
10401040
publisher.AssertExpectations(t)
10411041
})
1042+
t.Run("With AddProjection when engine not started", func(t *testing.T) {
1043+
ctx := context.TODO()
1044+
// create the event store
1045+
eventStore := testkit.NewEventsStore()
1046+
offsetStore := testkit.NewOffsetStore()
1047+
1048+
// create a projection message handler
1049+
handler := projection.NewDiscardHandler(log.DiscardLogger)
1050+
1051+
engine := NewEngine("Sample", eventStore,
1052+
WithLogger(log.DiscardLogger))
1053+
1054+
projectionName := "projection"
1055+
err := engine.AddProjection(ctx, projectionName, handler, offsetStore)
1056+
require.Error(t, err)
1057+
})
1058+
t.Run("With AddProjection when projection name is invalid", func(t *testing.T) {
1059+
ctx := context.TODO()
1060+
// create the event store
1061+
eventStore := testkit.NewEventsStore()
1062+
// connect to the event store
1063+
require.NoError(t, eventStore.Connect(ctx))
1064+
1065+
offsetStore := testkit.NewOffsetStore()
1066+
require.NoError(t, offsetStore.Connect(ctx))
1067+
1068+
// create the ego engine
1069+
engine := NewEngine("Sample", eventStore, WithLogger(log.DiscardLogger))
1070+
// start ego engine
1071+
err := engine.Start(ctx)
1072+
require.NoError(t, err)
1073+
1074+
lib.Pause(time.Second)
1075+
1076+
// create a projection message handler
1077+
handler := projection.NewDiscardHandler(log.DiscardLogger)
1078+
// add projection
1079+
projectionName := strings.Repeat("a", 256)
1080+
err = engine.AddProjection(ctx, projectionName, handler, offsetStore)
1081+
require.Error(t, err)
1082+
require.Contains(t, err.Error(), "failed to register the projection")
1083+
1084+
// free resources
1085+
assert.NoError(t, offsetStore.Disconnect(ctx))
1086+
assert.NoError(t, eventStore.Disconnect(ctx))
1087+
assert.NoError(t, engine.Stop(ctx))
1088+
})
1089+
t.Run("With Subscribe when not started", func(t *testing.T) {
1090+
ctx := context.TODO()
1091+
// create the event store
1092+
eventStore := testkit.NewEventsStore()
1093+
require.NoError(t, eventStore.Connect(ctx))
1094+
1095+
engine := NewEngine("Sample", eventStore,
1096+
WithLogger(log.DiscardLogger))
1097+
1098+
// subscribe to events
1099+
subscriber, err := engine.Subscribe()
1100+
require.Error(t, err)
1101+
require.ErrorIs(t, err, ErrEngineNotStarted)
1102+
require.Nil(t, subscriber)
1103+
1104+
// free resources
1105+
require.NoError(t, eventStore.Disconnect(ctx))
1106+
require.NoError(t, engine.Stop(ctx))
1107+
})
1108+
t.Run("EventSourced entity when not started", func(t *testing.T) {
1109+
ctx := context.TODO()
1110+
// create the event store
1111+
eventStore := testkit.NewEventsStore()
1112+
require.NoError(t, eventStore.Connect(ctx))
1113+
1114+
// create the ego engine
1115+
engine := NewEngine("Sample", eventStore, WithLogger(log.DiscardLogger))
1116+
// create a persistence id
1117+
entityID := uuid.NewString()
1118+
// create an entity behavior with a given id
1119+
behavior := NewEventSourcedEntity(entityID)
1120+
1121+
err := engine.Entity(ctx, behavior)
1122+
require.Error(t, err)
1123+
require.ErrorIs(t, err, ErrEngineNotStarted)
1124+
1125+
require.NoError(t, eventStore.Disconnect(ctx))
1126+
})
1127+
t.Run("DurableStore entity when not started", func(t *testing.T) {
1128+
ctx := context.TODO()
1129+
1130+
stateStore := testkit.NewDurableStore()
1131+
require.NoError(t, stateStore.Connect(ctx))
1132+
1133+
// create the ego engine
1134+
engine := NewEngine("Sample", nil,
1135+
WithStateStore(stateStore),
1136+
WithLogger(log.DiscardLogger))
1137+
1138+
entityID := uuid.NewString()
1139+
behavior := NewAccountDurableStateBehavior(entityID)
1140+
1141+
err := engine.DurableStateEntity(ctx, behavior)
1142+
require.Error(t, err)
1143+
require.ErrorIs(t, err, ErrEngineNotStarted)
1144+
1145+
require.NoError(t, stateStore.Disconnect(ctx))
1146+
require.NoError(t, engine.Stop(ctx))
1147+
})
1148+
t.Run("DurableStore entity when durable store not set", func(t *testing.T) {
1149+
ctx := context.TODO()
1150+
1151+
// create the ego engine
1152+
engine := NewEngine("Sample", nil,
1153+
WithLogger(log.DiscardLogger))
1154+
1155+
require.NoError(t, engine.Start(ctx))
1156+
1157+
entityID := uuid.NewString()
1158+
behavior := NewAccountDurableStateBehavior(entityID)
1159+
1160+
err := engine.DurableStateEntity(ctx, behavior)
1161+
require.Error(t, err)
1162+
require.ErrorIs(t, err, ErrDurableStateStoreRequired)
1163+
1164+
require.NoError(t, engine.Stop(ctx))
1165+
})
10421166
}
10431167

10441168
// EventSourcedEntity implements persistence.Behavior

0 commit comments

Comments
 (0)