From 1eff67a5b230f4a1bdfd19acec1b6842b1bb6159 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 15 Jun 2024 21:32:32 +0200 Subject: [PATCH] Check if configuration has isUnwrapped (#352) --- .../server/iceberg/IcebergChangeEvent.java | 17 +---- .../debezium/server/iceberg/IcebergUtil.java | 29 +++++++ .../iceberg/IcebergChangeEventTest.java | 23 +----- .../IcebergChangeEventTestUnwrapped.java | 75 +++++++++++++++++++ .../server/iceberg/IcebergUtilTest.java | 42 +++++++++++ .../server/iceberg/TestConfigSource.java | 3 + .../IcebergChangeEventBuilderTest.java | 2 + 7 files changed, 157 insertions(+), 34 deletions(-) create mode 100644 debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTestUnwrapped.java create mode 100644 debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergUtilTest.java diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index e1016de9..38533234 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -39,6 +39,7 @@ public class IcebergChangeEvent { protected static final ObjectMapper mapper = new ObjectMapper(); protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class); public static final List TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms"); + static final boolean eventsAreUnwrapped = IcebergUtil.configIncludesUnwrapSmt(); protected final String destination; protected final byte[] valueData; protected final byte[] keyData; @@ -76,23 +77,13 @@ public ChangeEventSchema changeEventSchema() { } public Schema icebergSchema() { - return changeEventSchema().icebergSchema(this.isUnwrapped()); + return changeEventSchema().icebergSchema(); } public String destination() { return destination; } - public boolean isUnwrapped() { - return !( - this.value().has("after") && - this.value().has("source") && - this.value().has("before") && - this.value().get("after").isObject() && - this.value().get("source").isObject() - ); - } - public GenericRecord asIcebergRecord(Schema schema) { return asIcebergRecord(schema.asStruct(), value()); } @@ -321,14 +312,14 @@ private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaN return schemaData; } - private Schema icebergSchema(boolean isUnwrapped) { + private Schema icebergSchema() { if (this.valueSchema.isNull()) { throw new RuntimeException("Failed to get schema from debezium event, event schema is null"); } IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData(); - if (!isUnwrapped && keySchema != null) { + if (!eventsAreUnwrapped && keySchema != null) { // NOTE: events re not unwrapped, align schema with event schema, so then we can scan event and key schemas synchronously ObjectNode nestedKeySchema = mapper.createObjectNode(); nestedKeySchema.put("type", "struct"); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index bd07cbef..b3a2e44b 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -30,6 +30,8 @@ import org.apache.iceberg.io.OutputFileFactory; import com.google.common.primitives.Ints; import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.config.ConfigValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.iceberg.TableProperties.*; @@ -56,6 +58,33 @@ public static Map getConfigSubset(Config config, String prefix) return ret; } + + public static boolean configIncludesUnwrapSmt() { + return configIncludesUnwrapSmt(ConfigProvider.getConfig()); + } + + //@TestingOnly + static boolean configIncludesUnwrapSmt(Config config) { + // first lets find the config value for debezium statements + ConfigValue stms = config.getConfigValue("debezium.transforms"); + if (stms == null || stms.getValue() == null || stms.getValue().isEmpty() || stms.getValue().isBlank()){ + return false; + } + + String[] stmsList = stms.getValue().split(","); + final String regexVal = "^io\\.debezium\\..*transforms\\.ExtractNew.*State$"; + // we have debezium statements configured! let's check if we have event flattening config is set. + for (String stmName : stmsList) { + ConfigValue stmVal = config.getConfigValue("debezium.transforms."+stmName+".type"); + if (stmVal != null && stmVal.getValue() != null && !stmVal.getValue().isEmpty() && !stmVal.getValue().isBlank() && stmVal.getValue().matches(regexVal)){ + return true; + } + } + + return false; + } + + public static T selectInstance(Instance instances, String name) { Instance instance = instances.select(NamedLiteral.of(name)); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java index 4512b272..d8a05fda 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java @@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.debezium.serde.DebeziumSerdes; import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder; +import io.quarkus.test.junit.QuarkusTest; import jakarta.inject.Inject; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; @@ -27,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.*; +@QuarkusTest class IcebergChangeEventTest { final String serdeWithSchema = Files.readString(Path.of("src/test/resources/json/serde-with-schema.json")); final String unwrapWithSchema = Files.readString(Path.of("src/test/resources/json/unwrap-with-schema.json")); @@ -223,25 +225,4 @@ public void testIcebergChangeEventSchemaWithKey() { assertEquals(schema2.identifierFieldIds(), Set.of(2, 3)); } - - @Test - public void testIcebergChangeEventSchemaWithNestedKey() throws IOException { - String key = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-key-withschema.json")); - String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-val-withschema.json")); - TestChangeEvent dbzEvent = new TestChangeEvent<>(key, val, "test"); - Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema(); - assertEquals(""" - table { - 1: before: optional struct<2: order_number: optional int, 3: order_date: optional int, 4: purchaser: optional int, 5: quantity: optional int, 6: product_id: optional int> - 7: after: required struct<8: order_number: required int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int> - 13: source: optional struct<14: version: optional string, 15: connector: optional string, 16: name: optional string, 17: ts_ms: optional long, 18: snapshot: optional string, 19: db: optional string, 20: sequence: optional string, 21: ts_us: optional long, 22: ts_ns: optional long, 23: table: optional string, 24: server_id: optional long, 25: gtid: optional string, 26: file: optional string, 27: pos: optional long, 28: row: optional int, 29: thread: optional long, 30: query: optional string> - 31: transaction: optional struct<32: id: optional string, 33: total_order: optional long, 34: data_collection_order: optional long> - 35: op: optional string - 36: ts_ms: optional long - 37: ts_us: optional long - 38: ts_ns: optional long - }""", schema.toString()); - assertEquals(Set.of(8), schema.identifierFieldIds()); - } - } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTestUnwrapped.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTestUnwrapped.java new file mode 100644 index 00000000..23494a32 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTestUnwrapped.java @@ -0,0 +1,75 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg; + +import com.fasterxml.jackson.databind.JsonNode; +import io.debezium.serde.DebeziumSerdes; +import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder; +import io.debezium.server.iceberg.testresources.S3Minio; +import io.debezium.server.iceberg.testresources.SourceMysqlDB; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import jakarta.inject.Inject; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.types.Types; +import org.apache.kafka.common.serialization.Serde; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.*; + +@QuarkusTest +@TestProfile(IcebergChangeEventTestUnwrapped.TestProfile.class) +class IcebergChangeEventTestUnwrapped { + + @Test + public void testIcebergChangeEventSchemaWithNestedKey() throws IOException { + + assertFalse(IcebergUtil.configIncludesUnwrapSmt()); + + String key = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-key-withschema.json")); + String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-val-withschema.json")); + TestChangeEvent dbzEvent = new TestChangeEvent<>(key, val, "test"); + Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema(); + assertEquals(""" + table { + 1: before: optional struct<2: order_number: optional int, 3: order_date: optional int, 4: purchaser: optional int, 5: quantity: optional int, 6: product_id: optional int> + 7: after: required struct<8: order_number: required int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int> + 13: source: optional struct<14: version: optional string, 15: connector: optional string, 16: name: optional string, 17: ts_ms: optional long, 18: snapshot: optional string, 19: db: optional string, 20: sequence: optional string, 21: ts_us: optional long, 22: ts_ns: optional long, 23: table: optional string, 24: server_id: optional long, 25: gtid: optional string, 26: file: optional string, 27: pos: optional long, 28: row: optional int, 29: thread: optional long, 30: query: optional string> + 31: transaction: optional struct<32: id: optional string, 33: total_order: optional long, 34: data_collection_order: optional long> + 35: op: optional string + 36: ts_ms: optional long + 37: ts_us: optional long + 38: ts_ns: optional long + }""", schema.toString()); + assertEquals(Set.of(8), schema.identifierFieldIds()); + } + public static class TestProfile implements QuarkusTestProfile { + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + config.put("debezium.transforms", ","); + config.put("debezium.transforms.unwrap.type", "null"); + + return config; + } + + } +} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergUtilTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergUtilTest.java new file mode 100644 index 00000000..23c73796 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergUtilTest.java @@ -0,0 +1,42 @@ +package io.debezium.server.iceberg; + +import io.smallrye.config.SmallRyeConfig; +import io.smallrye.config.SmallRyeConfigBuilder; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +public class IcebergUtilTest { + + + @Test + void configIncludesUnwrapSmt() { + // mongodb transforms + String mongoConfVal = "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"; + SmallRyeConfig mongodbConf = new SmallRyeConfigBuilder(). + withProfile("mongodb"). + withDefaultValue("debezium.transforms", "unwrap,someothersmt"). + withDefaultValue("%mongodb.debezium.transforms.unwrap.type", mongoConfVal). + build(); + assertEquals(mongodbConf.getConfigValue("debezium.transforms.unwrap.type").getValue(), mongoConfVal); + assertTrue(IcebergUtil.configIncludesUnwrapSmt(mongodbConf)); + + String defaultConfVal = "io.debezium.transforms.ExtractNewRecordState"; + SmallRyeConfig defaultConf = new SmallRyeConfigBuilder(). + withProfile("mysql"). + withDefaultValue("%mysql.debezium.transforms", "unwrapdata,someothersmt"). + withDefaultValue("%mysql.debezium.transforms.unwrapdata.type", defaultConfVal). + build(); + assertEquals(defaultConf.getConfigValue("debezium.transforms.unwrapdata.type").getValue(), defaultConfVal); + assertTrue(IcebergUtil.configIncludesUnwrapSmt(defaultConf)); + + + SmallRyeConfig testConf = new SmallRyeConfigBuilder(). + withProfile("testing"). + withDefaultValue("%testing.test.property", "test-Value"). + build(); + assertEquals(testConf.getConfigValue("test.property").getValue(), "test-Value"); + assertFalse(IcebergUtil.configIncludesUnwrapSmt(testConf)); + } + +} \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java index 03da9774..92eb991b 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java @@ -32,6 +32,9 @@ public TestConfigSource() { config.put("debezium.sink.iceberg.upsert-keep-deletes", "true"); config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); + // iceberg config + config.put("debezium.sink.iceberg.warehouse", S3_BUCKET); + // ==== configure batch behaviour/size ==== // Positive integer value that specifies the maximum size of each batch of events that should be processed during // each iteration of this connector. Defaults to 2048. diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java index 63d326f4..09f1b86a 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java @@ -14,6 +14,7 @@ import java.util.List; import java.util.Set; +import io.quarkus.test.junit.QuarkusTest; import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Assertions; @@ -25,6 +26,7 @@ /** * @author Ismail Simsek */ +@QuarkusTest class IcebergChangeEventBuilderTest { @Test