Skip to content

Commit

Permalink
Minor improvements to icebergevents consumer, IcebergEventsChangeCons…
Browse files Browse the repository at this point in the history
…umer (#328)

(cherry picked from commit 624ad0e)
  • Loading branch information
ismailsimsek committed May 23, 2024
1 parent 849073a commit e77f04d
Showing 1 changed file with 18 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package io.debezium.server.iceberg;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
Expand All @@ -16,19 +18,6 @@
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
import io.debezium.server.iceberg.tableoperator.PartitionedAppendWriter;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Any;
Expand All @@ -53,6 +42,16 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

Expand All @@ -69,6 +68,7 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
protected static final ObjectMapper mapper = new ObjectMapper();
protected static final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
protected static final Serde<JsonNode> keySerde = DebeziumSerdes.payloadJson(JsonNode.class);
static final String TABLE_NAME = "debezium_events";
static final Schema TABLE_SCHEMA = new Schema(
required(1, "event_destination", Types.StringType.get()),
optional(2, "event_key_schema", Types.StringType.get()),
Expand All @@ -92,13 +92,12 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
static Deserializer<JsonNode> valDeserializer;
static Deserializer<JsonNode> keyDeserializer;
final Configuration hadoopConf = new Configuration();
final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
@ConfigProperty(name = "debezium.sink.iceberg." + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
String keyFormat;
@ConfigProperty(name = PROP_PREFIX + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
Expand All @@ -123,13 +122,11 @@ void connect() {
"Supported (debezium.format.key=*) formats are {json,}!");
}

TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events");

Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
conf.forEach(this.hadoopConf::set);
this.icebergProperties.putAll(conf);
Map<String, String> icebergProperties = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
icebergProperties.forEach(this.hadoopConf::set);

icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), TABLE_NAME);

// create table if not exists
if (!icebergCatalog.tableExists(tableIdentifier)) {
Expand Down Expand Up @@ -179,10 +176,6 @@ public GenericRecord getIcebergRecord(ChangeEvent<Object, Object> record, Offset
}
}

public String map(String destination) {
return destination.replace(".", "_");
}

@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
Expand Down

0 comments on commit e77f04d

Please sign in to comment.