+ * For example, a record can be updated multiple times in the source, e.g. an insert followed by an update and + * a delete. In this case we need to pick only the last change event for the row. + *
+ * It is used when the `upsert` feature is enabled (i.e. when the consumer is not operating in append mode), which means it should not add + * duplicate records to the target table. + * + * @param lhs + * @param rhs + * @return + */ private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) { int result = Long.compare(lhs.get(sourceTsMsColumn).asLong(0), rhs.get(sourceTsMsColumn).asLong(0)); @@ -94,6 +106,15 @@ private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) { return result; } + /** + * If the given schema contains new fields compared to the target table schema, then it adds the new fields to the target iceberg + * table. + *
+ * It is used when the allow-field-addition feature is enabled. + * + * @param icebergTable + * @param newSchema + */ private void applyFieldAddition(Table icebergTable, Schema newSchema) { UpdateSchema us = icebergTable.updateSchema(). @@ -108,6 +129,18 @@ private void applyFieldAddition(Table icebergTable, Schema newSchema) { } } + /** + * Adds a list of events to the iceberg table. + *
+ * If field addition is enabled, then it first groups the list of change events by their schema. Then it adds new fields to + * the iceberg table, if there are any, and then follows with adding the data to the table. + *
+ * New fields are detected using the CDC event schema; since events are grouped by their schemas, it uses a single
+ * event to find out the schema for the whole list of events.
+ *
+ * @param icebergTable
+ * @param events
+ */
public void addToTable(Table icebergTable, List