Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for map type part2 #233

Merged
merged 1 commit into from
Sep 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@

package io.debezium.server.iceberg;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
Expand All @@ -24,6 +17,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

/**
* @author Ismail Simsek
*/
Expand Down Expand Up @@ -84,7 +85,7 @@ private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonN
return record;
}

private static Type.PrimitiveType icebergFieldType(String fieldName, String fieldType) {
private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) {
switch (fieldType) {
case "int8":
case "int16":
Expand Down Expand Up @@ -161,10 +162,36 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
}
break;
case LIST:
Types.NestedField listItemsType = field.type().asListType().fields().get(0);
// recursive value mapping when list elements are nested type
if (listItemsType.type().isNestedType()) {
ArrayList<Object> listVal = new ArrayList<>();
node.elements().forEachRemaining(element -> {
listVal.add(jsonValToIcebergVal(field.type().asListType().fields().get(0), element));
});
val = listVal;
break;
}

val = IcebergChangeConsumer.mapper.convertValue(node, ArrayList.class);
break;
case MAP:
val = IcebergChangeConsumer.mapper.convertValue(node, Map.class);
Type keyType = field.type().asMapType().keyType();
Type valType = field.type().asMapType().valueType();
if (keyType.isPrimitiveType() && valType.isPrimitiveType()) {
val = IcebergChangeConsumer.mapper.convertValue(node, Map.class);
break;
}
// convert complex/nested map value with recursion
HashMap<Object, Object> mapVal = new HashMap<>();
node.fields().forEachRemaining(f -> {
if (valType.isStructType()) {
mapVal.put(f.getKey(), asIcebergRecord(valType.asStructType(), f.getValue()));
} else {
mapVal.put(f.getKey(), f.getValue());
}
});
val = mapVal;
break;
case STRUCT:
// create it as struct, nested type
Expand All @@ -181,7 +208,76 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
return val;
}

public class JsonSchema {
/***
 * Converts the given debezium field to its iceberg field equivalent. Does recursion in case of
 * complex/nested types (struct, map, array); primitive types are mapped via icebergPrimitiveField.
 *
 * The integer id is threaded through the whole conversion: every iceberg field created here (the
 * root field as well as nested key/value/item fields) consumes ids from the same sequence, and the
 * last id used is handed back to the caller so the next field continues from there.
 *
 * @param fieldSchema JsonNode representation of debezium field schema.
 * @param fieldName name of the debezium field
 * @param fieldId id sequence to assign iceberg field, after the conversion.
 * @return map entry Key being the last id assigned to the iceberg field, Value being the converted iceberg NestedField.
 */
private static Map.Entry<Integer, Types.NestedField> debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, int fieldId) {
  String fieldType = fieldSchema.get("type").textValue();
  switch (fieldType) {
    case "struct":
      // struct type: the root struct keeps the incoming id, each sub-field recursion advances the counter
      int rootStructId = fieldId;
      List<Types.NestedField> subFields = new ArrayList<>();
      for (JsonNode subFieldSchema : fieldSchema.get("fields")) {
        fieldId += 1;
        String subFieldName = subFieldSchema.get("field").textValue();
        Map.Entry<Integer, Types.NestedField> subField = debeziumFieldToIcebergField(subFieldSchema, subFieldName, fieldId);
        subFields.add(subField.getValue());
        // continue numbering after the ids the sub-field consumed
        fieldId = subField.getKey();
      }
      // create it as struct, nested type
      return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subFields)));
    case "map":
      // reserve three ids up front: the map field itself, its key field and its value field
      int rootMapId = fieldId;
      int keyFieldId = fieldId + 1;
      int valFieldId = fieldId + 2;
      fieldId = fieldId + 3;
      // NOTE(review): the NestedFields produced by these recursions are used only for their types
      // (see MapType.ofOptional below); their own ids are discarded, but the counter still advances
      // past whatever ids they consumed so no id is reused later.
      Map.Entry<Integer, Types.NestedField> keyField = debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", fieldId);
      fieldId = keyField.getKey() + 1;
      Map.Entry<Integer, Types.NestedField> valField = debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", fieldId);
      fieldId = valField.getKey();
      Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keyField.getValue().type(), valField.getValue().type());
      return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootMapId, fieldName, mapField));

    case "array":
      int rootArrayId = fieldId;
      fieldId += 1;
      Map.Entry<Integer, Types.NestedField> listItemsField = debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", fieldId);
      // the id after the item field's range becomes the list element id
      fieldId = listItemsField.getKey() + 1;
      Types.ListType listField = Types.ListType.ofOptional(fieldId, listItemsField.getValue().type());
      return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootArrayId, fieldName, listField));
    default:
      // its primitive field
      return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(fieldId, fieldName, icebergPrimitiveField(fieldName, fieldType)));
  }
}

/***
 * Converts debezium event fields to iceberg equivalent and returns list of iceberg fields.
 *
 * @param schemaNode JsonNode representation of the debezium event schema; may be null.
 * @return list of converted iceberg fields; empty when the schema is null or has no "fields" array.
 */
private static List<Types.NestedField> icebergSchemaFields(JsonNode schemaNode) {
  List<Types.NestedField> schemaColumns = new ArrayList<>();
  if (schemaNode != null && schemaNode.has("fields") && schemaNode.get("fields").isArray()) {
    // fix: message previously read "Converting iceberg schema to debezium" — the direction was reversed
    LOGGER.debug("Converting debezium schema to iceberg:{}", schemaNode);
    // iceberg field ids start at 1; a plain loop replaces the AtomicReference+forEach
    // mutable-capture workaround since no lambda capture is needed here
    int fieldId = 1;
    for (JsonNode field : schemaNode.get("fields")) {
      Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId);
      // continue numbering after the ids the converted field consumed
      fieldId = df.getKey() + 1;
      schemaColumns.add(df.getValue());
    }
  }
  return schemaColumns;
}


public static class JsonSchema {
private final JsonNode valueSchema;
private final JsonNode keySchema;

Expand Down Expand Up @@ -211,38 +307,19 @@ public int hashCode() {
return Objects.hash(valueSchema, keySchema);
}

//getIcebergFieldsFromEventSchema
/**
 * Converts the event key schema to the equivalent list of iceberg fields.
 * Returns an empty list when no usable key schema is present.
 */
private List<Types.NestedField> KeySchemaFields() {
  final boolean hasFieldArray = keySchema != null && keySchema.has("fields") && keySchema.get("fields").isArray();
  if (!hasFieldArray) {
    LOGGER.trace("Key schema not found!");
    return new ArrayList<>();
  }
  LOGGER.debug(keySchema.toString());
  return icebergSchema(keySchema, "", 0);
}

/**
 * Converts the event value schema to the equivalent list of iceberg fields.
 * Returns an empty list when no usable value schema is present.
 */
private List<Types.NestedField> valueSchemaFields() {
  final boolean hasFieldArray = valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray();
  if (!hasFieldArray) {
    LOGGER.trace("Event schema not found!");
    return new ArrayList<>();
  }
  LOGGER.debug(valueSchema.toString());
  return icebergSchema(valueSchema, "", 0);
}

public Schema icebergSchema() {

if (this.valueSchema == null) {
throw new RuntimeException("Failed to get event schema, event schema is null");
}

final List<Types.NestedField> tableColumns = valueSchemaFields();
final List<Types.NestedField> tableColumns = icebergSchemaFields(valueSchema);

if (tableColumns.isEmpty()) {
throw new RuntimeException("Failed to get event schema, event schema has no fields!");
}

final List<Types.NestedField> keyColumns = KeySchemaFields();
final List<Types.NestedField> keyColumns = icebergSchemaFields(keySchema);
Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
Expand All @@ -268,53 +345,6 @@ public Schema icebergSchema() {

return new Schema(tableColumns, identifierFieldIds);
}

/***
 * Converts the fields of the given debezium event schema to iceberg fields, recursing into
 * struct fields. Array and map fields only support primitive element/key/value types here.
 *
 * @param eventSchema debezium event schema node; must contain a "fields" array
 * @param schemaName name of the enclosing schema, used for logging only
 * @param columnId id of the last column assigned so far; incremented for every field created
 * @return list of converted iceberg fields
 */
private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
  List<Types.NestedField> schemaColumns = new ArrayList<>();
  String schemaType = eventSchema.get("type").textValue();
  LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
  for (JsonNode jsonSchemaFieldNode : eventSchema.get("fields")) {
    columnId++;
    String fieldName = jsonSchemaFieldNode.get("field").textValue();
    String fieldType = jsonSchemaFieldNode.get("type").textValue();
    LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType);
    switch (fieldType) {
      case "array":
        JsonNode items = jsonSchemaFieldNode.get("items");
        if (items != null && items.has("type")) {
          String listItemType = items.get("type").textValue();

          if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
            throw new RuntimeException("Complex nested array types are not supported," + " array[" + listItemType + "], field " + fieldName);
          }

          Type.PrimitiveType item = icebergFieldType(fieldName, listItemType);
          // ++columnId allocates a fresh id for the list element field
          schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
        } else {
          throw new RuntimeException("Unexpected Array type for field " + fieldName);
        }
        break;
      case "map":
        String keyFieldType = jsonSchemaFieldNode.get("keys").get("type").textValue();
        // fix: the value type was previously read from the "keys" node, so the map value type was
        // wrong whenever the key and value types differed
        String valFieldType = jsonSchemaFieldNode.get("values").get("type").textValue();
        // Java evaluates arguments left-to-right: key id = columnId, value id = columnId + 1
        Types.MapType mapField = Types.MapType.ofOptional(columnId, ++columnId, icebergFieldType(fieldName + ".keys", keyFieldType), icebergFieldType(fieldName + ".values", valFieldType));
        schemaColumns.add(Types.NestedField.optional(++columnId, fieldName, mapField));
        break;
      case "struct":
        // create it as struct, nested type
        List<Types.NestedField> subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId);
        schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
        columnId += subSchema.size();
        break;
      default: //primitive types
        schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldName, fieldType)));
        break;
    }
  }

  return schemaColumns;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,14 @@

package io.debezium.server.iceberg;

import com.google.common.collect.Lists;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Lists;
import jakarta.inject.Inject;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
Expand All @@ -34,6 +29,11 @@
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
Expand All @@ -59,7 +59,6 @@ public class IcebergChangeConsumerTest extends BaseSparkTest {
@Test
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
SourcePostgresqlDB.runSQL("CREATE EXTENSION hstore;");
String sql = "\n" +
" DROP TABLE IF EXISTS inventory.data_types;\n" +
" CREATE TABLE IF NOT EXISTS inventory.data_types (\n" +
Expand Down Expand Up @@ -102,7 +101,7 @@ public void testConsumingVariousDataTypes() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.data_types");
df.show(true);
df.show(false);
return df.where("c_text is null AND c_varchar is null AND c_int is null " +
"AND c_date is null AND c_timestamp is null AND c_timestamptz is null " +
"AND c_float is null AND c_decimal is null AND c_numeric is null AND c_interval is null " +
Expand All @@ -114,7 +113,7 @@ public void testConsumingVariousDataTypes() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.data_types");
df.show(true);
df.show(false);
return df.count() == 2;
} catch (Exception e) {
return false;
Expand All @@ -128,19 +127,23 @@ public void testConsumingArrayDataType() throws Exception {
" CREATE TABLE IF NOT EXISTS inventory.array_data (\n" +
" name text,\n" +
" pay_by_quarter integer[],\n" +
" c_array_of_map hstore[],\n" +
" schedule text[][]\n" +
" );\n" +
" INSERT INTO inventory.array_data\n" +
" VALUES " +
"('Carol2',\n" +
" ARRAY[20000, 25000, 25000, 25000],\n" +
" ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" +
" ARRAY[['breakfast', 'consulting'], ['meeting', 'lunch']]),\n" +
"('Bill',\n" +
" '{10000, 10000, 10000, 10000}',\n" +
" ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" +
" '{{\"meeting\", \"lunch\"}, {\"training\", \"presentation\"}}'),\n" +
" ('Carol1',\n" +
" '{20000, 25000, 25000, 25000}',\n" +
" '{{\"breakfast\", \"consulting\"}, {\"meeting\", \"lunch\"}}')" +
" '{20000, 25000, 25000, 25000}',\n" +
" ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" +
" '{{\"breakfast\", \"consulting\"}, {\"meeting\", \"lunch\"}}')" +
";";
SourcePostgresqlDB.runSQL(sql);

Expand Down
Loading