From 624ad0ed837ba565188ae5115c8859c3f2859dda Mon Sep 17 00:00:00 2001
From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Thu, 23 May 2024 17:35:54 +0200
Subject: [PATCH] Minor improvements to icebergevents consumer,
 IcebergEventsChangeConsumer (#328)

---
 .../iceberg/IcebergEventsChangeConsumer.java | 43 ++++++++-----------
 1 file changed, 18 insertions(+), 25 deletions(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
index a1e1dc34..777e6b40 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
@@ -8,6 +8,8 @@
 
 package io.debezium.server.iceberg;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.debezium.DebeziumException;
 import io.debezium.engine.ChangeEvent;
 import io.debezium.engine.DebeziumEngine;
@@ -16,19 +18,6 @@
 import io.debezium.server.BaseChangeConsumer;
 import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
 import io.debezium.server.iceberg.tableoperator.PartitionedAppendWriter;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import jakarta.annotation.PostConstruct;
 import jakarta.enterprise.context.Dependent;
 import jakarta.enterprise.inject.Any;
@@ -53,6 +42,16 @@
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+import java.util.stream.Collectors;
+
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
@@ -69,6 +68,7 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
   protected static final ObjectMapper mapper = new ObjectMapper();
   protected static final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
   protected static final Serde<JsonNode> keySerde = DebeziumSerdes.payloadJson(JsonNode.class);
+  static final String TABLE_NAME = "debezium_events";
   static final Schema TABLE_SCHEMA = new Schema(
       required(1, "event_destination", Types.StringType.get()),
       optional(2, "event_key_schema", Types.StringType.get()),
@@ -92,13 +92,12 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
   static Deserializer<JsonNode> valDeserializer;
   static Deserializer<JsonNode> keyDeserializer;
   final Configuration hadoopConf = new Configuration();
-  final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
-  @ConfigProperty(name = "debezium.sink.iceberg." + CatalogProperties.WAREHOUSE_LOCATION)
-  String warehouseLocation;
   @ConfigProperty(name = "debezium.format.value", defaultValue = "json")
   String valueFormat;
   @ConfigProperty(name = "debezium.format.key", defaultValue = "json")
   String keyFormat;
+  @ConfigProperty(name = PROP_PREFIX + CatalogProperties.WAREHOUSE_LOCATION)
+  String warehouseLocation;
   @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
   String namespace;
   @ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
@@ -123,13 +122,11 @@ void connect() {
           "Supported (debezium.format.key=*) formats are {json,}!");
     }
 
-    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events");
-
-    Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
-    conf.forEach(this.hadoopConf::set);
-    this.icebergProperties.putAll(conf);
+    Map<String, String> icebergProperties = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
+    icebergProperties.forEach(this.hadoopConf::set);
 
     icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
+    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), TABLE_NAME);
 
     // create table if not exists
     if (!icebergCatalog.tableExists(tableIdentifier)) {
@@ -179,10 +176,6 @@ public GenericRecord getIcebergRecord(ChangeEvent<Object, Object> record, Offset
     }
   }
 
-  public String map(String destination) {
-    return destination.replace(".", "_");
-  }
-
   @Override
   public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
       throws InterruptedException {
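
Not part of the patch: a minimal standalone sketch of the catalog wiring that the refactored connect() relies on, where one property subset feeds both the Hadoop Configuration and CatalogUtil.buildIcebergCatalog(), and the events table identifier is built from the new TABLE_NAME constant. The class name and property values below are illustrative assumptions, not taken from the project.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

// Hypothetical demo class, not part of the repository.
public class CatalogWiringSketch {
  public static void main(String[] args) {
    // Assumed stand-in for IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX):
    // the "debezium.sink.iceberg." prefix is stripped, leaving plain Iceberg catalog properties.
    Map<String, String> icebergProperties = new HashMap<>();
    icebergProperties.put("type", "hadoop");                      // assumed catalog type
    icebergProperties.put("warehouse", "/tmp/iceberg-warehouse"); // assumed warehouse location

    // Mirror the properties into the Hadoop Configuration, as the patched connect() does.
    Configuration hadoopConf = new Configuration();
    icebergProperties.forEach(hadoopConf::set);

    // Build the catalog and resolve the fixed events table, debezium_events.
    Catalog catalog = CatalogUtil.buildIcebergCatalog("default", icebergProperties, hadoopConf);
    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("default"), "debezium_events");
    System.out.println("debezium_events exists: " + catalog.tableExists(tableIdentifier));
  }
}

Using a single local property map for both the Hadoop Configuration and the catalog builder mirrors the patched connect() and removes the need for the dropped icebergProperties field.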