[Feature] Enable field addition (#74)
* enable field addition
ismailsimsek committed Jan 5, 2022
1 parent b97595d commit afbace4
Showing 4 changed files with 171 additions and 47 deletions.
IcebergTableOperator.java
@@ -15,16 +15,18 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;

+ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
- import org.apache.iceberg.data.GenericRecord;
+ import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.WriteResult;
@@ -46,66 +48,98 @@ public class IcebergTableOperator {
String sourceTsMsColumn;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
String opColumn;
@ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true")
boolean allowFieldAddition;
@Inject
IcebergTableWriterFactory writerFactory;

@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;

- private ArrayList<Record> deduplicatedBatchRecords(Schema schema, List<IcebergChangeEvent> events) {
- ConcurrentHashMap<Object, GenericRecord> icebergRecordsmap = new ConcurrentHashMap<>();
+ private List<IcebergChangeEvent> deduplicateBatch(List<IcebergChangeEvent> events) {

+ ConcurrentHashMap<JsonNode, IcebergChangeEvent> icebergRecordsmap = new ConcurrentHashMap<>();

for (IcebergChangeEvent e : events) {
- GenericRecord icebergRecord = e.asIcebergRecord(schema);

- // deduplicate over key(PK)
+ // deduplicate using key(PK) @TODO improve using map.merge
if (icebergRecordsmap.containsKey(e.key())) {

// replace it if it's new
- if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()), icebergRecord) <= 0) {
- icebergRecordsmap.put(e.key(), icebergRecord);
+ if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()).value(), e.value()) <= 0) {
+ icebergRecordsmap.put(e.key(), e);
}

} else {
- icebergRecordsmap.put(e.key(), icebergRecord);
+ icebergRecordsmap.put(e.key(), e);
}

}
return new ArrayList<>(icebergRecordsmap.values());
}

- private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {

- int result = Long.compare((Long) lhs.getField(sourceTsMsColumn), (Long) rhs.getField(sourceTsMsColumn));
+ private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) {

+ int result = Long.compare(lhs.get(sourceTsMsColumn).asLong(0), rhs.get(sourceTsMsColumn).asLong(0));

if (result == 0) {
// return (x < y) ? -1 : ((x == y) ? 0 : 1);
- result = cdcOperations.getOrDefault(lhs.getField(opColumn), -1)
+ result = cdcOperations.getOrDefault(lhs.get(opColumn).asText("c"), -1)
.compareTo(
- cdcOperations.getOrDefault(rhs.getField(opColumn), -1)
+ cdcOperations.getOrDefault(rhs.get(opColumn).asText("c"), -1)
);
}

return result;
}

+ private void applyFieldAddition(Table icebergTable, Schema newSchema) {

+ UpdateSchema us = icebergTable.updateSchema().
+ unionByNameWith(newSchema).
+ setIdentifierFields(newSchema.identifierFieldNames());
+ Schema newSchemaCombined = us.apply();

+ // @NOTE avoid committing when there is no schema change. commit creates new commit even when there is no change!
+ if (!icebergTable.schema().sameSchema(newSchemaCombined)) {
+ LOGGER.warn("Extending schema of {}", icebergTable.name());
+ us.commit();
+ }
+ }

public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {

- final List<Record> batchEvents;
- // when its operation mode is not upsert deduplicate the events to avoid inserting duplicate row
+ // when operation mode is not upsert deduplicate the events to avoid inserting duplicate row
if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) {
- batchEvents = deduplicatedBatchRecords(icebergTable.schema(), events);
+ events = deduplicateBatch(events);
+ }

+ if (!allowFieldAddition) {
+ // if field additions not enabled add set of events to table
+ addToTablePerSchema(icebergTable, events);
} else {
- batchEvents = events.stream().
- map(e -> e.asIcebergRecord(icebergTable.schema())).
- collect(Collectors.toList());
+ Map<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> eventsGroupedBySchema =
+ events.stream()
+ .collect(Collectors.groupingBy(IcebergChangeEvent::jsonSchema));
+ LOGGER.debug("Batch got {} records with {} different schema!!", events.size(), eventsGroupedBySchema.keySet().size());

+ for (Map.Entry<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> schemaEvents : eventsGroupedBySchema.entrySet()) {
+ // extend table schema if new fields found
+ applyFieldAddition(icebergTable, schemaEvents.getKey().icebergSchema());
+ // add set of events to table
+ addToTablePerSchema(icebergTable, schemaEvents.getValue());
+ }
}

}

+ private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> events) {
// Initialize a task writer to write both INSERT and equality DELETE.
BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
try {
- for (Record icebergRecord : batchEvents) {
- writer.write(icebergRecord);
+ for (IcebergChangeEvent e : events) {
+ writer.write(e.asIcebergRecord(icebergTable.schema()));
}

writer.close();
Expand All @@ -121,5 +155,4 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {

LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());
}

}
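The @TODO in deduplicateBatch() above points at Map.merge as a cleaner way to express the containsKey/put branching. A minimal sketch of that variant (not part of this commit), assuming the IcebergChangeEvent accessors key() and value() and the compareByTsThenOp() helper shown in this diff:

private List<IcebergChangeEvent> deduplicateBatch(List<IcebergChangeEvent> events) {
  ConcurrentHashMap<JsonNode, IcebergChangeEvent> latestByKey = new ConcurrentHashMap<>();
  for (IcebergChangeEvent e : events) {
    // merge() inserts the event when its key is unseen; on a key collision it keeps
    // whichever event wins the timestamp-then-operation comparison, so the batch
    // still reduces to a single event per primary key.
    latestByKey.merge(e.key(), e, (current, incoming) ->
        compareByTsThenOp(current.value(), incoming.value()) <= 0 ? incoming : current);
  }
  return new ArrayList<>(latestByKey.values());
}

The semantics match the loop above: when two events share a key, the one with the later source timestamp (or, on a tie, the higher-ranked operation) survives.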
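For illustration, a hypothetical example of the schema union performed by applyFieldAddition(); the column names and field ids below are invented for the example, and Schema and Types are the org.apache.iceberg classes:

// Current table schema: id + data.
Schema tableSchema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.optional(2, "data", Types.StringType.get()));
// Schema derived from an incoming batch that carries one extra column.
Schema eventSchema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.optional(2, "data", Types.StringType.get()),
    Types.NestedField.optional(3, "user_name", Types.StringType.get()));
// updateSchema().unionByNameWith(eventSchema).apply() yields a schema with id, data
// and user_name; it differs from tableSchema, so sameSchema() returns false and the
// update is committed. If eventSchema carried no new columns, apply() would return an
// equivalent schema and the commit (an otherwise empty snapshot) is skipped.

Existing rows simply read a newly added optional column as null, which is the behaviour the new IcebergTableOperatorTest below exercises when its third event introduces the user_name field.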
Integration test for schema changes
@@ -17,8 +17,6 @@

import java.time.Duration;

- import org.apache.iceberg.Table;
- import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
@@ -165,16 +163,6 @@ public void testSchemaChanges() throws Exception {

// added columns are not recognized by iceberg
getTableData("testc.inventory.customers").show();

- // TEST add new columns to iceberg table then check if its data populated!
- Table table = getTable("testc.inventory.customers");
- // NOTE column list below are in reverse order!! testing the behaviour!
- table.updateSchema()
- // NOTE test_date_column is Long type because debezium serializes date type as number
- .addColumn("test_date_column", Types.LongType.get())
- .addColumn("test_boolean_column", Types.BooleanType.get())
- .addColumn("test_varchar_column", Types.StringType.get())
- .commit();
// insert row after defining new column in target iceberg table
SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
"(default,'After-Defining-Iceberg-fields','Thomas',null,'value1',false, '2020-01-01');");
@@ -197,20 +185,7 @@
});
getTableData("testc.inventory.customers").show();

- // CASE 1:(Adding new column to source) (A column missing in iceberg table)
- // data of the new column is ignored till same column defined in iceberg table
- // for example: if a column not found in iceberg table its data is dropped ignored and not copied to target!
- // once iceberg table adds same column then data for this column recognized and populated

- // CASE 2:(Removing column from source) (An extra column in iceberg table)
- // these columns are populated with null value

- // CASE 3:(Renaming column from source)
- // this is CASE 2 + CASE 1 : old column will be populated with null values and new column will not be recognized
- // and populated till it's added to iceberg table

S3Minio.listFiles();

}

@Test
IcebergTableOperatorTest.java (new file)
@@ -0,0 +1,114 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg.tableoperator;

import io.debezium.server.iceberg.IcebergChangeEvent;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;

import java.util.ArrayList;
import java.util.List;
import javax.inject.Inject;

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;


/**
* Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination.
*
* @author Ismail Simsek
*/
@QuarkusTest
@QuarkusTestResource(S3Minio.class)
class IcebergTableOperatorTest extends BaseSparkTest {

static String testTable = "inventory.test_table_operator";
@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT)
String writeFormat;
@Inject
IcebergTableOperator icebergTableOperator;
IcebergChangeEventBuilder eventBuilder = new IcebergChangeEventBuilder().destination(testTable);

public Table createTable(IcebergChangeEvent sampleEvent) {
HadoopCatalog icebergCatalog = getIcebergCatalog();
final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destinationTable());
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert);
}

@Test
public void testIcebergTableOperator() {
// setup
List<IcebergChangeEvent> events = new ArrayList<>();
Table icebergTable = this.createTable(
new IcebergChangeEventBuilder()
.destination(testTable)
.addKeyField("id", 1)
.addField("data", "record1")
.addField("preferences", "feature1", true)
.build()
);

events.add(new IcebergChangeEventBuilder()
.destination(testTable)
.addKeyField("id", 1)
.addField("data", "record1")
.build()
);
events.add(new IcebergChangeEventBuilder()
.destination(testTable)
.addKeyField("id", 2)
.addField("data", "record2")
.build()
);
events.add(new IcebergChangeEventBuilder()
.destination(testTable)
.addKeyField("id", 3)
.addField("user_name", "Alice")
.addField("data", "record3_adding_field")
.build()
);
icebergTableOperator.addToTable(icebergTable, events);

getTableData(testTable).show(false);
Assertions.assertEquals(3, getTableData(testTable).count());
events.clear();
events.add(new IcebergChangeEventBuilder()
.destination(testTable)
.addKeyField("id", 3)
.addField("user_name", "Alice-Updated")
.addField("data", "record3_updated")
.addField("preferences", "feature2", "feature2Val2")
.addField("__op", "u")
.build()
);
icebergTableOperator.addToTable(icebergTable, events);
getTableData(testTable).show(false);
Assertions.assertEquals(4, getTableData(testTable).count());
Assertions.assertEquals(1, getTableData(testTable).where("user_name == 'Alice-Updated'").count());
Assertions.assertEquals(1, getTableData(testTable).where("preferences.feature2 == 'feature2Val2'").count());
}
}
BaseSparkTest.java
@@ -168,7 +168,9 @@ protected HadoopCatalog getIcebergCatalog() {
}

public Dataset<Row> getTableData(String table) {
- return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM debeziumevents.debeziumcdc_" + table.replace(".", "_"));
+ table = "debeziumevents.debeziumcdc_" + table.replace(".", "_");
+ //System.out.println("--loading-->" + table);
+ return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM " + table);
}

}
