Skip to content

Commit

Permalink
docs(examples): update "todo" example
Browse files Browse the repository at this point in the history
  • Loading branch information
bounoable committed Apr 20, 2022
1 parent a0d981e commit 071a763
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 84 deletions.
1 change: 1 addition & 0 deletions examples/todo/.docker/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
environment:
NATS_URL: nats://eventbus:4222
MONGO_URL: mongodb://eventstore:27017
TODO_DEBOUNCE: ${TODO_DEBOUNCE}
depends_on:
- eventbus
- eventstore
Expand Down
13 changes: 11 additions & 2 deletions examples/todo/Makefile
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
.PHONY: test
test:
go test ./...

.PHONY: build
build:
docker compose -f .docker/compose.yml build

.PHONY: up
up:
.PHONY: default
default:
docker compose -f .docker/compose.yml up --remove-orphans; \
docker compose -f .docker/compose.yml down --remove-orphans

.PHONY: debounce
debounce:
TODO_DEBOUNCE=1s docker compose -f .docker/compose.yml up --remove-orphans; \
docker compose -f .docker/compose.yml down --remove-orphans

.PHONY: down
down:
docker compose -f .docker/compose.yml down --remove-orphans
34 changes: 23 additions & 11 deletions examples/todo/README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
# Example: To-Do List
# ExampleTo-Do App

This example uses a "todo list" to show how to define and test an event-sourced
aggregate and commands for the aggregate.
This example shows how to implement a "todo list" app. The app consists of a
"todo" server and a client that sends commands to the server.

- `events.go` defines the events of the list
- `commands.go` defines and handles commands that can be dispatched by other
services
- `list.go` implements the "todo list" aggregate
- `repos.go` defines the repositories
- `cmd/server` is the todo server
- `cmd/client` is an example client that dispatches commands to the server
- [`list.go`](./list.go) implements the "todo list" aggregate
- [`events.go`](./events.go) defines and registers the "todo list" events
- [`commands.go`](./commands.go) defines and registers the "todo list" commands
- [`counter.go`](./counter.go) implements a read model projection
- [`cmd/server`](./cmd/server) is the server app
- [`cmd/client`](./cmd/client) is the client app

## Used Backends
## Details

### Backends

- NATS Core (Event Bus)
- MongoDB (Event Store)
Expand All @@ -20,6 +21,17 @@ aggregate and commands for the aggregate.

Requires Docker.

### Default setup

```sh
make build && make up
```

### Debounced projection

This setup sets the `TODO_DEBOUNCE` environment variable to `1s`, resulting in
a single "batch"-update of the `Counter` projection.

```sh
make build && make debounce
```
14 changes: 12 additions & 2 deletions examples/todo/cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"log"
"math/rand"
"time"

"github.com/google/uuid"
Expand All @@ -28,6 +29,8 @@ func main() {
// Create a new todo list and add some tasks.
listID := uuid.New()
for i := 0; i < 10; i++ {
sleepRandom()

cmd := todo.AddTask(listID, fmt.Sprintf("Task %d", i+1))
if err := cbus.Dispatch(ctx, cmd.Any(), dispatch.Sync()); err != nil {
log.Panicf("Failed to dispatch command: %v [cmd=%v, task=%q]", err, cmd.Name(), cmd.Payload())
Expand All @@ -36,6 +39,8 @@ func main() {

// Then remove every second task.
for i := 0; i < 10; i += 2 {
sleepRandom()

cmd := todo.RemoveTask(listID, fmt.Sprintf("Task %d", i+1))
if err := cbus.Dispatch(ctx, cmd.Any(), dispatch.Sync()); err != nil {
log.Panicf("Failed to dispatch command: %v [cmd=%v, task=%q]", err, cmd.Name(), cmd.Payload())
Expand All @@ -45,11 +50,16 @@ func main() {
// Remaining tasks: Task 2, Task 4, Task 6, Task 8, Task 10

// Then mark "Task 6" and "Task 10" as done.
sleepRandom()

cmd := todo.DoneTasks(listID, "Task 6", "Task 10")
if err := cbus.Dispatch(ctx, cmd.Any(), dispatch.Sync()); err != nil {
log.Panicf("Failed to dispatch command: %v [cmd=%v, tasks=%v]", err, cmd.Name(), cmd.Payload())
}
}

// Give the "server" service time to run projections before Docker stops all services.
<-time.After(3 * time.Second)
func sleepRandom() {
dur := time.Duration(rand.Intn(1000)) * time.Millisecond
log.Printf("Waiting for %s before dispatching next command ...", dur)
time.Sleep(dur)
}
30 changes: 26 additions & 4 deletions examples/todo/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,24 @@ package main
import (
"fmt"
"log"
"os"
"time"

"github.com/modernice/goes/aggregate/repository"
"github.com/modernice/goes/examples/todo"
"github.com/modernice/goes/examples/todo/cmd"
"github.com/modernice/goes/projection/schedule"
)

var intro = `Running "todo" server with the following options:
TODO_DEBOUNCE: %s
`

func main() {
debounce := parseDebounce()

fmt.Printf(intro, debounceText(debounce))

var setup cmd.Setup

ctx, cancel := setup.Context()
Expand All @@ -23,15 +32,28 @@ func main() {
cbus, _ := setup.Commands(ereg.Registry, ebus)

repo := setup.Aggregates(estore)
lists := repository.Typed(repo, todo.New)

counter := todo.NewCounter()
counterErrors, err := counter.Project(ctx, ebus, estore, schedule.Debounce(time.Second))
counterErrors, err := counter.Project(ctx, ebus, estore, schedule.Debounce(debounce))
if err != nil {
log.Panic(fmt.Errorf("project counter: %w", err))
}

commandErrors := todo.HandleCommands(ctx, cbus, lists)
commandErrors := todo.HandleCommands(ctx, cbus, repo)

cmd.LogErrors(ctx, counterErrors, commandErrors)
}

func parseDebounce() time.Duration {
if d, err := time.ParseDuration(os.Getenv("TODO_DEBOUNCE")); err == nil {
return d
}
return 0
}

func debounceText(dur time.Duration) string {
if dur == 0 {
return fmt.Sprintf("%s (disabled)", dur)
}
return dur.String()
}
3 changes: 2 additions & 1 deletion examples/todo/cmd/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/modernice/goes/command"
"github.com/modernice/goes/command/cmdbus"
"github.com/modernice/goes/event"
"github.com/modernice/goes/event/eventstore"
"github.com/modernice/goes/examples/todo"
)

Expand All @@ -30,7 +31,7 @@ func (s *Setup) Events(ctx context.Context) (_ event.Bus, _ event.Store, _ *code
todo.RegisterEvents(r)

bus := nats.NewEventBus(r)
store := mongo.NewEventStore(r)
store := eventstore.WithBus(mongo.NewEventStore(r), bus)

return bus, store, r, func() {
log.Printf("Disconnecting from NATS ...")
Expand Down
39 changes: 8 additions & 31 deletions examples/todo/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package todo

import (
"context"
"log"

"github.com/google/uuid"
"github.com/modernice/goes/aggregate"
"github.com/modernice/goes/codec"
"github.com/modernice/goes/command"
"github.com/modernice/goes/helper/streams"
"github.com/modernice/goes/command/handler"
)

// Commands
const (
AddTaskCmd = "todo.list.add_task"
RemoveTaskCmd = "todo.list.remove_task"
Expand Down Expand Up @@ -38,33 +39,9 @@ func RegisterCommands(r *codec.GobRegistry) {
codec.GobRegister[[]string](r, DoneTaskCmd)
}

// HandleCommands handles commands until ctx is canceled. Any asynchronous
// errors that happen during the command handling are reported to the returned
// error channel.
func HandleCommands(ctx context.Context, bus command.Bus, lists ListRepository) <-chan error {
addErrors := command.MustHandle(ctx, bus, AddTaskCmd, func(ctx command.Ctx[string]) error {
return lists.Use(ctx, ctx.AggregateID(), func(list *List) error {
log.Printf("Handling %q command ...", ctx.Name())
defer list.print()
return list.Add(ctx.Payload())
})
})

removeErrors := command.MustHandle(ctx, bus, RemoveTaskCmd, func(ctx command.Ctx[string]) error {
return lists.Use(ctx, ctx.AggregateID(), func(list *List) error {
log.Printf("Handling %q command ...", ctx.Name())
defer list.print()
return list.Remove(ctx.Payload())
})
})

doneErrors := command.MustHandle(ctx, bus, DoneTaskCmd, func(ctx command.Ctx[[]string]) error {
return lists.Use(ctx, ctx.AggregateID(), func(list *List) error {
log.Printf("Handling %q command ...", ctx.Name())
defer list.print()
return list.Done(ctx.Payload()...)
})
})

return streams.FanInContext(ctx, addErrors, removeErrors, doneErrors)
// HandleCommands handles todo list commands that are dispatched over the
// provided command bus until ctx is canceled. Command errors are sent into
// the returned error channel.
func HandleCommands(ctx context.Context, bus command.Bus, repo aggregate.Repository) <-chan error {
return handler.New(New, repo, bus).MustHandle(ctx)
}
16 changes: 12 additions & 4 deletions examples/todo/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"fmt"
"log"
"sync"
"time"

"github.com/modernice/goes/event"
"github.com/modernice/goes/projection"
"github.com/modernice/goes/projection/schedule"
)

// Counter provides the number of active, removed and archived tasks.
// Counter is a read model that provides the number of active, removed, and archived tasks.
type Counter struct {
*projection.Base

Expand Down Expand Up @@ -62,14 +63,19 @@ func (c *Counter) Project(
store event.Store,
opts ...schedule.ContinuousOption,
) (<-chan error, error) {
s := schedule.Continuously(bus, store, TaskEvents[:], opts...)
s := schedule.Continuously(bus, store, ListEvents[:], opts...)

errs, err := s.Subscribe(ctx, func(ctx projection.Job) error {
log.Println("Applying job ...")
c.print()
defer c.print()

start := time.Now()
log.Println("[Counter] Applying projection job ...")
defer func() { log.Printf("[Counter] Applied projection job. (%s)", time.Since(start)) }()

c.Lock()
defer c.Unlock()
defer c.print()

return ctx.Apply(ctx, c)
})
if err != nil {
Expand All @@ -94,5 +100,7 @@ func (c *Counter) tasksDone(evt event.Of[[]string]) {
}

func (c *Counter) print() {
c.RLock()
defer c.RUnlock()
log.Printf("[Counter] Active: %d, Removed: %d, Archived: %d", c.active, c.removed, c.archived)
}
12 changes: 7 additions & 5 deletions examples/todo/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@ import (
"github.com/modernice/goes/codec"
)

// Events
const (
TaskAdded = "todo.list.task_added"
TaskRemoved = "todo.list.task_removed"
TasksDone = "todo.list.tasks_done"
)

var TaskEvents = [...]string{
// ListEvents are all events of a todo list.
var ListEvents = [...]string{
TaskAdded,
TaskRemoved,
TasksDone,
}

// TaskRemovedEvent is the event data for TaskRemoved.
// TaskRemovedEvent is the event data for the TaskRemoved event.
//
// You can use any type as event data. In this example the event data is a
// struct. If you look below you can see that the TaskAdded and TasksDone events
// use other types for their event data.
// You can use any type as event data. In this case, the event data is a struct.
// The event data types for the TaskAdded and TasksDone events are `string` and
// `[]string` respectively.
type TaskRemovedEvent struct{ Task string }

// RegisterEvents registers events into a registry.
Expand Down
24 changes: 23 additions & 1 deletion examples/todo/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/google/uuid"
"github.com/modernice/goes/aggregate"
"github.com/modernice/goes/command"
"github.com/modernice/goes/command/handler"
"github.com/modernice/goes/event"
)

Expand All @@ -15,20 +17,40 @@ const ListAggregate = "todo.list"
// List is a "todo" list.
type List struct {
*aggregate.Base
*handler.BaseHandler

tasks []string
archive []string
}

// New returns the "todo" list with the given id.
func New(id uuid.UUID) *List {
list := &List{Base: aggregate.New(ListAggregate, id)}
var list *List
list = &List{
Base: aggregate.New(ListAggregate, id),
BaseHandler: handler.NewBase(
handler.BeforeHandle(func(ctx command.Context) error {
log.Printf("Handling %q command ... [list=%s]", ctx.Name(), id)
return nil
}),
handler.AfterHandle(func(command.Context) {
list.print()
}),
),
}

// Register the event appliers for each of the aggregate events.
event.ApplyWith(list, list.add, TaskAdded)
event.ApplyWith(list, list.remove, TaskRemoved)
event.ApplyWith(list, list.done, TasksDone)

// Register the commands handlers.
command.ApplyWith(list, list.Add, AddTaskCmd)
command.ApplyWith(list, list.Remove, RemoveTaskCmd)
command.ApplyWith(list, func(tasks []string) error {
return list.Done(tasks...)
}, DoneTaskCmd)

return list
}

Expand Down
Loading

0 comments on commit 071a763

Please sign in to comment.