This project was implemented as part of the IT Infrastructure Design class at the ECE School of NTUA. It explores the synchronous and asynchronous communication capabilities of Apache Kafka in a microservices platform.
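Conceptually, the asynchronous (publish/subscribe) communication that Kafka enables can be sketched without a broker, using an in-memory queue as a stand-in for a topic. All names below are illustrative and not part of this project's code:

```python
from queue import Queue

# An in-memory queue stands in for a Kafka topic: the producer does not
# wait for the consumer, it only appends records and moves on.
topic = Queue()

def produce(records):
    for record in records:
        topic.put(record)  # fire-and-forget, like an asynchronous Kafka send

def consume():
    out = []
    while not topic.empty():
        out.append(topic.get())  # consumers read at their own pace
    return out

produce([{"key": "X", "value": "FOO"}, {"key": "Y", "value": "BAR"}])
print(consume())  # both records arrive in order, after the producer has moved on
```

The point of the sketch is the decoupling: the producing side returns immediately, and consumption happens independently, which is the pattern the apps in this repository rely on.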
- Select your server's distribution (e.g. Debian, Fedora, Ubuntu) from here.
- Follow the instructions in the "SET UP THE REPOSITORY" and "INSTALL DOCKER ENGINE" sections.
git clone https://github.com/AthinaKyriakou/kafka-telsol.git
cd kafka-telsol
Open a terminal in the kafka-telsol directory.
- Run Docker Compose. The first run takes a while, since the images need to be pulled.
docker-compose up -d
- Make sure that everything is up and running:
docker-compose ps
Container name | Command | State | Ports |
---|---|---|---|
broker | /etc/confluent/docker/run | Up | 0.0.0.0:9092->9092/tcp |
kafka-connect | bash -c cd /usr/share/conf ... | Up (healthy) | 0.0.0.0:8083->8083/tcp, 9092/tcp |
kafkacat | /bin/sh -c apk add jq; wh ... | Up | |
ksqldb | /usr/bin/docker/run | Up | 0.0.0.0:8088->8088/tcp |
mysql | docker-entrypoint.sh mysqld | Up | 0.0.0.0:3306->3306/tcp, 33060/tcp |
schema-registry | /etc/confluent/docker/run | Up | 0.0.0.0:8081->8081/tcp |
zookeeper | /etc/confluent/docker/run | Up | 2181/tcp, 2888/tcp, 3888/tcp |
docker-compose logs kafka-connect | grep kafka-connect-jdbc | more
From the output, copy the path that follows `INFO Loading plugin from:`.
docker exec -it kafka-connect bash
cd <path>
ls
This folder must contain the JDBC driver jar, mysql-connector-java-8.0.23.jar.
Adapted from here.
Get in the producerApp folder:
cd producerApp
Compile the producerApp:
mvn clean compile package
Run the producerApp:
mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.KafkaProducerApplication
In a new terminal, start the ksqlDB CLI:
docker exec -it ksqldb ksql http://ksqldb:8088
Create the test01 topic, using Apache Avro to manage the schema (JSON is an alternative):
CREATE STREAM TEST01 (COL1 INT, COL2 VARCHAR)
WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');
Insert dummy data into the test01 topic:
INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('X',1,'FOO');
INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('Y',2,'BAR');
Show the topics and print the data:
SHOW TOPICS;
PRINT test01 FROM BEGINNING;
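The schema enforcement that Avro provides for the stream (`COL1 INT, COL2 VARCHAR`) can be illustrated with a small, hypothetical validator. This is only a sketch of the idea; in the real setup the work is done by Avro and the Schema Registry, not by code like this:

```python
# Hypothetical stand-in for Avro schema validation; mirrors
# CREATE STREAM TEST01 (COL1 INT, COL2 VARCHAR).
SCHEMA = {"COL1": int, "COL2": str}

def validate(record, schema=SCHEMA):
    """Return True if the record has exactly the declared fields with the declared types."""
    return set(record) == set(schema) and all(
        isinstance(record[field], ftype) for field, ftype in schema.items()
    )

assert validate({"COL1": 1, "COL2": "FOO"})        # matches the first INSERT above
assert not validate({"COL1": "1", "COL2": "FOO"})  # rejected: wrong type for COL1
```

Because every record on the topic must conform to the registered schema, downstream consumers (including the JDBC sink used later) can rely on the field names and types without defensive parsing.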
Get in the consumerApp folder:
cd consumerApp
Compile the consumerApp:
mvn clean compile package
Run the consumerApp:
mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.KafkaConsumerApplication
a. In a new terminal, open MySQL as root, create a demo database, and grant privileges to the athina user (the MYSQL_USER=athina defined in the docker-compose.yml file):
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD'
create database demo;
grant all on demo.* to 'athina'@'%';
To check that the privileges were granted, you can start MySQL as the athina user
docker exec -it mysql bash -c 'mysql -u$MYSQL_USER -p$MYSQL_PASSWORD'
and run
use demo;
show grants;
In a new terminal:
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
-H "Content-Type: application/json" -d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"topics": "insertion_db,insertion_db_points",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"connection.user": "athina",
"connection.password": "athina",
"auto.create": true,
"auto.evolve": true,
"insert.mode": "insert",
"pk.mode": "record_key",
"pk.fields": "MESSAGE_KEY"
}'
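If you prefer to build the same connector configuration from code, it can be assembled as a plain dict and serialized before being submitted to the Kafka Connect REST endpoint used above (`PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config`). The sketch below only constructs and validates the JSON payload; it does not send the request:

```python
import json

# The same JDBC sink configuration as the curl call above, as a Python dict.
config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://mysql:3306/demo",
    "topics": "insertion_db,insertion_db_points",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connection.user": "athina",
    "connection.password": "athina",
    "auto.create": True,   # let the connector create missing tables
    "auto.evolve": True,   # let it add columns as the schema evolves
    "insert.mode": "insert",
    "pk.mode": "record_key",
    "pk.fields": "MESSAGE_KEY",
}

payload = json.dumps(config)  # valid JSON body for the PUT request
print(json.loads(payload)["topics"])
```

Building the payload this way catches malformed JSON (e.g. a misplaced comma in the topics list) before the request ever reaches Kafka Connect.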
Here we insert data from the test01 topic into the demo database. Each topic creates its own table in the database.
Check the connector from the ksqlDB CLI:
show connectors;
In a new terminal open MySQL:
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD'
Use the created database and inspect the table created from the test01 topic and the data inserted into it:
use demo;
select * from test01;
Keep publishing data to the test01 topic from the ksqldb. The data will appear in the test01 table of the demo database.
Get in the userDataValidatorApp folder:
cd userDataValidatorApp
When you run the userDataValidatorApp for the first time:
gradle wrapper
Compile the userDataValidatorApp:
./gradlew shadowJar
Run the userDataValidatorApp:
java -jar build/libs/kafka-user-data-validator-application-standalone-0.0.1.jar
To follow the kafka-connect logs:
docker logs -f kafka-connect
To keep track of what is happening, I usually have the following terminals open:
- one open in the kafka-telsol folder
- MySQL:
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD'
- ksqlDB:
docker exec -it ksqldb ksql http://ksqldb:8088
- logs:
docker logs -f kafka-connect