diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d6e5c466..c9aecadc 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -17,6 +17,9 @@ on: - '.idea/**' - '.run/**' +env: + SPARK_LOCAL_IP: 127.0.0.1 + jobs: build: diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index 479ec038..48ac9211 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -58,24 +58,20 @@ public class IcebergTableOperator { private List deduplicateBatch(List events) { - ConcurrentHashMap icebergRecordsmap = new ConcurrentHashMap<>(); - - for (IcebergChangeEvent e : events) { - - // deduplicate using key(PK) @TODO improve using map.merge - if (icebergRecordsmap.containsKey(e.key())) { - - // replace it if it's new - if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()).value(), e.value()) <= 0) { - icebergRecordsmap.put(e.key(), e); - } - - } else { - icebergRecordsmap.put(e.key(), e); - } - - } - return new ArrayList<>(icebergRecordsmap.values()); + ConcurrentHashMap deduplicatedEvents = new ConcurrentHashMap<>(); + + events.forEach(e -> + // deduplicate using key(PK) + deduplicatedEvents.merge(e.key(), e, (oldValue, newValue) -> { + if (this.compareByTsThenOp(oldValue.value(), newValue.value()) <= 0) { + return newValue; + } else { + return oldValue; + } + }) + ); + + return new ArrayList<>(deduplicatedEvents.values()); } /**