From fcc6e4935d738c35edf1b6b07edf4c0bec6e465a Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Tue, 11 Jun 2024 21:56:53 +0200 Subject: [PATCH] Improve IcebergChangeEvent class, Optimize event and schema deserialization (#343) * Improve IcebergChangeEvent class, Optimize event and schema deserialization * Improve IcebergChangeEvent class, Optimize event and schema deserialization * Improve access modifiers * Check for null key values * fix test * fix test --- .../server/iceberg/IcebergChangeConsumer.java | 38 +++++-------- .../server/iceberg/IcebergChangeEvent.java | 53 ++++++++++++------- .../tableoperator/IcebergTableOperator.java | 23 ++++---- ...ChangeConsumerUpsertDeleteDeletesTest.java | 12 ++--- .../iceberg/IcebergChangeEventTest.java | 39 ++++++++------ .../{testresources => }/TestChangeEvent.java | 6 ++- .../IcebergChangeEventBuilder.java | 30 +++++------ 7 files changed, 104 insertions(+), 97 deletions(-) rename debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/{testresources => }/TestChangeEvent.java (96%) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 57554642..140416c8 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -8,6 +8,7 @@ package io.debezium.server.iceberg; +import com.fasterxml.jackson.databind.JsonNode; import io.debezium.DebeziumException; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; @@ -19,19 +20,6 @@ import io.debezium.util.Clock; import io.debezium.util.Strings; import io.debezium.util.Threads; - -import java.io.IOException; -import java.time.Duration; -import java.time.Instant; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.Dependent; import jakarta.enterprise.inject.Any; @@ -51,6 +39,16 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; @@ -64,7 +62,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer> { protected static final Duration LOG_INTERVAL = Duration.ofMinutes(15); - protected static final ObjectMapper mapper = new ObjectMapper(); protected static final Serde valSerde = DebeziumSerdes.payloadJson(JsonNode.class); protected static final Serde keySerde = DebeziumSerdes.payloadJson(JsonNode.class); private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class); @@ -145,18 +142,7 @@ public void handleBatch(List> records, DebeziumEngin Map> result = records.stream() .map((ChangeEvent e) - -> { - try { - return new IcebergChangeEvent(e.destination(), - valDeserializer.deserialize(e.destination(), getBytes(e.value())), - e.key() == null ? null : keyDeserializer.deserialize(e.destination(), getBytes(e.key())), - mapper.readTree(getBytes(e.value())).get("schema"), - e.key() == null ? null : mapper.readTree(getBytes(e.key())).get("schema") - ); - } catch (IOException ex) { - throw new DebeziumException(ex); - } - }) + -> new IcebergChangeEvent(e.destination(), getBytes(e.value()), e.key() == null ? null : getBytes(e.key()))) .collect(Collectors.groupingBy(IcebergChangeEvent::destination)); // consume list of events for each destination table diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index 8ea3af9b..c9187e96 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -9,6 +9,8 @@ package io.debezium.server.iceberg; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.debezium.DebeziumException; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.exceptions.ValidationException; @@ -25,42 +27,57 @@ import java.util.*; import java.util.concurrent.atomic.AtomicReference; +import static io.debezium.server.iceberg.IcebergChangeConsumer.keyDeserializer; +import static io.debezium.server.iceberg.IcebergChangeConsumer.valDeserializer; + /** - * * Converts iceberg json event to Iceberg GenericRecord. Extracts event schema and key fields. Converts event schema to Iceberg Schema. * * @author Ismail Simsek */ public class IcebergChangeEvent { + protected static final ObjectMapper mapper = new ObjectMapper(); protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class); public static final List TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms"); protected final String destination; - protected final JsonNode value; - protected final JsonNode key; - final JsonSchema jsonSchema; + protected final byte[] valueData; + protected final byte[] keyData; + private JsonNode value; + private JsonNode key; - public IcebergChangeEvent(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) { + public IcebergChangeEvent(String destination, byte[] valueData, byte[] keyData) { this.destination = destination; - this.value = value; - this.key = key; - this.jsonSchema = new JsonSchema(valueSchema, keySchema); + this.valueData = valueData; + this.keyData = keyData; } public JsonNode key() { + if (key == null) { + key = keyDeserializer.deserialize(destination, keyData); + } + return key; } public JsonNode value() { + if (value == null) { + value = valDeserializer.deserialize(destination, valueData); + } + return value; } public JsonSchema jsonSchema() { - return jsonSchema; + try { + return new JsonSchema(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema")); + } catch (IOException e) { + throw new DebeziumException("Failed to get event schema", e); + } } public Schema icebergSchema() { - return jsonSchema.icebergSchema(); + return jsonSchema().icebergSchema(); } public String destination() { @@ -68,7 +85,7 @@ public String destination() { } public GenericRecord asIcebergRecord(Schema schema) { - return asIcebergRecord(schema.asStruct(), value); + return asIcebergRecord(schema.asStruct(), value()); } private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) { @@ -176,13 +193,13 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node break; } - val = IcebergChangeConsumer.mapper.convertValue(node, ArrayList.class); + val = mapper.convertValue(node, ArrayList.class); break; case MAP: Type keyType = field.type().asMapType().keyType(); Type valType = field.type().asMapType().valueType(); if (keyType.isPrimitiveType() && valType.isPrimitiveType()) { - val = IcebergChangeConsumer.mapper.convertValue(node, Map.class); + val = mapper.convertValue(node, Map.class); break; } // convert complex/nested map value with recursion @@ -268,7 +285,7 @@ private static Map.Entry debeziumFieldToIcebergField private static List icebergSchemaFields(JsonNode schemaNode) { List schemaColumns = new ArrayList<>(); AtomicReference fieldId = new AtomicReference<>(1); - if (schemaNode != null && schemaNode.has("fields") && schemaNode.get("fields").isArray()) { + if (schemaNode != null && !schemaNode.isNull() && schemaNode.has("fields") && schemaNode.get("fields").isArray()) { LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode); schemaNode.get("fields").forEach(field -> { Map.Entry df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get()); @@ -289,11 +306,11 @@ public static class JsonSchema { this.keySchema = keySchema; } - public JsonNode valueSchema() { + protected JsonNode valueSchema() { return valueSchema; } - public JsonNode keySchema() { + protected JsonNode keySchema() { return keySchema; } @@ -310,9 +327,9 @@ public int hashCode() { return Objects.hash(valueSchema, keySchema); } - public Schema icebergSchema() { + private Schema icebergSchema() { - if (this.valueSchema == null) { + if (this.valueSchema.isNull()) { throw new RuntimeException("Failed to get schema from debezium event, event schema is null"); } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index b9f07a3a..295e486f 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -8,19 +8,10 @@ package io.debezium.server.iceberg.tableoperator; -import io.debezium.DebeziumException; -import io.debezium.server.iceberg.IcebergChangeEvent; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; +import io.debezium.DebeziumException; +import io.debezium.server.iceberg.IcebergChangeEvent; import jakarta.enterprise.context.Dependent; import jakarta.inject.Inject; import org.apache.iceberg.*; @@ -31,6 +22,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + /** * Wrapper to perform operations on iceberg tables * @@ -152,7 +151,7 @@ public void addToTable(Table icebergTable, List events) { for (Map.Entry> schemaEvents : eventsGroupedBySchema.entrySet()) { // extend table schema if new fields found - applyFieldAddition(icebergTable, schemaEvents.getKey().icebergSchema()); + applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema()); // add set of events to table addToTablePerSchema(icebergTable, schemaEvents.getValue()); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java index d72754f9..ea9db649 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java @@ -10,24 +10,22 @@ import io.debezium.server.iceberg.testresources.BaseSparkTest; import io.debezium.server.iceberg.testresources.S3Minio; -import io.debezium.server.iceberg.testresources.TestChangeEvent; import io.debezium.server.iceberg.testresources.TestUtil; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import jakarta.inject.Inject; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * * @author Ismail Simsek diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java index 10fccf0d..df91802e 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java @@ -8,9 +8,9 @@ package io.debezium.server.iceberg; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import io.debezium.serde.DebeziumSerdes; +import jakarta.inject.Inject; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.types.Types; @@ -18,11 +18,11 @@ import org.junit.jupiter.api.Test; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; -import static io.debezium.server.iceberg.IcebergChangeConsumer.mapper; import static org.junit.jupiter.api.Assertions.*; class IcebergChangeEventTest { @@ -32,14 +32,22 @@ class IcebergChangeEventTest { final String unwrapWithArraySchema = Files.readString(Path.of("src/test/resources/json/serde-with-array.json")); final String unwrapWithArraySchema2 = Files.readString(Path.of("src/test/resources/json/serde-with-array2.json")); + @Inject + IcebergChangeConsumer consumer; + IcebergChangeEventTest() throws IOException { + // configure and set + IcebergChangeConsumer.valSerde.configure(Collections.emptyMap(), false); + IcebergChangeConsumer.valDeserializer = IcebergChangeConsumer.valSerde.deserializer(); + // configure and set + IcebergChangeConsumer.keySerde.configure(Collections.emptyMap(), true); + IcebergChangeConsumer.keyDeserializer = IcebergChangeConsumer.keySerde.deserializer(); } @Test - public void testNestedJsonRecord() throws JsonProcessingException { + public void testNestedJsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", - mapper.readTree(serdeWithSchema).get("payload"), null, - mapper.readTree(serdeWithSchema).get("schema"), null); + serdeWithSchema.getBytes(StandardCharsets.UTF_8), null); Schema schema = e.icebergSchema(); System.out.println(schema.toString()); assertTrue(schema.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, " + @@ -47,10 +55,9 @@ public void testNestedJsonRecord() throws JsonProcessingException { } @Test - public void testUnwrapJsonRecord() throws IOException { + public void testUnwrapJsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", - mapper.readTree(unwrapWithSchema).get("payload"), null, - mapper.readTree(unwrapWithSchema).get("schema"), null); + unwrapWithSchema.getBytes(StandardCharsets.UTF_8), null); Schema schema = e.icebergSchema(); GenericRecord record = e.asIcebergRecord(schema); assertEquals("orders", record.getField("__table").toString()); @@ -60,10 +67,10 @@ public void testUnwrapJsonRecord() throws IOException { } @Test - public void testNestedArrayJsonRecord() throws JsonProcessingException { + public void testNestedArrayJsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", - mapper.readTree(unwrapWithArraySchema).get("payload"), null, - mapper.readTree(unwrapWithArraySchema).get("schema"), null); + unwrapWithArraySchema.getBytes(StandardCharsets.UTF_8), null); + Schema schema = e.icebergSchema(); System.out.println(schema); System.out.println(schema.asStruct()); @@ -79,10 +86,9 @@ public void testNestedArrayJsonRecord() throws JsonProcessingException { } @Test - public void testNestedArray2JsonRecord() throws JsonProcessingException { + public void testNestedArray2JsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", - mapper.readTree(unwrapWithArraySchema2).get("payload"), null, - mapper.readTree(unwrapWithArraySchema2).get("schema"), null); + unwrapWithArraySchema2.getBytes(StandardCharsets.UTF_8), null); Schema schema = e.icebergSchema(); System.out.println(schema.asStruct()); System.out.println(schema); @@ -94,10 +100,9 @@ public void testNestedArray2JsonRecord() throws JsonProcessingException { } @Test - public void testNestedGeomJsonRecord() throws JsonProcessingException { + public void testNestedGeomJsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", - mapper.readTree(unwrapWithGeomSchema).get("payload"), null, - mapper.readTree(unwrapWithGeomSchema).get("schema"), null); + unwrapWithGeomSchema.getBytes(StandardCharsets.UTF_8), null); Schema schema = e.icebergSchema(); GenericRecord record = e.asIcebergRecord(schema); //System.out.println(schema); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestChangeEvent.java similarity index 96% rename from debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java rename to debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestChangeEvent.java index b1f25633..6924e6d9 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestChangeEvent.java @@ -6,11 +6,13 @@ * */ -package io.debezium.server.iceberg.testresources; +package io.debezium.server.iceberg; + import io.debezium.engine.ChangeEvent; import io.debezium.engine.RecordChangeEvent; -import io.debezium.server.iceberg.IcebergChangeEvent; +import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder; +import io.debezium.server.iceberg.testresources.TestUtil; import java.time.Instant; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/IcebergChangeEventBuilder.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/IcebergChangeEventBuilder.java index 41d74719..36b2766e 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/IcebergChangeEventBuilder.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/IcebergChangeEventBuilder.java @@ -8,19 +8,19 @@ package io.debezium.server.iceberg.testresources; -import io.debezium.server.iceberg.IcebergChangeConsumerTest; -import io.debezium.server.iceberg.IcebergChangeEvent; - -import java.util.Iterator; -import java.util.Map; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.debezium.server.iceberg.IcebergChangeConsumerTest; +import io.debezium.server.iceberg.IcebergChangeEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.Map; + /** * helper class used to generate test change events * @@ -106,10 +106,14 @@ public IcebergChangeEventBuilder addKeyField(String name, String val) { public IcebergChangeEvent build() { return new IcebergChangeEvent( this.destination, - payload, - keyPayload, - this.valueSchema(), - this.keySchema() + ("{" + + "\"schema\":" + this.valueSchema() + "," + + "\"payload\":" + payload.toString() + + "} ").getBytes(StandardCharsets.UTF_8), + ("{" + + "\"schema\":" + this.keySchema() + "," + + "\"payload\":" + keyPayload.toString() + + "} ").getBytes(StandardCharsets.UTF_8) ); } @@ -155,11 +159,7 @@ private ArrayNode getSchemaFields(ObjectNode node) { } else if (field.getValue().isFloat()) { schemaField.put("type", "float64"); } - if (keyPayload.has(field.getKey())) { - schemaField.put("optional", false); - } else { - schemaField.put("optional", true); - } + schemaField.put("optional", !keyPayload.has(field.getKey())); schemaField.put("field", field.getKey()); fields.add(schemaField); }