Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor improvements to icebergevents consumer, IcebergEventsChangeCons… #328

Merged
merged 1 commit into from
May 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package io.debezium.server.iceberg;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
Expand All @@ -16,19 +18,6 @@
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
import io.debezium.server.iceberg.tableoperator.PartitionedAppendWriter;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Any;
Expand All @@ -53,6 +42,16 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

Expand All @@ -69,6 +68,7 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
protected static final ObjectMapper mapper = new ObjectMapper();
protected static final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
protected static final Serde<JsonNode> keySerde = DebeziumSerdes.payloadJson(JsonNode.class);
static final String TABLE_NAME = "debezium_events";
static final Schema TABLE_SCHEMA = new Schema(
required(1, "event_destination", Types.StringType.get()),
optional(2, "event_key_schema", Types.StringType.get()),
Expand All @@ -92,13 +92,12 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
static Deserializer<JsonNode> valDeserializer;
static Deserializer<JsonNode> keyDeserializer;
final Configuration hadoopConf = new Configuration();
final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
@ConfigProperty(name = "debezium.sink.iceberg." + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
String keyFormat;
@ConfigProperty(name = PROP_PREFIX + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
Expand All @@ -123,13 +122,11 @@ void connect() {
"Supported (debezium.format.key=*) formats are {json,}!");
}

TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events");

Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
conf.forEach(this.hadoopConf::set);
this.icebergProperties.putAll(conf);
Map<String, String> icebergProperties = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
icebergProperties.forEach(this.hadoopConf::set);

icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), TABLE_NAME);

// create table if not exists
if (!icebergCatalog.tableExists(tableIdentifier)) {
Expand Down Expand Up @@ -179,10 +176,6 @@ public GenericRecord getIcebergRecord(ChangeEvent<Object, Object> record, Offset
}
}

public String map(String destination) {
return destination.replace(".", "_");
}

@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
Expand Down
Loading