Skip to content

Commit

Permalink
Fix reading config values in debezium storage classes (#327)
Browse files Browse the repository at this point in the history
* Fix reading config values in debezium storage classes

* Update IcebergOffsetBackingStore.java

* Update IcebergOffsetBackingStoreTest.java

(cherry picked from commit c442297)
  • Loading branch information
ismailsimsek committed May 23, 2024
1 parent 14b2158 commit 849073a
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,17 @@

package io.debezium.server.iceberg.history;

import com.google.common.collect.Maps;
import io.debezium.DebeziumException;
import io.debezium.annotation.ThreadSafe;
import io.debezium.common.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.*;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.util.FunctionalReadWriteLock;
import io.debezium.util.Strings;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
Expand All @@ -47,8 +31,24 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.*;
import org.apache.iceberg.types.Types;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

Expand Down Expand Up @@ -87,8 +87,8 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
this.historyConfig = new IcebergSchemaHistoryConfig(config);
icebergCatalog = CatalogUtil.buildIcebergCatalog(this.historyConfig.catalogName(),
this.historyConfig.icebergProperties(), this.historyConfig.hadoopConfig());
tableFullName = String.format("%s.%s", this.historyConfig.catalogName(), this.historyConfig.tableName());
tableId = TableIdentifier.of(Namespace.of(this.historyConfig.catalogName()), this.historyConfig.tableName());
tableFullName = String.format("%s.%s", this.historyConfig.tableNamespace(), this.historyConfig.tableName());
tableId = TableIdentifier.of(Namespace.of(this.historyConfig.tableNamespace()), this.historyConfig.tableName());

if (running.get()) {
throw new SchemaHistoryException("Iceberg database history process already initialized table: " + tableFullName);
Expand Down Expand Up @@ -269,45 +269,43 @@ private void loadFileSchemaHistory(File file) {
}

public static class IcebergSchemaHistoryConfig {
  // Prefix under which the full sink configuration lives in the application config.
  private static final String PROP_SINK_PREFIX = "debezium.sink.";
  // Merged view of the Debezium-provided subset and the full iceberg config
  // read via ConfigProvider. Keys are stored without the prefix.
  Properties configCombined = new Properties();

  /**
   * Builds the combined configuration for the Iceberg schema-history storage.
   * <p>
   * Debezium filters the configuration before passing it down to this class,
   * so in addition to the {@code iceberg.}-prefixed subset of {@code config}
   * we read the full iceberg configuration through the MicroProfile
   * {@link ConfigProvider}. Values already present from the Debezium subset
   * take precedence ({@code putIfAbsent}).
   *
   * @param config the (pre-filtered) configuration Debezium hands to the storage
   */
  public IcebergSchemaHistoryConfig(Configuration config) {
    Configuration confIcebergSubset = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.", true);
    confIcebergSubset.forEach(configCombined::put);

    // Debezium filters the config before passing it down to this class,
    // so we additionally read the full iceberg config using ConfigProvider.
    Map<String, String> icebergConf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_SINK_PREFIX + "iceberg.");
    icebergConf.forEach(configCombined::putIfAbsent);
  }

  /** @return the Iceberg catalog name, defaulting to {@code default} */
  public String catalogName() {
    return this.configCombined.getProperty("catalog-name", "default");
  }

  /** @return the namespace of the history table, defaulting to {@code default} */
  public String tableNamespace() {
    return this.configCombined.getProperty("table-namespace", "default");
  }

  /** @return the history table name, defaulting to {@code debezium_database_history_storage} */
  public String tableName() {
    return this.configCombined.getProperty("table-name", "debezium_database_history_storage");
  }

  /**
   * Builds a fresh Hadoop configuration from every combined config entry.
   *
   * @return a new Hadoop {@code Configuration} populated from {@link #configCombined}
   */
  public org.apache.hadoop.conf.Configuration hadoopConfig() {
    final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
    configCombined.forEach((key, value) -> hadoopConfig.set((String) key, (String) value));
    return hadoopConfig;
  }

  /** @return the combined configuration as an immutable string map for the Iceberg catalog */
  public Map<String, String> icebergProperties() {
    return Maps.fromProperties(configCombined);
  }

  /** @return path of a file-based history to migrate from, or empty string when unset */
  public String getMigrateHistoryFile() {
    return configCombined.getProperty("migrate-history-file", "");
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.util.Strings;
import jakarta.enterprise.context.Dependent;
Expand Down Expand Up @@ -47,7 +47,6 @@
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -286,40 +285,45 @@ public Set<Map<String, Object>> connectorPartitions(String connectorName) {
}

public static class IcebergOffsetBackingStoreConfig extends WorkerConfig {
  // Prefix under which the full sink configuration lives in the application config.
  private static final String PROP_SINK_PREFIX = "debezium.sink.";
  // Merged view of the Debezium-provided subset and the full iceberg config
  // read via ConfigProvider. Keys are stored without the prefix.
  Properties configCombined = new Properties();

  /**
   * Builds the combined configuration for the Iceberg offset backing store.
   * <p>
   * Debezium filters the configuration before passing it down to this class,
   * so in addition to the {@code iceberg.}-prefixed subset of {@code config}
   * we read the full iceberg configuration through the MicroProfile
   * {@link ConfigProvider}. Values already present from the Debezium subset
   * take precedence ({@code putIfAbsent}).
   *
   * @param config the (pre-filtered) configuration Debezium hands to the store
   */
  public IcebergOffsetBackingStoreConfig(Configuration config) {
    super(new ConfigDef(), config.asMap());
    Configuration confIcebergSubset = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.", true);
    confIcebergSubset.forEach(configCombined::put);

    // Debezium filters the config before passing it down to this class,
    // so we additionally read the full iceberg config using ConfigProvider.
    Map<String, String> icebergConf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_SINK_PREFIX + "iceberg.");
    icebergConf.forEach(configCombined::putIfAbsent);
  }

  /** @return the Iceberg catalog name, defaulting to {@code default} */
  public String catalogName() {
    return this.configCombined.getProperty("catalog-name", "default");
  }

  /** @return the namespace of the offset table, defaulting to {@code default} */
  public String tableNamespace() {
    return this.configCombined.getProperty("table-namespace", "default");
  }

  /** @return the offset table name, defaulting to {@code debezium_offset_storage} */
  public String tableName() {
    return this.configCombined.getProperty("table-name", "debezium_offset_storage");
  }

  /** @return path of a file-based offset store to migrate from, or empty string when unset */
  public String getMigrateOffsetFile() {
    return configCombined.getProperty("migrate-offset-file", "");
  }

  /**
   * Builds a fresh Hadoop configuration from every combined config entry.
   *
   * @return a new Hadoop {@code Configuration} populated from {@link #configCombined}
   */
  public org.apache.hadoop.conf.Configuration hadoopConfig() {
    final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
    configCombined.forEach((key, value) -> hadoopConfig.set((String) key, (String) value));
    return hadoopConfig;
  }

  /** @return the combined configuration as an immutable string map for the Iceberg catalog */
  public Map<String, String> icebergProperties() {
    return Maps.fromProperties(configCombined);
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static io.debezium.server.iceberg.TestConfigSource.CATALOG_TABLE_NAMESPACE;
import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.fromByteBuffer;
import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.toByteBuffer;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -90,7 +92,7 @@ public void testGetSet() throws Exception {
assertEquals(("value"), fromByteBuffer(values.get(toByteBuffer("key"))));
Assertions.assertNull(values.get(toByteBuffer("bad")));

CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of("default", "debezium_offset_storage"));
CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of(CATALOG_TABLE_NAMESPACE, "debezium_offset_storage"));
Assertions.assertEquals(1, Lists.newArrayList(d).size());
}

Expand Down Expand Up @@ -122,4 +124,4 @@ public TestWorkerConfig(Map<String, String> props) {
super(new ConfigDef(), props);
}
}
}
}

0 comments on commit 849073a

Please sign in to comment.