Skip to content

Commit

Permalink
improve testing helper classes (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jan 5, 2022
1 parent cc3d3d6 commit b97595d
Show file tree
Hide file tree
Showing 6 changed files with 412 additions and 211 deletions.
9 changes: 0 additions & 9 deletions .run/IcebergChangeConsumerTest.run.xml
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
<!--
~ /*
~ * Copyright memiiso Authors.
~ *
~ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
~ */
-->

<component name="ProjectRunConfigurationManager">
<configuration default="false" name="IcebergChangeConsumerTest" type="JUnit" factoryName="JUnit">
<module name="debezium-server-iceberg-sink"/>
Expand All @@ -19,7 +11,6 @@
<option name="MAIN_CLASS_NAME" value="io.debezium.server.iceberg.IcebergChangeConsumerTest"/>
<option name="METHOD_NAME" value=""/>
<option name="TEST_OBJECT" value="class"/>
<option name="VM_PARAMETERS" value="-ea -Ddebezium.sink.type=iceberg"/>
<option name="WORKING_DIRECTORY" value="$MODULE_DIR$"/>
<method v="2">
<option name="Make" enabled="true"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@

package io.debezium.server.iceberg;

import io.debezium.engine.ChangeEvent;
import io.debezium.server.iceberg.testresources.*;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import javax.inject.Inject;
Expand All @@ -41,10 +39,11 @@ public class IcebergChangeConsumerUpsertDeleteDeletesTest extends BaseSparkTest
@Test
public void testSimpleUpsert() throws Exception {

List<ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(getCustomerRecord(1, "c"));
records.add(getCustomerRecord(2, "c"));
records.add(getCustomerRecord(3, "c"));
String dest = "inventory.customers_upsert";
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.of(dest, 1, "c"));
records.add(TestChangeEvent.of(dest, 2, "c"));
records.add(TestChangeEvent.of(dest, 3, "c"));
consumer.handleBatch(records, TestUtil.getCommitter());

Dataset<Row> ds = getTableData("testc.inventory.customers_upsert");
Expand All @@ -53,10 +52,10 @@ public void testSimpleUpsert() throws Exception {

// 3 records should be updated 4th one should be inserted
records.clear();
records.add(getCustomerRecord(1, "r"));
records.add(getCustomerRecord(2, "d"));
records.add(getCustomerRecord(3, "u", "UpdatednameV1"));
records.add(getCustomerRecord(4, "c"));
records.add(TestChangeEvent.of(dest, 1, "r"));
records.add(TestChangeEvent.of(dest, 2, "d"));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV1"));
records.add(TestChangeEvent.of(dest, 4, "c"));
consumer.handleBatch(records, TestUtil.getCommitter());

ds = getTableData("testc.inventory.customers_upsert");
Expand All @@ -70,17 +69,17 @@ public void testSimpleUpsert() throws Exception {

records.clear();
// incase of duplicate records it should only keep the latest by epoch ts
records.add(getCustomerRecord(3, "r", "UpdatednameV2", 1L));
records.add(getCustomerRecord(3, "u", "UpdatednameV3", 2L));
records.add(getCustomerRecord(3, "u", "UpdatednameV4", 3L));
records.add(getCustomerRecord(4, "u", "Updatedname-4-V1", 4L));
records.add(getCustomerRecord(4, "u", "Updatedname-4-V2", 5L));
records.add(getCustomerRecord(4, "d", "Updatedname-4-V3", 6L));
records.add(getCustomerRecord(5, "d", 7L));
records.add(getCustomerRecord(6, "r", 8L));
records.add(getCustomerRecord(6, "r", 9L));
records.add(getCustomerRecord(6, "u", 10L));
records.add(getCustomerRecord(6, "u", "Updatedname-6-V1", 11L));
records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", 2L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", 3L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", 4L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", 5L));
records.add(TestChangeEvent.of(dest, 4, "d", "Updatedname-4-V3", 6L));
records.add(TestChangeEvent.of(dest, 5, "d", 7L));
records.add(TestChangeEvent.of(dest, 6, "r", 8L));
records.add(TestChangeEvent.of(dest, 6, "r", 9L));
records.add(TestChangeEvent.of(dest, 6, "u", 10L));
records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", 11L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -93,10 +92,10 @@ public void testSimpleUpsert() throws Exception {
// in case of duplicate records including epoch ts, its should keep latest one based on operation priority
// ("c", 1, "r", 2, "u", 3, "d", 4);
records.clear();
records.add(getCustomerRecord(3, "d", "UpdatednameV5", 1L));
records.add(getCustomerRecord(3, "u", "UpdatednameV6", 1L));
records.add(getCustomerRecord(6, "c", "Updatedname-6-V2", 1L));
records.add(getCustomerRecord(6, "r", "Updatedname-6-V3", 1L));
records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", 1L));
records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", 1L));
records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -105,10 +104,10 @@ public void testSimpleUpsert() throws Exception {

// if its not standard insert followed by update! should keep latest one
records.clear();
records.add(getCustomerRecord(7, "u", 1L));
records.add(getCustomerRecord(7, "d", 2L));
records.add(getCustomerRecord(7, "r", 3L));
records.add(getCustomerRecord(7, "u", "Updatedname-7-V1", 4L));
records.add(TestChangeEvent.of(dest, 7, "u", 1L));
records.add(TestChangeEvent.of(dest, 7, "d", 2L));
records.add(TestChangeEvent.of(dest, 7, "r", 3L));
records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", 4L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -118,12 +117,13 @@ public void testSimpleUpsert() throws Exception {

@Test
public void testSimpleUpsertCompositeKey() throws Exception {
String dest = "inventory.customers_upsert_compositekey";
// test simple inserts
List<ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(getCustomerRecordCompositeKey(1, "c", "user1", 1L));
records.add(getCustomerRecordCompositeKey(1, "c", "user2", 1L));
records.add(getCustomerRecordCompositeKey(1, "u", "user1", 2L));
records.add(getCustomerRecordCompositeKey(1, "r", "user1", 3L));
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", 3L));
consumer.handleBatch(records, TestUtil.getCommitter());

Dataset<Row> ds = getTableData("testc.inventory.customers_upsert_compositekey");
Expand All @@ -132,67 +132,15 @@ public void testSimpleUpsertCompositeKey() throws Exception {
Assertions.assertEquals(ds.where("id = 1").count(), 2);

records.clear();
records.add(getCustomerRecordCompositeKey(1, "u", "user1", 2L));
records.add(getCustomerRecordCompositeKey(1, "r", "user1", 3L));
records.add(getCustomerRecordCompositeKey(1, "d", "user1", 3L));
records.add(getCustomerRecordCompositeKey(1, "d", "user2", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user1", 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user2", 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert_compositekey");
ds.show();
Assertions.assertEquals(ds.count(), 0);
Assertions.assertEquals(ds.where("first_name= 'user2'").count(), 0);
}

private TestChangeEvent<Object, Object> getCustomerRecord(Integer id, String operation, String name, Long epoch) {
String key = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false," + "\"field\":\"id\"}]," +
"\"optional\":false,\"name\":\"testc.inventory.customers.Key\"}," +
"\"payload\":{\"id\":" + id + "}}";
String val = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"}," +
"{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"}," +
"{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__op\"}," +
"{\"type\":\"string\",\"optional\":true,\"field\":\"__table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"__lsn\"}," +
"{\"type\":\"int64\",\"optional\":true,\"field\":\"__source_ts_ms\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}]," +
"\"optional\":false,\"name\":\"testc.inventory.customers.Value\"}," +
"\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\",\"last_name\":\"Walker\",\"email\":\"ed@walker" +
".com\"," +
"\"__op\":\"" + operation + "\",\"__table\":\"customers\",\"__lsn\":33832960,\"__source_ts_ms\":" + epoch + "," +
"\"__deleted\":\"" + operation.equals("d") + "\"}} ";
return new TestChangeEvent<>(key, val, "testc.inventory.customers_upsert");
}

private TestChangeEvent<Object, Object> getCustomerRecordCompositeKey(Integer id, String operation, String name,
Long epoch) {
String key = "{\"schema\":{\"type\":\"struct\",\"fields\":[" +
"{\"type\":\"int32\",\"optional\":false," + "\"field\":\"id\"}," +
"{\"type\":\"string\",\"optional\":false," + "\"field\":\"first_name\"}" +
"]," +
"\"optional\":false,\"name\":\"testc.inventory.customers.Key\"}," +
"\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\"}}";

String val = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"}," +
"{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"}," +
"{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__op\"}," +
"{\"type\":\"string\",\"optional\":true,\"field\":\"__table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"__lsn\"}," +
"{\"type\":\"int64\",\"optional\":true,\"field\":\"__source_ts_ms\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}]," +
"\"optional\":false,\"name\":\"testc.inventory.customers.Value\"}," +
"\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\",\"last_name\":\"Walker\",\"email\":\"ed@walker" +
".com\"," +
"\"__op\":\"" + operation + "\",\"__table\":\"customers\",\"__lsn\":33832960,\"__source_ts_ms\":" + epoch + "," +
"\"__deleted\":\"" + operation.equals("d") + "\"}} ";
return new TestChangeEvent<>(key, val, "testc.inventory.customers_upsert_compositekey");
}

private TestChangeEvent<Object, Object> getCustomerRecord(Integer id, String operation) {
return this.getCustomerRecord(id, operation, TestUtil.randomString(12), Instant.now().toEpochMilli());
}


private TestChangeEvent<Object, Object> getCustomerRecord(Integer id, String operation, String name) {
return this.getCustomerRecord(id, operation, name, Instant.now().toEpochMilli());
}

private TestChangeEvent<Object, Object> getCustomerRecord(Integer id, String operation, Long epoch) {
return this.getCustomerRecord(id, operation, TestUtil.randomString(12), epoch);
}


}
Loading

0 comments on commit b97595d

Please sign in to comment.