Skip to content

Commit da38f98

Browse files
committed
flink sql demo: tumbling window
1 parent 1d1aac3 commit da38f98

File tree

10 files changed

+274
-1
lines changed

10 files changed

+274
-1
lines changed

.env

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ KEYCLOAK_VERSION=legacy
55
JAEGER_VERSION=latest
66
OTEL_VERSION=latest
77
APICURIO_VERSION=2.4.2.Final
8-
KCAT_VERSION=latest
8+
KCAT_VERSION=latest
9+
FLINK_VERSION=1.18.1-java17

README.adoc

+64
Original file line numberDiff line numberDiff line change
@@ -2705,6 +2705,70 @@ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 --execute "select * fr
27052705
27062706
----
27072707

2708+
<<<
2709+
2710+
== Flink SQL
2711+
2712+
=== Windowing
2713+
2714+
==== Tumbling Window example: heart rate monitoring
2715+
2716+
Implementation of a tumbling window (1 minute) to monitor heart rate. Values over a threshold of 120 beats per minute are reported.
2717+
2718+
SQL statements are contained in file: _flink-window-tumbling-heartbeat/sql/app/heartbeats.sql_
2719+
2720+
Bootstrap:
2721+
2722+
[source,bash]
2723+
----
2724+
scripts/bootstrap-flink.sh
2725+
----
2726+
2727+
2728+
Create topic _heartbeat_:
2729+
2730+
[source,bash]
2731+
----
2732+
docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic heartbeat --replication-factor 1 --partitions 1
2733+
----
2734+
2735+
Execute Flink Job:
2736+
2737+
[source,bash]
2738+
----
2739+
#Connect to sql-client container
2740+
docker exec -it sql-client bash
2741+
#launch sql statements
2742+
root@44c67639b002:~$ sql-client.sh -f app/heartbeats.sql
2743+
----
2744+
2745+
Check if the job _insert-into_default_catalog.default_database.heartbeat_60sec_ is running in Flink Web Console at: http://localhost:18081/#/job/running
2746+
2747+
Validate results, consuming from output topic _heartbeat_60sec_:
2748+
2749+
[source,bash]
2750+
----
2751+
docker exec -e SCHEMA_REGISTRY_LOG4J_OPTS=" " -it schema-registry /usr/bin/kafka-avro-console-consumer \
2752+
--topic heartbeat_60sec \
2753+
--from-beginning \
2754+
--bootstrap-server broker:9092
2755+
----
2756+
2757+
[source,bash]
2758+
----
2759+
{"window_start":{"string":"2023-02-18 15:10:00"},"window_end":{"string":"2023-02-18 15:11:00"},"heartbeats_over_120":{"long":3}}
2760+
2761+
{"window_start":{"string":"2023-02-18 15:15:00"},"window_end":{"string":"2023-02-18 15:16:00"},"heartbeats_over_120":{"long":10}}
2762+
----
2763+
2764+
Teardown:
2765+
2766+
[source,bash]
2767+
----
2768+
scripts/tear-down-flink.sh
2769+
----
2770+
2771+
27082772
<<<
27092773

27102774
== Transactions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
---
2+
version: '2'
3+
4+
services:
5+
broker:
6+
image: confluentinc/cp-kafka:${CONFLUENT_VERSION}
7+
hostname: broker
8+
container_name: broker
9+
ports:
10+
- "9092:9092"
11+
environment:
12+
KAFKA_NODE_ID: 1
13+
KAFKA_PROCESS_ROLES: 'broker,controller'
14+
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
15+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT'
16+
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092'
17+
KAFKA_LISTENERS: 'PLAINTEXT://broker:9092,CONTROLLER://broker:29093'
18+
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
19+
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
20+
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
21+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
22+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
23+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
24+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
25+
KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR
26+
CLUSTER_ID: 'QTnB2tAgTWa1ec5wYon2jg'
27+
28+
schema-registry:
29+
image: confluentinc/cp-schema-registry:${CONFLUENT_VERSION}
30+
hostname: schema-registry
31+
container_name: schema-registry
32+
depends_on:
33+
- broker
34+
ports:
35+
- "8081:8081"
36+
environment:
37+
SCHEMA_REGISTRY_HOST_NAME: schema-registry
38+
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
39+
40+
flink-jobmanager:
41+
container_name: jobmanager
42+
image: flink:${FLINK_VERSION}
43+
ports:
44+
- "18081:18081"
45+
command: jobmanager
46+
environment:
47+
- |
48+
FLINK_PROPERTIES=
49+
jobmanager.rpc.address: jobmanager
50+
rest.port: 18081
51+
state.backend: rocksdb
52+
state.backend.incremental: true
53+
54+
flink-taskmanager:
55+
container_name: taskmanager
56+
image: flink:${FLINK_VERSION}
57+
depends_on:
58+
- flink-jobmanager
59+
command: taskmanager
60+
environment:
61+
- |
62+
FLINK_PROPERTIES=
63+
jobmanager.rpc.address: jobmanager
64+
rest.port: 18081
65+
taskmanager.numberOfTaskSlots: 2
66+
state.backend: rocksdb
67+
state.backend.incremental: true
68+
69+
flink-sql-client:
70+
container_name: sql-client
71+
depends_on:
72+
- flink-jobmanager
73+
- flink-taskmanager
74+
build:
75+
context: .
76+
dockerfile: sql/Dockerfile
77+
environment:
78+
FLINK_JOBMANAGER_HOST: jobmanager
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
###############################################################################
# Flink SQL client image for the tumbling-window demo.
# inspired by
# - https://github.com/wuchong/flink-sql-demo/tree/v1.11-EN/sql-client
# - https://github.com/theodorecurtil/flink_sql_job
###############################################################################

FROM flink:1.18.1-java17

# SQL client launcher scripts plus a lib dir for extra connector JARs.
COPY sql/bin/* /opt/sql-client/
RUN mkdir -p /opt/sql-client/lib

# Download the Kafka connector, JSON format and Confluent-Avro format JARs.
# Chained with && so a failed download fails the image build instead of
# being silently ignored (the original used ';', which only propagates the
# exit status of the last command).
RUN wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.18/flink-sql-connector-kafka-3.1.0-1.18.jar && \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.18.1/flink-json-1.18.1.jar && \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/1.18.1/flink-sql-avro-confluent-registry-1.18.1.jar

COPY sql/conf/* /opt/flink/conf/
COPY sql/app/* /opt/sql-client/app/

WORKDIR /opt/sql-client
ENV SQL_CLIENT_HOME /opt/sql-client

COPY sql/docker-entrypoint.sh /
RUN ["chmod", "+x", "/docker-entrypoint.sh"]
ENTRYPOINT ["/docker-entrypoint.sh"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
-- Flink SQL job: tumbling-window heart-rate monitoring.
-- Reads heartbeats from the Kafka topic `heartbeat`, counts per person
-- and per 1-minute tumbling window the beats above 120 bpm, and writes
-- the aggregates to the Kafka topic `heartbeat_60sec`.

-- Register the connector/format JARs bundled in the sql-client image.
ADD JAR '/opt/sql-client/lib/flink-sql-connector-kafka-3.1.0-1.18.jar';
ADD JAR '/opt/sql-client/lib/flink-sql-avro-confluent-registry-1.18.1.jar';
ADD JAR '/opt/sql-client/lib/flink-json-1.18.1.jar';

-- Drop in dependency order so the script is re-runnable.
DROP TABLE IF EXISTS heartbeat_60sec;
DROP TABLE IF EXISTS heartbeat;

-- Source table: raw heartbeat events keyed by person_id.
-- The watermark equals the event time (no out-of-orderness allowance).
CREATE TABLE heartbeat (
  person_id STRING,
  heartbeat_value INT,
  beat_time TIMESTAMP(3),
  WATERMARK FOR beat_time AS beat_time
) WITH (
  'connector' = 'kafka',
  'topic' = 'heartbeat',
  'properties.bootstrap.servers' = 'broker:9092',
  'properties.group.id' = 'flink-heartbeats',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'raw',
  'key.fields' = 'person_id',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://schema-registry:8081',
  'value.fields-include' = 'EXCEPT_KEY'
);

-- Sink table: one row per person and 1-minute window.
CREATE TABLE heartbeat_60sec (
  person_id STRING,
  window_start STRING,
  window_end STRING,
  heartbeats_over_120 BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'heartbeat_60sec',
  'properties.bootstrap.servers' = 'broker:9092',
  'properties.group.id' = 'flink-heartbeats60sec',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'raw',
  'key.fields' = 'person_id',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://schema-registry:8081',
  'value.fields-include' = 'EXCEPT_KEY'
);

-- Continuous aggregation: tumbling 1-minute windows over beat_time,
-- counting only values above the 120 bpm threshold.
INSERT INTO heartbeat_60sec
SELECT
  person_id,
  DATE_FORMAT(window_start, 'yyyy-MM-dd HH:mm:ss') AS window_start,
  DATE_FORMAT(window_end, 'yyyy-MM-dd HH:mm:ss') AS window_end,
  COUNT(*) AS heartbeats_over_120
FROM TABLE(TUMBLE(TABLE heartbeat, DESCRIPTOR(beat_time), INTERVAL '1' MINUTES))
WHERE
  heartbeat_value > 120
GROUP BY
  person_id, window_start, window_end;

-- Demo data, window 15:10-15:11 (3 values above 120).
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 98, TO_TIMESTAMP('2023-02-18 15:10:00'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 97, TO_TIMESTAMP('2023-02-18 15:10:05'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 103, TO_TIMESTAMP('2023-02-18 15:10:10'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 102, TO_TIMESTAMP('2023-02-18 15:10:15'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 110, TO_TIMESTAMP('2023-02-18 15:10:20'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 122, TO_TIMESTAMP('2023-02-18 15:10:25'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 125, TO_TIMESTAMP('2023-02-18 15:10:30'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 121, TO_TIMESTAMP('2023-02-18 15:10:35'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 118, TO_TIMESTAMP('2023-02-18 15:10:40'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 98, TO_TIMESTAMP('2023-02-18 15:10:45'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 98, TO_TIMESTAMP('2023-02-18 15:10:50'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 93, TO_TIMESTAMP('2023-02-18 15:10:55'));

-- Demo data, window 15:15-15:16 (10 values above 120).
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 111, TO_TIMESTAMP('2023-02-18 15:15:00'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 117, TO_TIMESTAMP('2023-02-18 15:15:05'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 125, TO_TIMESTAMP('2023-02-18 15:15:10'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 127, TO_TIMESTAMP('2023-02-18 15:15:15'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 128, TO_TIMESTAMP('2023-02-18 15:15:20'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 130, TO_TIMESTAMP('2023-02-18 15:15:25'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 131, TO_TIMESTAMP('2023-02-18 15:15:30'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 125, TO_TIMESTAMP('2023-02-18 15:15:35'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 127, TO_TIMESTAMP('2023-02-18 15:15:40'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 128, TO_TIMESTAMP('2023-02-18 15:15:45'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 129, TO_TIMESTAMP('2023-02-18 15:15:50'));
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 128, TO_TIMESTAMP('2023-02-18 15:15:55'));

-- A later event to advance the watermark past the windows above,
-- so both windows are closed and emitted.
INSERT INTO heartbeat (person_id, heartbeat_value, beat_time) VALUES ('MGG1', 100, TO_TIMESTAMP('2023-02-18 15:30:00'));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#!/bin/bash
# Container entrypoint: start the Flink SQL client in embedded mode,
# loading the session defaults and the bundled connector JARs.
#
# FLINK_HOME and SQL_CLIENT_HOME are set by the base image / Dockerfile;
# `set -u` makes a missing one fail loudly instead of expanding empty.

set -euo pipefail

# exec so the SQL client replaces this shell as PID 1 and receives
# container stop signals directly.
exec "${FLINK_HOME}/bin/sql-client.sh" embedded -d "${FLINK_HOME}/conf/sql-client-conf.yaml" -l "${SQL_CLIENT_HOME}/lib"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Flink configuration for the SQL client container.
jobmanager.rpc.address: jobmanager
# Must match the REST port published by the jobmanager service (18081).
rest.port: 18081
# Incremental RocksDB state backend with filesystem checkpoint storage.
state.backend: rocksdb
state.backend.incremental: true
state.checkpoint-storage: filesystem
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#!/bin/bash
# Launch an interactive Flink SQL client session; once the session ends,
# keep the container alive so it can be re-attached with `docker exec`.
# NOTE: deliberately no `set -e` — the container must stay up even if the
# SQL client exits with a non-zero status.

"${FLINK_HOME}/bin/sql-client.sh"

# Block forever to keep the container running.
tail -f /dev/null

scripts/bootstrap-flink.sh

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/bin/bash
# Bootstrap the Flink tumbling-window demo: tear down any previous run,
# then start the containers defined in
# flink-window-tumbling-heartbeat/docker-compose.yml.
# Must be run from the repository root (paths are relative to it).

set -euo pipefail

# Reuse the teardown script so a previous run never leaks containers or
# volumes. Run it with bash: it is a bash script (see its shebang), not sh.
bash scripts/tear-down-flink.sh

echo "Starting docker containers..."
docker-compose -f flink-window-tumbling-heartbeat/docker-compose.yml --env-file .env up -d

scripts/tear-down-flink.sh

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/bash
# Stop and remove the Flink demo containers together with their volumes.
# Must be run from the repository root (paths are relative to it).

set -euo pipefail

echo "Stopping docker containers..."
docker-compose -f flink-window-tumbling-heartbeat/docker-compose.yml --env-file .env down --volumes

0 commit comments

Comments
 (0)