From c4422976997e8a7e77a381117983969dc4fc4372 Mon Sep 17 00:00:00 2001
From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Thu, 23 May 2024 17:30:50 +0200
Subject: [PATCH] Fix reading config values in debezium storage classes (#327)

* Fix reading config values in debezium storage classes

* Update IcebergOffsetBackingStore.java

* Update IcebergOffsetBackingStoreTest.java
---
 .../iceberg/history/IcebergSchemaHistory.java | 76 +++++++++----------
 .../offset/IcebergOffsetBackingStore.java     | 30 ++++----
 .../offset/IcebergOffsetBackingStoreTest.java |  6 +-
 3 files changed, 58 insertions(+), 54 deletions(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
index a4c4573f..97e9630b 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
@@ -8,33 +8,17 @@
 package io.debezium.server.iceberg.history;
 
+import com.google.common.collect.Maps;
 import io.debezium.DebeziumException;
 import io.debezium.annotation.ThreadSafe;
 import io.debezium.common.annotation.Incubating;
 import io.debezium.config.Configuration;
-import io.debezium.config.Field;
 import io.debezium.document.DocumentReader;
 import io.debezium.document.DocumentWriter;
 import io.debezium.relational.history.*;
 import io.debezium.server.iceberg.IcebergUtil;
 import io.debezium.util.FunctionalReadWriteLock;
 import io.debezium.util.Strings;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-
 import org.apache.iceberg.*;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
@@ -47,8 +31,24 @@
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.*;
 import org.apache.iceberg.types.Types;
+import org.eclipse.microprofile.config.ConfigProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
@@ -87,8 +87,8 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
     this.historyConfig = new IcebergSchemaHistoryConfig(config);
     icebergCatalog = CatalogUtil.buildIcebergCatalog(this.historyConfig.catalogName(), this.historyConfig.icebergProperties(), this.historyConfig.hadoopConfig());
-    tableFullName = String.format("%s.%s", this.historyConfig.catalogName(), this.historyConfig.tableName());
-    tableId = TableIdentifier.of(Namespace.of(this.historyConfig.catalogName()), this.historyConfig.tableName());
+    tableFullName = String.format("%s.%s", this.historyConfig.tableNamespace(), this.historyConfig.tableName());
+    tableId = TableIdentifier.of(Namespace.of(this.historyConfig.tableNamespace()), this.historyConfig.tableName());
 
     if (running.get()) {
       throw new SchemaHistoryException("Iceberg database history process already initialized table: " + tableFullName);
@@ -269,45 +269,43 @@ private void loadFileSchemaHistory(File file) {
   }
 
   public static class IcebergSchemaHistoryConfig {
-
-    final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
-    private final Configuration config;
-    Map<String, String> icebergProperties = new ConcurrentHashMap<>();
+    private static final String PROP_SINK_PREFIX = "debezium.sink.";
+    Properties configCombined = new Properties();
 
     public IcebergSchemaHistoryConfig(Configuration config) {
-      this.config = config;
+      Configuration confIcebergSubset = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.", true);
+      confIcebergSubset.forEach(configCombined::put);
 
-      final Map<String, String> conf = new HashMap<>();
-      this.config.forEach((propName, value) -> {
-        if (propName.startsWith(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.")) {
-          final String newPropName = propName.substring((CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.").length());
-          conf.put(newPropName, value);
-        }
-      });
-
-      conf.forEach(hadoopConfig::set);
-      icebergProperties.putAll(conf);
+      // Debezium filters the configuration before passing it down to this class,
+      // so we additionally read the full iceberg config via ConfigProvider.
+      Map<String, String> icebergConf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_SINK_PREFIX + "iceberg.");
+      icebergConf.forEach(configCombined::putIfAbsent);
     }
 
     public String catalogName() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.catalog-name").withDefault("default"));
+      return this.configCombined.getProperty("catalog-name", "default");
+    }
+
+    public String tableNamespace() {
+      return this.configCombined.getProperty("table-namespace", "default");
     }
 
     public String tableName() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-name").withDefault(
-          "debezium_database_history_storage"));
+      return this.configCombined.getProperty("table-name", "debezium_database_history_storage");
     }
 
     public org.apache.hadoop.conf.Configuration hadoopConfig() {
+      final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
+      configCombined.forEach((key, value) -> hadoopConfig.set((String) key, (String) value));
       return hadoopConfig;
     }
 
     public Map<String, String> icebergProperties() {
-      return icebergProperties;
+      return Maps.fromProperties(configCombined);
    }
 
    public String getMigrateHistoryFile() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.migrate-history-file").withDefault(""));
+      return configCombined.getProperty("migrate-history-file", "");
    }
  }
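Note on the merge order used by the rewritten config classes (IcebergSchemaHistoryConfig above and IcebergOffsetBackingStoreConfig below): keys from the Debezium-filtered subset are copied in first with put(), and the broader debezium.sink.iceberg.* values fetched via ConfigProvider are added with putIfAbsent(), so they never override what Debezium already passed down. A minimal self-contained sketch of that precedence rule, using hypothetical property values:

import java.util.Map;
import java.util.Properties;

// Sketch of the precedence rule behind configCombined: entries that Debezium
// passed down win; ConfigProvider entries only fill the gaps.
public class ConfigMergeSketch {
  public static void main(String[] args) {
    Properties configCombined = new Properties();

    // 1) Debezium-filtered subset; the prefix is already stripped by config.subset(..., true).
    Map.of("table-name", "history_from_debezium").forEach(configCombined::put);

    // 2) Full debezium.sink.iceberg.* config read via ConfigProvider;
    //    putIfAbsent never overwrites an entry from step 1.
    Map.of("table-name", "ignored", "catalog-name", "sink_catalog")
        .forEach(configCombined::putIfAbsent);

    System.out.println(configCombined.getProperty("table-name"));   // history_from_debezium
    System.out.println(configCombined.getProperty("catalog-name")); // sink_catalog
  }
}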
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
index b3855179..2c05e3f2 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
@@ -11,9 +11,9 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
 import io.debezium.DebeziumException;
 import io.debezium.config.Configuration;
-import io.debezium.config.Field;
 import io.debezium.server.iceberg.IcebergUtil;
 import io.debezium.util.Strings;
 import jakarta.enterprise.context.Dependent;
@@ -47,7 +47,6 @@
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -286,40 +285,45 @@ public Set<Map<String, Object>> connectorPartitions(String connectorName) {
   }
 
   public static class IcebergOffsetBackingStoreConfig extends WorkerConfig {
-    final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
-    private final Configuration config;
-    Map<String, String> icebergProperties = new ConcurrentHashMap<>();
+    private static final String PROP_SINK_PREFIX = "debezium.sink.";
+    Properties configCombined = new Properties();
 
     public IcebergOffsetBackingStoreConfig(Configuration config) {
       super(new ConfigDef(), config.asMap());
-      this.config = config;
-      Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
-      conf.forEach(hadoopConfig::set);
-      icebergProperties.putAll(conf);
+      Configuration confIcebergSubset = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.", true);
+      confIcebergSubset.forEach(configCombined::put);
+
+      // Debezium filters the configuration before passing it down to this class,
+      // so we additionally read the full iceberg config via ConfigProvider.
+      Map<String, String> icebergConf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_SINK_PREFIX + "iceberg.");
+      icebergConf.forEach(configCombined::putIfAbsent);
     }
 
     public String catalogName() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.catalog-name").withDefault("default"));
+      return this.configCombined.getProperty("catalog-name", "default");
     }
 
     public String tableNamespace() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-namespace").withDefault("default"));
+      return this.configCombined.getProperty("table-namespace", "default");
     }
 
     public String tableName() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-name").withDefault("debezium_offset_storage"));
+      return this.configCombined.getProperty("table-name", "debezium_offset_storage");
     }
 
     public String getMigrateOffsetFile() {
-      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.migrate-offset-file").withDefault(""));
+      return this.configCombined.getProperty("migrate-offset-file", "");
     }
 
     public org.apache.hadoop.conf.Configuration hadoopConfig() {
+      final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
+      configCombined.forEach((key, value) -> hadoopConfig.set((String) key, (String) value));
       return hadoopConfig;
     }
 
     public Map<String, String> icebergProperties() {
-      return icebergProperties;
+      return Maps.fromProperties(configCombined);
     }
   }

diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java
index d6724d07..fb9fbce6 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java
@@ -30,6 +30,8 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+
+import static io.debezium.server.iceberg.TestConfigSource.CATALOG_TABLE_NAMESPACE;
 import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.fromByteBuffer;
 import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.toByteBuffer;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -90,7 +92,7 @@ public void testGetSet() throws Exception {
     assertEquals(("value"), fromByteBuffer(values.get(toByteBuffer("key"))));
     Assertions.assertNull(values.get(toByteBuffer("bad")));
 
-    CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of("default", "debezium_offset_storage"));
+    CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of(CATALOG_TABLE_NAMESPACE, "debezium_offset_storage"));
     Assertions.assertEquals(1, Lists.newArrayList(d).size());
   }
 
@@ -122,4 +124,4 @@ public TestWorkerConfig(Map<String, String> props) {
       super(new ConfigDef(), props);
     }
   }
-}
\ No newline at end of file
+}
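A closing note on the two rebuilt accessors: hadoopConfig() casts each key and value of configCombined to String, and Guava's Maps.fromProperties makes the same String-only assumption when building the immutable map for icebergProperties(), so a non-String entry would fail either conversion. A minimal sketch of that conversion path (hypothetical entries; the Hadoop Configuration call is stubbed with a printf so the sketch only needs Guava):

import com.google.common.collect.Maps;
import java.util.Map;
import java.util.Properties;

// Sketch: turning the combined Properties into the two shapes the stores need.
public class ConfigConversionSketch {
  public static void main(String[] args) {
    Properties configCombined = new Properties();
    configCombined.setProperty("catalog-name", "demo");      // hypothetical entry
    configCombined.setProperty("warehouse", "s3a://bucket"); // hypothetical entry

    // Same call the patch uses in icebergProperties(): an immutable Map<String, String>.
    Map<String, String> icebergProps = Maps.fromProperties(configCombined);
    System.out.println(icebergProps);

    // Stand-in for hadoopConfig(): each entry would become hadoopConfig.set(key, value).
    configCombined.forEach((key, value) ->
        System.out.printf("hadoopConfig.set(%s, %s)%n", key, value));
  }
}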