Improve IcebergChangeEvent class, Optimize event and schema deserialization #343

Merged (6 commits, Jun 11, 2024)
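In short: IcebergChangeEvent now keeps the raw key and value bytes instead of pre-parsed JsonNodes. key() and value() deserialize lazily on first access, jsonSchema() extracts the schema nodes from the bytes on demand, and the shared ObjectMapper moves from IcebergChangeConsumer into IcebergChangeEvent. This collapses the per-record try/catch in IcebergChangeConsumer.handleBatch() to a single constructor call and narrows the visibility of the JsonSchema accessors.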
File: io/debezium/server/iceberg/IcebergChangeConsumer.java

@@ -8,6 +8,7 @@
 
 package io.debezium.server.iceberg;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import io.debezium.DebeziumException;
 import io.debezium.engine.ChangeEvent;
 import io.debezium.engine.DebeziumEngine;
@@ -19,19 +20,6 @@
 import io.debezium.util.Clock;
 import io.debezium.util.Strings;
 import io.debezium.util.Threads;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import jakarta.annotation.PostConstruct;
 import jakarta.enterprise.context.Dependent;
 import jakarta.enterprise.inject.Any;
@@ -51,6 +39,16 @@
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 
@@ -64,7 +62,6 @@
 public class IcebergChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
 
   protected static final Duration LOG_INTERVAL = Duration.ofMinutes(15);
-  protected static final ObjectMapper mapper = new ObjectMapper();
   protected static final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
   protected static final Serde<JsonNode> keySerde = DebeziumSerdes.payloadJson(JsonNode.class);
   private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class);
@@ -145,18 +142,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
     Map<String, List<IcebergChangeEvent>> result =
         records.stream()
             .map((ChangeEvent<Object, Object> e)
-                -> {
-              try {
-                return new IcebergChangeEvent(e.destination(),
-                    valDeserializer.deserialize(e.destination(), getBytes(e.value())),
-                    e.key() == null ? null : keyDeserializer.deserialize(e.destination(), getBytes(e.key())),
-                    mapper.readTree(getBytes(e.value())).get("schema"),
-                    e.key() == null ? null : mapper.readTree(getBytes(e.key())).get("schema")
-                );
-              } catch (IOException ex) {
-                throw new DebeziumException(ex);
-              }
-            })
+                -> new IcebergChangeEvent(e.destination(), getBytes(e.value()), e.key() == null ? null : getBytes(e.key())))
             .collect(Collectors.groupingBy(IcebergChangeEvent::destination));
 
     // consume list of events for each destination table
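The simplified lambda above defers all JSON parsing to IcebergChangeEvent; handleBatch now only wraps raw bytes and buckets events by destination table. A self-contained sketch of that grouping step, using a stand-in `Event` record rather than the project's classes:

```java
// Minimal sketch of the groupingBy step in handleBatch; Event stands in for
// IcebergChangeEvent and the destinations are made up.
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByDestination {
  record Event(String destination, String payload) {}

  public static void main(String[] args) {
    List<Event> batch = List.of(
        new Event("db.orders", "o1"),
        new Event("db.customers", "c1"),
        new Event("db.orders", "o2"));
    // Bucket events by destination table, as handleBatch does above.
    Map<String, List<Event>> byTable =
        batch.stream().collect(Collectors.groupingBy(Event::destination));
    System.out.println(byTable.get("db.orders")); // [Event[... o1], Event[... o2]]
  }
}
```

For a sequential stream, each bucket's list preserves the original record order, which matters when change events must be applied to a table in sequence.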
File: io/debezium/server/iceberg/IcebergChangeEvent.java

@@ -9,6 +9,8 @@
 package io.debezium.server.iceberg;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.debezium.DebeziumException;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -25,50 +27,65 @@
 import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static io.debezium.server.iceberg.IcebergChangeConsumer.keyDeserializer;
+import static io.debezium.server.iceberg.IcebergChangeConsumer.valDeserializer;
+
 /**
  *
  * Converts iceberg json event to Iceberg GenericRecord. Extracts event schema and key fields. Converts event schema to Iceberg Schema.
  *
  * @author Ismail Simsek
  */
 public class IcebergChangeEvent {
 
+  protected static final ObjectMapper mapper = new ObjectMapper();
   protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class);
   public static final List<String> TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms");
   protected final String destination;
-  protected final JsonNode value;
-  protected final JsonNode key;
-  final JsonSchema jsonSchema;
+  protected final byte[] valueData;
+  protected final byte[] keyData;
+  private JsonNode value;
+  private JsonNode key;
 
-  public IcebergChangeEvent(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) {
+  public IcebergChangeEvent(String destination, byte[] valueData, byte[] keyData) {
     this.destination = destination;
-    this.value = value;
-    this.key = key;
-    this.jsonSchema = new JsonSchema(valueSchema, keySchema);
+    this.valueData = valueData;
+    this.keyData = keyData;
   }
 
   public JsonNode key() {
+    if (key == null) {
+      key = keyDeserializer.deserialize(destination, keyData);
+    }
+
     return key;
   }
 
   public JsonNode value() {
+    if (value == null) {
+      value = valDeserializer.deserialize(destination, valueData);
+    }
+
     return value;
   }
 
   public JsonSchema jsonSchema() {
-    return jsonSchema;
+    try {
+      return new JsonSchema(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema"));
+    } catch (IOException e) {
+      throw new DebeziumException("Failed to get event schema", e);
+    }
   }
 
   public Schema icebergSchema() {
-    return jsonSchema.icebergSchema();
+    return jsonSchema().icebergSchema();
   }
 
   public String destination() {
     return destination;
   }
 
   public GenericRecord asIcebergRecord(Schema schema) {
-    return asIcebergRecord(schema.asStruct(), value);
+    return asIcebergRecord(schema.asStruct(), value());
   }
 
   private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
@@ -176,13 +193,13 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
           break;
         }
 
-        val = IcebergChangeConsumer.mapper.convertValue(node, ArrayList.class);
+        val = mapper.convertValue(node, ArrayList.class);
         break;
       case MAP:
         Type keyType = field.type().asMapType().keyType();
        Type valType = field.type().asMapType().valueType();
         if (keyType.isPrimitiveType() && valType.isPrimitiveType()) {
-          val = IcebergChangeConsumer.mapper.convertValue(node, Map.class);
+          val = mapper.convertValue(node, Map.class);
           break;
         }
         // convert complex/nested map value with recursion
@@ -268,7 +285,7 @@ private static Map.Entry<Integer, Types.NestedField> debeziumFieldToIcebergField
   private static List<Types.NestedField> icebergSchemaFields(JsonNode schemaNode) {
     List<Types.NestedField> schemaColumns = new ArrayList<>();
     AtomicReference<Integer> fieldId = new AtomicReference<>(1);
-    if (schemaNode != null && schemaNode.has("fields") && schemaNode.get("fields").isArray()) {
+    if (schemaNode != null && !schemaNode.isNull() && schemaNode.has("fields") && schemaNode.get("fields").isArray()) {
       LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
       schemaNode.get("fields").forEach(field -> {
         Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get());
@@ -289,11 +306,11 @@ public static class JsonSchema {
       this.keySchema = keySchema;
     }
 
-    public JsonNode valueSchema() {
+    protected JsonNode valueSchema() {
       return valueSchema;
     }
 
-    public JsonNode keySchema() {
+    protected JsonNode keySchema() {
       return keySchema;
     }
 
@@ -310,9 +327,9 @@ public int hashCode() {
       return Objects.hash(valueSchema, keySchema);
     }
 
-    public Schema icebergSchema() {
+    private Schema icebergSchema() {
 
-      if (this.valueSchema == null) {
+      if (this.valueSchema.isNull()) {
         throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
       }
 
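The net effect of the changes above: construction is now allocation-only, and parsing happens at most once per accessor. A minimal sketch of the pattern (illustrative names; the real class delegates to Debezium serdes rather than raw Jackson):

```java
// Lazy-deserialization sketch: keep raw bytes, parse on first access, pull
// "schema" from the Debezium-style envelope only when asked. LazyEvent is a
// stand-in, not the project's API.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class LazyEvent {
  private static final ObjectMapper MAPPER = new ObjectMapper();
  private final byte[] valueData; // raw envelope bytes, kept as-is at batch time
  private JsonNode value;         // parsed once, on first value() call

  public LazyEvent(byte[] valueData) {
    this.valueData = valueData;
  }

  public JsonNode value() {
    if (value == null) {
      value = parse().get("payload"); // stands in for the Debezium payload serde
    }
    return value;
  }

  public JsonNode schema() {
    return parse().get("schema"); // re-parsed per call, mirroring jsonSchema() above
  }

  private JsonNode parse() {
    try {
      return MAPPER.readTree(valueData);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] event = "{\"schema\":{\"type\":\"struct\"},\"payload\":{\"id\":1}}"
        .getBytes(StandardCharsets.UTF_8);
    LazyEvent e = new LazyEvent(event);
    System.out.println(e.schema()); // {"type":"struct"}
    System.out.println(e.value());  // {"id":1} -- parsed here, not at construction
  }
}
```

One trade-off visible in the diff: jsonSchema() re-reads the envelope on every call, so callers should cache its result; the IcebergTableOperator change below does exactly that by computing the schema once per event group.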
File: io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java

@@ -8,19 +8,10 @@
 
 package io.debezium.server.iceberg.tableoperator;
 
-import io.debezium.DebeziumException;
-import io.debezium.server.iceberg.IcebergChangeEvent;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableMap;
+import io.debezium.DebeziumException;
+import io.debezium.server.iceberg.IcebergChangeEvent;
 import jakarta.enterprise.context.Dependent;
 import jakarta.inject.Inject;
 import org.apache.iceberg.*;
@@ -31,6 +22,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
 /**
  * Wrapper to perform operations on iceberg tables
 *
@@ -152,7 +151,7 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
 
     for (Map.Entry<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> schemaEvents : eventsGroupedBySchema.entrySet()) {
       // extend table schema if new fields found
-      applyFieldAddition(icebergTable, schemaEvents.getKey().icebergSchema());
+      applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema());
       // add set of events to table
       addToTablePerSchema(icebergTable, schemaEvents.getValue());
     }
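A note on the one-line change above: since JsonSchema.icebergSchema() is now private, the operator derives the Iceberg schema from the first event of each group instead of from the map key. This is safe because groupingBy puts only events with equal JsonSchema keys into the same bucket, so the first event's schema is representative. A toy demonstration of that invariant, with stand-in types:

```java
// Why getValue().get(0) is equivalent to getKey() here: every element of a
// groupingBy bucket has a key equal to the bucket's key. Names are illustrative.
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupInvariant {
  record Event(String schema, String payload) {}

  public static void main(String[] args) {
    List<Event> events = List.of(new Event("s1", "a"), new Event("s1", "b"));
    Map<String, List<Event>> bySchema =
        events.stream().collect(Collectors.groupingBy(Event::schema));
    bySchema.forEach((key, group) ->
        System.out.println(key.equals(group.get(0).schema()))); // always true
  }
}
```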
File: io/debezium/server/iceberg (Spark-based test class; file name not shown in this excerpt)

@@ -10,24 +10,22 @@
 import io.debezium.server.iceberg.testresources.BaseSparkTest;
 import io.debezium.server.iceberg.testresources.S3Minio;
-import io.debezium.server.iceberg.testresources.TestChangeEvent;
 import io.debezium.server.iceberg.testresources.TestUtil;
 import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.junit.QuarkusTest;
 import io.quarkus.test.junit.QuarkusTestProfile;
 import io.quarkus.test.junit.TestProfile;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import jakarta.inject.Inject;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  *
 * @author Ismail Simsek
File: io/debezium/server/iceberg/IcebergChangeEventTest.java

@@ -8,21 +8,21 @@
 
 package io.debezium.server.iceberg;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import io.debezium.serde.DebeziumSerdes;
+import jakarta.inject.Inject;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.types.Types;
 import org.apache.kafka.common.serialization.Serde;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
 
-import static io.debezium.server.iceberg.IcebergChangeConsumer.mapper;
 import static org.junit.jupiter.api.Assertions.*;
 
 class IcebergChangeEventTest {
@@ -32,25 +32,32 @@ class IcebergChangeEventTest {
   final String unwrapWithArraySchema = Files.readString(Path.of("src/test/resources/json/serde-with-array.json"));
   final String unwrapWithArraySchema2 = Files.readString(Path.of("src/test/resources/json/serde-with-array2.json"));
 
+  @Inject
+  IcebergChangeConsumer consumer;
+
   IcebergChangeEventTest() throws IOException {
+    // configure and set
+    IcebergChangeConsumer.valSerde.configure(Collections.emptyMap(), false);
+    IcebergChangeConsumer.valDeserializer = IcebergChangeConsumer.valSerde.deserializer();
+    // configure and set
+    IcebergChangeConsumer.keySerde.configure(Collections.emptyMap(), true);
+    IcebergChangeConsumer.keyDeserializer = IcebergChangeConsumer.keySerde.deserializer();
   }
 
   @Test
-  public void testNestedJsonRecord() throws JsonProcessingException {
+  public void testNestedJsonRecord() {
     IcebergChangeEvent e = new IcebergChangeEvent("test",
-        mapper.readTree(serdeWithSchema).get("payload"), null,
-        mapper.readTree(serdeWithSchema).get("schema"), null);
+        serdeWithSchema.getBytes(StandardCharsets.UTF_8), null);
     Schema schema = e.icebergSchema();
     System.out.println(schema.toString());
     assertTrue(schema.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, " +
         "4:"));
   }
 
   @Test
-  public void testUnwrapJsonRecord() throws IOException {
+  public void testUnwrapJsonRecord() {
     IcebergChangeEvent e = new IcebergChangeEvent("test",
-        mapper.readTree(unwrapWithSchema).get("payload"), null,
-        mapper.readTree(unwrapWithSchema).get("schema"), null);
+        unwrapWithSchema.getBytes(StandardCharsets.UTF_8), null);
     Schema schema = e.icebergSchema();
     GenericRecord record = e.asIcebergRecord(schema);
     assertEquals("orders", record.getField("__table").toString());
@@ -60,10 +67,10 @@ public void testUnwrapJsonRecord() throws IOException {
   }
 
   @Test
-  public void testNestedArrayJsonRecord() throws JsonProcessingException {
+  public void testNestedArrayJsonRecord() {
     IcebergChangeEvent e = new IcebergChangeEvent("test",
-        mapper.readTree(unwrapWithArraySchema).get("payload"), null,
-        mapper.readTree(unwrapWithArraySchema).get("schema"), null);
+        unwrapWithArraySchema.getBytes(StandardCharsets.UTF_8), null);
+
     Schema schema = e.icebergSchema();
     System.out.println(schema);
     System.out.println(schema.asStruct());
@@ -79,10 +86,9 @@ public void testNestedArrayJsonRecord() throws JsonProcessingException {
   }
 
   @Test
-  public void testNestedArray2JsonRecord() throws JsonProcessingException {
+  public void testNestedArray2JsonRecord() {
     IcebergChangeEvent e = new IcebergChangeEvent("test",
-        mapper.readTree(unwrapWithArraySchema2).get("payload"), null,
-        mapper.readTree(unwrapWithArraySchema2).get("schema"), null);
+        unwrapWithArraySchema2.getBytes(StandardCharsets.UTF_8), null);
     Schema schema = e.icebergSchema();
     System.out.println(schema.asStruct());
     System.out.println(schema);
@@ -94,10 +100,9 @@ public void testNestedArray2JsonRecord() throws JsonProcessingException {
   }
 
   @Test
-  public void testNestedGeomJsonRecord() throws JsonProcessingException {
+  public void testNestedGeomJsonRecord() {
     IcebergChangeEvent e = new IcebergChangeEvent("test",
-        mapper.readTree(unwrapWithGeomSchema).get("payload"), null,
-        mapper.readTree(unwrapWithGeomSchema).get("schema"), null);
+        unwrapWithGeomSchema.getBytes(StandardCharsets.UTF_8), null);
     Schema schema = e.icebergSchema();
     GenericRecord record = e.asIcebergRecord(schema);
     //System.out.println(schema);
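The tests above no longer parse JSON themselves; they hand raw bytes to IcebergChangeEvent, which relies on the statically configured Debezium serdes. A minimal sketch of that serde wiring, assuming the envelope-unwrapping behavior of DebeziumSerdes.payloadJson; the topic name and inline event are made up:

```java
// Hedged sketch of the serde setup the test constructor performs once:
// payloadJson builds a Serde that unwraps the Debezium envelope.
import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.serde.DebeziumSerdes;
import org.apache.kafka.common.serialization.Serde;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class SerdeSetupSketch {
  public static void main(String[] args) {
    Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
    valSerde.configure(Collections.emptyMap(), false); // false = configure as a value serde

    byte[] event = "{\"schema\":{\"type\":\"struct\"},\"payload\":{\"id\":1}}"
        .getBytes(StandardCharsets.UTF_8);
    JsonNode payload = valSerde.deserializer().deserialize("any-topic", event);
    System.out.println(payload); // expected: {"id":1} -- the envelope is unwrapped
  }
}
```

The `true` in keySerde.configure(..., true) in the test constructor marks that serde as a key serde, mirroring the `false` used for the value serde here.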