Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade debezium to 2.5.0.Final #262

Merged
merged 2 commits into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,14 @@

package io.debezium.server.iceberg.offset;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.util.Strings;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.enterprise.context.Dependent;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
Expand All @@ -50,6 +38,20 @@
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static io.debezium.server.iceberg.IcebergChangeConsumer.PROP_PREFIX;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
Expand Down Expand Up @@ -102,6 +104,44 @@ public synchronized void start() {
load();
}

@Override
public synchronized void stop() {
if (executor != null) {
shutdownExecutorServiceQuietly(executor, 30, TimeUnit.SECONDS);
executor = null;
}
LOG.info("Stopped IcebergOffsetBackingStore table:{}", tableFullName);
}


/**
* Shuts down an executor service in two phases, first by calling shutdown to reject incoming tasks,
* and then calling shutdownNow, if necessary, to cancel any lingering tasks.
* After the timeout/on interrupt, the service is forcefully closed.
* @param executorService The service to shut down.
* @param timeout The timeout of the shutdown.
* @param timeUnit The time unit of the shutdown timeout.
*/
public static void shutdownExecutorServiceQuietly(ExecutorService executorService,
long timeout, TimeUnit timeUnit) {
executorService.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!executorService.awaitTermination(timeout, timeUnit)) {
executorService.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!executorService.awaitTermination(timeout, timeUnit)) {
LOG.error("Executor {} did not terminate in time", executorService);
}
}
} catch (InterruptedException e) {
// (Re-)Cancel if current thread also interrupted
executorService.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}

private void initializeTable() {
if (icebergCatalog.tableExists(tableId)) {
offsetTable = icebergCatalog.loadTable(tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ public V record() {
public String destination() {
return destination;
}

public Integer partition() {
return 0;
}
@Override
public String toString() {
return "EmbeddedEngineChangeEvent [key=" + key + ", value=" + value + ", sourceRecord=" + destination + "]";
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<packaging>pom</packaging>

<properties>
<revision>0.3.0-SNAPSHOT</revision>
<revision>0.4.0-SNAPSHOT</revision>

<!-- Instruct the build to use only UTF-8 encoding for source code -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand All @@ -37,7 +37,7 @@
<version.awssdk>2.19.4</version.awssdk>
<version.testcontainers>1.19.3</version.testcontainers>
<!-- Debezium -->
<version.debezium>2.4.0.CR1</version.debezium>
<version.debezium>2.5.0.Final</version.debezium>
<version.mysql.driver>8.0.32</version.mysql.driver>
<!-- Quarkus -->
<version.quarkus>3.5.3</version.quarkus>
Expand Down