-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path3-demo.txt
38 lines (23 loc) · 944 Bytes
/
3-demo.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
docker-compose exec ksql-cli ksql http://ksql-server:8088
SET 'auto.offset.reset'='earliest';
CREATE STREAM stream_actors_raw WITH (KAFKA_TOPIC='mongo.got.got.actors', VALUE_FORMAT='AVRO');
DESCRIBE stream_actors_raw;
SELECT ROWKEY, AFTER, PATCH FROM stream_actors_raw EMIT CHANGES
CREATE STREAM actors AS \
SELECT
CONCAT(IFNULL(after, ''), IFNULL(patch,'')) AS document, \
op, \
source, \
ts_ms
FROM stream_actors_raw ;
SELECT
EXTRACTJSONFIELD(REPLACE(EXTRACTJSONFIELD(document,'$._id'), '$oid', 'oid'), '$.oid') AS id,
EXTRACTJSONFIELD(document, '$.name') AS name,
op
FROM actors WHERE document IS NOT NULL EMIT CHANGES;
CREATE STREAM actors_clean AS
SELECT
EXTRACTJSONFIELD(REPLACE(EXTRACTJSONFIELD(document,'$._id'), '$oid', 'oid'), '$.oid') AS id,
EXTRACTJSONFIELD(document, '$.name') AS name,
op
FROM actors WHERE document IS NOT NULL EMIT CHANGES;