Add support to map type part2 (#233)
ismailsimsek committed Sep 9, 2023
1 parent 0841617 commit d64860d
Showing 4 changed files with 150 additions and 110 deletions.
@@ -8,13 +8,6 @@

package io.debezium.server.iceberg;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
@@ -24,6 +17,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

/**
* @author Ismail Simsek
*/
@@ -84,7 +85,7 @@ private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode
return record;
}

private static Type.PrimitiveType icebergFieldType(String fieldName, String fieldType) {
private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) {
switch (fieldType) {
case "int8":
case "int16":
@@ -161,10 +162,36 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
}
break;
case LIST:
Types.NestedField listItemsType = field.type().asListType().fields().get(0);
// recursively map list elements when the element type is nested
if (listItemsType.type().isNestedType()) {
ArrayList<Object> listVal = new ArrayList<>();
node.elements().forEachRemaining(element -> {
listVal.add(jsonValToIcebergVal(listItemsType, element));
});
val = listVal;
break;
}

val = IcebergChangeConsumer.mapper.convertValue(node, ArrayList.class);
break;
case MAP:
val = IcebergChangeConsumer.mapper.convertValue(node, Map.class);
Type keyType = field.type().asMapType().keyType();
Type valType = field.type().asMapType().valueType();
if (keyType.isPrimitiveType() && valType.isPrimitiveType()) {
val = IcebergChangeConsumer.mapper.convertValue(node, Map.class);
break;
}
// convert complex/nested map value with recursion
HashMap<Object, Object> mapVal = new HashMap<>();
node.fields().forEachRemaining(f -> {
if (valType.isStructType()) {
mapVal.put(f.getKey(), asIcebergRecord(valType.asStructType(), f.getValue()));
} else {
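// non-struct nested values are passed through unconverted, as raw JsonNode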
mapVal.put(f.getKey(), f.getValue());
}
});
val = mapVal;
break;
case STRUCT:
// create it as struct, nested type
@@ -181,7 +208,76 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
return val;
}
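For the primitive key/value fast path above, a single Jackson conversion is all that happens. A minimal standalone sketch of that step (the mapper instance and the sample hstore payload are illustrative, not taken from the consumer):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class MapValueSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // an hstore value as it arrives in the Debezium event: string keys, string values
    JsonNode node = mapper.readTree("{\"mapkey1\": \"1\", \"mapkey2\": \"2\"}");
    // primitive key and value types: one Jackson conversion, no recursion needed
    Map<?, ?> val = mapper.convertValue(node, Map.class);
    System.out.println(val); // {mapkey1=1, mapkey2=2}
  }
}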

public class JsonSchema {
/**
* Converts the given Debezium field to its Iceberg field equivalent. Recurses for complex/nested types.
*
* @param fieldSchema JsonNode representation of the Debezium field schema.
* @param fieldName name of the Debezium field.
* @param fieldId first id to assign to the converted Iceberg field; further ids are consumed sequentially during recursion.
* @return map entry whose key is the last id assigned to the Iceberg field and whose value is the converted Iceberg NestedField.
*/
private static Map.Entry<Integer, Types.NestedField> debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, int fieldId) {
String fieldType = fieldSchema.get("type").textValue();
switch (fieldType) {
case "struct":
// struct type
int rootStructId = fieldId;
List<Types.NestedField> subFields = new ArrayList<>();
for (JsonNode subFieldSchema : fieldSchema.get("fields")) {
fieldId += 1;
String subFieldName = subFieldSchema.get("field").textValue();
Map.Entry<Integer, Types.NestedField> subField = debeziumFieldToIcebergField(subFieldSchema, subFieldName, fieldId);
subFields.add(subField.getValue());
fieldId = subField.getKey();
}
// create it as struct, nested type
return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subFields)));
case "map":
int rootMapId = fieldId;
int keyFieldId = fieldId + 1;
int valFieldId = fieldId + 2;
fieldId = fieldId + 3;
Map.Entry<Integer, Types.NestedField> keyField = debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", fieldId);
fieldId = keyField.getKey() + 1;
Map.Entry<Integer, Types.NestedField> valField = debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", fieldId);
fieldId = valField.getKey();
Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keyField.getValue().type(), valField.getValue().type());
return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootMapId, fieldName, mapField));

case "array":
int rootArrayId = fieldId;
fieldId += 1;
Map.Entry<Integer, Types.NestedField> listItemsField = debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", fieldId);
fieldId = listItemsField.getKey() + 1;
Types.ListType listField = Types.ListType.ofOptional(fieldId, listItemsField.getValue().type());
return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootArrayId, fieldName, listField));
default:
// it's a primitive field
return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(fieldId, fieldName, icebergPrimitiveField(fieldName, fieldType)));
}
}
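To make the id bookkeeping concrete, here is what the map branch above yields for a string-to-string Debezium map schema entered with fieldId = 1 (the field name and starting id are illustrative; Types is the standard Iceberg type factory):

import java.util.AbstractMap;
import java.util.Map;
import org.apache.iceberg.types.Types;

public class MapFieldIdSketch {
  public static void main(String[] args) {
    // Debezium field schema for an hstore column:
    //   {"field": "c_map", "type": "map", "keys": {"type": "string"}, "values": {"type": "string"}}
    // rootMapId = 1, keyFieldId = 2, valFieldId = 3; the recursive calls for
    // "keys" and "values" then consume ids 4 and 5, so the method returns:
    Map.Entry<Integer, Types.NestedField> expected = new AbstractMap.SimpleEntry<>(
        5, // last id consumed
        Types.NestedField.optional(1, "c_map",
            Types.MapType.ofOptional(2, 3, Types.StringType.get(), Types.StringType.get())));
    System.out.println(expected);
  }
}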

/**
* Converts Debezium event fields to their Iceberg equivalents and returns the list of Iceberg fields.
*
* @param schemaNode JsonNode representation of the Debezium event schema.
* @return list of converted Iceberg fields.
*/
private static List<Types.NestedField> icebergSchemaFields(JsonNode schemaNode) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
AtomicReference<Integer> fieldId = new AtomicReference<>(1);
if (schemaNode != null && schemaNode.has("fields") && schemaNode.get("fields").isArray()) {
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
schemaNode.get("fields").forEach(field -> {
Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get());
fieldId.set(df.getKey() + 1);
schemaColumns.add(df.getValue());
});
}
return schemaColumns;
}


public static class JsonSchema {
private final JsonNode valueSchema;
private final JsonNode keySchema;

@@ -211,38 +307,19 @@ public int hashCode() {
return Objects.hash(valueSchema, keySchema);
}

//getIcebergFieldsFromEventSchema
private List<Types.NestedField> KeySchemaFields() {
if (keySchema != null && keySchema.has("fields") && keySchema.get("fields").isArray()) {
LOGGER.debug(keySchema.toString());
return icebergSchema(keySchema, "", 0);
}
LOGGER.trace("Key schema not found!");
return new ArrayList<>();
}

private List<Types.NestedField> valueSchemaFields() {
if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) {
LOGGER.debug(valueSchema.toString());
return icebergSchema(valueSchema, "", 0);
}
LOGGER.trace("Event schema not found!");
return new ArrayList<>();
}

public Schema icebergSchema() {

if (this.valueSchema == null) {
throw new RuntimeException("Failed to get event schema, event schema is null");
}

final List<Types.NestedField> tableColumns = valueSchemaFields();
final List<Types.NestedField> tableColumns = icebergSchemaFields(valueSchema);

if (tableColumns.isEmpty()) {
throw new RuntimeException("Failed to get event schema, event schema has no fields!");
}

final List<Types.NestedField> keyColumns = KeySchemaFields();
final List<Types.NestedField> keyColumns = icebergSchemaFields(keySchema);
Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
@@ -268,53 +345,6 @@ public Schema icebergSchema() {

return new Schema(tableColumns, identifierFieldIds);
}

private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
for (JsonNode jsonSchemaFieldNode : eventSchema.get("fields")) {
columnId++;
String fieldName = jsonSchemaFieldNode.get("field").textValue();
String fieldType = jsonSchemaFieldNode.get("type").textValue();
LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType);
switch (fieldType) {
case "array":
JsonNode items = jsonSchemaFieldNode.get("items");
if (items != null && items.has("type")) {
String listItemType = items.get("type").textValue();

if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
throw new RuntimeException("Complex nested array types are not supported," + " array[" + listItemType + "], field " + fieldName);
}

Type.PrimitiveType item = icebergFieldType(fieldName, listItemType);
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
} else {
throw new RuntimeException("Unexpected Array type for field " + fieldName);
}
break;
case "map":
String keyFieldType = jsonSchemaFieldNode.get("keys").get("type").textValue();
String varFieldlType = jsonSchemaFieldNode.get("keys").get("type").textValue();
Types.MapType mapField = Types.MapType.ofOptional(columnId, ++columnId, icebergFieldType(fieldName+".keys", keyFieldType), icebergFieldType(fieldName+".values", varFieldlType));
schemaColumns.add(Types.NestedField.optional(++columnId,fieldName, mapField));
break;
case "struct":
// create it as struct, nested type
List<Types.NestedField> subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId);
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
columnId += subSchema.size();
break;
default: //primitive types
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldName, fieldType)));
break;
}
}

return schemaColumns;
}

}

}
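Putting the two new methods together: for a source row with one text column and one hstore column, the conversion should produce an Iceberg schema along these lines (a sketch following the id-assignment rules above; the column names are illustrative):

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class HstoreSchemaSketch {
  public static void main(String[] args) {
    // ids 3 and 4 are the key and value slots reserved by the map branch
    Schema expected = new Schema(
        Types.NestedField.optional(1, "name", Types.StringType.get()),
        Types.NestedField.optional(2, "c_hstore",
            Types.MapType.ofOptional(3, 4,
                Types.StringType.get(), Types.StringType.get())));
    System.out.println(expected);
  }
}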
@@ -8,19 +8,14 @@

package io.debezium.server.iceberg;

import com.google.common.collect.Lists;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Lists;
import jakarta.inject.Inject;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -34,6 +29,11 @@
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
@@ -59,7 +59,6 @@ public class IcebergChangeConsumerTest extends BaseSparkTest {
@Test
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
SourcePostgresqlDB.runSQL("CREATE EXTENSION hstore;");
String sql = "\n" +
" DROP TABLE IF EXISTS inventory.data_types;\n" +
" CREATE TABLE IF NOT EXISTS inventory.data_types (\n" +
@@ -102,7 +101,7 @@ public void testConsumingVariousDataTypes() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.data_types");
df.show(true);
df.show(false);
return df.where("c_text is null AND c_varchar is null AND c_int is null " +
"AND c_date is null AND c_timestamp is null AND c_timestamptz is null " +
"AND c_float is null AND c_decimal is null AND c_numeric is null AND c_interval is null " +
@@ -114,7 +113,7 @@
Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.data_types");
df.show(true);
df.show(false);
return df.count() == 2;
} catch (Exception e) {
return false;
@@ -128,19 +127,23 @@ public void testConsumingArrayDataType() throws Exception {
" CREATE TABLE IF NOT EXISTS inventory.array_data (\n" +
" name text,\n" +
" pay_by_quarter integer[],\n" +
" c_array_of_map hstore[],\n" +
" schedule text[][]\n" +
" );\n" +
" INSERT INTO inventory.array_data\n" +
" VALUES " +
"('Carol2',\n" +
" ARRAY[20000, 25000, 25000, 25000],\n" +
" ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" +
" ARRAY[['breakfast', 'consulting'], ['meeting', 'lunch']]),\n" +
"('Bill',\n" +
" '{10000, 10000, 10000, 10000}',\n" +
" ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" +
" '{{\"meeting\", \"lunch\"}, {\"training\", \"presentation\"}}'),\n" +
" ('Carol1',\n" +
" '{20000, 25000, 25000, 25000}',\n" +
" '{{\"breakfast\", \"consulting\"}, {\"meeting\", \"lunch\"}}')" +
" '{20000, 25000, 25000, 25000}',\n" +
" ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" +
" '{{\"breakfast\", \"consulting\"}, {\"meeting\", \"lunch\"}}')" +
";";
SourcePostgresqlDB.runSQL(sql);
