From f1d88020d98cf514d1515e146cf81e51c2e9e187 Mon Sep 17 00:00:00 2001
From: ismail simsek
Date: Sat, 16 Oct 2021 18:20:18 +0200
Subject: [PATCH] split table operations append and upsert to separate classes
 (#33)

* WIP table operators
* WIP table operators
* WIP table operators
---
 .../server/iceberg/IcebergChangeConsumer.java | 276 ++----------------
 .../iceberg/IcebergTableOperations.java       |  33 ---
 .../AbstractIcebergTableOperator.java         | 154 ++++++++++
 .../IcebergTableOperatorAppend.java           |  47 +++
 .../IcebergTableOperatorUpsert.java           | 198 +++++++++++++
 .../InterfaceIcebergTableOperator.java        |  35 +++
 6 files changed, 466 insertions(+), 277 deletions(-)
 delete mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergTableOperations.java
 create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java
 create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java
 create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java
 create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java
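Review note: after this change IcebergChangeConsumer no longer serializes events or writes Parquet files itself; it only resolves a table operator bean by name and hands each destination's batch over to it. A condensed, illustrative sketch of that flow (names match the diff below; `events` stands for one destination's batch and the surrounding CDI wiring is omitted):

    // connect(): pick the operator from the `debezium.sink.iceberg.upsert` flag
    String operatorName = upsert ? "IcebergTableOperatorUpsert" : "IcebergTableOperatorAppend";
    icebergTableOperator = icebergTableOperatorInstances.select(NamedLiteral.of(operatorName)).get();
    icebergTableOperator.initialize();

    // handleBatch(): load or create the destination table, then delegate the write
    Table icebergTable = icebergTableOperator
        .loadIcebergTable(icebergCatalog, tableIdentifier)
        .orElseGet(() -> icebergTableOperator.createIcebergTable(icebergCatalog, tableIdentifier, events.get(0)));
    icebergTableOperator.addToTable(icebergTable, events);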
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
index 939c594f..ffae1728 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -12,15 +12,15 @@
 import io.debezium.engine.ChangeEvent;
 import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.format.Json;
-import io.debezium.serde.DebeziumSerdes;
 import io.debezium.server.BaseChangeConsumer;
 import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
+import io.debezium.server.iceberg.tableoperator.InterfaceIcebergTableOperator;
 
-import java.io.Closeable;
-import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import javax.annotation.PostConstruct;
@@ -31,24 +31,13 @@
 import javax.inject.Inject;
 import javax.inject.Named;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Ints;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.*;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.deletes.EqualityDeleteWriter;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.eclipse.microprofile.config.ConfigProvider;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.slf4j.Logger;
@@ -65,13 +54,10 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
 
   private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class);
   private static final String PROP_PREFIX = "debezium.sink.iceberg.";
-  static ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
   @ConfigProperty(name = "debezium.format.value", defaultValue = "json")
   String valueFormat;
   @ConfigProperty(name = "debezium.format.key", defaultValue = "json")
   String keyFormat;
-  @ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
-  boolean eventSchemaEnabled;
   @ConfigProperty(name = PROP_PREFIX + CatalogProperties.WAREHOUSE_LOCATION)
   String warehouseLocation;
   @ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
@@ -84,24 +70,22 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
   String catalogName;
   @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
   boolean upsert;
-  @ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true")
-  boolean upsertKeepDeletes;
-  @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
-  String opColumn;
-  @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
-  String sourceTsMsColumn;
   @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
   String batchSizeWaitName;
+
   @Inject
   @Any
   Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
   InterfaceBatchSizeWait batchSizeWait;
+
   Configuration hadoopConf = new Configuration();
   Catalog icebergCatalog;
   Map<String, String> icebergProperties = new ConcurrentHashMap<>();
-  Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
-  Deserializer<JsonNode> valDeserializer;
-  private IcebergTableOperations icebergTableOperations;
+
+  @Inject
+  @Any
+  Instance<InterfaceIcebergTableOperator> icebergTableOperatorInstances;
+  InterfaceIcebergTableOperator icebergTableOperator;
 
   @PostConstruct
   void connect() throws InterruptedException {
@@ -118,10 +102,6 @@ void connect() throws InterruptedException {
     conf.forEach(this.icebergProperties::put);
 
     icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
-    icebergTableOperations = new IcebergTableOperations(icebergCatalog);
-
-    valSerde.configure(Collections.emptyMap(), false);
-    valDeserializer = valSerde.deserializer();
 
     Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
     if (instance.isAmbiguous()) {
@@ -132,28 +112,23 @@ void connect() throws InterruptedException {
     batchSizeWait = instance.get();
     batchSizeWait.initizalize();
     LOGGER.info("Using {}", batchSizeWait.getClass().getName());
-  }
-
-  public String map(String destination) {
-    return destination.replace(".", "_");
-  }
-
-  private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) {
-    if (!eventSchemaEnabled) {
-      throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
-          "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
+    String icebergTableOperatorName = upsert ? "IcebergTableOperatorUpsert" : "IcebergTableOperatorAppend";
+    Instance<InterfaceIcebergTableOperator> toInstance = icebergTableOperatorInstances.select(NamedLiteral.of(icebergTableOperatorName));
+    if (toInstance.isAmbiguous()) {
+      throw new DebeziumException("Multiple classes named `" + icebergTableOperatorName + "` were found");
     }
-
-    if (event.value() == null) {
-      throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
+    if (toInstance.isUnsatisfied()) {
+      throw new DebeziumException("No class named `" + icebergTableOperatorName + "` found");
     }
+    icebergTableOperator = toInstance.get();
+    icebergTableOperator.initialize();
+    LOGGER.info("Using {}", icebergTableOperator.getClass().getName());
 
-    DebeziumToIcebergTable eventSchema = event.key() == null
-        ? new DebeziumToIcebergTable(getBytes(event.value()))
-        : new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));
+  }
 
-    return eventSchema.create(icebergCatalog, tableIdentifier);
+  public String map(String destination) {
+    return destination.replace(".", "_");
   }
 
 @Override
@@ -169,9 +144,12 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
 
     for (Map.Entry<String, ArrayList<ChangeEvent<Object, Object>>> event : result.entrySet()) {
       final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
-      Table icebergTable = icebergTableOperations.loadTable(tableIdentifier)
-          .orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0)));
-      addToTable(icebergTable, event.getValue());
+      Table icebergTable = icebergTableOperator
+          .loadIcebergTable(icebergCatalog, tableIdentifier)
+          .orElseGet(() ->
+              icebergTableOperator.createIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0)));
+      //addToTable(icebergTable, event.getValue());
+      icebergTableOperator.addToTable(icebergTable, event.getValue());
     }
     // workaround! somehow offset is not saved to file unless we call committer.markProcessed
     // even its should be saved to file periodically
@@ -185,194 +163,4 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
 
   }
 
-  public int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
-    if (lhs.getField(sourceTsMsColumn).equals(rhs.getField(sourceTsMsColumn))) {
-      // return (x < y) ? -1 : ((x == y) ? 0 : 1);
-      return
-          cdcOperations.getOrDefault(lhs.getField(opColumn), -1)
-              .compareTo(
-                  cdcOperations.getOrDefault(rhs.getField(opColumn), -1)
-              )
-          ;
-    } else {
-      return Long.compare((Long) lhs.getField(sourceTsMsColumn), (Long) rhs.getField(sourceTsMsColumn));
-    }
-  }
-
-
-  private ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
-
-    ArrayList<Record> icebergRecords = new ArrayList<>();
-    for (ChangeEvent<Object, Object> e : events) {
-      GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
-          getBytes(e.value())));
-      icebergRecords.add(icebergRecord);
-    }
-
-    return icebergRecords;
-  }
-
-  private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
-    ConcurrentHashMap<Object, GenericRecord> icebergRecordsmap = new ConcurrentHashMap<>();
-
-    for (ChangeEvent<Object, Object> e : events) {
-      GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
-          getBytes(e.value())));
-
-      // only replace it if its newer
-      if (icebergRecordsmap.containsKey(e.key())) {
-
-        if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()), icebergRecord) <= 0) {
-          icebergRecordsmap.put(e.key(), icebergRecord);
-        }
-
-      } else {
-        icebergRecordsmap.put(e.key(), icebergRecord);
-      }
-
-    }
-    return new ArrayList<>(icebergRecordsmap.values());
-  }
-
-  private void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
-
-    if (!upsert || icebergTable.sortOrder().isUnsorted()) {
-
-      if (upsert && icebergTable.sortOrder().isUnsorted()) {
-        LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!");
-      }
-
-      ArrayList<Record> icebergRecords = toIcebergRecords(icebergTable.schema(), events);
-      DataFile dataFile = getDataFile(icebergTable, icebergRecords);
-      LOGGER.debug("Committing new file as Append '{}' !", dataFile.path());
-      icebergTable.newAppend()
-          .appendFile(dataFile)
-          .commit();
-
-    } else {
-      // DO UPSERT >>> DELETE + INSERT
-      ArrayList<Record> icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events);
-      DataFile dataFile = getDataFile(icebergTable, icebergRecords);
-      DeleteFile deleteDataFile = getDeleteFile(icebergTable, icebergRecords);
-      LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile != null, dataFile.path());
-      RowDelta c = icebergTable
-          .newRowDelta()
-          .addRows(dataFile);
-
-      if (deleteDataFile != null) {
-        c.addDeletes(deleteDataFile)
-            .validateDeletedFiles();
-      }
-
-      c.commit();
-    }
-    LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());
-
-  }
-
-  private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {
-
-    final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
-    OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
-    Set<Integer> equalityDeleteFieldIds = icebergTable.schema().identifierFieldIds();
-
-    EqualityDeleteWriter<Record> deleteWriter;
-
-    // anything is not an insert.
-    // upsertKeepDeletes = false, which means delete deletes
-    List<Record> deleteRows = icebergRecords.stream()
-        .filter(r ->
-            // anything is not an insert.
-            !r.getField(opColumn).equals("c")
-            // upsertKeepDeletes = false and its deleted record, which means delete deletes
-            // || !(upsertKeepDeletes && r.getField(opColumn).equals("d"))
-        ).collect(Collectors.toList());
-
-    if (deleteRows.size() == 0) {
-      return null;
-    }
-
-    try {
-      LOGGER.debug("Writing data to equality delete file: {}!", out);
-
-      deleteWriter = Parquet.writeDeletes(out)
-          .createWriterFunc(GenericParquetWriter::buildWriter)
-          .overwrite()
-          .rowSchema(icebergTable.sortOrder().schema())
-          .withSpec(icebergTable.spec())
-          .equalityFieldIds(Lists.newArrayList(icebergTable.schema().identifierFieldIds()))
-          .metricsConfig(MetricsConfig.fromProperties(icebergTable.properties()))
-          .withSortOrder(icebergTable.sortOrder())
-          .setAll(icebergTable.properties())
-          .buildEqualityWriter()
-      ;
-
-      try (Closeable toClose = deleteWriter) {
-        deleteWriter.deleteAll(deleteRows);
-      }
-
-    } catch (IOException e) {
-      throw new InterruptedException(e.getMessage());
-    }
-
-    LOGGER.debug("Creating iceberg equality delete file!");
-    // Equality delete files identify deleted rows in a collection of data files by one or more column values,
-    // and may optionally contain additional columns of the deleted row.
-    return FileMetadata.deleteFileBuilder(icebergTable.spec())
-        .ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds()))
-        .withFormat(FileFormat.PARQUET)
-        .withPath(out.location())
-        .withFileSizeInBytes(deleteWriter.length())
-        .withFileSizeInBytes(deleteWriter.length())
-        .withRecordCount(deleteRows.size())
-        .withSortOrder(icebergTable.sortOrder())
-        .build();
-  }
-
-  private DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {
-    final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
-    OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
-
-    FileAppender<Record> writer;
-    // if its append OR upsert with keep deletes then add all the records to data file
-    // if table has no PK - which means fall back to append
-    // if its upsert and upsertKeepDeletes = true
-    // if nothing above then exclude deletes
-    List<Record> newRecords = icebergRecords.stream()
-        .filter(r ->
-            // if its append OR upsert with keep deletes then add all the records to data file
-            !upsert
-            // if table has no PK - which means fall back to append
-            || icebergTable.sortOrder().isUnsorted()
-            // if its upsert and upsertKeepDeletes = true
-            || upsertKeepDeletes
-            // if nothing above then exclude deletes
-            || !(r.getField(opColumn).equals("d")))
-        .collect(Collectors.toList());
-    try {
-      LOGGER.debug("Writing data to file: {}!", out);
-      writer = Parquet.write(out)
-          .createWriterFunc(GenericParquetWriter::buildWriter)
-          .forTable(icebergTable)
-          .overwrite()
-          .build();
-
-      try (Closeable toClose = writer) {
-        writer.addAll(newRecords);
-      }
-
-    } catch (IOException e) {
-      throw new InterruptedException(e.getMessage());
-    }
-
-    LOGGER.debug("Creating iceberg DataFile!");
-    return DataFiles.builder(icebergTable.spec())
-        .withFormat(FileFormat.PARQUET)
-        .withPath(out.location())
-        .withFileSizeInBytes(writer.length())
-        .withSplitOffsets(writer.splitOffsets())
-        .withMetrics(writer.metrics())
-        .build();
-  }
 
 }
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergTableOperations.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergTableOperations.java
deleted file mode 100644
index fde020e6..00000000
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergTableOperations.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package io.debezium.server.iceberg;
-
-import java.util.Optional;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wrapper to perform operations in iceberg tables
- * @author Rafael Acevedo
- */
-public class IcebergTableOperations {
-  private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperations.class);
-
-  private final Catalog catalog;
-
-  public IcebergTableOperations(Catalog catalog) {
-    this.catalog = catalog;
-  }
-
-  public Optional<Table> loadTable(TableIdentifier tableId) {
-    try {
-      Table table = catalog.loadTable(tableId);
-      return Optional.of(table);
-    } catch (NoSuchTableException e) {
-      LOGGER.warn("table not found: {}", tableId.toString());
-      return Optional.empty();
-    }
-  }
-}
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java
new file mode 100644
index 00000000..fed67fb4
--- /dev/null
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * * Copyright memiiso Authors.
+ * *
+ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+package io.debezium.server.iceberg.tableoperator;
+
+import io.debezium.DebeziumException;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.serde.DebeziumSerdes;
+import io.debezium.server.iceberg.DebeziumToIcebergTable;
+import io.debezium.server.iceberg.IcebergUtil;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.iceberg.*;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper to perform operations in iceberg tables
+ *
+ * @author Rafael Acevedo
+ */
+abstract class AbstractIcebergTableOperator implements InterfaceIcebergTableOperator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class);
+
+  @ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
+  boolean eventSchemaEnabled;
+  Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
+  Deserializer<JsonNode> valDeserializer;
+
+  @Override
+  public void initialize() {
+    valSerde.configure(Collections.emptyMap(), false);
+    valDeserializer = valSerde.deserializer();
+  }
+
+  protected byte[] getBytes(Object object) {
+    if (object instanceof byte[]) {
+      return (byte[]) object;
+    } else if (object instanceof String) {
+      return ((String) object).getBytes();
+    }
+    throw new DebeziumException(unsupportedTypeMessage(object));
+  }
+
+  protected String getString(Object object) {
+    if (object instanceof String) {
+      return (String) object;
+    }
+    throw new DebeziumException(unsupportedTypeMessage(object));
+  }
+
+  protected String unsupportedTypeMessage(Object object) {
+    final String type = (object == null) ? "null" : object.getClass().getName();
+    return "Unexpected data type '" + type + "'";
+  }
+
+  protected ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
+
+    ArrayList<Record> icebergRecords = new ArrayList<>();
+    for (ChangeEvent<Object, Object> e : events) {
+      GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
+          getBytes(e.value())));
+      icebergRecords.add(icebergRecord);
+    }
+
+    return icebergRecords;
+  }
+
+  protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {
+    final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
+    OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
+
+    FileAppender<Record> writer;
+    List<Record> newRecords = icebergRecords.stream()
+        .filter(this.filterEvents()).collect(Collectors.toList());
+    try {
+      LOGGER.debug("Writing data to file: {}!", out);
+      writer = Parquet.write(out)
+          .createWriterFunc(GenericParquetWriter::buildWriter)
+          .forTable(icebergTable)
+          .overwrite()
+          .build();
+
+      try (Closeable toClose = writer) {
+        writer.addAll(newRecords);
+      }
+
+    } catch (IOException e) {
+      throw new InterruptedException(e.getMessage());
+    }
+
+    LOGGER.debug("Creating iceberg DataFile!");
+    return DataFiles.builder(icebergTable.spec())
+        .withFormat(FileFormat.PARQUET)
+        .withPath(out.location())
+        .withFileSizeInBytes(writer.length())
+        .withSplitOffsets(writer.splitOffsets())
+        .withMetrics(writer.metrics())
+        .build();
+  }
+
+  public Table createIcebergTable(Catalog catalog,
+                                  TableIdentifier tableIdentifier,
+                                  ChangeEvent<Object, Object> event) {
+
+    if (!eventSchemaEnabled) {
+      throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
+          "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
+    }
+
+    if (event.value() == null) {
+      throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
+    }
+
+    DebeziumToIcebergTable eventSchema = event.key() == null
+        ? new DebeziumToIcebergTable(getBytes(event.value()))
+        : new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));
+
+    return eventSchema.create(catalog, tableIdentifier);
+  }
+
+  public Optional<Table> loadIcebergTable(Catalog catalog, TableIdentifier tableId) {
+    try {
+      Table table = catalog.loadTable(tableId);
+      return Optional.of(table);
+    } catch (NoSuchTableException e) {
+      LOGGER.warn("table not found: {}", tableId.toString());
+      return Optional.empty();
+    }
+  }
+}
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java
new file mode 100644
index 00000000..851dc284
--- /dev/null
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * * Copyright memiiso Authors.
+ * *
+ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+package io.debezium.server.iceberg.tableoperator;
+
+import io.debezium.engine.ChangeEvent;
+
+import java.util.ArrayList;
+import java.util.function.Predicate;
+import javax.enterprise.context.Dependent;
+import javax.inject.Named;
+
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Dependent
+@Named("IcebergTableOperatorAppend")
+public class IcebergTableOperatorAppend extends AbstractIcebergTableOperator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorAppend.class);
+
+  @Override
+  public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
+
+    ArrayList<Record> icebergRecords = toIcebergRecords(icebergTable.schema(), events);
+    DataFile dataFile = getDataFile(icebergTable, icebergRecords);
+    LOGGER.debug("Committing new file as Append '{}' !", dataFile.path());
+    AppendFiles c = icebergTable.newAppend()
+        .appendFile(dataFile);
+
+    c.commit();
+    LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());
+  }
+
+  @Override
+  public Predicate<Record> filterEvents() {
+    return p -> true;
+  }
+}
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java
new file mode 100644
index 00000000..f19ac465
--- /dev/null
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java
@@ -0,0 +1,198 @@
+/*
+ *
+ * * Copyright memiiso Authors.
+ * *
+ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+package io.debezium.server.iceberg.tableoperator;
+
+import io.debezium.engine.ChangeEvent;
+import io.debezium.server.iceberg.IcebergUtil;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import javax.enterprise.context.Dependent;
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import org.apache.iceberg.*;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Dependent
+@Named("IcebergTableOperatorUpsert")
+public class IcebergTableOperatorUpsert extends AbstractIcebergTableOperator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorUpsert.class);
+  static ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
+  @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
+  String sourceTsMsColumn;
+
+  @ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true")
+  boolean upsertKeepDeletes;
+  @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
+  String opColumn;
+
+  @Inject
+  IcebergTableOperatorAppend icebergTableAppend;
+
+
+  @Override
+  public void initialize() {
+    super.initialize();
+    icebergTableAppend.initialize();
+  }
+
+  private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {
+
+    final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
+    OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
+    Set<Integer> equalityDeleteFieldIds = icebergTable.schema().identifierFieldIds();
+
+    EqualityDeleteWriter<Record> deleteWriter;
+
+    // anything is not an insert.
+    // upsertKeepDeletes = false, which means delete deletes
+    List<Record> deleteRows = icebergRecords.stream()
+        .filter(r ->
+            // anything is not an insert.
+            !r.getField(opColumn).equals("c")
+            // upsertKeepDeletes = false and its deleted record, which means delete deletes
+            // || !(upsertKeepDeletes && r.getField(opColumn).equals("d"))
+        ).collect(Collectors.toList());
+
+    if (deleteRows.size() == 0) {
+      return null;
+    }
+
+    try {
+      LOGGER.debug("Writing data to equality delete file: {}!", out);
+
+      deleteWriter = Parquet.writeDeletes(out)
+          .createWriterFunc(GenericParquetWriter::buildWriter)
+          .overwrite()
+          .rowSchema(icebergTable.sortOrder().schema())
+          .withSpec(icebergTable.spec())
+          .equalityFieldIds(Lists.newArrayList(icebergTable.schema().identifierFieldIds()))
+          .metricsConfig(MetricsConfig.fromProperties(icebergTable.properties()))
+          .withSortOrder(icebergTable.sortOrder())
+          .setAll(icebergTable.properties())
+          .buildEqualityWriter()
+      ;
+
+      try (Closeable toClose = deleteWriter) {
+        deleteWriter.deleteAll(deleteRows);
+      }
+
+    } catch (IOException e) {
+      throw new InterruptedException(e.getMessage());
+    }
+
+    LOGGER.debug("Creating iceberg equality delete file!");
+    // Equality delete files identify deleted rows in a collection of data files by one or more column values,
+    // and may optionally contain additional columns of the deleted row.
+    return FileMetadata.deleteFileBuilder(icebergTable.spec())
+        .ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds()))
+        .withFormat(FileFormat.PARQUET)
+        .withPath(out.location())
+        .withFileSizeInBytes(deleteWriter.length())
+        .withRecordCount(deleteRows.size())
+        .withSortOrder(icebergTable.sortOrder())
+        .build();
+  }
+
+  private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
+    ConcurrentHashMap<Object, GenericRecord> icebergRecordsmap = new ConcurrentHashMap<>();
+
+    for (ChangeEvent<Object, Object> e : events) {
+      GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
+          getBytes(e.value())));
+
+      // only replace it if its newer
+      if (icebergRecordsmap.containsKey(e.key())) {
+
+        if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()), icebergRecord) <= 0) {
+          icebergRecordsmap.put(e.key(), icebergRecord);
+        }
+
+      } else {
+        icebergRecordsmap.put(e.key(), icebergRecord);
+      }
+
+    }
+    return new ArrayList<>(icebergRecordsmap.values());
+  }
+
+  private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
+    if (lhs.getField(sourceTsMsColumn).equals(rhs.getField(sourceTsMsColumn))) {
+      // return (x < y) ? -1 : ((x == y) ? 0 : 1);
+      return
+          cdcOperations.getOrDefault(lhs.getField(opColumn), -1)
+              .compareTo(
+                  cdcOperations.getOrDefault(rhs.getField(opColumn), -1)
+              )
+          ;
+    } else {
+      return Long.compare((Long) lhs.getField(sourceTsMsColumn), (Long) rhs.getField(sourceTsMsColumn));
+    }
+  }
+
+  @Override
+  public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
+
+    if (icebergTable.sortOrder().isUnsorted()) {
+      LOGGER.info("Table doesn't have a PK defined, upsert is not possible, falling back to append!");
+      // fall back to the append operator
+      icebergTableAppend.addToTable(icebergTable, events);
+      return;
+    }
+
+    // DO UPSERT >>> DELETE + INSERT
+    ArrayList<Record> icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events);
+    DataFile dataFile = getDataFile(icebergTable, icebergRecords);
+    DeleteFile deleteDataFile = getDeleteFile(icebergTable, icebergRecords);
+    LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile != null, dataFile.path());
+    RowDelta c = icebergTable
+        .newRowDelta()
+        .addRows(dataFile);
+
+    if (deleteDataFile != null) {
+      c.addDeletes(deleteDataFile)
+          .validateDeletedFiles();
+    }
+
+    c.commit();
+    LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());
+  }
+
+  @Override
+  public Predicate<Record> filterEvents() {
+    return p ->
+        // if it's upsert and upsertKeepDeletes = true, keep deleted records in the data file
+        upsertKeepDeletes
+        // if not, then exclude deletes
+        || !(p.getField(opColumn).equals("d"));
+  }
+
+}
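For clarity, the upsert operator above de-duplicates each batch per key before writing: records are compared by their `__source_ts_ms` value and, on a tie, by the operation priority c(1) < r(2) < u(3) < d(4), so the latest state of a row wins. A small standalone illustration of that ordering (hypothetical class, not part of the patch):

    import java.util.Map;

    public class DedupOrderingExample {
      static final Map<String, Integer> CDC_OPERATIONS = Map.of("c", 1, "r", 2, "u", 3, "d", 4);

      // mirrors IcebergTableOperatorUpsert.compareByTsThenOp(): timestamp first, then operation priority
      static int compareByTsThenOp(long lhsTs, String lhsOp, long rhsTs, String rhsOp) {
        if (lhsTs == rhsTs) {
          return CDC_OPERATIONS.getOrDefault(lhsOp, -1).compareTo(CDC_OPERATIONS.getOrDefault(rhsOp, -1));
        }
        return Long.compare(lhsTs, rhsTs);
      }

      public static void main(String[] args) {
        // same timestamp, update vs delete -> negative, so the delete replaces the update in the dedup map
        System.out.println(compareByTsThenOp(1000L, "u", 1000L, "d"));
        // existing record is newer -> positive, so the older delete does not replace it
        System.out.println(compareByTsThenOp(2000L, "c", 1000L, "d"));
      }
    }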
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java
new file mode 100644
index 00000000..fd14ec96
--- /dev/null
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * * Copyright memiiso Authors.
+ * *
+ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+package io.debezium.server.iceberg.tableoperator;
+
+import io.debezium.engine.ChangeEvent;
+
+import java.util.ArrayList;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+
+public interface InterfaceIcebergTableOperator {
+
+  void initialize();
+
+  void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException;
+
+  Predicate<Record> filterEvents();
+
+  Table createIcebergTable(Catalog catalog,
+                           TableIdentifier tableIdentifier,
+                           ChangeEvent<Object, Object> event);
+
+  Optional<Table> loadIcebergTable(Catalog catalog, TableIdentifier tableId);
+}
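With the interface in place, additional write strategies could be added as named CDI beans. The sketch below is hypothetical (class name and behaviour are not part of the patch) and only shows the shape such an operator would take; note that IcebergChangeConsumer currently selects only between "IcebergTableOperatorUpsert" and "IcebergTableOperatorAppend", so wiring in a third name would need a follow-up change there.

    package io.debezium.server.iceberg.tableoperator;

    import io.debezium.engine.ChangeEvent;

    import java.util.ArrayList;
    import java.util.Optional;
    import java.util.function.Predicate;
    import javax.enterprise.context.Dependent;
    import javax.inject.Named;

    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.exceptions.NoSuchTableException;

    @Dependent
    @Named("IcebergTableOperatorNoop")
    public class IcebergTableOperatorNoop implements InterfaceIcebergTableOperator {

      @Override
      public void initialize() {
      }

      @Override
      public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) {
        // intentionally drops the batch; a real operator would build data/delete files and commit them
      }

      @Override
      public Predicate<Record> filterEvents() {
        return r -> true;
      }

      @Override
      public Table createIcebergTable(Catalog catalog, TableIdentifier tableIdentifier,
                                      ChangeEvent<Object, Object> event) {
        throw new UnsupportedOperationException("table auto-creation is not supported by this operator");
      }

      @Override
      public Optional<Table> loadIcebergTable(Catalog catalog, TableIdentifier tableId) {
        try {
          return Optional.of(catalog.loadTable(tableId));
        } catch (NoSuchTableException e) {
          return Optional.empty();
        }
      }
    }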