diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java new file mode 100644 index 00000000..abf29bf7 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java @@ -0,0 +1,92 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg; + +import io.debezium.DebeziumException; +import io.debezium.config.CommonConnectorConfig; + +import java.lang.management.ManagementFactory; +import java.util.Optional; +import javax.enterprise.context.Dependent; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimizes batch size around 85%-90% of max,batch.size using dynamically calculated sleep(ms) + * + * @author Ismail Simsek + */ +@Dependent +public class DebeziumMetrics { + protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumMetrics.class); + final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); + @ConfigProperty(name = "debezium.sink.batch.metrics.snapshot-mbean", defaultValue = "") + Optional snapshotMbean; + @ConfigProperty(name = "debezium.sink.batch.metrics.streaming-mbean", defaultValue = "") + Optional streamingMbean; + @ConfigProperty(name = "debezium.source.max.queue.size", defaultValue = CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE + "") + int maxQueueSize; + + ObjectName snapshotMetricsObjectName; + ObjectName streamingMetricsObjectName; + + public void initizalize() throws DebeziumException { + assert snapshotMbean.isPresent() : + "Snapshot metrics Mbean `debezium.sink.batch.metrics.snapshot-mbean` not provided"; + assert 
streamingMbean.isPresent() : + "Streaming metrics Mbean `debezium.sink.batch.metrics.streaming-mbean` not provided"; + try { + snapshotMetricsObjectName = new ObjectName(snapshotMbean.get()); + streamingMetricsObjectName = new ObjectName(streamingMbean.get()); + } catch (Exception e) { + throw new DebeziumException(e); + } + } + + public boolean snapshotRunning() { + try { + return (boolean) mbeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotRunning"); + } catch (Exception e) { + throw new DebeziumException(e); + } + } + + public boolean snapshotCompleted() { + try { + return (boolean) mbeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotCompleted"); + } catch (Exception e) { + throw new DebeziumException(e); + } + } + + public int streamingQueueRemainingCapacity() { + try { + return (int) mbeanServer.getAttribute(streamingMetricsObjectName, "QueueRemainingCapacity"); + } catch (Exception e) { + throw new DebeziumException(e); + } + } + + public int streamingQueueCurrentSize() { + return maxQueueSize - streamingQueueRemainingCapacity(); + } + + public long streamingMilliSecondsBehindSource() { + try { + return (long) mbeanServer.getAttribute(streamingMetricsObjectName, "MilliSecondsBehindSource"); + } catch (Exception e) { + throw new DebeziumException(e); + } + } + +} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 3b7a3d5e..74dd961d 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -8,11 +8,13 @@ package io.debezium.server.iceberg; +import io.debezium.DebeziumException; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.Json; import 
io.debezium.serde.DebeziumSerdes; import io.debezium.server.BaseChangeConsumer; +import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait; import java.io.Closeable; import java.io.IOException; @@ -23,6 +25,9 @@ import java.util.stream.Collectors; import javax.annotation.PostConstruct; import javax.enterprise.context.Dependent; +import javax.enterprise.inject.Any; +import javax.enterprise.inject.Instance; +import javax.enterprise.inject.literal.NamedLiteral; import javax.inject.Inject; import javax.inject.Named; @@ -75,8 +80,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu String namespace; @ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default") String catalogName; - @ConfigProperty(name = "debezium.sink.iceberg.dynamic-wait", defaultValue = "true") - boolean dynamicWaitEnabled; @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true") boolean upsertData; @@ -88,8 +91,13 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu @ConfigProperty(name = "debezium.sink.iceberg.upsert-source-ts-ms-column", defaultValue = "__source_ts_ms") String sourceTsMsColumn; + @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait") + String batchSizeWaitName; + @Inject - BatchDynamicWait dynamicWait; + @Any + Instance batchSizeWaitInstances; + InterfaceBatchSizeWait batchSizeWait; static ImmutableMap cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4); Configuration hadoopConf = new Configuration(); @@ -116,6 +124,16 @@ void connect() throws InterruptedException { valSerde.configure(Collections.emptyMap(), false); valDeserializer = valSerde.deserializer(); + + Instance instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName)); + if (instance.isAmbiguous()) { + throw new DebeziumException("Multiple batch size wait class named '" + batchSizeWaitName + "' were found"); + } else if 
(instance.isUnsatisfied()) { + throw new DebeziumException("No batch size wait class named '" + batchSizeWaitName + "' is available"); + } + batchSizeWait = instance.get(); + batchSizeWait.initizalize(); + LOGGER.info("Using {}", batchSizeWait.getClass().getName()); } public String map(String destination) { @@ -172,9 +190,7 @@ public void handleBatch(List> records, DebeziumEngin } committer.markBatchFinished(); - if (dynamicWaitEnabled) { - dynamicWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis()); - } + batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis()); } @@ -257,7 +273,7 @@ private void addToTable(Table icebergTable, ArrayList batchSizeWaitInstances; + InterfaceBatchSizeWait batchSizeWait; private TableIdentifier tableIdentifier; Map icebergProperties = new ConcurrentHashMap<>(); @@ -132,6 +140,16 @@ void connect() throws InterruptedException { } // load table eventTable = icebergCatalog.loadTable(tableIdentifier); + + Instance instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName)); + if (instance.isAmbiguous()) { + throw new DebeziumException("Multiple batch size wait class named '" + batchSizeWaitName + "' were found"); + } else if (instance.isUnsatisfied()) { + throw new DebeziumException("No batch size wait class named '" + batchSizeWaitName + "' is available"); + } + batchSizeWait = instance.get(); + batchSizeWait.initizalize(); + LOGGER.info("Using {}", batchSizeWait.getClass().getName()); } public GenericRecord getIcebergRecord(String destination, ChangeEvent record, OffsetDateTime batchTime) { @@ -172,9 +190,8 @@ public void handleBatch(List> records, DebeziumEngin // committer.markProcessed(record); committer.markBatchFinished(); - if (dynamicWaitEnabled) { - dynamicWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis()); - } + batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis()); + } 
private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList icebergRecords) throws InterruptedException { @@ -220,7 +237,7 @@ private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList eventTable.newAppend() .appendFile(dataFile) .commit(); - LOGGER.info("Committed events to table! {}", eventTable.location()); + LOGGER.info("Committed {} events to table! {}", icebergRecords.size(), eventTable.location()); } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/BatchDynamicWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java similarity index 71% rename from debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/BatchDynamicWait.java rename to debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java index 96e66418..53908a3f 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/BatchDynamicWait.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java @@ -6,15 +6,17 @@ * */ -package io.debezium.server.iceberg; +package io.debezium.server.iceberg.batchsizewait; import java.util.IntSummaryStatistics; import java.util.LinkedList; import javax.enterprise.context.Dependent; +import javax.inject.Named; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static io.debezium.config.CommonConnectorConfig.DEFAULT_MAX_BATCH_SIZE; /** * Optimizes batch size around 85%-90% of max,batch.size using dynamically calculated sleep(ms) @@ -22,19 +24,20 @@ * @author Ismail Simsek */ @Dependent -public class BatchDynamicWait { - protected static final Logger LOGGER = LoggerFactory.getLogger(BatchDynamicWait.class); +@Named("DynamicBatchSizeWait") +public class DynamicBatchSizeWait implements InterfaceBatchSizeWait { + 
protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class); - @ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "2048") + @ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = DEFAULT_MAX_BATCH_SIZE + "") Integer maxBatchSize; - @ConfigProperty(name = "debezium.sink.batch.dynamic-wait.max-wait-ms", defaultValue = "300000") + @ConfigProperty(name = "debezium.sink.batch.batch-size-wait.max-wait-ms", defaultValue = "300000") Integer maxWaitMs; LinkedList batchSizeHistory = new LinkedList(); LinkedList sleepMsHistory = new LinkedList(); - public BatchDynamicWait() { + public DynamicBatchSizeWait() { batchSizeHistory.add(1); batchSizeHistory.add(1); batchSizeHistory.add(1); @@ -77,19 +80,17 @@ else if ((getAverage(batchSizeHistory) / maxBatchSize) >= 0.90) { sleepMsHistory.add(Math.min(Math.max(sleepMs, 100), maxWaitMs)); sleepMsHistory.removeFirst(); - LOGGER.debug("Calculating Wait delay\nmax.batch.size={}\npoll.interval.ms={}\nbatchSizeHistory{}\nsleepMsHistory" + - "{}\nval{}", - maxBatchSize, - maxWaitMs, - batchSizeHistory, sleepMsHistory, sleepMsHistory.getLast()); + LOGGER.debug("Calculating Wait delay\n" + + "max.batch.size={}\npoll.interval.ms={}\nbatchSizeHistory{}\nsleepMsHistory{}\nval{}", + maxBatchSize, maxWaitMs, batchSizeHistory, sleepMsHistory, sleepMsHistory.getLast()); return sleepMsHistory.getLast(); } - public void waitMs(Integer numRecords, Integer processingTimeMs) throws InterruptedException { - int sleepMs = Math.max(getWaitMs(numRecords) - processingTimeMs, 0); + public void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException { + int sleepMs = Math.max(getWaitMs(numRecordsProcessed) - processingTimeMs, 0); if (sleepMs > 2000) { - LOGGER.info("Waiting {} ms", sleepMs); + LOGGER.debug("Waiting {} ms", sleepMs); Thread.sleep(sleepMs); } } diff --git 
a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java new file mode 100644 index 00000000..29071f7b --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java @@ -0,0 +1,23 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.batchsizewait; + +/** + * Interface for batch-size wait strategies, used to delay between consumer batches in order to control batch size. + * + * @author Ismail Simsek + */ +public interface InterfaceBatchSizeWait { + + default void initizalize() { + } + + void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException; + +} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java new file mode 100644 index 00000000..e71dde01 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java @@ -0,0 +1,88 @@ +/* + * + * * Copyright memiiso Authors. 
+ * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.batchsizewait; + +import io.debezium.DebeziumException; +import io.debezium.config.CommonConnectorConfig; +import io.debezium.server.iceberg.DebeziumMetrics; + +import javax.enterprise.context.Dependent; +import javax.inject.Inject; +import javax.inject.Named; + +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimizes batch size around 85%-90% of max,batch.size using dynamically calculated sleep(ms) + * + * @author Ismail Simsek + */ +@Dependent +@Named("MaxBatchSizeWait") +public class MaxBatchSizeWait implements InterfaceBatchSizeWait { + protected static final Logger LOGGER = LoggerFactory.getLogger(MaxBatchSizeWait.class); + + @ConfigProperty(name = "debezium.source.max.queue.size", defaultValue = CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE + "") + int maxQueueSize; + @ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = CommonConnectorConfig.DEFAULT_MAX_BATCH_SIZE + "") + int maxBatchSize; + @ConfigProperty(name = "debezium.sink.batch.batch-size-wait.max-wait-ms", defaultValue = "300000") + int maxWaitMs; + @ConfigProperty(name = "debezium.sink.batch.batch-size-wait.wait-interval-ms", defaultValue = "10000") + int waitIntervalMs; + + @Inject + DebeziumMetrics debeziumMetrics; + + @Override + public void initizalize() throws DebeziumException { + assert waitIntervalMs < maxWaitMs : "`wait-interval-ms` cannot be bigger than `max-wait-ms`"; + debeziumMetrics.initizalize(); + } + +// log warning! 
+// if (streamingSecondsBehindSource > 30 * 60) { // behind 30 minutes +// LOGGER.warn("Streaming {} is behind by {} seconds, QueueCurrentSize:{}, QueueTotalCapacity:{}, " + +// "SnapshotCompleted:{}", +// numRecordsProcessed, streamingQueueCurrentSize, maxQueueSize, streamingSecondsBehindSource, snapshotCompleted +// ); +// } + + @Override + public void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException { + + if (debeziumMetrics.snapshotRunning()) { + return; + } + + final int streamingQueueCurrentSize = debeziumMetrics.streamingQueueCurrentSize(); + final int streamingSecondsBehindSource = (int) (debeziumMetrics.streamingMilliSecondsBehindSource() / 1000); + final boolean snapshotCompleted = debeziumMetrics.snapshotCompleted(); + + LOGGER.debug("Processed {}, QueueCurrentSize:{}, QueueTotalCapacity:{}, SecondsBehindSource:{}, SnapshotCompleted:{}", + numRecordsProcessed, streamingQueueCurrentSize, maxQueueSize, streamingSecondsBehindSource, snapshotCompleted + ); + + int totalWaitMs = 0; + while (totalWaitMs < maxWaitMs && debeziumMetrics.streamingQueueCurrentSize() < maxBatchSize) { + totalWaitMs += waitIntervalMs; + LOGGER.debug("Sleeping {} Milliseconds, QueueCurrentSize:{} < maxBatchSize:{}", + waitIntervalMs, debeziumMetrics.streamingQueueCurrentSize(), maxBatchSize); + + Thread.sleep(waitIntervalMs); + } + + LOGGER.debug("Total wait {} Milliseconds, QueueCurrentSize:{} < maxBatchSize:{}", + totalWaitMs, debeziumMetrics.streamingQueueCurrentSize(), maxBatchSize); + + } + +} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/NoBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/NoBatchSizeWait.java new file mode 100644 index 00000000..5bd8d8f4 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/NoBatchSizeWait.java @@ -0,0 +1,26 @@ +/* + * + * * Copyright memiiso Authors. 
+ * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.batchsizewait; + +import javax.enterprise.context.Dependent; +import javax.inject.Named; + +/** + * A no-op wait strategy: applies no delay between consumer batches. + * + * @author Ismail Simsek + */ +@Dependent +@Named("NoBatchSizeWait") +public class NoBatchSizeWait implements InterfaceBatchSizeWait { + + public void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) { + } + +} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java index 2ad50d13..caa9fd0c 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java @@ -8,9 +8,9 @@ package io.debezium.server.iceberg; -import io.debezium.server.testresource.BaseSparkTest; -import io.debezium.server.testresource.S3Minio; -import io.debezium.server.testresource.SourceMysqlDB; +import io.debezium.server.iceberg.testresources.BaseSparkTest; +import io.debezium.server.iceberg.testresources.S3Minio; +import io.debezium.server.iceberg.testresources.SourceMysqlDB; import io.debezium.util.Testing; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java index e91a286f..1b7eed26 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java +++ 
b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java @@ -12,8 +12,6 @@ import io.debezium.util.Testing; import java.nio.file.Path; -import java.util.HashMap; -import java.util.Map; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; @@ -21,57 +19,54 @@ public class ConfigSource extends TestConfigSource { public static final String S3_REGION = "us-east-1"; public static final String S3_BUCKET = "test-bucket"; - - final Map s3Test = new HashMap<>(); public static final Path HISTORY_FILE = Testing.Files.createTestingPath("dbhistory.txt").toAbsolutePath(); public ConfigSource() { - s3Test.put("quarkus.profile", "postgresql"); + config.put("quarkus.profile", "postgresql"); // common sink conf - s3Test.put("debezium.sink.type", "iceberg"); - s3Test.put("debezium.sink.iceberg.upsert", "false"); - s3Test.put("debezium.sink.iceberg.upsert-keep-deletes", "true"); + config.put("debezium.sink.type", "iceberg"); + config.put("debezium.sink.iceberg.upsert", "false"); + config.put("debezium.sink.iceberg.upsert-keep-deletes", "true"); // ==== configure batch behaviour/size ==== // Positive integer value that specifies the maximum size of each batch of events that should be processed during // each iteration of this connector. Defaults to 2048. - s3Test.put("debezium.source.max.batch.size", "2"); + config.put("debezium.source.max.batch.size", "2"); // Positive integer value that specifies the number of milliseconds the connector should wait for new change // events to appear before it starts processing a batch of events. Defaults to 1000 milliseconds, or 1 second. - s3Test.put("debezium.source.poll.interval.ms", "10000"); // 5 seconds! + config.put("debezium.source.poll.interval.ms", "10000"); // 5 seconds! 
// iceberg - s3Test.put("debezium.sink.iceberg.table-prefix", "debeziumcdc_"); - s3Test.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET); - s3Test.put("debezium.sink.iceberg.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse"); - s3Test.put("debezium.sink.iceberg.type", "hadoop"); - s3Test.put("debezium.sink.iceberg.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog"); + config.put("debezium.sink.iceberg.table-prefix", "debeziumcdc_"); + config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET); + config.put("debezium.sink.iceberg.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse"); + config.put("debezium.sink.iceberg.type", "hadoop"); + config.put("debezium.sink.iceberg.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog"); // enable disable schema - s3Test.put("debezium.format.value.schemas.enable", "true"); + config.put("debezium.format.value.schemas.enable", "true"); // debezium unwrap message - s3Test.put("debezium.transforms", "unwrap"); - s3Test.put("debezium.transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState"); - s3Test.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ms,db"); - s3Test.put("debezium.transforms.unwrap.delete.handling.mode", "rewrite"); + config.put("debezium.transforms", "unwrap"); + config.put("debezium.transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState"); + config.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ms,db"); + config.put("debezium.transforms.unwrap.delete.handling.mode", "rewrite"); // DEBEZIUM SOURCE conf - s3Test.put("debezium.source." 
+ StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); - s3Test.put("debezium.source.database.history", "io.debezium.relational.history.FileDatabaseHistory"); - s3Test.put("debezium.source.database.history.file.filename", HISTORY_FILE.toAbsolutePath().toString()); - s3Test.put("debezium.source.offset.flush.interval.ms", "60000"); - s3Test.put("debezium.source.database.server.name", "testc"); - s3Test.put("%postgresql.debezium.source.schema.whitelist", "inventory"); - s3Test.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," + + config.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + config.put("debezium.source.database.history", "io.debezium.relational.history.FileDatabaseHistory"); + config.put("debezium.source.database.history.file.filename", HISTORY_FILE.toAbsolutePath().toString()); + config.put("debezium.source.offset.flush.interval.ms", "60000"); + config.put("debezium.source.database.server.name", "testc"); + config.put("%postgresql.debezium.source.schema.whitelist", "inventory"); + config.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," + "inventory.geom,inventory.table_datatypes,inventory.test_date_table"); config.put("quarkus.log.level", "INFO"); - s3Test.put("quarkus.log.category.\"org.apache.spark\".level", "WARN"); - s3Test.put("quarkus.log.category.\"org.apache.hadoop\".level", "ERROR"); - s3Test.put("quarkus.log.category.\"org.apache.parquet\".level", "WARN"); - s3Test.put("quarkus.log.category.\"org.eclipse.jetty\".level", "WARN"); - s3Test.put("quarkus.log.category.\"org.apache.iceberg\".level", "ERROR"); + config.put("quarkus.log.category.\"org.apache.spark\".level", "WARN"); + config.put("quarkus.log.category.\"org.apache.hadoop\".level", "ERROR"); + config.put("quarkus.log.category.\"org.apache.parquet\".level", "WARN"); + 
config.put("quarkus.log.category.\"org.eclipse.jetty\".level", "WARN"); + config.put("quarkus.log.category.\"org.apache.iceberg\".level", "ERROR"); - config = s3Test; } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index e1da209e..57d0aec1 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -9,9 +9,9 @@ package io.debezium.server.iceberg; import io.debezium.server.DebeziumServer; -import io.debezium.server.testresource.BaseSparkTest; -import io.debezium.server.testresource.S3Minio; -import io.debezium.server.testresource.SourcePostgresqlDB; +import io.debezium.server.iceberg.testresources.BaseSparkTest; +import io.debezium.server.iceberg.testresources.S3Minio; +import io.debezium.server.iceberg.testresources.SourcePostgresqlDB; import io.debezium.util.Testing; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; @@ -197,6 +197,8 @@ public void testIcebergConsumer() throws Exception { } }); + S3Minio.listFiles(); + } @Test diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java index 3ec8cf53..9ab5edba 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java @@ -9,7 +9,7 @@ package io.debezium.server.iceberg; import io.debezium.engine.ChangeEvent; -import io.debezium.server.testresource.*; 
+import io.debezium.server.iceberg.testresources.*; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java index 8c71e9a4..f79269fd 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java @@ -9,7 +9,7 @@ package io.debezium.server.iceberg; import io.debezium.engine.ChangeEvent; -import io.debezium.server.testresource.*; +import io.debezium.server.iceberg.testresources.*; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java index 13e6d793..ddcf79c8 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java @@ -8,9 +8,9 @@ package io.debezium.server.iceberg; -import io.debezium.server.testresource.BaseSparkTest; -import io.debezium.server.testresource.S3Minio; -import io.debezium.server.testresource.SourcePostgresqlDB; +import io.debezium.server.iceberg.testresources.BaseSparkTest; +import io.debezium.server.iceberg.testresources.S3Minio; +import io.debezium.server.iceberg.testresources.SourcePostgresqlDB; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import 
io.quarkus.test.junit.TestProfile; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java new file mode 100644 index 00000000..2262a7f4 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java @@ -0,0 +1,71 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.batchsizewait; + +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +import javax.inject.Inject; + +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +@QuarkusTest +@TestProfile(DynamicBatchSizeWaitTestProfile.class) +class DynamicBatchSizeWaitTest { + + @Inject + DynamicBatchSizeWait waitBatchSize; + + @ConfigProperty(name = "debezium.source.poll.interval.ms", defaultValue = "1000") + Integer pollIntervalMs; + + @Test + void shouldIncreaseSleepMs() { + DynamicBatchSizeWait dynamicSleep = (DynamicBatchSizeWait) waitBatchSize; + // if its consuming small batch sizes, the sleep delay should increase to adjust batch size + // sleep size should increase and stay at max (pollIntervalMs) + int sleep = 0; + sleep = dynamicSleep.getWaitMs(3); + Assertions.assertTrue(sleep < pollIntervalMs); + sleep = dynamicSleep.getWaitMs(2); + Assertions.assertTrue(sleep <= pollIntervalMs); + sleep = dynamicSleep.getWaitMs(1); + Assertions.assertEquals((Integer) sleep, pollIntervalMs); + sleep = dynamicSleep.getWaitMs(1); + Assertions.assertEquals((Integer) sleep, pollIntervalMs); + sleep = dynamicSleep.getWaitMs(1); + Assertions.assertEquals((Integer) sleep, pollIntervalMs); + } + + 
@Test + void shouldDecreaseSleepMs() { + DynamicBatchSizeWait dynamicSleep = (DynamicBatchSizeWait) waitBatchSize; + // if its consuming large batch sizes, the sleep delay should decrease + dynamicSleep.getWaitMs(3); + dynamicSleep.getWaitMs(2); + dynamicSleep.getWaitMs(1); + // start test + // max batch size = debezium.source.max.batch.size = 100 + int sleep1 = dynamicSleep.getWaitMs(120); + int sleep2 = dynamicSleep.getWaitMs(120); + Assertions.assertTrue(sleep2 <= sleep1); + int sleep3 = dynamicSleep.getWaitMs(120); + Assertions.assertTrue(sleep3 <= sleep2); + int sleep4 = dynamicSleep.getWaitMs(120); + Assertions.assertTrue(sleep4 <= sleep3); + dynamicSleep.getWaitMs(120); + dynamicSleep.getWaitMs(120); + dynamicSleep.getWaitMs(120); + dynamicSleep.getWaitMs(120); + Assertions.assertTrue(dynamicSleep.getWaitMs(120) <= 100); + } + +} \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTestProfile.java new file mode 100644 index 00000000..3c0dbebf --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTestProfile.java @@ -0,0 +1,27 @@ +/* + * + * * Copyright memiiso Authors. 
+ * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.batchsizewait; + +import io.quarkus.test.junit.QuarkusTestProfile; + +import java.util.HashMap; +import java.util.Map; + +public class DynamicBatchSizeWaitTestProfile implements QuarkusTestProfile { + + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + config.put("quarkus.arc.selected-alternatives", "DynamicBatchSizeWait"); + config.put("debezium.source.max.batch.size", "100"); + config.put("debezium.sink.batch.batch-size-wait.max-wait-ms", "5000"); + config.put("debezium.source.poll.interval.ms", "5000"); + return config; + } +} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java new file mode 100644 index 00000000..d1f08064 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java @@ -0,0 +1,58 @@ +/* + * + * * Copyright memiiso Authors. 
+ * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.batchsizewait; + +import io.debezium.server.iceberg.testresources.BaseSparkTest; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +import java.time.Duration; +import javax.inject.Inject; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.awaitility.Awaitility; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.Test; + +@QuarkusTest +@TestProfile(MaxBatchSizeWaitTestProfile.class) +class MaxBatchSizeWaitTest extends BaseSparkTest { + @Inject + MaxBatchSizeWait waitBatchSize; + @ConfigProperty(name = "debezium.source.poll.interval.ms", defaultValue = "1000") + Integer pollIntervalMs; + @ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "1000") + Integer maxBatchSize; + + + @Test + public void testPerformance() throws Exception { + int iteration = 100; + PGCreateTestDataTable(); + for (int i = 0; i <= iteration; i++) { + PGLoadTestDataTable(maxBatchSize / 10, true); + } + Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { + try { + Dataset df = getTableData("testc.inventory.test_date_table"); + df.createOrReplaceGlobalTempView("test_date_table_batch_size"); + df = spark + .sql("SELECT substring(input_file,94,60) as input_file, " + + "count(*) as batch_size FROM global_temp.test_date_table_batch_size group " + + "by 1"); + df.show(false); + return df.filter("batch_size = " + maxBatchSize).count() >= 5; + } catch (Exception e) { + return false; + } + }); + } + +} \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTestProfile.java new file mode 
100644 index 00000000..2a99a2a2 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTestProfile.java @@ -0,0 +1,34 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.batchsizewait; + +import io.quarkus.test.junit.QuarkusTestProfile; + +import java.util.HashMap; +import java.util.Map; + +public class MaxBatchSizeWaitTestProfile implements QuarkusTestProfile { + + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + // wait + config.put("debezium.sink.batch.batch-size-wait", "MaxBatchSizeWait"); + config.put("debezium.sink.batch.metrics.snapshot-mbean", "debezium.postgres:type=connector-metrics,context=snapshot,server=testc"); + config.put("debezium.sink.batch.metrics.streaming-mbean", "debezium.postgres:type=connector-metrics,context=streaming,server=testc"); + config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); + config.put("debezium.source.max.batch.size", "5000"); + config.put("debezium.source.max.queue.size", "70000"); + //config.put("debezium.source.poll.interval.ms", "1000"); + config.put("debezium.sink.batch.batch-size-wait.max-wait-ms", "5000"); + config.put("debezium.sink.batch.batch-size-wait.wait-interval-ms", "1000"); + config.put("quarkus.log.category.\"io.debezium.server.iceberg.batchsizewait\".level", "DEBUG"); + return config; + } +} \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java similarity index 80% rename from debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/BaseSparkTest.java rename to 
debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java index cc341394..1a80d171 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/BaseSparkTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java @@ -6,7 +6,7 @@ * */ -package io.debezium.server.testresource; +package io.debezium.server.iceberg.testresources; import io.debezium.server.iceberg.ConfigSource; import io.debezium.server.iceberg.IcebergUtil; @@ -21,8 +21,6 @@ import org.eclipse.microprofile.config.ConfigProvider; import org.junit.jupiter.api.BeforeAll; import static io.debezium.server.iceberg.ConfigSource.S3_BUCKET; -import static io.debezium.server.testresource.TestUtil.randomInt; -import static io.debezium.server.testresource.TestUtil.randomString; /** * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. @@ -87,16 +85,23 @@ public static void PGCreateTestDataTable() throws Exception { } public static int PGLoadTestDataTable(int numRows) throws Exception { + return PGLoadTestDataTable(numRows, false); + } + + public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) throws Exception { int numInsert = 0; do { new Thread(() -> { try { + if (addRandomDelay) { + Thread.sleep(TestUtil.randomInt(20000, 100000)); + } String sql = "INSERT INTO inventory.test_date_table (c_id, c_text, c_varchar ) " + "VALUES "; - StringBuilder values = new StringBuilder("\n(" + randomInt(15, 32) + ", '" + randomString(524) + "', '" + randomString(524) + "')"); + StringBuilder values = new StringBuilder("\n(" + TestUtil.randomInt(15, 32) + ", '" + TestUtil.randomString(524) + "', '" + TestUtil.randomString(524) + "')"); for (int i = 0; i < 100; i++) { - values.append("\n,(").append(randomInt(15, 32)).append(", '").append(randomString(524)).append("', '").append(randomString(524)).append("')"); + 
values.append("\n,(").append(TestUtil.randomInt(15, 32)).append(", '").append(TestUtil.randomString(524)).append("', '").append(TestUtil.randomString(524)).append("')"); } SourcePostgresqlDB.runSQL(sql + values); SourcePostgresqlDB.runSQL("COMMIT;"); @@ -126,9 +131,9 @@ public static int mysqlLoadTestDataTable(int numRows) throws Exception { do { String sql = "INSERT INTO inventory.test_date_table (c_id, c_text, c_varchar ) " + "VALUES "; - StringBuilder values = new StringBuilder("\n(" + randomInt(15, 32) + ", '" + randomString(524) + "', '" + randomString(524) + "')"); + StringBuilder values = new StringBuilder("\n(" + TestUtil.randomInt(15, 32) + ", '" + TestUtil.randomString(524) + "', '" + TestUtil.randomString(524) + "')"); for (int i = 0; i < 10; i++) { - values.append("\n,(").append(randomInt(15, 32)).append(", '").append(randomString(524)).append("', '").append(randomString(524)).append("')"); + values.append("\n,(").append(TestUtil.randomInt(15, 32)).append(", '").append(TestUtil.randomString(524)).append("', '").append(TestUtil.randomString(524)).append("')"); } SourceMysqlDB.runSQL(sql + values); numInsert += 10; @@ -137,7 +142,7 @@ public static int mysqlLoadTestDataTable(int numRows) throws Exception { } public Dataset getTableData(String table) { - return spark.newSession().sql("SELECT * FROM default.debeziumcdc_" + table.replace(".", "_")); + return spark.newSession().sql("SELECT input_file_name() as input_file, * FROM default.debeziumcdc_" + table.replace(".", "_")); } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/S3Minio.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java similarity index 98% rename from debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/S3Minio.java rename to debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java index ddc2c768..3a01aebd 100644 --- 
a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/S3Minio.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java @@ -6,7 +6,7 @@ * */ -package io.debezium.server.testresource; +package io.debezium.server.iceberg.testresources; import io.debezium.server.iceberg.ConfigSource; import io.minio.ListObjectsArgs; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/SourceMysqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java similarity index 98% rename from debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/SourceMysqlDB.java rename to debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java index 14e83b4c..de315aea 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/SourceMysqlDB.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java @@ -6,7 +6,7 @@ * */ -package io.debezium.server.testresource; +package io.debezium.server.iceberg.testresources; import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/SourcePostgresqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java similarity index 98% rename from debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/SourcePostgresqlDB.java rename to debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java index fdf5e808..2467a9a4 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/SourcePostgresqlDB.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java @@ -6,7 +6,7 
@@ * */ -package io.debezium.server.testresource; +package io.debezium.server.iceberg.testresources; import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/TestChangeEvent.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java similarity index 95% rename from debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/TestChangeEvent.java rename to debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java index 6db18f9c..d1ff77e3 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/TestChangeEvent.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java @@ -6,7 +6,7 @@ * */ -package io.debezium.server.testresource; +package io.debezium.server.iceberg.testresources; import io.debezium.engine.ChangeEvent; import io.debezium.engine.RecordChangeEvent; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/TestUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestUtil.java similarity index 96% rename from debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/TestUtil.java rename to debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestUtil.java index 4f57e549..a1a72b03 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/TestUtil.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestUtil.java @@ -6,7 +6,7 @@ * */ -package io.debezium.server.testresource; +package io.debezium.server.iceberg.testresources; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine;