
Kafka Sink Connector for SQL Server


The Dockerfile for the Connect image, which installs the kafka-connect-jdbc library (Dockerfile):

FROM docker.arvancloud.ir/confluentinc/cp-kafka-connect:latest

RUN confluent-hub install confluentinc/kafka-connect-jdbc:10.7.6 --no-prompt

The build command (the custom image is tagged with the same name that docker-compose.yml references, so Compose picks it up):

docker build -t docker.arvancloud.ir/confluentinc/cp-kafka-connect:latest .


Go to the kafka-connect directory:

cd kafka-connect

Docker Compose file (docker-compose.yml):

---
version: '3'

services:

  zookeeper1:
    image: docker.arvancloud.ir/confluentinc/cp-zookeeper:latest
    hostname: zookeeper1
    container_name: zookeeper1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - kafka-network


  kafka:
    image: docker.arvancloud.ir/confluentinc/cp-kafka:latest
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper1
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper1:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper1:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
    networks:
      - kafka-network


  connect:
    image: docker.arvancloud.ir/confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper1
      - kafka
    ports:
      - '8083:8083'
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'false'
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper1:2181'
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.4.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: 'io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor'
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: 'io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components'
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    networks:
      - kafka-network


  schema-registry:
    image: docker.arvancloud.ir/confluentinc/cp-schema-registry:latest
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper1
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      #SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper1:2181'
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092
    networks:
      - kafka-network
    restart: unless-stopped


  akhq:
    image: tchiotludo/akhq
    hostname: web-ui
    container_name: web-ui  
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: "kafka:29092"
              schema-registry:
                url: "http://schema-registry:8081"
              connect:
                - name: "connect"
                  url: "http://connect:8083"
              ksqldb:
                - name: "ksqldb"
                  url: "http://ksqldb-server2:8088"
                  
    ports:
      - 8080:8080
    links:
      - kafka
      - schema-registry
      - ksqldb-server2
    networks:
      - kafka-network


  ksqldb-server2:
    image: docker.arvancloud.ir/confluentinc/cp-ksqldb-server:latest
    hostname: ksqldb-server2
    container_name: ksqldb-server2
    depends_on:
      - kafka
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksqldb/log4j.properties"
      KSQL_BOOTSTRAP_SERVERS: "kafka:29092"
      KSQL_HOST_NAME: ksqldb-server2
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_URL: "http://connect:8083"
    networks:
      - kafka-network


  ksqldb-cli2:
    image: docker.arvancloud.ir/confluentinc/cp-ksqldb-cli:latest
    container_name: ksqldb-cli2
    depends_on:
      - kafka
      - ksqldb-server2
    entrypoint: /bin/sh
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
    tty: true
    # volumes:
    #   - codes/src:/opt/app/src
    #   - codes/test:/opt/app/test
    networks:
      - kafka-network
  

networks:
  kafka-network:
    driver: bridge
    name: kafka-network
Bring the whole stack up:

docker compose up

In Docker Desktop you should see all of the containers up and running. Then check each container's logs to make sure everything started healthy.
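You can also confirm that the kafka-connect-jdbc plugin from the custom image was picked up by asking the Connect worker which connector plugins it has loaded (a quick check with PowerShell against the Connect REST API on port 8083):

# io.confluent.connect.jdbc.JdbcSinkConnector should appear in the output.
Invoke-RestMethod -Uri "http://localhost:8083/connector-plugins" | Select-Object class, type, version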

Create a Kafka topic named usertopic:

docker exec -it kafka bash

cd /

bin/kafka-topics --create --if-not-exists --topic usertopic --replication-factor=1 --partitions=3 --bootstrap-server kafka:9092

Create a ksqlDB stream on top of the created topic, with the Apache Avro value format:

docker exec -it ksqldb-cli2 bash

ksql http://ksqldb-server2:8088


ksql> CREATE STREAM s2 (name VARCHAR, favorite_number INTEGER, favorite_color VARCHAR) WITH (kafka_topic='usertopic', value_format='avro');
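If you want to verify the stream outside the CLI, the ksqlDB REST API can also be queried from the host (a PowerShell sketch; the /ksql endpoint and port 8088 come from the compose file above):

# Ask ksqldb-server2 to list the registered streams; S2 should appear in the response.
$ksqlBody = @{ ksql = "SHOW STREAMS;"; streamsProperties = @{} } | ConvertTo-Json
Invoke-RestMethod -Method Post -Uri "http://localhost:8088/ksql" -ContentType "application/vnd.ksql.v1+json" -Body $ksqlBody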

Create a .NET 6 C# project named AvroSpecific:

In the NuGet Package Manager Console, run the following commands:

PM> NuGet\Install-Package Confluent.Kafka -Version 2.4.0

PM> NuGet\Install-Package Confluent.SchemaRegistry -Version 2.4.0

PM> NuGet\Install-Package Confluent.SchemaRegistry.Serdes.Avro -Version 2.4.0

The Kafka producer C# program, using the Avro serializer and Schema Registry (Program.cs):

using AvroSpecific;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// .NET 6 top-level statements: the code below runs directly, no Main method wrapper is needed.
    string bootstrapServers = "localhost:9092"; // PLAINTEXT_HOST listener published to the host by docker-compose
    string schemaRegistryUrl = "localhost:8081";
    string topicName = "usertopic";

    var producerConfig = new ProducerConfig
    {
        BootstrapServers = bootstrapServers
    };

    var schemaRegistryConfig = new SchemaRegistryConfig
    {        
        Url = schemaRegistryUrl
    };


    var avroSerializerConfig = new AvroSerializerConfig
    {
        // optional Avro serializer properties:
        BufferBytes = 100
    };

    CancellationTokenSource cts = new CancellationTokenSource();

    using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))

    using (var producer = new ProducerBuilder<string, User>(producerConfig)
            .SetValueSerializer(new AvroSerializer<User>(schemaRegistry, avroSerializerConfig))
            .Build())
    {

        Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");

        int i = 1;

        string text;

        while ((text = Console.ReadLine()) != "q")
        {
            User user = new User { name = text, favorite_color = "green", favorite_number = ++i };

            producer.ProduceAsync(topicName, new Message<string, User> { Key = null, Value = user })
                .ContinueWith(task =>
                {
                    if (!task.IsFaulted)
                    {
                        Console.WriteLine($"produced to: {task.Result.TopicPartitionOffset}");
                        return;
                    }
                    
                    Console.WriteLine($"error producing message: {task.Exception.InnerException}");
                });
        }

        // Make sure any queued messages are delivered before the producer is disposed.
        producer.Flush(TimeSpan.FromSeconds(10));
    }

    cts.Cancel();


User.cs:

using Avro;
using Avro.Specific;

// File-scoped namespace so that "using AvroSpecific;" in Program.cs resolves.
namespace AvroSpecific;

public class User : ISpecificRecord
{
	public static Schema _SCHEMA = Avro.Schema.Parse(@"{""type"":""record"",""name"":""User"",""namespace"":""confluent.io.examples.serialization.avro"",""fields"":    
                [{""name"":""name"",""type"":""string""},{""name"":""favorite_number"",""type"":""int""},{""name"":""favorite_color"",""type"":""string""}]}");
	private string _name;
	private int _favorite_number;
	private string _favorite_color;
	public virtual Schema Schema
	{
		get
		{
			return User._SCHEMA;
		}
	}
	public string name
	{
		get
		{
			return this._name;
		}
		set
		{
			this._name = value;
		}
	}
	public int favorite_number
	{
		get
		{
			return this._favorite_number;
		}
		set
		{
			this._favorite_number = value;
		}
	}
	public string favorite_color
	{
		get
		{
			return this._favorite_color;
		}
		set
		{
			this._favorite_color = value;
		}
	}
	public virtual object Get(int fieldPos)
	{
		switch (fieldPos)
		{
			case 0: return this.name;
			case 1: return this.favorite_number;
			case 2: return this.favorite_color;
			default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
		};
	}
	public virtual void Put(int fieldPos, object fieldValue)
	{
		switch (fieldPos)
		{
			case 0: this.name = (System.String)fieldValue; break;
			case 1: this.favorite_number = (System.Int32)fieldValue; break;
			case 2: this.favorite_color = (System.String)fieldValue; break;
			default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
		};
	}
}

User.avsc:

{
  "namespace": "confluent.io.examples.serialization.avro",
  "name": "User",
  "type": "record",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    },
    {
      "name": "favorite_color",
      "type": "string"
    }
  ]
}

Run the C# code to insert some data into the stream (that is, into the underlying topic).

Then, in the ksql command shell:

ksql> select * from s2;


Go to http://localhost:8080/ui/docker-kafka-server/topic/usertopic/data?sort=Oldest&partition=All to browse the topic data in the AKHQ web UI.


The schema is registered in Schema Registry automatically (and can be altered by you there); you can also register the schema manually with a PowerShell script:

# Schema Registry expects the Avro schema as a JSON-escaped string in the "schema" field.
$schema = @'
{
  "namespace": "confluent.io.examples.serialization.avro",
  "name": "User",
  "type": "record",
  "fields": [
    { "name": "name",            "type": "string" },
    { "name": "favorite_number", "type": "int"    },
    { "name": "favorite_color",  "type": "string" }
  ]
}
'@

$body = @{ schema = $schema }

Invoke-RestMethod -Method Post -Uri "http://localhost:8081/subjects/usertopic-value/versions" -ContentType "application/vnd.schemaregistry.v1+json" -Body ($body | ConvertTo-Json)

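To confirm what is actually registered, you can read the latest version back from Schema Registry (whether it was auto-registered by the producer or posted manually):

# Fetch the latest schema registered under the usertopic-value subject.
Invoke-RestMethod -Uri "http://localhost:8081/subjects/usertopic-value/versions/latest"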

Then create the Kafka sink connector to SQL Server via the Connect REST API:

$body = @{
    name = "sql-server-sink"
    config = @{
        "connector.class"                     = "io.confluent.connect.jdbc.JdbcSinkConnector"
        "table.name.format"                   = "mytopic"
        "connection.password"                 = "your password"
        "tasks.max"                           = "1"
        "topics"                              = "usertopic"
        "schema.registry.url"                 = "http://schema-registry:8081"
        "value.converter.schema.registry.url" = "http://schema-registry:8081"
        "auto.evolve"                         = "true"
        "connection.user"                     = "sa"
        "value.converter.schemas.enable"      = "true"
        "name"                                = "sql-server-sink"
        "auto.create"                         = "true"
        "value.converter"                     = "io.confluent.connect.avro.AvroConverter"
        "connection.url"                      = "jdbc:sqlserver://192.168.1.4:1433;databaseName=kafkaconnect"
        "insert.mode"                         = "insert"
        "pk.mode"                             = "none"
    }
}

Invoke-RestMethod -Method Post -Uri "http://localhost:8083/connectors" -ContentType "application/json" -Body ($body | ConvertTo-Json)

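Before producing more data, it is worth checking that the connector and its task are RUNNING; a bad JDBC connection, for example, shows up here as a FAILED task with a stack trace:

# Show the state of the sink connector and its tasks; both should report RUNNING.
Invoke-RestMethod -Uri "http://localhost:8083/connectors/sql-server-sink/status"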

After inserting new data into the stream with the C# program, the Kafka connector syncs the data to the corresponding SQL Server table.
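To verify on the database side, a minimal query against the target table is enough. This sketch assumes the SqlServer PowerShell module is installed and reuses the connection details from the connector config above (the sink writes to the table named by table.name.format, i.e. mytopic):

# Requires the SqlServer PowerShell module (Install-Module SqlServer).
Invoke-Sqlcmd -ServerInstance "192.168.1.4,1433" -Database "kafkaconnect" -Username "sa" -Password "your password" -Query "SELECT TOP 10 * FROM mytopic;"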