This project is a simple example of how to use Flink to consume CDC events from a Postgres database and write them to an Iceberg table. It accompanies the blog post here. All components run on a local Kubernetes cluster.
The following tools are required to run this project:
- Docker
- Docker Desktop (for the local Kubernetes cluster)
- Helm
- kubectl
 
We are going to use the local Kubernetes cluster to run the following components. All manifests are in the `k8s` directory.

- Postgres database
- MinIO (S3)
- Flink operator
- Kafka operator
- Kafka cluster
- Kafka Connect cluster
- Kafka UI (Provectus)
- Postgres connector
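Once the setup command in the next step has run, a quick sanity check is to list the pods in each namespace. A minimal sketch: the `kafka` namespace is confirmed by the Kafka Connect manifest further below, while the other namespace names are assumptions, so adjust them to whatever the manifests in `k8s` actually create:

```bash
# All pods should eventually report Running/Ready
kubectl get pods -n postgres
kubectl get pods -n minio
kubectl get pods -n flink
kubectl get pods -n kafka
```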
 
To set up the infrastructure:

```bash
make setup-infra
```

The Flink job can be deployed on k8s using the following command:

```bash
make deploy-flink-job
```

To remove the Flink job:

```bash
make remove-flink-job
```

- If the Postgres chart fails to install due to the custom image, you can uncomment this in the `k8s/postgres/values.yaml` file:
```yaml
global:
  security:
    allowInsecureImages: true
```

- For the Kafka Connect cluster here: `k8s/kafka/kafka-connect-cluster.yml`, if you do not want to build and push the Debezium base image with the Postgres plugin, you can use my pre-built image that is already pushed to Docker Hub. The manifest would become something like this:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.9.0
  image: abyssnlp/debezium-connect-postgresql:latest
  replicas: 1
  bootstrapServers: kluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means fallback to the broker settings
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
```
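The Postgres connector itself is registered against this Connect cluster as a Strimzi `KafkaConnector` resource (the `strimzi.io/use-connector-resources` annotation above enables that). The project's actual connector manifest lives in the `k8s` directory; purely as a hedged illustration, a minimal Debezium Postgres connector CR typically looks something like the following, where the connector name, database coordinates, and table list are assumptions:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: postgres-connector
  namespace: kafka
  labels:
    # must match the name of the KafkaConnect cluster above
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    # connection details are placeholders; use the values from the Postgres manifest
    database.hostname: postgres.postgres.svc.cluster.local
    database.port: 5432
    database.user: postgres
    database.password: postgres
    database.dbname: postgres
    # logical decoding plugin built into Postgres 10+
    plugin.name: pgoutput
    # CDC topics become <topic.prefix>.<schema>.<table>
    topic.prefix: cdc
    table.include.list: public.customers
```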
- If deploying the Flink job fails at the `iceberg-setup` task, or if you don't have Apache Maven installed locally, you can open the `com.github.abyssnlp.setup.IcebergSetup` class in your IDE and run the `main` method. This will create the Iceberg table and schema (a rough sketch of what such a step involves is shown below).
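For reference, here is a minimal sketch of what creating an Iceberg table programmatically can look like. This is not the project's actual `IcebergSetup` code: the Hadoop catalog, the `s3a://warehouse` bucket, the MinIO endpoint and credentials, and the example schema are all assumptions for illustration (it also presumes `iceberg-core` and `hadoop-aws` on the classpath):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

public class IcebergSetupSketch {
    public static void main(String[] args) {
        // Point the S3A filesystem at the local MinIO (endpoint and credentials are assumptions)
        Configuration conf = new Configuration();
        conf.set("fs.s3a.endpoint", "http://localhost:9000");
        conf.set("fs.s3a.access.key", "minio");
        conf.set("fs.s3a.secret.key", "minio123");
        conf.set("fs.s3a.path.style.access", "true");

        // Hadoop catalog rooted in a MinIO bucket (warehouse path is an assumption)
        Catalog catalog = new HadoopCatalog(conf, "s3a://warehouse");

        // Example schema; the real columns mirror the CDC source table
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "name", Types.StringType.get()),
                Types.NestedField.optional(3, "updated_at", Types.TimestampType.withZone()));

        // Create the table if it doesn't exist yet (namespace/table names are assumptions)
        TableIdentifier table = TableIdentifier.of(Namespace.of("cdc"), "customers");
        if (!catalog.tableExists(table)) {
            catalog.createTable(table, schema, PartitionSpec.unpartitioned());
        }
    }
}
```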
More details on generating the data, setting up the CDC, building and deploying the Flink application, querying the Iceberg table, and updating records can be found in the blog post.
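Before working through those steps, a quick way to confirm that CDC events are actually flowing is to tail a change topic with a throwaway consumer pod (the Provectus Kafka UI deployed above gives the same view in a browser). The image tag and topic name below are assumptions; the topic depends on the connector's `topic.prefix` and the captured table, while the bootstrap address comes from the KafkaConnect manifest above:

```bash
# One-off consumer pod inside the cluster; it is removed when you Ctrl+C
kubectl -n kafka run kafka-consumer -ti --rm --restart=Never \
  --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 -- \
  bin/kafka-console-consumer.sh \
  --bootstrap-server kluster-kafka-bootstrap:9092 \
  --topic cdc.public.customers --from-beginning
```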