Skip to content

Commit 17c43bd

Browse files
committed
outbox table png
1 parent 01a1a0a commit 17c43bd

File tree

5 files changed

+41
-6
lines changed

5 files changed

+41
-6
lines changed

README.adoc

+30
Original file line numberDiff line numberDiff line change
@@ -1849,6 +1849,7 @@ The outbox table contains different operations for the same aggregate (_Consumer
18491849

18501850
- operation: CREATE --> topic: _loan_
18511851
- operation: INSTALLMENT_PAYMENT --> topic: _loan_payment_
1852+
- operation: EARLY_LOAN_CLOSURE --> topic: _loan_
18521853

18531854
Records from the outbox table are fetched using a jdbc source connector.
18541855

@@ -1861,11 +1862,15 @@ scripts/bootstrap-connect-event-router.sh
18611862

18621863
Outbox table:
18631864

1865+
image::images/outbox_table.png[Outbox table]
1866+
18641867
[source,bash]
18651868
----
18661869
insert into outbox_table (id, aggregate, operation, payload, event_time) values (1, 'Consumer Loan', 'CREATE', '{\"event\": {\"type\":\"Mortgage Opening\",\"timestamp\":\"2023-11-20T10:00:00\",\"data\":{\"mortgageId\":\"ABC123\",\"customer\":\"John Doe\",\"amount\":200000,\"duration\": 20}}}','2023-11-20 10:00:00');
18671870
18681871
insert into outbox_table (id, aggregate, operation, payload, event_time) values (2, 'Consumer Loan', 'INSTALLMENT_PAYMENT', '{\"event\": {\"type\":\"Mortgage Opening\",\"timestamp\":\"2023-11-20T10:00:00\",\"data\":{\"mortgageId\":\"ABC123\",\"customer\":\"John Doe\",\"amount\":200000,\"duration\": 20}}}','2023-12-01 09:30:00');
1872+
1873+
insert into outbox_table (id, aggregate, operation, payload, event_time) values (3, 'Consumer Loan', 'EARLY_LOAN_CLOSURE', '{\"event\":{\"type\":\"Early Loan Closure\",\"timestamp\":\"2023-11-25T14:15:00\",\"data\":{\"mortgageId\":\"ABC12\",\"closureAmount\":150000,\"closureDate\":\"2023-11-25\",\"paymentMethod\":\"Bank Transfer\",\"transactionNumber\":\"PQR456\"}}}','2023-11-25 09:30:00');
18691874
----
18701875

18711876
A jdbc source connector will be created with this config:
@@ -1918,6 +1923,31 @@ loan
19181923
loan_PAYMENT
19191924
----
19201925

1926+
[source,bash]
1927+
----
1928+
docker exec -it broker /bin/bash
1929+
[appuser@broker ~]$ cd /tmp/kraft-combined-logs/loan-0/
1930+
[appuser@broker loan-0]$ cat 00000000000000000000.log
1931+
�����Wz���Wz�����������������Consumer Loan
1932+
CREATE�{\"event\": {\"type\":\"Mortgage Opening\",\"timestamp\":\"2023-11-20T10:00:00\",\"data\":{\"mortgageId\":\"ABC123\",\"customer\":\"John Doe\",\"amount\":200000,\"duration\": 20}}}&2023-11-20 10:00:00k'�z<��Wz���Wz�����������������Consumer Loan$EARLY_LOAN_CLOSURE�{\"event\":{\"type\":\"Early Loan Closure\",\"timestamp\":\"2023-11-25T14:15:00\",\"data\":{\"mortgageId\":\"ABC12\",\"closureAmount\":150000,\"closureDate\":\"2023-11-25\",\"paymentMethod\":\"Bank Transfer\",\"transactionNumber\":\"PQR456\"}}}&2023-11-25 09:30:00
1933+
----
1934+
1935+
[source,bash]
1936+
----
1937+
docker exec -it broker /bin/bash
1938+
[appuser@broker ~]$ cd /tmp/kraft-combined-logs/loan-0/
1939+
[appuser@broker loan-0]$ cat 00000000000000000000.log
1940+
�����Wz���Wz�����������������Consumer Loan
1941+
CREATE�{\"event\": {\"type\":\"Mortgage Opening\",\"timestamp\":\"2023-11-20T10:00:00\",\"data\":{\"mortgageId\":\"ABC123\",\"customer\":\"John Doe\",\"amount\":200000,\"duration\": 20}}}&2023-11-20 10:00:00k'�z<��Wz���Wz�����������������Consumer Loan$EARLY_LOAN_CLOSURE�{\"event\":{\"type\":\"Early Loan Closure\",\"timestamp\":\"2023-11-25T14:15:00\",\"data\":{\"mortgageId\":\"ABC12\",\"closureAmount\":150000,\"closureDate\":\"2023-11-25\",\"paymentMethod\":\"Bank Transfer\",\"transactionNumber\":\"PQR456\"}}}&2023-11-25 09:30:00
1942+
----
1943+
1944+
----
1945+
docker exec -it broker /bin/bash
1946+
[appuser@broker ~]$ cd /tmp/kraft-combined-logs/loan_PAYMENT-0/
1947+
[appuser@broker loan-0]$ cat 00000000000000000000.log
1948+
,�A��Wz���Wz�����������������Consumer Loan&INSTALLMENT_PAYMENT�{\"event\": {\"type\":\"Mortgage Opening\",\"timestamp\":\"2023-11-20T10:00:00\",\"data\":{\"mortgageId\":\"ABC123\",\"customer\":\"John Doe\",\"amount\":200000,\"duration\": 20}}}&2023-12-01 09:30:00
1949+
----
1950+
19211951
Teardown:
19221952

19231953
[source,bash]

images/outbox_table.png

24.2 KB
Loading

kafka-connect-source-event-router/config/connector_jdbc_source.json

+4-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
"table.whitelist" : "public.outbox_table",
1111
"mode":"bulk",
1212
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
13-
"transforms":"valueToTopic,addPrefix,removeString1,removeString2",
13+
"transforms":"valueToTopic,addPrefix,removeString1,removeString2,removeString3",
1414
"transforms.valueToTopic.type":"io.confluent.connect.transforms.ExtractTopic$Value",
1515
"transforms.valueToTopic.field":"operation",
1616
"transforms.addPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
@@ -22,6 +22,9 @@
2222
"transforms.removeString2.type": "org.apache.kafka.connect.transforms.RegexRouter",
2323
"transforms.removeString2.regex": "(.*)INSTALLMENT(.*)",
2424
"transforms.removeString2.replacement": "$1$2",
25+
"transforms.removeString3.type": "org.apache.kafka.connect.transforms.RegexRouter",
26+
"transforms.removeString3.regex": "(.*)EARLY_LOAN_CLOSURE(.*)",
27+
"transforms.removeString3.replacement": "$1$2",
2528
"topic.creation.default.replication.factor": 1,
2629
"topic.creation.default.partitions": 1
2730
}
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
CREATE TABLE outbox_table (
22
id serial PRIMARY KEY,
3-
aggregate VARCHAR ( 255 ),
4-
operation VARCHAR ( 255 ),
5-
payload VARCHAR,
6-
event_time VARCHAR ( 255 )
3+
aggregate VARCHAR ( 255 ) NOT NULL,
4+
operation VARCHAR ( 255 ) NOT NULL,
5+
payload VARCHAR NOT NULL,
6+
event_time VARCHAR ( 255 ) NOT NULL
77
);
88

99
insert into outbox_table (id, aggregate, operation, payload, event_time) values (1, 'Consumer Loan', 'CREATE', '{\"event\": {\"type\":\"Mortgage Opening\",\"timestamp\":\"2023-11-20T10:00:00\",\"data\":{\"mortgageId\":\"ABC123\",\"customer\":\"John Doe\",\"amount\":200000,\"duration\": 20}}}','2023-11-20 10:00:00');
10-
insert into outbox_table (id, aggregate, operation, payload, event_time) values (2, 'Consumer Loan', 'INSTALLMENT_PAYMENT', '{\"event\": {\"type\":\"Mortgage Opening\",\"timestamp\":\"2023-11-20T10:00:00\",\"data\":{\"mortgageId\":\"ABC123\",\"customer\":\"John Doe\",\"amount\":200000,\"duration\": 20}}}','2023-12-01 09:30:00');
10+
insert into outbox_table (id, aggregate, operation, payload, event_time) values (2, 'Consumer Loan', 'INSTALLMENT_PAYMENT', '{\"event\": {\"type\":\"Mortgage Opening\",\"timestamp\":\"2023-11-20T10:00:00\",\"data\":{\"mortgageId\":\"ABC123\",\"customer\":\"John Doe\",\"amount\":200000,\"duration\": 20}}}','2023-12-01 09:30:00');
11+
insert into outbox_table (id, aggregate, operation, payload, event_time) values (3, 'Consumer Loan', 'EARLY_LOAN_CLOSURE', '{\"event\":{\"type\":\"Early Loan Closure\",\"timestamp\":\"2023-11-25T14:15:00\",\"data\":{\"mortgageId\":\"ABC12\",\"closureAmount\":150000,\"closureDate\":\"2023-11-25\",\"paymentMethod\":\"Bank Transfer\",\"transactionNumber\":\"PQR456\"}}}','2023-11-25 09:30:00');

scripts/tear-down-connect-event-router.sh

+1
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22

33
echo "Stopping docker containers..."
44
docker-compose -f kafka-connect-source-event-router/docker-compose.yml --env-file .env down --volumes
5+
rm -Rf kafka-connect-source-event-router/postgres-data

0 commit comments

Comments
 (0)