Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadFactory;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.Header;
import io.debezium.engine.format.SerializationFormat;

import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.debezium.inbound.DebeziumMessageProducer;
import org.springframework.integration.debezium.support.DefaultDebeziumHeaderMapper;
import org.springframework.integration.dsl.MessageProducerSpec;
import org.springframework.messaging.support.HeaderMapper;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

/**
* A {@link org.springframework.integration.dsl.MessageProducerSpec} for {@link DebeziumMessageProducer}.
*
* @author Christian Tzolov
* @author Artem Bilan
*
* @since 6.2
*/
Expand Down Expand Up @@ -74,19 +74,18 @@ public DebeziumMessageProducerSpec enableEmptyPayload(boolean enabled) {
}

/**
* Set a {@link ThreadFactory} for the Debezium executor. Defaults to the {@link CustomizableThreadFactory} with a
* {@code debezium:inbound-channel-adapter-thread-} prefix.
* @param threadFactory the {@link ThreadFactory} instance to use.
* Set a {@link TaskExecutor} for the Debezium engine.
* @param taskExecutor the {@link TaskExecutor} to use.
* @return the spec.
*/
public DebeziumMessageProducerSpec threadFactory(ThreadFactory threadFactory) {
this.target.setThreadFactory(threadFactory);
public DebeziumMessageProducerSpec taskExecutor(TaskExecutor taskExecutor) {
this.target.setTaskExecutor(taskExecutor);
return this;
}

/**
* Set the outbound message content type. Must be aligned with the {@link SerializationFormat} configuration used by
* the provided {@link DebeziumEngine}.
* Set the outbound message content type.
* Must be aligned with the {@link SerializationFormat} configuration used by the provided {@link DebeziumEngine}.
* @param contentType payload content type.
* @return the spec.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

Expand All @@ -35,14 +32,15 @@
import io.debezium.engine.Header;
import io.debezium.engine.format.SerializationFormat;

import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.debezium.support.DebeziumHeaders;
import org.springframework.integration.debezium.support.DefaultDebeziumHeaderMapper;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.HeaderMapper;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;

/**
Expand All @@ -60,12 +58,9 @@ public class DebeziumMessageProducer extends MessageProducerSupport {
private DebeziumEngine<ChangeEvent<byte[], byte[]>> debeziumEngine;

/**
* Debezium Engine is designed to be submitted to an {@link Executor}
* or {@link ExecutorService} for execution by a single thread.
* By default, a single-threaded ExecutorService instance is provided configured with a
* {@link CustomizableThreadFactory} and a `debezium-` thread prefix.
* Debezium Engine is designed to be submitted to an {@link Executor}.
*/
private ExecutorService executorService;
private TaskExecutor taskExecutor;

private String contentType = "application/json";

Expand All @@ -75,8 +70,6 @@ public class DebeziumMessageProducer extends MessageProducerSupport {

private boolean enableBatch = false;

private ThreadFactory threadFactory;

private volatile CountDownLatch lifecycleLatch = new CountDownLatch(0);

/**
Expand Down Expand Up @@ -116,14 +109,12 @@ public void setEnableEmptyPayload(boolean enabled) {
}

/**
* Set a {@link ThreadFactory} for the Debezium executor.
* Defaults to the {@link CustomizableThreadFactory} with a
* {@code debezium:inbound-channel-adapter-thread-} prefix.
* @param threadFactory the {@link ThreadFactory} instance to use.
* Set a {@link TaskExecutor} for the Debezium engine task.
* @param taskExecutor the {@link TaskExecutor} to use.
*/
public void setThreadFactory(ThreadFactory threadFactory) {
Assert.notNull(threadFactory, "'threadFactory' must not be null");
this.threadFactory = threadFactory;
public void setTaskExecutor(TaskExecutor taskExecutor) {
Assert.notNull(taskExecutor, "'taskExecutor' must not be null");
this.taskExecutor = taskExecutor;
}

/**
Expand Down Expand Up @@ -156,12 +147,10 @@ public String getComponentType() {
protected void onInit() {
super.onInit();

if (this.threadFactory == null) {
this.threadFactory = new CustomizableThreadFactory(getComponentName() + "-thread-");
if (this.taskExecutor == null) {
this.taskExecutor = new SimpleAsyncTaskExecutor(getComponentName() + "-thread-");
}

this.executorService = Executors.newSingleThreadExecutor(this.threadFactory);

if (!this.enableBatch) {
this.debeziumEngineBuilder.notifying(new StreamChangeEventConsumer<>());
}
Expand All @@ -178,7 +167,7 @@ protected void doStart() {
return;
}
this.lifecycleLatch = new CountDownLatch(1);
this.executorService.execute(() -> {
this.taskExecutor.execute(() -> {
try {
// Runs the debezium connector and deliver database changes to the registered consumer. This method
// blocks until the connector is stopped.
Expand Down Expand Up @@ -213,19 +202,6 @@ protected void doStop() {
}
}

@Override
public void destroy() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't you supposed to gracefully stop the task executor on exit?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See a destroy() impl in the super class. It does call that stop(); for us.

super.destroy();

this.executorService.shutdown();
try {
this.executorService.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
throw new IllegalStateException("Debezium failed to close!", e);
}
}

@Nullable
private <T> Message<?> toMessage(ChangeEvent<T, T> changeEvent) {
Object key = changeEvent.key();
Expand Down
3 changes: 1 addition & 2 deletions src/reference/asciidoc/debezium.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ Defaults to `false`.
- `headerMapper` - custom `HeaderMapper` implementation that allows for selecting and converting the `ChangeEvent` headers into `Message` headers.
The default `DefaultDebeziumHeaderMapper` implementation provides a setter for `setHeaderNamesToMap`.
By default, all headers are mapped.
- `threadFactory` - Set custom `ThreadFactory` for the Debezium executor service.
Debezium Engine is designed to be submitted to an `Executor` or `ExecutorService` for execution by single thread.
- `taskExecutor` - Set a custom `TaskExecutor` for the Debezium engine.

The following code snippets demonstrate various configuration for this channel adapter:

Expand Down