From b6609bdf43c78ec68e1d4908dd0a97dbd97dcbe8 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sat, 16 Oct 2021 15:00:07 +0200 Subject: [PATCH 1/3] WIP table operators --- .../server/iceberg/IcebergChangeConsumer.java | 23 +++ .../AbstractIcebergTableOperator.java | 158 ++++++++++++++++ .../IcebergTableOperatorAppend.java | 39 ++++ .../IcebergTableOperatorUpsert.java | 177 ++++++++++++++++++ .../InterfaceIcebergTableOperator.java | 24 +++ 5 files changed, 421 insertions(+) create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/AbstractIcebergTableOperator.java create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorAppend.java create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorUpsert.java create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/InterfaceIcebergTableOperator.java diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 939c594f..94b33009 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -15,6 +15,7 @@ import io.debezium.serde.DebeziumSerdes; import io.debezium.server.BaseChangeConsumer; import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait; +import io.debezium.server.iceberg.tableoperators.InterfaceIcebergTableOperator; import java.io.Closeable; import java.io.IOException; @@ -103,6 +104,12 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu Deserializer valDeserializer; private IcebergTableOperations icebergTableOperations; + + @Inject + @Any + Instance icebergTableOperatorInstances; + InterfaceIcebergTableOperator icebergTableOperator; + @PostConstruct void connect() throws InterruptedException { if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) { @@ -132,6 +139,21 @@ void connect() throws InterruptedException { batchSizeWait = instance.get(); batchSizeWait.initizalize(); LOGGER.info("Using {}", batchSizeWait.getClass().getName()); + + // init table operator + ; + String icebergTableOperatorName = upsert ? "IcebergTableOperatorUpsert" : "IcebergTableOperatorAppend"; + Instance toInstance = icebergTableOperatorInstances.select(NamedLiteral.of(icebergTableOperatorName)); + if (instance.isAmbiguous()) { + throw new DebeziumException("Multiple icebergTableOperation size wait class were found"); + } + if (instance.isUnsatisfied()) { + throw new DebeziumException("No batch size wait class is available"); + } + icebergTableOperator = toInstance.get(); + icebergTableOperator.initialize(); + LOGGER.info("Using {}", icebergTableOperator.getClass().getName()); + } public String map(String destination) { @@ -172,6 +194,7 @@ public void handleBatch(List> records, DebeziumEngin Table icebergTable = icebergTableOperations.loadTable(tableIdentifier) .orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0))); addToTable(icebergTable, event.getValue()); + //icebergTableOperator.addToTable(icebergTable,event.getValue()); } // workaround! 
somehow offset is not saved to file unless we call committer.markProcessed // even its should be saved to file periodically diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/AbstractIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/AbstractIcebergTableOperator.java new file mode 100644 index 00000000..2acddc74 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/AbstractIcebergTableOperator.java @@ -0,0 +1,158 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.tableoperators; + +import io.debezium.DebeziumException; +import io.debezium.engine.ChangeEvent; +import io.debezium.serde.DebeziumSerdes; +import io.debezium.server.iceberg.IcebergUtil; + +import java.io.Closeable; +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.iceberg.*; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wrapper to perform operations in iceberg tables + * @author Rafael Acevedo + */ +abstract class AbstractIcebergTableOperator implements InterfaceIcebergTableOperator { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class); + + @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "") + String tablePrefix; + @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default") + String namespace; + @ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default") + String catalogName; + @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true") + boolean upsert; + @ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true") + boolean upsertKeepDeletes; + @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op") + String opColumn; + @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms") + String sourceTsMsColumn; + @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait") + String batchSizeWaitName; + + Serde valSerde = DebeziumSerdes.payloadJson(JsonNode.class); + protected Deserializer valDeserializer; + + protected byte[] getBytes(Object object) { + if (object instanceof byte[]) { + return (byte[]) object; + } + else if (object instanceof String) { + return ((String) object).getBytes(); + } + throw new DebeziumException(unsupportedTypeMessage(object)); + } + + protected String getString(Object 
object) { + if (object instanceof String) { + return (String) object; + } + throw new DebeziumException(unsupportedTypeMessage(object)); + } + + protected String unsupportedTypeMessage(Object object) { + final String type = (object == null) ? "null" : object.getClass().getName(); + return "Unexpected data type '" + type + "'"; + } + + protected ArrayList toIcebergRecords(Schema schema, ArrayList> events) throws InterruptedException { + + ArrayList icebergRecords = new ArrayList<>(); + for (ChangeEvent e : events) { + GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(), + getBytes(e.value()))); + icebergRecords.add(icebergRecord); + } + + return icebergRecords; + } + + protected DataFile getDataFile(Table icebergTable, ArrayList icebergRecords) throws InterruptedException { + final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET; + OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName)); + + FileAppender writer; + // if its append OR upsert with keep deletes then add all the records to data file + // if table has no PK - which means fall back to append + // if its upsert and upsertKeepDeletes = true + // if nothing above then exclude deletes + List newRecords = icebergRecords.stream() + .filter(r -> + // if its append OR upsert with keep deletes then add all the records to data file + !upsert + // if table has no PK - which means fall back to append + || icebergTable.sortOrder().isUnsorted() + // if its upsert and upsertKeepDeletes = true + || upsertKeepDeletes + // if nothing above then exclude deletes + || !(r.getField(opColumn).equals("d"))) + .collect(Collectors.toList()); + try { + LOGGER.debug("Writing data to file: {}!", out); + writer = Parquet.write(out) + .createWriterFunc(GenericParquetWriter::buildWriter) + .forTable(icebergTable) + .overwrite() + .build(); + + try (Closeable toClose = writer) { + writer.addAll(newRecords); + } + + } catch (IOException e) { + throw new InterruptedException(e.getMessage()); + } + + LOGGER.debug("Creating iceberg DataFile!"); + return DataFiles.builder(icebergTable.spec()) + .withFormat(FileFormat.PARQUET) + .withPath(out.location()) + .withFileSizeInBytes(writer.length()) + .withSplitOffsets(writer.splitOffsets()) + .withMetrics(writer.metrics()) + .build(); + } + + public Optional loadTable(Catalog catalog, TableIdentifier tableId) { + try { + Table table = catalog.loadTable(tableId); + return Optional.of(table); + } catch (NoSuchTableException e) { + LOGGER.warn("table not found: {}", tableId.toString()); + return Optional.empty(); + } + } +} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorAppend.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorAppend.java new file mode 100644 index 00000000..4220fcc9 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorAppend.java @@ -0,0 +1,39 @@ +/* + * + * * Copyright memiiso Authors. 
+ * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.tableoperators; + +import io.debezium.engine.ChangeEvent; + +import java.util.ArrayList; +import javax.enterprise.context.Dependent; +import javax.inject.Named; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Dependent +@Named("IcebergTableOperatorAppend") +public class IcebergTableOperatorAppend extends AbstractIcebergTableOperator { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class); + + @Override + public void addToTable(Table icebergTable, ArrayList> events) throws InterruptedException { + + ArrayList icebergRecords = toIcebergRecords(icebergTable.schema(), events); + DataFile dataFile = getDataFile(icebergTable, icebergRecords); + LOGGER.debug("Committing new file as Append '{}' !", dataFile.path()); + icebergTable.newAppend() + .appendFile(dataFile) + .commit(); + } + +} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorUpsert.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorUpsert.java new file mode 100644 index 00000000..afd8cf94 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorUpsert.java @@ -0,0 +1,177 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.tableoperators; + +import io.debezium.engine.ChangeEvent; +import io.debezium.server.iceberg.IcebergUtil; + +import java.io.Closeable; +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import javax.enterprise.context.Dependent; +import javax.inject.Inject; +import javax.inject.Named; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import org.apache.iceberg.*; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Dependent +@Named("IcebergTableOperatorUpsert") +public class IcebergTableOperatorUpsert extends AbstractIcebergTableOperator { + + static ImmutableMap cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4); + private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorUpsert.class); + + + @Inject + IcebergTableOperatorAppend icebergTableAppend; + + private DeleteFile getDeleteFile(Table icebergTable, ArrayList icebergRecords) throws InterruptedException { + + final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." 
+ FileFormat.PARQUET; + OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName)); + Set equalityDeleteFieldIds = icebergTable.schema().identifierFieldIds(); + + EqualityDeleteWriter deleteWriter; + + // anything is not an insert. + // upsertKeepDeletes = false, which means delete deletes + List deleteRows = icebergRecords.stream() + .filter(r -> + // anything is not an insert. + !r.getField(opColumn).equals("c") + // upsertKeepDeletes = false and its deleted record, which means delete deletes + // || !(upsertKeepDeletes && r.getField(opColumn).equals("d")) + ).collect(Collectors.toList()); + + if (deleteRows.size() == 0) { + return null; + } + + try { + LOGGER.debug("Writing data to equality delete file: {}!", out); + + deleteWriter = Parquet.writeDeletes(out) + .createWriterFunc(GenericParquetWriter::buildWriter) + .overwrite() + .rowSchema(icebergTable.sortOrder().schema()) + .withSpec(icebergTable.spec()) + .equalityFieldIds(Lists.newArrayList(icebergTable.schema().identifierFieldIds())) + .metricsConfig(MetricsConfig.fromProperties(icebergTable.properties())) + .withSortOrder(icebergTable.sortOrder()) + .setAll(icebergTable.properties()) + .buildEqualityWriter() + ; + + try (Closeable toClose = deleteWriter) { + deleteWriter.deleteAll(deleteRows); + } + + } catch (IOException e) { + throw new InterruptedException(e.getMessage()); + } + + LOGGER.debug("Creating iceberg equality delete file!"); + // Equality delete files identify deleted rows in a collection of data files by one or more column values, + // and may optionally contain additional columns of the deleted row. + return FileMetadata.deleteFileBuilder(icebergTable.spec()) + .ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds())) + .withFormat(FileFormat.PARQUET) + .withPath(out.location()) + .withFileSizeInBytes(deleteWriter.length()) + .withFileSizeInBytes(deleteWriter.length()) + .withRecordCount(deleteRows.size()) + .withSortOrder(icebergTable.sortOrder()) + .build(); + } + + private ArrayList toDeduppedIcebergRecords(Schema schema, ArrayList> events) throws InterruptedException { + ConcurrentHashMap icebergRecordsmap = new ConcurrentHashMap<>(); + + for (ChangeEvent e : events) { + GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(), + getBytes(e.value()))); + + // only replace it if its newer + if (icebergRecordsmap.containsKey(e.key())) { + + if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()), icebergRecord) <= 0) { + icebergRecordsmap.put(e.key(), icebergRecord); + } + + } else { + icebergRecordsmap.put(e.key(), icebergRecord); + } + + } + return new ArrayList<>(icebergRecordsmap.values()); + } + + public int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) { + if (lhs.getField(sourceTsMsColumn).equals(rhs.getField(sourceTsMsColumn))) { + // return (x < y) ? -1 : ((x == y) ? 0 : 1); + return + cdcOperations.getOrDefault(lhs.getField(opColumn), -1) + .compareTo( + cdcOperations.getOrDefault(rhs.getField(opColumn), -1) + ) + ; + } else { + return Long.compare((Long) lhs.getField(sourceTsMsColumn), (Long) rhs.getField(sourceTsMsColumn)); + } + } + + @Override + public void addToTable(Table icebergTable, ArrayList> events) throws InterruptedException { + + if (icebergTable.sortOrder().isUnsorted()) { + LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!"); + // call append here! 
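+      // NOTE: an unsorted sort order is treated as "table has no identifier/PK
+      // columns" throughout this series; the table-create path is assumed to
+      // derive the table's sort order from the event's key schema.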
+ icebergTableAppend.addToTable(icebergTable,events); + return; + } + + // DO UPSERT >>> DELETE + INSERT + ArrayList icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events); + DataFile dataFile = getDataFile(icebergTable, icebergRecords); + DeleteFile deleteDataFile = getDeleteFile(icebergTable, icebergRecords); + LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile != null, dataFile.path()); + RowDelta c = icebergTable + .newRowDelta() + .addRows(dataFile); + + if (deleteDataFile != null) { + c.addDeletes(deleteDataFile) + .validateDeletedFiles(); + } + + c.commit(); + + LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location()); + + } + + +} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/InterfaceIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/InterfaceIcebergTableOperator.java new file mode 100644 index 00000000..6ee0a0a1 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/InterfaceIcebergTableOperator.java @@ -0,0 +1,24 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.tableoperators; + +import io.debezium.engine.ChangeEvent; + +import java.util.ArrayList; + +import org.apache.iceberg.Table; + +public interface InterfaceIcebergTableOperator { + + default void initialize(){ + } + + void addToTable(Table icebergTable, ArrayList> events) throws InterruptedException; + +} From 17375801f27b5a24eac4c0b3d7eca327476eda6a Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sat, 16 Oct 2021 17:42:12 +0200 Subject: [PATCH 2/3] WIP table operators --- .../server/iceberg/IcebergChangeConsumer.java | 233 +----------------- .../AbstractIcebergTableOperator.java | 53 +--- .../IcebergTableOperatorAppend.java | 16 +- .../IcebergTableOperatorUpsert.java | 76 +++--- .../InterfaceIcebergTableOperator.java | 10 +- 5 files changed, 90 insertions(+), 298 deletions(-) rename debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/{tableoperators => tableoperator}/AbstractIcebergTableOperator.java (67%) rename debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/{tableoperators => tableoperator}/IcebergTableOperatorAppend.java (73%) rename debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/{tableoperators => tableoperator}/IcebergTableOperatorUpsert.java (75%) rename debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/{tableoperators => tableoperator}/InterfaceIcebergTableOperator.java (70%) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 94b33009..757ab85c 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -12,16 +12,15 @@ import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.Json; -import io.debezium.serde.DebeziumSerdes; import io.debezium.server.BaseChangeConsumer; import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait; -import 
io.debezium.server.iceberg.tableoperators.InterfaceIcebergTableOperator; +import io.debezium.server.iceberg.tableoperator.InterfaceIcebergTableOperator; -import java.io.Closeable; -import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import javax.annotation.PostConstruct; @@ -32,24 +31,14 @@ import javax.inject.Inject; import javax.inject.Named; -import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.primitives.Ints; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.*; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.parquet.GenericParquetWriter; -import org.apache.iceberg.deletes.EqualityDeleteWriter; -import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.parquet.Parquet; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; @@ -93,18 +82,17 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu String sourceTsMsColumn; @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait") String batchSizeWaitName; + @Inject @Any Instance batchSizeWaitInstances; InterfaceBatchSizeWait batchSizeWait; + Configuration hadoopConf = new Configuration(); Catalog icebergCatalog; Map icebergProperties = new ConcurrentHashMap<>(); - Serde valSerde = DebeziumSerdes.payloadJson(JsonNode.class); - Deserializer valDeserializer; private IcebergTableOperations icebergTableOperations; - @Inject @Any Instance icebergTableOperatorInstances; @@ -127,9 +115,6 @@ void connect() throws InterruptedException { icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf); icebergTableOperations = new IcebergTableOperations(icebergCatalog); - valSerde.configure(Collections.emptyMap(), false); - valDeserializer = valSerde.deserializer(); - Instance instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName)); if (instance.isAmbiguous()) { throw new DebeziumException("Multiple batch size wait class named '" + batchSizeWaitName + "' were found"); @@ -140,15 +125,13 @@ void connect() throws InterruptedException { batchSizeWait.initizalize(); LOGGER.info("Using {}", batchSizeWait.getClass().getName()); - // init table operator - ; String icebergTableOperatorName = upsert ? 
"IcebergTableOperatorUpsert" : "IcebergTableOperatorAppend"; Instance toInstance = icebergTableOperatorInstances.select(NamedLiteral.of(icebergTableOperatorName)); if (instance.isAmbiguous()) { - throw new DebeziumException("Multiple icebergTableOperation size wait class were found"); + throw new DebeziumException("Multiple class named `"+icebergTableOperatorName+"` were found"); } if (instance.isUnsatisfied()) { - throw new DebeziumException("No batch size wait class is available"); + throw new DebeziumException("No class named `"+icebergTableOperatorName+"` found"); } icebergTableOperator = toInstance.get(); icebergTableOperator.initialize(); @@ -193,8 +176,8 @@ public void handleBatch(List> records, DebeziumEngin final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey()); Table icebergTable = icebergTableOperations.loadTable(tableIdentifier) .orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0))); - addToTable(icebergTable, event.getValue()); - //icebergTableOperator.addToTable(icebergTable,event.getValue()); + //addToTable(icebergTable, event.getValue()); + icebergTableOperator.addToTable(icebergTable,event.getValue()); } // workaround! somehow offset is not saved to file unless we call committer.markProcessed // even its should be saved to file periodically @@ -208,194 +191,4 @@ public void handleBatch(List> records, DebeziumEngin } - public int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) { - if (lhs.getField(sourceTsMsColumn).equals(rhs.getField(sourceTsMsColumn))) { - // return (x < y) ? -1 : ((x == y) ? 0 : 1); - return - cdcOperations.getOrDefault(lhs.getField(opColumn), -1) - .compareTo( - cdcOperations.getOrDefault(rhs.getField(opColumn), -1) - ) - ; - } else { - return Long.compare((Long) lhs.getField(sourceTsMsColumn), (Long) rhs.getField(sourceTsMsColumn)); - } - } - - - private ArrayList toIcebergRecords(Schema schema, ArrayList> events) throws InterruptedException { - - ArrayList icebergRecords = new ArrayList<>(); - for (ChangeEvent e : events) { - GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(), - getBytes(e.value()))); - icebergRecords.add(icebergRecord); - } - - return icebergRecords; - } - - private ArrayList toDeduppedIcebergRecords(Schema schema, ArrayList> events) throws InterruptedException { - ConcurrentHashMap icebergRecordsmap = new ConcurrentHashMap<>(); - - for (ChangeEvent e : events) { - GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(), - getBytes(e.value()))); - - // only replace it if its newer - if (icebergRecordsmap.containsKey(e.key())) { - - if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()), icebergRecord) <= 0) { - icebergRecordsmap.put(e.key(), icebergRecord); - } - - } else { - icebergRecordsmap.put(e.key(), icebergRecord); - } - - } - return new ArrayList<>(icebergRecordsmap.values()); - } - - private void addToTable(Table icebergTable, ArrayList> events) throws InterruptedException { - - if (!upsert || icebergTable.sortOrder().isUnsorted()) { - - if (upsert && icebergTable.sortOrder().isUnsorted()) { - LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!"); - } - - ArrayList icebergRecords = toIcebergRecords(icebergTable.schema(), events); - DataFile dataFile = getDataFile(icebergTable, icebergRecords); - LOGGER.debug("Committing new file as Append '{}' !", dataFile.path()); - 
icebergTable.newAppend() - .appendFile(dataFile) - .commit(); - - } else { - // DO UPSERT >>> DELETE + INSERT - ArrayList icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events); - DataFile dataFile = getDataFile(icebergTable, icebergRecords); - DeleteFile deleteDataFile = getDeleteFile(icebergTable, icebergRecords); - LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile != null, dataFile.path()); - RowDelta c = icebergTable - .newRowDelta() - .addRows(dataFile); - - if (deleteDataFile != null) { - c.addDeletes(deleteDataFile) - .validateDeletedFiles(); - } - - c.commit(); - } - LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location()); - - } - - private DeleteFile getDeleteFile(Table icebergTable, ArrayList icebergRecords) throws InterruptedException { - - final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET; - OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName)); - Set equalityDeleteFieldIds = icebergTable.schema().identifierFieldIds(); - - EqualityDeleteWriter deleteWriter; - - // anything is not an insert. - // upsertKeepDeletes = false, which means delete deletes - List deleteRows = icebergRecords.stream() - .filter(r -> - // anything is not an insert. - !r.getField(opColumn).equals("c") - // upsertKeepDeletes = false and its deleted record, which means delete deletes - // || !(upsertKeepDeletes && r.getField(opColumn).equals("d")) - ).collect(Collectors.toList()); - - if (deleteRows.size() == 0) { - return null; - } - - try { - LOGGER.debug("Writing data to equality delete file: {}!", out); - - deleteWriter = Parquet.writeDeletes(out) - .createWriterFunc(GenericParquetWriter::buildWriter) - .overwrite() - .rowSchema(icebergTable.sortOrder().schema()) - .withSpec(icebergTable.spec()) - .equalityFieldIds(Lists.newArrayList(icebergTable.schema().identifierFieldIds())) - .metricsConfig(MetricsConfig.fromProperties(icebergTable.properties())) - .withSortOrder(icebergTable.sortOrder()) - .setAll(icebergTable.properties()) - .buildEqualityWriter() - ; - - try (Closeable toClose = deleteWriter) { - deleteWriter.deleteAll(deleteRows); - } - - } catch (IOException e) { - throw new InterruptedException(e.getMessage()); - } - - LOGGER.debug("Creating iceberg equality delete file!"); - // Equality delete files identify deleted rows in a collection of data files by one or more column values, - // and may optionally contain additional columns of the deleted row. - return FileMetadata.deleteFileBuilder(icebergTable.spec()) - .ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds())) - .withFormat(FileFormat.PARQUET) - .withPath(out.location()) - .withFileSizeInBytes(deleteWriter.length()) - .withFileSizeInBytes(deleteWriter.length()) - .withRecordCount(deleteRows.size()) - .withSortOrder(icebergTable.sortOrder()) - .build(); - } - - private DataFile getDataFile(Table icebergTable, ArrayList icebergRecords) throws InterruptedException { - final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." 
+ FileFormat.PARQUET; - OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName)); - - FileAppender writer; - // if its append OR upsert with keep deletes then add all the records to data file - // if table has no PK - which means fall back to append - // if its upsert and upsertKeepDeletes = true - // if nothing above then exclude deletes - List newRecords = icebergRecords.stream() - .filter(r -> - // if its append OR upsert with keep deletes then add all the records to data file - !upsert - // if table has no PK - which means fall back to append - || icebergTable.sortOrder().isUnsorted() - // if its upsert and upsertKeepDeletes = true - || upsertKeepDeletes - // if nothing above then exclude deletes - || !(r.getField(opColumn).equals("d"))) - .collect(Collectors.toList()); - try { - LOGGER.debug("Writing data to file: {}!", out); - writer = Parquet.write(out) - .createWriterFunc(GenericParquetWriter::buildWriter) - .forTable(icebergTable) - .overwrite() - .build(); - - try (Closeable toClose = writer) { - writer.addAll(newRecords); - } - - } catch (IOException e) { - throw new InterruptedException(e.getMessage()); - } - - LOGGER.debug("Creating iceberg DataFile!"); - return DataFiles.builder(icebergTable.spec()) - .withFormat(FileFormat.PARQUET) - .withPath(out.location()) - .withFileSizeInBytes(writer.length()) - .withSplitOffsets(writer.splitOffsets()) - .withMetrics(writer.metrics()) - .build(); - } - } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/AbstractIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java similarity index 67% rename from debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/AbstractIcebergTableOperator.java rename to debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java index 2acddc74..3179da5f 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/AbstractIcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java @@ -6,7 +6,7 @@ * */ -package io.debezium.server.iceberg.tableoperators; +package io.debezium.server.iceberg.tableoperator; import io.debezium.DebeziumException; import io.debezium.engine.ChangeEvent; @@ -16,10 +16,7 @@ import java.io.Closeable; import java.io.IOException; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.UUID; +import java.util.*; import java.util.stream.Collectors; import com.fasterxml.jackson.databind.JsonNode; @@ -35,42 +32,29 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; -import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Wrapper to perform operations in iceberg tables + * * @author Rafael Acevedo */ abstract class AbstractIcebergTableOperator implements InterfaceIcebergTableOperator { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class); - - @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "") - String tablePrefix; - @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue 
= "default") - String namespace; - @ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default") - String catalogName; - @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true") - boolean upsert; - @ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true") - boolean upsertKeepDeletes; - @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op") - String opColumn; - @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms") - String sourceTsMsColumn; - @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait") - String batchSizeWaitName; - Serde valSerde = DebeziumSerdes.payloadJson(JsonNode.class); - protected Deserializer valDeserializer; + Deserializer valDeserializer; + + @Override + public void initialize() { + valSerde.configure(Collections.emptyMap(), false); + valDeserializer = valSerde.deserializer(); + } protected byte[] getBytes(Object object) { if (object instanceof byte[]) { return (byte[]) object; - } - else if (object instanceof String) { + } else if (object instanceof String) { return ((String) object).getBytes(); } throw new DebeziumException(unsupportedTypeMessage(object)); @@ -105,21 +89,8 @@ protected DataFile getDataFile(Table icebergTable, ArrayList icebergReco OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName)); FileAppender writer; - // if its append OR upsert with keep deletes then add all the records to data file - // if table has no PK - which means fall back to append - // if its upsert and upsertKeepDeletes = true - // if nothing above then exclude deletes List newRecords = icebergRecords.stream() - .filter(r -> - // if its append OR upsert with keep deletes then add all the records to data file - !upsert - // if table has no PK - which means fall back to append - || icebergTable.sortOrder().isUnsorted() - // if its upsert and upsertKeepDeletes = true - || upsertKeepDeletes - // if nothing above then exclude deletes - || !(r.getField(opColumn).equals("d"))) - .collect(Collectors.toList()); + .filter(this.filterEvents()).collect(Collectors.toList()); try { LOGGER.debug("Writing data to file: {}!", out); writer = Parquet.write(out) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorAppend.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java similarity index 73% rename from debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorAppend.java rename to debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java index 4220fcc9..851dc284 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorAppend.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java @@ -6,14 +6,16 @@ * */ -package io.debezium.server.iceberg.tableoperators; +package io.debezium.server.iceberg.tableoperator; import io.debezium.engine.ChangeEvent; import java.util.ArrayList; +import java.util.function.Predicate; import javax.enterprise.context.Dependent; import javax.inject.Named; +import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.Table; import 
org.apache.iceberg.data.Record; @@ -31,9 +33,15 @@ public void addToTable(Table icebergTable, ArrayList ArrayList icebergRecords = toIcebergRecords(icebergTable.schema(), events); DataFile dataFile = getDataFile(icebergTable, icebergRecords); LOGGER.debug("Committing new file as Append '{}' !", dataFile.path()); - icebergTable.newAppend() - .appendFile(dataFile) - .commit(); + AppendFiles c = icebergTable.newAppend() + .appendFile(dataFile); + + c.commit(); + LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location()); } + @Override + public Predicate filterEvents() { + return p -> true; + } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorUpsert.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java similarity index 75% rename from debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorUpsert.java rename to debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java index afd8cf94..c8c97825 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/IcebergTableOperatorUpsert.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java @@ -6,7 +6,7 @@ * */ -package io.debezium.server.iceberg.tableoperators; +package io.debezium.server.iceberg.tableoperator; import io.debezium.engine.ChangeEvent; import io.debezium.server.iceberg.IcebergUtil; @@ -14,11 +14,9 @@ import java.io.Closeable; import java.io.IOException; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; import java.util.stream.Collectors; import javax.enterprise.context.Dependent; import javax.inject.Inject; @@ -34,6 +32,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,13 +40,26 @@ @Named("IcebergTableOperatorUpsert") public class IcebergTableOperatorUpsert extends AbstractIcebergTableOperator { - static ImmutableMap cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4); private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorUpsert.class); + static ImmutableMap cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4); + @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms") + String sourceTsMsColumn; + @ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true") + boolean upsertKeepDeletes; + @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op") + String opColumn; @Inject IcebergTableOperatorAppend icebergTableAppend; + + @Override + public void initialize() { + super.initialize(); + icebergTableAppend.initialize(); + } + private DeleteFile getDeleteFile(Table icebergTable, ArrayList icebergRecords) throws InterruptedException { final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." 
+ FileFormat.PARQUET; @@ -129,7 +141,7 @@ private ArrayList toDeduppedIcebergRecords(Schema schema, ArrayList(icebergRecordsmap.values()); } - public int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) { + private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) { if (lhs.getField(sourceTsMsColumn).equals(rhs.getField(sourceTsMsColumn))) { // return (x < y) ? -1 : ((x == y) ? 0 : 1); return @@ -146,32 +158,38 @@ public int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) { @Override public void addToTable(Table icebergTable, ArrayList> events) throws InterruptedException { - if (icebergTable.sortOrder().isUnsorted()) { - LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!"); - // call append here! - icebergTableAppend.addToTable(icebergTable,events); - return; - } - - // DO UPSERT >>> DELETE + INSERT - ArrayList icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events); - DataFile dataFile = getDataFile(icebergTable, icebergRecords); - DeleteFile deleteDataFile = getDeleteFile(icebergTable, icebergRecords); - LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile != null, dataFile.path()); - RowDelta c = icebergTable - .newRowDelta() - .addRows(dataFile); - - if (deleteDataFile != null) { - c.addDeletes(deleteDataFile) - .validateDeletedFiles(); - } + if (icebergTable.sortOrder().isUnsorted()) { + LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!"); + // call append here! + icebergTableAppend.addToTable(icebergTable, events); + return; + } - c.commit(); + // DO UPSERT >>> DELETE + INSERT + ArrayList icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events); + DataFile dataFile = getDataFile(icebergTable, icebergRecords); + DeleteFile deleteDataFile = getDeleteFile(icebergTable, icebergRecords); + LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile != null, dataFile.path()); + RowDelta c = icebergTable + .newRowDelta() + .addRows(dataFile); + + if (deleteDataFile != null) { + c.addDeletes(deleteDataFile) + .validateDeletedFiles(); + } + c.commit(); LOGGER.info("Committed {} events to table! 
{}", events.size(), icebergTable.location()); - } + @Override + public Predicate filterEvents() { + return p -> + // if its upsert and upsertKeepDeletes = true + upsertKeepDeletes + // if not then exclude deletes + || !(p.getField(opColumn).equals("d")); + } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/InterfaceIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java similarity index 70% rename from debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/InterfaceIcebergTableOperator.java rename to debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java index 6ee0a0a1..e444185c 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperators/InterfaceIcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java @@ -6,19 +6,21 @@ * */ -package io.debezium.server.iceberg.tableoperators; +package io.debezium.server.iceberg.tableoperator; import io.debezium.engine.ChangeEvent; import java.util.ArrayList; +import java.util.function.Predicate; import org.apache.iceberg.Table; +import org.apache.iceberg.data.Record; public interface InterfaceIcebergTableOperator { - default void initialize(){ - } - + void initialize(); + void addToTable(Table icebergTable, ArrayList> events) throws InterruptedException; + Predicate filterEvents(); } From f2d96c43a342ca3c49ba37aa0bd2e30d79a56750 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sat, 16 Oct 2021 18:09:15 +0200 Subject: [PATCH 3/3] WIP table operators --- .../server/iceberg/IcebergChangeConsumer.java | 42 ++++--------------- .../iceberg/IcebergTableOperations.java | 33 --------------- .../AbstractIcebergTableOperator.java | 27 +++++++++++- .../IcebergTableOperatorUpsert.java | 5 ++- .../InterfaceIcebergTableOperator.java | 9 ++++ 5 files changed, 46 insertions(+), 70 deletions(-) delete mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergTableOperations.java diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 757ab85c..ffae1728 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -31,7 +31,6 @@ import javax.inject.Inject; import javax.inject.Named; -import com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; @@ -55,13 +54,10 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class); private static final String PROP_PREFIX = "debezium.sink.iceberg."; - static ImmutableMap cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4); @ConfigProperty(name = "debezium.format.value", defaultValue = "json") String valueFormat; @ConfigProperty(name = "debezium.format.key", defaultValue = "json") String keyFormat; - @ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false") - boolean 
eventSchemaEnabled;
   @ConfigProperty(name = PROP_PREFIX + CatalogProperties.WAREHOUSE_LOCATION)
   String warehouseLocation;
   @ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
   String defaultFs;
   @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
   String namespace;
   @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
   String tablePrefix;
   @ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
   String catalogName;
   @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
   boolean upsert;
-  @ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true")
-  boolean upsertKeepDeletes;
-  @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
-  String opColumn;
-  @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
-  String sourceTsMsColumn;
   @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
   String batchSizeWaitName;

@@ -91,7 +81,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
   Configuration hadoopConf = new Configuration();
   Catalog icebergCatalog;
   Map<String, String> icebergProperties = new ConcurrentHashMap<>();
-  private IcebergTableOperations icebergTableOperations;

   @Inject
   @Any
@@ -113,7 +102,6 @@ void connect() throws InterruptedException {
     conf.forEach(this.icebergProperties::put);

     icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
-    icebergTableOperations = new IcebergTableOperations(icebergCatalog);

     Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
     if (instance.isAmbiguous()) {
@@ -128,10 +116,10 @@ void connect() throws InterruptedException {
     String icebergTableOperatorName = upsert ? "IcebergTableOperatorUpsert" : "IcebergTableOperatorAppend";
     Instance<InterfaceIcebergTableOperator> toInstance = icebergTableOperatorInstances.select(NamedLiteral.of(icebergTableOperatorName));
     if (instance.isAmbiguous()) {
-      throw new DebeziumException("Multiple class named `"+icebergTableOperatorName+"` were found");
+      throw new DebeziumException("Multiple classes named `" + icebergTableOperatorName + "` were found");
     }
     if (instance.isUnsatisfied()) {
-      throw new DebeziumException("No class named `"+icebergTableOperatorName+"` found");
+      throw new DebeziumException("No class named `" + icebergTableOperatorName + "` was found");
     }
     icebergTableOperator = toInstance.get();
     icebergTableOperator.initialize();
@@ -143,24 +131,6 @@ public String map(String destination) {
     return destination.replace(".", "_");
   }

-  private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) {
-
-    if (!eventSchemaEnabled) {
-      throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
-          "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
-    }
-
-    if (event.value() == null) {
-      throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
-    }
-
-    DebeziumToIcebergTable eventSchema = event.key() == null
-        ? new DebeziumToIcebergTable(getBytes(event.value()))
-        : new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));
-
-    return eventSchema.create(icebergCatalog, tableIdentifier);
-  }
-
   @Override
   public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
       throws InterruptedException {
@@ -174,10 +144,12 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
     for (Map.Entry<String, ArrayList<ChangeEvent<Object, Object>>> event : result.entrySet()) {
       final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
-      Table icebergTable = icebergTableOperations.loadTable(tableIdentifier)
-          .orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0)));
+      Table icebergTable = icebergTableOperator
+          .loadIcebergTable(icebergCatalog, tableIdentifier)
+          .orElseGet(() ->
+              icebergTableOperator.createIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0)));
       //addToTable(icebergTable, event.getValue());
-      icebergTableOperator.addToTable(icebergTable,event.getValue());
+      icebergTableOperator.addToTable(icebergTable, event.getValue());
     }
     // workaround! somehow offset is not saved to file unless we call committer.markProcessed
     // even its should be saved to file periodically
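Note: the operator lookup in the hunk above is plain CDI. A minimal standalone sketch of the same resolution, assuming a CDI 2.0 container and the two @Named operator beans from this series (the OperatorResolver class itself is hypothetical, for illustration only); unlike the code above, it checks ambiguity and satisfaction on the freshly selected instance:

    import io.debezium.server.iceberg.tableoperator.InterfaceIcebergTableOperator;

    import javax.enterprise.inject.Any;
    import javax.enterprise.inject.Instance;
    import javax.enterprise.inject.literal.NamedLiteral;
    import javax.inject.Inject;

    public class OperatorResolver {

      @Inject
      @Any
      Instance<InterfaceIcebergTableOperator> operators;

      InterfaceIcebergTableOperator resolve(boolean upsert) {
        // must match the @Named values on IcebergTableOperatorUpsert / IcebergTableOperatorAppend
        String name = upsert ? "IcebergTableOperatorUpsert" : "IcebergTableOperatorAppend";
        Instance<InterfaceIcebergTableOperator> selected = operators.select(NamedLiteral.of(name));
        if (selected.isAmbiguous()) {
          throw new IllegalStateException("Multiple operator beans named `" + name + "` were found");
        }
        if (selected.isUnsatisfied()) {
          throw new IllegalStateException("No operator bean named `" + name + "` was found");
        }
        InterfaceIcebergTableOperator operator = selected.get();
        operator.initialize();
        return operator;
      }
    }
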
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergTableOperations.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergTableOperations.java
deleted file mode 100644
index fde020e6..00000000
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergTableOperations.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package io.debezium.server.iceberg;
-
-import java.util.Optional;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wrapper to perform operations in iceberg tables
- * @author Rafael Acevedo
- */
-public class IcebergTableOperations {
-  private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperations.class);
-
-  private final Catalog catalog;
-
-  public IcebergTableOperations(Catalog catalog) {
-    this.catalog = catalog;
-  }
-
-  public Optional<Table> loadTable(TableIdentifier tableId) {
-    try {
-      Table table = catalog.loadTable(tableId);
-      return Optional.of(table);
-    } catch (NoSuchTableException e) {
-      LOGGER.warn("table not found: {}", tableId.toString());
-      return Optional.empty();
-    }
-  }
-}
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java
index 3179da5f..fed67fb4 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java
@@ -11,6 +11,7 @@
 import io.debezium.DebeziumException;
 import io.debezium.engine.ChangeEvent;
 import io.debezium.serde.DebeziumSerdes;
+import io.debezium.server.iceberg.DebeziumToIcebergTable;
 import io.debezium.server.iceberg.IcebergUtil;

 import java.io.Closeable;
@@ -32,6 +33,7 @@
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -42,6 +44,9 @@
 abstract class AbstractIcebergTableOperator implements InterfaceIcebergTableOperator {
   private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class);

+  @ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
+  boolean eventSchemaEnabled;
+
   Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
   Deserializer<JsonNode> valDeserializer;

@@ -117,7 +122,27 @@ protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergReco
         .build();
   }

-  public Optional<Table> loadTable(Catalog catalog, TableIdentifier tableId) {
+  public Table createIcebergTable(Catalog catalog,
+                                  TableIdentifier tableIdentifier,
+                                  ChangeEvent<Object, Object> event) {
+
+    if (!eventSchemaEnabled) {
+      throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
+          "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
+    }
+
+    if (event.value() == null) {
+      throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "'; event value is null");
+    }
+
+    DebeziumToIcebergTable eventSchema = event.key() == null
+        ? new DebeziumToIcebergTable(getBytes(event.value()))
+        : new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));
+
+    return eventSchema.create(catalog, tableIdentifier);
+  }
+
+  public Optional<Table> loadIcebergTable(Catalog catalog, TableIdentifier tableId) {
     try {
       Table table = catalog.loadTable(tableId);
       return Optional.of(table);
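Note: with createIcebergTable and loadIcebergTable on the operator, table resolution becomes a one-liner for callers, using the interface methods added at the end of this patch. A minimal sketch of the load-or-create flow the consumer follows (the TableResolution helper and its inputs are hypothetical, for illustration only):

    import io.debezium.engine.ChangeEvent;
    import io.debezium.server.iceberg.tableoperator.InterfaceIcebergTableOperator;

    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;

    class TableResolution {

      // Load the target table if it exists; otherwise derive its schema from the
      // first event of the batch and create it (requires event schemas enabled).
      static Table loadOrCreate(InterfaceIcebergTableOperator operator,
                                Catalog catalog,
                                TableIdentifier tableId,
                                ChangeEvent<Object, Object> firstEvent) {
        return operator
            .loadIcebergTable(catalog, tableId)
            .orElseGet(() -> operator.createIcebergTable(catalog, tableId, firstEvent));
      }
    }
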
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java
index c8c97825..f19ac465 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java
@@ -14,7 +14,10 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.time.Instant;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java
index e444185c..fd14ec96 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java
@@ -11,9 +11,12 @@
 import io.debezium.engine.ChangeEvent;

 import java.util.ArrayList;
+import java.util.Optional;
 import java.util.function.Predicate;

 import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;

 public interface InterfaceIcebergTableOperator {
@@ -21,6 +24,12 @@ public interface InterfaceIcebergTableOperator {
   void initialize();

   void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException;

   Predicate<Record> filterEvents();
+
+  Table createIcebergTable(Catalog catalog,
+                           TableIdentifier tableIdentifier,
+                           ChangeEvent<Object, Object> event);
+
+  Optional<Table> loadIcebergTable(Catalog catalog, TableIdentifier tableId);
 }
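Note: the final interface leaves room for strategies beyond append and upsert. A minimal sketch of what a third operator could look like against this contract, reusing the abstract base for serde setup, record conversion, file writing, and table load/create (the class name and its @Named value are hypothetical, for illustration only):

    package io.debezium.server.iceberg.tableoperator;

    import io.debezium.engine.ChangeEvent;

    import java.util.ArrayList;
    import java.util.function.Predicate;
    import javax.enterprise.context.Dependent;
    import javax.inject.Named;

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.Record;

    @Dependent
    @Named("IcebergTableOperatorAuditAppend") // hypothetical bean name
    public class IcebergTableOperatorAuditAppend extends AbstractIcebergTableOperator {

      @Override
      public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
        // convert change events to iceberg records and append them as a new data file
        ArrayList<Record> icebergRecords = toIcebergRecords(icebergTable.schema(), events);
        DataFile dataFile = getDataFile(icebergTable, icebergRecords);
        icebergTable.newAppend()
            .appendFile(dataFile)
            .commit();
      }

      @Override
      public Predicate<Record> filterEvents() {
        // keep every event, including deletes, for a full audit trail
        return r -> true;
      }
    }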