Merge pull request #330 from memiiso/0.4-backport
0.4 backport PRs (#327) (#328) (#321)
ismailsimsek committed May 23, 2024
2 parents 14b2158 + adeeaf8 commit c083c63
Showing 5 changed files with 108 additions and 126 deletions.
@@ -6,72 +6,56 @@ debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=mycatalog
# Hadoop catalog; you can use any other catalog supported by Iceberg as well

# S3 config using Hadoop S3A and the Hadoop catalog
debezium.sink.iceberg.type=hadoop
debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
debezium.sink.iceberg.table-namespace=debeziumevents
debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket
debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
debezium.sink.iceberg.fs.s3a.access.key=AWS_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
debezium.sink.iceberg.catalog-name=iceberg

# S3 config using the JdbcCatalog and S3FileIO
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog
debezium.sink.iceberg.uri=jdbc_db_url
debezium.sink.iceberg.jdbc.user=my_user
debezium.sink.iceberg.jdbc.password=my_password
debezium.sink.iceberg.table-namespace=debeziumdata
debezium.sink.iceberg.catalog-name=iceberg
debezium.sink.iceberg.warehouse=s3a://my_bucket/iceberg_warehouse
debezium.sink.iceberg.warehouse=s3://my_bucket/iceberg_warehouse
# S3FileIO
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
debezium.sink.iceberg.s3.path-style-access=true

# Config with Hive metastore catalog
# debezium.sink.iceberg.type=hive
# debezium.sink.iceberg.uri=thrift://xx.xxx.xx.xxx:9083
# debezium.sink.iceberg.clients=5
# debezium.sink.iceberg.warehouse=s3a://datalake/datawarehouse
# debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
# debezium.sink.iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
# debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
# debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
# debezium.sink.iceberg.engine.hive.enabled=true
# debezium.sink.iceberg.iceberg.engine.hive.enabled=true
# debezium.sink.hive.metastore.sasl.enabled=false
# debezium.sink.iceberg.hive.metastore.sasl.enabled=false

# Use S3FileIO
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.s3.endpoint=http://localhost:9000
debezium.sink.iceberg.s3.path-style-access=true
debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY

# S3 config without Hadoop catalog, using InMemoryCatalog and S3FileIO
### using MinIO as S3
debezium.sink.iceberg.s3.endpoint=http://localhost:9000
debezium.sink.iceberg.s3.path-style-access=true
debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.inmemory.InMemoryCatalog
# debezium.sink.iceberg.s3.endpoint=http://localhost:9000
## S3FileIO
# debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
# debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
# debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
# debezium.sink.iceberg.s3.path-style-access=true
# debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse

# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# saving debezium state data to destination, iceberg tables
# saving debezium state data to destination iceberg tables
# see https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming
debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_table
# see https://debezium.io/documentation/reference/stable/development/engine.html#database-history-properties
debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.schema.history.internal.iceberg.table-name=debezium_database_history_storage_test
debezium.source.schema.history.internal.iceberg.table-name=debezium_database_history_storage_table

# postgres source
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
@@ -87,17 +71,17 @@ debezium.source.schema.include.list=inventory
debezium.source.topic.prefix=dbz_

# sql server source
#debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
#debezium.source.offset.flush.interval.ms=0
#debezium.source.database.hostname=localhost
#debezium.source.database.port=5432
#debezium.source.database.user=debezium
#debezium.source.database.password=debezium
#debezium.source.database.dbname=debezium
#debezium.source.database.server.name=tutorial
#debezium.source.schema.include.list=inventory
# debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
# debezium.source.offset.flush.interval.ms=0
# debezium.source.database.hostname=localhost
# debezium.source.database.port=5432
# debezium.source.database.user=debezium
# debezium.source.database.password=debezium
# debezium.source.database.dbname=debezium
# debezium.source.database.server.name=tutorial
# debezium.source.schema.include.list=inventory
# mandatory for the SQL Server source; avoids an error during snapshot and schema change
#debezium.source.include.schema.changes=false
# debezium.source.include.schema.changes=false

# do event flattening: unwrap the message
debezium.transforms=unwrap
@@ -109,8 +93,9 @@ debezium.transforms.unwrap.drop.tombstones=true
# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
quarkus.log.console.json=false
# set log level for libs
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
quarkus.log.category."org.eclipse.jetty".level=WARN
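
Worth noting how the debezium.sink.iceberg.* keys above reach Iceberg: the sink strips the prefix and hands the remaining keys straight to the catalog factory, as the consumer change below shows with IcebergUtil.getConfigSubset and CatalogUtil.buildIcebergCatalog. A minimal sketch of that wiring, assuming the Iceberg JDBC and AWS modules are on the classpath and using a placeholder JDBC URL:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;

import java.util.HashMap;
import java.util.Map;

public class CatalogBootstrapSketch {
  public static void main(String[] args) {
    // Iceberg sees the keys with the "debezium.sink.iceberg." prefix already stripped.
    Map<String, String> icebergProperties = new HashMap<>();
    icebergProperties.put("catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog");
    icebergProperties.put("uri", "jdbc:postgresql://localhost:5432/iceberg"); // placeholder JDBC URL
    icebergProperties.put("warehouse", "s3://my_bucket/iceberg_warehouse");
    icebergProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");

    // The same map is also applied to the Hadoop configuration, mirroring the consumer code.
    Configuration hadoopConf = new Configuration();
    icebergProperties.forEach(hadoopConf::set);

    Catalog catalog = CatalogUtil.buildIcebergCatalog("iceberg", icebergProperties, hadoopConf);
    System.out.println("catalog: " + catalog.name());
  }
}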
@@ -8,6 +8,8 @@

package io.debezium.server.iceberg;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
@@ -16,19 +18,6 @@
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
import io.debezium.server.iceberg.tableoperator.PartitionedAppendWriter;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Any;
@@ -53,6 +42,16 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

@@ -69,6 +68,7 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
protected static final ObjectMapper mapper = new ObjectMapper();
protected static final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
protected static final Serde<JsonNode> keySerde = DebeziumSerdes.payloadJson(JsonNode.class);
static final String TABLE_NAME = "debezium_events";
static final Schema TABLE_SCHEMA = new Schema(
required(1, "event_destination", Types.StringType.get()),
optional(2, "event_key_schema", Types.StringType.get()),
@@ -92,13 +92,12 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
static Deserializer<JsonNode> valDeserializer;
static Deserializer<JsonNode> keyDeserializer;
final Configuration hadoopConf = new Configuration();
final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
@ConfigProperty(name = "debezium.sink.iceberg." + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
String keyFormat;
@ConfigProperty(name = PROP_PREFIX + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
@@ -123,13 +122,11 @@ void connect() {
"Supported (debezium.format.key=*) formats are {json,}!");
}

TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events");

Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
conf.forEach(this.hadoopConf::set);
this.icebergProperties.putAll(conf);
Map<String, String> icebergProperties = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
icebergProperties.forEach(this.hadoopConf::set);

icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), TABLE_NAME);

// create table if not exists
if (!icebergCatalog.tableExists(tableIdentifier)) {
@@ -179,10 +176,6 @@ public GenericRecord getIcebergRecord(ChangeEvent<Object, Object> record, Offset
}
}

public String map(String destination) {
return destination.replace(".", "_");
}

@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
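
The change above also hoists the events table name into a TABLE_NAME constant and builds the TableIdentifier only after the catalog exists. A condensed sketch of that create-if-absent pattern, with TABLE_SCHEMA truncated to two of its fields:

import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

class EventsTableBootstrap {
  static final String TABLE_NAME = "debezium_events";
  // Truncated for illustration: the real schema carries the full event envelope.
  static final Schema TABLE_SCHEMA = new Schema(
      required(1, "event_destination", Types.StringType.get()),
      optional(2, "event_key_schema", Types.StringType.get()));

  static void ensureTable(Catalog catalog, String namespace) {
    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), TABLE_NAME);
    // Create the table on first use, mirroring connect() above.
    if (!catalog.tableExists(tableIdentifier)) {
      catalog.createTable(tableIdentifier, TABLE_SCHEMA);
    }
  }
}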
@@ -8,33 +8,17 @@

package io.debezium.server.iceberg.history;

import com.google.common.collect.Maps;
import io.debezium.DebeziumException;
import io.debezium.annotation.ThreadSafe;
import io.debezium.common.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.*;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.util.FunctionalReadWriteLock;
import io.debezium.util.Strings;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
@@ -47,8 +31,24 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.*;
import org.apache.iceberg.types.Types;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

@@ -87,8 +87,8 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
this.historyConfig = new IcebergSchemaHistoryConfig(config);
icebergCatalog = CatalogUtil.buildIcebergCatalog(this.historyConfig.catalogName(),
this.historyConfig.icebergProperties(), this.historyConfig.hadoopConfig());
tableFullName = String.format("%s.%s", this.historyConfig.catalogName(), this.historyConfig.tableName());
tableId = TableIdentifier.of(Namespace.of(this.historyConfig.catalogName()), this.historyConfig.tableName());
tableFullName = String.format("%s.%s", this.historyConfig.tableNamespace(), this.historyConfig.tableName());
tableId = TableIdentifier.of(Namespace.of(this.historyConfig.tableNamespace()), this.historyConfig.tableName());

if (running.get()) {
throw new SchemaHistoryException("Iceberg database history process already initialized table: " + tableFullName);
@@ -269,45 +269,43 @@ private void loadFileSchemaHistory(File file) {
}

public static class IcebergSchemaHistoryConfig {

final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
private final Configuration config;
Map<String, String> icebergProperties = new ConcurrentHashMap<>();
private static final String PROP_SINK_PREFIX = "debezium.sink.";
Properties configCombined = new Properties();

public IcebergSchemaHistoryConfig(Configuration config) {
this.config = config;
Configuration confIcebergSubset = config.subset(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.", true);
confIcebergSubset.forEach(configCombined::put);

final Map<String, String> conf = new HashMap<>();
this.config.forEach((propName, value) -> {
if (propName.startsWith(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.")) {
final String newPropName = propName.substring((CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.").length());
conf.put(newPropName, value);
}
});

conf.forEach(hadoopConfig::set);
icebergProperties.putAll(conf);
// Debezium filters the configuration before passing it down to this class,
// so we also read the full iceberg config through ConfigProvider and merge it in.
Map<String, String> icebergConf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_SINK_PREFIX + "iceberg.");
icebergConf.forEach(configCombined::putIfAbsent);
}

public String catalogName() {
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.catalog-name").withDefault("default"));
return this.configCombined.getProperty("catalog-name", "default");
}

public String tableNamespace() {
return this.configCombined.getProperty("table-namespace", "default");
}

public String tableName() {
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-name").withDefault(
"debezium_database_history_storage"));
return this.configCombined.getProperty("table-name", "debezium_database_history_storage");
}

public org.apache.hadoop.conf.Configuration hadoopConfig() {
final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
configCombined.forEach((key, value) -> hadoopConfig.set((String)key, (String)value));
return hadoopConfig;
}

public Map<String, String> icebergProperties() {
return icebergProperties;
return Maps.fromProperties(configCombined);
}

public String getMigrateHistoryFile() {
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.migrate-history-file").withDefault(""));
return configCombined.getProperty("migrate-history-file", "");
}
}

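
The net effect of the IcebergSchemaHistoryConfig rewrite is a two-layer lookup: keys the Debezium engine already passed down win, and the full debezium.sink.iceberg.* view obtained through MicroProfile's ConfigProvider only fills the gaps via putIfAbsent. A reduced sketch of that merge, with both maps passed in explicitly for illustration:

import java.util.Map;
import java.util.Properties;

class ConfigMergeSketch {
  static Properties combine(Map<String, String> debeziumSubset, Map<String, String> fullIcebergConfig) {
    Properties configCombined = new Properties();
    // Keys the Debezium engine passed down take precedence...
    debeziumSubset.forEach(configCombined::put);
    // ...and the broader config read via ConfigProvider only supplies what is missing.
    fullIcebergConfig.forEach(configCombined::putIfAbsent);
    return configCombined;
  }
}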