From 849073ac62cb217a54bdd69cf7c31a9ce0323fb3 Mon Sep 17 00:00:00 2001
From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Thu, 23 May 2024 17:30:50 +0200
Subject: [PATCH 1/3] Fix reading config values in debezium storage classes
 (#327)

* Fix reading config values in debezium storage classes

* Update IcebergOffsetBackingStore.java

* Update IcebergOffsetBackingStoreTest.java

(cherry picked from commit c4422976997e8a7e77a381117983969dc4fc4372)
---
 .../iceberg/history/IcebergSchemaHistory.java | 76 +++++++++----------
 .../offset/IcebergOffsetBackingStore.java     | 30 ++++----
 .../offset/IcebergOffsetBackingStoreTest.java |  6 +-
 3 files changed, 58 insertions(+), 54 deletions(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
index a4c4573f..97e9630b 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
@@ -8,33 +8,17 @@
 package io.debezium.server.iceberg.history;
 
+import com.google.common.collect.Maps;
 import io.debezium.DebeziumException;
 import io.debezium.annotation.ThreadSafe;
 import io.debezium.common.annotation.Incubating;
 import io.debezium.config.Configuration;
-import io.debezium.config.Field;
 import io.debezium.document.DocumentReader;
 import io.debezium.document.DocumentWriter;
 import io.debezium.relational.history.*;
 import io.debezium.server.iceberg.IcebergUtil;
 import io.debezium.util.FunctionalReadWriteLock;
 import io.debezium.util.Strings;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-
 import org.apache.iceberg.*;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
@@ -47,8 +31,24 @@
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.*;
 import org.apache.iceberg.types.Types;
+import org.eclipse.microprofile.config.ConfigProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
@@ -87,8 +87,8 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
     this.historyConfig = new IcebergSchemaHistoryConfig(config);
     icebergCatalog = CatalogUtil.buildIcebergCatalog(this.historyConfig.catalogName(),
         this.historyConfig.icebergProperties(), this.historyConfig.hadoopConfig());
-    tableFullName = String.format("%s.%s", this.historyConfig.catalogName(), this.historyConfig.tableName());
-    tableId = TableIdentifier.of(Namespace.of(this.historyConfig.catalogName()), this.historyConfig.tableName());
+    tableFullName = String.format("%s.%s", this.historyConfig.tableNamespace(), this.historyConfig.tableName());
+    tableId = TableIdentifier.of(Namespace.of(this.historyConfig.tableNamespace()), this.historyConfig.tableName());
 
     if (running.get()) {
       throw new SchemaHistoryException("Iceberg database history process already initialized table: " + tableFullName);
@@ -269,45 +269,43 @@ private void loadFileSchemaHistory(File file) {
   }
 
   public static class IcebergSchemaHistoryConfig {
-
-    final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
-    private final Configuration config;
-    Map<String, String> icebergProperties = new ConcurrentHashMap<>();
+    private static final String PROP_SINK_PREFIX = "debezium.sink.";
+    Properties configCombined = new Properties();
 
     public IcebergSchemaHistoryConfig(Configuration config) {
-      this.config = config;
+      Configuration confIcebergSubset = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.", true);
+      confIcebergSubset.forEach(configCombined::put);
 
-      final Map<String, String> conf = new HashMap<>();
-      this.config.forEach((propName, value) -> {
-        if (propName.startsWith(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.")) {
-          final String newPropName = propName.substring((CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.").length());
-          conf.put(newPropName, value);
-        }
-      });
-
-      conf.forEach(hadoopConfig::set);
-      icebergProperties.putAll(conf);
+      // Debezium filters the configuration before passing it down to this class,
+      // so we additionally read the full iceberg config through ConfigProvider.
+      Map<String, String> icebergConf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_SINK_PREFIX + "iceberg.");
+      icebergConf.forEach(configCombined::putIfAbsent);
     }
 
     public String catalogName() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.catalog-name").withDefault("default"));
+      return this.configCombined.getProperty("catalog-name", "default");
+    }
+
+    public String tableNamespace() {
+      return this.configCombined.getProperty("table-namespace", "default");
     }
 
     public String tableName() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-name").withDefault(
-          "debezium_database_history_storage"));
+      return this.configCombined.getProperty("table-name", "debezium_database_history_storage");
     }
 
     public org.apache.hadoop.conf.Configuration hadoopConfig() {
+      final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
+      configCombined.forEach((key, value) -> hadoopConfig.set((String) key, (String) value));
       return hadoopConfig;
     }
 
     public Map<String, String> icebergProperties() {
-      return icebergProperties;
+      return Maps.fromProperties(configCombined);
     }
 
     public String getMigrateHistoryFile() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.migrate-history-file").withDefault(""));
+      return configCombined.getProperty("migrate-history-file", "");
     }
   }
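The core of this patch is the two-layer lookup both storage classes now share: keys that Debezium passes down win, and `putIfAbsent` back-fills anything Debezium filtered out from the global MicroProfile config. A minimal standalone sketch of that precedence (key names and values here are illustrative, not taken from a real config):

```java
import java.util.Map;
import java.util.Properties;

public class ConfigPrecedenceSketch {
  public static void main(String[] args) {
    Properties configCombined = new Properties();

    // Layer 1: the (already filtered) subset Debezium hands to the storage class.
    Map<String, String> debeziumSubset = Map.of("table-name", "debezium_offset_storage");
    debeziumSubset.forEach(configCombined::put);

    // Layer 2: the full "debezium.sink.iceberg." subset read via ConfigProvider.
    // putIfAbsent only fills keys the first layer did not provide.
    Map<String, String> configProviderSubset = Map.of(
        "table-name", "ignored-because-layer-one-wins",
        "warehouse", "s3://my_bucket/iceberg_warehouse");
    configProviderSubset.forEach(configCombined::putIfAbsent);

    System.out.println(configCombined.getProperty("table-name")); // debezium_offset_storage
    System.out.println(configCombined.getProperty("warehouse"));  // s3://my_bucket/iceberg_warehouse
  }
}
```

Because `Properties.putIfAbsent` never overwrites, the order of the two layers is what guarantees the Debezium-provided values take precedence.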
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
index b3855179..2c05e3f2 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
@@ -11,9 +11,9 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
 import io.debezium.DebeziumException;
 import io.debezium.config.Configuration;
-import io.debezium.config.Field;
 import io.debezium.server.iceberg.IcebergUtil;
 import io.debezium.util.Strings;
 import jakarta.enterprise.context.Dependent;
@@ -47,7 +47,6 @@
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -286,40 +285,45 @@ public Set<Map<String, Object>> connectorPartitions(String connectorName) {
   }
 
   public static class IcebergOffsetBackingStoreConfig extends WorkerConfig {
-    final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
-    private final Configuration config;
-    Map<String, String> icebergProperties = new ConcurrentHashMap<>();
+    private static final String PROP_SINK_PREFIX = "debezium.sink.";
+    Properties configCombined = new Properties();
 
     public IcebergOffsetBackingStoreConfig(Configuration config) {
       super(new ConfigDef(), config.asMap());
-      this.config = config;
-      Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
-      conf.forEach(hadoopConfig::set);
-      icebergProperties.putAll(conf);
+      Configuration confIcebergSubset = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.", true);
+      confIcebergSubset.forEach(configCombined::put);
+
+      // Debezium filters the configuration before passing it down to this class,
+      // so we additionally read the full iceberg config through ConfigProvider.
+      Map<String, String> icebergConf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_SINK_PREFIX + "iceberg.");
+      icebergConf.forEach(configCombined::putIfAbsent);
     }
 
     public String catalogName() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.catalog-name").withDefault("default"));
+      return this.configCombined.getProperty("catalog-name", "default");
     }
 
     public String tableNamespace() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-namespace").withDefault("default"));
+      return this.configCombined.getProperty("table-namespace", "default");
     }
 
     public String tableName() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-name").withDefault("debezium_offset_storage"));
+      return this.configCombined.getProperty("table-name", "debezium_offset_storage");
     }
 
     public String getMigrateOffsetFile() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.migrate-offset-file").withDefault(""));
+      return this.configCombined.getProperty("migrate-offset-file", "");
    }
 
     public org.apache.hadoop.conf.Configuration hadoopConfig() {
+      final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
+      configCombined.forEach((key, value) -> hadoopConfig.set((String) key, (String) value));
       return hadoopConfig;
     }
 
     public Map<String, String> icebergProperties() {
-      return icebergProperties;
+      return Maps.fromProperties(configCombined);
     }
   }
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java
index d6724d07..fb9fbce6 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java
@@ -30,6 +30,8 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+
+import static io.debezium.server.iceberg.TestConfigSource.CATALOG_TABLE_NAMESPACE;
 import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.fromByteBuffer;
 import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.toByteBuffer;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -90,7 +92,7 @@ public void testGetSet() throws Exception {
     assertEquals(("value"), fromByteBuffer(values.get(toByteBuffer("key"))));
     Assertions.assertNull(values.get(toByteBuffer("bad")));
 
-    CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of("default", "debezium_offset_storage"));
+    CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of(CATALOG_TABLE_NAMESPACE, "debezium_offset_storage"));
     Assertions.assertEquals(1, Lists.newArrayList(d).size());
   }
 
@@ -122,4 +124,4 @@ public TestWorkerConfig(Map<String, String> props) {
       super(new ConfigDef(), props);
     }
   }
-}
\ No newline at end of file
+}
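With the combined `Properties` in place, both storage configs derive their two consumer-facing views from it on demand rather than keeping separate mutable copies. A sketch of those views, assuming Guava and hadoop-common on the classpath; note that `Maps.fromProperties` rejects non-`String` keys or values, which is safe here because the constructors only ever insert strings:

```java
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;

public class CombinedConfigViews {
  public static void main(String[] args) {
    Properties configCombined = new Properties();
    configCombined.setProperty("catalog-name", "iceberg");
    configCombined.setProperty("warehouse", "s3://my_bucket/iceberg_warehouse");

    // View 1: immutable Map<String, String> handed to CatalogUtil.buildIcebergCatalog.
    Map<String, String> icebergProperties = Maps.fromProperties(configCombined);

    // View 2: a fresh Hadoop Configuration rebuilt from the same entries.
    Configuration hadoopConfig = new Configuration();
    configCombined.forEach((key, value) -> hadoopConfig.set((String) key, (String) value));

    System.out.println(icebergProperties.get("catalog-name")); // iceberg
    System.out.println(hadoopConfig.get("warehouse"));         // s3://my_bucket/iceberg_warehouse
  }
}
```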
From e77f04d1fdb310141635395f75423fce6b9af2ce Mon Sep 17 00:00:00 2001
From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Thu, 23 May 2024 17:35:54 +0200
Subject: [PATCH 2/3] Minor improvements to icebergevents consumer,
 IcebergEventsChangeConsumer (#328)

(cherry picked from commit 624ad0ed837ba565188ae5115c8859c3f2859dda)
---
 .../iceberg/IcebergEventsChangeConsumer.java | 43 ++++++++-----------
 1 file changed, 18 insertions(+), 25 deletions(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
index a1e1dc34..777e6b40 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
@@ -8,6 +8,8 @@
 package io.debezium.server.iceberg;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.debezium.DebeziumException;
 import io.debezium.engine.ChangeEvent;
 import io.debezium.engine.DebeziumEngine;
@@ -16,19 +18,6 @@
 import io.debezium.server.BaseChangeConsumer;
 import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
 import io.debezium.server.iceberg.tableoperator.PartitionedAppendWriter;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import jakarta.annotation.PostConstruct;
 import jakarta.enterprise.context.Dependent;
 import jakarta.enterprise.inject.Any;
@@ -53,6 +42,16 @@
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+import java.util.stream.Collectors;
+
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
@@ -69,6 +68,7 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
   protected static final ObjectMapper mapper = new ObjectMapper();
   protected static final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
   protected static final Serde<JsonNode> keySerde = DebeziumSerdes.payloadJson(JsonNode.class);
+  static final String TABLE_NAME = "debezium_events";
   static final Schema TABLE_SCHEMA = new Schema(
       required(1, "event_destination", Types.StringType.get()),
       optional(2, "event_key_schema", Types.StringType.get()),
@@ -92,13 +92,12 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
   static Deserializer<JsonNode> valDeserializer;
   static Deserializer<JsonNode> keyDeserializer;
   final Configuration hadoopConf = new Configuration();
-  final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
-  @ConfigProperty(name = "debezium.sink.iceberg." + CatalogProperties.WAREHOUSE_LOCATION)
-  String warehouseLocation;
   @ConfigProperty(name = "debezium.format.value", defaultValue = "json")
   String valueFormat;
   @ConfigProperty(name = "debezium.format.key", defaultValue = "json")
   String keyFormat;
+  @ConfigProperty(name = PROP_PREFIX + CatalogProperties.WAREHOUSE_LOCATION)
+  String warehouseLocation;
   @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
   String namespace;
   @ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
@@ -123,13 +122,11 @@ void connect() {
           "Supported (debezium.format.key=*) formats are {json,}!");
     }
 
-    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events");
-
-    Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
-    conf.forEach(this.hadoopConf::set);
-    this.icebergProperties.putAll(conf);
+    Map<String, String> icebergProperties = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
+    icebergProperties.forEach(this.hadoopConf::set);
 
     icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
+    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), TABLE_NAME);
 
     // create table if not exists
     if (!icebergCatalog.tableExists(tableIdentifier)) {
@@ -179,10 +176,6 @@ public GenericRecord getIcebergRecord(ChangeEvent<Object, Object> record, Offset
     }
   }
 
-  public String map(String destination) {
-    return destination.replace(".", "_");
-  }
-
   @Override
   public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
       throws InterruptedException {
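After this patch the consumer wires everything up inside `connect()`: the config subset is copied into the Hadoop configuration, the catalog is built from it, and the table identifier uses the new `TABLE_NAME` constant. A rough standalone sketch of that wiring, using Iceberg's in-memory catalog so it runs without external services (the catalog choice and property values are illustrative, not what the consumer requires):

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

public class EventsConsumerWiringSketch {
  public static void main(String[] args) {
    // Stand-ins for the @ConfigProperty-injected values and the ConfigProvider subset.
    String catalogName = "default";
    String namespace = "default";
    Map<String, String> icebergProperties = Map.of(
        "catalog-impl", "org.apache.iceberg.inmemory.InMemoryCatalog");

    // Same order as connect(): copy the subset into the Hadoop conf,
    // build the catalog, then resolve the fixed table name.
    Configuration hadoopConf = new Configuration();
    icebergProperties.forEach(hadoopConf::set);
    Catalog icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events");

    System.out.println(icebergCatalog.name() + " -> " + tableIdentifier); // default -> default.debezium_events
  }
}
```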
From adeeaf86431294332b2845dd24c70aa3c73d7b0b Mon Sep 17 00:00:00 2001
From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Fri, 17 May 2024 18:37:24 +0200
Subject: [PATCH 3/3] Simplify example config, remove hadoop catalog example
 (#321)

(cherry picked from commit 1ef7ee9cf946aa93b1032b36becd853a0a5e14d7)
---
 .../conf/application.properties.example | 79 ++++++++-----------
 1 file changed, 32 insertions(+), 47 deletions(-)

diff --git a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
index d9367c2d..a4b1eb38 100644
--- a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
+++ b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
@@ -6,21 +6,7 @@ debezium.sink.iceberg.table-prefix=debeziumcdc_
 debezium.sink.iceberg.upsert=true
 debezium.sink.iceberg.upsert-keep-deletes=true
 debezium.sink.iceberg.write.format.default=parquet
-debezium.sink.iceberg.catalog-name=mycatalog
-# Hadoop catalog, you can use other catalog supported by iceberg as well
-
-# S3 config with hadoop and hadoop catalog
-debezium.sink.iceberg.type=hadoop
-debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
-debezium.sink.iceberg.table-namespace=debeziumevents
-debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket
-debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
-debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
-debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-debezium.sink.iceberg.fs.s3a.access.key=AWS_ACCESS_KEY
-debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY
-debezium.sink.iceberg.fs.s3a.path.style.access=true
-debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
+debezium.sink.iceberg.catalog-name=iceberg
 
 # S3 config using JdbcCatalog catalog And S3FileIO
 debezium.sink.iceberg.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog
@@ -28,36 +14,34 @@ debezium.sink.iceberg.uri=jdbc_db_url
 debezium.sink.iceberg.jdbc.user=my_user
 debezium.sink.iceberg.jdbc.password=my_password
 debezium.sink.iceberg.table-namespace=debeziumdata
-debezium.sink.iceberg.catalog-name=iceberg
-debezium.sink.iceberg.warehouse=s3a://my_bucket/iceberg_warehouse
+debezium.sink.iceberg.warehouse=s3://my_bucket/iceberg_warehouse
+# S3FileIO
+debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
+debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
+debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
+debezium.sink.iceberg.s3.path-style-access=true
 
 # Config with hive metastore catalogs
 # debezium.sink.iceberg.type=hive
 # debezium.sink.iceberg.uri=thrift://xx.xxx.xx.xxx:9083
 # debezium.sink.iceberg.clients=5
-# debezium.sink.iceberg.warehouse=s3a://datalake/datawarehouse
+# debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
 # debezium.sink.iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
+# debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
+# debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
 # debezium.sink.iceberg.engine.hive.enabled=true
 # debezium.sink.iceberg.iceberg.engine.hive.enabled=true
 # debezium.sink.hive.metastore.sasl.enabled=false
 # debezium.sink.iceberg.hive.metastore.sasl.enabled=false
 
-# Use S3FileIO
-debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
-debezium.sink.iceberg.s3.endpoint=http://localhost:9000
-debezium.sink.iceberg.s3.path-style-access=true
-debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
-debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
-
-# S3 config without hadoop catalog. Using InMemoryCatalog catalog And S3FileIO
 ### using minio as S3
-debezium.sink.iceberg.s3.endpoint=http://localhost:9000;
-debezium.sink.iceberg.s3.path-style-access=true
-debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
-debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
-debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
-debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
-debezium.sink.iceberg.catalog-impl=org.apache.iceberg.inmemory.InMemoryCatalog
+# debezium.sink.iceberg.s3.endpoint=http://localhost:9000
+## S3FileIO
+# debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
+# debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
+# debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
+# debezium.sink.iceberg.s3.path-style-access=true
+# debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
 
 # enable event schemas - mandatory
 debezium.format.value.schemas.enable=true
@@ -65,13 +49,13 @@ debezium.format.key.schemas.enable=true
 debezium.format.value=json
 debezium.format.key=json
 
-# saving debezium state data to destination, iceberg tables
+# saving debezium state data to destination iceberg tables
 # see https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming
 debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
-debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
+debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_table
 # see https://debezium.io/documentation/reference/stable/development/engine.html#database-history-properties
 debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
-debezium.source.schema.history.internal.iceberg.table-name=debezium_database_history_storage_test
+debezium.source.schema.history.internal.iceberg.table-name=debezium_database_history_storage_table
 
 # postgres source
 debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
@@ -87,17 +71,17 @@ debezium.source.schema.include.list=inventory
 debezium.source.topic.prefix=dbz_
 
 # sql server source
-#debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
-#debezium.source.offset.flush.interval.ms=0
-#debezium.source.database.hostname=localhost
-#debezium.source.database.port=5432
-#debezium.source.database.user=debezium
-#debezium.source.database.password=debezium
-#debezium.source.database.dbname=debezium
-#debezium.source.database.server.name=tutorial
-#debezium.source.schema.include.list=inventory
+# debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
+# debezium.source.offset.flush.interval.ms=0
+# debezium.source.database.hostname=localhost
+# debezium.source.database.port=5432
+# debezium.source.database.user=debezium
+# debezium.source.database.password=debezium
+# debezium.source.database.dbname=debezium
+# debezium.source.database.server.name=tutorial
+# debezium.source.schema.include.list=inventory
 # mandate for sql server source, avoid error when snapshot and schema change
-#debezium.source.include.schema.changes=false
+# debezium.source.include.schema.changes=false
 
 # do event flattening. unwrap message!
 debezium.transforms=unwrap
@@ -109,8 +93,9 @@ debezium.transforms.unwrap.drop.tombstones=true
 # ############ SET LOG LEVELS ############
 quarkus.log.level=INFO
 quarkus.log.console.json=false
+# set log level for libs
 # hadoop, parquet
 quarkus.log.category."org.apache.hadoop".level=WARN
 quarkus.log.category."org.apache.parquet".level=WARN
 # Ignore messages below warning level from Jetty, because it's a bit verbose
-quarkus.log.category."org.eclipse.jetty".level=WARN
+quarkus.log.category."org.eclipse.jetty".level=WARN
\ No newline at end of file
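Every `debezium.sink.iceberg.*` key in the example above reaches the Iceberg catalog with its prefix stripped; the sink performs this through `IcebergUtil.getConfigSubset` over the MicroProfile config. A standalone sketch of the same transformation using a plain map (the helper shown here is a stand-in, not the project's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixSubsetSketch {
  // Stand-in for the prefix-stripping the sink applies to its configuration.
  static Map<String, String> getConfigSubset(Map<String, String> config, String prefix) {
    Map<String, String> subset = new HashMap<>();
    config.forEach((key, value) -> {
      if (key.startsWith(prefix)) {
        subset.put(key.substring(prefix.length()), value); // drop the prefix
      }
    });
    return subset;
  }

  public static void main(String[] args) {
    Map<String, String> config = Map.of(
        "debezium.sink.iceberg.catalog-name", "iceberg",
        "debezium.sink.iceberg.warehouse", "s3://my_bucket/iceberg_warehouse",
        "debezium.format.value", "json"); // not under the prefix, so dropped

    // -> {catalog-name=iceberg, warehouse=s3://my_bucket/iceberg_warehouse}
    System.out.println(getConfigSubset(config, "debezium.sink.iceberg."));
  }
}
```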