Skip to content

Commit

Permalink
Upgrade debezium to 2.5.0.Final (#262)
Browse files Browse the repository at this point in the history
* Upgrade debezium to 2.5.0.Final

* Upgrade debezium to 2.5.0.Final
  • Loading branch information
ismailsimsek committed Jan 1, 2024
1 parent 1682980 commit 0d02003
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,14 @@

package io.debezium.server.iceberg.offset;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.util.Strings;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.enterprise.context.Dependent;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
Expand All @@ -50,6 +38,20 @@
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static io.debezium.server.iceberg.IcebergChangeConsumer.PROP_PREFIX;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
Expand Down Expand Up @@ -102,6 +104,44 @@ public synchronized void start() {
load();
}

@Override
public synchronized void stop() {
if (executor != null) {
shutdownExecutorServiceQuietly(executor, 30, TimeUnit.SECONDS);
executor = null;
}
LOG.info("Stopped IcebergOffsetBackingStore table:{}", tableFullName);
}


/**
* Shuts down an executor service in two phases, first by calling shutdown to reject incoming tasks,
* and then calling shutdownNow, if necessary, to cancel any lingering tasks.
* After the timeout/on interrupt, the service is forcefully closed.
* @param executorService The service to shut down.
* @param timeout The timeout of the shutdown.
* @param timeUnit The time unit of the shutdown timeout.
*/
public static void shutdownExecutorServiceQuietly(ExecutorService executorService,
long timeout, TimeUnit timeUnit) {
executorService.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!executorService.awaitTermination(timeout, timeUnit)) {
executorService.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!executorService.awaitTermination(timeout, timeUnit)) {
LOG.error("Executor {} did not terminate in time", executorService);
}
}
} catch (InterruptedException e) {
// (Re-)Cancel if current thread also interrupted
executorService.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}

private void initializeTable() {
if (icebergCatalog.tableExists(tableId)) {
offsetTable = icebergCatalog.loadTable(tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ public V record() {
public String destination() {
return destination;
}

public Integer partition() {
return 0;
}
@Override
public String toString() {
return "EmbeddedEngineChangeEvent [key=" + key + ", value=" + value + ", sourceRecord=" + destination + "]";
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<packaging>pom</packaging>

<properties>
<revision>0.3.0-SNAPSHOT</revision>
<revision>0.4.0-SNAPSHOT</revision>

<!-- Instruct the build to use only UTF-8 encoding for source code -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand All @@ -37,7 +37,7 @@
<version.awssdk>2.19.4</version.awssdk>
<version.testcontainers>1.19.3</version.testcontainers>
<!-- Debezium -->
<version.debezium>2.4.0.CR1</version.debezium>
<version.debezium>2.5.0.Final</version.debezium>
<version.mysql.driver>8.0.32</version.mysql.driver>
<!-- Quarkus -->
<version.quarkus>3.5.3</version.quarkus>
Expand Down

0 comments on commit 0d02003

Please sign in to comment.