Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test consumer without event flattening, Add flag to control identifier-field creation #356

Merged
merged 1 commit into from
Jun 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
String catalogName;
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg.create-identifier-fields", defaultValue = "true")
boolean createIdentifierFields;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;
@ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
Expand Down Expand Up @@ -175,7 +177,7 @@ public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sample
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
try {
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat);
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(createIdentifierFields), writeFormat);
} catch (Exception e){
throw new DebeziumException("Failed to create table from debezium event schema:"+tableId+" Error:" + e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public ChangeEventSchema changeEventSchema() {
}
}

public Schema icebergSchema() {
return changeEventSchema().icebergSchema();
public Schema icebergSchema(boolean createIdentifierFields) {
return changeEventSchema().icebergSchema(createIdentifierFields);
}

public String destination() {
Expand Down Expand Up @@ -312,19 +312,30 @@ private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaN
return schemaData;
}

private Schema icebergSchema() {
private Schema icebergSchema(boolean createIdentifierFields) {

if (this.valueSchema.isNull()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData();
if (!eventsAreUnwrapped && keySchema != null) {
// NOTE: events re not unwrapped, align schema with event schema, so then we can scan event and key schemas synchronously
if (!createIdentifierFields) {
LOGGER.warn("Creating identifier fields is disabled, creating table without identifier field!");
icebergSchemaFields(valueSchema, null, schemaData);
} else if (!eventsAreUnwrapped && keySchema != null) {
ObjectNode nestedKeySchema = mapper.createObjectNode();
nestedKeySchema.put("type", "struct");
nestedKeySchema.putArray("fields").add(((ObjectNode) keySchema).put("field", "after"));
icebergSchemaFields(valueSchema, nestedKeySchema, schemaData);

if (!schemaData.identifierFieldIds().isEmpty()) {
// While Iceberg supports nested key fields, they cannot be set with nested events(unwrapped events, Without event flattening)
// due to inconsistency in the after and before fields.
// For insert events, only the `before` field is NULL, while for delete events after field is NULL.
// This inconsistency prevents using either field as a reliable key.
throw new DebeziumException("Events are unnested, Identifier fields are not supported for unnested events! " +
"Pleas make sure you are using event flattening SMT! or disable identifier field creation!");
}
} else {
icebergSchemaFields(valueSchema, keySchema, schemaData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.iceberg.*;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.WriteResult;
Expand Down Expand Up @@ -46,6 +47,8 @@ public class IcebergTableOperator {
String opColumn;
@ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true")
boolean allowFieldAddition;
@ConfigProperty(name = "debezium.sink.iceberg.create-identifier-fields", defaultValue = "true")
boolean createIdentifierFields;
@Inject
IcebergTableWriterFactory writerFactory;

Expand Down Expand Up @@ -156,7 +159,7 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {

for (Map.Entry<IcebergChangeEvent.ChangeEventSchema, List<IcebergChangeEvent>> schemaEvents : eventsGroupedBySchema.entrySet()) {
// extend table schema if new fields found
applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema());
applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema(createIdentifierFields));
// add set of events to table
addToTablePerSchema(icebergTable, schemaEvents.getValue());
}
Expand All @@ -172,9 +175,11 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
*/
private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> events) {
// Initialize a task writer to write both INSERT and equality DELETE.
final Schema tableSchema = icebergTable.schema();
try (BaseTaskWriter<Record> writer = writerFactory.create(icebergTable)) {
for (IcebergChangeEvent e : events) {
writer.write(e.asIcebergRecord(icebergTable.schema()));
final GenericRecord record = e.asIcebergRecord(tableSchema);
writer.write(record);
}

writer.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg;

import com.google.common.collect.Lists;
import io.debezium.server.iceberg.testresources.BaseTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourceMysqlDB;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* @author Ismail Simsek
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMysqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerMysqlTestUnwrapped.TestProfile.class)
public class IcebergChangeConsumerMysqlTestUnwrapped extends BaseTest {

@Test
public void testSimpleUpload() throws Exception {

// make sure its not unwrapped
assertEquals(IcebergUtil.configIncludesUnwrapSmt(), false);
assertEquals(IcebergChangeEvent.eventsAreUnwrapped, false);

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
CloseableIterable<Record> result = getTableDataV2("testc.inventory.customers");
printTableData(result);
return Lists.newArrayList(result).size() >= 3;
} catch (Exception e) {
return false;
}
});

// test nested data(struct) consumed
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
CloseableIterable<Record> result = getTableDataV2("testc.inventory.geom");
return Lists.newArrayList(result).size() >= 3;
} catch (Exception e) {
return false;
}
});

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of("debeziumevents", "debezium_offset_storage_table"));
System.out.println(Lists.newArrayList(d));
return Lists.newArrayList(d).size() == 1;
} catch (Exception e) {
return false;
}
});

}

@Test
public void testDeleteEvents() throws Exception {

// make sure its not unwrapped
assertEquals(IcebergUtil.configIncludesUnwrapSmt(), false);
assertEquals(IcebergChangeEvent.eventsAreUnwrapped, false);

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
CloseableIterable<Record> result = getTableDataV2("testc.inventory.customers");
printTableData(result);
return Lists.newArrayList(result).size() >= 4;
} catch (Exception e) {
return false;
}
});

SourceMysqlDB.runSQL("ALTER TABLE inventory.addresses DROP FOREIGN KEY addresses_ibfk_1;");
SourceMysqlDB.runSQL("DELETE FROM inventory.customers where id = 1004 ;");

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
CloseableIterable<Record> result = getTableDataV2("testc.inventory.customers");
printTableData(result);
return Lists.newArrayList(result).size() >= 5;
} catch (Exception e) {
return false;
}
});
}


public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("quarkus.profile", "mysql");
config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector");
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table");
config.put("debezium.transforms", ",");
config.put("debezium.sink.iceberg.upsert", "false");
config.put("debezium.sink.iceberg.create-identifier-fields", "false");
return config;
}

@Override
public String getConfigProfile() {
return "mysql";
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg;

import com.google.common.collect.Lists;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* Integration test that verifies basic reading from PostgreSQL database and writing to iceberg destination.
*
* @author Ismail Simsek
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerTestUnwraapped.TestProfile.class)
public class IcebergChangeConsumerTestUnwraapped extends BaseSparkTest {

@Test
public void testSimpleUpload() {

// make sure its not unwrapped
assertEquals(IcebergUtil.configIncludesUnwrapSmt(), false);
assertEquals(IcebergChangeEvent.eventsAreUnwrapped, false);

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.customers");
ds.show(false);
return ds.count() >= 3;
} catch (Exception e) {
return false;
}
});

// test nested data(struct) consumed
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.geom");
ds.show(false);
return ds.count() >= 3;
} catch (Exception e) {
return false;
}
});

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of("debeziumevents", "debezium_offset_storage_table"));
System.out.println(Lists.newArrayList(d));
return Lists.newArrayList(d).size() == 1;
} catch (Exception e) {
return false;
}
});
}

public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("debezium.sink.iceberg.write.format.default", "orc");
config.put("debezium.sink.iceberg.destination-regexp", "\\d");
config.put("debezium.source.hstore.handling.mode", "map");
config.put("debezium.transforms", ",");
config.put("debezium.sink.iceberg.create-identifier-fields", "false");
return config;
}
}

}
Loading