A Docker Compose setup with examples for trying out various Kafka-related services locally, to get a hands-on feel for the tools & APIs.
NOTE that the setup & configuration are not suitable for production use.
If you're interested in event-driven architectures & Kafka, I recommend reading Designing Event-Driven Systems by Ben Stopford (the ebook is freely available).
docker-compose up -d
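To check that everything came up, list the services and their state:
docker-compose ps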
Reference: https://docs.confluent.io/current/installation/cli-reference.html
Some examples of using Kafka command-line tools
# Create a topic
docker-compose exec kafka kafka-topics --create --topic my-topic --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181
# List topics
docker-compose exec kafka kafka-topics --list --zookeeper zookeeper:2181
# Describe a topic
docker-compose exec kafka kafka-topics --describe --topic my-topic --zookeeper zookeeper:2181
# Produce to a topic
docker-compose exec kafka bash -c "echo 'test message' | kafka-console-producer --request-required-acks 1 --broker-list kafka:9092 --topic my-topic"
# Consume a topic
docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:9092 --topic my-topic --from-beginning --max-messages 1
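Consumer groups can be inspected with kafka-consumer-groups (the group name below is illustrative; pick a real one from the listing, e.g. the auto-generated console-consumer group created above):
# List consumer groups
docker-compose exec kafka kafka-consumer-groups --bootstrap-server kafka:9092 --list
# Describe a group's members, offsets & lag
docker-compose exec kafka kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group my-group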
The Control Center UI is available at http://localhost:9021
Full API reference: https://docs.confluent.io/current/kafka-rest/api.html
# Produce JSON messages to a topic "my-topic-for-rest"
curl -XPOST \
-H "Content-Type: application/vnd.kafka.json.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"records":[{"value":{"field1":"value1"}},{"value":{"field1":"value2"}},{"value":{"field1":"value3"}}]}' \
"http://localhost:8082/topics/my-topic-for-rest" | jq "."
# Get topics
curl "http://localhost:8082/topics" | jq "."
# Get info on a topic "my-topic-for-rest":
curl "http://localhost:8082/topics/my-topic-for-rest" | jq "."
Consuming through the REST Proxy happens via consumer instances, each of which is tied to the specific REST Proxy instance that created it.
# Create a consumer instance "my-consumer-instance" under consumer group "my-consumer-group"
curl -XPOST \
-H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my-consumer-instance", "format": "json", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my-consumer-group | jq "."
# Subscribe to topic "my-topic-for-rest" with "my-consumer-instance"
curl -XPOST \
-H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"topics":["my-topic-for-rest"]}' \
http://localhost:8082/consumers/my-consumer-group/instances/my-consumer-instance/subscription
# Consume messages with the consumer (after the subscription done above)
curl -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my-consumer-group/instances/my-consumer-instance/records | jq "."
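Consumer instances hold state on the proxy, so clean up when you're done:
# Delete the consumer instance
curl -XDELETE \
-H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my-consumer-group/instances/my-consumer-instance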
Start the ksqlDB CLI:
docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
Read metadata:
ksql> SHOW STREAMS;
ksql> SHOW TOPICS;
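Other useful metadata commands:
ksql> SHOW TABLES;
ksql> SHOW QUERIES;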
Create a stream (backed by a new Kafka topic myevents):
ksql> CREATE STREAM myEvents (userId VARCHAR, type INTEGER)
WITH (kafka_topic='myevents', value_format='json', partitions=1);
Run a continuous (push) query in the CLI:
ksql> SELECT * FROM myEvents
EMIT CHANGES;
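A push query like this runs until you terminate it (Ctrl+C). To get a bounded result instead, add a LIMIT clause:
ksql> SELECT * FROM myEvents EMIT CHANGES LIMIT 4;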
Write messages to a stream:
ksql> INSERT INTO myEvents (userId, type) VALUES ('user1', 1);
ksql> INSERT INTO myEvents (userId, type) VALUES ('user1', 4);
ksql> INSERT INTO myEvents (userId, type) VALUES ('user2', 2);
ksql> INSERT INTO myEvents (userId, type) VALUES ('user2', 5);
First, configure ksqlDB to consume topics from the beginning:
ksql> SET 'auto.offset.reset' = 'earliest';
Create a derived stream (stateless filtering; this also creates a backing Kafka topic):
ksql> CREATE STREAM myOddTypeEvents AS
SELECT userId, type
FROM myEvents
WHERE type % 2 = 1
EMIT CHANGES;
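To verify the output, you can print the derived stream's backing topic; by default ksqlDB upper-cases identifiers, so the new topic is presumably named MYODDTYPEEVENTS:
ksql> PRINT 'MYODDTYPEEVENTS' FROM BEGINNING;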
Create a table with a stateful aggregation (event counts by type):
ksql> CREATE TABLE eventCountByType AS
SELECT type, COUNT(*) AS count
FROM myEvents
GROUP BY type
EMIT CHANGES;
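Once the table exists, you can follow its changelog with a push query, or (depending on the ksqlDB version) do a point lookup with a pull query, since the aggregation materializes the table:
ksql> SELECT * FROM eventCountByType EMIT CHANGES;
ksql> SELECT * FROM eventCountByType WHERE type = 1;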
- Examples: https://ksqldb.io/examples.html
- Developer guide: https://docs.ksqldb.io/en/latest/developer-guide/
The kafka-streams-example directory contains a simple Kafka Streams example that does stateless mapping with filtering from one topic to another.
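For reference, a minimal sketch of that kind of map+filter topology is below. The topic names match the ones created next, but the filter threshold and the exact mapping are made-up assumptions for illustration, not necessarily what examples.KafkaStreamsExample does.
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Produced;

public class StringLengthSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "string-length-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("string-topic")
                // hypothetical filter: drop very short messages
                .filter((key, value) -> value != null && value.length() > 2)
                // map each message to its length as a string
                .mapValues(value -> String.valueOf(value.length()))
                .to("string-length-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}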
Create topics:
docker-compose exec kafka kafka-topics --create --topic string-topic --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181
docker-compose exec kafka kafka-topics --create --topic string-length-topic --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181
Start Kafka Streams example:
docker-compose run kafka-streams-runner gradle --no-daemon -Pmain=examples.KafkaStreamsExample execute
Write a couple of messages to input topic & verify output topic:
docker-compose exec kafka bash -c "echo 'test message' | kafka-console-producer --request-required-acks 1 --broker-list kafka:9092 --topic string-topic"
docker-compose exec kafka bash -c "echo 'te' | kafka-console-producer --request-required-acks 1 --broker-list kafka:9092 --topic string-topic"
docker-compose exec kafka bash -c "echo 'somewhat longer test message' | kafka-console-producer --request-required-acks 1 --broker-list kafka:9092 --topic string-topic"
docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:9092 --topic string-length-topic --from-beginning --max-messages 10
- Kafka Connect
- Some example setup