diff --git a/.gitignore b/.gitignore index e58c303..1058d9c 100644 --- a/.gitignore +++ b/.gitignore @@ -11,5 +11,4 @@ vendor gen.env .env gen/ -mocks /.fleet/settings.json diff --git a/Earthfile b/Earthfile index 0758c20..3ee7cbb 100644 --- a/Earthfile +++ b/Earthfile @@ -66,6 +66,8 @@ code: vendor: FROM +code + COPY +mock/mocks ./mocks + RUN go mod vendor SAVE ARTIFACT /app /files @@ -84,3 +86,13 @@ local-test: END SAVE ARTIFACT coverage.out AS LOCAL coverage.out + +mock: + # copy in the necessary files that need mock generated code + FROM +code + + # generate the mocks + RUN mockery --dir eventstore --name EventsStore --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/eventstore --case snake + RUN mockery --dir offsetstore --name OffsetStore --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/offsetstore --case snake + + SAVE ARTIFACT ./mocks mocks AS LOCAL mocks \ No newline at end of file diff --git a/engine.go b/engine.go index fc6149e..b685241 100644 --- a/engine.go +++ b/engine.go @@ -32,6 +32,7 @@ import ( "sync" "time" + "github.com/tochemey/goakt/v2/address" "go.uber.org/atomic" "google.golang.org/protobuf/proto" @@ -71,7 +72,7 @@ type Engine struct { remotingPort int minimumPeersQuorum uint16 eventStream eventstream.Stream - locker *sync.Mutex + mutex *sync.Mutex } // NewEngine creates an instance of Engine @@ -82,7 +83,7 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option) enableCluster: atomic.NewBool(false), logger: log.New(log.ErrorLevel, os.Stderr), eventStream: eventstream.New(), - locker: &sync.Mutex{}, + mutex: &sync.Mutex{}, } for _, opt := range opts { @@ -94,113 +95,135 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option) } // Start starts the ego engine -func (x *Engine) Start(ctx context.Context) error { +func (engine *Engine) Start(ctx context.Context) error { opts := []actors.Option{ - actors.WithLogger(x.logger), + actors.WithLogger(engine.logger), actors.WithPassivationDisabled(), actors.WithActorInitMaxRetries(1), actors.WithReplyTimeout(5 * time.Second), actors.WithSupervisorDirective(actors.NewStopDirective()), } - if x.enableCluster.Load() { - if x.hostName == "" { - x.hostName, _ = os.Hostname() + if engine.enableCluster.Load() { + if engine.hostName == "" { + engine.hostName, _ = os.Hostname() } replicaCount := 1 - if x.minimumPeersQuorum > 1 { + if engine.minimumPeersQuorum > 1 { replicaCount = 2 } clusterConfig := actors. NewClusterConfig(). - WithDiscovery(x.discoveryProvider). - WithDiscoveryPort(x.gossipPort). - WithPeersPort(x.peersPort). - WithMinimumPeersQuorum(uint32(x.minimumPeersQuorum)). + WithDiscovery(engine.discoveryProvider). + WithDiscoveryPort(engine.gossipPort). + WithPeersPort(engine.peersPort). + WithMinimumPeersQuorum(uint32(engine.minimumPeersQuorum)). WithReplicaCount(uint32(replicaCount)). - WithPartitionCount(x.partitionsCount). + WithPartitionCount(engine.partitionsCount). WithKinds(new(actor)) opts = append(opts, actors.WithCluster(clusterConfig), - actors.WithRemoting(x.hostName, int32(x.remotingPort))) + actors.WithRemoting(engine.hostName, int32(engine.remotingPort))) } var err error - x.actorSystem, err = actors.NewActorSystem(x.name, opts...) + engine.actorSystem, err = actors.NewActorSystem(engine.name, opts...) if err != nil { - x.logger.Error(fmt.Errorf("failed to create the ego actor system: %w", err)) - return err + return fmt.Errorf("failed to create the ego actor system: %w", err) } - if err := x.actorSystem.Start(ctx); err != nil { + if err := engine.actorSystem.Start(ctx); err != nil { return err } - x.started.Store(true) + engine.started.Store(true) return nil } -// AddProjection add a projection to the running eGo engine and start it -func (x *Engine) AddProjection(ctx context.Context, name string, handler projection.Handler, offsetStore offsetstore.OffsetStore, opts ...projection.Option) error { - x.locker.Lock() - started := x.started.Load() - x.locker.Unlock() - if !started { +// AddProjection add a projection to the running eGo engine and starts it +func (engine *Engine) AddProjection(ctx context.Context, name string, handler projection.Handler, offsetStore offsetstore.OffsetStore, opts ...projection.Option) error { + if !engine.Started() { return ErrEngineNotStarted } - actor := projection.New(name, handler, x.eventsStore, offsetStore, opts...) + actor := projection.New(name, handler, engine.eventsStore, offsetStore, opts...) - var ( - pid *actors.PID - err error - ) + engine.mutex.Lock() + actorSystem := engine.actorSystem + engine.mutex.Unlock() - x.locker.Lock() - actorSystem := x.actorSystem - x.locker.Unlock() + if _, err := actorSystem.Spawn(ctx, name, actor); err != nil { + return fmt.Errorf("failed to register the projection=(%s): %w", name, err) + } - if pid, err = actorSystem.Spawn(ctx, name, actor); err != nil { - x.logger.Error(fmt.Errorf("failed to register the projection=(%s): %w", name, err)) - return err + return nil +} + +// RemoveProjection stops and removes a given projection from the engine +func (engine *Engine) RemoveProjection(ctx context.Context, name string) error { + if !engine.Started() { + return ErrEngineNotStarted } - if err := actors.Tell(ctx, pid, projection.Start); err != nil { - x.logger.Error(fmt.Errorf("failed to start the projection=(%s): %w", name, err)) - return err + engine.mutex.Lock() + actorSystem := engine.actorSystem + engine.mutex.Unlock() + + return actorSystem.Kill(ctx, name) +} + +// IsProjectionRunning returns true when the projection is active and running +// One needs to check the error to see whether this function does not return a false negative +func (engine *Engine) IsProjectionRunning(ctx context.Context, name string) (bool, error) { + if !engine.Started() { + return false, ErrEngineNotStarted } + engine.mutex.Lock() + actorSystem := engine.actorSystem + engine.mutex.Unlock() - return nil + addr, pid, err := actorSystem.ActorOf(ctx, name) + if err != nil { + return false, fmt.Errorf("failed to get projection %s: %w", name, err) + } + + if pid != nil { + return pid.IsRunning(), nil + } + + return addr != nil && proto.Equal(addr.Address, address.NoSender), nil } // Stop stops the ego engine -func (x *Engine) Stop(ctx context.Context) error { - x.started.Store(false) - x.eventStream.Close() - return x.actorSystem.Stop(ctx) +func (engine *Engine) Stop(ctx context.Context) error { + engine.started.Store(false) + engine.eventStream.Close() + return engine.actorSystem.Stop(ctx) +} + +// Started returns true when the eGo engine has started +func (engine *Engine) Started() bool { + return engine.started.Load() } // Subscribe creates an events subscriber -func (x *Engine) Subscribe() (eventstream.Subscriber, error) { - x.locker.Lock() - started := x.started.Load() - x.locker.Unlock() - if !started { +func (engine *Engine) Subscribe() (eventstream.Subscriber, error) { + if !engine.Started() { return nil, ErrEngineNotStarted } - x.locker.Lock() - eventStream := x.eventStream - x.locker.Unlock() + engine.mutex.Lock() + eventStream := engine.eventStream + engine.mutex.Unlock() subscriber := eventStream.AddSubscriber() - for i := 0; i < int(x.partitionsCount); i++ { + for i := 0; i < int(engine.partitionsCount); i++ { topic := fmt.Sprintf(eventsTopic, i) - x.eventStream.Subscribe(subscriber, topic) + engine.eventStream.Subscribe(subscriber, topic) } return subscriber, nil @@ -208,19 +231,16 @@ func (x *Engine) Subscribe() (eventstream.Subscriber, error) { // Entity creates an entity. This will return the entity path // that can be used to send command to the entity -func (x *Engine) Entity(ctx context.Context, behavior EntityBehavior) error { - x.locker.Lock() - started := x.started.Load() - x.locker.Unlock() - if !started { +func (engine *Engine) Entity(ctx context.Context, behavior EntityBehavior) error { + if !engine.Started() { return ErrEngineNotStarted } - x.locker.Lock() - actorSystem := x.actorSystem - eventsStore := x.eventsStore - eventStream := x.eventStream - x.locker.Unlock() + engine.mutex.Lock() + actorSystem := engine.actorSystem + eventsStore := engine.eventsStore + eventStream := engine.eventStream + engine.mutex.Unlock() _, err := actorSystem.Spawn(ctx, behavior.ID(), @@ -237,11 +257,8 @@ func (x *Engine) Entity(ctx context.Context, behavior EntityBehavior) error { // 1. the resulting state after the command has been handled and the emitted event persisted // 2. nil when there is no resulting state or no event persisted // 3. an error in case of error -func (x *Engine) SendCommand(ctx context.Context, entityID string, cmd Command, timeout time.Duration) (resultingState State, revision uint64, err error) { - x.locker.Lock() - started := x.started.Load() - x.locker.Unlock() - if !started { +func (engine *Engine) SendCommand(ctx context.Context, entityID string, cmd Command, timeout time.Duration) (resultingState State, revision uint64, err error) { + if !engine.Started() { return nil, 0, ErrEngineNotStarted } @@ -250,9 +267,9 @@ func (x *Engine) SendCommand(ctx context.Context, entityID string, cmd Command, return nil, 0, ErrUndefinedEntityID } - x.locker.Lock() - actorSystem := x.actorSystem - x.locker.Unlock() + engine.mutex.Lock() + actorSystem := engine.actorSystem + engine.mutex.Unlock() // locate the given actor addr, pid, err := actorSystem.ActorOf(ctx, entityID) diff --git a/engine_test.go b/engine_test.go index f7e3858..9ccd1f8 100644 --- a/engine_test.go +++ b/engine_test.go @@ -97,6 +97,12 @@ func TestEgo(t *testing.T) { err = engine.AddProjection(ctx, "discard", handler, offsetStore) require.NoError(t, err) + lib.Pause(time.Second) + + running, err := engine.IsProjectionRunning(ctx, "discard") + require.NoError(t, err) + require.True(t, running) + // subscribe to events subscriber, err := engine.Subscribe() require.NoError(t, err) @@ -276,6 +282,80 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) assert.NoError(t, engine.Stop(ctx)) }) + t.Run("With IsProjectionRunning when not started", func(t *testing.T) { + ctx := context.TODO() + // create the event store + eventStore := memory.NewEventsStore() + require.NoError(t, eventStore.Connect(ctx)) + + // create the ego engine + engine := NewEngine("Sample", eventStore, WithLogger(log.DiscardLogger)) + + running, err := engine.IsProjectionRunning(ctx, "isProjectionRunning") + require.Error(t, err) + assert.EqualError(t, err, ErrEngineNotStarted.Error()) + assert.False(t, running) + + assert.NoError(t, eventStore.Disconnect(ctx)) + }) + t.Run("With RemoveProjection", func(t *testing.T) { + ctx := context.TODO() + // create the event store + eventStore := memory.NewEventsStore() + // connect to the event store + require.NoError(t, eventStore.Connect(ctx)) + + offsetStore := offsetstore.NewOffsetStore() + require.NoError(t, offsetStore.Connect(ctx)) + + // create the ego engine + engine := NewEngine("Sample", eventStore, WithLogger(log.DiscardLogger)) + // start ego engine + err := engine.Start(ctx) + require.NoError(t, err) + + // create a projection message handler + handler := projection.NewDiscardHandler(log.DiscardLogger) + // add projection + projectionName := "projection" + err = engine.AddProjection(ctx, projectionName, handler, offsetStore) + require.NoError(t, err) + + lib.Pause(time.Second) + + running, err := engine.IsProjectionRunning(ctx, projectionName) + require.NoError(t, err) + require.True(t, running) + + err = engine.RemoveProjection(ctx, projectionName) + require.NoError(t, err) + + lib.Pause(time.Second) + + running, err = engine.IsProjectionRunning(ctx, projectionName) + require.Error(t, err) + require.False(t, running) + + // free resources + assert.NoError(t, offsetStore.Disconnect(ctx)) + assert.NoError(t, eventStore.Disconnect(ctx)) + assert.NoError(t, engine.Stop(ctx)) + }) + t.Run("With RemoveProjection when not started", func(t *testing.T) { + ctx := context.TODO() + // create the event store + eventStore := memory.NewEventsStore() + require.NoError(t, eventStore.Connect(ctx)) + + // create the ego engine + engine := NewEngine("Sample", eventStore, WithLogger(log.DiscardLogger)) + + err := engine.RemoveProjection(ctx, "isProjectionRunning") + require.Error(t, err) + assert.EqualError(t, err, ErrEngineNotStarted.Error()) + + assert.NoError(t, eventStore.Disconnect(ctx)) + }) } // AccountBehavior implements persistence.Behavior diff --git a/go.mod b/go.mod index de7f393..96ac63f 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,6 @@ require ( github.com/jackc/pgx/v5 v5.7.1 github.com/lib/pq v1.10.9 github.com/ory/dockertest/v3 v3.11.0 - github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.9.0 github.com/tochemey/goakt/v2 v2.7.0 github.com/travisjeffery/go-dynaport v1.0.0 @@ -83,6 +82,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/opencontainers/runc v1.1.14 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/redis/go-redis/v9 v9.6.1 // indirect github.com/reugn/go-quartz v0.13.0 // indirect diff --git a/mocks/eventstore/events_store.go b/mocks/eventstore/events_store.go new file mode 100644 index 0000000..2e25688 --- /dev/null +++ b/mocks/eventstore/events_store.go @@ -0,0 +1,544 @@ +// Code generated by mockery. DO NOT EDIT. + +package eventstore + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + egopb "github.com/tochemey/ego/v3/egopb" +) + +// EventsStore is an autogenerated mock type for the EventsStore type +type EventsStore struct { + mock.Mock +} + +type EventsStore_Expecter struct { + mock *mock.Mock +} + +func (_m *EventsStore) EXPECT() *EventsStore_Expecter { + return &EventsStore_Expecter{mock: &_m.Mock} +} + +// Connect provides a mock function with given fields: ctx +func (_m *EventsStore) Connect(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// EventsStore_Connect_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Connect' +type EventsStore_Connect_Call struct { + *mock.Call +} + +// Connect is a helper method to define mock.On call +// - ctx context.Context +func (_e *EventsStore_Expecter) Connect(ctx interface{}) *EventsStore_Connect_Call { + return &EventsStore_Connect_Call{Call: _e.mock.On("Connect", ctx)} +} + +func (_c *EventsStore_Connect_Call) Run(run func(ctx context.Context)) *EventsStore_Connect_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *EventsStore_Connect_Call) Return(_a0 error) *EventsStore_Connect_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *EventsStore_Connect_Call) RunAndReturn(run func(context.Context) error) *EventsStore_Connect_Call { + _c.Call.Return(run) + return _c +} + +// DeleteEvents provides a mock function with given fields: ctx, persistenceID, toSequenceNumber +func (_m *EventsStore) DeleteEvents(ctx context.Context, persistenceID string, toSequenceNumber uint64) error { + ret := _m.Called(ctx, persistenceID, toSequenceNumber) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, uint64) error); ok { + r0 = rf(ctx, persistenceID, toSequenceNumber) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// EventsStore_DeleteEvents_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteEvents' +type EventsStore_DeleteEvents_Call struct { + *mock.Call +} + +// DeleteEvents is a helper method to define mock.On call +// - ctx context.Context +// - persistenceID string +// - toSequenceNumber uint64 +func (_e *EventsStore_Expecter) DeleteEvents(ctx interface{}, persistenceID interface{}, toSequenceNumber interface{}) *EventsStore_DeleteEvents_Call { + return &EventsStore_DeleteEvents_Call{Call: _e.mock.On("DeleteEvents", ctx, persistenceID, toSequenceNumber)} +} + +func (_c *EventsStore_DeleteEvents_Call) Run(run func(ctx context.Context, persistenceID string, toSequenceNumber uint64)) *EventsStore_DeleteEvents_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(uint64)) + }) + return _c +} + +func (_c *EventsStore_DeleteEvents_Call) Return(_a0 error) *EventsStore_DeleteEvents_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *EventsStore_DeleteEvents_Call) RunAndReturn(run func(context.Context, string, uint64) error) *EventsStore_DeleteEvents_Call { + _c.Call.Return(run) + return _c +} + +// Disconnect provides a mock function with given fields: ctx +func (_m *EventsStore) Disconnect(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// EventsStore_Disconnect_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Disconnect' +type EventsStore_Disconnect_Call struct { + *mock.Call +} + +// Disconnect is a helper method to define mock.On call +// - ctx context.Context +func (_e *EventsStore_Expecter) Disconnect(ctx interface{}) *EventsStore_Disconnect_Call { + return &EventsStore_Disconnect_Call{Call: _e.mock.On("Disconnect", ctx)} +} + +func (_c *EventsStore_Disconnect_Call) Run(run func(ctx context.Context)) *EventsStore_Disconnect_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *EventsStore_Disconnect_Call) Return(_a0 error) *EventsStore_Disconnect_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *EventsStore_Disconnect_Call) RunAndReturn(run func(context.Context) error) *EventsStore_Disconnect_Call { + _c.Call.Return(run) + return _c +} + +// GetLatestEvent provides a mock function with given fields: ctx, persistenceID +func (_m *EventsStore) GetLatestEvent(ctx context.Context, persistenceID string) (*egopb.Event, error) { + ret := _m.Called(ctx, persistenceID) + + var r0 *egopb.Event + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*egopb.Event, error)); ok { + return rf(ctx, persistenceID) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *egopb.Event); ok { + r0 = rf(ctx, persistenceID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*egopb.Event) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, persistenceID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EventsStore_GetLatestEvent_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestEvent' +type EventsStore_GetLatestEvent_Call struct { + *mock.Call +} + +// GetLatestEvent is a helper method to define mock.On call +// - ctx context.Context +// - persistenceID string +func (_e *EventsStore_Expecter) GetLatestEvent(ctx interface{}, persistenceID interface{}) *EventsStore_GetLatestEvent_Call { + return &EventsStore_GetLatestEvent_Call{Call: _e.mock.On("GetLatestEvent", ctx, persistenceID)} +} + +func (_c *EventsStore_GetLatestEvent_Call) Run(run func(ctx context.Context, persistenceID string)) *EventsStore_GetLatestEvent_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *EventsStore_GetLatestEvent_Call) Return(_a0 *egopb.Event, _a1 error) *EventsStore_GetLatestEvent_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EventsStore_GetLatestEvent_Call) RunAndReturn(run func(context.Context, string) (*egopb.Event, error)) *EventsStore_GetLatestEvent_Call { + _c.Call.Return(run) + return _c +} + +// GetShardEvents provides a mock function with given fields: ctx, shardNumber, offset, max +func (_m *EventsStore) GetShardEvents(ctx context.Context, shardNumber uint64, offset int64, max uint64) ([]*egopb.Event, int64, error) { + ret := _m.Called(ctx, shardNumber, offset, max) + + var r0 []*egopb.Event + var r1 int64 + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, int64, uint64) ([]*egopb.Event, int64, error)); ok { + return rf(ctx, shardNumber, offset, max) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, int64, uint64) []*egopb.Event); ok { + r0 = rf(ctx, shardNumber, offset, max) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*egopb.Event) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64, int64, uint64) int64); ok { + r1 = rf(ctx, shardNumber, offset, max) + } else { + r1 = ret.Get(1).(int64) + } + + if rf, ok := ret.Get(2).(func(context.Context, uint64, int64, uint64) error); ok { + r2 = rf(ctx, shardNumber, offset, max) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// EventsStore_GetShardEvents_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetShardEvents' +type EventsStore_GetShardEvents_Call struct { + *mock.Call +} + +// GetShardEvents is a helper method to define mock.On call +// - ctx context.Context +// - shardNumber uint64 +// - offset int64 +// - max uint64 +func (_e *EventsStore_Expecter) GetShardEvents(ctx interface{}, shardNumber interface{}, offset interface{}, max interface{}) *EventsStore_GetShardEvents_Call { + return &EventsStore_GetShardEvents_Call{Call: _e.mock.On("GetShardEvents", ctx, shardNumber, offset, max)} +} + +func (_c *EventsStore_GetShardEvents_Call) Run(run func(ctx context.Context, shardNumber uint64, offset int64, max uint64)) *EventsStore_GetShardEvents_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(uint64), args[2].(int64), args[3].(uint64)) + }) + return _c +} + +func (_c *EventsStore_GetShardEvents_Call) Return(_a0 []*egopb.Event, _a1 int64, _a2 error) *EventsStore_GetShardEvents_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *EventsStore_GetShardEvents_Call) RunAndReturn(run func(context.Context, uint64, int64, uint64) ([]*egopb.Event, int64, error)) *EventsStore_GetShardEvents_Call { + _c.Call.Return(run) + return _c +} + +// PersistenceIDs provides a mock function with given fields: ctx, pageSize, pageToken +func (_m *EventsStore) PersistenceIDs(ctx context.Context, pageSize uint64, pageToken string) ([]string, string, error) { + ret := _m.Called(ctx, pageSize, pageToken) + + var r0 []string + var r1 string + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, string) ([]string, string, error)); ok { + return rf(ctx, pageSize, pageToken) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, string) []string); ok { + r0 = rf(ctx, pageSize, pageToken) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64, string) string); ok { + r1 = rf(ctx, pageSize, pageToken) + } else { + r1 = ret.Get(1).(string) + } + + if rf, ok := ret.Get(2).(func(context.Context, uint64, string) error); ok { + r2 = rf(ctx, pageSize, pageToken) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// EventsStore_PersistenceIDs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PersistenceIDs' +type EventsStore_PersistenceIDs_Call struct { + *mock.Call +} + +// PersistenceIDs is a helper method to define mock.On call +// - ctx context.Context +// - pageSize uint64 +// - pageToken string +func (_e *EventsStore_Expecter) PersistenceIDs(ctx interface{}, pageSize interface{}, pageToken interface{}) *EventsStore_PersistenceIDs_Call { + return &EventsStore_PersistenceIDs_Call{Call: _e.mock.On("PersistenceIDs", ctx, pageSize, pageToken)} +} + +func (_c *EventsStore_PersistenceIDs_Call) Run(run func(ctx context.Context, pageSize uint64, pageToken string)) *EventsStore_PersistenceIDs_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(uint64), args[2].(string)) + }) + return _c +} + +func (_c *EventsStore_PersistenceIDs_Call) Return(persistenceIDs []string, nextPageToken string, err error) *EventsStore_PersistenceIDs_Call { + _c.Call.Return(persistenceIDs, nextPageToken, err) + return _c +} + +func (_c *EventsStore_PersistenceIDs_Call) RunAndReturn(run func(context.Context, uint64, string) ([]string, string, error)) *EventsStore_PersistenceIDs_Call { + _c.Call.Return(run) + return _c +} + +// Ping provides a mock function with given fields: ctx +func (_m *EventsStore) Ping(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// EventsStore_Ping_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ping' +type EventsStore_Ping_Call struct { + *mock.Call +} + +// Ping is a helper method to define mock.On call +// - ctx context.Context +func (_e *EventsStore_Expecter) Ping(ctx interface{}) *EventsStore_Ping_Call { + return &EventsStore_Ping_Call{Call: _e.mock.On("Ping", ctx)} +} + +func (_c *EventsStore_Ping_Call) Run(run func(ctx context.Context)) *EventsStore_Ping_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *EventsStore_Ping_Call) Return(_a0 error) *EventsStore_Ping_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *EventsStore_Ping_Call) RunAndReturn(run func(context.Context) error) *EventsStore_Ping_Call { + _c.Call.Return(run) + return _c +} + +// ReplayEvents provides a mock function with given fields: ctx, persistenceID, fromSequenceNumber, toSequenceNumber, max +func (_m *EventsStore) ReplayEvents(ctx context.Context, persistenceID string, fromSequenceNumber uint64, toSequenceNumber uint64, max uint64) ([]*egopb.Event, error) { + ret := _m.Called(ctx, persistenceID, fromSequenceNumber, toSequenceNumber, max) + + var r0 []*egopb.Event + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, uint64, uint64, uint64) ([]*egopb.Event, error)); ok { + return rf(ctx, persistenceID, fromSequenceNumber, toSequenceNumber, max) + } + if rf, ok := ret.Get(0).(func(context.Context, string, uint64, uint64, uint64) []*egopb.Event); ok { + r0 = rf(ctx, persistenceID, fromSequenceNumber, toSequenceNumber, max) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*egopb.Event) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, uint64, uint64, uint64) error); ok { + r1 = rf(ctx, persistenceID, fromSequenceNumber, toSequenceNumber, max) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EventsStore_ReplayEvents_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReplayEvents' +type EventsStore_ReplayEvents_Call struct { + *mock.Call +} + +// ReplayEvents is a helper method to define mock.On call +// - ctx context.Context +// - persistenceID string +// - fromSequenceNumber uint64 +// - toSequenceNumber uint64 +// - max uint64 +func (_e *EventsStore_Expecter) ReplayEvents(ctx interface{}, persistenceID interface{}, fromSequenceNumber interface{}, toSequenceNumber interface{}, max interface{}) *EventsStore_ReplayEvents_Call { + return &EventsStore_ReplayEvents_Call{Call: _e.mock.On("ReplayEvents", ctx, persistenceID, fromSequenceNumber, toSequenceNumber, max)} +} + +func (_c *EventsStore_ReplayEvents_Call) Run(run func(ctx context.Context, persistenceID string, fromSequenceNumber uint64, toSequenceNumber uint64, max uint64)) *EventsStore_ReplayEvents_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(uint64), args[3].(uint64), args[4].(uint64)) + }) + return _c +} + +func (_c *EventsStore_ReplayEvents_Call) Return(_a0 []*egopb.Event, _a1 error) *EventsStore_ReplayEvents_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EventsStore_ReplayEvents_Call) RunAndReturn(run func(context.Context, string, uint64, uint64, uint64) ([]*egopb.Event, error)) *EventsStore_ReplayEvents_Call { + _c.Call.Return(run) + return _c +} + +// ShardNumbers provides a mock function with given fields: ctx +func (_m *EventsStore) ShardNumbers(ctx context.Context) ([]uint64, error) { + ret := _m.Called(ctx) + + var r0 []uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]uint64, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) []uint64); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]uint64) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EventsStore_ShardNumbers_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShardNumbers' +type EventsStore_ShardNumbers_Call struct { + *mock.Call +} + +// ShardNumbers is a helper method to define mock.On call +// - ctx context.Context +func (_e *EventsStore_Expecter) ShardNumbers(ctx interface{}) *EventsStore_ShardNumbers_Call { + return &EventsStore_ShardNumbers_Call{Call: _e.mock.On("ShardNumbers", ctx)} +} + +func (_c *EventsStore_ShardNumbers_Call) Run(run func(ctx context.Context)) *EventsStore_ShardNumbers_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *EventsStore_ShardNumbers_Call) Return(_a0 []uint64, _a1 error) *EventsStore_ShardNumbers_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EventsStore_ShardNumbers_Call) RunAndReturn(run func(context.Context) ([]uint64, error)) *EventsStore_ShardNumbers_Call { + _c.Call.Return(run) + return _c +} + +// WriteEvents provides a mock function with given fields: ctx, events +func (_m *EventsStore) WriteEvents(ctx context.Context, events []*egopb.Event) error { + ret := _m.Called(ctx, events) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []*egopb.Event) error); ok { + r0 = rf(ctx, events) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// EventsStore_WriteEvents_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteEvents' +type EventsStore_WriteEvents_Call struct { + *mock.Call +} + +// WriteEvents is a helper method to define mock.On call +// - ctx context.Context +// - events []*egopb.Event +func (_e *EventsStore_Expecter) WriteEvents(ctx interface{}, events interface{}) *EventsStore_WriteEvents_Call { + return &EventsStore_WriteEvents_Call{Call: _e.mock.On("WriteEvents", ctx, events)} +} + +func (_c *EventsStore_WriteEvents_Call) Run(run func(ctx context.Context, events []*egopb.Event)) *EventsStore_WriteEvents_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]*egopb.Event)) + }) + return _c +} + +func (_c *EventsStore_WriteEvents_Call) Return(_a0 error) *EventsStore_WriteEvents_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *EventsStore_WriteEvents_Call) RunAndReturn(run func(context.Context, []*egopb.Event) error) *EventsStore_WriteEvents_Call { + _c.Call.Return(run) + return _c +} + +// NewEventsStore creates a new instance of EventsStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewEventsStore(t interface { + mock.TestingT + Cleanup(func()) +}) *EventsStore { + mock := &EventsStore{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/offsetstore/offset_store.go b/mocks/offsetstore/offset_store.go new file mode 100644 index 0000000..3f5fb3d --- /dev/null +++ b/mocks/offsetstore/offset_store.go @@ -0,0 +1,305 @@ +// Code generated by mockery. DO NOT EDIT. + +package offsetstore + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + egopb "github.com/tochemey/ego/v3/egopb" +) + +// OffsetStore is an autogenerated mock type for the OffsetStore type +type OffsetStore struct { + mock.Mock +} + +type OffsetStore_Expecter struct { + mock *mock.Mock +} + +func (_m *OffsetStore) EXPECT() *OffsetStore_Expecter { + return &OffsetStore_Expecter{mock: &_m.Mock} +} + +// Connect provides a mock function with given fields: ctx +func (_m *OffsetStore) Connect(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// OffsetStore_Connect_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Connect' +type OffsetStore_Connect_Call struct { + *mock.Call +} + +// Connect is a helper method to define mock.On call +// - ctx context.Context +func (_e *OffsetStore_Expecter) Connect(ctx interface{}) *OffsetStore_Connect_Call { + return &OffsetStore_Connect_Call{Call: _e.mock.On("Connect", ctx)} +} + +func (_c *OffsetStore_Connect_Call) Run(run func(ctx context.Context)) *OffsetStore_Connect_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *OffsetStore_Connect_Call) Return(_a0 error) *OffsetStore_Connect_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *OffsetStore_Connect_Call) RunAndReturn(run func(context.Context) error) *OffsetStore_Connect_Call { + _c.Call.Return(run) + return _c +} + +// Disconnect provides a mock function with given fields: ctx +func (_m *OffsetStore) Disconnect(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// OffsetStore_Disconnect_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Disconnect' +type OffsetStore_Disconnect_Call struct { + *mock.Call +} + +// Disconnect is a helper method to define mock.On call +// - ctx context.Context +func (_e *OffsetStore_Expecter) Disconnect(ctx interface{}) *OffsetStore_Disconnect_Call { + return &OffsetStore_Disconnect_Call{Call: _e.mock.On("Disconnect", ctx)} +} + +func (_c *OffsetStore_Disconnect_Call) Run(run func(ctx context.Context)) *OffsetStore_Disconnect_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *OffsetStore_Disconnect_Call) Return(_a0 error) *OffsetStore_Disconnect_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *OffsetStore_Disconnect_Call) RunAndReturn(run func(context.Context) error) *OffsetStore_Disconnect_Call { + _c.Call.Return(run) + return _c +} + +// GetCurrentOffset provides a mock function with given fields: ctx, projectionID +func (_m *OffsetStore) GetCurrentOffset(ctx context.Context, projectionID *egopb.ProjectionId) (*egopb.Offset, error) { + ret := _m.Called(ctx, projectionID) + + var r0 *egopb.Offset + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *egopb.ProjectionId) (*egopb.Offset, error)); ok { + return rf(ctx, projectionID) + } + if rf, ok := ret.Get(0).(func(context.Context, *egopb.ProjectionId) *egopb.Offset); ok { + r0 = rf(ctx, projectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*egopb.Offset) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *egopb.ProjectionId) error); ok { + r1 = rf(ctx, projectionID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// OffsetStore_GetCurrentOffset_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCurrentOffset' +type OffsetStore_GetCurrentOffset_Call struct { + *mock.Call +} + +// GetCurrentOffset is a helper method to define mock.On call +// - ctx context.Context +// - projectionID *egopb.ProjectionId +func (_e *OffsetStore_Expecter) GetCurrentOffset(ctx interface{}, projectionID interface{}) *OffsetStore_GetCurrentOffset_Call { + return &OffsetStore_GetCurrentOffset_Call{Call: _e.mock.On("GetCurrentOffset", ctx, projectionID)} +} + +func (_c *OffsetStore_GetCurrentOffset_Call) Run(run func(ctx context.Context, projectionID *egopb.ProjectionId)) *OffsetStore_GetCurrentOffset_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*egopb.ProjectionId)) + }) + return _c +} + +func (_c *OffsetStore_GetCurrentOffset_Call) Return(currentOffset *egopb.Offset, err error) *OffsetStore_GetCurrentOffset_Call { + _c.Call.Return(currentOffset, err) + return _c +} + +func (_c *OffsetStore_GetCurrentOffset_Call) RunAndReturn(run func(context.Context, *egopb.ProjectionId) (*egopb.Offset, error)) *OffsetStore_GetCurrentOffset_Call { + _c.Call.Return(run) + return _c +} + +// Ping provides a mock function with given fields: ctx +func (_m *OffsetStore) Ping(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// OffsetStore_Ping_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ping' +type OffsetStore_Ping_Call struct { + *mock.Call +} + +// Ping is a helper method to define mock.On call +// - ctx context.Context +func (_e *OffsetStore_Expecter) Ping(ctx interface{}) *OffsetStore_Ping_Call { + return &OffsetStore_Ping_Call{Call: _e.mock.On("Ping", ctx)} +} + +func (_c *OffsetStore_Ping_Call) Run(run func(ctx context.Context)) *OffsetStore_Ping_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *OffsetStore_Ping_Call) Return(_a0 error) *OffsetStore_Ping_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *OffsetStore_Ping_Call) RunAndReturn(run func(context.Context) error) *OffsetStore_Ping_Call { + _c.Call.Return(run) + return _c +} + +// ResetOffset provides a mock function with given fields: ctx, projectionName, value +func (_m *OffsetStore) ResetOffset(ctx context.Context, projectionName string, value int64) error { + ret := _m.Called(ctx, projectionName, value) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, int64) error); ok { + r0 = rf(ctx, projectionName, value) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// OffsetStore_ResetOffset_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ResetOffset' +type OffsetStore_ResetOffset_Call struct { + *mock.Call +} + +// ResetOffset is a helper method to define mock.On call +// - ctx context.Context +// - projectionName string +// - value int64 +func (_e *OffsetStore_Expecter) ResetOffset(ctx interface{}, projectionName interface{}, value interface{}) *OffsetStore_ResetOffset_Call { + return &OffsetStore_ResetOffset_Call{Call: _e.mock.On("ResetOffset", ctx, projectionName, value)} +} + +func (_c *OffsetStore_ResetOffset_Call) Run(run func(ctx context.Context, projectionName string, value int64)) *OffsetStore_ResetOffset_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(int64)) + }) + return _c +} + +func (_c *OffsetStore_ResetOffset_Call) Return(_a0 error) *OffsetStore_ResetOffset_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *OffsetStore_ResetOffset_Call) RunAndReturn(run func(context.Context, string, int64) error) *OffsetStore_ResetOffset_Call { + _c.Call.Return(run) + return _c +} + +// WriteOffset provides a mock function with given fields: ctx, offset +func (_m *OffsetStore) WriteOffset(ctx context.Context, offset *egopb.Offset) error { + ret := _m.Called(ctx, offset) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *egopb.Offset) error); ok { + r0 = rf(ctx, offset) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// OffsetStore_WriteOffset_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteOffset' +type OffsetStore_WriteOffset_Call struct { + *mock.Call +} + +// WriteOffset is a helper method to define mock.On call +// - ctx context.Context +// - offset *egopb.Offset +func (_e *OffsetStore_Expecter) WriteOffset(ctx interface{}, offset interface{}) *OffsetStore_WriteOffset_Call { + return &OffsetStore_WriteOffset_Call{Call: _e.mock.On("WriteOffset", ctx, offset)} +} + +func (_c *OffsetStore_WriteOffset_Call) Run(run func(ctx context.Context, offset *egopb.Offset)) *OffsetStore_WriteOffset_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*egopb.Offset)) + }) + return _c +} + +func (_c *OffsetStore_WriteOffset_Call) Return(_a0 error) *OffsetStore_WriteOffset_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *OffsetStore_WriteOffset_Call) RunAndReturn(run func(context.Context, *egopb.Offset) error) *OffsetStore_WriteOffset_Call { + _c.Call.Return(run) + return _c +} + +// NewOffsetStore creates a new instance of OffsetStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewOffsetStore(t interface { + mock.TestingT + Cleanup(func()) +}) *OffsetStore { + mock := &OffsetStore{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/projection/actor.go b/projection/projection.go similarity index 95% rename from projection/actor.go rename to projection/projection.go index 71d8d24..ffe2e97 100644 --- a/projection/actor.go +++ b/projection/projection.go @@ -27,8 +27,6 @@ package projection import ( "context" - "google.golang.org/protobuf/types/known/emptypb" - "github.com/tochemey/goakt/v2/actors" "github.com/tochemey/goakt/v2/goaktpb" @@ -36,9 +34,6 @@ import ( "github.com/tochemey/ego/v3/offsetstore" ) -// Start is used to start the projection -var Start = new(emptypb.Empty) - // Projection defines the projection actor // Only a single instance of this will run throughout the cluster type Projection struct { diff --git a/projection/actor_test.go b/projection/projection_test.go similarity index 96% rename from projection/actor_test.go rename to projection/projection_test.go index ff0dd7a..914d7ab 100644 --- a/projection/actor_test.go +++ b/projection/projection_test.go @@ -46,7 +46,7 @@ import ( testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) -func TestActor(t *testing.T) { +func TestProjection(t *testing.T) { t.Run("With happy path", func(t *testing.T) { defer goleak.VerifyNone(t) ctx := context.TODO() @@ -77,7 +77,7 @@ func TestActor(t *testing.T) { // set up the offset store offsetStore := memoffsetstore.NewOffsetStore() assert.NotNil(t, offsetStore) - require.NoError(t, offsetStore.Disconnect(ctx)) + require.NoError(t, offsetStore.Connect(ctx)) handler := NewDiscardHandler(logger) @@ -90,9 +90,6 @@ func TestActor(t *testing.T) { lib.Pause(time.Second) - // start the projection - require.NoError(t, actors.Tell(ctx, pid, Start)) - // persist some events state, err := anypb.New(new(testpb.Account)) assert.NoError(t, err) diff --git a/projection/runner.go b/projection/runner.go index ab24f91..10ca4ae 100644 --- a/projection/runner.go +++ b/projection/runner.go @@ -28,6 +28,7 @@ import ( "context" "errors" "fmt" + "os" "time" "github.com/flowchartsman/retry" @@ -81,7 +82,7 @@ func newRunner(name string, opts ...Option) *runner { runner := &runner{ name: name, - logger: log.DefaultLogger, + logger: log.New(log.ErrorLevel, os.Stderr), handler: handler, eventsStore: eventsStore, offsetsStore: offsetStore, @@ -111,18 +112,10 @@ func (x *runner) Start(ctx context.Context) error { return errors.New("offsets store is not defined") } - if err := x.offsetsStore.Ping(ctx); err != nil { - return fmt.Errorf("failed to connect to the offsets store: %ws", err) - } - if x.eventsStore == nil { return errors.New("events store is not defined") } - if err := x.eventsStore.Ping(ctx); err != nil { - return fmt.Errorf("failed to connect to the events store: %w", err) - } - // we will ping the stores 5 times to see whether there have started successfully or not. // The operation will be done in an exponential backoff mechanism with an initial delay of a second and a maximum delay of a second. // Once the retries have completed and still not connected we fail the start process of the projection. @@ -356,8 +349,9 @@ func (x *runner) doProcess(ctx context.Context, shard uint64) error { func (x *runner) preStart(ctx context.Context) error { if !x.resetOffsetTo.IsZero() { if err := x.offsetsStore.ResetOffset(ctx, x.name, x.resetOffsetTo.UnixMilli()); err != nil { - x.logger.Error(fmt.Errorf("failed to reset projection=%s: %w", x.name, err)) - return err + fmtErr := fmt.Errorf("failed to reset projection=%s: %w", x.name, err) + x.logger.Error(fmtErr) + return fmtErr } } diff --git a/projection/runner_test.go b/projection/runner_test.go index ec72355..dfec31b 100644 --- a/projection/runner_test.go +++ b/projection/runner_test.go @@ -32,6 +32,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/goleak" @@ -43,11 +44,13 @@ import ( "github.com/tochemey/ego/v3/egopb" "github.com/tochemey/ego/v3/eventstore/memory" "github.com/tochemey/ego/v3/internal/lib" + mockseventstore "github.com/tochemey/ego/v3/mocks/eventstore" + mocksoffsetstore "github.com/tochemey/ego/v3/mocks/offsetstore" memoffsetstore "github.com/tochemey/ego/v3/offsetstore/memory" testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) -func TestProjection(t *testing.T) { +func TestRunner(t *testing.T) { t.Run("with happy path", func(t *testing.T) { defer goleak.VerifyNone(t) ctx := context.TODO() @@ -57,25 +60,27 @@ func TestProjection(t *testing.T) { logger := log.DefaultLogger // set up the event store - journalStore := memory.NewEventsStore() - assert.NotNil(t, journalStore) - require.NoError(t, journalStore.Connect(ctx)) + eventsStore := memory.NewEventsStore() + assert.NotNil(t, eventsStore) + require.NoError(t, eventsStore.Connect(ctx)) // set up the offset store offsetStore := memoffsetstore.NewOffsetStore() assert.NotNil(t, offsetStore) - require.NoError(t, offsetStore.Disconnect(ctx)) + require.NoError(t, offsetStore.Connect(ctx)) // set up the projection // create a handler that return successfully handler := NewDiscardHandler(logger) // create an instance of the projection - runner := newRunner(projectionName, handler, journalStore, offsetStore, WithRefreshInterval(time.Millisecond)) + runner := newRunner(projectionName, handler, eventsStore, offsetStore, WithRefreshInterval(time.Millisecond)) // start the projection err := runner.Start(ctx) require.NoError(t, err) + require.Equal(t, projectionName, runner.Name()) + // run the projection runner.Run(ctx) @@ -101,7 +106,7 @@ func TestProjection(t *testing.T) { } } - require.NoError(t, journalStore.WriteEvents(ctx, journals)) + require.NoError(t, eventsStore.WriteEvents(ctx, journals)) require.True(t, runner.started.Load()) // wait for the data to be persisted by the database since this an eventual consistency case @@ -122,7 +127,7 @@ func TestProjection(t *testing.T) { assert.EqualValues(t, 10, handler.EventsCount()) // free resources - assert.NoError(t, journalStore.Disconnect(ctx)) + assert.NoError(t, eventsStore.Disconnect(ctx)) assert.NoError(t, offsetStore.Disconnect(ctx)) assert.NoError(t, runner.Stop()) }) @@ -408,6 +413,138 @@ func TestProjection(t *testing.T) { assert.NoError(t, offsetStore.Disconnect(ctx)) assert.NoError(t, runner.Stop()) }) + t.Run("with events store is not defined", func(t *testing.T) { + ctx := context.Background() + handler := NewDiscardHandler(log.DiscardLogger) + projectionName := "db-writer" + // set up the offset store + offsetStore := memoffsetstore.NewOffsetStore() + assert.NotNil(t, offsetStore) + require.NoError(t, offsetStore.Connect(ctx)) + + // create an instance of the projection + runner := newRunner(projectionName, handler, nil, offsetStore, WithRefreshInterval(time.Millisecond)) + // start the projection + err := runner.Start(ctx) + require.Error(t, err) + assert.EqualError(t, err, "events store is not defined") + assert.NoError(t, offsetStore.Disconnect(ctx)) + }) + t.Run("with offset store is not defined", func(t *testing.T) { + ctx := context.Background() + handler := NewDiscardHandler(log.DiscardLogger) + projectionName := "db-writer" + // set up the event store + eventsStore := memory.NewEventsStore() + assert.NotNil(t, eventsStore) + require.NoError(t, eventsStore.Connect(ctx)) + + // create an instance of the projection + runner := newRunner(projectionName, handler, eventsStore, nil, WithRefreshInterval(time.Millisecond)) + // start the projection + err := runner.Start(ctx) + require.Error(t, err) + assert.EqualError(t, err, "offsets store is not defined") + assert.NoError(t, eventsStore.Disconnect(ctx)) + }) + t.Run("with start when already started returns nil", func(t *testing.T) { + ctx := context.Background() + handler := NewDiscardHandler(log.DiscardLogger) + projectionName := "db-writer" + // set up the event store + eventsStore := memory.NewEventsStore() + assert.NotNil(t, eventsStore) + require.NoError(t, eventsStore.Connect(ctx)) + + // set up the offset store + offsetStore := memoffsetstore.NewOffsetStore() + assert.NotNil(t, offsetStore) + require.NoError(t, offsetStore.Connect(ctx)) + + // create an instance of the projection + runner := newRunner(projectionName, handler, eventsStore, offsetStore, WithRefreshInterval(time.Millisecond)) + // start the projection + err := runner.Start(ctx) + require.NoError(t, err) + + lib.Pause(time.Second) + + require.NoError(t, runner.Start(ctx)) + + // free resources + assert.NoError(t, eventsStore.Disconnect(ctx)) + assert.NoError(t, offsetStore.Disconnect(ctx)) + assert.NoError(t, runner.Stop()) + }) + t.Run("with start when max retry to ping events store fails", func(t *testing.T) { + ctx := context.Background() + handler := NewDiscardHandler(log.DiscardLogger) + projectionName := "db-writer" + + // set up the offset store + offsetStore := memoffsetstore.NewOffsetStore() + assert.NotNil(t, offsetStore) + require.NoError(t, offsetStore.Connect(ctx)) + + eventsStore := new(mockseventstore.EventsStore) + eventsStore.EXPECT().Ping(mock.Anything).Return(errors.New("fail ping")) + + // create an instance of the projection + runner := newRunner(projectionName, handler, eventsStore, offsetStore, WithRefreshInterval(time.Millisecond)) + // start the projection + err := runner.Start(ctx) + require.Error(t, err) + assert.EqualError(t, err, "failed to start the projection: fail ping") + eventsStore.AssertExpectations(t) + assert.NoError(t, offsetStore.Disconnect(ctx)) + }) + t.Run("with start when max retry to ping offsets store store fails", func(t *testing.T) { + ctx := context.Background() + handler := NewDiscardHandler(log.DiscardLogger) + projectionName := "db-writer" + + // set up the event store + eventsStore := memory.NewEventsStore() + assert.NotNil(t, eventsStore) + require.NoError(t, eventsStore.Connect(ctx)) + + offsetStore := new(mocksoffsetstore.OffsetStore) + offsetStore.EXPECT().Ping(mock.Anything).Return(errors.New("fail ping")) + + // create an instance of the projection + runner := newRunner(projectionName, handler, eventsStore, offsetStore, WithRefreshInterval(time.Millisecond)) + // start the projection + err := runner.Start(ctx) + require.Error(t, err) + assert.EqualError(t, err, "failed to start the projection: fail ping") + offsetStore.AssertExpectations(t) + assert.NoError(t, eventsStore.Disconnect(ctx)) + }) + t.Run("with start when ResetOffset fails", func(t *testing.T) { + ctx := context.Background() + handler := NewDiscardHandler(log.DiscardLogger) + projectionName := "db-writer" + + eventsStore := new(mockseventstore.EventsStore) + eventsStore.EXPECT().Ping(mock.Anything).Return(nil) + + resetOffsetTo := time.Now().UTC() + offsetStore := new(mocksoffsetstore.OffsetStore) + offsetStore.EXPECT().Ping(mock.Anything).Return(nil) + offsetStore.EXPECT().ResetOffset(ctx, projectionName, resetOffsetTo.UnixMilli()).Return(errors.New("fail to reset offset")) + + // create an instance of the projection + runner := newRunner(projectionName, handler, eventsStore, offsetStore, WithRefreshInterval(time.Millisecond)) + // purposefully for test + runner.resetOffsetTo = resetOffsetTo + + // start the projection + err := runner.Start(ctx) + require.Error(t, err) + assert.EqualError(t, err, "failed to reset projection=db-writer: fail to reset offset") + offsetStore.AssertExpectations(t) + eventsStore.AssertExpectations(t) + }) } type testHandler1 struct{} diff --git a/readme.md b/readme.md index f8fc640..0db0b35 100644 --- a/readme.md +++ b/readme.md @@ -18,16 +18,18 @@ In eGo commands sent the aggregate root are processed in order. When a command i and timestamp that can help track it. The aggregate root in eGo is responsible for defining how to handle events that are the result of command handlers. The end result of events handling is to build the new state of the aggregate root. When running in cluster mode, aggregate root are sharded. -- Commands handler: The command handlers define how to handle each incoming command, +- `Commands handler`: The command handlers define how to handle each incoming command, which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can be returned as a no-op. Command handlers are the meat of the event sourced actor. They encode the business rules of your event sourced actor and act as a guardian of the Aggregate consistency. The command handler must first validate that the incoming command can be applied to the current model state. Any decision should be solely based on the data passed in the commands and the state of the Behavior. In case of successful validation, one or more events expressing the mutations are persisted. Once the events are persisted, they are applied to the state producing a new valid state. -- Events handler: The event handlers are used to mutate the state of the Aggregate by applying the events to it. +- `Events handler`: The event handlers are used to mutate the state of the Aggregate by applying the events to it. Event handlers must be pure functions as they will be used when instantiating the Aggregate and replaying the event store. +#### Howto + To define an Aggregate Root, one needs to: 1. the state of the aggregate root using google protocol buffers message 2. the various commands that will be handled by the aggregate root @@ -41,7 +43,9 @@ Every event handled by Aggregate Root are pushed to an events stream. That enabl #### Projection One can add a projection to the eGo engine to help build a read model. Projections in eGo rely on an offset store to track how far they have consumed events -persisted by the write model. The offset used in eGo is a timestamp-based offset. +persisted by the write model. The offset used in eGo is a timestamp-based offset. One can also: +- remove a given projection: this will stop the projection and remove it from the system +- check the status of a given projection #### Events Store @@ -59,7 +63,11 @@ One can implement a custom offsets store. See [OffsetStore](./offsetstore/iface. The cluster mode heavily relies on [Go-Akt](https://github.com/Tochemey/goakt#clustering) clustering. -#### Examples +#### Mocks + +eGo ships in some [mocks](./mocks) + +### Examples Check the [examples](./example)