
Commit

Improve IcebergChangeEvent class, Optimize event and schema deserialization (#343)

* Improve IcebergChangeEvent class, Optimize event and schema deserialization

* Improve IcebergChangeEvent class, Optimize event and schema deserialization

* Improve access modifiers

* Check for null key values

* fix test

* fix test
ismailsimsek committed Jun 11, 2024
1 parent 706b83f commit fcc6e49
Showing 7 changed files with 104 additions and 97 deletions.
IcebergChangeConsumer.java
@@ -8,6 +8,7 @@

package io.debezium.server.iceberg;

+import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
@@ -19,19 +20,6 @@
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import io.debezium.util.Threads;

-import java.io.IOException;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Any;
@@ -51,6 +39,16 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

@@ -64,7 +62,6 @@
public class IcebergChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

protected static final Duration LOG_INTERVAL = Duration.ofMinutes(15);
-protected static final ObjectMapper mapper = new ObjectMapper();
protected static final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
protected static final Serde<JsonNode> keySerde = DebeziumSerdes.payloadJson(JsonNode.class);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class);
@@ -145,18 +142,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
Map<String, List<IcebergChangeEvent>> result =
records.stream()
.map((ChangeEvent<Object, Object> e)
--> {
-try {
-return new IcebergChangeEvent(e.destination(),
-valDeserializer.deserialize(e.destination(), getBytes(e.value())),
-e.key() == null ? null : keyDeserializer.deserialize(e.destination(), getBytes(e.key())),
-mapper.readTree(getBytes(e.value())).get("schema"),
-e.key() == null ? null : mapper.readTree(getBytes(e.key())).get("schema")
-);
-} catch (IOException ex) {
-throw new DebeziumException(ex);
-}
-})
+-> new IcebergChangeEvent(e.destination(), getBytes(e.value()), e.key() == null ? null : getBytes(e.key())))
.collect(Collectors.groupingBy(IcebergChangeEvent::destination));

// consume list of events for each destination table
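The handleBatch change above removes all JSON parsing from the batch path: previously each record's value (and key) was deserialized once and then read a second time with mapper.readTree just to extract the schema. Now events carry raw bytes and are only grouped by destination table here. A minimal runnable sketch of the resulting flow, with RawEvent standing in for Debezium's ChangeEvent<Object, Object> (all names are illustrative, not the project's API):

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// RawEvent stands in for ChangeEvent<Object, Object>; names are illustrative.
record RawEvent(String destination, byte[] key, byte[] value) {}

public class BatchGroupingSketch {
  public static void main(String[] args) {
    byte[] payload = "{\"schema\":{},\"payload\":{}}".getBytes(StandardCharsets.UTF_8);
    List<RawEvent> batch = List.of(
        new RawEvent("dbserver.inventory.orders", payload, payload),
        new RawEvent("dbserver.inventory.orders", payload, payload),
        new RawEvent("dbserver.inventory.customers", payload, payload));

    // Group by destination table without touching the payload bytes;
    // all JSON parsing is deferred until a consumer actually needs it.
    Map<String, List<RawEvent>> byDestination =
        batch.stream().collect(Collectors.groupingBy(RawEvent::destination));

    byDestination.forEach((dest, events) ->
        System.out.println(dest + " -> " + events.size() + " event(s)"));
  }
}

Deferring the parse matters because a batch is grouped and routed per table before any field-level work happens.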
IcebergChangeEvent.java
@@ -9,6 +9,8 @@
package io.debezium.server.iceberg;

import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.debezium.DebeziumException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
@@ -25,50 +27,65 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

+import static io.debezium.server.iceberg.IcebergChangeConsumer.keyDeserializer;
+import static io.debezium.server.iceberg.IcebergChangeConsumer.valDeserializer;

/**
*
* Converts iceberg json event to Iceberg GenericRecord. Extracts event schema and key fields. Converts event schema to Iceberg Schema.
*
* @author Ismail Simsek
*/
public class IcebergChangeEvent {

+protected static final ObjectMapper mapper = new ObjectMapper();
protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class);
public static final List<String> TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms");
protected final String destination;
-protected final JsonNode value;
-protected final JsonNode key;
-final JsonSchema jsonSchema;
+protected final byte[] valueData;
+protected final byte[] keyData;
+private JsonNode value;
+private JsonNode key;

-public IcebergChangeEvent(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) {
+public IcebergChangeEvent(String destination, byte[] valueData, byte[] keyData) {
this.destination = destination;
-this.value = value;
-this.key = key;
-this.jsonSchema = new JsonSchema(valueSchema, keySchema);
+this.valueData = valueData;
+this.keyData = keyData;
}

public JsonNode key() {
+if (key == null) {
+key = keyDeserializer.deserialize(destination, keyData);
+}

return key;
}

public JsonNode value() {
+if (value == null) {
+value = valDeserializer.deserialize(destination, valueData);
+}

return value;
}

public JsonSchema jsonSchema() {
-return jsonSchema;
+try {
+return new JsonSchema(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema"));
+} catch (IOException e) {
+throw new DebeziumException("Failed to get event schema", e);
+}
}

public Schema icebergSchema() {
-return jsonSchema.icebergSchema();
+return jsonSchema().icebergSchema();
}

public String destination() {
return destination;
}

public GenericRecord asIcebergRecord(Schema schema) {
-return asIcebergRecord(schema.asStruct(), value);
+return asIcebergRecord(schema.asStruct(), value());
}

private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
@@ -176,13 +193,13 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
break;
}

-val = IcebergChangeConsumer.mapper.convertValue(node, ArrayList.class);
+val = mapper.convertValue(node, ArrayList.class);
break;
case MAP:
Type keyType = field.type().asMapType().keyType();
Type valType = field.type().asMapType().valueType();
if (keyType.isPrimitiveType() && valType.isPrimitiveType()) {
-val = IcebergChangeConsumer.mapper.convertValue(node, Map.class);
+val = mapper.convertValue(node, Map.class);
break;
}
// convert complex/nested map value with recursion
@@ -268,7 +285,7 @@ private static Map.Entry<Integer, Types.NestedField> debeziumFieldToIcebergField
private static List<Types.NestedField> icebergSchemaFields(JsonNode schemaNode) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
AtomicReference<Integer> fieldId = new AtomicReference<>(1);
-if (schemaNode != null && schemaNode.has("fields") && schemaNode.get("fields").isArray()) {
+if (schemaNode != null && !schemaNode.isNull() && schemaNode.has("fields") && schemaNode.get("fields").isArray()) {
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
schemaNode.get("fields").forEach(field -> {
Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get());
@@ -289,11 +306,11 @@ public static class JsonSchema {
this.keySchema = keySchema;
}

-public JsonNode valueSchema() {
+protected JsonNode valueSchema() {
return valueSchema;
}

-public JsonNode keySchema() {
+protected JsonNode keySchema() {
return keySchema;
}

@@ -310,9 +327,9 @@ public int hashCode() {
return Objects.hash(valueSchema, keySchema);
}

-public Schema icebergSchema() {
+private Schema icebergSchema() {

-if (this.valueSchema == null) {
+if (this.valueSchema.isNull()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

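Two details above deserve a closer look. First, the event now holds raw key/value bytes and deserializes each at most once, on first access. Second, the null guards changed because Jackson represents an explicit "schema": null as a NullNode, which is not a Java null. A self-contained sketch of both behaviors, assuming only Jackson on the classpath (LazyJson and its names are illustrative, not the project's API):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class LazyJsonSketch {
  static final ObjectMapper MAPPER = new ObjectMapper();

  // Holds raw bytes and parses them at most once, on first access. Like the
  // consumer path it mirrors, this is single-threaded and not thread-safe.
  static class LazyJson {
    private final byte[] data;
    private JsonNode parsed;

    LazyJson(byte[] data) { this.data = data; }

    JsonNode get() {
      if (parsed == null) {
        try {
          parsed = MAPPER.readTree(data);
        } catch (IOException e) {
          throw new UncheckedIOException("Failed to parse payload", e);
        }
      }
      return parsed;
    }
  }

  public static void main(String[] args) {
    byte[] bytes = "{\"schema\":null,\"payload\":{\"id\":1}}".getBytes(StandardCharsets.UTF_8);
    LazyJson value = new LazyJson(bytes);
    System.out.println(value.get().get("payload")); // first access parses, later calls reuse

    // Why the guards use isNull(): an explicit "schema": null parses to Jackson's
    // NullNode, which `== null` does not catch; `== null` only means the field is absent.
    JsonNode schema = value.get().get("schema");
    System.out.println(schema == null);  // false
    System.out.println(schema.isNull()); // true
  }
}

One trade-off visible in the diff: jsonSchema() re-parses the payload bytes on every call, so callers that need the schema more than once per event are expected to hold on to the result.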
IcebergTableOperator.java
@@ -8,19 +8,10 @@

package io.debezium.server.iceberg.tableoperator;

-import io.debezium.DebeziumException;
-import io.debezium.server.iceberg.IcebergChangeEvent;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
+import io.debezium.DebeziumException;
+import io.debezium.server.iceberg.IcebergChangeEvent;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.iceberg.*;
@@ -31,6 +22,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;

/**
* Wrapper to perform operations on iceberg tables
*
@@ -152,7 +151,7 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {

for (Map.Entry<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> schemaEvents : eventsGroupedBySchema.entrySet()) {
// extend table schema if new fields found
-applyFieldAddition(icebergTable, schemaEvents.getKey().icebergSchema());
+applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema());
// add set of events to table
addToTablePerSchema(icebergTable, schemaEvents.getValue());
}
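The functional change in this file: with JsonSchema.icebergSchema() now private, the operator derives the Iceberg schema from the first event of each group instead of from the map key. Because the groups are keyed by JsonSchema equality (equals/hashCode cover both schema nodes), any member of a group yields the same schema. applyFieldAddition itself is outside this diff; as context, here is a hedged sketch of what such additive evolution can look like with Iceberg's public UpdateSchema API, not necessarily the project's exact implementation:

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

class SchemaEvolutionSketch {
  // Merge any fields that appear in eventSchema but are missing from the table.
  // The union is purely additive; a real implementation may first apply() the
  // change and compare schemas, skipping the commit when nothing is new.
  static void applyFieldAddition(Table table, Schema eventSchema) {
    table.updateSchema()
        .unionByNameWith(eventSchema)
        .commit();
  }
}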
(test class; file name not rendered in this view)
Expand Up @@ -10,24 +10,22 @@

import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
-import io.debezium.server.iceberg.testresources.TestChangeEvent;
import io.debezium.server.iceberg.testresources.TestUtil;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;

-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;

import jakarta.inject.Inject;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;

/**
*
* @author Ismail Simsek
IcebergChangeEventTest.java
@@ -8,21 +8,21 @@

package io.debezium.server.iceberg;

-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.serde.DebeziumSerdes;
+import jakarta.inject.Inject;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
import org.apache.kafka.common.serialization.Serde;
import org.junit.jupiter.api.Test;

import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;

-import static io.debezium.server.iceberg.IcebergChangeConsumer.mapper;
import static org.junit.jupiter.api.Assertions.*;

class IcebergChangeEventTest {
@@ -32,25 +32,32 @@ class IcebergChangeEventTest {
final String unwrapWithArraySchema = Files.readString(Path.of("src/test/resources/json/serde-with-array.json"));
final String unwrapWithArraySchema2 = Files.readString(Path.of("src/test/resources/json/serde-with-array2.json"));

+@Inject
+IcebergChangeConsumer consumer;
+
IcebergChangeEventTest() throws IOException {
// configure and set
IcebergChangeConsumer.valSerde.configure(Collections.emptyMap(), false);
IcebergChangeConsumer.valDeserializer = IcebergChangeConsumer.valSerde.deserializer();
+// configure and set
+IcebergChangeConsumer.keySerde.configure(Collections.emptyMap(), true);
+IcebergChangeConsumer.keyDeserializer = IcebergChangeConsumer.keySerde.deserializer();
}

@Test
-public void testNestedJsonRecord() throws JsonProcessingException {
+public void testNestedJsonRecord() {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(serdeWithSchema).get("payload"), null,
mapper.readTree(serdeWithSchema).get("schema"), null);
serdeWithSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
System.out.println(schema.toString());
assertTrue(schema.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, " +
"4:"));
}

@Test
-public void testUnwrapJsonRecord() throws IOException {
+public void testUnwrapJsonRecord() {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(unwrapWithSchema).get("payload"), null,
mapper.readTree(unwrapWithSchema).get("schema"), null);
unwrapWithSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
GenericRecord record = e.asIcebergRecord(schema);
assertEquals("orders", record.getField("__table").toString());
Expand All @@ -60,10 +67,10 @@ public void testUnwrapJsonRecord() throws IOException {
}

@Test
-public void testNestedArrayJsonRecord() throws JsonProcessingException {
+public void testNestedArrayJsonRecord() {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(unwrapWithArraySchema).get("payload"), null,
mapper.readTree(unwrapWithArraySchema).get("schema"), null);
unwrapWithArraySchema.getBytes(StandardCharsets.UTF_8), null);

Schema schema = e.icebergSchema();
System.out.println(schema);
System.out.println(schema.asStruct());
Expand All @@ -79,10 +86,9 @@ public void testNestedArrayJsonRecord() throws JsonProcessingException {
}

@Test
-public void testNestedArray2JsonRecord() throws JsonProcessingException {
+public void testNestedArray2JsonRecord() {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(unwrapWithArraySchema2).get("payload"), null,
mapper.readTree(unwrapWithArraySchema2).get("schema"), null);
unwrapWithArraySchema2.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
System.out.println(schema.asStruct());
System.out.println(schema);
Expand All @@ -94,10 +100,9 @@ public void testNestedArray2JsonRecord() throws JsonProcessingException {
}

@Test
-public void testNestedGeomJsonRecord() throws JsonProcessingException {
+public void testNestedGeomJsonRecord() {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(unwrapWithGeomSchema).get("payload"), null,
mapper.readTree(unwrapWithGeomSchema).get("schema"), null);
unwrapWithGeomSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
GenericRecord record = e.asIcebergRecord(schema);
//System.out.println(schema);
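With the new byte[] constructor, the tests above feed each fixture file straight in as UTF-8 bytes. For reference, the shape those serde-*.json fixtures share is the standard Debezium/Kafka Connect JSON envelope: a top-level "schema" describing a top-level "payload". The snippet below is a minimal illustrative envelope, not one of the actual fixture files:

import java.nio.charset.StandardCharsets;

public class EnvelopeFixtureSketch {
  // A minimal Debezium/Kafka Connect JSON envelope: "schema" describes "payload".
  static final String ENVELOPE = """
      {
        "schema": {
          "type": "struct",
          "fields": [ { "field": "id", "type": "int32", "optional": false } ],
          "optional": false
        },
        "payload": { "id": 1 }
      }
      """;

  public static void main(String[] args) {
    byte[] valueData = ENVELOPE.getBytes(StandardCharsets.UTF_8);
    // The tests above would now do: new IcebergChangeEvent("test", valueData, null)
    // (left as a comment here because it needs the configured Debezium serdes).
    System.out.println(valueData.length + " bytes; parsing is deferred until first access");
  }
}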
(Diffs for the remaining two of the 7 changed files did not render in this view.)