Commit 3ce5e46

committed
added example http sink connector
1 parent 2d0ffc3 commit 3ce5e46
File tree

12 files changed: +311 -4 lines changed

.gitignore

+2

@@ -41,3 +41,5 @@ README.pdf
 README.html
 
 release/
+
+.DS_Store
README.adoc

+56 -1

@@ -1,7 +1,7 @@
 = Practical examples with Apache Kafka®
 :author: Giovanni Marigi
-:revdate: October 10, 2023
+:revdate: November 17, 2023
 :revnumber: 1.2.1
 :version-label!:
 :toc: left
@@ -1786,6 +1786,61 @@ Teardown:
 scripts/tear-down-connect-dlq.sh
 ----
 
+=== HTTP Sink Connector example
+
+Example usage of the HTTP Sink Connector.
+
+Run the example:
+
+[source,bash]
+----
+scripts/bootstrap-connect-sink-http.sh
+----
+
+A web application listening on port 8010 will start up.
+
+An HTTP sink connector will be created with this config:
+
+[source,json]
+----
+{
+  "name": "SimpleHttpSink",
+  "config":
+  {
+    "topics": "topicA",
+    "tasks.max": "2",
+    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
+    "http.api.url": "http://host.docker.internal:8010/api/message",
+    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "confluent.topic.bootstrap.servers": "broker:9092",
+    "confluent.topic.replication.factor": "1",
+    "reporter.bootstrap.servers": "broker:9092",
+    "reporter.result.topic.name": "success-responses",
+    "reporter.result.topic.replication.factor": "1",
+    "reporter.error.topic.name": "error-responses",
+    "reporter.error.topic.replication.factor": "1",
+    "consumer.override.max.poll.interval.ms": "5000"
+  }
+}
+----
+
+Send JSON messages to the _topicA_ topic:
+
+[source,bash]
+----
+docker exec -it broker kafka-console-producer --broker-list broker:9092 --topic topicA --property "parse.key=true" --property "key.separator=:"
+> 1:{"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}
+----
+
+The sink connector will execute an HTTP POST request to the endpoint _http://localhost:8010/api/message_.
+
+Teardown:
+
+[source,bash]
+----
+scripts/tear-down-connect-sink-http.sh
+----
+
 === CDC with Debezium PostgreSQL Source Connector
 
 Usage of the Debezium Source connector for PostgreSQL to send RDBMS table updates into a topic.
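For reference, each record delivered by the connector is roughly equivalent to the following manual call. This is only a sketch: the exact headers the HTTP Sink Connector sends depend on its configuration, and the payload below reuses the sample record from the README section above.

[source,bash]
----
# Approximate the connector's delivery by POSTing the sample record by hand.
curl -X POST http://localhost:8010/api/message \
  -H "Content-Type: application/json" \
  -d '{"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}'
----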
kafka-connect-sink-http/config/http_sink.json

+19

@@ -0,0 +1,19 @@
+{
+  "name": "SimpleHttpSink",
+  "config":
+  {
+    "topics": "topicA",
+    "tasks.max": "2",
+    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
+    "http.api.url": "http://host.docker.internal:8010/api/message",
+    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "confluent.topic.bootstrap.servers": "broker:9092",
+    "confluent.topic.replication.factor": "1",
+    "reporter.bootstrap.servers": "broker:9092",
+    "reporter.result.topic.name": "success-responses",
+    "reporter.result.topic.replication.factor": "1",
+    "reporter.error.topic.name": "error-responses",
+    "reporter.error.topic.replication.factor": "1",
+    "consumer.override.max.poll.interval.ms": "5000"
+  }
+}
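Once the bootstrap script has POSTed this file to the Connect REST API, the connector's state can be checked with the worker's standard status endpoint (assuming the worker is reachable on localhost:8083, as in the compose file below):

[source,bash]
----
# Query connector and task-level state for SimpleHttpSink.
curl -s http://localhost:8083/connectors/SimpleHttpSink/status
----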
kafka-connect-sink-http/docker-compose.yml

+79

@@ -0,0 +1,79 @@
+---
+version: '2'
+
+services:
+  broker:
+    image: confluentinc/cp-kafka:${CONFLUENT_VERSION}
+    hostname: broker
+    container_name: broker
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_NODE_ID: 1
+      KAFKA_PROCESS_ROLES: 'broker,controller'
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT'
+      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092'
+      KAFKA_LISTENERS: 'PLAINTEXT://broker:9092,CONTROLLER://broker:29093'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
+      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
+      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR
+      CLUSTER_ID: 'QTnB2tAgTWa1ec5wYon2jg'
+
+  schema-registry:
+    image: confluentinc/cp-schema-registry:${CONFLUENT_VERSION}
+    hostname: schema-registry
+    container_name: schema-registry
+    depends_on:
+      - broker
+    ports:
+      - "8081:8081"
+    environment:
+      SCHEMA_REGISTRY_HOST_NAME: schema-registry
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
+
+  connect:
+    image: confluentinc/cp-kafka-connect-base:${CONFLUENT_VERSION}
+    hostname: connect
+    container_name: connect
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
+    depends_on:
+      - schema-registry
+    ports:
+      - "8083:8083"
+    environment:
+      CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
+      CONNECT_REST_ADVERTISED_HOST_NAME: connect
+      CONNECT_REST_PORT: 8083
+      CONNECT_GROUP_ID: compose-connect-group
+      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
+      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
+      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
+      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
+      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: All
+      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
+      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
+      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
+      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
+      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
+    command:
+      - bash
+      - -c
+      - |
+        echo "Installing Connector"
+        confluent-hub install --no-prompt confluentinc/kafka-connect-http:latest
+        #
+        echo "Launching Kafka Connect worker"
+        /etc/confluent/docker/run &
+        #
+        sleep infinity
@@ -0,0 +1,76 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0"
2+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<parent>
7+
<groupId>org.springframework.boot</groupId>
8+
<artifactId>spring-boot-starter-parent</artifactId>
9+
<version>2.7.17</version>
10+
</parent>
11+
12+
<artifactId>rest-controller-demo</artifactId>
13+
14+
<properties>
15+
<maven.compiler.source>17</maven.compiler.source>
16+
<maven.compiler.target>17</maven.compiler.target>
17+
</properties>
18+
19+
<dependencies>
20+
21+
<dependency>
22+
<groupId>org.springframework.boot</groupId>
23+
<artifactId>spring-boot-starter</artifactId>
24+
</dependency>
25+
26+
<dependency>
27+
<groupId>org.springframework.boot</groupId>
28+
<artifactId>spring-boot-starter-web</artifactId>
29+
</dependency>
30+
31+
<dependency>
32+
<groupId>org.springframework.boot</groupId>
33+
<artifactId>spring-boot-starter-actuator</artifactId>
34+
</dependency>
35+
36+
</dependencies>
37+
38+
<build>
39+
<plugins>
40+
<plugin>
41+
<artifactId>maven-compiler-plugin</artifactId>
42+
<configuration>
43+
<source>${maven.compiler.source}</source>
44+
<target>${maven.compiler.target}</target>
45+
</configuration>
46+
</plugin>
47+
<plugin>
48+
<groupId>org.springframework.boot</groupId>
49+
<artifactId>spring-boot-maven-plugin</artifactId>
50+
<configuration>
51+
<profiles>
52+
<profile>local</profile>
53+
</profiles>
54+
<classifier>exec</classifier>
55+
</configuration>
56+
<executions>
57+
<execution>
58+
<goals>
59+
<goal>repackage</goal>
60+
</goals>
61+
</execution>
62+
</executions>
63+
</plugin>
64+
</plugins>
65+
</build>
66+
67+
<profiles>
68+
<profile>
69+
<id>local</id>
70+
<activation>
71+
<activeByDefault>true</activeByDefault>
72+
</activation>
73+
</profile>
74+
</profiles>
75+
76+
</project>
kafka-connect-sink-http/rest-controller/src/main/java/io/confluent/springboot/kafka/demo/Application.java

+13

@@ -0,0 +1,13 @@
+package io.confluent.springboot.kafka.demo;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class Application {
+
+    public static void main(String[] args) {
+        SpringApplication.run(Application.class, args);
+    }
+
+}
kafka-connect-sink-http/rest-controller/src/main/java/io/confluent/springboot/kafka/demo/controller/Controller.java

+29

@@ -0,0 +1,29 @@
+package io.confluent.springboot.kafka.demo.controller;
+
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+
+@RestController
+public class Controller {
+
+    private int request = 0;
+
+    @PostMapping(value="/api/message")
+    public ResponseEntity send(@RequestBody String message) {
+        System.out.println("\n\nRequest:" + request);
+        if(request < 2) {
+            try {
+                request++;
+                System.out.println("Sleeping...");
+                Thread.sleep(8000);
+
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        System.out.println("Message:" + message);
+        return new ResponseEntity<>(HttpStatus.OK);
+    }
+}
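The controller deliberately sleeps 8 seconds on the first two requests, longer than the 5-second consumer.override.max.poll.interval.ms in the connector config, so the sink task's consumer exceeds its poll deadline (typically triggering a rebalance and redelivery). The endpoint can also be exercised by hand as a quick sanity check, assuming the Spring Boot app is up on port 8010:

[source,bash]
----
# POST a test message directly to the demo endpoint.
curl -X POST http://localhost:8010/api/message \
  -H "Content-Type: text/plain" \
  -d 'hello from curl'
----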
kafka-connect-sink-http/rest-controller/src/main/resources/application.yml

+2

@@ -0,0 +1,2 @@
+server:
+  port: 8010

kafka-producer/src/main/java/org/hifly/kafka/demo/producer/KafkaConfig.java

+1 -1

@@ -14,7 +14,7 @@
 public class KafkaConfig {
 
     private static final String BROKER_LIST =
-            System.getenv("kafka.broker.list") != null? System.getenv("kafka.broker.list") :"localhost:9092,localhost:9093,localhost:9094";
+            System.getenv("kafka.broker.list") != null? System.getenv("kafka.broker.list") :"localhost:12091,localhost:9093,localhost:9094";
     private static final String CONFLUENT_SCHEMA_REGISTRY_URL =
             System.getenv("confluent.schema.registry") != null? System.getenv("confluent.schema.registry"):"http://localhost:8081";
     private static final String APICURIO_SCHEMA_REGISTRY_URL =

kafka-producer/src/main/java/org/hifly/kafka/demo/producer/tx/Runner.java

+1 -2

@@ -29,13 +29,12 @@ public static void main (String [] args) throws Exception {
     }
 
     public static void groupOfSynchMessages(String topic, StringTXProducer baseProducer) throws InterruptedException {
-        RecordMetadata lastRecord = null;
+        RecordMetadata lastRecord;
         for (int i = 0; i < 10; i++ ) {
             for(int k =0; k < 1000; k++ ) {
                 lastRecord = baseProducer.produceSync(new ProducerRecord<>(topic, "GROUP-" + i + "-" + k));
                 RecordMetadataUtil.prettyPrinter(lastRecord);
             }
-            Thread.sleep(1000);
         }
     }
 
scripts/bootstrap-connect-sink-http.sh

+29

@@ -0,0 +1,29 @@
+#!/bin/bash
+
+sh scripts/tear-down.sh
+
+echo "Starting docker containers..."
+docker-compose -f kafka-connect-sink-http/docker-compose.yml --env-file .env up -d
+
+echo "Wait 20 seconds..."
+
+sleep 20
+
+echo "Create topicA..."
+docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic topicA --replication-factor 1 --partitions 1
+
+echo "Installing http sink..."
+curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @kafka-connect-sink-http/config/http_sink.json
+
+echo "Wait 3 seconds..."
+
+sleep 3
+
+echo "connectors status..."
+curl -v http://localhost:8083/connectors?expand=status
+
+sleep 5
+
+echo "start http demo..."
+cd kafka-connect-sink-http/rest-controller
+mvn spring-boot:run
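Once messages are flowing, delivery outcomes can be verified end to end by consuming the reporter topics configured in http_sink.json (a sketch, assuming the broker container name from the compose file):

[source,bash]
----
# Successful deliveries are reported here; swap in error-responses for failures.
docker exec -it broker kafka-console-consumer --bootstrap-server broker:9092 --topic success-responses --from-beginning
----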
scripts/tear-down-connect-sink-http.sh

+4

@@ -0,0 +1,4 @@
+#!/bin/bash
+
+echo "Stopping docker containers..."
+docker-compose -f kafka-connect-sink-http/docker-compose.yml --env-file .env down --volumes
