Skip to content

Commit

Permalink
Check if configuration has isUnwrapped (#352)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jun 15, 2024
1 parent a299f51 commit 1eff67a
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class IcebergChangeEvent {
// Shared Jackson mapper; used further down to create JSON object nodes (e.g. the nested key schema).
protected static final ObjectMapper mapper = new ObjectMapper();
// Class-level SLF4J logger.
protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class);
// Names of the millisecond-timestamp fields. NOTE(review): how they are consumed is outside this view.
public static final List<String> TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms");
// Evaluated once, when this class is initialised, from IcebergUtil.configIncludesUnwrapSmt();
// later configuration changes are therefore not reflected in this flag.
static final boolean eventsAreUnwrapped = IcebergUtil.configIncludesUnwrapSmt();
// Destination of this event, exposed through destination().
protected final String destination;
// Raw bytes of the event value. NOTE(review): presumably serialized JSON - confirm against the constructor.
protected final byte[] valueData;
// Raw bytes of the event key. NOTE(review): presumably serialized JSON - confirm against the constructor.
protected final byte[] keyData;
Expand Down Expand Up @@ -76,23 +77,13 @@ public ChangeEventSchema changeEventSchema() {
}

public Schema icebergSchema() {
return changeEventSchema().icebergSchema(this.isUnwrapped());
return changeEventSchema().icebergSchema();
}

/**
 * Returns the destination associated with this change event.
 *
 * @return the value of the {@code destination} field
 */
public String destination() {
  return this.destination;
}

/**
 * Reports whether the event value lacks the change-event envelope.
 *
 * <p>The value counts as enveloped only when it has {@code after}, {@code source} and
 * {@code before} fields and both {@code after} and {@code source} are JSON objects.
 *
 * @return {@code false} when the value is enveloped as described above, {@code true} otherwise
 */
public boolean isUnwrapped() {
  // Presence checks run in the same order as the original conjunction and stop at the first miss.
  for (String envelopeField : List.of("after", "source", "before")) {
    if (!this.value().has(envelopeField)) {
      return true;
    }
  }
  if (!this.value().get("after").isObject()) {
    return true;
  }
  return !this.value().get("source").isObject();
}

/**
 * Builds a {@link GenericRecord} for this event's value using the given schema.
 *
 * <p>Delegates to the two-argument overload, passing the schema's struct type and {@code value()}.
 *
 * @param schema schema whose struct type drives the conversion
 * @return the record produced by the two-argument overload
 */
public GenericRecord asIcebergRecord(Schema schema) {
  return asIcebergRecord(schema.asStruct(), this.value());
}
Expand Down Expand Up @@ -321,14 +312,14 @@ private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaN
return schemaData;
}

private Schema icebergSchema(boolean isUnwrapped) {
private Schema icebergSchema() {

if (this.valueSchema.isNull()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData();
if (!isUnwrapped && keySchema != null) {
if (!eventsAreUnwrapped && keySchema != null) {
// NOTE: events are not unwrapped; align the key schema with the event schema so that the event and key schemas can be scanned synchronously
ObjectNode nestedKeySchema = mapper.createObjectNode();
nestedKeySchema.put("type", "struct");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.iceberg.io.OutputFileFactory;
import com.google.common.primitives.Ints;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.ConfigValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.*;
Expand All @@ -56,6 +58,33 @@ public static Map<String, String> getConfigSubset(Config config, String prefix)
return ret;
}


/**
 * Checks the configuration returned by {@code ConfigProvider.getConfig()} for an unwrap
 * (event flattening) transform.
 *
 * @return the result of {@link #configIncludesUnwrapSmt(Config)} for that configuration
 */
public static boolean configIncludesUnwrapSmt() {
  final Config activeConfig = ConfigProvider.getConfig();
  return configIncludesUnwrapSmt(activeConfig);
}

/**
 * Reports whether any transform listed under {@code debezium.transforms} is an event
 * flattening (unwrap) SMT, i.e. its {@code debezium.transforms.<alias>.type} value matches
 * {@code io.debezium.*transforms.ExtractNew*State}.
 *
 * <p>Package-private overload taking an explicit {@link Config}, so tests can pass a
 * hand-built configuration.
 *
 * @param config configuration to inspect
 * @return {@code true} if at least one configured transform type matches the pattern;
 *     {@code false} if no transforms are configured or none matches
 */
//@TestingOnly
static boolean configIncludesUnwrapSmt(Config config) {
  // Comma-separated aliases of the configured single message transforms (SMTs).
  ConfigValue transforms = config.getConfigValue("debezium.transforms");
  if (transforms == null || transforms.getValue() == null || transforms.getValue().isBlank()) {
    return false;
  }

  final String unwrapTypeRegex = "^io\\.debezium\\..*transforms\\.ExtractNew.*State$";
  for (String rawAlias : transforms.getValue().split(",")) {
    // Aliases may be written with whitespace around the commas ("a, unwrap"). Without
    // stripping, the lookup key would contain the space and the transform would be missed.
    String alias = rawAlias.strip();
    if (alias.isEmpty()) {
      continue;
    }

    ConfigValue transformType = config.getConfigValue("debezium.transforms." + alias + ".type");
    if (transformType == null || transformType.getValue() == null) {
      continue;
    }
    if (transformType.getValue().strip().matches(unwrapTypeRegex)) {
      return true;
    }
  }

  return false;
}


public static <T> T selectInstance(Instance<T> instances, String name) {

Instance<T> instance = instances.select(NamedLiteral.of(name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
Expand All @@ -27,6 +28,7 @@

import static org.junit.jupiter.api.Assertions.*;

@QuarkusTest
class IcebergChangeEventTest {
final String serdeWithSchema = Files.readString(Path.of("src/test/resources/json/serde-with-schema.json"));
final String unwrapWithSchema = Files.readString(Path.of("src/test/resources/json/unwrap-with-schema.json"));
Expand Down Expand Up @@ -223,25 +225,4 @@ public void testIcebergChangeEventSchemaWithKey() {
assertEquals(schema2.identifierFieldIds(), Set.of(2, 3));
}


/**
 * Builds a change event from the "serde-unnested-order" key/value fixtures and checks the
 * Iceberg schema derived from it: the exact string rendering and the identifier field ids.
 *
 * @throws IOException if a fixture file cannot be read
 */
@Test
public void testIcebergChangeEventSchemaWithNestedKey() throws IOException {
// Key and value payloads, both loaded from JSON fixtures that carry their schema.
String key = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-key-withschema.json"));
String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-val-withschema.json"));
TestChangeEvent<String, String> dbzEvent = new TestChangeEvent<>(key, val, "test");
Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema();
// Expected rendering keeps the envelope fields (before/after/source/transaction/op/ts_*).
assertEquals("""
table {
1: before: optional struct<2: order_number: optional int, 3: order_date: optional int, 4: purchaser: optional int, 5: quantity: optional int, 6: product_id: optional int>
7: after: required struct<8: order_number: required int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int>
13: source: optional struct<14: version: optional string, 15: connector: optional string, 16: name: optional string, 17: ts_ms: optional long, 18: snapshot: optional string, 19: db: optional string, 20: sequence: optional string, 21: ts_us: optional long, 22: ts_ns: optional long, 23: table: optional string, 24: server_id: optional long, 25: gtid: optional string, 26: file: optional string, 27: pos: optional long, 28: row: optional int, 29: thread: optional long, 30: query: optional string>
31: transaction: optional struct<32: id: optional string, 33: total_order: optional long, 34: data_collection_order: optional long>
35: op: optional string
36: ts_ms: optional long
37: ts_us: optional long
38: ts_ns: optional long
}""", schema.toString());
// Field id 8 is order_number inside the "after" struct in the expected rendering above.
assertEquals(Set.of(8), schema.identifierFieldIds());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg;

import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourceMysqlDB;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
import org.apache.kafka.common.serialization.Serde;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Checks the Iceberg schema derived from an enveloped change event with a nested key, run
 * under a test profile whose configuration contains no unwrap transform.
 *
 * <p>NOTE(review): the class name says "Unwrapped", yet the test asserts that NO unwrap SMT
 * is configured - confirm the naming intent.
 */
@QuarkusTest
@TestProfile(IcebergChangeEventTestUnwrapped.TestProfile.class)
class IcebergChangeEventTestUnwrapped {

/**
 * Builds a change event from the "serde-unnested-order" key/value fixtures and checks the
 * exact string rendering of its Iceberg schema and its identifier field ids.
 *
 * @throws IOException if a fixture file cannot be read
 */
@Test
public void testIcebergChangeEventSchemaWithNestedKey() throws IOException {

// Precondition: the profile below must leave the configuration without an unwrap SMT.
assertFalse(IcebergUtil.configIncludesUnwrapSmt());

// Key and value payloads, both loaded from JSON fixtures that carry their schema.
String key = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-key-withschema.json"));
String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-val-withschema.json"));
TestChangeEvent<String, String> dbzEvent = new TestChangeEvent<>(key, val, "test");
Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema();
// Expected rendering keeps the envelope fields (before/after/source/transaction/op/ts_*).
assertEquals("""
table {
1: before: optional struct<2: order_number: optional int, 3: order_date: optional int, 4: purchaser: optional int, 5: quantity: optional int, 6: product_id: optional int>
7: after: required struct<8: order_number: required int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int>
13: source: optional struct<14: version: optional string, 15: connector: optional string, 16: name: optional string, 17: ts_ms: optional long, 18: snapshot: optional string, 19: db: optional string, 20: sequence: optional string, 21: ts_us: optional long, 22: ts_ns: optional long, 23: table: optional string, 24: server_id: optional long, 25: gtid: optional string, 26: file: optional string, 27: pos: optional long, 28: row: optional int, 29: thread: optional long, 30: query: optional string>
31: transaction: optional struct<32: id: optional string, 33: total_order: optional long, 34: data_collection_order: optional long>
35: op: optional string
36: ts_ms: optional long
37: ts_us: optional long
38: ts_ns: optional long
}""", schema.toString());
// Field id 8 is order_number inside the "after" struct in the expected rendering above.
assertEquals(Set.of(8), schema.identifierFieldIds());
}
/**
 * Test profile referenced by the {@code @TestProfile} annotation on the enclosing class;
 * supplies the configuration overrides for this test.
 */
public static class TestProfile implements QuarkusTestProfile {
/**
 * Returns the overrides for the transform-related configuration keys.
 *
 * @return a mutable map holding the two overridden keys
 */
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
// A lone comma splits into no transform names, so the unwrap check finds nothing to inspect.
config.put("debezium.transforms", ",");
// Note: this is the four-character string "null", not an absent value.
config.put("debezium.transforms.unwrap.type", "null");

return config;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.debezium.server.iceberg;

import io.smallrye.config.SmallRyeConfig;
import io.smallrye.config.SmallRyeConfigBuilder;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Unit tests for {@link IcebergUtil}.
 */
public class IcebergUtilTest {

  /**
   * Exercises {@code IcebergUtil.configIncludesUnwrapSmt(Config)} against three hand-built
   * configurations: a MongoDB unwrap SMT, the default unwrap SMT, and a configuration with
   * no transforms at all.
   *
   * <p>Each configuration is first sanity-checked with {@code assertEquals(expected, actual)}
   * so that a failure reports the expected and actual values the right way round.
   */
  @Test
  void configIncludesUnwrapSmt() {
    // mongodb transforms: the transform list is a plain default, the type is profile-scoped
    String mongoConfVal = "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState";
    SmallRyeConfig mongodbConf = new SmallRyeConfigBuilder()
        .withProfile("mongodb")
        .withDefaultValue("debezium.transforms", "unwrap,someothersmt")
        .withDefaultValue("%mongodb.debezium.transforms.unwrap.type", mongoConfVal)
        .build();
    assertEquals(mongoConfVal, mongodbConf.getConfigValue("debezium.transforms.unwrap.type").getValue());
    assertTrue(IcebergUtil.configIncludesUnwrapSmt(mongodbConf));

    // default transform: both the transform list and the type are profile-scoped
    String defaultConfVal = "io.debezium.transforms.ExtractNewRecordState";
    SmallRyeConfig defaultConf = new SmallRyeConfigBuilder()
        .withProfile("mysql")
        .withDefaultValue("%mysql.debezium.transforms", "unwrapdata,someothersmt")
        .withDefaultValue("%mysql.debezium.transforms.unwrapdata.type", defaultConfVal)
        .build();
    assertEquals(defaultConfVal, defaultConf.getConfigValue("debezium.transforms.unwrapdata.type").getValue());
    assertTrue(IcebergUtil.configIncludesUnwrapSmt(defaultConf));

    // no transforms configured at all: the check must report false
    SmallRyeConfig testConf = new SmallRyeConfigBuilder()
        .withProfile("testing")
        .withDefaultValue("%testing.test.property", "test-Value")
        .build();
    assertEquals("test-Value", testConf.getConfigValue("test.property").getValue());
    assertFalse(IcebergUtil.configIncludesUnwrapSmt(testConf));
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public TestConfigSource() {
config.put("debezium.sink.iceberg.upsert-keep-deletes", "true");
config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");

// iceberg config
config.put("debezium.sink.iceberg.warehouse", S3_BUCKET);

// ==== configure batch behaviour/size ====
// Positive integer value that specifies the maximum size of each batch of events that should be processed during
// each iteration of this connector. Defaults to 2048.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.List;
import java.util.Set;

import io.quarkus.test.junit.QuarkusTest;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Assertions;
Expand All @@ -25,6 +26,7 @@
/**
* @author Ismail Simsek
*/
@QuarkusTest
class IcebergChangeEventBuilderTest {

@Test
Expand Down

0 comments on commit 1eff67a

Please sign in to comment.