producer+consumer and walkthrough
marcel-dempers committed May 5, 2021
1 parent 7ec37e4 commit 8a7a1e3
# Introduction to Kafka

This guide is under the messaging section alongside other message brokers like `RabbitMQ` etc. </br>
Check out the guide in the [messaging/kafka](../../messaging/kafka/) folder.
# Notes
# Introduction to Kafka

# Building a Docker file
docker run --rm --name kafka -it kafka bash
Official [Docs](

docker run --rm -it kafka bash -c "ls -l /kafka/"
docker run --rm -it kafka bash -c "cat ~/kafka/config/"
docker run --rm -it kafka bash -c "ls -l ~/kafka/bin"
## Building a Docker file

As always, we start with a `dockerfile` </br>
We can build our `dockerfile`

cd .\messaging\kafka\
docker build . -t aimvector/kafka:2.7.0

## Exploring the Kafka Install

We can then run it to explore the contents:

docker run --rm --name kafka -it aimvector/kafka:2.7.0 bash
ls -l /kafka/
cat /kafka/config/
ls -l /kafka/bin

We can use the `docker cp` command to copy the configuration out of our container:

docker cp kafka:/kafka/config/ ./
docker cp kafka:/kafka/config/ ./zookeeper/

# Kafka
We'll need the Kafka configuration to tune our server. Kafka also requires
at least one Zookeeper instance in order to function. To achieve high availability, we'll run
multiple Kafka and Zookeeper instances in the future.

# Zookeeper

Let's build a Zookeeper image. The Apache folks have made it easy to start a Zookeeper instance the same way as the Kafka instance by simply running the `` script.

cd .\messaging\kafka\zookeeper
docker build . -t aimvector/zookeeper:2.7.0

Let's create a `kafka` network and run one Zookeeper instance:

docker network create kafka
docker run -it --rm --name kafka --net kafka -v ${PWD}/ kafka
docker run -d --rm --name zookeeper --net kafka zookeeper

# Zookeeper
# Kafka - 1

docker run -d `
--rm `
--name kafka-1 `
--net kafka `
-v ${PWD}/config/kafka-1/ `

# Kafka - 2

docker run -d `
--rm `
--name kafka-2 `
--net kafka `
-v ${PWD}/config/kafka-2/ `

# Kafka - 3

docker run -d `
--rm `
--name kafka-3 `
--net kafka `
-v ${PWD}/config/kafka-3/ `

docker run -it --rm --name zookeeper --net kafka zookeeper

# Topic
docker exec -it kafka bash

/kafka/bin/ --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
Let's create a Topic that allows us to store `Order` information. </br>
The Kafka and Zookeeper installations ship with scripts that allow us to create one. </br>

# Producer
Access the container:
docker exec -it zookeeper bash
Create the Topic:
/kafka/bin/ \
--create \
--zookeeper zookeeper:2181 \
--replication-factor 1 \
--partitions 3 \
--topic Orders

echo "Hello, World" | /kafka/bin/ --broker-list localhost:9092 --topic TutorialTopic > /dev/null
Describe our Topic:
/kafka/bin/ \
--describe \
--topic Orders \
--zookeeper zookeeper:2181

# Consumer
We can take a look at how Kafka stores data

/kafka/bin/ --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
apt install -y tree
tree /tmp/kafka-logs/

# Build an Application: Producer
# Simple Producer & Consumer
The Kafka installation also ships with scripts that allow us to produce
and consume messages on our Kafka network:

cd messaging/kafka/applications/producer
echo "New Order: 1" | \
/kafka/bin/ \
--broker-list kafka-1:9092 \
--topic Orders > /dev/null

docker run -it --rm -v ${PWD}:/app -w /app golang:1.15-alpine
We can then run the consumer, which will receive that message from the `Orders` topic:

apk -U add ca-certificates && \
apk update && apk upgrade && apk add pkgconf git bash build-base && \
cd /tmp && \
git clone && \
cd librdkafka && \
git checkout v1.6.1 && \
./configure --prefix /usr && make && make install
/kafka/bin/ \
--bootstrap-server kafka-1:9092 \
--topic Orders --from-beginning
#apk add --no-cache git make librdkafka-dev gcc musl-dev librdkafka

go mod init producer
go get
Once we have a message in Kafka, we can explore which partition it was stored in:

ls -lh /tmp/kafka-logs/Orders-*
total 4.0K
-rw-r--r-- 1 root root 10M May 4 06:54 00000000000000000000.index
-rw-r--r-- 1 root root 0 May 4 06:54 00000000000000000000.log
-rw-r--r-- 1 root root 10M May 4 06:54 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 May 4 06:54 leader-epoch-checkpoint
total 4.0K
-rw-r--r-- 1 root root 10M May 4 06:54 00000000000000000000.index
-rw-r--r-- 1 root root 0 May 4 06:54 00000000000000000000.log
-rw-r--r-- 1 root root 10M May 4 06:54 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 May 4 06:54 leader-epoch-checkpoint
total 8.0K
-rw-r--r-- 1 root root 10M May 4 06:54 00000000000000000000.index
-rw-r--r-- 1 root root 80 May 4 06:57 00000000000000000000.log
-rw-r--r-- 1 root root 10M May 4 06:54 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 May 4 06:54 leader-epoch-checkpoint

The `.log` files of partitions 0 and 1 are 0 bytes, while the one in partition 2 is 80 bytes, so the message must be sitting in partition 2. </br>
We can check the message with:

cat /tmp/kafka-logs/Orders-2/*.log

## Building a Producer: Go

docker run -it `
--net kafka `
-e KAFKA_PEERS="kafka-1:9092,kafka-2:9092,kafka-3:9092" `
-e KAFKA_TOPIC="Orders" `
-p 80:80 `

## Building a Consumer: Go

cd messaging\kafka\applications\consumer
docker build . -t kafka-consumer
docker run -it `
--net kafka `
-e KAFKA_PEERS="kafka-1:9092,kafka-2:9092,kafka-3:9092" `
-e KAFKA_TOPIC="Orders" `

# High Availability + Replication

Next up, we'll take a look at achieving high availability using replication techniques
and taking advantage of Kafka's distributed architecture.
package main

import (
log ""

var kafka_host = os.Getenv("KAFKA_HOSTS")
var kafkaBrokers = os.Getenv("KAFKA_PEERS")
var kafkaTopic = os.Getenv("KAFKA_TOPIC")

var globalProducer sarama.SyncProducer

func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
config.Producer.Partitioner = sarama.NewRandomPartitioner

consumer, err := sarama.NewConsumer(strings.Split(kafkaBrokers, ","), config)
if err != nil {
fmt.Printf("Failed to open Kafka consumer: %s", err)

func consume() {
partitionList, err := consumer.Partitions(kafkaTopic)

if err != nil {
log.Fatalf("%s: %s", "Failed to connect to Kafka", err)
fmt.Printf("Failed to get the list of partitions: %s", err)

forever := make(chan bool)
var bufferSize = 256
var (
messages = make(chan *sarama.ConsumerMessage, bufferSize)
closing = make(chan struct{})
wg sync.WaitGroup

go func() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
fmt.Println("Initiating shutdown of consumer...")

for _, partition := range partitionList {
pc, err := consumer.ConsumePartition(kafkaTopic, partition, sarama.OffsetOldest)
if err != nil {
fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)

go func(pc sarama.PartitionConsumer) {

go func(pc sarama.PartitionConsumer) {
defer wg.Done()
for message := range pc.Messages() {
messages <- message

go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)

for msg := range messages {
fmt.Printf("Partition:\t%d\n", msg.Partition)
fmt.Printf("Offset:\t%d\n", msg.Offset)
fmt.Printf("Key:\t%s\n", string(msg.Key))
fmt.Printf("Value:\t%s\n", string(msg.Value))


fmt.Println("Done consuming topic", kafkaTopic)

if err := consumer.Close(); err != nil {
fmt.Printf("Failed to close consumer: %s", err)

FROM golang:1.14-alpine as build
FROM golang:1.16-alpine as dev-env

RUN apk add --no-cache git
RUN apk add --no-cache git gcc musl-dev


COPY consumer.go /src
FROM dev-env as build-env
COPY go.mod /go.sum /app/
RUN go mod download

RUN go build consumer.go
COPY . /app/

FROM alpine as runtime
RUN CGO_ENABLED=0 go build -o /consumer

COPY --from=build /src/consumer /app/consumer
FROM alpine:3.10 as runtime

CMD [ "/app/consumer" ]
COPY --from=build-env /consumer /usr/local/bin/consumer
RUN chmod +x /usr/local/bin/consumer

ENTRYPOINT ["consumer"]

