diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java deleted file mode 100644 index ae008f1d..00000000 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg.batchsizewait; - -import java.util.IntSummaryStatistics; -import java.util.LinkedList; - -import jakarta.enterprise.context.Dependent; -import jakarta.inject.Named; -import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import static io.debezium.config.CommonConnectorConfig.DEFAULT_MAX_BATCH_SIZE; - -/** - * Optimizes batch size around 85%-90% of max,batch.size by dynamically calculating sleep(ms) - * - * @author Ismail Simsek - */ -@Dependent -@Named("DynamicBatchSizeWait") -@Deprecated -public class DynamicBatchSizeWait implements InterfaceBatchSizeWait { - protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class); - final LinkedList batchSizeHistory = new LinkedList<>(); - final LinkedList sleepMsHistory = new LinkedList<>(); - @ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = DEFAULT_MAX_BATCH_SIZE + "") - Integer maxBatchSize; - @ConfigProperty(name = "debezium.sink.batch.batch-size-wait.max-wait-ms", defaultValue = "300000") - Integer maxWaitMs; - - public DynamicBatchSizeWait() { - batchSizeHistory.add(1); - batchSizeHistory.add(1); - batchSizeHistory.add(1); - sleepMsHistory.add(100); - sleepMsHistory.add(100); - sleepMsHistory.add(100); - } - - private double getAverage(LinkedList linkedList) { - IntSummaryStatistics stats = linkedList.stream() - .mapToInt((x) -> x) - .summaryStatistics(); - return stats.getAverage(); - } - - public int getWaitMs(Integer numRecords) { - batchSizeHistory.add(numRecords); - batchSizeHistory.removeFirst(); - int sleepMs = 1; - - // if batchsize > XX% decrease wait - if ((getAverage(batchSizeHistory) / maxBatchSize) >= 0.97) { - sleepMs = (int) (sleepMsHistory.getLast() * 0.50); - } - // if batchsize > XX% decrease wait - else if ((getAverage(batchSizeHistory) / maxBatchSize) >= 0.95) { - sleepMs = (int) (sleepMsHistory.getLast() * 0.65); - } - // if batchsize > XX% decrease wait - else if ((getAverage(batchSizeHistory) / maxBatchSize) >= 0.90) { - sleepMs = (int) (sleepMsHistory.getLast() * 0.80); - } else if ((getAverage(batchSizeHistory) / maxBatchSize) >= 0.85) { - return sleepMsHistory.getLast(); - } - // else increase - else { - sleepMs = (sleepMsHistory.getLast() * maxBatchSize) / numRecords; - } - - sleepMsHistory.add(Math.min(Math.max(sleepMs, 100), maxWaitMs)); - sleepMsHistory.removeFirst(); - - LOGGER.debug("Calculating Wait delay\n" + - "max.batch.size={}\npoll.interval.ms={}\nbatchSizeHistory{}\nsleepMsHistory{}\nval{}", - maxBatchSize, maxWaitMs, batchSizeHistory, sleepMsHistory, sleepMsHistory.getLast()); - - return sleepMsHistory.getLast(); - } - - public void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException { - int sleepMs = Math.max(getWaitMs(numRecordsProcessed) - processingTimeMs, 0); - if (sleepMs > 2000) { - LOGGER.debug("Waiting {} ms", sleepMs); - Thread.sleep(sleepMs); - } - } - -} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java deleted file mode 100644 index 62e07552..00000000 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg.batchsizewait; - -import io.debezium.server.iceberg.testresources.S3Minio; -import io.quarkus.test.common.QuarkusTestResource; -import io.quarkus.test.junit.QuarkusTest; -import io.quarkus.test.junit.QuarkusTestProfile; -import io.quarkus.test.junit.TestProfile; - -import java.util.HashMap; -import java.util.Map; - -import jakarta.inject.Inject; -import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -@QuarkusTest -@TestProfile(DynamicBatchSizeWaitTest.TestProfile.class) -@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) -class DynamicBatchSizeWaitTest { - - @Inject - DynamicBatchSizeWait waitBatchSize; - - @ConfigProperty(name = "debezium.source.poll.interval.ms", defaultValue = "1000") - Integer pollIntervalMs; - - @Test - void shouldIncreaseSleepMs() { - DynamicBatchSizeWait dynamicSleep = waitBatchSize; - // if its consuming small batch sizes, the sleep delay should increase to adjust batch size - // sleep size should increase and stay at max (pollIntervalMs) - int sleep = 0; - sleep = dynamicSleep.getWaitMs(3); - Assertions.assertTrue(sleep < pollIntervalMs); - sleep = dynamicSleep.getWaitMs(2); - Assertions.assertTrue(sleep <= pollIntervalMs); - sleep = dynamicSleep.getWaitMs(1); - Assertions.assertEquals((Integer) sleep, pollIntervalMs); - sleep = dynamicSleep.getWaitMs(1); - Assertions.assertEquals((Integer) sleep, pollIntervalMs); - sleep = dynamicSleep.getWaitMs(1); - Assertions.assertEquals((Integer) sleep, pollIntervalMs); - } - - @Test - void shouldDecreaseSleepMs() { - DynamicBatchSizeWait dynamicSleep = waitBatchSize; - // if its consuming large batch sizes, the sleep delay should decrease - dynamicSleep.getWaitMs(3); - dynamicSleep.getWaitMs(2); - dynamicSleep.getWaitMs(1); - // start test - // max batch size = debezium.source.max.batch.size = 100 - int sleep1 = dynamicSleep.getWaitMs(120); - int sleep2 = dynamicSleep.getWaitMs(120); - Assertions.assertTrue(sleep2 <= sleep1); - int sleep3 = dynamicSleep.getWaitMs(120); - Assertions.assertTrue(sleep3 <= sleep2); - int sleep4 = dynamicSleep.getWaitMs(120); - Assertions.assertTrue(sleep4 <= sleep3); - dynamicSleep.getWaitMs(120); - dynamicSleep.getWaitMs(120); - dynamicSleep.getWaitMs(120); - dynamicSleep.getWaitMs(120); - Assertions.assertTrue(dynamicSleep.getWaitMs(120) <= 100); - } - - public static class TestProfile implements QuarkusTestProfile { - - @Override - public Map getConfigOverrides() { - Map config = new HashMap<>(); - config.put("quarkus.arc.selected-alternatives", "DynamicBatchSizeWait"); - config.put("debezium.source.max.batch.size", "100"); - config.put("debezium.sink.batch.batch-size-wait.max-wait-ms", "5000"); - config.put("debezium.source.poll.interval.ms", "5000"); - return config; - } - } - -} \ No newline at end of file diff --git a/examples/docker-compose.yml b/examples/docker-compose.yml index b21c8edf..e87ea332 100644 --- a/examples/docker-compose.yml +++ b/examples/docker-compose.yml @@ -33,14 +33,14 @@ services: networks: iceberg_net: ports: - - 5432:5432 + - "5432:5432" jdbccatalog: image: postgres:14-alpine container_name: iceberg-jdbc-catalog networks: iceberg_net: ports: - - 5433:5432 + - "5433:5432" environment: - POSTGRES_DB=test - POSTGRES_USER=testuser @@ -56,8 +56,8 @@ services: networks: iceberg_net: ports: - - 9001:9001 - - 9000:9000 + - "9001:9001" + - "9000:9000" command: ["server", "/data", "--console-address", ":9001"] mc: image: minio/mc