Fix reading config values in debezium storage classes #327

Merged · 3 commits · May 23, 2024
@@ -8,33 +8,17 @@

package io.debezium.server.iceberg.history;

import com.google.common.collect.Maps;
import io.debezium.DebeziumException;
import io.debezium.annotation.ThreadSafe;
import io.debezium.common.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.*;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.util.FunctionalReadWriteLock;
import io.debezium.util.Strings;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
@@ -47,8 +31,24 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.*;
import org.apache.iceberg.types.Types;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

@@ -87,8 +87,8 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
this.historyConfig = new IcebergSchemaHistoryConfig(config);
icebergCatalog = CatalogUtil.buildIcebergCatalog(this.historyConfig.catalogName(),
this.historyConfig.icebergProperties(), this.historyConfig.hadoopConfig());
tableFullName = String.format("%s.%s", this.historyConfig.catalogName(), this.historyConfig.tableName());
tableId = TableIdentifier.of(Namespace.of(this.historyConfig.catalogName()), this.historyConfig.tableName());
tableFullName = String.format("%s.%s", this.historyConfig.tableNamespace(), this.historyConfig.tableName());
tableId = TableIdentifier.of(Namespace.of(this.historyConfig.tableNamespace()), this.historyConfig.tableName());

if (running.get()) {
throw new SchemaHistoryException("Iceberg database history process already initialized table: " + tableFullName);
@@ -269,45 +269,43 @@ private void loadFileSchemaHistory(File file) {
}

public static class IcebergSchemaHistoryConfig {

final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
private final Configuration config;
Map<String, String> icebergProperties = new ConcurrentHashMap<>();
private static final String PROP_SINK_PREFIX = "debezium.sink.";
Properties configCombined = new Properties();

public IcebergSchemaHistoryConfig(Configuration config) {
this.config = config;
Configuration confIcebergSubset = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.", true);
confIcebergSubset.forEach(configCombined::put);

final Map<String, String> conf = new HashMap<>();
this.config.forEach((propName, value) -> {
if (propName.startsWith(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.")) {
final String newPropName = propName.substring((CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.").length());
conf.put(newPropName, value);
}
});

conf.forEach(hadoopConfig::set);
icebergProperties.putAll(conf);
// Debezium filters the config before passing it down to this class,
// so we additionally read the full iceberg config through ConfigProvider.
Map<String, String> icebergConf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_SINK_PREFIX + "iceberg.");
icebergConf.forEach(configCombined::putIfAbsent);
}

public String catalogName() {
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.catalog-name").withDefault("default"));
return this.configCombined.getProperty("catalog-name", "default");
}

public String tableNamespace() {
return this.configCombined.getProperty("table-namespace", "default");
}

public String tableName() {
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-name").withDefault(
"debezium_database_history_storage"));
return this.configCombined.getProperty("table-name", "debezium_database_history_storage");
}

public org.apache.hadoop.conf.Configuration hadoopConfig() {
final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
configCombined.forEach((key, value) -> hadoopConfig.set((String)key, (String)value));
return hadoopConfig;
}

public Map<String, String> icebergProperties() {
return icebergProperties;
return Maps.fromProperties(configCombined);
}

public String getMigrateHistoryFile() {
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.migrate-history-file").withDefault(""));
return configCombined.getProperty("migrate-history-file", "");
}
}

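The core of the fix, repeated in both storage classes, is a two-stage merge into a single configCombined Properties object: the storage-specific *.iceberg. subset is copied in first with put, and the global debezium.sink.iceberg.* config, read through MicroProfile's ConfigProvider because Debezium filters the config it passes down to these classes, fills in only the still-missing keys with putIfAbsent. Below is a minimal sketch of that precedence; the key names and values are hypothetical.

import java.util.Map;
import java.util.Properties;

public class ConfigPrecedenceSketch {
  public static void main(String[] args) {
    Properties configCombined = new Properties();

    // Stage 1: the storage-specific subset (prefix already stripped); these values win.
    Map<String, String> storageSubset = Map.of("table-name", "my_history_table");
    storageSubset.forEach(configCombined::put);

    // Stage 2: the global debezium.sink.iceberg.* subset fills only absent keys.
    Map<String, String> sinkSubset = Map.of(
        "table-name", "ignored_since_already_set",
        "catalog-name", "my_catalog");
    sinkSubset.forEach(configCombined::putIfAbsent);

    System.out.println(configCombined.getProperty("table-name"));   // my_history_table
    System.out.println(configCombined.getProperty("catalog-name")); // my_catalog
  }
}

The offset backing store in the next file applies the same pattern.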
@@ -11,9 +11,9 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.util.Strings;
import jakarta.enterprise.context.Dependent;
@@ -47,7 +47,6 @@
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -286,40 +285,45 @@ public Set<Map<String, Object>> connectorPartitions(String connectorName) {
}

public static class IcebergOffsetBackingStoreConfig extends WorkerConfig {
final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
private final Configuration config;
Map<String, String> icebergProperties = new ConcurrentHashMap<>();
private static final String PROP_SINK_PREFIX = "debezium.sink.";
Properties configCombined = new Properties();

public IcebergOffsetBackingStoreConfig(Configuration config) {
super(new ConfigDef(), config.asMap());
this.config = config;
Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
conf.forEach(hadoopConfig::set);
icebergProperties.putAll(conf);
Configuration confIcebergSubset = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.", true);
confIcebergSubset.forEach(configCombined::put);

// Debezium filters the config before passing it down to this class,
// so we additionally read the full iceberg config through ConfigProvider.
Map<String, String> icebergConf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_SINK_PREFIX + "iceberg.");
icebergConf.forEach(configCombined::putIfAbsent);
}

public String catalogName() {
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.catalog-name").withDefault("default"));
return this.configCombined.getProperty("catalog-name", "default");
}

public String tableNamespace() {
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-namespace").withDefault("default"));
return this.configCombined.getProperty("table-namespace", "default");
}

public String tableName() {
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-name").withDefault("debezium_offset_storage"));
return this.configCombined.getProperty("table-name", "debezium_offset_storage");
}

public String getMigrateOffsetFile() {
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.migrate-offset-file").withDefault(""));
return this.configCombined.getProperty("migrate-offset-file", "");
}

public org.apache.hadoop.conf.Configuration hadoopConfig() {
final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
configCombined.forEach((key, value) -> hadoopConfig.set((String)key, (String)value));
return hadoopConfig;
}

public Map<String, String> icebergProperties() {
return icebergProperties;
return Maps.fromProperties(configCombined);
}
}

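With the merge in place, hadoopConfig() and icebergProperties() are both derived from the same configCombined: the Hadoop Configuration is rebuilt on each call, and the properties map is produced with Guava. Note that Maps.fromProperties returns an immutable map, unlike the mutable ConcurrentHashMap field it replaces. A small sketch of the conversion, assuming Guava on the classpath; the property values are illustrative.

import com.google.common.collect.Maps;
import java.util.Map;
import java.util.Properties;

public class PropertiesToMapSketch {
  public static void main(String[] args) {
    Properties configCombined = new Properties();
    configCombined.setProperty("catalog-name", "default");
    configCombined.setProperty("table-namespace", "debezium");

    // Maps.fromProperties expects String keys and values and returns an
    // ImmutableMap<String, String>, so callers get a read-only snapshot.
    Map<String, String> icebergProperties = Maps.fromProperties(configCombined);
    System.out.println(icebergProperties); // e.g. {catalog-name=default, table-namespace=debezium}
  }
}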
@@ -30,6 +30,8 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static io.debezium.server.iceberg.TestConfigSource.CATALOG_TABLE_NAMESPACE;
import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.fromByteBuffer;
import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.toByteBuffer;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -90,7 +92,7 @@ public void testGetSet() throws Exception {
assertEquals(("value"), fromByteBuffer(values.get(toByteBuffer("key"))));
Assertions.assertNull(values.get(toByteBuffer("bad")));

CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of("default", "debezium_offset_storage"));
CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of(CATALOG_TABLE_NAMESPACE, "debezium_offset_storage"));
Assertions.assertEquals(1, Lists.newArrayList(d).size());
}

@@ -122,4 +124,4 @@ public TestWorkerConfig(Map<String, String> props) {
super(new ConfigDef(), props);
}
}
}
}
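The test change follows from the same fix: the offset table identifier is now built from the configured table-namespace rather than the catalog name, so the lookup uses CATALOG_TABLE_NAMESPACE instead of a hard-coded "default". A sketch of the identifier construction, with a hypothetical namespace value:

import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

public class TableIdSketch {
  public static void main(String[] args) {
    String tableNamespace = "debezium"; // hypothetical table-namespace value
    TableIdentifier tableId =
        TableIdentifier.of(Namespace.of(tableNamespace), "debezium_offset_storage");
    // TableIdentifier prints as namespace.name
    System.out.println(tableId); // debezium.debezium_offset_storage
  }
}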