Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update your-first-app example #118

Merged
merged 5 commits into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion _examples/your-first-app/.validate_example.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
validation_cmd: "docker-compose up"
teardown_cmd: "docker-compose down"
timeout: 60
expected_output: "received event [0-9]+"
expected_output: "received event {ID:[0-9]+}"
65 changes: 38 additions & 27 deletions _examples/your-first-app/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
# Your first app
# Your first Watermill app

Before checking the examples, it is recommended to read [getting started guide](https://watermill.io/docs/getting-started/).
This example project shows a basic setup of Watermill. The application runs in a loop, consuming events from a Kafka
topic, modyfing them and publishing to another topic.

There's a docker-compose file included, so you can run the example and see it in action.

To understand the background and internals, see [getting started guide](https://watermill.io/docs/getting-started/).

## Files

- [main.go](main.go) - example source code, probably the **most interesting file to you**
- [main.go](main.go) - example source code, the **most interesting file for you**
- [docker-compose.yml](docker-compose.yml) - local environment Docker Compose configuration, contains Golang, Kafka and Zookeeper
- [go.mod](go.mod) - Go modules dependencies, you can find more information at [Go wiki](https://github.com/golang/go/wiki/Modules)
- [go.sum](go.sum) - Go modules checksums
Expand All @@ -17,34 +22,40 @@ To run this example you will need Docker and docker-compose installed. See insta

```bash
> docker-compose up
[a lot of Kafka logs...]
server_1 | 2018/11/18 11:16:34 received event 1542539794
server_1 | 2018/11/18 11:16:35 received event 1542539795
server_1 | 2018/11/18 11:16:36 received event 1542539796
server_1 | 2018/11/18 11:16:37 received event 1542539797
server_1 | 2018/11/18 11:16:38 received event 1542539798
server_1 | 2018/11/18 11:16:39 received event 1542539799
[some initial logs]
server_1 | 2019/08/29 19:41:23 received event {ID:0}
server_1 | 2019/08/29 19:41:23 received event {ID:1}
server_1 | 2019/08/29 19:41:23 received event {ID:2}
server_1 | 2019/08/29 19:41:23 received event {ID:3}
server_1 | 2019/08/29 19:41:24 received event {ID:4}
server_1 | 2019/08/29 19:41:25 received event {ID:5}
server_1 | 2019/08/29 19:41:26 received event {ID:6}
server_1 | 2019/08/29 19:41:27 received event {ID:7}
server_1 | 2019/08/29 19:41:28 received event {ID:8}
server_1 | 2019/08/29 19:41:29 received event {ID:9}
```

Now all that's left is to take a look at the Kafka topics to check that all messages are there:
Open another termial and take a look at Kafka topics to see that all messages are there. The initial events should be present on the `events` topic:

```bash
> docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic deadly-easy-topic

{"num":1542539605}
{"num":1542539606}
{"num":1542539607}
{"num":1542539608}
{"num":1542539609}
{"num":1542539610}
> docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic events

{"id":12}
{"id":13}
{"id":14}
{"id":15}
{"id":16}
{"id":17}
```

And the processed messages will be stored in the `events-processed` topic:

```bash
> docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic deadly-easy-topic_processed

{"event_num":1542539642,"time":"2018-11-18T11:14:02.26452706Z"}
{"event_num":1542539643,"time":"2018-11-18T11:14:03.26633757Z"}
{"event_num":1542539644,"time":"2018-11-18T11:14:04.268420818Z"}
{"event_num":1542539645,"time":"2018-11-18T11:14:05.270183092Z"}
{"event_num":1542539646,"time":"2018-11-18T11:14:06.272387936Z"}
{"event_num":1542539647,"time":"2018-11-18T11:14:07.274663833Z"}
> docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic events-processed

{"processed_id":21,"time":"2019-08-29T19:42:31.4464598Z"}
{"processed_id":22,"time":"2019-08-29T19:42:32.4501767Z"}
{"processed_id":23,"time":"2019-08-29T19:42:33.4530692Z"}
{"processed_id":24,"time":"2019-08-29T19:42:34.4561694Z"}
{"processed_id":25,"time":"2019-08-29T19:42:35.4608918Z"}
```
2 changes: 1 addition & 1 deletion _examples/your-first-app/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'
services:
server:
image: golang:1.11
image: golang:1.12
restart: unless-stopped
depends_on:
- kafka
Expand Down
2 changes: 0 additions & 2 deletions _examples/your-first-app/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4r
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
Expand Down Expand Up @@ -85,7 +84,6 @@ github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 h1:eUm8ma4+yPknhXtkYlWh3tMkE6gBjXZToDned9s2gbQ=
github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
Expand Down
151 changes: 78 additions & 73 deletions _examples/your-first-app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (

var (
brokers = []string{"kafka:9092"}
consumeTopic = "your-first-app_events"
publishTopic = "your-first-app_events-processed"
consumeTopic = "events"
publishTopic = "events-processed"

logger = watermill.NewStdLogger(
true, // debug
Expand All @@ -25,66 +25,20 @@ var (
marshaler = kafka.DefaultMarshaler{}
)

// createPublisher is a helper function which creates Publisher, in this case - Kafka Publisher.
// It is based on `confluent-kafka-go` which requires rdkafka installed.
func createPublisher() message.Publisher {
kafkaPublisher, err := kafka.NewPublisher(
brokers,
marshaler,
nil,
logger,
)
if err != nil {
panic(err)
}

return kafkaPublisher
}

// createSubscriber is helper function as previous, but in this case creates Subscriber.
func createSubscriber(consumerGroup string) message.Subscriber {
kafkaSubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{
Brokers: brokers,
ConsumerGroup: consumerGroup, // every handler will have separated consumer group
}, nil, marshaler, logger)
if err != nil {
panic(err)
}

return kafkaSubscriber
}

type event struct {
Num int `json:"num"`
ID int `json:"id"`
}

// publishEvents which will produce some events for consuming.
func publishEvents(publisher message.Publisher) {
i := 0
for {
payload, err := json.Marshal(event{Num: int(time.Now().Unix())})
if err != nil {
panic(err)
}

err = publisher.Publish(consumeTopic, message.NewMessage(
watermill.NewUUID(), // uuid of the message, very useful for debugging
payload,
))
if err != nil {
panic(err)
}

i++
time.Sleep(time.Second)
}
type processedEvent struct {
ProcessedID int `json:"processed_id"`
Time time.Time `json:"time"`
}

func main() {
publisher := createPublisher()

// producing events in background
go publishEvents(publisher)
// Subscriber is created with consumer group handler_1
subscriber := createSubscriber("handler_1")

router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
Expand All @@ -94,49 +48,100 @@ func main() {
router.AddPlugin(plugin.SignalsHandler)
router.AddMiddleware(middleware.Recoverer)

// Consumer is created with consumer group handler_1
subscriber := createSubscriber("handler_1")

// adding handler, multiple handlers can be added
// Adding a handler (multiple handlers can be added)
router.AddHandler(
"handler_1", // handler name, must be unique
consumeTopic, // topic from which messages should be consumed
subscriber,
publishTopic, // topic to which produced messages should be published
publishTopic, // topic to which messages should be published
publisher,
func(msg *message.Message) ([]*message.Message, error) {
consumedPayload := event{}
err := json.Unmarshal(msg.Payload, &consumedPayload)
if err != nil {
// default behavior when handler returns error is sending Nack (negative-acknowledgement)
// the message will be processed again
// When a handler returns an error, the default behavior is to send a Nack (negative-acknowledgement).
// The message will be processed again.
//
// you can change default behaviour by using for example middleware.Retry or middleware.PoisonQueue
// you can also implement your own
// You can change the default behaviour by using middlewares, like Retry or PoisonQueue.
// You can also implement your own middleware.
return nil, err
}

log.Printf("received event %d", consumedPayload.Num)
log.Printf("received event %+v", consumedPayload)

type processedEvent struct {
EventNum int `json:"event_num"`
Time time.Time `json:"time"`
}
producedPayload, err := json.Marshal(processedEvent{
EventNum: consumedPayload.Num,
Time: time.Now(),
newPayload, err := json.Marshal(processedEvent{
ProcessedID: consumedPayload.ID,
Time: time.Now(),
})
if err != nil {
return nil, err
}

producedMessage := message.NewMessage(watermill.NewUUID(), producedPayload)
newMessage := message.NewMessage(watermill.NewUUID(), newPayload)

return []*message.Message{producedMessage}, nil
return []*message.Message{newMessage}, nil
},
)

// Simulate incoming events in the background
go simulateEvents(publisher)

if err := router.Run(context.Background()); err != nil {
panic(err)
}
}

// createPublisher is a helper function that creates a Publisher, in this case - the Kafka Publisher.
func createPublisher() message.Publisher {
kafkaPublisher, err := kafka.NewPublisher(
brokers,
marshaler,
nil,
logger,
)
if err != nil {
panic(err)
}

return kafkaPublisher
}

// createSubscriber is a helper function similar to the previous one, but in this case it creates a Subscriber.
func createSubscriber(consumerGroup string) message.Subscriber {
kafkaSubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{
Brokers: brokers,
ConsumerGroup: consumerGroup, // every handler will use a separate consumer group
}, nil, marshaler, logger)
if err != nil {
panic(err)
}

return kafkaSubscriber
}

// simulateEvents produces events that will be later consumed.
func simulateEvents(publisher message.Publisher) {
i := 0
for {
e := event{
ID: i,
}

payload, err := json.Marshal(e)
if err != nil {
panic(err)
}

err = publisher.Publish(consumeTopic, message.NewMessage(
watermill.NewUUID(), // internal uuid of the message, useful for debugging
payload,
))
if err != nil {
panic(err)
}

i++

time.Sleep(time.Second)
}
}
2 changes: 1 addition & 1 deletion dev/validate-examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ module github.com/ThreeDotsLabs/watermill/dev/validate-examples

go 1.12

require gopkg.in/yaml.v2 v2.2.2 // indirect
require gopkg.in/yaml.v2 v2.2.2
1 change: 1 addition & 0 deletions dev/validate-examples/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
5 changes: 5 additions & 0 deletions dev/validate-examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func main() {
}

exampleDirectory := filepath.Dir(exampleConfig)

fmt.Printf("validating %s\n", exampleDirectory)

ok, err := validate(exampleConfig)

if err != nil {
Expand Down Expand Up @@ -88,6 +91,8 @@ func validate(path string) (bool, error) {
return false, fmt.Errorf("could not attach to stdout, err: %v", err)
}

fmt.Printf("running: %v\n", validationCmd.Args)

err = validationCmd.Start()
if err != nil {
return false, fmt.Errorf("could not start validation, err: %v", err)
Expand Down