diff --git a/examples/todo/.docker/compose.yml b/examples/todo/.docker/compose.yml index a7380cd7..12a349a5 100644 --- a/examples/todo/.docker/compose.yml +++ b/examples/todo/.docker/compose.yml @@ -8,6 +8,7 @@ services: environment: NATS_URL: nats://eventbus:4222 MONGO_URL: mongodb://eventstore:27017 + TODO_DEBOUNCE: ${TODO_DEBOUNCE} depends_on: - eventbus - eventstore diff --git a/examples/todo/Makefile b/examples/todo/Makefile index 174a30b1..6719bb18 100644 --- a/examples/todo/Makefile +++ b/examples/todo/Makefile @@ -1,12 +1,21 @@ +.PHONY: test +test: + go test ./... + .PHONY: build build: docker compose -f .docker/compose.yml build -.PHONY: up -up: +.PHONY: default +default: docker compose -f .docker/compose.yml up --remove-orphans; \ docker compose -f .docker/compose.yml down --remove-orphans +.PHONY: debounce +debounce: + TODO_DEBOUNCE=1s docker compose -f .docker/compose.yml up --remove-orphans; \ + docker compose -f .docker/compose.yml down --remove-orphans + .PHONY: down down: docker compose -f .docker/compose.yml down --remove-orphans diff --git a/examples/todo/README.md b/examples/todo/README.md index 8e80aabd..a3bff9bf 100644 --- a/examples/todo/README.md +++ b/examples/todo/README.md @@ -1,17 +1,18 @@ -# Example: To-Do List +# Example – To-Do App -This example uses a "todo list" to show how to define and test an event-sourced -aggregate and commands for the aggregate. +This example shows how to implement a "todo list" app. The app consists of a +"todo" server and a client that sends commands to the server. -- `events.go` defines the events of the list -- `commands.go` defines and handles commands that can be dispatched by other - services -- `list.go` implements the "todo list" aggregate -- `repos.go` defines the repositories -- `cmd/server` is the todo server -- `cmd/client` is an example client that dispatches commands to the server +- [`list.go`](./list.go) implements the "todo list" aggregate +- [`events.go`](./events.go) defines and registers the "todo list" events +- [`commands.go`](./commands.go) defines and registers the "todo list" commands +- [`counter.go`](./counter.go) implements a read model projection +- [`cmd/server`](./cmd/server) is the server app +- [`cmd/client`](./cmd/client) is the client app -## Used Backends +## Details + +### Backends - NATS Core (Event Bus) - MongoDB (Event Store) @@ -20,6 +21,17 @@ aggregate and commands for the aggregate. Requires Docker. +### Default setup + ```sh make build && make up ``` + +### Debounced projection + +This setup sets the `TODO_DEBOUNCE` environment variable to `1s`, resulting in +a single "batch"-update of the `Counter` projection. + +```sh +make build && make debounce +``` diff --git a/examples/todo/cmd/client/main.go b/examples/todo/cmd/client/main.go index 8b9a32aa..bdc197ba 100644 --- a/examples/todo/cmd/client/main.go +++ b/examples/todo/cmd/client/main.go @@ -3,6 +3,7 @@ package main import ( "fmt" "log" + "math/rand" "time" "github.com/google/uuid" @@ -28,6 +29,8 @@ func main() { // Create a new todo list and add some tasks. listID := uuid.New() for i := 0; i < 10; i++ { + sleepRandom() + cmd := todo.AddTask(listID, fmt.Sprintf("Task %d", i+1)) if err := cbus.Dispatch(ctx, cmd.Any(), dispatch.Sync()); err != nil { log.Panicf("Failed to dispatch command: %v [cmd=%v, task=%q]", err, cmd.Name(), cmd.Payload()) @@ -36,6 +39,8 @@ func main() { // Then remove every second task. for i := 0; i < 10; i += 2 { + sleepRandom() + cmd := todo.RemoveTask(listID, fmt.Sprintf("Task %d", i+1)) if err := cbus.Dispatch(ctx, cmd.Any(), dispatch.Sync()); err != nil { log.Panicf("Failed to dispatch command: %v [cmd=%v, task=%q]", err, cmd.Name(), cmd.Payload()) @@ -45,11 +50,16 @@ func main() { // Remaining tasks: Task 2, Task 4, Task 6, Task 8, Task 10 // Then mark "Task 6" and "Task 10" as done. + sleepRandom() + cmd := todo.DoneTasks(listID, "Task 6", "Task 10") if err := cbus.Dispatch(ctx, cmd.Any(), dispatch.Sync()); err != nil { log.Panicf("Failed to dispatch command: %v [cmd=%v, tasks=%v]", err, cmd.Name(), cmd.Payload()) } +} - // Give the "server" service time to run projections before Docker stops all services. - <-time.After(3 * time.Second) +func sleepRandom() { + dur := time.Duration(rand.Intn(1000)) * time.Millisecond + log.Printf("Waiting for %s before dispatching next command ...", dur) + time.Sleep(dur) } diff --git a/examples/todo/cmd/server/main.go b/examples/todo/cmd/server/main.go index 0142d0aa..77fcb983 100644 --- a/examples/todo/cmd/server/main.go +++ b/examples/todo/cmd/server/main.go @@ -3,15 +3,24 @@ package main import ( "fmt" "log" + "os" "time" - "github.com/modernice/goes/aggregate/repository" "github.com/modernice/goes/examples/todo" "github.com/modernice/goes/examples/todo/cmd" "github.com/modernice/goes/projection/schedule" ) +var intro = `Running "todo" server with the following options: + + TODO_DEBOUNCE: %s +` + func main() { + debounce := parseDebounce() + + fmt.Printf(intro, debounceText(debounce)) + var setup cmd.Setup ctx, cancel := setup.Context() @@ -23,15 +32,28 @@ func main() { cbus, _ := setup.Commands(ereg.Registry, ebus) repo := setup.Aggregates(estore) - lists := repository.Typed(repo, todo.New) counter := todo.NewCounter() - counterErrors, err := counter.Project(ctx, ebus, estore, schedule.Debounce(time.Second)) + counterErrors, err := counter.Project(ctx, ebus, estore, schedule.Debounce(debounce)) if err != nil { log.Panic(fmt.Errorf("project counter: %w", err)) } - commandErrors := todo.HandleCommands(ctx, cbus, lists) + commandErrors := todo.HandleCommands(ctx, cbus, repo) cmd.LogErrors(ctx, counterErrors, commandErrors) } + +func parseDebounce() time.Duration { + if d, err := time.ParseDuration(os.Getenv("TODO_DEBOUNCE")); err == nil { + return d + } + return 0 +} + +func debounceText(dur time.Duration) string { + if dur == 0 { + return fmt.Sprintf("%s (disabled)", dur) + } + return dur.String() +} diff --git a/examples/todo/cmd/setup.go b/examples/todo/cmd/setup.go index a16f68fd..10e77835 100644 --- a/examples/todo/cmd/setup.go +++ b/examples/todo/cmd/setup.go @@ -14,6 +14,7 @@ import ( "github.com/modernice/goes/command" "github.com/modernice/goes/command/cmdbus" "github.com/modernice/goes/event" + "github.com/modernice/goes/event/eventstore" "github.com/modernice/goes/examples/todo" ) @@ -30,7 +31,7 @@ func (s *Setup) Events(ctx context.Context) (_ event.Bus, _ event.Store, _ *code todo.RegisterEvents(r) bus := nats.NewEventBus(r) - store := mongo.NewEventStore(r) + store := eventstore.WithBus(mongo.NewEventStore(r), bus) return bus, store, r, func() { log.Printf("Disconnecting from NATS ...") diff --git a/examples/todo/commands.go b/examples/todo/commands.go index 1601831c..fe85a3b1 100644 --- a/examples/todo/commands.go +++ b/examples/todo/commands.go @@ -2,14 +2,15 @@ package todo import ( "context" - "log" "github.com/google/uuid" + "github.com/modernice/goes/aggregate" "github.com/modernice/goes/codec" "github.com/modernice/goes/command" - "github.com/modernice/goes/helper/streams" + "github.com/modernice/goes/command/handler" ) +// Commands const ( AddTaskCmd = "todo.list.add_task" RemoveTaskCmd = "todo.list.remove_task" @@ -38,33 +39,9 @@ func RegisterCommands(r *codec.GobRegistry) { codec.GobRegister[[]string](r, DoneTaskCmd) } -// HandleCommands handles commands until ctx is canceled. Any asynchronous -// errors that happen during the command handling are reported to the returned -// error channel. -func HandleCommands(ctx context.Context, bus command.Bus, lists ListRepository) <-chan error { - addErrors := command.MustHandle(ctx, bus, AddTaskCmd, func(ctx command.Ctx[string]) error { - return lists.Use(ctx, ctx.AggregateID(), func(list *List) error { - log.Printf("Handling %q command ...", ctx.Name()) - defer list.print() - return list.Add(ctx.Payload()) - }) - }) - - removeErrors := command.MustHandle(ctx, bus, RemoveTaskCmd, func(ctx command.Ctx[string]) error { - return lists.Use(ctx, ctx.AggregateID(), func(list *List) error { - log.Printf("Handling %q command ...", ctx.Name()) - defer list.print() - return list.Remove(ctx.Payload()) - }) - }) - - doneErrors := command.MustHandle(ctx, bus, DoneTaskCmd, func(ctx command.Ctx[[]string]) error { - return lists.Use(ctx, ctx.AggregateID(), func(list *List) error { - log.Printf("Handling %q command ...", ctx.Name()) - defer list.print() - return list.Done(ctx.Payload()...) - }) - }) - - return streams.FanInContext(ctx, addErrors, removeErrors, doneErrors) +// HandleCommands handles todo list commands that are dispatched over the +// provided command bus until ctx is canceled. Command errors are sent into +// the returned error channel. +func HandleCommands(ctx context.Context, bus command.Bus, repo aggregate.Repository) <-chan error { + return handler.New(New, repo, bus).MustHandle(ctx) } diff --git a/examples/todo/counter.go b/examples/todo/counter.go index 0efcb9a3..56d05a1a 100644 --- a/examples/todo/counter.go +++ b/examples/todo/counter.go @@ -5,13 +5,14 @@ import ( "fmt" "log" "sync" + "time" "github.com/modernice/goes/event" "github.com/modernice/goes/projection" "github.com/modernice/goes/projection/schedule" ) -// Counter provides the number of active, removed and archived tasks. +// Counter is a read model that provides the number of active, removed, and archived tasks. type Counter struct { *projection.Base @@ -62,14 +63,19 @@ func (c *Counter) Project( store event.Store, opts ...schedule.ContinuousOption, ) (<-chan error, error) { - s := schedule.Continuously(bus, store, TaskEvents[:], opts...) + s := schedule.Continuously(bus, store, ListEvents[:], opts...) errs, err := s.Subscribe(ctx, func(ctx projection.Job) error { - log.Println("Applying job ...") + c.print() + defer c.print() + + start := time.Now() + log.Println("[Counter] Applying projection job ...") + defer func() { log.Printf("[Counter] Applied projection job. (%s)", time.Since(start)) }() c.Lock() defer c.Unlock() - defer c.print() + return ctx.Apply(ctx, c) }) if err != nil { @@ -94,5 +100,7 @@ func (c *Counter) tasksDone(evt event.Of[[]string]) { } func (c *Counter) print() { + c.RLock() + defer c.RUnlock() log.Printf("[Counter] Active: %d, Removed: %d, Archived: %d", c.active, c.removed, c.archived) } diff --git a/examples/todo/events.go b/examples/todo/events.go index fa0eea46..751ccf58 100644 --- a/examples/todo/events.go +++ b/examples/todo/events.go @@ -4,23 +4,25 @@ import ( "github.com/modernice/goes/codec" ) +// Events const ( TaskAdded = "todo.list.task_added" TaskRemoved = "todo.list.task_removed" TasksDone = "todo.list.tasks_done" ) -var TaskEvents = [...]string{ +// ListEvents are all events of a todo list. +var ListEvents = [...]string{ TaskAdded, TaskRemoved, TasksDone, } -// TaskRemovedEvent is the event data for TaskRemoved. +// TaskRemovedEvent is the event data for the TaskRemoved event. // -// You can use any type as event data. In this example the event data is a -// struct. If you look below you can see that the TaskAdded and TasksDone events -// use other types for their event data. +// You can use any type as event data. In this case, the event data is a struct. +// The event data types for the TaskAdded and TasksDone events are `string` and +// `[]string` respectively. type TaskRemovedEvent struct{ Task string } // RegisterEvents registers events into a registry. diff --git a/examples/todo/list.go b/examples/todo/list.go index 757b31c7..bd04d22a 100644 --- a/examples/todo/list.go +++ b/examples/todo/list.go @@ -6,6 +6,8 @@ import ( "github.com/google/uuid" "github.com/modernice/goes/aggregate" + "github.com/modernice/goes/command" + "github.com/modernice/goes/command/handler" "github.com/modernice/goes/event" ) @@ -15,6 +17,7 @@ const ListAggregate = "todo.list" // List is a "todo" list. type List struct { *aggregate.Base + *handler.BaseHandler tasks []string archive []string @@ -22,13 +25,32 @@ type List struct { // New returns the "todo" list with the given id. func New(id uuid.UUID) *List { - list := &List{Base: aggregate.New(ListAggregate, id)} + var list *List + list = &List{ + Base: aggregate.New(ListAggregate, id), + BaseHandler: handler.NewBase( + handler.BeforeHandle(func(ctx command.Context) error { + log.Printf("Handling %q command ... [list=%s]", ctx.Name(), id) + return nil + }), + handler.AfterHandle(func(command.Context) { + list.print() + }), + ), + } // Register the event appliers for each of the aggregate events. event.ApplyWith(list, list.add, TaskAdded) event.ApplyWith(list, list.remove, TaskRemoved) event.ApplyWith(list, list.done, TasksDone) + // Register the commands handlers. + command.ApplyWith(list, list.Add, AddTaskCmd) + command.ApplyWith(list, list.Remove, RemoveTaskCmd) + command.ApplyWith(list, func(tasks []string) error { + return list.Done(tasks...) + }, DoneTaskCmd) + return list } diff --git a/examples/todo/repos.go b/examples/todo/repos.go deleted file mode 100644 index fdb86c95..00000000 --- a/examples/todo/repos.go +++ /dev/null @@ -1,23 +0,0 @@ -package todo - -import "github.com/modernice/goes/aggregate" - -// ListRepository is the "todo list" repository. -// -// goes provides a generic TypedRepository that can be used to define your -// repositories within your own app, strongly typed. Use the -// github.com/modernice/goes/aggregate/repository.Typed function to create a -// TypedRepository from an aggregate.Repository. The provided makeFunc infers -// the aggregate type automatically :). -// -// type List struct { *aggregate.Base } -// func NewList(id uuid.UUID) *List { ... } -// -// // Define the ListRepository interface as an alias. -// type ListRepository = aggregate.TypedRepository[*List] -// -// var repo aggregate.Repository -// typed := repository.Typed(repo, NewList) -// -// // typed is a ListRepository, which is an aggregate.TypedRepository[*List] -type ListRepository = aggregate.TypedRepository[*List]