From afbace4e17531b17a2ccd5bef35f1d020982c6fc Mon Sep 17 00:00:00 2001
From: ismail simsek
Date: Wed, 5 Jan 2022 21:11:31 +0100
Subject: [PATCH] [Feature] Enable field addition (#74)

* enable field addition
---
 .../tableoperator/IcebergTableOperator.java   |  75 ++++++++----
 .../iceberg/IcebergChangeConsumerTest.java    |  25 ----
 .../IcebergTableOperatorTest.java             | 114 ++++++++++++++++++
 .../iceberg/testresources/BaseSparkTest.java  |   4 +-
 4 files changed, 171 insertions(+), 47 deletions(-)
 create mode 100644 debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
index 82c369e4..8bfbf1e5 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
@@ -15,16 +15,18 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.BaseTaskWriter;
 import org.apache.iceberg.io.WriteResult;
@@ -46,66 +48,98 @@ public class IcebergTableOperator {
   String sourceTsMsColumn;
   @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
   String opColumn;
+  @ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true")
+  boolean allowFieldAddition;
   @Inject
   IcebergTableWriterFactory writerFactory;
 
   @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
   boolean upsert;
 
-  private ArrayList<Record> deduplicatedBatchRecords(Schema schema, List<IcebergChangeEvent> events) {
-    ConcurrentHashMap<JsonNode, GenericRecord> icebergRecordsmap = new ConcurrentHashMap<>();
+  private List<IcebergChangeEvent> deduplicateBatch(List<IcebergChangeEvent> events) {
+
+    ConcurrentHashMap<JsonNode, IcebergChangeEvent> icebergRecordsmap = new ConcurrentHashMap<>();
 
     for (IcebergChangeEvent e : events) {
-      GenericRecord icebergRecord = e.asIcebergRecord(schema);
-
+      // deduplicate using key(PK) @TODO improve using map.merge
       if (icebergRecordsmap.containsKey(e.key())) {
 
         // replace it if it's new
-        if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()), icebergRecord) <= 0) {
-          icebergRecordsmap.put(e.key(), icebergRecord);
+        if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()).value(), e.value()) <= 0) {
+          icebergRecordsmap.put(e.key(), e);
         }
 
       } else {
-        icebergRecordsmap.put(e.key(), icebergRecord);
+        icebergRecordsmap.put(e.key(), e);
       }
     }
 
     return new ArrayList<>(icebergRecordsmap.values());
   }
 
-  private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
-    int result = Long.compare((Long) lhs.getField(sourceTsMsColumn), (Long) rhs.getField(sourceTsMsColumn));
+  private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) {
+
+    int result = Long.compare(lhs.get(sourceTsMsColumn).asLong(0), rhs.get(sourceTsMsColumn).asLong(0));
 
     if (result == 0) {
       // return (x < y) ? -1 : ((x == y) ? 0 : 1);
-      result = cdcOperations.getOrDefault(lhs.getField(opColumn), -1)
+      result = cdcOperations.getOrDefault(lhs.get(opColumn).asText("c"), -1)
           .compareTo(
-              cdcOperations.getOrDefault(rhs.getField(opColumn), -1)
+              cdcOperations.getOrDefault(rhs.get(opColumn).asText("c"), -1)
           );
     }
 
     return result;
   }
 
+  private void applyFieldAddition(Table icebergTable, Schema newSchema) {
+
+    UpdateSchema us = icebergTable.updateSchema().
+        unionByNameWith(newSchema).
+        setIdentifierFields(newSchema.identifierFieldNames());
+    Schema newSchemaCombined = us.apply();
+
+    // @NOTE avoid committing when there is no schema change; commit() creates a new commit even when nothing changed!
+    if (!icebergTable.schema().sameSchema(newSchemaCombined)) {
+      LOGGER.warn("Extending schema of {}", icebergTable.name());
+      us.commit();
+    }
+  }
+
   public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
 
-    final List<Record> batchEvents;
-    // when its operation mode is not upsert deduplicate the events to avoid inserting duplicate row
+    // when running in upsert mode, deduplicate the batch to avoid inserting duplicate rows
     if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) {
-      batchEvents = deduplicatedBatchRecords(icebergTable.schema(), events);
+      events = deduplicateBatch(events);
+    }
+
+    if (!allowFieldAddition) {
+      // field addition is disabled; write the batch with the current table schema
+      addToTablePerSchema(icebergTable, events);
     } else {
-      batchEvents = events.stream().
-          map(e -> e.asIcebergRecord(icebergTable.schema())).
-          collect(Collectors.toList());
+      Map<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> eventsGroupedBySchema =
+          events.stream()
+              .collect(Collectors.groupingBy(IcebergChangeEvent::jsonSchema));
+      LOGGER.debug("Batch got {} records with {} different schemas!", events.size(), eventsGroupedBySchema.keySet().size());
+
+      for (Map.Entry<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> schemaEvents : eventsGroupedBySchema.entrySet()) {
+        // extend table schema if new fields found
+        applyFieldAddition(icebergTable, schemaEvents.getKey().icebergSchema());
+        // add set of events to table
+        addToTablePerSchema(icebergTable, schemaEvents.getValue());
+      }
     }
+  }
+
+  private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> events) {
     // Initialize a task writer to write both INSERT and equality DELETE.
     BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
     try {
-      for (Record icebergRecord : batchEvents) {
-        writer.write(icebergRecord);
+      for (IcebergChangeEvent e : events) {
+        writer.write(e.asIcebergRecord(icebergTable.schema()));
       }
 
       writer.close();
@@ -121,5 +155,4 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
     LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());
   }
 
-
 }
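NOTE: the @TODO in deduplicateBatch() points at Map.merge. A minimal sketch of that refactor, not part of this commit, assuming the key()/value() accessors and the ts-then-op comparison semantics shown above; it is a drop-in replacement for the method body inside IcebergTableOperator:

    private List<IcebergChangeEvent> deduplicateBatch(List<IcebergChangeEvent> events) {
      ConcurrentHashMap<JsonNode, IcebergChangeEvent> deduped = new ConcurrentHashMap<>();
      for (IcebergChangeEvent e : events) {
        // merge() inserts the event when the key is absent; otherwise it keeps the stored
        // event unless the new event wins the timestamp-then-operation comparison
        deduped.merge(e.key(), e, (oldEvent, newEvent) ->
            this.compareByTsThenOp(oldEvent.value(), newEvent.value()) <= 0 ? newEvent : oldEvent);
      }
      return new ArrayList<>(deduped.values());
    }
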
{}", events.size(), icebergTable.location()); } - } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index a7fce283..ae4b2d4a 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -17,8 +17,6 @@ import java.time.Duration; -import org.apache.iceberg.Table; -import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.awaitility.Awaitility; @@ -165,16 +163,6 @@ public void testSchemaChanges() throws Exception { // added columns are not recognized by iceberg getTableData("testc.inventory.customers").show(); - - // TEST add new columns to iceberg table then check if its data populated! - Table table = getTable("testc.inventory.customers"); - // NOTE column list below are in reverse order!! testing the behaviour! - table.updateSchema() - // NOTE test_date_column is Long type because debezium serializes date type as number - .addColumn("test_date_column", Types.LongType.get()) - .addColumn("test_boolean_column", Types.BooleanType.get()) - .addColumn("test_varchar_column", Types.StringType.get()) - .commit(); // insert row after defining new column in target iceberg table SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " + "(default,'After-Defining-Iceberg-fields','Thomas',null,'value1',false, '2020-01-01');"); @@ -197,20 +185,7 @@ public void testSchemaChanges() throws Exception { }); getTableData("testc.inventory.customers").show(); - // CASE 1:(Adding new column to source) (A column missing in iceberg table) - // data of the new column is ignored till same column defined in iceberg table - // for example: if a column not found in iceberg table its data is dropped ignored and not copied to target! - // once iceberg table adds same column then data for this column recognized and populated - - // CASE 2:(Removing column from source) (An extra column in iceberg table) - // these columns are populated with null value - - // CASE 3:(Renaming column from source) - // this is CASE 2 + CASE 1 : old column will be populated with null values and new column will not be recognized - // and populated till it's added to iceberg table - S3Minio.listFiles(); - } @Test diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java new file mode 100644 index 00000000..a1c42af4 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java @@ -0,0 +1,114 @@ +/* + * + * * Copyright memiiso Authors. 
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java
new file mode 100644
index 00000000..a1c42af4
--- /dev/null
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * * Copyright memiiso Authors.
+ * *
+ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+package io.debezium.server.iceberg.tableoperator;
+
+import io.debezium.server.iceberg.IcebergChangeEvent;
+import io.debezium.server.iceberg.IcebergUtil;
+import io.debezium.server.iceberg.testresources.BaseSparkTest;
+import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
+import io.debezium.server.iceberg.testresources.S3Minio;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.inject.Inject;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+
+/**
+ * Integration test that verifies the IcebergTableOperator: batch deduplication, upsert semantics
+ * and field addition, writing to an S3 (MinIO) destination.
+ *
+ * @author Ismail Simsek
+ */
+@QuarkusTest
+@QuarkusTestResource(S3Minio.class)
+class IcebergTableOperatorTest extends BaseSparkTest {
+
+  static String testTable = "inventory.test_table_operator";
+  @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
+  String tablePrefix;
+  @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
+  String namespace;
+  @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
+  boolean upsert;
+  @ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT)
+  String writeFormat;
+  @Inject
+  IcebergTableOperator icebergTableOperator;
+  IcebergChangeEventBuilder eventBuilder = new IcebergChangeEventBuilder().destination(testTable);
+
+  public Table createTable(IcebergChangeEvent sampleEvent) {
+    HadoopCatalog icebergCatalog = getIcebergCatalog();
+    final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destinationTable());
+    return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert);
+  }
+
+  @Test
+  public void testIcebergTableOperator() {
+    // setup
+    List<IcebergChangeEvent> events = new ArrayList<>();
+    Table icebergTable = this.createTable(
+        new IcebergChangeEventBuilder()
+            .destination(testTable)
+            .addKeyField("id", 1)
+            .addField("data", "record1")
+            .addField("preferences", "feature1", true)
+            .build()
+    );
+
+    events.add(new IcebergChangeEventBuilder()
+        .destination(testTable)
+        .addKeyField("id", 1)
+        .addField("data", "record1")
+        .build()
+    );
+    events.add(new IcebergChangeEventBuilder()
+        .destination(testTable)
+        .addKeyField("id", 2)
+        .addField("data", "record2")
+        .build()
+    );
+    events.add(new IcebergChangeEventBuilder()
+        .destination(testTable)
+        .addKeyField("id", 3)
+        .addField("user_name", "Alice")
+        .addField("data", "record3_adding_field")
+        .build()
+    );
+    icebergTableOperator.addToTable(icebergTable, events);
+
+    getTableData(testTable).show(false);
+    Assertions.assertEquals(3, getTableData(testTable).count());
+    events.clear();
+    events.add(new IcebergChangeEventBuilder()
+        .destination(testTable)
+        .addKeyField("id", 3)
+        .addField("user_name", "Alice-Updated")
+        .addField("data", "record3_updated")
+        .addField("preferences", "feature2", "feature2Val2")
+        .addField("__op", "u")
+        .build()
+    );
+    icebergTableOperator.addToTable(icebergTable, events);
+    getTableData(testTable).show(false);
+    Assertions.assertEquals(4, getTableData(testTable).count());
+    Assertions.assertEquals(1, getTableData(testTable).where("user_name == 'Alice-Updated'").count());
+    Assertions.assertEquals(1, getTableData(testTable).where("preferences.feature2 == 'feature2Val2'").count());
+  }
+}
\ No newline at end of file
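NOTE: the upsert assertions above hinge on compareByTsThenOp() and the cdcOperations priority map, which this patch does not show. A self-contained sketch of that tie-breaking logic follows; the map contents are an assumption based on Debezium's op codes (c=create, r=read/snapshot, u=update, d=delete), not taken from this commit:

    import java.util.Map;
    import com.fasterxml.jackson.databind.JsonNode;

    public class CdcOpComparator {
      // Assumed priorities; the real cdcOperations map is defined elsewhere in IcebergTableOperator.
      static final Map<String, Integer> CDC_OPERATIONS = Map.of("c", 1, "r", 2, "u", 3, "d", 4);

      static int compareByTsThenOp(JsonNode lhs, JsonNode rhs, String tsColumn, String opColumn) {
        // later source timestamp wins ...
        int result = Long.compare(lhs.get(tsColumn).asLong(0), rhs.get(tsColumn).asLong(0));
        if (result == 0) {
          // ... and on a tie the higher-priority operation wins, e.g. delete beats update
          result = CDC_OPERATIONS.getOrDefault(lhs.get(opColumn).asText("c"), -1)
              .compareTo(CDC_OPERATIONS.getOrDefault(rhs.get(opColumn).asText("c"), -1));
        }
        return result;
      }
    }
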
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
index e485e41d..68c837ac 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
@@ -168,7 +168,9 @@ protected HadoopCatalog getIcebergCatalog() {
   }
 
   public Dataset<Row> getTableData(String table) {
-    return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM debeziumevents.debeziumcdc_" + table.replace(".", "_"));
+    table = "debeziumevents.debeziumcdc_" + table.replace(".", "_");
+    //System.out.println("--loading-->" + table);
+    return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM " + table);
   }
 
 }
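NOTE: both tests verify results by reading the destination table back through Spark SQL. A standalone equivalent of the getTableData() lookup above; the session setup is an assumption and needs an Iceberg-enabled Spark configuration:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    // assumes a SparkSession already configured with the Iceberg catalog used by the tests
    SparkSession spark = SparkSession.builder().getOrCreate();
    // destination "inventory.test_table_operator" maps to debeziumevents.debeziumcdc_inventory_test_table_operator
    String table = "debeziumevents.debeziumcdc_" + "inventory.test_table_operator".replace(".", "_");
    Dataset<Row> rows = spark.newSession().sql("SELECT *, input_file_name() as input_file FROM " + table);
    rows.show(false);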