Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more tests on schema conversion #348

Merged
merged 1 commit into from
Jun 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
import jakarta.inject.Inject;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
Expand All @@ -22,6 +23,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.*;

Expand Down Expand Up @@ -58,6 +60,7 @@ public void testNestedJsonRecord() {
26: op: optional string
27: ts_ms: optional long
}"""));
assertEquals(schema.identifierFieldIds(), Set.of());
}

@Test
Expand All @@ -81,6 +84,8 @@ public void testUnwrapJsonRecord() {
9: __source_ts_ms: optional timestamptz
10: __deleted: optional string
}""");

assertEquals(schema.identifierFieldIds(), Set.of());
System.out.println(schema);
System.out.println(record);
}
Expand All @@ -102,6 +107,7 @@ public void testNestedArrayJsonRecord() {
11: __db: optional string
12: __deleted: optional string
}""");
assertEquals(schema.identifierFieldIds(), Set.of());
System.out.println(schema);
System.out.println(schema.asStruct());
System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType());
Expand All @@ -128,6 +134,7 @@ public void testNestedArray2JsonRecord() {
19: ddl: optional string
20: tableChanges: optional list<struct<22: type: optional string, 23: id: optional string, 24: table: optional struct<25: defaultCharsetName: optional string, 26: primaryKeyColumnNames: optional list<string>, 29: columns: optional list<struct<31: name: optional string, 32: jdbcType: optional int, 33: nativeType: optional int, 34: typeName: optional string, 35: typeExpression: optional string, 36: charsetName: optional string, 37: length: optional int, 38: scale: optional int, 39: position: optional int, 40: optional: optional boolean, 41: autoIncremented: optional boolean, 42: generated: optional boolean>>>>>
}""");
assertEquals(schema.identifierFieldIds(), Set.of());
System.out.println(schema.findField("tableChanges"));
System.out.println(schema.findField("tableChanges").type().asListType().elementType());
//GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
Expand All @@ -153,6 +160,7 @@ public void testNestedGeomJsonRecord() {
11: __db: optional string
12: __deleted: optional string
}""");
assertEquals(schema.identifierFieldIds(), Set.of());
GenericRecord g = (GenericRecord) record.getField("g");
GenericRecord h = (GenericRecord) record.getField("h");
assertEquals("AQEAAAAAAAAAAADwPwAAAAAAAPA/", g.get(0, Types.StringType.get().typeId().javaClass()));
Expand Down Expand Up @@ -180,6 +188,55 @@ public void valuePayloadWithSchemaAsJsonNode() {
JsonNode deserializedSchema = valueSerde.deserializer().deserialize("xx", serdeWithSchema.getBytes());
System.out.println(deserializedSchema);
assertFalse(deserializedSchema.has("schema"));

}

@Test
public void testIcebergChangeEventSchemaWithKey() {
TestChangeEvent<Object, Object> debeziumEvent = TestChangeEvent.ofCompositeKey("destination", 1, "u", "user1", 2L);
Schema schema = debeziumEvent.toIcebergChangeEvent().icebergSchema();
assertEquals(schema.toString(), """
table {
1: id: required int (id)
2: first_name: required string (id)
3: __op: optional string
4: __source_ts_ms: optional timestamptz
5: __deleted: optional boolean
}""");
assertEquals(schema.identifierFieldIds(), Set.of(1, 2));


final IcebergChangeEvent t = new IcebergChangeEventBuilder()
.destination("test")
.addField("first_column", "dummy-value")
.addKeyField("id", 1)
.addKeyField("first_name", "Marx")
.addField("__op", "c")
.addField("__source_ts_ms", 0L)
.addField("__deleted", false)
.build();
final String key = "{" +
"\"schema\":" + t.changeEventSchema().keySchema() + "," +
"\"payload\":" + t.key() +
"} ";
final String val = "{" +
"\"schema\":" + t.changeEventSchema().valueSchema() + "," +
"\"payload\":" + t.value() +
"} ";

// test when PK is not first two columns!
TestChangeEvent<String, String> debeziumEvent2 = new TestChangeEvent<>(key, val, "test");
Schema schema2 = debeziumEvent2.toIcebergChangeEvent().icebergSchema();
assertEquals(schema2.toString(), """
table {
1: first_column: optional string
2: id: required int (id)
3: first_name: required string (id)
4: __op: optional string
5: __source_ts_ms: optional timestamptz
6: __deleted: optional boolean
}""");
assertEquals(schema2.identifierFieldIds(), Set.of(2, 3));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
import io.debezium.server.iceberg.testresources.TestUtil;

import java.nio.charset.StandardCharsets;
import java.time.Instant;

/**
* helper class used to generate test customer change events
* helper class used to generate test debezium change events
*
* @author Ismail Simsek
*/
Expand All @@ -37,6 +38,18 @@ public TestChangeEvent(V value) {
this(null, value, null);
}

public byte[] getKeyBytes() {
return this.key.toString().getBytes(StandardCharsets.UTF_8);
}

public byte[] getValueBytes() {
return this.value.toString().getBytes(StandardCharsets.UTF_8);
}

public IcebergChangeEvent toIcebergChangeEvent() {
return new IcebergChangeEvent(this.destination(), this.getValueBytes(), this.getKeyBytes());
}

public static TestChangeEvent<Object, Object> of(String destination, Integer id, String operation, String name,
Long epoch) {
final IcebergChangeEvent t = new IcebergChangeEventBuilder()
Expand Down