From 63e17938492bb3c591a228f173fc9ef38f1e93e7 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Tue, 14 Jun 2022 09:37:29 +0200 Subject: [PATCH] Upgrade iceberg to 0.13.2 (#88) * Upgrade iceberg 0.13.2 * Improve code comments --- debezium-server-iceberg-sink/pom.xml | 1 - .../server/iceberg/IcebergChangeEvent.java | 2 +- .../tableoperator/BaseDeltaTaskWriter.java | 8 +++- .../tableoperator/IcebergTableOperator.java | 41 ++++++++++++++++++- .../PartitionedAppendWriter.java | 2 +- pom.xml | 2 +- 6 files changed, 50 insertions(+), 6 deletions(-) diff --git a/debezium-server-iceberg-sink/pom.xml b/debezium-server-iceberg-sink/pom.xml index 2cd9a6ea..d48dbc7d 100644 --- a/debezium-server-iceberg-sink/pom.xml +++ b/debezium-server-iceberg-sink/pom.xml @@ -53,7 +53,6 @@ iceberg-spark-runtime-3.2_2.12 ${version.iceberg} - software.amazon.awssdk diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index 565543e8..c96ba2cb 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -33,7 +33,7 @@ public class IcebergChangeEvent { protected final String destination; protected final JsonNode value; protected final JsonNode key; - JsonSchema jsonSchema; + final JsonSchema jsonSchema; public IcebergChangeEvent(String destination, JsonNode value, diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java index 09fe1ec4..ea0867d9 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java @@ -18,6 +18,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter { private final Schema schema; private final Schema deleteSchema; private final InternalRecordWrapper wrapper; + private final InternalRecordWrapper keyWrapper; private final boolean upsert; private final boolean upsertKeepDeletes; @@ -35,6 +36,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter { this.schema = schema; this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); this.wrapper = new InternalRecordWrapper(schema.asStruct()); + this.keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct()); this.upsert = upsert; this.upsertKeepDeletes = upsertKeepDeletes; } @@ -50,7 +52,6 @@ public void write(Record row) throws IOException { RowDataDeltaWriter writer = route(row); if (upsert && !row.getField("__op").equals("c")) {// anything which not an insert is upsert writer.delete(row); - //System.out.println("->" + row); } // if its deleted row and upsertKeepDeletes = true then add deleted record to target table // else deleted records are deleted from target table @@ -71,5 +72,10 @@ public class RowDataDeltaWriter extends BaseEqualityDeltaWriter { protected StructLike asStructLike(Record data) { return wrapper.wrap(data); } + + @Override + protected StructLike asStructLikeKey(Record data) { + return keyWrapper.wrap(data); + } } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index 8bfbf1e5..479ec038 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -78,7 +78,19 @@ private List deduplicateBatch(List event return new ArrayList<>(icebergRecordsmap.values()); } - + /** + * This is used to deduplicate events within given batch. + *

+ * Forex ample a record can be updated multiple times in the source. for example insert followed by update and + * delete. for this case we need to only pick last change event for the row. + *

+ * Its used when `upsert` feature enabled (when the consumer operating non append mode) which means it should not add + * duplicate records to target table. + * + * @param lhs + * @param rhs + * @return + */ private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) { int result = Long.compare(lhs.get(sourceTsMsColumn).asLong(0), rhs.get(sourceTsMsColumn).asLong(0)); @@ -94,6 +106,15 @@ private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) { return result; } + /** + * If given schema contains new fields compared to target table schema then it adds new fields to target iceberg + * table. + *

+ * Its used when allow field addition feature is enabled. + * + * @param icebergTable + * @param newSchema + */ private void applyFieldAddition(Table icebergTable, Schema newSchema) { UpdateSchema us = icebergTable.updateSchema(). @@ -108,6 +129,18 @@ private void applyFieldAddition(Table icebergTable, Schema newSchema) { } } + /** + * Adds list of events to iceberg table. + *

+ * If field addition enabled then it groups list of change events by their schema first. Then adds new fields to + * iceberg table if there is any. And then follows with adding data to the table. + *

+ * New fields are detected using CDC event schema, since events are grouped by their schemas it uses single + * event to find-out schema for the whole list of events. + * + * @param icebergTable + * @param events + */ public void addToTable(Table icebergTable, List events) { // when operation mode is not upsert deduplicate the events to avoid inserting duplicate row @@ -134,6 +167,12 @@ public void addToTable(Table icebergTable, List events) { } + /** + * Adds list of change events to iceberg table. All the events are having same schema. + * + * @param icebergTable + * @param events + */ private void addToTablePerSchema(Table icebergTable, List events) { // Initialize a task writer to write both INSERT and equality DELETE. BaseTaskWriter writer = writerFactory.create(icebergTable); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedAppendWriter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedAppendWriter.java index 87e02af1..0ac5e00f 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedAppendWriter.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedAppendWriter.java @@ -13,7 +13,7 @@ public class PartitionedAppendWriter extends PartitionedWriter { private final PartitionKey partitionKey; - InternalRecordWrapper wrapper; + final InternalRecordWrapper wrapper; public PartitionedAppendWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, diff --git a/pom.xml b/pom.xml index 4c1ffb3b..97c66770 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,7 @@ 3.0.9 3.3.0 2.12.6 - 0.13.1 + 0.13.2 3.2.1 3.3.1 2.17.120