From 0d02003b383b336a96ed162713fdf5ed06493f00 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Mon, 1 Jan 2024 15:38:17 +0100 Subject: [PATCH] Upgrade debezium to 2.5.0.Final (#262) * Upgrade debezium to 2.5.0.Final * Upgrade debezium to 2.5.0.Final --- .../offset/IcebergOffsetBackingStore.java | 70 +++++++++++++++---- .../testresources/TestChangeEvent.java | 4 +- pom.xml | 4 +- 3 files changed, 60 insertions(+), 18 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java index 8e37e46c..b3855179 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java @@ -8,26 +8,14 @@ package io.debezium.server.iceberg.offset; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import io.debezium.DebeziumException; import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.server.iceberg.IcebergUtil; import io.debezium.util.Strings; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.enterprise.context.Dependent; import org.apache.iceberg.*; import org.apache.iceberg.catalog.Catalog; @@ -50,6 +38,20 @@ import org.eclipse.microprofile.config.ConfigProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + import static io.debezium.server.iceberg.IcebergChangeConsumer.PROP_PREFIX; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; @@ -102,6 +104,44 @@ public synchronized void start() { load(); } + @Override + public synchronized void stop() { + if (executor != null) { + shutdownExecutorServiceQuietly(executor, 30, TimeUnit.SECONDS); + executor = null; + } + LOG.info("Stopped IcebergOffsetBackingStore table:{}", tableFullName); + } + + + /** + * Shuts down an executor service in two phases, first by calling shutdown to reject incoming tasks, + * and then calling shutdownNow, if necessary, to cancel any lingering tasks. + * After the timeout/on interrupt, the service is forcefully closed. + * @param executorService The service to shut down. + * @param timeout The timeout of the shutdown. + * @param timeUnit The time unit of the shutdown timeout. + */ + public static void shutdownExecutorServiceQuietly(ExecutorService executorService, + long timeout, TimeUnit timeUnit) { + executorService.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!executorService.awaitTermination(timeout, timeUnit)) { + executorService.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!executorService.awaitTermination(timeout, timeUnit)) { + LOG.error("Executor {} did not terminate in time", executorService); + } + } + } catch (InterruptedException e) { + // (Re-)Cancel if current thread also interrupted + executorService.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + private void initializeTable() { if (icebergCatalog.tableExists(tableId)) { offsetTable = icebergCatalog.loadTable(tableId); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java index 1fdbf734..b1f25633 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java @@ -129,7 +129,9 @@ public V record() { public String destination() { return destination; } - + public Integer partition() { + return 0; + } @Override public String toString() { return "EmbeddedEngineChangeEvent [key=" + key + ", value=" + value + ", sourceRecord=" + destination + "]"; diff --git a/pom.xml b/pom.xml index 2eca75da..731d1ab9 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ pom - 0.3.0-SNAPSHOT + 0.4.0-SNAPSHOT UTF-8 @@ -37,7 +37,7 @@ 2.19.4 1.19.3 - 2.4.0.CR1 + 2.5.0.Final 8.0.32 3.5.3