Skip to content

Commit

Permalink
Add more tests on schema conversion (#348)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jun 15, 2024
1 parent f5121dd commit 0997cae
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
import jakarta.inject.Inject;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
Expand All @@ -22,6 +23,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.*;

Expand Down Expand Up @@ -58,6 +60,7 @@ public void testNestedJsonRecord() {
26: op: optional string
27: ts_ms: optional long
}"""));
assertEquals(schema.identifierFieldIds(), Set.of());
}

@Test
Expand All @@ -81,6 +84,8 @@ public void testUnwrapJsonRecord() {
9: __source_ts_ms: optional timestamptz
10: __deleted: optional string
}""");

assertEquals(schema.identifierFieldIds(), Set.of());
System.out.println(schema);
System.out.println(record);
}
Expand All @@ -102,6 +107,7 @@ public void testNestedArrayJsonRecord() {
11: __db: optional string
12: __deleted: optional string
}""");
assertEquals(schema.identifierFieldIds(), Set.of());
System.out.println(schema);
System.out.println(schema.asStruct());
System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType());
Expand All @@ -128,6 +134,7 @@ public void testNestedArray2JsonRecord() {
19: ddl: optional string
20: tableChanges: optional list<struct<22: type: optional string, 23: id: optional string, 24: table: optional struct<25: defaultCharsetName: optional string, 26: primaryKeyColumnNames: optional list<string>, 29: columns: optional list<struct<31: name: optional string, 32: jdbcType: optional int, 33: nativeType: optional int, 34: typeName: optional string, 35: typeExpression: optional string, 36: charsetName: optional string, 37: length: optional int, 38: scale: optional int, 39: position: optional int, 40: optional: optional boolean, 41: autoIncremented: optional boolean, 42: generated: optional boolean>>>>>
}""");
assertEquals(schema.identifierFieldIds(), Set.of());
System.out.println(schema.findField("tableChanges"));
System.out.println(schema.findField("tableChanges").type().asListType().elementType());
//GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
Expand All @@ -153,6 +160,7 @@ public void testNestedGeomJsonRecord() {
11: __db: optional string
12: __deleted: optional string
}""");
assertEquals(schema.identifierFieldIds(), Set.of());
GenericRecord g = (GenericRecord) record.getField("g");
GenericRecord h = (GenericRecord) record.getField("h");
assertEquals("AQEAAAAAAAAAAADwPwAAAAAAAPA/", g.get(0, Types.StringType.get().typeId().javaClass()));
Expand Down Expand Up @@ -180,6 +188,55 @@ public void valuePayloadWithSchemaAsJsonNode() {
JsonNode deserializedSchema = valueSerde.deserializer().deserialize("xx", serdeWithSchema.getBytes());
System.out.println(deserializedSchema);
assertFalse(deserializedSchema.has("schema"));

}

@Test
public void testIcebergChangeEventSchemaWithKey() {
TestChangeEvent<Object, Object> debeziumEvent = TestChangeEvent.ofCompositeKey("destination", 1, "u", "user1", 2L);
Schema schema = debeziumEvent.toIcebergChangeEvent().icebergSchema();
assertEquals(schema.toString(), """
table {
1: id: required int (id)
2: first_name: required string (id)
3: __op: optional string
4: __source_ts_ms: optional timestamptz
5: __deleted: optional boolean
}""");
assertEquals(schema.identifierFieldIds(), Set.of(1, 2));


final IcebergChangeEvent t = new IcebergChangeEventBuilder()
.destination("test")
.addField("first_column", "dummy-value")
.addKeyField("id", 1)
.addKeyField("first_name", "Marx")
.addField("__op", "c")
.addField("__source_ts_ms", 0L)
.addField("__deleted", false)
.build();
final String key = "{" +
"\"schema\":" + t.changeEventSchema().keySchema() + "," +
"\"payload\":" + t.key() +
"} ";
final String val = "{" +
"\"schema\":" + t.changeEventSchema().valueSchema() + "," +
"\"payload\":" + t.value() +
"} ";

// test when PK is not first two columns!
TestChangeEvent<String, String> debeziumEvent2 = new TestChangeEvent<>(key, val, "test");
Schema schema2 = debeziumEvent2.toIcebergChangeEvent().icebergSchema();
assertEquals(schema2.toString(), """
table {
1: first_column: optional string
2: id: required int (id)
3: first_name: required string (id)
4: __op: optional string
5: __source_ts_ms: optional timestamptz
6: __deleted: optional boolean
}""");
assertEquals(schema2.identifierFieldIds(), Set.of(2, 3));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
import io.debezium.server.iceberg.testresources.TestUtil;

import java.nio.charset.StandardCharsets;
import java.time.Instant;

/**
* helper class used to generate test customer change events
* helper class used to generate test debezium change events
*
* @author Ismail Simsek
*/
Expand All @@ -37,6 +38,18 @@ public TestChangeEvent(V value) {
this(null, value, null);
}

public byte[] getKeyBytes() {
return this.key.toString().getBytes(StandardCharsets.UTF_8);
}

public byte[] getValueBytes() {
return this.value.toString().getBytes(StandardCharsets.UTF_8);
}

public IcebergChangeEvent toIcebergChangeEvent() {
return new IcebergChangeEvent(this.destination(), this.getValueBytes(), this.getKeyBytes());
}

public static TestChangeEvent<Object, Object> of(String destination, Integer id, String operation, String name,
Long epoch) {
final IcebergChangeEvent t = new IcebergChangeEventBuilder()
Expand Down

0 comments on commit 0997cae

Please sign in to comment.