diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 6d7c2a4e..ac1778f7 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -21,7 +21,10 @@ import java.time.Duration; import java.time.Instant; -import java.util.*; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import javax.annotation.PostConstruct; @@ -41,7 +44,6 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.types.Types; import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; @@ -137,23 +139,18 @@ void connect() { LOGGER.info("Using {}", icebergTableOperator.getClass().getName()); } - - public String map(String destination) { - return destination.replace(".", "_"); - } - + @Override public void handleBatch(List> records, DebeziumEngine.RecordCommitter> committer) throws InterruptedException { Instant start = Instant.now(); - Map>> result = records.stream() - .collect(Collectors.groupingBy( - objectObjectChangeEvent -> map(objectObjectChangeEvent.destination()), - Collectors.mapping(p -> p, - Collectors.toCollection(ArrayList::new)))); + Map>> result = + records.stream() + .map(IcebergChangeEvent::new) + .collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable)); - for (Map.Entry>> event : result.entrySet()) { + for (Map.Entry>> event : result.entrySet()) { final TableIdentifier tableIdentifier = 
TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey()); Table icebergTable = loadIcebergTable(tableIdentifier) .orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0))); @@ -185,26 +182,14 @@ protected void logConsumerProgress(long numUploadedEvents) { private Table createIcebergTable(TableIdentifier tableIdentifier, - ChangeEvent event) { + IcebergChangeEvent event) { if (!eventSchemaEnabled) { throw new RuntimeException("Table '" + tableIdentifier + "' not found! " + - "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); - } - - if (event.value() == null) { - throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null"); - } - - List tableColumns = IcebergUtil.getIcebergFieldsFromEventSchema(getBytes(event.value())); - List keyColumns = - IcebergUtil.getIcebergFieldsFromEventSchema(event.key() == null ? null : getBytes(event.key())); - - if (tableColumns.isEmpty()) { - throw new RuntimeException("Failed to create table " + tableIdentifier); + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); } - Schema schema = IcebergUtil.getSchema(tableColumns, keyColumns); + Schema schema = event.getSchema(); LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema, schema.identifierFieldNames()); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java new file mode 100644 index 00000000..29aff7b5 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -0,0 +1,122 @@ +/* + * + * * Copyright memiiso Authors. 
+ * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg; + +import io.debezium.DebeziumException; +import io.debezium.engine.ChangeEvent; + +import java.util.*; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Ismail Simsek + */ +public class IcebergChangeEvent implements ChangeEvent { + + private final ChangeEvent event; + protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class); + protected static final ObjectMapper objectMapper = new ObjectMapper(); + + public IcebergChangeEvent(ChangeEvent e) { + event = e; + } + + @Override + public K key() { + return event.key(); + } + + @Override + public V value() { + return event.value(); + } + + @Override + public String destination() { + return event.destination(); + } + + public String destinationTable() { + return event.destination().replace(".","_"); + } + + public GenericRecord getIcebergRecord(Schema schema, JsonNode data) { + return IcebergUtil.getIcebergRecord(schema.asStruct(), data); + } + + public Schema getSchema() { + + if (this.value() == null) { + throw new RuntimeException("Failed to get event schema event value is null, destination:" + this.destination()); + } + + List tableColumns = IcebergUtil.getIcebergFieldsFromEventSchema(getBytes(this.value())); + List keyColumns = + IcebergUtil.getIcebergFieldsFromEventSchema(this.key() == null ? 
null : getBytes(this.key())); + + if (tableColumns.isEmpty()) { + throw new RuntimeException("Failed to get schema destination:" + this.destination()); + } + + return getSchema(tableColumns, keyColumns); + } + + private Schema getSchema(List tableColumns, + List keyColumns) { + + Set identifierFieldIds = new HashSet<>(); + + for (Types.NestedField ic : keyColumns) { + boolean found = false; + + ListIterator colsIterator = tableColumns.listIterator(); + while (colsIterator.hasNext()) { + Types.NestedField tc = colsIterator.next(); + if (Objects.equals(tc.name(), ic.name())) { + identifierFieldIds.add(tc.fieldId()); + // set column as required, it is part of the identifier field + colsIterator.set(tc.asRequired()); + found = true; + break; + } + } + + if (!found) { + throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns"); + } + + } + + return new Schema(tableColumns, identifierFieldIds); + } + + + protected byte[] getBytes(Object object) { + if (object instanceof byte[]) { + return (byte[]) object; + } + else if (object instanceof String) { + return ((String) object).getBytes(); + } + throw new DebeziumException(unsupportedTypeMessage(object)); + } + + protected String unsupportedTypeMessage(Object object) { + final String type = (object == null) ? 
"null" : object.getClass().getName(); + return "Unexpected data type '" + type + "'"; + } +} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index 12c73309..98b52990 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -10,14 +10,16 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.Types; import org.eclipse.microprofile.config.Config; @@ -31,12 +33,12 @@ public class IcebergUtil { protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergUtil.class); protected static final ObjectMapper jsonObjectMapper = new ObjectMapper(); - public static List getIcebergSchema(JsonNode eventSchema) { + private static List getIcebergSchema(JsonNode eventSchema) { LOGGER.debug(eventSchema.toString()); return getIcebergSchema(eventSchema, "", 0); } - public static PrimitiveType getIcebergFieldType(String fieldType) { + private static PrimitiveType getIcebergFieldType(String fieldType) { switch (fieldType) { case "int8": case "int16": @@ -63,7 +65,7 @@ public static PrimitiveType getIcebergFieldType(String fieldType) { } } - public static List getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) { + private static List getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) { 
List schemaColumns = new ArrayList<>(); String schemaType = eventSchema.get("type").textValue(); LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType); @@ -106,17 +108,13 @@ public static List getIcebergSchema(JsonNode eventSchema, Str return schemaColumns; } - public static boolean hasSchema(JsonNode jsonNode) { + private static boolean hasSchema(JsonNode jsonNode) { return jsonNode != null && jsonNode.has("schema") && jsonNode.get("schema").has("fields") && jsonNode.get("schema").get("fields").isArray(); } - public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) { - return IcebergUtil.getIcebergRecord(schema.asStruct(), data); - } - public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) { Map mappedResult = new HashMap<>(); LOGGER.debug("Processing nested field:{}", tableFields); @@ -219,34 +217,6 @@ public static List getIcebergFieldsFromEventSchema(byte[] eve } } - public static Schema getSchema(List tableColumns, - List keyColumns) { - - Set identifierFieldIds = new HashSet<>(); - - for (Types.NestedField ic : keyColumns) { - boolean found = false; - - ListIterator colsIterator = tableColumns.listIterator(); - while (colsIterator.hasNext()) { - Types.NestedField tc = colsIterator.next(); - if (Objects.equals(tc.name(), ic.name())) { - identifierFieldIds.add(tc.fieldId()); - // set column as required its part of identifier filed - colsIterator.set(tc.asRequired()); - found = true; - break; - } - } - - if (!found) { - throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns"); - } - - } - - return new Schema(tableColumns, identifierFieldIds); - } public static SortOrder getIdentifierFieldsAsSortOrder(Schema schema) { SortOrder.Builder sob = SortOrder.builderFor(schema); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java 
b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java index b37579e2..9bec1c1a 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java @@ -9,16 +9,12 @@ package io.debezium.server.iceberg.tableoperator; import io.debezium.DebeziumException; -import io.debezium.engine.ChangeEvent; import io.debezium.serde.DebeziumSerdes; -import io.debezium.server.iceberg.IcebergUtil; +import io.debezium.server.iceberg.IcebergChangeEvent; import java.io.IOException; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Locale; -import java.util.UUID; +import java.util.*; import com.fasterxml.jackson.databind.JsonNode; import org.apache.iceberg.DataFile; @@ -76,11 +72,11 @@ protected String unsupportedTypeMessage(Object object) { return "Unexpected data type '" + type + "'"; } - protected ArrayList toIcebergRecords(Schema schema, ArrayList> events) { + protected ArrayList toIcebergRecords(Schema schema, List> events) { ArrayList icebergRecords = new ArrayList<>(); - for (ChangeEvent e : events) { - GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(), + for (IcebergChangeEvent e : events) { + GenericRecord icebergRecord = e.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(), getBytes(e.value()))); icebergRecords.add(icebergRecord); } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java index 2fc3bc51..19f921cc 100644 --- 
a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java @@ -8,9 +8,10 @@ package io.debezium.server.iceberg.tableoperator; -import io.debezium.engine.ChangeEvent; +import io.debezium.server.iceberg.IcebergChangeEvent; import java.util.ArrayList; +import java.util.List; import java.util.function.Predicate; import javax.enterprise.context.Dependent; import javax.inject.Named; @@ -28,7 +29,7 @@ public class IcebergTableOperatorAppend extends AbstractIcebergTableOperator { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class); @Override - public void addToTable(Table icebergTable, ArrayList> events) { + public void addToTable(Table icebergTable, List> events) { ArrayList icebergRecords = toIcebergRecords(icebergTable.schema(), events); DataFile dataFile = getDataFile(icebergTable, icebergRecords); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java index 3513cd6f..c0c1780d 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java @@ -8,8 +8,7 @@ package io.debezium.server.iceberg.tableoperator; -import io.debezium.engine.ChangeEvent; -import io.debezium.server.iceberg.IcebergUtil; +import io.debezium.server.iceberg.IcebergChangeEvent; import java.io.IOException; import java.time.Instant; @@ -96,11 +95,11 @@ private Optional getDeleteFile(Table icebergTable, ArrayList return Optional.of(edw.toDeleteFile()); } - private ArrayList toDeduppedIcebergRecords(Schema 
schema, ArrayList> events) { + private ArrayList toDeduppedIcebergRecords(Schema schema, List> events) { ConcurrentHashMap icebergRecordsmap = new ConcurrentHashMap<>(); - for (ChangeEvent e : events) { - GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(), + for (IcebergChangeEvent e : events) { + GenericRecord icebergRecord = e.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(), getBytes(e.value()))); // only replace it if its newer @@ -133,7 +132,7 @@ private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) { } @Override - public void addToTable(Table icebergTable, ArrayList> events) { + public void addToTable(Table icebergTable, List> events) { if (icebergTable.sortOrder().isUnsorted()) { LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!"); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java index ff02f9a2..e1c9adf3 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java @@ -8,9 +8,9 @@ package io.debezium.server.iceberg.tableoperator; -import io.debezium.engine.ChangeEvent; +import io.debezium.server.iceberg.IcebergChangeEvent; -import java.util.ArrayList; +import java.util.List; import java.util.function.Predicate; import org.apache.iceberg.Table; @@ -20,7 +20,7 @@ public interface InterfaceIcebergTableOperator { void initialize(); - void addToTable(Table icebergTable, ArrayList> events); + void addToTable(Table icebergTable, List> events); Predicate filterEvents(); } diff --git 
a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java index 2a7f0594..f003ffa3 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java @@ -9,11 +9,11 @@ package io.debezium.server.iceberg; import io.debezium.serde.DebeziumSerdes; +import io.debezium.server.iceberg.testresources.TestChangeEvent; import io.debezium.util.Testing; import java.io.IOException; import java.util.Collections; -import java.util.List; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; @@ -34,16 +34,15 @@ class TestIcebergUtil { @Test public void testNestedJsonRecord() throws JsonProcessingException { - List d = IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema")); - assertTrue(d.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, 4:")); + Schema schema = new IcebergChangeEvent<>(new TestChangeEvent<>(serdeWithSchema)).getSchema(); + assertTrue(schema.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, " + + "4:")); } @Test public void testUnwrapJsonRecord() throws IOException { JsonNode event = new ObjectMapper().readTree(unwrapWithSchema).get("payload"); - List fileds = IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(unwrapWithSchema) - .get("schema")); - Schema schema = new Schema(fileds); + Schema schema = new IcebergChangeEvent<>(new TestChangeEvent<>(unwrapWithSchema)).getSchema(); GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), event); assertEquals("orders", record.getField("__table").toString()); assertEquals(16850, record.getField("order_date")); @@ -54,8 +53,7 @@ public void 
testNestedArrayJsonRecord() throws JsonProcessingException { JsonNode jsonData = new ObjectMapper().readTree(unwrapWithArraySchema); JsonNode jsonPayload = jsonData.get("payload"); JsonNode jsonSchema = jsonData.get("schema"); - List schemaFields = IcebergUtil.getIcebergSchema(jsonSchema); - Schema schema = new Schema(schemaFields); + Schema schema = new IcebergChangeEvent<>(new TestChangeEvent<>(unwrapWithArraySchema)).getSchema(); assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list, 4: schedule: optional list, 6:")); System.out.println(schema.asStruct()); System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType()); @@ -74,8 +72,7 @@ public void testNestedArray2JsonRecord() throws JsonProcessingException { JsonNode jsonSchema = jsonData.get("schema"); assertThrows(RuntimeException.class, () -> { - List schemaFields = IcebergUtil.getIcebergSchema(jsonSchema); - Schema schema = new Schema(schemaFields); + Schema schema = new IcebergChangeEvent<>(new TestChangeEvent<>(unwrapWithArraySchema2)).getSchema(); System.out.println(schema.asStruct()); System.out.println(schema); System.out.println(schema.findField("tableChanges")); @@ -90,8 +87,7 @@ public void testNestedGeomJsonRecord() throws JsonProcessingException { JsonNode jsonData = new ObjectMapper().readTree(unwrapWithGeomSchema); JsonNode jsonPayload = jsonData.get("payload"); JsonNode jsonSchema = jsonData.get("schema"); - List schemaFields = IcebergUtil.getIcebergSchema(jsonSchema); - Schema schema = new Schema(schemaFields); + Schema schema = new IcebergChangeEvent<>(new TestChangeEvent<>(unwrapWithGeomSchema)).getSchema(); GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload); //System.out.println(schema); //System.out.println(record); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java 
b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java index d1ff77e3..1120d549 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java @@ -22,6 +22,10 @@ public TestChangeEvent(K key, V value, String destination) { this.value = value; this.destination = destination; } + + public TestChangeEvent(V value) { + this(null,value,null); + } @Override public K key() { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestIcebergChangeEvent.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestIcebergChangeEvent.java new file mode 100644 index 00000000..c11f19c8 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestIcebergChangeEvent.java @@ -0,0 +1,49 @@ +/* + * + * * Copyright memiiso Authors. 
+ * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.testresources; + +import io.debezium.server.iceberg.IcebergChangeEvent; + +public class TestIcebergChangeEvent extends IcebergChangeEvent { + + private final K key; + private final V value; + private final String destination; + + public TestIcebergChangeEvent(K key, V value, String destination) { + super(null); + this.key = key; + this.value = value; + this.destination = destination; + } + + public TestIcebergChangeEvent(K key) { + this(key,null,null); + } + + @Override + public K key() { + return key; + } + + @Override + public V value() { + return value; + } + + @Override + public String destination() { + return destination; + } + + @Override + public String toString() { + return "EmbeddedEngineChangeEvent [key=" + key + ", value=" + value + ", sourceRecord=" + destination + "]"; + } +}