diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
index ac1778f7..1fea1c0b 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -12,6 +12,7 @@
 import io.debezium.engine.ChangeEvent;
 import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.format.Json;
+import io.debezium.serde.DebeziumSerdes;
 import io.debezium.server.BaseChangeConsumer;
 import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
 import io.debezium.server.iceberg.tableoperator.InterfaceIcebergTableOperator;
@@ -19,36 +20,38 @@
 import io.debezium.util.Strings;
 import io.debezium.util.Threads;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import javax.annotation.PostConstruct;
 import javax.enterprise.context.Dependent;
 import javax.enterprise.inject.Any;
 import javax.enterprise.inject.Instance;
-import javax.enterprise.inject.literal.NamedLiteral;
 import javax.inject.Inject;
 import javax.inject.Named;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.eclipse.microprofile.config.ConfigProvider;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import static org.apache.iceberg.TableProperties.*;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 
 /**
  * Implementation of the consumer that delivers the messages to iceberg tables.
@@ -60,8 +63,13 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
 
   protected static final Duration LOG_INTERVAL = Duration.ofMinutes(15);
+  protected static final ObjectMapper mapper = new ObjectMapper();
+  protected static final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
+  protected static final Serde<JsonNode> keySerde = DebeziumSerdes.payloadJson(JsonNode.class);
   private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class);
   private static final String PROP_PREFIX = "debezium.sink.iceberg.";
+  static Deserializer<JsonNode> valDeserializer;
+  static Deserializer<JsonNode> keyDeserializer;
   protected final Clock clock = Clock.system();
   final Configuration hadoopConf = new Configuration();
   final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
@@ -116,44 +124,53 @@ void connect() {
     icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
 
-    Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
-    if (instance.isAmbiguous()) {
-      throw new DebeziumException("Multiple batch size wait class named '" + batchSizeWaitName + "' were found");
-    } else if (instance.isUnsatisfied()) {
-      throw new DebeziumException("No batch size wait class named '" + batchSizeWaitName + "' is available");
-    }
-    batchSizeWait = instance.get();
+    batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName);
     batchSizeWait.initizalize();
-    LOGGER.info("Using {}", batchSizeWait.getClass().getName());
 
-    String icebergTableOperatorName = upsert ? "IcebergTableOperatorUpsert" : "IcebergTableOperatorAppend";
-    Instance<InterfaceIcebergTableOperator> toInstance = icebergTableOperatorInstances.select(NamedLiteral.of(icebergTableOperatorName));
-    if (instance.isAmbiguous()) {
-      throw new DebeziumException("Multiple class named `" + icebergTableOperatorName + "` were found");
-    }
-    if (instance.isUnsatisfied()) {
-      throw new DebeziumException("No class named `" + icebergTableOperatorName + "` found");
-    }
-    icebergTableOperator = toInstance.get();
+    final String icebergTableOperatorName = upsert ? "IcebergTableOperatorUpsert" : "IcebergTableOperatorAppend";
+    icebergTableOperator = IcebergUtil.selectInstance(icebergTableOperatorInstances, icebergTableOperatorName);
     icebergTableOperator.initialize();
-    LOGGER.info("Using {}", icebergTableOperator.getClass().getName());
-
+    // configure and set the value deserializer
+    valSerde.configure(Collections.emptyMap(), false);
+    valDeserializer = valSerde.deserializer();
+    // configure and set the key deserializer
+    keySerde.configure(Collections.emptyMap(), true);
+    keyDeserializer = keySerde.deserializer();
   }
-
+
   @Override
   public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
       throws InterruptedException {
     Instant start = Instant.now();
 
-    Map<String, List<IcebergChangeEvent<Object, Object>>> result =
+    Map<String, List<IcebergChangeEvent>> result =
         records.stream()
-            .map(IcebergChangeEvent::new)
+            .map((ChangeEvent<Object, Object> e)
+                -> {
+              try {
+                return new IcebergChangeEvent(e.destination(),
+                    valDeserializer.deserialize(e.destination(), getBytes(e.value())),
+                    e.key() == null ? null : keyDeserializer.deserialize(e.destination(), getBytes(e.key())),
+                    mapper.readTree(getBytes(e.value())).get("schema"),
+                    e.key() == null ? null : mapper.readTree(getBytes(e.key())).get("schema")
+                );
+              } catch (IOException ex) {
+                throw new DebeziumException(ex);
+              }
+            })
             .collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable));
 
-    for (Map.Entry<String, List<IcebergChangeEvent<Object, Object>>> event : result.entrySet()) {
+    for (Map.Entry<String, List<IcebergChangeEvent>> event : result.entrySet()) {
       final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
-      Table icebergTable = loadIcebergTable(tableIdentifier)
-          .orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0)));
+      Table icebergTable = IcebergUtil.loadIcebergTable(icebergCatalog, tableIdentifier)
+          .orElseGet(() -> {
+            if (!eventSchemaEnabled) {
+              throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
+                  "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
+            }
+            return IcebergUtil.createIcebergTable(icebergCatalog, tableIdentifier,
+                event.getValue().get(0).getSchema(), writeFormat);
+          });
       //addToTable(icebergTable, event.getValue());
       icebergTableOperator.addToTable(icebergTable, event.getValue());
     }
@@ -180,36 +197,4 @@ protected void logConsumerProgress(long numUploadedEvents) {
     }
   }
 
-
-  private Table createIcebergTable(TableIdentifier tableIdentifier,
-                                   IcebergChangeEvent<Object, Object> event) {
-
-    if (!eventSchemaEnabled) {
-      throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
-          "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
-    }
-
-    Schema schema = event.getSchema();
-
-    LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
-        schema.identifierFieldNames());
-
-    return icebergCatalog.buildTable(tableIdentifier, schema)
-        .withProperty(FORMAT_VERSION, "2")
-        .withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH))
-        .withSortOrder(IcebergUtil.getIdentifierFieldsAsSortOrder(schema))
-        .create();
-  }
-
-
-  private Optional<Table> loadIcebergTable(TableIdentifier tableId) {
-    try {
-      Table table = icebergCatalog.loadTable(tableId);
-      return Optional.of(table);
-    } catch (NoSuchTableException e) {
-      LOGGER.warn("Table not found: {}", tableId.toString());
-      return Optional.empty();
-    }
-  }
-
 }
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java
index 29aff7b5..27cfb0e6 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java
@@ -8,16 +8,15 @@
 
 package io.debezium.server.iceberg;
 
-import io.debezium.DebeziumException;
-import io.debezium.engine.ChangeEvent;
-
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,59 +24,56 @@
 /**
  * @author Ismail Simsek
  */
-public class IcebergChangeEvent<K, V> implements ChangeEvent<K, V> {
+public class IcebergChangeEvent {
 
-  private final ChangeEvent<K, V> event;
   protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class);
-  protected static final ObjectMapper objectMapper = new ObjectMapper();
+  protected final String destination;
+  protected final JsonNode value;
+  protected final JsonNode key;
+  protected final JsonNode valueSchema;
+  protected final JsonNode keySchema;
 
-  public IcebergChangeEvent(ChangeEvent<K, V> e) {
-    event = e;
+  public IcebergChangeEvent(String destination,
+                            JsonNode value,
+                            JsonNode key,
+                            JsonNode valueSchema,
+                            JsonNode keySchema) {
+    this.destination = destination;
+    this.value = value;
+    this.key = key;
+    this.valueSchema = valueSchema;
+    this.keySchema = keySchema;
   }
 
-  @Override
-  public K key() {
-    return event.key();
+  public JsonNode key() {
+    return key;
   }
 
-  @Override
-  public V value() {
-    return event.value();
+  public String destinationTable() {
+    return destination.replace(".", "_");
   }
 
-  @Override
-  public String destination() {
-    return event.destination();
-  }
-
-  public String destinationTable() {
-    return event.destination().replace(".","_");
+  public GenericRecord getIcebergRecord(Schema schema) {
+    return getIcebergRecord(schema.asStruct(), value);
   }
 
-  public GenericRecord getIcebergRecord(Schema schema, JsonNode data) {
-    return IcebergUtil.getIcebergRecord(schema.asStruct(), data);
+  public String schemaHashCode() {
+    return valueSchema.hashCode() + "-" + keySchema.hashCode();
   }
-
+
   public Schema getSchema() {
-
-    if (this.value() == null) {
-      throw new RuntimeException("Failed to get event schema event value is null, destination:" + this.destination());
+
+    if (this.valueSchema == null) {
+      throw new RuntimeException("Failed to get event schema, event value is null, destination:" + this.destination);
     }
 
-    List<Types.NestedField> tableColumns = IcebergUtil.getIcebergFieldsFromEventSchema(getBytes(this.value()));
-    List<Types.NestedField> keyColumns =
-        IcebergUtil.getIcebergFieldsFromEventSchema(this.key() == null ? null : getBytes(this.key()));
+    final List<Types.NestedField> tableColumns = getValueFields();
 
     if (tableColumns.isEmpty()) {
-      throw new RuntimeException("Failed to get schema destination:" + this.destination());
+      throw new RuntimeException("Failed to get schema destination:" + this.destination);
     }
 
-    return getSchema(tableColumns, keyColumns);
-  }
-
-  private Schema getSchema(List<Types.NestedField> tableColumns,
-                           List<Types.NestedField> keyColumns) {
-
+    final List<Types.NestedField> keyColumns = getKeyFields();
     Set<Integer> identifierFieldIds = new HashSet<>();
 
     for (Types.NestedField ic : keyColumns) {
@@ -104,19 +100,163 @@ private Schema getSchema(List<Types.NestedField> tableColumns,
     return new Schema(tableColumns, identifierFieldIds);
   }
 
+  private GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) {
+    Map<String, Object> mappedResult = new HashMap<>();
+    LOGGER.debug("Processing nested field:{}", tableFields);
+
+    for (Types.NestedField field : tableFields.fields()) {
+      // Set the value to null if the json event doesn't have the field
+      if (data == null || !data.has(field.name()) || data.get(field.name()) == null) {
+        mappedResult.put(field.name(), null);
+        continue;
+      }
+      // get the value of the field from the json event, map it to an iceberg value
+      mappedResult.put(field.name(), jsonToGenericRecordVal(field, data.get(field.name())));
+    }
+
+    return GenericRecord.create(tableFields).copy(mappedResult);
+  }
+
+  //getIcebergFieldsFromEventSchema
+  private List<Types.NestedField> getKeyFields() {
+    if (keySchema != null && keySchema.has("fields") && keySchema.get("fields").isArray()) {
+      LOGGER.debug(keySchema.toString());
+      return getIcebergSchema(keySchema, "", 0);
+    }
+    LOGGER.trace("Key schema not found!");
+    return new ArrayList<>();
+  }
 
-  protected byte[] getBytes(Object object) {
-    if (object instanceof byte[]) {
-      return (byte[]) object;
+  private List<Types.NestedField> getValueFields() {
+    if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) {
+      LOGGER.debug(valueSchema.toString());
+      return getIcebergSchema(valueSchema, "", 0);
     }
-    else if (object instanceof String) {
-      return ((String) object).getBytes();
+    LOGGER.trace("Event schema not found!");
+    return new ArrayList<>();
+  }
+
+  private Type.PrimitiveType getIcebergFieldType(String fieldType) {
+    switch (fieldType) {
+      case "int8":
+      case "int16":
+      case "int32": // int 4 bytes
+        return Types.IntegerType.get();
+      case "int64": // long 8 bytes
+        return Types.LongType.get();
+      case "float8":
+      case "float16":
+      case "float32": // float is represented in 32 bits
+        return Types.FloatType.get();
+      case "float64": // double is represented in 64 bits
+        return Types.DoubleType.get();
+      case "boolean":
+        return Types.BooleanType.get();
+      case "string":
+        return Types.StringType.get();
+      case "bytes":
+        return Types.BinaryType.get();
+      default:
+        // default to String type
+        return Types.StringType.get();
+        //throw new RuntimeException("'" + fieldName + "' has "+fieldType+" type, "+fieldType+" not supported!");
     }
-    throw new DebeziumException(unsupportedTypeMessage(object));
   }
 
-  protected String unsupportedTypeMessage(Object object) {
"null" : object.getClass().getName(); - return "Unexpected data type '" + type + "'"; + private List getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) { + List schemaColumns = new ArrayList<>(); + String schemaType = eventSchema.get("type").textValue(); + LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType); + for (JsonNode jsonSchemaFieldNode : eventSchema.get("fields")) { + columnId++; + String fieldName = jsonSchemaFieldNode.get("field").textValue(); + String fieldType = jsonSchemaFieldNode.get("type").textValue(); + LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType); + switch (fieldType) { + case "array": + JsonNode items = jsonSchemaFieldNode.get("items"); + if (items != null && items.has("type")) { + String listItemType = items.get("type").textValue(); + if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) { + throw new RuntimeException("Complex Array types are not supported array[" + listItemType + "], field " + fieldName); + } + Type.PrimitiveType item = getIcebergFieldType(listItemType); + schemaColumns.add(Types.NestedField.optional( + columnId, fieldName, Types.ListType.ofOptional(++columnId, item))); + //throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!"); + } else { + throw new RuntimeException("Unexpected Array type for field " + fieldName); + } + break; + case "map": + throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!"); + //schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get())); + //break; + case "struct": + // create it as struct, nested type + List subSchema = getIcebergSchema(jsonSchemaFieldNode, fieldName, columnId); + schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema))); + columnId += subSchema.size(); + break; + default: //primitive types + schemaColumns.add(Types.NestedField.optional(columnId, fieldName, getIcebergFieldType(fieldType))); + break; + } + } + return schemaColumns; } + + private Object jsonToGenericRecordVal(Types.NestedField field, + JsonNode node) { + LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type()); + final Object val; + switch (field.type().typeId()) { + case INTEGER: // int 4 bytes + val = node.isNull() ? null : node.asInt(); + break; + case LONG: // long 8 bytes + val = node.isNull() ? null : node.asLong(); + break; + case FLOAT: // float is represented in 32 bits, + val = node.isNull() ? null : node.floatValue(); + break; + case DOUBLE: // double is represented in 64 bits + val = node.isNull() ? null : node.asDouble(); + break; + case BOOLEAN: + val = node.isNull() ? null : node.asBoolean(); + break; + case STRING: + // if the node is not a value node (method isValueNode returns false), convert it to string. + val = node.isValueNode() ? node.asText(null) : node.toString(); + break; + case BINARY: + try { + val = node.isNull() ? 
+          val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
+        } catch (IOException e) {
+          LOGGER.error("Failed to convert binary value to iceberg value, field:" + field.name(), e);
+          throw new RuntimeException("Failed Processing Event!", e);
+        }
+        break;
+      case LIST:
+        val = IcebergChangeConsumer.mapper.convertValue(node, ArrayList.class);
+        break;
+      case MAP:
+        val = IcebergChangeConsumer.mapper.convertValue(node, Map.class);
+        break;
+      case STRUCT:
+        // create it as struct, nested type
+        // recursive call to get nested data/record
+        val = getIcebergRecord(field.type().asStructType(), node);
+        break;
+      default:
+        // default to String type
+        // if the node is not a value node (method isValueNode returns false), convert it to string.
+        val = node.isValueNode() ? node.asText(null) : node.toString();
+        break;
+    }
+
+    return val;
   }
 
 }
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
index f56d16ee..34109b49 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
@@ -143,12 +143,7 @@ void connect() {
     eventTable = icebergCatalog.loadTable(tableIdentifier);
 
     Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
-    if (instance.isAmbiguous()) {
-      throw new DebeziumException("Multiple batch size wait class named '" + batchSizeWaitName + "' were found");
-    } else if (instance.isUnsatisfied()) {
-      throw new DebeziumException("No batch size wait class named '" + batchSizeWaitName + "' is available");
-    }
-    batchSizeWait = instance.get();
+    batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName);
     batchSizeWait.initizalize();
     LOGGER.info("Using {}", batchSizeWait.getClass().getName());
   }
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java
index 98b52990..0177679f 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java
@@ -8,23 +8,27 @@
 
 package io.debezium.server.iceberg;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import io.debezium.DebeziumException;
+
 import java.util.HashMap;
-import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
+import javax.enterprise.inject.Instance;
+import javax.enterprise.inject.literal.NamedLiteral;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.types.Type.PrimitiveType;
-import org.apache.iceberg.types.Types;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.eclipse.microprofile.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
 
 /**
  * @author Ismail Simsek
@@ -33,158 +37,6 @@ public class IcebergUtil {
   protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergUtil.class);
   protected static final ObjectMapper jsonObjectMapper = new ObjectMapper();
 
-  private static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema) {
-    LOGGER.debug(eventSchema.toString());
-    return getIcebergSchema(eventSchema, "", 0);
-  }
-
-  private static PrimitiveType getIcebergFieldType(String fieldType) {
-    switch (fieldType) {
-      case "int8":
-      case "int16":
-      case "int32": // int 4 bytes
-        return Types.IntegerType.get();
-      case "int64": // long 8 bytes
-        return Types.LongType.get();
-      case "float8":
-      case "float16":
-      case "float32": // float is represented in 32 bits,
-        return Types.FloatType.get();
-      case "float64": // double is represented in 64 bits
-        return Types.DoubleType.get();
-      case "boolean":
-        return Types.BooleanType.get();
-      case "string":
-        return Types.StringType.get();
-      case "bytes":
-        return Types.BinaryType.get();
-      default:
-        // default to String type
-        return Types.StringType.get();
-        //throw new RuntimeException("'" + fieldName + "' has "+fieldType+" type, "+fieldType+" not supported!");
-    }
-  }
-
-  private static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
-    List<Types.NestedField> schemaColumns = new ArrayList<>();
-    String schemaType = eventSchema.get("type").textValue();
-    LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
-    for (JsonNode jsonSchemaFieldNode : eventSchema.get("fields")) {
-      columnId++;
-      String fieldName = jsonSchemaFieldNode.get("field").textValue();
-      String fieldType = jsonSchemaFieldNode.get("type").textValue();
-      LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType);
-      switch (fieldType) {
-        case "array":
-          JsonNode items = jsonSchemaFieldNode.get("items");
-          if (items != null && items.has("type")) {
-            String listItemType = items.get("type").textValue();
-            if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
-              throw new RuntimeException("Complex Array types are not supported array[" + listItemType + "], field " + fieldName);
-            }
-            PrimitiveType item = IcebergUtil.getIcebergFieldType(listItemType);
-            schemaColumns.add(Types.NestedField.optional(
-                columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
-            //throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!");
-          } else {
-            throw new RuntimeException("Unexpected Array type for field " + fieldName);
-          }
-          break;
-        case "map":
-          throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!");
-          //schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
-          //break;
-        case "struct":
-          // create it as struct, nested type
-          List<Types.NestedField> subSchema = IcebergUtil.getIcebergSchema(jsonSchemaFieldNode, fieldName, columnId);
-          schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
-          columnId += subSchema.size();
-          break;
-        default: //primitive types
-          schemaColumns.add(Types.NestedField.optional(columnId, fieldName, IcebergUtil.getIcebergFieldType(fieldType)));
-          break;
-      }
-    }
-    return schemaColumns;
-  }
-
-  private static boolean hasSchema(JsonNode jsonNode) {
-    return jsonNode != null
-        && jsonNode.has("schema")
-        && jsonNode.get("schema").has("fields")
-        && jsonNode.get("schema").get("fields").isArray();
-  }
-
-  public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) {
-    Map<String, Object> mappedResult = new HashMap<>();
-    LOGGER.debug("Processing nested field:{}", tableFields);
-
-    for (Types.NestedField field : tableFields.fields()) {
-      // Set value to null if json event don't have the field
-      if (data == null || !data.has(field.name()) || data.get(field.name()) == null) {
-        mappedResult.put(field.name(), null);
-        continue;
-      }
-      // get the value of the field from json event, map it to iceberg value
-      mappedResult.put(field.name(), jsonToGenericRecordVal(field, data.get(field.name())));
-    }
-
-    return GenericRecord.create(tableFields).copy(mappedResult);
-  }
-
-  static Object jsonToGenericRecordVal(Types.NestedField field,
-                                       JsonNode node) {
-    LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
-    final Object val;
-    switch (field.type().typeId()) {
-      case INTEGER: // int 4 bytes
-        val = node.isNull() ? null : node.asInt();
-        break;
-      case LONG: // long 8 bytes
-        val = node.isNull() ? null : node.asLong();
-        break;
-      case FLOAT: // float is represented in 32 bits,
-        val = node.isNull() ? null : node.floatValue();
-        break;
-      case DOUBLE: // double is represented in 64 bits
-        val = node.isNull() ? null : node.asDouble();
-        break;
-      case BOOLEAN:
-        val = node.isNull() ? null : node.asBoolean();
-        break;
-      case STRING:
-        // if the node is not a value node (method isValueNode returns false), convert it to string.
-        val = node.isValueNode() ? node.asText(null) : node.toString();
-        break;
-      case BINARY:
-        try {
-          val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
-        } catch (IOException e) {
-          LOGGER.error("Failed to convert binary value to iceberg value, field:" + field.name(), e);
-          throw new RuntimeException("Failed Processing Event!", e);
-        }
-        break;
-      case LIST:
-        val = jsonObjectMapper.convertValue(node, ArrayList.class);
-        break;
-      case MAP:
-        val = jsonObjectMapper.convertValue(node, Map.class);
-        break;
-      case STRUCT:
-        // create it as struct, nested type
-        // recursive call to get nested data/record
-        val = getIcebergRecord(field.type().asStructType(), node);
-        break;
-      default:
-        // default to String type
-        // if the node is not a value node (method isValueNode returns false), convert it to string.
-        val = node.isValueNode() ? node.asText(null) : node.toString();
-        break;
-    }
-
-    return val;
-  }
-
   public static Map<String, String> getConfigSubset(Config config, String prefix) {
     final Map<String, String> ret = new HashMap<>();
@@ -198,27 +50,33 @@ public static Map<String, String> getConfigSubset(Config config, String prefix) {
     return ret;
   }
 
-  public static List<Types.NestedField> getIcebergFieldsFromEventSchema(byte[] eventVal) {
+  public static <T> T selectInstance(Instance<T> instances, String name) {
 
-    if (eventVal == null) {
-      return new ArrayList<>();
+    Instance<T> instance = instances.select(NamedLiteral.of(name));
+    if (instance.isAmbiguous()) {
+      throw new DebeziumException("Multiple batch size wait class named '" + name + "' were found");
+    } else if (instance.isUnsatisfied()) {
+      throw new DebeziumException("No batch size wait class named '" + name + "' is available");
    }
-    try {
-      JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal);
-      if (IcebergUtil.hasSchema(jsonEvent)) {
-        return IcebergUtil.getIcebergSchema(jsonEvent.get("schema"));
-      }
-
-      LOGGER.trace("Event schema not found in the given data:!");
-      return new ArrayList<>();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    LOGGER.info("Using {}", instance.getClass().getName());
+    return instance.get();
   }
 
+  public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier,
+                                         Schema schema, String writeFormat) {
+
+    LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
+        schema.identifierFieldNames());
+
+    return icebergCatalog.buildTable(tableIdentifier, schema)
+        .withProperty(FORMAT_VERSION, "2")
+        .withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH))
+        .withSortOrder(IcebergUtil.getIdentifierFieldsAsSortOrder(schema))
+        .create();
+  }
 
-  public static SortOrder getIdentifierFieldsAsSortOrder(Schema schema) {
+  private static SortOrder getIdentifierFieldsAsSortOrder(Schema schema) {
     SortOrder.Builder sob = SortOrder.builderFor(schema);
     for (String fieldName : schema.identifierFieldNames()) {
       sob = sob.asc(fieldName);
@@ -227,4 +85,14 @@ public static SortOrder getIdentifierFieldsAsSortOrder(Schema schema) {
     return sob.build();
   }
 
+  public static Optional<Table> loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId) {
+    try {
+      Table table = icebergCatalog.loadTable(tableId);
+      return Optional.of(table);
+    } catch (NoSuchTableException e) {
+      LOGGER.warn("Table not found: {}", tableId.toString());
+      return Optional.empty();
+    }
+  }
+
 }
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java
index 9bec1c1a..69f25d57 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java
@@ -8,15 +8,15 @@
 
 package io.debezium.server.iceberg.tableoperator;
 
-import io.debezium.DebeziumException;
-import io.debezium.serde.DebeziumSerdes;
 import io.debezium.server.iceberg.IcebergChangeEvent;
 
 import java.io.IOException;
 import java.time.Instant;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
@@ -27,8 +27,6 @@
 import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
@@ -42,49 +40,22 @@ abstract class AbstractIcebergTableOperator implements InterfaceIcebergTableOperator {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class);
 
-  final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
-  Deserializer<JsonNode> valDeserializer;
-
   @Override
   public void initialize() {
-    valSerde.configure(Collections.emptyMap(), false);
-    valDeserializer = valSerde.deserializer();
   }
 
-  protected byte[] getBytes(Object object) {
-    if (object instanceof byte[]) {
-      return (byte[]) object;
-    } else if (object instanceof String) {
-      return ((String) object).getBytes();
-    }
-    throw new DebeziumException(unsupportedTypeMessage(object));
-  }
-
-  protected String getString(Object object) {
-    if (object instanceof String) {
-      return (String) object;
-    }
-    throw new DebeziumException(unsupportedTypeMessage(object));
-  }
-
-  protected String unsupportedTypeMessage(Object object) {
-    final String type = (object == null) ? "null" : object.getClass().getName();
-    return "Unexpected data type '" + type + "'";
-  }
-
-  protected ArrayList<Record> toIcebergRecords(Schema schema, List<IcebergChangeEvent<Object, Object>> events) {
+  protected ArrayList<Record> toIcebergRecords(Schema schema, List<IcebergChangeEvent> events) {
 
     ArrayList<Record> icebergRecords = new ArrayList<>();
-    for (IcebergChangeEvent<Object, Object> e : events) {
-      GenericRecord icebergRecord = e.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
-          getBytes(e.value())));
+    for (IcebergChangeEvent e : events) {
+      GenericRecord icebergRecord = e.getIcebergRecord(schema);
       icebergRecords.add(icebergRecord);
    }
 
     return icebergRecords;
   }
 
-  FileFormat getFileFormat(Table icebergTable){
+  FileFormat getFileFormat(Table icebergTable) {
     String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
     return FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));
   }
@@ -97,16 +68,16 @@ GenericAppenderFactory getAppender(Table icebergTable) {
         icebergTable.schema(),
         null);
   }
-  
+
   protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecords) {
-    
+
     FileFormat fileFormat = getFileFormat(icebergTable);
     GenericAppenderFactory appender = getAppender(icebergTable);
     final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + fileFormat.name();
     OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
-    
+
     DataWriter<Record> dw = appender.newDataWriter(icebergTable.encryption().encrypt(out), fileFormat, null);
-    
+
     icebergRecords.stream().filter(this.filterEvents()).forEach(dw::add);
 
     try {
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java
index 19f921cc..ccdb57a8 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java
@@ -29,7 +29,7 @@ public class IcebergTableOperatorAppend extends AbstractIcebergTableOperator {
   private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class);
 
   @Override
-  public void addToTable(Table icebergTable, List<IcebergChangeEvent<Object, Object>> events) {
+  public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
 
     ArrayList<Record> icebergRecords = toIcebergRecords(icebergTable.schema(), events);
     DataFile dataFile = getDataFile(icebergTable, icebergRecords);
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java
index c0c1780d..4d1ffdb4 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java
@@ -95,12 +95,11 @@ private Optional<DeleteFile> getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) {
     return Optional.of(edw.toDeleteFile());
   }
 
-  private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, List<IcebergChangeEvent<Object, Object>> events) {
+  private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, List<IcebergChangeEvent> events) {
     ConcurrentHashMap<Object, GenericRecord> icebergRecordsmap = new ConcurrentHashMap<>();
 
-    for (IcebergChangeEvent<Object, Object> e : events) {
-      GenericRecord icebergRecord = e.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
-          getBytes(e.value())));
+    for (IcebergChangeEvent e : events) {
+      GenericRecord icebergRecord = e.getIcebergRecord(schema);
 
       // only replace it if its newer
       if (icebergRecordsmap.containsKey(e.key())) {
@@ -132,7 +131,7 @@ private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
   }
 
   @Override
-  public void addToTable(Table icebergTable, List<IcebergChangeEvent<Object, Object>> events) {
+  public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
 
     if (icebergTable.sortOrder().isUnsorted()) {
       LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!");
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java
index e1c9adf3..68d51eb6 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java
@@ -20,7 +20,7 @@ public interface InterfaceIcebergTableOperator {
 
   void initialize();
 
-  void addToTable(Table icebergTable, List<IcebergChangeEvent<Object, Object>> events);
+  void addToTable(Table icebergTable, List<IcebergChangeEvent> events);
 
   Predicate<Record> filterEvents();
 }
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
index f003ffa3..abb28d9c 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
@@ -9,7 +9,6 @@
 package io.debezium.server.iceberg;
 
 import io.debezium.serde.DebeziumSerdes;
-import io.debezium.server.iceberg.testresources.TestChangeEvent;
 import io.debezium.util.Testing;
 
 import java.io.IOException;
@@ -17,12 +16,12 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.types.Types;
 import org.apache.kafka.common.serialization.Serde;
 import org.junit.jupiter.api.Test;
+import static io.debezium.server.iceberg.IcebergChangeConsumer.mapper;
 import static org.junit.jupiter.api.Assertions.*;
 
 class TestIcebergUtil {
@@ -34,45 +33,49 @@ class TestIcebergUtil {
 
   @Test
   public void testNestedJsonRecord() throws JsonProcessingException {
-    Schema schema = new IcebergChangeEvent<>(new TestChangeEvent<>(serdeWithSchema)).getSchema();
+    IcebergChangeEvent e = new IcebergChangeEvent("test",
+        mapper.readTree(serdeWithSchema).get("payload"), null,
+        mapper.readTree(serdeWithSchema).get("schema"), null);
+    Schema schema = e.getSchema();
     assertTrue(schema.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, "
         + "4:"));
   }
 
   @Test
   public void testUnwrapJsonRecord() throws IOException {
-    JsonNode event = new ObjectMapper().readTree(unwrapWithSchema).get("payload");
-    Schema schema = new IcebergChangeEvent<>(new TestChangeEvent<>(unwrapWithSchema)).getSchema();
-    GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), event);
+    IcebergChangeEvent e = new IcebergChangeEvent("test",
+        mapper.readTree(unwrapWithSchema).get("payload"), null,
+        mapper.readTree(unwrapWithSchema).get("schema"), null);
+    Schema schema = e.getSchema();
+    GenericRecord record = e.getIcebergRecord(schema);
     assertEquals("orders", record.getField("__table").toString());
     assertEquals(16850, record.getField("order_date"));
   }
 
   @Test
   public void testNestedArrayJsonRecord() throws JsonProcessingException {
-    JsonNode jsonData = new ObjectMapper().readTree(unwrapWithArraySchema);
-    JsonNode jsonPayload = jsonData.get("payload");
-    JsonNode jsonSchema = jsonData.get("schema");
-    Schema schema = new IcebergChangeEvent<>(new TestChangeEvent<>(unwrapWithArraySchema)).getSchema();
+    IcebergChangeEvent e = new IcebergChangeEvent("test",
+        mapper.readTree(unwrapWithArraySchema).get("payload"), null,
+        mapper.readTree(unwrapWithArraySchema).get("schema"), null);
+    Schema schema = e.getSchema();
     assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list<int>, 4: schedule: optional list<string>, 6:"));
     System.out.println(schema.asStruct());
     System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType());
     System.out.println(schema.findField("schedule").type().asListType().elementType());
     assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(), "int");
     assertEquals(schema.findField("schedule").type().asListType().elementType().toString(), "string");
-    GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
+    GenericRecord record = e.getIcebergRecord(schema);
     //System.out.println(record);
     assertTrue(record.toString().contains("[10000, 10001, 10002, 10003]"));
   }
 
   @Test
   public void testNestedArray2JsonRecord() throws JsonProcessingException {
-    JsonNode jsonData = new ObjectMapper().readTree(unwrapWithArraySchema2);
-    JsonNode jsonPayload = jsonData.get("payload");
-    JsonNode jsonSchema = jsonData.get("schema");
-
     assertThrows(RuntimeException.class, () -> {
-      Schema schema = new IcebergChangeEvent<>(new TestChangeEvent<>(unwrapWithArraySchema2)).getSchema();
+      IcebergChangeEvent e = new IcebergChangeEvent("test",
+          mapper.readTree(unwrapWithArraySchema2).get("payload"), null,
+          mapper.readTree(unwrapWithArraySchema2).get("schema"), null);
+      Schema schema = e.getSchema();
       System.out.println(schema.asStruct());
       System.out.println(schema);
       System.out.println(schema.findField("tableChanges"));
@@ -84,11 +87,11 @@ public void testNestedArray2JsonRecord() throws JsonProcessingException {
 
   @Test
   public void testNestedGeomJsonRecord() throws JsonProcessingException {
-    JsonNode jsonData = new ObjectMapper().readTree(unwrapWithGeomSchema);
-    JsonNode jsonPayload = jsonData.get("payload");
-    JsonNode jsonSchema = jsonData.get("schema");
-    Schema schema = new IcebergChangeEvent<>(new TestChangeEvent<>(unwrapWithGeomSchema)).getSchema();
-    GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
+    IcebergChangeEvent e = new IcebergChangeEvent("test",
+        mapper.readTree(unwrapWithGeomSchema).get("payload"), null,
+        mapper.readTree(unwrapWithGeomSchema).get("schema"), null);
+    Schema schema = e.getSchema();
+    GenericRecord record = e.getIcebergRecord(schema);
     //System.out.println(schema);
     //System.out.println(record);
     assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional string, 4: srid: optional int>"));
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestIcebergChangeEvent.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestIcebergChangeEvent.java
deleted file mode 100644
index c11f19c8..00000000
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestIcebergChangeEvent.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * * Copyright memiiso Authors.
- * *
- * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- *
- */
-
-package io.debezium.server.iceberg.testresources;
-
-import io.debezium.server.iceberg.IcebergChangeEvent;
-
-public class TestIcebergChangeEvent<K, V> extends IcebergChangeEvent<K, V> {
-
-  private final K key;
-  private final V value;
-  private final String destination;
-
-  public TestIcebergChangeEvent(K key, V value, String destination) {
-    super(null);
-    this.key = key;
-    this.value = value;
-    this.destination = destination;
-  }
-
-  public TestIcebergChangeEvent(K key) {
-    this(key, null, null);
-  }
-
-  @Override
-  public K key() {
-    return key;
-  }
-
-  @Override
-  public V value() {
-    return value;
-  }
-
-  @Override
-  public String destination() {
-    return destination;
-  }
-
-  @Override
-  public String toString() {
-    return "EmbeddedEngineChangeEvent [key=" + key + ", value=" + value + ", sourceRecord=" + destination + "]";
-  }
-}