Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if configuration contains event flattening #352

Merged
merged 1 commit into from
Jun 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class IcebergChangeEvent {
protected static final ObjectMapper mapper = new ObjectMapper();
protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class);
public static final List<String> TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms");
static final boolean eventsAreUnwrapped = IcebergUtil.configIncludesUnwrapSmt();
protected final String destination;
protected final byte[] valueData;
protected final byte[] keyData;
Expand Down Expand Up @@ -76,23 +77,13 @@ public ChangeEventSchema changeEventSchema() {
}

public Schema icebergSchema() {
return changeEventSchema().icebergSchema(this.isUnwrapped());
return changeEventSchema().icebergSchema();
}

public String destination() {
return destination;
}

public boolean isUnwrapped() {
return !(
this.value().has("after") &&
this.value().has("source") &&
this.value().has("before") &&
this.value().get("after").isObject() &&
this.value().get("source").isObject()
);
}

public GenericRecord asIcebergRecord(Schema schema) {
return asIcebergRecord(schema.asStruct(), value());
}
Expand Down Expand Up @@ -321,14 +312,14 @@ private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaN
return schemaData;
}

private Schema icebergSchema(boolean isUnwrapped) {
private Schema icebergSchema() {

if (this.valueSchema.isNull()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData();
if (!isUnwrapped && keySchema != null) {
if (!eventsAreUnwrapped && keySchema != null) {
// NOTE: events re not unwrapped, align schema with event schema, so then we can scan event and key schemas synchronously
ObjectNode nestedKeySchema = mapper.createObjectNode();
nestedKeySchema.put("type", "struct");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.iceberg.io.OutputFileFactory;
import com.google.common.primitives.Ints;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.ConfigValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.*;
Expand All @@ -56,6 +58,33 @@ public static Map<String, String> getConfigSubset(Config config, String prefix)
return ret;
}


public static boolean configIncludesUnwrapSmt() {
return configIncludesUnwrapSmt(ConfigProvider.getConfig());
}

//@TestingOnly
static boolean configIncludesUnwrapSmt(Config config) {
// first lets find the config value for debezium statements
ConfigValue stms = config.getConfigValue("debezium.transforms");
if (stms == null || stms.getValue() == null || stms.getValue().isEmpty() || stms.getValue().isBlank()){
return false;
}

String[] stmsList = stms.getValue().split(",");
final String regexVal = "^io\\.debezium\\..*transforms\\.ExtractNew.*State$";
// we have debezium statements configured! let's check if we have event flattening config is set.
for (String stmName : stmsList) {
ConfigValue stmVal = config.getConfigValue("debezium.transforms."+stmName+".type");
if (stmVal != null && stmVal.getValue() != null && !stmVal.getValue().isEmpty() && !stmVal.getValue().isBlank() && stmVal.getValue().matches(regexVal)){
return true;
}
}

return false;
}


public static <T> T selectInstance(Instance<T> instances, String name) {

Instance<T> instance = instances.select(NamedLiteral.of(name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
Expand All @@ -27,6 +28,7 @@

import static org.junit.jupiter.api.Assertions.*;

@QuarkusTest
class IcebergChangeEventTest {
final String serdeWithSchema = Files.readString(Path.of("src/test/resources/json/serde-with-schema.json"));
final String unwrapWithSchema = Files.readString(Path.of("src/test/resources/json/unwrap-with-schema.json"));
Expand Down Expand Up @@ -223,25 +225,4 @@ public void testIcebergChangeEventSchemaWithKey() {
assertEquals(schema2.identifierFieldIds(), Set.of(2, 3));
}


@Test
public void testIcebergChangeEventSchemaWithNestedKey() throws IOException {
String key = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-key-withschema.json"));
String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-val-withschema.json"));
TestChangeEvent<String, String> dbzEvent = new TestChangeEvent<>(key, val, "test");
Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema();
assertEquals("""
table {
1: before: optional struct<2: order_number: optional int, 3: order_date: optional int, 4: purchaser: optional int, 5: quantity: optional int, 6: product_id: optional int>
7: after: required struct<8: order_number: required int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int>
13: source: optional struct<14: version: optional string, 15: connector: optional string, 16: name: optional string, 17: ts_ms: optional long, 18: snapshot: optional string, 19: db: optional string, 20: sequence: optional string, 21: ts_us: optional long, 22: ts_ns: optional long, 23: table: optional string, 24: server_id: optional long, 25: gtid: optional string, 26: file: optional string, 27: pos: optional long, 28: row: optional int, 29: thread: optional long, 30: query: optional string>
31: transaction: optional struct<32: id: optional string, 33: total_order: optional long, 34: data_collection_order: optional long>
35: op: optional string
36: ts_ms: optional long
37: ts_us: optional long
38: ts_ns: optional long
}""", schema.toString());
assertEquals(Set.of(8), schema.identifierFieldIds());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg;

import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourceMysqlDB;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
import org.apache.kafka.common.serialization.Serde;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.*;

@QuarkusTest
@TestProfile(IcebergChangeEventTestUnwrapped.TestProfile.class)
class IcebergChangeEventTestUnwrapped {

@Test
public void testIcebergChangeEventSchemaWithNestedKey() throws IOException {

assertFalse(IcebergUtil.configIncludesUnwrapSmt());

String key = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-key-withschema.json"));
String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-val-withschema.json"));
TestChangeEvent<String, String> dbzEvent = new TestChangeEvent<>(key, val, "test");
Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema();
assertEquals("""
table {
1: before: optional struct<2: order_number: optional int, 3: order_date: optional int, 4: purchaser: optional int, 5: quantity: optional int, 6: product_id: optional int>
7: after: required struct<8: order_number: required int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int>
13: source: optional struct<14: version: optional string, 15: connector: optional string, 16: name: optional string, 17: ts_ms: optional long, 18: snapshot: optional string, 19: db: optional string, 20: sequence: optional string, 21: ts_us: optional long, 22: ts_ns: optional long, 23: table: optional string, 24: server_id: optional long, 25: gtid: optional string, 26: file: optional string, 27: pos: optional long, 28: row: optional int, 29: thread: optional long, 30: query: optional string>
31: transaction: optional struct<32: id: optional string, 33: total_order: optional long, 34: data_collection_order: optional long>
35: op: optional string
36: ts_ms: optional long
37: ts_us: optional long
38: ts_ns: optional long
}""", schema.toString());
assertEquals(Set.of(8), schema.identifierFieldIds());
}
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("debezium.transforms", ",");
config.put("debezium.transforms.unwrap.type", "null");

return config;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.debezium.server.iceberg;

import io.smallrye.config.SmallRyeConfig;
import io.smallrye.config.SmallRyeConfigBuilder;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

public class IcebergUtilTest {


@Test
void configIncludesUnwrapSmt() {
// mongodb transforms
String mongoConfVal = "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState";
SmallRyeConfig mongodbConf = new SmallRyeConfigBuilder().
withProfile("mongodb").
withDefaultValue("debezium.transforms", "unwrap,someothersmt").
withDefaultValue("%mongodb.debezium.transforms.unwrap.type", mongoConfVal).
build();
assertEquals(mongodbConf.getConfigValue("debezium.transforms.unwrap.type").getValue(), mongoConfVal);
assertTrue(IcebergUtil.configIncludesUnwrapSmt(mongodbConf));

String defaultConfVal = "io.debezium.transforms.ExtractNewRecordState";
SmallRyeConfig defaultConf = new SmallRyeConfigBuilder().
withProfile("mysql").
withDefaultValue("%mysql.debezium.transforms", "unwrapdata,someothersmt").
withDefaultValue("%mysql.debezium.transforms.unwrapdata.type", defaultConfVal).
build();
assertEquals(defaultConf.getConfigValue("debezium.transforms.unwrapdata.type").getValue(), defaultConfVal);
assertTrue(IcebergUtil.configIncludesUnwrapSmt(defaultConf));


SmallRyeConfig testConf = new SmallRyeConfigBuilder().
withProfile("testing").
withDefaultValue("%testing.test.property", "test-Value").
build();
assertEquals(testConf.getConfigValue("test.property").getValue(), "test-Value");
assertFalse(IcebergUtil.configIncludesUnwrapSmt(testConf));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public TestConfigSource() {
config.put("debezium.sink.iceberg.upsert-keep-deletes", "true");
config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");

// iceberg config
config.put("debezium.sink.iceberg.warehouse", S3_BUCKET);

// ==== configure batch behaviour/size ====
// Positive integer value that specifies the maximum size of each batch of events that should be processed during
// each iteration of this connector. Defaults to 2048.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.List;
import java.util.Set;

import io.quarkus.test.junit.QuarkusTest;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Assertions;
Expand All @@ -25,6 +26,7 @@
/**
* @author Ismail Simsek
*/
@QuarkusTest
class IcebergChangeEventBuilderTest {

@Test
Expand Down