Skip to content

Commit

Permalink
Test consumer without event flattening
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jun 16, 2024
1 parent 3b1e479 commit a087ca8
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public Map<String, String> getConfigOverrides() {
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table");
config.put("debezium.transforms", ",");
config.put("debezium.sink.iceberg.upsert", "false");
config.put("debezium.sink.iceberg.upsert-keep-deletes", "true");
config.put("debezium.sink.iceberg.create-identifier-fields", "false");
return config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public Map<String, String> getConfigOverrides() {
config.put("debezium.sink.iceberg.destination-regexp", "\\d");
config.put("debezium.source.hstore.handling.mode", "map");
config.put("debezium.transforms", ",");
config.put("debezium.sink.iceberg.create-identifier-fields", "false");
return config;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class IcebergChangeEventTest {
public void testNestedJsonRecord() {
IcebergChangeEvent e = new IcebergChangeEvent("test",
serdeWithSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
Schema schema = e.icebergSchema(true);
System.out.println(schema.toString());
assertEquals(schema.toString(), ("""
table {
Expand All @@ -69,7 +69,7 @@ public void testNestedJsonRecord() {
public void testUnwrapJsonRecord() {
IcebergChangeEvent e = new IcebergChangeEvent("test",
unwrapWithSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
Schema schema = e.icebergSchema(true);
GenericRecord record = e.asIcebergRecord(schema);
assertEquals("orders", record.getField("__table").toString());
assertEquals(16850, record.getField("order_date"));
Expand All @@ -95,7 +95,7 @@ public void testNestedArrayJsonRecord() {
IcebergChangeEvent e = new IcebergChangeEvent("test",
unwrapWithArraySchema.getBytes(StandardCharsets.UTF_8), null);

Schema schema = e.icebergSchema();
Schema schema = e.icebergSchema(true);
assertEquals(schema.toString(), """
table {
1: name: optional string
Expand All @@ -119,7 +119,7 @@ public void testNestedArrayJsonRecord() {
public void testNestedArray2JsonRecord() {
IcebergChangeEvent e = new IcebergChangeEvent("test",
unwrapWithArraySchema2.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
Schema schema = e.icebergSchema(true);
System.out.println(schema);
assertEquals(schema.toString(), """
table {
Expand All @@ -136,7 +136,7 @@ public void testNestedArray2JsonRecord() {
public void testNestedGeomJsonRecord() {
IcebergChangeEvent e = new IcebergChangeEvent("test",
unwrapWithGeomSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
Schema schema = e.icebergSchema(true);
GenericRecord record = e.asIcebergRecord(schema);
assertEquals(schema.toString(), """
table {
Expand Down Expand Up @@ -180,7 +180,7 @@ public void valuePayloadWithSchemaAsJsonNode() {
@Test
public void testIcebergChangeEventSchemaWithKey() {
TestChangeEvent<Object, Object> debeziumEvent = TestChangeEvent.ofCompositeKey("destination", 1, "u", "user1", 2L);
Schema schema = debeziumEvent.toIcebergChangeEvent().icebergSchema();
Schema schema = debeziumEvent.toIcebergChangeEvent().icebergSchema(true);
assertEquals(schema.toString(), """
table {
1: id: required int (id)
Expand Down Expand Up @@ -212,7 +212,7 @@ public void testIcebergChangeEventSchemaWithKey() {

// test when PK is not first two columns!
TestChangeEvent<String, String> debeziumEvent2 = new TestChangeEvent<>(key, val, "test");
Schema schema2 = debeziumEvent2.toIcebergChangeEvent().icebergSchema();
Schema schema2 = debeziumEvent2.toIcebergChangeEvent().icebergSchema(true);
assertEquals(schema2.toString(), """
table {
1: first_column: optional string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,25 @@ public void testIcebergChangeEventSchemaWithNestedKey() throws IOException {
String key = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-key-withschema.json"));
String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-val-withschema.json"));
TestChangeEvent<String, String> dbzEvent = new TestChangeEvent<>(key, val, "test");
Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema();

Exception exception = assertThrows(RuntimeException.class, () -> {
dbzEvent.toIcebergChangeEvent().icebergSchema(true);
});
assertTrue(exception.getMessage().contains("Identifier fields are not supported for untested events"));

Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema(false);
assertEquals("""
table {
1: before: optional struct<2: order_number: optional int, 3: order_date: optional int, 4: purchaser: optional int, 5: quantity: optional int, 6: product_id: optional int>
7: after: required struct<8: order_number: required int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int>
7: after: optional struct<8: order_number: optional int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int>
13: source: optional struct<14: version: optional string, 15: connector: optional string, 16: name: optional string, 17: ts_ms: optional long, 18: snapshot: optional string, 19: db: optional string, 20: sequence: optional string, 21: ts_us: optional long, 22: ts_ns: optional long, 23: table: optional string, 24: server_id: optional long, 25: gtid: optional string, 26: file: optional string, 27: pos: optional long, 28: row: optional int, 29: thread: optional long, 30: query: optional string>
31: transaction: optional struct<32: id: optional string, 33: total_order: optional long, 34: data_collection_order: optional long>
35: op: optional string
36: ts_ms: optional long
37: ts_us: optional long
38: ts_ns: optional long
}""", schema.toString());
assertEquals(Set.of(8), schema.identifierFieldIds());
assertEquals(Set.of(), schema.identifierFieldIds());
}

@Test
Expand All @@ -71,7 +77,13 @@ public void testIcebergChangeEventSchemaWithDelete() throws IOException {
String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-delete-val-withschema.json"));
TestChangeEvent<String, String> dbzEvent = new TestChangeEvent<>(key, val, "test");
IcebergChangeEvent ie = dbzEvent.toIcebergChangeEvent();
System.out.println(ie.asIcebergRecord(ie.icebergSchema()));

Exception exception = assertThrows(RuntimeException.class, () -> {
ie.icebergSchema(true);
});
assertTrue(exception.getMessage().contains("Identifier fields are not supported for untested events"));
// print converted event value!
System.out.println(ie.asIcebergRecord(ie.icebergSchema(false)));
}

public static class TestProfile implements QuarkusTestProfile {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testIcebergChangeEventBuilder() {
.addField("preferences", "feature1", true)
.addField("preferences", "feature2", true)
.build();
Assertions.assertTrue(schema1.sameSchema(t.icebergSchema()));
Assertions.assertTrue(schema1.sameSchema(t.icebergSchema(true)));

Schema schema2 = new Schema(
optional(1, "id", Types.IntegerType.get()),
Expand All @@ -68,7 +68,7 @@ public void testIcebergChangeEventBuilder() {
.addField("preferences", "feature1", true)
.addField("preferences", "feature2", true)
.build();
Assertions.assertTrue(schema2.sameSchema(t.icebergSchema()));
Assertions.assertTrue(schema2.sameSchema(t.icebergSchema(true)));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class IcebergTableOperatorTest extends BaseSparkTest {
public Table createTable(IcebergChangeEvent sampleEvent) {
HadoopCatalog icebergCatalog = getIcebergCatalog();
final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destination());
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat);
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(true), writeFormat);
}

@Test
Expand Down

0 comments on commit a087ca8

Please sign in to comment.