Skip to content

Commit 6aa60f0

Browse files
committed
refactor: add projection removal and status check
1 parent 2906536 commit 6aa60f0

File tree

12 files changed

+1195
-107
lines changed

12 files changed

+1195
-107
lines changed

.gitignore

-1
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,4 @@ vendor
1111
gen.env
1212
.env
1313
gen/
14-
mocks
1514
/.fleet/settings.json

Earthfile

+12
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ code:
6666
vendor:
6767
FROM +code
6868

69+
COPY +mock/mocks ./mocks
70+
6971
RUN go mod vendor
7072
SAVE ARTIFACT /app /files
7173

@@ -84,3 +86,13 @@ local-test:
8486
END
8587

8688
SAVE ARTIFACT coverage.out AS LOCAL coverage.out
89+
90+
mock:
91+
# copy in the necessary files that need mock generated code
92+
FROM +code
93+
94+
# generate the mocks
95+
RUN mockery --dir eventstore --name EventsStore --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/eventstore --case snake
96+
RUN mockery --dir offsetstore --name OffsetStore --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/offsetstore --case snake
97+
98+
SAVE ARTIFACT ./mocks mocks AS LOCAL mocks

engine.go

+89-72
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"sync"
3333
"time"
3434

35+
"github.com/tochemey/goakt/v2/address"
3536
"go.uber.org/atomic"
3637
"google.golang.org/protobuf/proto"
3738

@@ -71,7 +72,7 @@ type Engine struct {
7172
remotingPort int
7273
minimumPeersQuorum uint16
7374
eventStream eventstream.Stream
74-
locker *sync.Mutex
75+
mutex *sync.Mutex
7576
}
7677

7778
// NewEngine creates an instance of Engine
@@ -82,7 +83,7 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option)
8283
enableCluster: atomic.NewBool(false),
8384
logger: log.New(log.ErrorLevel, os.Stderr),
8485
eventStream: eventstream.New(),
85-
locker: &sync.Mutex{},
86+
mutex: &sync.Mutex{},
8687
}
8788

8889
for _, opt := range opts {
@@ -94,133 +95,152 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option)
9495
}
9596

9697
// Start starts the ego engine
97-
func (x *Engine) Start(ctx context.Context) error {
98+
func (engine *Engine) Start(ctx context.Context) error {
9899
opts := []actors.Option{
99-
actors.WithLogger(x.logger),
100+
actors.WithLogger(engine.logger),
100101
actors.WithPassivationDisabled(),
101102
actors.WithActorInitMaxRetries(1),
102103
actors.WithReplyTimeout(5 * time.Second),
103104
actors.WithSupervisorDirective(actors.NewStopDirective()),
104105
}
105106

106-
if x.enableCluster.Load() {
107-
if x.hostName == "" {
108-
x.hostName, _ = os.Hostname()
107+
if engine.enableCluster.Load() {
108+
if engine.hostName == "" {
109+
engine.hostName, _ = os.Hostname()
109110
}
110111

111112
replicaCount := 1
112-
if x.minimumPeersQuorum > 1 {
113+
if engine.minimumPeersQuorum > 1 {
113114
replicaCount = 2
114115
}
115116

116117
clusterConfig := actors.
117118
NewClusterConfig().
118-
WithDiscovery(x.discoveryProvider).
119-
WithDiscoveryPort(x.gossipPort).
120-
WithPeersPort(x.peersPort).
121-
WithMinimumPeersQuorum(uint32(x.minimumPeersQuorum)).
119+
WithDiscovery(engine.discoveryProvider).
120+
WithDiscoveryPort(engine.gossipPort).
121+
WithPeersPort(engine.peersPort).
122+
WithMinimumPeersQuorum(uint32(engine.minimumPeersQuorum)).
122123
WithReplicaCount(uint32(replicaCount)).
123-
WithPartitionCount(x.partitionsCount).
124+
WithPartitionCount(engine.partitionsCount).
124125
WithKinds(new(actor))
125126

126127
opts = append(opts,
127128
actors.WithCluster(clusterConfig),
128-
actors.WithRemoting(x.hostName, int32(x.remotingPort)))
129+
actors.WithRemoting(engine.hostName, int32(engine.remotingPort)))
129130
}
130131

131132
var err error
132-
x.actorSystem, err = actors.NewActorSystem(x.name, opts...)
133+
engine.actorSystem, err = actors.NewActorSystem(engine.name, opts...)
133134
if err != nil {
134-
x.logger.Error(fmt.Errorf("failed to create the ego actor system: %w", err))
135-
return err
135+
return fmt.Errorf("failed to create the ego actor system: %w", err)
136136
}
137137

138-
if err := x.actorSystem.Start(ctx); err != nil {
138+
if err := engine.actorSystem.Start(ctx); err != nil {
139139
return err
140140
}
141141

142-
x.started.Store(true)
142+
engine.started.Store(true)
143143

144144
return nil
145145
}
146146

147-
// AddProjection add a projection to the running eGo engine and start it
148-
func (x *Engine) AddProjection(ctx context.Context, name string, handler projection.Handler, offsetStore offsetstore.OffsetStore, opts ...projection.Option) error {
149-
x.locker.Lock()
150-
started := x.started.Load()
151-
x.locker.Unlock()
152-
if !started {
147+
// AddProjection add a projection to the running eGo engine and starts it
148+
func (engine *Engine) AddProjection(ctx context.Context, name string, handler projection.Handler, offsetStore offsetstore.OffsetStore, opts ...projection.Option) error {
149+
if !engine.Started() {
153150
return ErrEngineNotStarted
154151
}
155152

156-
actor := projection.New(name, handler, x.eventsStore, offsetStore, opts...)
153+
actor := projection.New(name, handler, engine.eventsStore, offsetStore, opts...)
157154

158-
var (
159-
pid *actors.PID
160-
err error
161-
)
155+
engine.mutex.Lock()
156+
actorSystem := engine.actorSystem
157+
engine.mutex.Unlock()
162158

163-
x.locker.Lock()
164-
actorSystem := x.actorSystem
165-
x.locker.Unlock()
159+
if _, err := actorSystem.Spawn(ctx, name, actor); err != nil {
160+
return fmt.Errorf("failed to register the projection=(%s): %w", name, err)
161+
}
166162

167-
if pid, err = actorSystem.Spawn(ctx, name, actor); err != nil {
168-
x.logger.Error(fmt.Errorf("failed to register the projection=(%s): %w", name, err))
169-
return err
163+
return nil
164+
}
165+
166+
// RemoveProjection stops and removes a given projection from the engine
167+
func (engine *Engine) RemoveProjection(ctx context.Context, name string) error {
168+
if !engine.Started() {
169+
return ErrEngineNotStarted
170170
}
171171

172-
if err := actors.Tell(ctx, pid, projection.Start); err != nil {
173-
x.logger.Error(fmt.Errorf("failed to start the projection=(%s): %w", name, err))
174-
return err
172+
engine.mutex.Lock()
173+
actorSystem := engine.actorSystem
174+
engine.mutex.Unlock()
175+
176+
return actorSystem.Kill(ctx, name)
177+
}
178+
179+
// IsProjectionRunning returns true when the projection is active and running
180+
// One needs to check the error to see whether this function does not return a false negative
181+
func (engine *Engine) IsProjectionRunning(ctx context.Context, name string) (bool, error) {
182+
if !engine.Started() {
183+
return false, ErrEngineNotStarted
175184
}
185+
engine.mutex.Lock()
186+
actorSystem := engine.actorSystem
187+
engine.mutex.Unlock()
176188

177-
return nil
189+
addr, pid, err := actorSystem.ActorOf(ctx, name)
190+
if err != nil {
191+
return false, fmt.Errorf("failed to get projection %s: %w", name, err)
192+
}
193+
194+
if pid != nil {
195+
return pid.IsRunning(), nil
196+
}
197+
198+
return addr != nil && proto.Equal(addr.Address, address.NoSender), nil
178199
}
179200

180201
// Stop stops the ego engine
181-
func (x *Engine) Stop(ctx context.Context) error {
182-
x.started.Store(false)
183-
x.eventStream.Close()
184-
return x.actorSystem.Stop(ctx)
202+
func (engine *Engine) Stop(ctx context.Context) error {
203+
engine.started.Store(false)
204+
engine.eventStream.Close()
205+
return engine.actorSystem.Stop(ctx)
206+
}
207+
208+
// Started returns true when the eGo engine has started
209+
func (engine *Engine) Started() bool {
210+
return engine.started.Load()
185211
}
186212

187213
// Subscribe creates an events subscriber
188-
func (x *Engine) Subscribe() (eventstream.Subscriber, error) {
189-
x.locker.Lock()
190-
started := x.started.Load()
191-
x.locker.Unlock()
192-
if !started {
214+
func (engine *Engine) Subscribe() (eventstream.Subscriber, error) {
215+
if !engine.Started() {
193216
return nil, ErrEngineNotStarted
194217
}
195218

196-
x.locker.Lock()
197-
eventStream := x.eventStream
198-
x.locker.Unlock()
219+
engine.mutex.Lock()
220+
eventStream := engine.eventStream
221+
engine.mutex.Unlock()
199222

200223
subscriber := eventStream.AddSubscriber()
201-
for i := 0; i < int(x.partitionsCount); i++ {
224+
for i := 0; i < int(engine.partitionsCount); i++ {
202225
topic := fmt.Sprintf(eventsTopic, i)
203-
x.eventStream.Subscribe(subscriber, topic)
226+
engine.eventStream.Subscribe(subscriber, topic)
204227
}
205228

206229
return subscriber, nil
207230
}
208231

209232
// Entity creates an entity. This will return the entity path
210233
// that can be used to send command to the entity
211-
func (x *Engine) Entity(ctx context.Context, behavior EntityBehavior) error {
212-
x.locker.Lock()
213-
started := x.started.Load()
214-
x.locker.Unlock()
215-
if !started {
234+
func (engine *Engine) Entity(ctx context.Context, behavior EntityBehavior) error {
235+
if !engine.Started() {
216236
return ErrEngineNotStarted
217237
}
218238

219-
x.locker.Lock()
220-
actorSystem := x.actorSystem
221-
eventsStore := x.eventsStore
222-
eventStream := x.eventStream
223-
x.locker.Unlock()
239+
engine.mutex.Lock()
240+
actorSystem := engine.actorSystem
241+
eventsStore := engine.eventsStore
242+
eventStream := engine.eventStream
243+
engine.mutex.Unlock()
224244

225245
_, err := actorSystem.Spawn(ctx,
226246
behavior.ID(),
@@ -237,11 +257,8 @@ func (x *Engine) Entity(ctx context.Context, behavior EntityBehavior) error {
237257
// 1. the resulting state after the command has been handled and the emitted event persisted
238258
// 2. nil when there is no resulting state or no event persisted
239259
// 3. an error in case of error
240-
func (x *Engine) SendCommand(ctx context.Context, entityID string, cmd Command, timeout time.Duration) (resultingState State, revision uint64, err error) {
241-
x.locker.Lock()
242-
started := x.started.Load()
243-
x.locker.Unlock()
244-
if !started {
260+
func (engine *Engine) SendCommand(ctx context.Context, entityID string, cmd Command, timeout time.Duration) (resultingState State, revision uint64, err error) {
261+
if !engine.Started() {
245262
return nil, 0, ErrEngineNotStarted
246263
}
247264

@@ -250,9 +267,9 @@ func (x *Engine) SendCommand(ctx context.Context, entityID string, cmd Command,
250267
return nil, 0, ErrUndefinedEntityID
251268
}
252269

253-
x.locker.Lock()
254-
actorSystem := x.actorSystem
255-
x.locker.Unlock()
270+
engine.mutex.Lock()
271+
actorSystem := engine.actorSystem
272+
engine.mutex.Unlock()
256273

257274
// locate the given actor
258275
addr, pid, err := actorSystem.ActorOf(ctx, entityID)

engine_test.go

+80
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ func TestEgo(t *testing.T) {
9797
err = engine.AddProjection(ctx, "discard", handler, offsetStore)
9898
require.NoError(t, err)
9999

100+
lib.Pause(time.Second)
101+
102+
running, err := engine.IsProjectionRunning(ctx, "discard")
103+
require.NoError(t, err)
104+
require.True(t, running)
105+
100106
// subscribe to events
101107
subscriber, err := engine.Subscribe()
102108
require.NoError(t, err)
@@ -276,6 +282,80 @@ func TestEgo(t *testing.T) {
276282
assert.NoError(t, eventStore.Disconnect(ctx))
277283
assert.NoError(t, engine.Stop(ctx))
278284
})
285+
t.Run("With IsProjectionRunning when not started", func(t *testing.T) {
286+
ctx := context.TODO()
287+
// create the event store
288+
eventStore := memory.NewEventsStore()
289+
require.NoError(t, eventStore.Connect(ctx))
290+
291+
// create the ego engine
292+
engine := NewEngine("Sample", eventStore, WithLogger(log.DiscardLogger))
293+
294+
running, err := engine.IsProjectionRunning(ctx, "isProjectionRunning")
295+
require.Error(t, err)
296+
assert.EqualError(t, err, ErrEngineNotStarted.Error())
297+
assert.False(t, running)
298+
299+
assert.NoError(t, eventStore.Disconnect(ctx))
300+
})
301+
t.Run("With RemoveProjection", func(t *testing.T) {
302+
ctx := context.TODO()
303+
// create the event store
304+
eventStore := memory.NewEventsStore()
305+
// connect to the event store
306+
require.NoError(t, eventStore.Connect(ctx))
307+
308+
offsetStore := offsetstore.NewOffsetStore()
309+
require.NoError(t, offsetStore.Connect(ctx))
310+
311+
// create the ego engine
312+
engine := NewEngine("Sample", eventStore, WithLogger(log.DiscardLogger))
313+
// start ego engine
314+
err := engine.Start(ctx)
315+
require.NoError(t, err)
316+
317+
// create a projection message handler
318+
handler := projection.NewDiscardHandler(log.DiscardLogger)
319+
// add projection
320+
projectionName := "projection"
321+
err = engine.AddProjection(ctx, projectionName, handler, offsetStore)
322+
require.NoError(t, err)
323+
324+
lib.Pause(time.Second)
325+
326+
running, err := engine.IsProjectionRunning(ctx, projectionName)
327+
require.NoError(t, err)
328+
require.True(t, running)
329+
330+
err = engine.RemoveProjection(ctx, projectionName)
331+
require.NoError(t, err)
332+
333+
lib.Pause(time.Second)
334+
335+
running, err = engine.IsProjectionRunning(ctx, projectionName)
336+
require.Error(t, err)
337+
require.False(t, running)
338+
339+
// free resources
340+
assert.NoError(t, offsetStore.Disconnect(ctx))
341+
assert.NoError(t, eventStore.Disconnect(ctx))
342+
assert.NoError(t, engine.Stop(ctx))
343+
})
344+
t.Run("With RemoveProjection when not started", func(t *testing.T) {
345+
ctx := context.TODO()
346+
// create the event store
347+
eventStore := memory.NewEventsStore()
348+
require.NoError(t, eventStore.Connect(ctx))
349+
350+
// create the ego engine
351+
engine := NewEngine("Sample", eventStore, WithLogger(log.DiscardLogger))
352+
353+
err := engine.RemoveProjection(ctx, "isProjectionRunning")
354+
require.Error(t, err)
355+
assert.EqualError(t, err, ErrEngineNotStarted.Error())
356+
357+
assert.NoError(t, eventStore.Disconnect(ctx))
358+
})
279359
}
280360

281361
// AccountBehavior implements persistence.Behavior

0 commit comments

Comments
 (0)