From 1eae890c40f02259b0a413982ba193279592267f Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Mon, 1 Jan 2024 13:41:48 +0100 Subject: [PATCH 1/2] Upgrade debezium to 2.5.0.Final --- .../server/iceberg/testresources/TestChangeEvent.java | 4 +++- pom.xml | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java index 1fdbf734..b1f25633 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java @@ -129,7 +129,9 @@ public V record() { public String destination() { return destination; } - + public Integer partition() { + return 0; + } @Override public String toString() { return "EmbeddedEngineChangeEvent [key=" + key + ", value=" + value + ", sourceRecord=" + destination + "]"; diff --git a/pom.xml b/pom.xml index 2eca75da..731d1ab9 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ pom - 0.3.0-SNAPSHOT + 0.4.0-SNAPSHOT UTF-8 @@ -37,7 +37,7 @@ 2.19.4 1.19.3 - 2.4.0.CR1 + 2.5.0.Final 8.0.32 3.5.3 From b7a7c09fe8356a8188f6c72c693a7f9c03da3eac Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Mon, 1 Jan 2024 14:35:39 +0100 Subject: [PATCH 2/2] Upgrade debezium to 2.5.0.Final --- .../offset/IcebergOffsetBackingStore.java | 70 +++++++++++++++---- 1 file changed, 55 insertions(+), 15 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java index 8e37e46c..b3855179 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java @@ -8,26 +8,14 @@ package io.debezium.server.iceberg.offset; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import io.debezium.DebeziumException; import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.server.iceberg.IcebergUtil; import io.debezium.util.Strings; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.enterprise.context.Dependent; import org.apache.iceberg.*; import org.apache.iceberg.catalog.Catalog; @@ -50,6 +38,20 @@ import org.eclipse.microprofile.config.ConfigProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + import static io.debezium.server.iceberg.IcebergChangeConsumer.PROP_PREFIX; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; @@ -102,6 +104,44 @@ public synchronized void start() { load(); } + @Override + public synchronized void stop() { + if (executor != null) { + shutdownExecutorServiceQuietly(executor, 30, TimeUnit.SECONDS); + executor = null; + } + LOG.info("Stopped IcebergOffsetBackingStore table:{}", tableFullName); + } + + + /** + * Shuts down an executor service in two phases, first by calling shutdown to reject incoming tasks, + * and then calling shutdownNow, if necessary, to cancel any lingering tasks. + * After the timeout/on interrupt, the service is forcefully closed. + * @param executorService The service to shut down. + * @param timeout The timeout of the shutdown. + * @param timeUnit The time unit of the shutdown timeout. + */ + public static void shutdownExecutorServiceQuietly(ExecutorService executorService, + long timeout, TimeUnit timeUnit) { + executorService.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!executorService.awaitTermination(timeout, timeUnit)) { + executorService.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!executorService.awaitTermination(timeout, timeUnit)) { + LOG.error("Executor {} did not terminate in time", executorService); + } + } + } catch (InterruptedException e) { + // (Re-)Cancel if current thread also interrupted + executorService.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + private void initializeTable() { if (icebergCatalog.tableExists(tableId)) { offsetTable = icebergCatalog.loadTable(tableId);