[Feature] Enable field addition #74

Merged: 10 commits, Jan 5, 2022
@@ -15,16 +15,18 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.WriteResult;
@@ -46,66 +48,98 @@ public class IcebergTableOperator {
String sourceTsMsColumn;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
String opColumn;
@ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true")
boolean allowFieldAddition;
@Inject
IcebergTableWriterFactory writerFactory;

@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;

private ArrayList<Record> deduplicatedBatchRecords(Schema schema, List<IcebergChangeEvent> events) {
ConcurrentHashMap<Object, GenericRecord> icebergRecordsmap = new ConcurrentHashMap<>();
private List<IcebergChangeEvent> deduplicateBatch(List<IcebergChangeEvent> events) {

ConcurrentHashMap<JsonNode, IcebergChangeEvent> icebergRecordsmap = new ConcurrentHashMap<>();

for (IcebergChangeEvent e : events) {
GenericRecord icebergRecord = e.asIcebergRecord(schema);

// deduplicate over key(PK)
// deduplicate using key (PK) @TODO improve using map.merge; a sketch follows this file's diff
if (icebergRecordsmap.containsKey(e.key())) {

// replace it if it's new
if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()), icebergRecord) <= 0) {
icebergRecordsmap.put(e.key(), icebergRecord);
if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()).value(), e.value()) <= 0) {
icebergRecordsmap.put(e.key(), e);
}

} else {
icebergRecordsmap.put(e.key(), icebergRecord);
icebergRecordsmap.put(e.key(), e);
}

}
return new ArrayList<>(icebergRecordsmap.values());
}

private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {

int result = Long.compare((Long) lhs.getField(sourceTsMsColumn), (Long) rhs.getField(sourceTsMsColumn));
private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) {

int result = Long.compare(lhs.get(sourceTsMsColumn).asLong(0), rhs.get(sourceTsMsColumn).asLong(0));

if (result == 0) {
// return (x < y) ? -1 : ((x == y) ? 0 : 1);
result = cdcOperations.getOrDefault(lhs.getField(opColumn), -1)
result = cdcOperations.getOrDefault(lhs.get(opColumn).asText("c"), -1)
.compareTo(
cdcOperations.getOrDefault(rhs.getField(opColumn), -1)
cdcOperations.getOrDefault(rhs.get(opColumn).asText("c"), -1)
);
}

return result;
}

private void applyFieldAddition(Table icebergTable, Schema newSchema) {

UpdateSchema us = icebergTable.updateSchema().
unionByNameWith(newSchema).
setIdentifierFields(newSchema.identifierFieldNames());
Schema newSchemaCombined = us.apply();

// @NOTE avoid committing when there is no schema change; commit() creates a new commit even when nothing changed!
if (!icebergTable.schema().sameSchema(newSchemaCombined)) {
LOGGER.warn("Extending schema of {}", icebergTable.name());
us.commit();
}
}

public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {

final List<Record> batchEvents;
// when its operation mode is not upsert deduplicate the events to avoid inserting duplicate row
// in upsert mode, deduplicate the events to avoid writing duplicate rows for the same key
if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) {
batchEvents = deduplicatedBatchRecords(icebergTable.schema(), events);
events = deduplicateBatch(events);
}

if (!allowFieldAddition) {
// if field addition is not enabled, add the events to the table as-is
addToTablePerSchema(icebergTable, events);
} else {
batchEvents = events.stream().
map(e -> e.asIcebergRecord(icebergTable.schema())).
collect(Collectors.toList());
Map<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> eventsGroupedBySchema =
events.stream()
.collect(Collectors.groupingBy(IcebergChangeEvent::jsonSchema));
LOGGER.debug("Batch got {} records with {} different schema!!", events.size(), eventsGroupedBySchema.keySet().size());

for (Map.Entry<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> schemaEvents : eventsGroupedBySchema.entrySet()) {
// extend table schema if new fields found
applyFieldAddition(icebergTable, schemaEvents.getKey().icebergSchema());
// add set of events to table
addToTablePerSchema(icebergTable, schemaEvents.getValue());
}
}

}

private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> events) {
// Initialize a task writer to write both INSERT and equality DELETE.
BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
try {
for (Record icebergRecord : batchEvents) {
writer.write(icebergRecord);
for (IcebergChangeEvent e : events) {
writer.write(e.asIcebergRecord(icebergTable.schema()));
}

writer.close();
@@ -121,5 +155,4 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {

LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());
}

}
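The @TODO on the deduplication loop above points at Map.merge. A minimal sketch of that variant, not part of this PR, assuming the IcebergChangeEvent.key()/value() accessors and the compareByTsThenOp helper shown above (the imports it needs are already present in the file):

private List<IcebergChangeEvent> deduplicateBatch(List<IcebergChangeEvent> events) {
  ConcurrentHashMap<JsonNode, IcebergChangeEvent> deduplicated = new ConcurrentHashMap<>();
  for (IcebergChangeEvent e : events) {
    // merge() keeps one event per key; the remapping function keeps whichever event
    // is newer by (source ts_ms, op), mirroring the containsKey/put logic above
    deduplicated.merge(e.key(), e,
        (existing, incoming) -> compareByTsThenOp(existing.value(), incoming.value()) <= 0 ? incoming : existing);
  }
  return new ArrayList<>(deduplicated.values());
}

The new behaviour itself is toggled by the debezium.sink.iceberg.allow-field-addition property (default true); with it disabled, addToTable falls back to writing the whole batch against the current table schema.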
@@ -17,8 +17,6 @@

import java.time.Duration;

import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
@@ -165,16 +163,6 @@ public void testSchemaChanges() throws Exception {

// columns added on the source are not recognized by the iceberg table
getTableData("testc.inventory.customers").show();

// TEST add new columns to iceberg table then check if its data populated!
Table table = getTable("testc.inventory.customers");
// NOTE column list below are in reverse order!! testing the behaviour!
table.updateSchema()
// NOTE test_date_column is Long type because debezium serializes date type as number
.addColumn("test_date_column", Types.LongType.get())
.addColumn("test_boolean_column", Types.BooleanType.get())
.addColumn("test_varchar_column", Types.StringType.get())
.commit();
// insert row after defining new column in target iceberg table
SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
"(default,'After-Defining-Iceberg-fields','Thomas',null,'value1',false, '2020-01-01');");
@@ -197,20 +185,7 @@
});
getTableData("testc.inventory.customers").show();

// CASE 1:(Adding new column to source) (A column missing in iceberg table)
// data of the new column is ignored till same column defined in iceberg table
// for example: if a column not found in iceberg table its data is dropped ignored and not copied to target!
// once iceberg table adds same column then data for this column recognized and populated

// CASE 2:(Removing column from source) (An extra column in iceberg table)
// these columns are populated with null value

// CASE 3:(Renaming column from source)
// this is CASE 2 + CASE 1 : old column will be populated with null values and new column will not be recognized
// and populated till it's added to iceberg table

S3Minio.listFiles();

}

@Test
@@ -0,0 +1,114 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg.tableoperator;

import io.debezium.server.iceberg.IcebergChangeEvent;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;

import java.util.ArrayList;
import java.util.List;
import javax.inject.Inject;

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;


/**
* Integration test that verifies IcebergTableOperator: writing change event batches, including events with added fields, to an Iceberg table on the S3 (MinIO) destination.
*
* @author Ismail Simsek
*/
@QuarkusTest
@QuarkusTestResource(S3Minio.class)
class IcebergTableOperatorTest extends BaseSparkTest {

static String testTable = "inventory.test_table_operator";
@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT)
String writeFormat;
@Inject
IcebergTableOperator icebergTableOperator;
IcebergChangeEventBuilder eventBuilder = new IcebergChangeEventBuilder().destination(testTable);

public Table createTable(IcebergChangeEvent sampleEvent) {
HadoopCatalog icebergCatalog = getIcebergCatalog();
final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destinationTable());
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert);
}

@Test
public void testIcebergTableOperator() {
// setup
List<IcebergChangeEvent> events = new ArrayList<>();
Table icebergTable = this.createTable(
new IcebergChangeEventBuilder()
.destination(testTable)
.addKeyField("id", 1)
.addField("data", "record1")
.addField("preferences", "feature1", true)
.build()
);

events.add(new IcebergChangeEventBuilder()
.destination(testTable)
.addKeyField("id", 1)
.addField("data", "record1")
.build()
);
events.add(new IcebergChangeEventBuilder()
.destination(testTable)
.addKeyField("id", 2)
.addField("data", "record2")
.build()
);
events.add(new IcebergChangeEventBuilder()
.destination(testTable)
.addKeyField("id", 3)
.addField("user_name", "Alice")
.addField("data", "record3_adding_field")
.build()
);
icebergTableOperator.addToTable(icebergTable, events);

getTableData(testTable).show(false);
Assertions.assertEquals(3, getTableData(testTable).count());
events.clear();
events.add(new IcebergChangeEventBuilder()
.destination(testTable)
.addKeyField("id", 3)
.addField("user_name", "Alice-Updated")
.addField("data", "record3_updated")
.addField("preferences", "feature2", "feature2Val2")
.addField("__op", "u")
.build()
);
icebergTableOperator.addToTable(icebergTable, events);
getTableData(testTable).show(false);
Assertions.assertEquals(4, getTableData(testTable).count());
Assertions.assertEquals(1, getTableData(testTable).where("user_name == 'Alice-Updated'").count());
Assertions.assertEquals(1, getTableData(testTable).where("preferences.feature2 == 'feature2Val2'").count());
}
}
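For orientation, the field addition exercised by this test roughly takes the table schema from the first shape below to the second. This is a sketch only; the field ids and exact types are assumptions, not what Iceberg actually assigns:

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// before: schema created from the first sample event (id, data, preferences.feature1)
Schema before = new Schema(
    Types.NestedField.required(1, "id", Types.IntegerType.get()),
    Types.NestedField.optional(2, "data", Types.StringType.get()),
    Types.NestedField.optional(3, "preferences", Types.StructType.of(
        Types.NestedField.optional(4, "feature1", Types.BooleanType.get()))));

// after: unionByNameWith has added user_name and preferences.feature2 from the later events
Schema after = new Schema(
    Types.NestedField.required(1, "id", Types.IntegerType.get()),
    Types.NestedField.optional(2, "data", Types.StringType.get()),
    Types.NestedField.optional(3, "preferences", Types.StructType.of(
        Types.NestedField.optional(4, "feature1", Types.BooleanType.get()),
        Types.NestedField.optional(5, "feature2", Types.StringType.get()))),
    Types.NestedField.optional(6, "user_name", Types.StringType.get()));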
@@ -168,7 +168,9 @@ protected HadoopCatalog getIcebergCatalog() {
}

public Dataset<Row> getTableData(String table) {
return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM debeziumevents.debeziumcdc_" + table.replace(".", "_"));
table = "debeziumevents.debeziumcdc_" + table.replace(".", "_");
//System.out.println("--loading-->" + table);
return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM " + table);
}

}