Skip to content

Commit

Permalink
Update your-first-app example (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 authored Aug 29, 2019
1 parent 2bff82c commit f64191a
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 105 deletions.
2 changes: 1 addition & 1 deletion _examples/your-first-app/.validate_example.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
validation_cmd: "docker-compose up"
teardown_cmd: "docker-compose down"
timeout: 60
expected_output: "received event [0-9]+"
expected_output: "received event {ID:[0-9]+}"
65 changes: 38 additions & 27 deletions _examples/your-first-app/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
# Your first app
# Your first Watermill app

Before checking the examples, it is recommended to read [getting started guide](https://watermill.io/docs/getting-started/).
This example project shows a basic setup of Watermill. The application runs in a loop, consuming events from a Kafka
topic, modyfing them and publishing to another topic.

There's a docker-compose file included, so you can run the example and see it in action.

To understand the background and internals, see [getting started guide](https://watermill.io/docs/getting-started/).

## Files

- [main.go](main.go) - example source code, probably the **most interesting file to you**
- [main.go](main.go) - example source code, the **most interesting file for you**
- [docker-compose.yml](docker-compose.yml) - local environment Docker Compose configuration, contains Golang, Kafka and Zookeeper
- [go.mod](go.mod) - Go modules dependencies, you can find more information at [Go wiki](https://github.com/golang/go/wiki/Modules)
- [go.sum](go.sum) - Go modules checksums
Expand All @@ -17,34 +22,40 @@ To run this example you will need Docker and docker-compose installed. See insta

```bash
> docker-compose up
[a lot of Kafka logs...]
server_1 | 2018/11/18 11:16:34 received event 1542539794
server_1 | 2018/11/18 11:16:35 received event 1542539795
server_1 | 2018/11/18 11:16:36 received event 1542539796
server_1 | 2018/11/18 11:16:37 received event 1542539797
server_1 | 2018/11/18 11:16:38 received event 1542539798
server_1 | 2018/11/18 11:16:39 received event 1542539799
[some initial logs]
server_1 | 2019/08/29 19:41:23 received event {ID:0}
server_1 | 2019/08/29 19:41:23 received event {ID:1}
server_1 | 2019/08/29 19:41:23 received event {ID:2}
server_1 | 2019/08/29 19:41:23 received event {ID:3}
server_1 | 2019/08/29 19:41:24 received event {ID:4}
server_1 | 2019/08/29 19:41:25 received event {ID:5}
server_1 | 2019/08/29 19:41:26 received event {ID:6}
server_1 | 2019/08/29 19:41:27 received event {ID:7}
server_1 | 2019/08/29 19:41:28 received event {ID:8}
server_1 | 2019/08/29 19:41:29 received event {ID:9}
```

Now all that's left is to take a look at the Kafka topics to check that all messages are there:
Open another termial and take a look at Kafka topics to see that all messages are there. The initial events should be present on the `events` topic:

```bash
> docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic deadly-easy-topic

{"num":1542539605}
{"num":1542539606}
{"num":1542539607}
{"num":1542539608}
{"num":1542539609}
{"num":1542539610}
> docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic events

{"id":12}
{"id":13}
{"id":14}
{"id":15}
{"id":16}
{"id":17}
```

And the processed messages will be stored in the `events-processed` topic:

```bash
> docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic deadly-easy-topic_processed

{"event_num":1542539642,"time":"2018-11-18T11:14:02.26452706Z"}
{"event_num":1542539643,"time":"2018-11-18T11:14:03.26633757Z"}
{"event_num":1542539644,"time":"2018-11-18T11:14:04.268420818Z"}
{"event_num":1542539645,"time":"2018-11-18T11:14:05.270183092Z"}
{"event_num":1542539646,"time":"2018-11-18T11:14:06.272387936Z"}
{"event_num":1542539647,"time":"2018-11-18T11:14:07.274663833Z"}
> docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic events-processed

{"processed_id":21,"time":"2019-08-29T19:42:31.4464598Z"}
{"processed_id":22,"time":"2019-08-29T19:42:32.4501767Z"}
{"processed_id":23,"time":"2019-08-29T19:42:33.4530692Z"}
{"processed_id":24,"time":"2019-08-29T19:42:34.4561694Z"}
{"processed_id":25,"time":"2019-08-29T19:42:35.4608918Z"}
```
2 changes: 1 addition & 1 deletion _examples/your-first-app/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'
services:
server:
image: golang:1.11
image: golang:1.12
restart: unless-stopped
depends_on:
- kafka
Expand Down
2 changes: 0 additions & 2 deletions _examples/your-first-app/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4r
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
Expand Down Expand Up @@ -85,7 +84,6 @@ github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 h1:eUm8ma4+yPknhXtkYlWh3tMkE6gBjXZToDned9s2gbQ=
github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
Expand Down
151 changes: 78 additions & 73 deletions _examples/your-first-app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (

var (
brokers = []string{"kafka:9092"}
consumeTopic = "your-first-app_events"
publishTopic = "your-first-app_events-processed"
consumeTopic = "events"
publishTopic = "events-processed"

logger = watermill.NewStdLogger(
true, // debug
Expand All @@ -25,66 +25,20 @@ var (
marshaler = kafka.DefaultMarshaler{}
)

// createPublisher is a helper function which creates Publisher, in this case - Kafka Publisher.
// It is based on `confluent-kafka-go` which requires rdkafka installed.
func createPublisher() message.Publisher {
kafkaPublisher, err := kafka.NewPublisher(
brokers,
marshaler,
nil,
logger,
)
if err != nil {
panic(err)
}

return kafkaPublisher
}

// createSubscriber is helper function as previous, but in this case creates Subscriber.
func createSubscriber(consumerGroup string) message.Subscriber {
kafkaSubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{
Brokers: brokers,
ConsumerGroup: consumerGroup, // every handler will have separated consumer group
}, nil, marshaler, logger)
if err != nil {
panic(err)
}

return kafkaSubscriber
}

type event struct {
Num int `json:"num"`
ID int `json:"id"`
}

// publishEvents which will produce some events for consuming.
func publishEvents(publisher message.Publisher) {
i := 0
for {
payload, err := json.Marshal(event{Num: int(time.Now().Unix())})
if err != nil {
panic(err)
}

err = publisher.Publish(consumeTopic, message.NewMessage(
watermill.NewUUID(), // uuid of the message, very useful for debugging
payload,
))
if err != nil {
panic(err)
}

i++
time.Sleep(time.Second)
}
type processedEvent struct {
ProcessedID int `json:"processed_id"`
Time time.Time `json:"time"`
}

func main() {
publisher := createPublisher()

// producing events in background
go publishEvents(publisher)
// Subscriber is created with consumer group handler_1
subscriber := createSubscriber("handler_1")

router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
Expand All @@ -94,49 +48,100 @@ func main() {
router.AddPlugin(plugin.SignalsHandler)
router.AddMiddleware(middleware.Recoverer)

// Consumer is created with consumer group handler_1
subscriber := createSubscriber("handler_1")

// adding handler, multiple handlers can be added
// Adding a handler (multiple handlers can be added)
router.AddHandler(
"handler_1", // handler name, must be unique
consumeTopic, // topic from which messages should be consumed
subscriber,
publishTopic, // topic to which produced messages should be published
publishTopic, // topic to which messages should be published
publisher,
func(msg *message.Message) ([]*message.Message, error) {
consumedPayload := event{}
err := json.Unmarshal(msg.Payload, &consumedPayload)
if err != nil {
// default behavior when handler returns error is sending Nack (negative-acknowledgement)
// the message will be processed again
// When a handler returns an error, the default behavior is to send a Nack (negative-acknowledgement).
// The message will be processed again.
//
// you can change default behaviour by using for example middleware.Retry or middleware.PoisonQueue
// you can also implement your own
// You can change the default behaviour by using middlewares, like Retry or PoisonQueue.
// You can also implement your own middleware.
return nil, err
}

log.Printf("received event %d", consumedPayload.Num)
log.Printf("received event %+v", consumedPayload)

type processedEvent struct {
EventNum int `json:"event_num"`
Time time.Time `json:"time"`
}
producedPayload, err := json.Marshal(processedEvent{
EventNum: consumedPayload.Num,
Time: time.Now(),
newPayload, err := json.Marshal(processedEvent{
ProcessedID: consumedPayload.ID,
Time: time.Now(),
})
if err != nil {
return nil, err
}

producedMessage := message.NewMessage(watermill.NewUUID(), producedPayload)
newMessage := message.NewMessage(watermill.NewUUID(), newPayload)

return []*message.Message{producedMessage}, nil
return []*message.Message{newMessage}, nil
},
)

// Simulate incoming events in the background
go simulateEvents(publisher)

if err := router.Run(context.Background()); err != nil {
panic(err)
}
}

// createPublisher is a helper function that creates a Publisher, in this case - the Kafka Publisher.
func createPublisher() message.Publisher {
kafkaPublisher, err := kafka.NewPublisher(
brokers,
marshaler,
nil,
logger,
)
if err != nil {
panic(err)
}

return kafkaPublisher
}

// createSubscriber is a helper function similar to the previous one, but in this case it creates a Subscriber.
func createSubscriber(consumerGroup string) message.Subscriber {
kafkaSubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{
Brokers: brokers,
ConsumerGroup: consumerGroup, // every handler will use a separate consumer group
}, nil, marshaler, logger)
if err != nil {
panic(err)
}

return kafkaSubscriber
}

// simulateEvents produces events that will be later consumed.
func simulateEvents(publisher message.Publisher) {
i := 0
for {
e := event{
ID: i,
}

payload, err := json.Marshal(e)
if err != nil {
panic(err)
}

err = publisher.Publish(consumeTopic, message.NewMessage(
watermill.NewUUID(), // internal uuid of the message, useful for debugging
payload,
))
if err != nil {
panic(err)
}

i++

time.Sleep(time.Second)
}
}
2 changes: 1 addition & 1 deletion dev/validate-examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ module github.com/ThreeDotsLabs/watermill/dev/validate-examples

go 1.12

require gopkg.in/yaml.v2 v2.2.2 // indirect
require gopkg.in/yaml.v2 v2.2.2
1 change: 1 addition & 0 deletions dev/validate-examples/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
5 changes: 5 additions & 0 deletions dev/validate-examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func main() {
}

exampleDirectory := filepath.Dir(exampleConfig)

fmt.Printf("validating %s\n", exampleDirectory)

ok, err := validate(exampleConfig)

if err != nil {
Expand Down Expand Up @@ -88,6 +91,8 @@ func validate(path string) (bool, error) {
return false, fmt.Errorf("could not attach to stdout, err: %v", err)
}

fmt.Printf("running: %v\n", validationCmd.Args)

err = validationCmd.Start()
if err != nil {
return false, fmt.Errorf("could not start validation, err: %v", err)
Expand Down

0 comments on commit f64191a

Please sign in to comment.