From 75844967e07925cf4afa3f6087a9a02c28497bbe Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Thu, 12 Aug 2021 10:27:22 +0200 Subject: [PATCH 01/14] Write Blog Post --- blog1.md | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 blog1.md diff --git a/blog1.md b/blog1.md new file mode 100644 index 00000000..1bade012 --- /dev/null +++ b/blog1.md @@ -0,0 +1,43 @@ + +@TODO cleanup documentation +@TODO add help page contributing page + +# Using Debezium to Create ACID Data Lake House + +Do you need to build flexible Data Lakehouse but dont know where to start, do you want your data pipeline to be near realtime and support ACID transactions and updates +its possible using two great projects Debezium and Apache Iceberg without any dependency to kafka or spark + +#### Debezium +Debezium is an open source distributed platform for change data capture. +Debezium extracts realtime database changes as json, avro, protobuf events and delivers to event streaming platforms +(Kafka, Kinesis, Google Pub/Sub, Pulsar are just some of them) LINK here, it provides very simple interface to extend and write new sinks + +#### Apache Iceberg +Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes, it supports Insert and Update queries, plus many other features listed here +Link +Apache iceberg has API fundation which supported by Spark and Presto and Trino, + +## debezium-server-iceberg + +[@TODO visual architecture diagram] + +Project puts both projects together and enables realtime data pipeline to any cloud storage, hdfs destination +with this project its becomes possible to use best features from both projects enjoy realtime structured data feed and ACID table format with update support + +### Extending Debezium Server with custom sink +debezium-server-iceberg adds custom sink to Debezium server quarkus application [link here], +with custom sink received realtime json events converted to iceberg rows and processed using iceberg api +received rows are either appended or updated target table using iceberg api, since iceberg supports many cloud storage its eaily porrible to configure destination which could be +any of hadoop storage cloud storage location, consumed events are added to destination table as parquet files + +# update, append + +# destination, iceberg catalog + +# wait delay batch size + + +@Contribution + + +thanks to two projects \ No newline at end of file From 5a39b2b2e9257111b3146fa07b34f9ded49ea523 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sun, 15 Aug 2021 10:05:27 +0200 Subject: [PATCH 02/14] Write Blog Post --- blog1.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/blog1.md b/blog1.md index 1bade012..82441785 100644 --- a/blog1.md +++ b/blog1.md @@ -15,7 +15,7 @@ Debezium extracts realtime database changes as json, avro, protobuf events and d #### Apache Iceberg Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes, it supports Insert and Update queries, plus many other features listed here Link -Apache iceberg has API fundation which supported by Spark and Presto and Trino, +Apache iceberg has great flexible foundation/API which integrated by Spark, Presto, Trino, Flink and Hive ## debezium-server-iceberg @@ -26,18 +26,18 @@ with this project its becomes possible to use best features from both projects e ### Extending Debezium Server with custom sink debezium-server-iceberg adds custom sink to Debezium server 
quarkus application [link here], -with custom sink received realtime json events converted to iceberg rows and processed using iceberg api -received rows are either appended or updated target table using iceberg api, since iceberg supports many cloud storage its eaily porrible to configure destination which could be -any of hadoop storage cloud storage location, consumed events are added to destination table as parquet files +with custom sink received realtime json events converted to iceberg rows and processed using iceberg API +received rows are either appended or updated to destination iceberg table as Parquet files, since iceberg supports many cloud storage its easily possible to configure destination which could be +any of hadoop storage cloud storage location. with debezium-server-iceberg its easily possible to replicate your RDBMS to cloud storage # update, append - -# destination, iceberg catalog - +V 0.12 iceberg +retain deletes as soft delete! # wait delay batch size - -@Contribution +wait by reading debezium metrics! another great feature of debezium +# destination, iceberg catalog +@Contribution thanks to two projects \ No newline at end of file From efff3bfb569dec0415222a2a23b0943c23aad6b5 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sun, 15 Aug 2021 10:06:27 +0200 Subject: [PATCH 03/14] Write Blog Post --- blog1.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/blog1.md b/blog1.md index 82441785..ba746c48 100644 --- a/blog1.md +++ b/blog1.md @@ -1,6 +1,6 @@ @TODO cleanup documentation -@TODO add help page contributing page +@TODO add contributing page # Using Debezium to Create ACID Data Lake House @@ -37,7 +37,7 @@ retain deletes as soft delete! wait by reading debezium metrics! another great feature of debezium # destination, iceberg catalog -@Contribution +@Contribution ..etc -thanks to two projects \ No newline at end of file +thanks to two projects making it possible Apache iceberg[link], Debezium[link] \ No newline at end of file From 5fea18a58baf4d085258ddb72df61a67505c6ccb Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sun, 15 Aug 2021 17:54:48 +0200 Subject: [PATCH 04/14] Update documentation --- blog1.md | 43 ------------------------------------------- 1 file changed, 43 deletions(-) delete mode 100644 blog1.md diff --git a/blog1.md b/blog1.md deleted file mode 100644 index ba746c48..00000000 --- a/blog1.md +++ /dev/null @@ -1,43 +0,0 @@ - -@TODO cleanup documentation -@TODO add contributing page - -# Using Debezium to Create ACID Data Lake House - -Do you need to build flexible Data Lakehouse but dont know where to start, do you want your data pipeline to be near realtime and support ACID transactions and updates -its possible using two great projects Debezium and Apache Iceberg without any dependency to kafka or spark - -#### Debezium -Debezium is an open source distributed platform for change data capture. 
-Debezium extracts realtime database changes as json, avro, protobuf events and delivers to event streaming platforms -(Kafka, Kinesis, Google Pub/Sub, Pulsar are just some of them) LINK here, it provides very simple interface to extend and write new sinks - -#### Apache Iceberg -Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes, it supports Insert and Update queries, plus many other features listed here -Link -Apache iceberg has great flexible foundation/API which integrated by Spark, Presto, Trino, Flink and Hive - -## debezium-server-iceberg - -[@TODO visual architecture diagram] - -Project puts both projects together and enables realtime data pipeline to any cloud storage, hdfs destination -with this project its becomes possible to use best features from both projects enjoy realtime structured data feed and ACID table format with update support - -### Extending Debezium Server with custom sink -debezium-server-iceberg adds custom sink to Debezium server quarkus application [link here], -with custom sink received realtime json events converted to iceberg rows and processed using iceberg API -received rows are either appended or updated to destination iceberg table as Parquet files, since iceberg supports many cloud storage its easily possible to configure destination which could be -any of hadoop storage cloud storage location. with debezium-server-iceberg its easily possible to replicate your RDBMS to cloud storage - -# update, append -V 0.12 iceberg -retain deletes as soft delete! -# wait delay batch size - -wait by reading debezium metrics! another great feature of debezium -# destination, iceberg catalog - -@Contribution ..etc - -thanks to two projects making it possible Apache iceberg[link], Debezium[link] \ No newline at end of file From d9ab72e7025d78b421c62be8e896a30f86ddf4dd Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Mon, 16 Aug 2021 10:53:41 +0200 Subject: [PATCH 05/14] Update documentation --- BLOGPOST.md | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/BLOGPOST.md b/BLOGPOST.md index f78b4914..81c6d9ae 100644 --- a/BLOGPOST.md +++ b/BLOGPOST.md @@ -1,6 +1,6 @@ # Using Debezium to Create ACID Data Lake House -Do you need to build flexible Data Lakehouse but dont know where to start, do you want your data pipeline to be near realtime and support ACID transactions and updates +Do you need to build flexible Data Lakehouse but don't know where to start, do you want your data pipeline to be near realtime and support ACID transactions and updates its possible using two great projects Debezium and Apache Iceberg without any dependency to kafka or spark #### Debezium @@ -10,22 +10,22 @@ Debezium extracts realtime database changes as json, avro, protobuf events and d it provides simple interface to [implement new sink](https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink) #### Apache Iceberg -Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes, it supports Insert and Row level Deletes(Update) [plus many other benefits](https://iceberg.apache.org) +Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes, it supports Insert and Row level Deletes(Update)[it has many other benefits](https://iceberg.apache.org) Apache iceberg has great foundation and flexible API which currently supported by Spark, Presto, Trino, Flink and Hive -## debezium-server-iceberg +## 
Debezium Server Iceberg [@TODO visual architecture diagram] -This project puts both projects together and enables realtime data pipeline to any cloud storage, hdfs destination -with this project its becomes possible to use best features from both projects enjoy realtime structured data feed and ACID table format with update support +Debezium Server Iceberg project puts both projects together and enables realtime data pipeline to any cloud storage, hdfs destination supported by iceberg +Debezium Server Iceberg it is possible to use best features from both projects like realtime structured data pipeline and ACID table format with update support -### Extending Debezium Server with Iceberg sink -debezium-server Iceberg sink to [Debezium server quarkus application](https://debezium.io/documentation/reference/operations/debezium-server.html#_installation), +Debezium Iceberg sink extends [Debezium server quarkus application](https://debezium.io/documentation/reference/operations/debezium-server.html#_installation), -debezium-server Iceberg sink received realtime json events converted to iceberg rows and processed using iceberg API -received rows are either appended or updated to destination iceberg table as Parquet files, since iceberg supports many cloud storage its easily possible to configure destination which could be -any of hadoop storage cloud storage location. with debezium-server-iceberg its easily possible to replicate your RDBMS to cloud storage +Iceberg consumer converts debezium json events to iceberg rows and commits them to destination iceberg table using iceberg API +It's possible to append database events to iceberg tables or do upsert using source table primary key +since iceberg supports many cloud storage its easily possible to configure destination which could be any of hadoop storage cloud storage location. +with debezium-server-iceberg its easily possible to replicate your RDBMS to cloud storage # update, append Iceberg consumer by default works with upsert mode. When a row updated on source table destination row replaced with up-to-date record. 
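To give a feel for what "commits them to destination iceberg table using iceberg API" involves, here is a minimal, hypothetical sketch of the append path with the Iceberg Java API. It is not the sink's actual code; the catalog type, table name and file path are illustrative assumptions, and an already-written Parquet data file is assumed:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class IcebergAppendSketch {
  public static void main(String[] args) {
    // Load the destination table from a Hadoop catalog (warehouse path is illustrative).
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "s3a://S3_BUCKET/iceberg_warehouse");
    Table table = catalog.loadTable(TableIdentifier.of("debeziumevents", "debeziumcdc_customers"));

    // Describe a Parquet file that already holds the converted Debezium records.
    DataFile dataFile = DataFiles.builder(table.spec())
        .withPath("s3a://S3_BUCKET/iceberg_warehouse/debeziumevents/debeziumcdc_customers/data/batch-00001.parquet")
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(1024L)
        .withRecordCount(10L)
        .build();

    // Commit the file; Iceberg publishes the new snapshot atomically.
    table.newAppend().appendFile(dataFile).commit();
  }
}
```

The real consumer also has to handle automatic table creation and the upsert delete files, which this sketch leaves out.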
From ceeae4bff8981c2fc21eddfc977682c3b9bacb8c Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Wed, 18 Aug 2021 21:07:22 +0200 Subject: [PATCH 06/14] Update documentation --- BLOGPOST.md | 27 +++++++++++++++------------ pom.xml | 7 ------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/BLOGPOST.md b/BLOGPOST.md index 81c6d9ae..fd14689a 100644 --- a/BLOGPOST.md +++ b/BLOGPOST.md @@ -17,27 +17,30 @@ Apache iceberg has great foundation and flexible API which currently supported b [@TODO visual architecture diagram] -Debezium Server Iceberg project puts both projects together and enables realtime data pipeline to any cloud storage, hdfs destination supported by iceberg -Debezium Server Iceberg it is possible to use best features from both projects like realtime structured data pipeline and ACID table format with update support +Iceberg sink uses both projects and enables realtime data pipeline to any cloud storage, hdfs destination supported by iceberg +With Iceberg sink it is possible to use great features provided by both projects like realtime structured data flow and ACID table format with update support on data lake -Debezium Iceberg sink extends [Debezium server quarkus application](https://debezium.io/documentation/reference/operations/debezium-server.html#_installation), +Debezium Iceberg extends [Debezium server quarkus application](https://debezium.io/documentation/reference/operations/debezium-server.html#_installation) and implements new sink, -Iceberg consumer converts debezium json events to iceberg rows and commits them to destination iceberg table using iceberg API -It's possible to append database events to iceberg tables or do upsert using source table primary key -since iceberg supports many cloud storage its easily possible to configure destination which could be any of hadoop storage cloud storage location. -with debezium-server-iceberg its easily possible to replicate your RDBMS to cloud storage +Iceberg sink converts debezium json events to iceberg parquet data file, delete file and commits them to destination iceberg table using iceberg Java API -# update, append -Iceberg consumer by default works with upsert mode. When a row updated on source table destination row replaced with up-to-date record. -with upsert mode data at destination is always deduplicate and kept up to date +since iceberg supports many cloud storages its easily possible to configure different destinations like s3, hdfs, ... +with debezium-server-iceberg its easily possible to replicate your RDBMS to cloud storage + +### update, append +Iceberg sink by default works with upsert mode. When a row updated on source table destination row replaced with the new updated version. +with upsert mode data at destination kept identical to source data + +retain deletes as soft delete! V 0.12 iceberg retain deletes as soft delete! -# wait delay batch size +### wait delay batch size wait by reading debezium metrics! 
another great feature of debezium -# destination, iceberg catalog +### destination, iceberg catalog + @Contribution ..etc diff --git a/pom.xml b/pom.xml index 901b25b1..1cdb73d6 100644 --- a/pom.xml +++ b/pom.xml @@ -17,13 +17,6 @@ ${revision} pom - - - nexus-orgapacheiceberg - https://repository.apache.org/content/repositories/orgapacheiceberg-1018/ - - - 0.1.0-SNAPSHOT From 30ee24c22dd8234e1f18fd959e7d30be9c68b033 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Tue, 31 Aug 2021 14:25:21 +0200 Subject: [PATCH 07/14] Write Blog Post --- BLOGPOST.md | 108 ++++++++++++++++++++++++++++++++++++++++------------ README.md | 4 +- pom.xml | 2 +- 3 files changed, 86 insertions(+), 28 deletions(-) diff --git a/BLOGPOST.md b/BLOGPOST.md index fd14689a..8ae5cec6 100644 --- a/BLOGPOST.md +++ b/BLOGPOST.md @@ -1,7 +1,7 @@ -# Using Debezium to Create ACID Data Lake House +# Using Debezium to Create ACID Data Lake -Do you need to build flexible Data Lakehouse but don't know where to start, do you want your data pipeline to be near realtime and support ACID transactions and updates -its possible using two great projects Debezium and Apache Iceberg without any dependency to kafka or spark +Do you need to build flexible Data Lake? do you want your data pipeline to be near realtime and support ACID transactions, support updates on data lake? +Now its possible with Debezium Server Iceberg project( build on "Debezium" and "Apache Iceberg" projects) without any dependency to kafka or spark applications #### Debezium Debezium is an open source distributed platform for change data capture. @@ -10,42 +10,100 @@ Debezium extracts realtime database changes as json, avro, protobuf events and d it provides simple interface to [implement new sink](https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink) #### Apache Iceberg -Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes, it supports Insert and Row level Deletes(Update)[it has many other benefits](https://iceberg.apache.org) -Apache iceberg has great foundation and flexible API which currently supported by Spark, Presto, Trino, Flink and Hive +Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes. It supports Insert and Row level Deletes(Update)[it has many other benefits](https://iceberg.apache.org) +Apache iceberg has great foundation and flexible API which currently integrated by Spark, Presto, Trino, Flink and Hive engines ## Debezium Server Iceberg [@TODO visual architecture diagram] -Iceberg sink uses both projects and enables realtime data pipeline to any cloud storage, hdfs destination supported by iceberg -With Iceberg sink it is possible to use great features provided by both projects like realtime structured data flow and ACID table format with update support on data lake +**Debezium Server Iceberg** project ads Iceberg consumer, +Iceberg consumer processes received events and then commits them to destination iceberg table. +Its possible to configure any supported iceberg destination/catalog. +If destination table not found in the destination catalog consumer will try to create it using event(table schema) and key schema(record key) + +Consumer groups batch of events to event destination, +for each destination events are converted to iceberg records, event schema used to do data type mapping to iceberg record. 
+After debezium events converted to iceberg records, they are saved to iceberg parquet files(data, delete files), +as last step these files are committed to destination table using iceberg java API. + +Iceberg Consumer is based on json events it requires event schema to do data type conversion, +and currently nested data types are not supported, so it requires flattening. + +example configuration +```properties +debezium.sink.type=iceberg +# run with append mode +debezium.sink.iceberg.upsert=false +debezium.sink.iceberg.upsert-keep-deletes=true +# iceberg +debezium.sink.iceberg.table-prefix=debeziumcdc_ +debezium.sink.iceberg.table-namespace=debeziumevents +debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET); +debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse +debezium.sink.iceberg.type=hadoop +debezium.sink.iceberg.catalog-name=mycatalog +debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog +# enable event schemas +debezium.format.value.schemas.enable=true +debezium.format.value=json +# unwrap message +debezium.transforms=unwrap +debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState +debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db +debezium.transforms.unwrap.delete.handling.mode=rewrite +debezium.transforms.unwrap.drop.tombstones=true +``` -Debezium Iceberg extends [Debezium server quarkus application](https://debezium.io/documentation/reference/operations/debezium-server.html#_installation) and implements new sink, +### update, append +By default, Iceberg sink is running with upsert mode `debezium.sink.iceberg.upsert=true`. When a row updated on source table destination row replaced with the new updated version. +With upsert mode data at destination kept identical to source data. Update mode uses iceberg equality delete feature and creates delete files using record key of target table -Iceberg sink converts debezium json events to iceberg parquet data file, delete file and commits them to destination iceberg table using iceberg Java API +Note: For the tables without record key operation mode falls back to append even configuration is set to upsert mode -since iceberg supports many cloud storages its easily possible to configure different destinations like s3, hdfs, ... -with debezium-server-iceberg its easily possible to replicate your RDBMS to cloud storage +#### Keeping Deleted Records -### update, append -Iceberg sink by default works with upsert mode. When a row updated on source table destination row replaced with the new updated version. -with upsert mode data at destination kept identical to source data +For some use cases it's useful to keep deleted records as soft deletes, this is possible by setting `debezium.sink.iceberg.upsert-keep-deletes` to true +this setting will keep the latest version of deleted records (`__op=d`) in the iceberg table. Setting it to false will remove deleted records from the destination table. + +### Append +Setting `debezium.sink.iceberg.upsert` to false sets the operation mode to append, with append mode data deduplication is not done and all received records are appended to destination table + +Note: For the tables without primary key operation mode falls back to append even configuration is set to upsert mode + +### Optimizing batch size (commit interval) -retain deletes as soft delete! 
+Debezium extracts/consumes database events in real time and this could cause too frequent commits(too many small files) to iceberg table, +which is not optimal for batch processing especially when near realtime data feed is sufficient. +To avoid this problem its possible to use following config and increase batch size per commit +**MaxBatchSizeWait**: This setting adds delay based on debezium metrics, +it periodically monitors streaming queue size, and it starts processing events when it reaches `debezium.source.max.batch.size` value +during wait debezium events are collected in memory (in debezium streaming queue) +this setting should be configured together with `debezium.source.max.queue.size` and `debezium.source.max.batch.size` debezium properties -V 0.12 iceberg -retain deletes as soft delete! -### wait delay batch size +example setting: +```properties +debezium.sink.batch.batch-size-wait=MaxBatchSizeWait +debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc +debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc +debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector +debezium.source.max.batch.size=50000 +debezium.source.max.queue.size=400000 +debezium.sink.batch.batch-size-wait.max-wait-ms=60000 +debezium.sink.batch.batch-size-wait.wait-interval-ms=10000 +``` -wait by reading debezium metrics! another great feature of debezium ### destination, iceberg catalog +### Contribution +This project is very new and there are many improvements to do new features, please feel free to test it , give feedback, open feature request or send pull request. -@Contribution ..etc +- [Project](https://github.com/memiiso/debezium-server-iceberg) +- [Releases](https://github.com/memiiso/debezium-server-iceberg/releases) -# Links -[Apache iceberg](https://iceberg.apache.org/) -[Apache iceberg Github](https://github.com/apache/iceberg) -[Debezium](https://debezium.io/) -[Debezium Github](https://github.com/debezium/debezium) \ No newline at end of file +## Links +- [Apache iceberg](https://iceberg.apache.org/) +- [Apache iceberg Github](https://github.com/apache/iceberg) +- [Debezium](https://debezium.io/) +- [Debezium Github](https://github.com/debezium/debezium) \ No newline at end of file diff --git a/README.md b/README.md index f4144ed3..37b4a38c 100644 --- a/README.md +++ b/README.md @@ -27,13 +27,13 @@ values received then the record with higher `__op` priority is added to destinat ### Append Setting `debezium.sink.iceberg.upsert=false` will set the operation mode to append, with append mode data deduplication is not done, all received records are appended to destination table -Note: For the tables without primary key operation mode is append even configuration is set to upsert mode +Note: For the tables without primary key operation mode falls back to append even configuration is set to upsert mode #### Keeping Deleted Records By default `debezium.sink.iceberg.upsert-keep-deletes=true` will keep deletes in the iceberg table, setting it to false will remove deleted records from the destination iceberg table. With this config its possible to keep last version of the deleted -record in the table(to do soft deletes). +record in the table(possible to do soft deletes). 
### Optimizing batch size (or commit interval) diff --git a/pom.xml b/pom.xml index 1cdb73d6..5aab73ea 100644 --- a/pom.xml +++ b/pom.xml @@ -36,7 +36,7 @@ 2.16.88 1.11.1 - 1.7.0.Alpha1 + 1.7.0.Beta1 2.0.3.Final From 879ae45e9cecc4a179845fa319c0b4840f17be88 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Wed, 1 Sep 2021 13:33:16 +0200 Subject: [PATCH 08/14] Write Blog Post --- BLOGPOST.md | 58 +++++++++---------- .../batchsizewait/DynamicBatchSizeWait.java | 1 + 2 files changed, 27 insertions(+), 32 deletions(-) diff --git a/BLOGPOST.md b/BLOGPOST.md index 8ae5cec6..b286c5ea 100644 --- a/BLOGPOST.md +++ b/BLOGPOST.md @@ -1,34 +1,33 @@ # Using Debezium to Create ACID Data Lake -Do you need to build flexible Data Lake? do you want your data pipeline to be near realtime and support ACID transactions, support updates on data lake? -Now its possible with Debezium Server Iceberg project( build on "Debezium" and "Apache Iceberg" projects) without any dependency to kafka or spark applications +Do you need to build Data Lake with near realtime data pipeline, do you want it to support ACID transactions and updates on data lake? +It is possible with Debezium Server Iceberg( build using "Debezium" and "Apache Iceberg" projects). and it has no dependency to kafka or spark applications #### Debezium -Debezium is an open source distributed platform for change data capture. +[Debezium](https://debezium.io/) is an open source distributed platform for change data capture. Debezium extracts realtime database changes as json, avro, protobuf events and delivers to event streaming platforms (Kafka, Kinesis, Google Pub/Sub, Pulsar are just some of [supported sinks](https://debezium.io/documentation/reference/operations/debezium-server.html#_sink_configuration)), it provides simple interface to [implement new sink](https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink) #### Apache Iceberg -Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes. It supports Insert and Row level Deletes(Update)[it has many other benefits](https://iceberg.apache.org) -Apache iceberg has great foundation and flexible API which currently integrated by Spark, Presto, Trino, Flink and Hive engines +[Apache Iceberg](https://iceberg.apache.org/) is an open table format for huge analytic datasets. +Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink and Hive using a high-performance table format that works just like a SQL table. +It supports Insert, Row level Deletes/Updates. It has great and flexible foundation, its has [many other features](https://iceberg.apache.org) ## Debezium Server Iceberg -[@TODO visual architecture diagram] +**Debezium Server Iceberg** project ads Iceberg consumer to debezium server. +Iceberg consumer converts received events to iceberg format and commits them to destination iceberg table. With the consumer it's possible to configure any supported iceberg destination/catalog. +If destination table not found in the destination catalog consumer will automatically try to create it using event schema and key schema(record key) -**Debezium Server Iceberg** project ads Iceberg consumer, -Iceberg consumer processes received events and then commits them to destination iceberg table. -Its possible to configure any supported iceberg destination/catalog. 
-If destination table not found in the destination catalog consumer will try to create it using event(table schema) and key schema(record key) +on high level iceberg consumer +groups batch of events to event destination, +for each destination, events are converted to iceberg records. at this step event schema used to do type mapping to iceberg record that's why `debezium.format.value.schemas.enable` should be enabled(true). +After debezium events converted to iceberg records, they are saved to iceberg parquet files(iceberg data and delete files(for upsert)), +as last step these files are committed to destination table as data and delete file using iceberg java API. -Consumer groups batch of events to event destination, -for each destination events are converted to iceberg records, event schema used to do data type mapping to iceberg record. -After debezium events converted to iceberg records, they are saved to iceberg parquet files(data, delete files), -as last step these files are committed to destination table using iceberg java API. - -Iceberg Consumer is based on json events it requires event schema to do data type conversion, -and currently nested data types are not supported, so it requires flattening. +Currently, Iceberg Consumer works only with json events. With json events it requires event schema to do data type conversion. +Currently, nested data types are not supported, so it requires event flattening. example configuration ```properties @@ -58,6 +57,7 @@ debezium.transforms.unwrap.drop.tombstones=true ### update, append By default, Iceberg sink is running with upsert mode `debezium.sink.iceberg.upsert=true`. When a row updated on source table destination row replaced with the new updated version. With upsert mode data at destination kept identical to source data. Update mode uses iceberg equality delete feature and creates delete files using record key of target table +With update mode to avoid duplicate data deduplication is done on each batch Note: For the tables without record key operation mode falls back to append even configuration is set to upsert mode @@ -67,19 +67,18 @@ For some use cases it's useful to keep deleted records as soft deletes, this is this setting will keep the latest version of deleted records (`__op=d`) in the iceberg table. Setting it to false will remove deleted records from the destination table. ### Append -Setting `debezium.sink.iceberg.upsert` to false sets the operation mode to append, with append mode data deduplication is not done and all received records are appended to destination table - -Note: For the tables without primary key operation mode falls back to append even configuration is set to upsert mode +this is most straightforward operation mode, setting `debezium.sink.iceberg.upsert` to false sets the operation mode to append, +with append mode data deduplication is not done and all received records are appended to destination table ### Optimizing batch size (commit interval) -Debezium extracts/consumes database events in real time and this could cause too frequent commits(too many small files) to iceberg table, +Debezium extracts/consumes database events in real time and this could cause too frequent commits( and too many small files) to iceberg table, which is not optimal for batch processing especially when near realtime data feed is sufficient. 
-To avoid this problem its possible to use following config and increase batch size per commit +To avoid this problem it's possible to use following configuration and increase batch size per commit **MaxBatchSizeWait**: This setting adds delay based on debezium metrics, it periodically monitors streaming queue size, and it starts processing events when it reaches `debezium.source.max.batch.size` value -during wait debezium events are collected in memory (in debezium streaming queue) +during the wait debezium events are collected in memory (in debezium streaming queue) and this way each commit receives more and consistent batch size this setting should be configured together with `debezium.source.max.queue.size` and `debezium.source.max.batch.size` debezium properties example setting: @@ -95,15 +94,10 @@ debezium.sink.batch.batch-size-wait.wait-interval-ms=10000 ``` ### destination, iceberg catalog +The consumer uses iceberg catalog to read and commit data to destination table, all the catalog types and storage types used by Iceberg are supported. ### Contribution -This project is very new and there are many improvements to do new features, please feel free to test it , give feedback, open feature request or send pull request. - -- [Project](https://github.com/memiiso/debezium-server-iceberg) -- [Releases](https://github.com/memiiso/debezium-server-iceberg/releases) +This project is very new and there are many things to improve, please feel free to test it, give feedback, open feature request or send pull request. -## Links -- [Apache iceberg](https://iceberg.apache.org/) -- [Apache iceberg Github](https://github.com/apache/iceberg) -- [Debezium](https://debezium.io/) -- [Debezium Github](https://github.com/debezium/debezium) \ No newline at end of file +- [for more details please see the project](https://github.com/memiiso/debezium-server-iceberg) +- [Releases](https://github.com/memiiso/debezium-server-iceberg/releases) \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java index ef6de4f9..c0733014 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java @@ -25,6 +25,7 @@ */ @Dependent @Named("DynamicBatchSizeWait") +@Deprecated public class DynamicBatchSizeWait implements InterfaceBatchSizeWait { protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class); From 1d3ef185df787b9a6379bc287e35bf1bca48a3d5 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Wed, 1 Sep 2021 13:37:30 +0200 Subject: [PATCH 09/14] Write Blog Post --- 2021-10-01-debezium-server-iceberg.adoc | 156 ++++++++++++++++++++++++ BLOGPOST.md | 103 ---------------- 2 files changed, 156 insertions(+), 103 deletions(-) create mode 100644 2021-10-01-debezium-server-iceberg.adoc delete mode 100644 BLOGPOST.md diff --git a/2021-10-01-debezium-server-iceberg.adoc b/2021-10-01-debezium-server-iceberg.adoc new file mode 100644 index 00000000..3cfaddbd --- /dev/null +++ b/2021-10-01-debezium-server-iceberg.adoc @@ -0,0 +1,156 @@ +--- +layout: post +title: Using Debezium to Create ACID Data Lake +date: 2021-10-01 +tags: [ debezium, iceberg, datalake ] +author: isimsek +--- + +Do you need to build Lakehouse with near 
realtime data pipeline, do you want it to support ACID transactions and updates on the data lake?
It is possible with "Debezium Server Iceberg", without any dependency on Kafka or Spark applications

++++++

==== Debezium

https://debezium.io/[Debezium] is an open source distributed platform for change data capture.
Debezium extracts realtime database changes as json, avro or protobuf events and delivers them to event streaming platforms
(Kafka, Kinesis, Google Pub/Sub and Pulsar are just some of the https://debezium.io/documentation/reference/operations/debezium-server.html#_sink_configuration[supported sinks]);
it provides a simple interface to https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink[implement a new sink]

==== Apache Iceberg

https://iceberg.apache.org/[Apache Iceberg] is an open table format for huge analytic datasets.
Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink and Hive using a high-performance table format that works just like a SQL table.
It supports ACID inserts and row-level deletes/updates. It has a flexible foundation and provides https://iceberg.apache.org[many other features]

== Debezium Server Iceberg

The **Debezium Server Iceberg** project implements a new consumer which uses the Iceberg Java API to consume events.
The Iceberg consumer converts received events to Iceberg format and commits them to the destination Iceberg table. With the consumer it's possible to use any cloud storage or catalog supported by Iceberg.

On a high level, data processing works like below::
* it groups the set of events per event destination,
* for each destination, events are converted to Iceberg records; at this step the event schema is used to map data to the Iceberg record (`debezium.format.value.schemas.enable` must be enabled (true)),
* after the events are converted to Iceberg records, they are saved to Parquet files (Iceberg data files, plus delete files when running in upsert mode),
* as a last step these files are committed to the destination table (uploaded to the destination storage) as data and delete files using the Iceberg Java API.

If the destination table is not found in the destination catalog, the consumer will automatically try to create it using the event schema and the key schema (record key).

Currently, the Iceberg consumer supports only json events. With json events the event schema must be enabled to do correct type conversion to Iceberg records,
and complex nested data types are not supported, so event flattening must be enabled.

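To make the record mapping in the steps above concrete, here is a small, hypothetical sketch of the conversion step. It is not the consumer's actual code; the schema, field names and event payload are made up for illustration, assuming Jackson for JSON parsing and Iceberg's `GenericRecord`:

[source,java]
----
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;

public class EventToRecordSketch {
  public static void main(String[] args) throws Exception {
    // Iceberg schema matching a flattened Debezium change event (illustrative).
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()),
        Types.NestedField.optional(3, "__op", Types.StringType.get()));

    // A flattened Debezium event, as produced by the unwrap transform below.
    JsonNode event = new ObjectMapper()
        .readTree("{\"id\": 1, \"name\": \"John\", \"__op\": \"c\"}");

    // Map each JSON field onto the matching field of an Iceberg record.
    GenericRecord record = GenericRecord.create(schema);
    record.setField("id", event.get("id").asLong());
    record.setField("name", event.get("name").asText());
    record.setField("__op", event.get("__op").asText());
    System.out.println(record);
  }
}
----

In the real consumer this mapping is driven by the event schema embedded in the json event, which is why `debezium.format.value.schemas.enable` must be set to true.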
example configuration::
[source,properties]
----
debezium.sink.type=iceberg
# run with append mode
debezium.sink.iceberg.upsert=false
debezium.sink.iceberg.upsert-keep-deletes=true
# iceberg
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.table-namespace=debeziumevents
debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET
debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse
debezium.sink.iceberg.type=hadoop
debezium.sink.iceberg.catalog-name=mycatalog
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.value=json
# unwrap message
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
----

=== Update, Append modes

By default, the Iceberg sink runs in upsert mode (`debezium.sink.iceberg.upsert=true`). When a row is updated on the source table, the destination row is replaced with the new updated version, and deleted records are deleted from the destination. With upsert mode, data at the destination is kept identical to the source data. Update mode uses the Iceberg equality delete feature and creates delete files using the key of the Debezium event. With update mode, deduplication is done on each batch to avoid duplicate data.

Note: For tables without a record key (PK), the operation mode falls back to append even if the configuration is set to upsert mode

==== Keeping Deleted Records

For some use cases it's useful to keep deleted records as soft deletes; this is possible by setting `debezium.sink.iceberg.upsert-keep-deletes` to true.
This setting will keep the latest version of deleted records (`__op=d`) in the iceberg table. Setting it to false will remove deleted records from the destination table.

==== Append mode

This is the most straightforward operation mode; setting `debezium.sink.iceberg.upsert` to `false` sets the operation mode to append,
and with append mode no data deduplication is done: all received records are appended to the destination table

=== Optimizing batch size (commit interval)

Debezium extracts/consumes database events in real time and this could cause too frequent commits (and generate too many small files) to the iceberg table,
which is not optimal for batch processing, especially when a near realtime data feed is sufficient.
To avoid this problem it's possible to use the following configuration and increase the batch size per commit

**MaxBatchSizeWait**: uses debezium metrics to optimize the batch size; it periodically reads the current streaming queue size and waits until it reaches `max.batch.size`.
The maximum wait and check intervals are controlled by the `debezium.sink.batch.batch-size-wait.max-wait-ms` and `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.
During the wait, debezium events are collected in memory (in the debezium streaming queue), and this way each commit receives a larger and more consistent batch size.
Note: this setting should be configured together with the `debezium.source.max.queue.size` and `debezium.source.max.batch.size` debezium properties

Note: It's also possible to do data compaction using iceberg, compacting data and metadata files to get the best performance.

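The wait loop described above polls Debezium's streaming metrics over JMX. The following is a rough sketch of that polling, not the actual implementation: it assumes it runs inside Debezium Server where the streaming MBean is registered, the MBean name matches the example setting below, and the attribute names (`QueueTotalCapacity`, `QueueRemainingCapacity`) come from Debezium's documented connector metrics:

[source,java]
----
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MaxBatchSizeWaitSketch {
  public static void main(String[] args) throws Exception {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    // Debezium streaming-metrics MBean; the name matches the example setting below.
    ObjectName streaming = new ObjectName(
        "debezium.postgres:type=connector-metrics,context=streaming,server=testc");

    int maxBatchSize = 50_000;     // debezium.source.max.batch.size
    long waitIntervalMs = 10_000;  // debezium.sink.batch.batch-size-wait.wait-interval-ms
    long maxWaitMs = 60_000;       // debezium.sink.batch.batch-size-wait.max-wait-ms

    long waited = 0;
    while (waited < maxWaitMs) {
      // Current queue size = total capacity minus remaining capacity.
      int total = (Integer) server.getAttribute(streaming, "QueueTotalCapacity");
      int remaining = (Integer) server.getAttribute(streaming, "QueueRemainingCapacity");
      if (total - remaining >= maxBatchSize) {
        break; // enough events buffered, let the consumer process the batch
      }
      Thread.sleep(waitIntervalMs);
      waited += waitIntervalMs;
    }
  }
}
----

In the real sink this waiting is what turns a steady trickle of events into fewer, larger commits.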
+ +example setting:: + +[source,properties] +---- +debezium.sink.batch.batch-size-wait=MaxBatchSizeWait +debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc +debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc +debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector +debezium.source.max.batch.size=50000 +debezium.source.max.queue.size=400000 +debezium.sink.batch.batch-size-wait.max-wait-ms=60000 +debezium.sink.batch.batch-size-wait.wait-interval-ms=10000 +---- + +=== Destination, iceberg catalog + +Iceberg consumer uses iceberg catalog to read and commit data to destination table, destination could be any cloud storage and any catalog supported by iceberg. + +== Next Datawarehouse, Curated layer + +Now we got perfect raw layer with near realtime data feed which we can build Curated Layer,Analytic Layer or Datawarehouse on top of. + +for example i could easily use Spark SQL(or Prestodb, Trino, Flink) and process this data to next layer:) + +[source,sql] +---- +MERGE INTO dwh.consumers t + USING ( + -- new data goes to insert + SELECT customer_id, name, effective_date, to_date('9999-12-31', 'yyyy-MM-dd') as end_date FROM debezium.consumers + UNION ALL + -- update exiting records and close them + SELECT t.customer_id, t.name, t.effective_date, s.effective_date as end_date FROM debezium.consumers s + INNER JOIN dwh.consumers t on s.customer_id = t.customer_id AND t.current = true + + ) s + ON s.customer_id = t.customer_id AND s.effective_date = t.effective_date + -- close last record. + WHEN MATCHED + THEN UPDATE SET t.current = false, t.end_date = s.end_date + WHEN NOT MATCHED THEN + INSERT(customer_id, name, current, effective_date, end_date) + VALUES(s.customer_id, s.name, true, s.effective_date, s.end_date); +---- + +its also possible to use delete insert +[source,sql] +---- + +---- + +in https://github.com/ismailsimsek/iceberg-examples[iceberg examples] project you could see more examples and experiment with iceberg spark + +=== Contribution + +This project is new and there are many things to improve, please feel free to test it, give feedback, open feature request or send pull request. + +- https://github.com/memiiso/debezium-server-iceberg[For more details please see the project] +- https://github.com/memiiso/debezium-server-iceberg/releases[Releases] \ No newline at end of file diff --git a/BLOGPOST.md b/BLOGPOST.md deleted file mode 100644 index b286c5ea..00000000 --- a/BLOGPOST.md +++ /dev/null @@ -1,103 +0,0 @@ -# Using Debezium to Create ACID Data Lake - -Do you need to build Data Lake with near realtime data pipeline, do you want it to support ACID transactions and updates on data lake? -It is possible with Debezium Server Iceberg( build using "Debezium" and "Apache Iceberg" projects). and it has no dependency to kafka or spark applications - -#### Debezium -[Debezium](https://debezium.io/) is an open source distributed platform for change data capture. 
-Debezium extracts realtime database changes as json, avro, protobuf events and delivers to event streaming platforms -(Kafka, Kinesis, Google Pub/Sub, Pulsar are just some of [supported sinks](https://debezium.io/documentation/reference/operations/debezium-server.html#_sink_configuration)), -it provides simple interface to [implement new sink](https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink) - -#### Apache Iceberg -[Apache Iceberg](https://iceberg.apache.org/) is an open table format for huge analytic datasets. -Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink and Hive using a high-performance table format that works just like a SQL table. -It supports Insert, Row level Deletes/Updates. It has great and flexible foundation, its has [many other features](https://iceberg.apache.org) - -## Debezium Server Iceberg - -**Debezium Server Iceberg** project ads Iceberg consumer to debezium server. -Iceberg consumer converts received events to iceberg format and commits them to destination iceberg table. With the consumer it's possible to configure any supported iceberg destination/catalog. -If destination table not found in the destination catalog consumer will automatically try to create it using event schema and key schema(record key) - -on high level iceberg consumer -groups batch of events to event destination, -for each destination, events are converted to iceberg records. at this step event schema used to do type mapping to iceberg record that's why `debezium.format.value.schemas.enable` should be enabled(true). -After debezium events converted to iceberg records, they are saved to iceberg parquet files(iceberg data and delete files(for upsert)), -as last step these files are committed to destination table as data and delete file using iceberg java API. - -Currently, Iceberg Consumer works only with json events. With json events it requires event schema to do data type conversion. -Currently, nested data types are not supported, so it requires event flattening. - -example configuration -```properties -debezium.sink.type=iceberg -# run with append mode -debezium.sink.iceberg.upsert=false -debezium.sink.iceberg.upsert-keep-deletes=true -# iceberg -debezium.sink.iceberg.table-prefix=debeziumcdc_ -debezium.sink.iceberg.table-namespace=debeziumevents -debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET); -debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse -debezium.sink.iceberg.type=hadoop -debezium.sink.iceberg.catalog-name=mycatalog -debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog -# enable event schemas -debezium.format.value.schemas.enable=true -debezium.format.value=json -# unwrap message -debezium.transforms=unwrap -debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState -debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db -debezium.transforms.unwrap.delete.handling.mode=rewrite -debezium.transforms.unwrap.drop.tombstones=true -``` - -### update, append -By default, Iceberg sink is running with upsert mode `debezium.sink.iceberg.upsert=true`. When a row updated on source table destination row replaced with the new updated version. -With upsert mode data at destination kept identical to source data. 
Update mode uses iceberg equality delete feature and creates delete files using record key of target table -With update mode to avoid duplicate data deduplication is done on each batch - -Note: For the tables without record key operation mode falls back to append even configuration is set to upsert mode - -#### Keeping Deleted Records - -For some use cases it's useful to keep deleted records as soft deletes, this is possible by setting `debezium.sink.iceberg.upsert-keep-deletes` to true -this setting will keep the latest version of deleted records (`__op=d`) in the iceberg table. Setting it to false will remove deleted records from the destination table. - -### Append -this is most straightforward operation mode, setting `debezium.sink.iceberg.upsert` to false sets the operation mode to append, -with append mode data deduplication is not done and all received records are appended to destination table - -### Optimizing batch size (commit interval) - -Debezium extracts/consumes database events in real time and this could cause too frequent commits( and too many small files) to iceberg table, -which is not optimal for batch processing especially when near realtime data feed is sufficient. -To avoid this problem it's possible to use following configuration and increase batch size per commit - -**MaxBatchSizeWait**: This setting adds delay based on debezium metrics, -it periodically monitors streaming queue size, and it starts processing events when it reaches `debezium.source.max.batch.size` value -during the wait debezium events are collected in memory (in debezium streaming queue) and this way each commit receives more and consistent batch size -this setting should be configured together with `debezium.source.max.queue.size` and `debezium.source.max.batch.size` debezium properties - -example setting: -```properties -debezium.sink.batch.batch-size-wait=MaxBatchSizeWait -debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc -debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc -debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector -debezium.source.max.batch.size=50000 -debezium.source.max.queue.size=400000 -debezium.sink.batch.batch-size-wait.max-wait-ms=60000 -debezium.sink.batch.batch-size-wait.wait-interval-ms=10000 -``` - -### destination, iceberg catalog -The consumer uses iceberg catalog to read and commit data to destination table, all the catalog types and storage types used by Iceberg are supported. - -### Contribution -This project is very new and there are many things to improve, please feel free to test it, give feedback, open feature request or send pull request. 
- -- [for more details please see the project](https://github.com/memiiso/debezium-server-iceberg) -- [Releases](https://github.com/memiiso/debezium-server-iceberg/releases) \ No newline at end of file From e41b53f79a17f07eb0ef29e0a35ca356032d72ff Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sun, 19 Sep 2021 10:31:09 +0200 Subject: [PATCH 10/14] Write Blog Post --- 2021-10-01-debezium-server-iceberg.adoc | 44 ++++++++++++++----------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/2021-10-01-debezium-server-iceberg.adoc b/2021-10-01-debezium-server-iceberg.adoc index 3cfaddbd..d238ef19 100644 --- a/2021-10-01-debezium-server-iceberg.adoc +++ b/2021-10-01-debezium-server-iceberg.adoc @@ -117,36 +117,40 @@ Iceberg consumer uses iceberg catalog to read and commit data to destination tab Now we got perfect raw layer with near realtime data feed which we can build Curated Layer,Analytic Layer or Datawarehouse on top of. -for example i could easily use Spark SQL(or Prestodb, Trino, Flink) and process this data to next layer:) +for example i could easily use https://iceberg.apache.org/spark-writes/[Spark SQL](or Prestodb, Trino, Flink) and process this data to next layer:) [source,sql] ---- MERGE INTO dwh.consumers t - USING ( - -- new data goes to insert - SELECT customer_id, name, effective_date, to_date('9999-12-31', 'yyyy-MM-dd') as end_date FROM debezium.consumers - UNION ALL - -- update exiting records and close them - SELECT t.customer_id, t.name, t.effective_date, s.effective_date as end_date FROM debezium.consumers s - INNER JOIN dwh.consumers t on s.customer_id = t.customer_id AND t.current = true - - ) s - ON s.customer_id = t.customer_id AND s.effective_date = t.effective_date - -- close last record. - WHEN MATCHED - THEN UPDATE SET t.current = false, t.end_date = s.end_date - WHEN NOT MATCHED THEN - INSERT(customer_id, name, current, effective_date, end_date) - VALUES(s.customer_id, s.name, true, s.effective_date, s.end_date); + USING ( + -- new data goes to insert + SELECT customer_id, name, effective_date, to_date('9999-12-31', 'yyyy-MM-dd') as end_date FROM debezium.consumers + UNION ALL + -- update exiting records and close them + SELECT t.customer_id, t.name, t.effective_date, s.effective_date as end_date FROM debezium.consumers s + INNER JOIN dwh.consumers t on s.customer_id = t.customer_id AND t.current = true + + ) s + ON s.customer_id = t.customer_id AND s.effective_date = t.effective_date + -- close last record. + WHEN MATCHED + THEN UPDATE SET t.current = false, t.end_date = s.end_date + -- deleting deleted record is also possible! 
+ -- WHEN MATCHED and s.__op = 'd' + -- THEN DELETE + WHEN NOT MATCHED THEN + INSERT(customer_id, name, current, effective_date, end_date) + VALUES(s.customer_id, s.name, true, s.effective_date, s.end_date); ---- -its also possible to use delete insert +its also possible to use https://iceberg.apache.org/spark-writes/[delete, insert statements] [source,sql] ---- - +INSERT INTO prod.db.table SELECT ...; +DELETE FROM prod.db.table WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00'; ---- -in https://github.com/ismailsimsek/iceberg-examples[iceberg examples] project you could see more examples and experiment with iceberg spark +in https://github.com/ismailsimsek/iceberg-examples[iceberg examples] project you could see more examples and experiment with iceberg and spark === Contribution From 4bd41c7e117fae102a02a8462520f679bcb89654 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sun, 19 Sep 2021 14:33:43 +0200 Subject: [PATCH 11/14] Write Blog Post --- 2021-10-01-debezium-server-iceberg.adoc | 8 +++++--- README.md | 2 ++ docs/images/Debezium-Iceberg.png | Bin 0 -> 46159 bytes 3 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 docs/images/Debezium-Iceberg.png diff --git a/2021-10-01-debezium-server-iceberg.adoc b/2021-10-01-debezium-server-iceberg.adoc index d238ef19..55bd2578 100644 --- a/2021-10-01-debezium-server-iceberg.adoc +++ b/2021-10-01-debezium-server-iceberg.adoc @@ -2,8 +2,8 @@ layout: post title: Using Debezium to Create ACID Data Lake date: 2021-10-01 -tags: [ debezium, iceberg, datalake ] -author: isimsek +tags: [ debezium, iceberg, datalake, lakehouse ] +author: Ismail Simsek --- Do you need to build Lakehouse with near realtime data pipeline, do you want it to support ACID transactions and updates on data lake? @@ -37,6 +37,8 @@ on high level data processing works like below :: If destination table not found in the destination catalog consumer will automatically try to create it using event schema and key schema(record key) +image::docs/images/Debezium-Iceberg.png[Debezium-Iceberg] + Currently, Iceberg Consumer supports only json events. With json events event schema must be enabled to do correct type conversion to iceberg record. and complex nested data types are not supported, so event flattening must be enabled. @@ -135,7 +137,7 @@ MERGE INTO dwh.consumers t -- close last record. WHEN MATCHED THEN UPDATE SET t.current = false, t.end_date = s.end_date - -- deleting deleted record is also possible! + -- also possible to delete deleted records! -- WHEN MATCHED and s.__op = 'd' -- THEN DELETE WHEN NOT MATCHED THEN diff --git a/README.md b/README.md index 37b4a38c..e70aac6a 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,8 @@ This project adds iceberg consumer to [debezium server application](https://debezium.io/documentation/reference/operations/debezium-server.html). it could be used to replicate database changes to iceberg table(Cloud storage, hdfs) without requiring Spark, Kafka or Streaming platform. +![Debezium Iceberg](docs/images/Debezium-Iceberg.png) + ## `iceberg` Consumer Iceberg consumer appends or upserts debezium events to destination iceberg tables. 
When event and key schemas diff --git a/docs/images/Debezium-Iceberg.png b/docs/images/Debezium-Iceberg.png new file mode 100644 index 0000000000000000000000000000000000000000..8fadb4870a6ef54ab72d4046fe1758591e2d75b0 GIT binary patch literal 46159 zcmZ^~by!sE_dYyy4JpFVEg&^0ogyeHDV^hh(jnan1EQoTpri)y|`HI+$;>4_l_2&t-yk}d>-1BF1aYzgtf zk>jucX7G#1RmIpH0wG6SzOX*Y=h#6YtPoWth5J6K8)?2i)cQ45@=3;pbZ_XMcP)uO zu71PwGHy6`ZOd_R4c-C&;aW_0i=u$!DIKmG7SU~byw_ZIx0H0KuToP}QsXOeF^N4b zLKWdZ<%~n=V}B!zTRu5^S)gXu9B-JC>CC$A7=N1L70{B4* zaD_@+bgj61OthOl1#u;cR3P0g3QfSBNdxdUz zR+U=2K6A+9pjKoT!as=lV_(P)ojnJUeHN5fgUOkX$>Z6`w8*vOL8U)7=0nQonCVW@ zS9rDgmbJXHhrj$Wmh$5iXLrOeCF|(36s}MgA(!X;ugMzf<&4Dt`+6jjdwudV+~hmb z8s0MP`P4FQCx`YJyz~FvnuyR9l&Ue~=w7sDG>Exy6l zH?RA%-}qJ>b2C&*W+C?Ej}r;e%I_$h1%`NtD#}TsZ9+M2%2eNdP85_?)FC-(dHDD1 zqsV!frsf%<>7wX0<$W=FGZxC3z4n$f1b^Ku_kE#hQo1G{H54j6oX+$(r})EIW|Ycb zCxxfdd7LobF@_Cil0lqz+eB4p{(K!Y>GNZ1jnNNs$cy9w+d5I5jF1Z5HP^15%fll% zUgg2U`biV&GRlS9149J`2jDO-Mdif>1cGnA>-kgWWK4*x`b_Zf_+tH-mIqn(u?SN3^3jMmJZ z`^xdwi4_9s&CR9T#%ev)e+Ryo!_(8%J?Cc;RVh#z%&E)qAPtrKFj(vqwLp(VD`gCCJ$8hdYArp=p=0SfB5B~h+H}1M> zs8Ba3Y5z3%y^KSaD zGO|BuL`WC^>J@UjGsHYs|I@-l!*$}xAA6w-EyAUys%yJ3LMl-{-YP=IuH~68$L$kWcCgT{`85@le@zg4*eG^iPXud?a_m zVgWYfHQAPxX{qbUgOkHP=nKXL9#Y#7Q*2j~KdU;=fZObQT{oW1v*z^8Yh}id1i!8O zLbN%#KJ;yy3w)wcy^&4Ly{4y~Gjt~YWY8A9>G=qM_8fKl;6r%R4CSAQErWy#Rx7AK zN&TcHnPRO{PxIp-_ho$iu7B`wimi(A_SPO)r-H;KwTv3-TfQw~(w#W`He22N$Ajdd2Zy3OWqL`wp`S&srV}c?m;3!&DR7tcWUuXYv6|;(MoQ`% zDU;PeEmWqpwsop_2G$=la2d-XQ{6RkqcyfBg<7L!S&!ed+2oH(ksiNp5nNxOs{NGa zAVTk=eT45qz=BX+xpMK2m1pvGfxZ~$;Fs$pTK)9+f0pE{cHo0T^-=06 zKe}-w2;jwq^JB~Fo??EME^A6!lb`%j1$|i{Q`xSX#fg0%H#+6z3~os5>R#4=shxwD zCcSL`=5$0HyRdLuHnRq|VU3I-SCKd@o}eRU=)uQD%^y4U^p+rJybXO3@i$1M7G@a$AwDU6khZ?j~R=2`QI=M?_o z2G)nUhGEaN&O5#62O?_Q47!!ouc!?--H-U!%*w<5)Hf4>=f_L#g4oM0sj=R1c!6DP z-eYk3=t`;dq{|h_*9;Z>9bQr45ARXQD6uGxQCgHQ(=`8^>|&~*P-qGmiM`br6l}Xp zjT!tQ;`hato3i9s>$$nP=H;Vl+3NK_ zT+f|N(@OA$RaJS1%0_`7J~}vkesq|+Q3o+q(HNW%{FB$VRlL7hP+##mg=lxxP{lhwN&GCbkG%qi& zF58pQELY>&JI*NL5oa#?5nDDbW6=71wcEGp z8E5oN%f3kMA`FuThqFzo*ZwZpjuP*Wd&Av&A17kmEjAofeq%$LtW>76477B0yHZRF z9r`~QI5=8HFS-Gn8)=pKUia#K76Q9~Qgz^#GCS23y>#7|fAS&3mWh-5T4OfNMlIh1 zrkV6Yu6|81&frB=rp|BzIeq(ACZ@0UJ7p6JK9>=M4!U)1st7oKYb5^TBNT-N{-JpJ zQh-)^7x(oq?%G%J&v1hEF3&PI8?uWI;m*h?e)V&V`HbUlw10X(CLi#OyE!nRuDJf} zx4`Q#69FfYAMs3M%sh#W?PvRqpm?+FKZSS1-24%*U&=}c8PjEjy`$-# zB|6VqU*Cj4qKYkjWbNOnSI^}+y>J@*VD^eBOMMyy@NYq0VVjrShxh(i%NA-;7_0VL zz*RF}bKrwgtwfuRSmu!r(!XAAxlPHu%doR%IOz(NuJqCSwCB&C2UDO3&NEmEy|Hbs z{avhnPBymk(9xWOKar_WCQP+YerhTJ5_16Q=N*qGWc%MCvYU_q>LoE0)qf}hktGW;3;)}&R*=;7-Qg7*ApszqpqL;6)j4U1y>~7;!>BU*;3ajhVeM=h zBFkz_E;rsjrWn{T(zvEa#-xR*l|COeb;&pMGtZy$ThHIm z(Mne*;Jc0}H%W{6 zu(*sl{4vn7x<{IGaI$cOXbJZnCU{25K%={vv)HnJbSIJ~a0ye02-zVSq-9m0n8)h~ zeWTs^FLCPRuWNCxuR1v)2iq?5Dh8 zJ%xaUM)9(eTi?dIiEAbDK>)w(9EHjx!t^~oW|>$18g$Bhjj-`Dz!1b8IB&I^K(HVP zh!n4w+@-Vic)CORurcV$6%-3Af;9q;x74h`V}(x@UKEPTr1Q>#I{hwX3wZIZaJCh9 zo&aPDy9Smo{0i}#rzo}!R)lT)2oFxQAVouyefj%N>k{k_23%+;6@Cbz-Yw>S6!yb@ ziXHFm8S^fh8(tembthZ3s<043qENm#ql>fUU@a{P|Dvuif2yJo#LB&JAHGg_yyAZw+b#q2WK4J?+1O zuRZyR5aPNqUMvA_*mh`8Xt7zw^2Ee$9V@?(4dEItcLi+;5_g{XAFn~?xuCwjzIjyr z4T1jr*RP}N>SVx8^sbV;o9kK8cHM|N`}ZcO|J?*mh41G5@WWy20}=Y-z>|Z)2*ty# zX;m0DgedgJ!piLof_ZfnScL0k-lM4ruOMv43YbjzX?hrtSP;2`ab(`mYepQ2&m-3b z-hW*1bvuo^Q@!`dxW>cI_1t#Kb)l6gH~Pvoji$?`MiE^JId@UwI%=~oKhBQ*R>HXq zu~B9J<6>4QD$~5y%eLGEILJN`uei)*mN|*vc%h6{3dG`pq_wpzE#>u09l!mxtEuwz z;*L{Q!ap)d6Z-jMW~BvMp}3R}Rf0i}fTCr+P$0pJI-GLXq8u-G7%xIF+uVB0zd2c{ 
literal 0
HcmV?d00001

From c28820e339d375095fa612815a8363860128a811 Mon Sep 17 00:00:00 2001
From: Ismail Simsek
Date: Sun, 19 Sep 2021 16:18:05 +0200
Subject: [PATCH 12/14] Write Blog Post

---
 2021-10-01-debezium-server-iceberg.adoc | 162 ------------------------
 README.md                               |  10 +-
 2 files changed, 5 insertions(+), 167 deletions(-)
 delete mode 100644 2021-10-01-debezium-server-iceberg.adoc

diff --git a/2021-10-01-debezium-server-iceberg.adoc b/2021-10-01-debezium-server-iceberg.adoc
deleted file mode 100644
index 55bd2578..00000000
--- a/2021-10-01-debezium-server-iceberg.adoc
+++ /dev/null
@@ -1,162 +0,0 @@
----
-layout: post
-title: Using Debezium to Create ACID Data Lake
-date: 2021-10-01
-tags: [ debezium, iceberg, datalake, lakehouse ]
-author: Ismail Simsek
----
-
-Do you need to build a Lakehouse with a near-realtime data pipeline that supports ACID transactions and updates on the data lake?
-It is possible with "Debezium Server Iceberg", without any dependency on Kafka or Spark applications.
-
-++++++
-
-==== Debezium
-
-https://debezium.io/[Debezium] is an open source distributed platform for change data capture.
-Debezium extracts real-time database changes as JSON, Avro, or Protobuf events and delivers them to event streaming platforms
-(Kafka, Kinesis, Google Pub/Sub, and Pulsar are just some of the https://debezium.io/documentation/reference/operations/debezium-server.html#_sink_configuration[supported sinks]).
-It also provides a simple interface to https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink[implement a new sink].
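-
-As a quick illustration (hand-written for this post, not captured from a real connector run), the payload of a change event for one `inventory.customers` row might look roughly like the following; the `schema` part of the envelope is omitted:
-
-[source,json]
-----
-{
-  "before": null,
-  "after": {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"},
-  "source": {"connector": "postgresql", "db": "postgres", "table": "customers", "ts_ms": 1632992200000},
-  "op": "c",
-  "ts_ms": 1632992200123
-}
-----
-
-The `op` field marks the operation (`c` insert, `u` update, `d` delete); it is what later lets a sink decide between appending and upserting.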
-
-==== Apache Iceberg
-
-https://iceberg.apache.org/[Apache Iceberg] is an open table format for huge analytic datasets.
-Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink and Hive, using a high-performance table format that works just like a SQL table.
-It supports ACID inserts and row-level deletes/updates, has a flexible foundation, and provides https://iceberg.apache.org[many other features].
-
-== Debezium Server Iceberg
-
-The **Debezium Server Iceberg** project implements a new consumer which uses the Iceberg Java API to consume events.
-The Iceberg consumer converts received events to the Iceberg format and commits them to the destination Iceberg table. With this consumer it's possible to use any cloud storage or catalog supported by Iceberg.
-
-On a high level, data processing works like below::
-* It groups the set of events per event destination.
-* For each destination, the events are converted to Iceberg records. At this step the event schema is used to map data to the Iceberg record (`debezium.format.value.schemas.enable` must be set to true).
-* After the events are converted to Iceberg records, they are saved to Iceberg Parquet files (Iceberg data files, plus delete files for upsert).
-* As a last step, these files are committed to the destination table (uploaded to the destination storage) as data and delete files using the Iceberg Java API (see the sketch below).
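-
-To make the last step concrete, here is a minimal, hand-written sketch of an append commit with the Iceberg Java API. This is not the consumer's actual code; the catalog, table name, file path, and file metrics below are made-up assumptions:
-
-[source,java]
-----
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-
-public class AppendSketch {
-  public static void main(String[] args) {
-    // hypothetical Hadoop catalog pointing at the warehouse location
-    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "s3a://S3_BUCKET/iceberg_warehouse");
-    Table table = catalog.loadTable(
-        TableIdentifier.of("debeziumevents", "debeziumcdc_testc_inventory_customers"));
-
-    // describe a Parquet file that was already written for this batch (values are illustrative)
-    DataFile dataFile = DataFiles.builder(table.spec())
-        .withPath("s3a://S3_BUCKET/iceberg_warehouse/data/batch-00001.parquet")
-        .withFormat("PARQUET")
-        .withFileSizeInBytes(4096L)
-        .withRecordCount(100L)
-        .build();
-
-    // one atomic Iceberg commit per consumed batch
-    table.newAppend().appendFile(dataFile).commit();
-  }
-}
-----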
-
-If the destination table is not found in the destination catalog, the consumer will automatically try to create it using the event schema and the key schema (record key).
-
-image::docs/images/Debezium-Iceberg.png[Debezium-Iceberg]
-
-Currently, the Iceberg consumer supports only JSON events. With JSON events, the event schema must be enabled to do correct type conversion to Iceberg records,
-and complex nested data types are not supported, so event flattening must be enabled.
-
-example configuration::
-[source,properties]
-----
-debezium.sink.type=iceberg
-# run with append mode
-debezium.sink.iceberg.upsert=false
-debezium.sink.iceberg.upsert-keep-deletes=true
-# iceberg
-debezium.sink.iceberg.table-prefix=debeziumcdc_
-debezium.sink.iceberg.table-namespace=debeziumevents
-debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET
-debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse
-debezium.sink.iceberg.type=hadoop
-debezium.sink.iceberg.catalog-name=mycatalog
-debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog
-# enable event schemas
-debezium.format.value.schemas.enable=true
-debezium.format.value=json
-# unwrap message
-debezium.transforms=unwrap
-debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
-debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
-debezium.transforms.unwrap.delete.handling.mode=rewrite
-debezium.transforms.unwrap.drop.tombstones=true
-----
-
-=== Update, Append modes
-
-By default, the Iceberg sink runs in upsert mode (`debezium.sink.iceberg.upsert=true`). When a row is updated in the source table, the destination row is replaced with the new, updated version, and deleted records are deleted from the destination. With upsert mode the data at the destination is kept identical to the source data.
-Update (upsert) mode uses the Iceberg equality delete feature and creates delete files using the key of the Debezium event. With update mode, deduplication is done on each batch to avoid duplicate data.
-
-Note: For tables without a record key (PK), the operation mode falls back to append even if the configuration is set to upsert mode.
-
-==== Keeping Deleted Records
-
-For some use cases it's useful to keep deleted records as soft deletes; this is possible by setting `debezium.sink.iceberg.upsert-keep-deletes` to true.
-This setting will keep the latest version of deleted records (`__op=d`) in the Iceberg table. Setting it to false will remove deleted records from the destination table.
-
-==== Append mode
-
-This is the most straightforward operation mode: setting `debezium.sink.iceberg.upsert` to `false` sets the operation mode to append.
-With append mode, no data deduplication is done and all received records are appended to the destination table.
-
-=== Optimizing batch size (commit interval)
-
-Debezium extracts/consumes database events in real time, and this can cause too-frequent commits (and too many small files) to the Iceberg table,
-which is not optimal for batch processing, especially when a near-realtime data feed is sufficient.
-To avoid this problem, it's possible to use the following configuration and increase the batch size per commit.
-
-**MaxBatchSizeWait**: uses Debezium metrics to optimize the batch size; it periodically reads the current size of the streaming queue and waits until it reaches `max.batch.size`.
-The maximum wait and check intervals are controlled by the `debezium.sink.batch.batch-size-wait.max-wait-ms` and `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.
-During the wait, Debezium events are collected in memory (in the Debezium streaming queue); this way each commit receives a bigger and more consistent batch size.
-Note: this setting should be configured together with the `debezium.source.max.queue.size` and `debezium.source.max.batch.size` Debezium properties.
-
-Note: It's also possible to do data compaction using Iceberg, compacting data and metadata files to get the best performance.
-
-example setting::
-
-[source,properties]
-----
-debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
-debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
-debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc
-debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
-debezium.source.max.batch.size=50000
-debezium.source.max.queue.size=400000
-debezium.sink.batch.batch-size-wait.max-wait-ms=60000
-debezium.sink.batch.batch-size-wait.wait-interval-ms=10000
-----
-
-=== Destination, iceberg catalog
-
-The Iceberg consumer uses an Iceberg catalog to read and commit data to the destination table; the destination can be any cloud storage and any catalog supported by Iceberg.
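-
-As an illustrative sketch (not from the project's documentation), pointing the consumer at a Hive metastore catalog instead of the Hadoop catalog above might look like this, assuming the Hive catalog classes are on the classpath and using a hypothetical metastore URI:
-
-[source,properties]
-----
-debezium.sink.iceberg.catalog-name=mycatalog
-debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hive.HiveCatalog
-# hypothetical Hive metastore endpoint
-debezium.sink.iceberg.uri=thrift://hive-metastore:9083
-debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse
-----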
-
-== Next: Data Warehouse, Curated Layer
-
-Now we have a solid raw layer with a near-realtime data feed, on top of which we can build a curated layer, an analytics layer, or a data warehouse.
-
-For example, we can easily use https://iceberg.apache.org/spark-writes/[Spark SQL] (or PrestoDB, Trino, Flink) and process this data into the next layer :)
-
-[source,sql]
-----
-MERGE INTO dwh.consumers t
-  USING (
-    -- new data goes to insert
-    SELECT customer_id, name, effective_date, to_date('9999-12-31', 'yyyy-MM-dd') as end_date FROM debezium.consumers
-    UNION ALL
-    -- update existing records and close them
-    SELECT t.customer_id, t.name, t.effective_date, s.effective_date as end_date FROM debezium.consumers s
-    INNER JOIN dwh.consumers t on s.customer_id = t.customer_id AND t.current = true
-  ) s
-  ON s.customer_id = t.customer_id AND s.effective_date = t.effective_date
-  -- close the last record.
-  WHEN MATCHED
-    THEN UPDATE SET t.current = false, t.end_date = s.end_date
-  -- also possible to delete deleted records!
-  -- WHEN MATCHED and s.__op = 'd'
-  -- THEN DELETE
-  WHEN NOT MATCHED THEN
-    INSERT(customer_id, name, current, effective_date, end_date)
-    VALUES(s.customer_id, s.name, true, s.effective_date, s.end_date);
-----
-
-It's also possible to use https://iceberg.apache.org/spark-writes/[delete and insert statements]:
-
-[source,sql]
-----
-INSERT INTO prod.db.table SELECT ...;
-DELETE FROM prod.db.table WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00';
-----
-
-In the https://github.com/ismailsimsek/iceberg-examples[iceberg examples] project you can find more examples and experiment with Iceberg and Spark.
-
-=== Contribution
-
-This project is new, and there are many things to improve; please feel free to test it, give feedback, open a feature request, or send a pull request.
-
-- https://github.com/memiiso/debezium-server-iceberg[For more details please see the project]
-- https://github.com/memiiso/debezium-server-iceberg/releases[Releases]
\ No newline at end of file
diff --git a/README.md b/README.md
index e70aac6a..1d287335 100644
--- a/README.md
+++ b/README.md
@@ -52,7 +52,7 @@ this should be configured with `debezium.source.max.queue.size` and `debezium.so
 This is the default configuration; by default the consumer will not use any batch size wait.
 
 #### DynamicBatchSizeWait
-
+**Deprecated**
 This wait strategy dynamically adds a wait to increase the batch size. The wait duration is calculated based on the number of processed events in the last 3 batches.
 If the last batch sizes are lower than `max.batch.size`, the wait duration will increase, and if the last batch sizes are bigger than 90% of `max.batch.size`, the wait duration will decrease.
@@ -68,9 +68,9 @@ debezium.sink.batch.batch-size-wait.max-wait-ms=5000
 ```
 
 #### MaxBatchSizeWait
-MaxBatchSizeWait uses debezium metrics to optimize batch size, this strategy is more precise compared to DynamicBatchSizeWait
-DynamicBatchSizeWait periodically reads streaming queue current size and waits until it reaches to `max.batch.size`
-maximum wait and check intervals are controlled by `debezium.sink.batch.batch-size-wait.max-wait-ms`, `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties
+MaxBatchSizeWait uses Debezium metrics to optimize the batch size; this strategy is more precise compared to DynamicBatchSizeWait.
+MaxBatchSizeWait periodically reads the current size of the streaming queue and waits until it reaches `max.batch.size`.
+The maximum wait and check intervals are controlled by the `debezium.sink.batch.batch-size-wait.max-wait-ms` and `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.
 
 example setup to receive ~2048 events per commit.
 maximum wait is set to 30 seconds, and the current size of the streaming queue is checked every 5 seconds
 ```properties
@@ -100,7 +100,7 @@ database table = `inventory.customers` will be replicated to `default.testc_cdc_
 
 ## Debezium Event Flattening
 
-Iceberg consumer requires event flattening, Currently nested events and complex data types(like Struct) are not supported
+The Iceberg consumer requires event flattening. Currently, nested events and complex data types (like Struct) are not supported.
 
 ```properties
 debezium.transforms=unwrap

From 94758df67e5f83d96eae37c715a5be2920f3b38b Mon Sep 17 00:00:00 2001
From: Ismail Simsek
Date: Sun, 19 Sep 2021 16:22:15 +0200
Subject: [PATCH 13/14] Write Blog Post

---
 pom.xml | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 5aab73ea..cb26509d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,7 +36,8 @@
     2.16.88
     1.11.1
-    <version.debezium>1.7.0.Beta1</version.debezium>
+    <version.debezium>1.7.0.CR1</version.debezium>
+    <version.mysql.driver>8.0.26</version.mysql.driver>
     2.0.3.Final
@@ -70,6 +71,12 @@ import
+
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>${version.mysql.driver}</version>
+    </dependency>
     io.debezium

From 621ed2ddbb0aa995d7ed1f2087c110006f6cfe55 Mon Sep 17 00:00:00 2001
From: Ismail Simsek
Date: Sun, 19 Sep 2021 16:34:29 +0200
Subject: [PATCH 14/14] Write Blog Post

---
 debezium-server-iceberg-sink/pom.xml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/debezium-server-iceberg-sink/pom.xml b/debezium-server-iceberg-sink/pom.xml
index d84881c8..c5d595df 100644
--- a/debezium-server-iceberg-sink/pom.xml
+++ b/debezium-server-iceberg-sink/pom.xml
@@ -161,7 +161,6 @@
     <dependency>
       <groupId>mysql</groupId>
       <artifactId>mysql-connector-java</artifactId>
-      <version>8.0.22</version>
       <scope>test</scope>
     </dependency>