From e57f39682089c6b71893f41f01fdd83412c7151c Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Tue, 3 Aug 2021 11:18:09 +0200 Subject: [PATCH] fix test --- .../server/iceberg/IcebergChangeConsumer.java | 18 ++++++++++++++++++ .../iceberg/IcebergEventsChangeConsumer.java | 18 ++++++++++++++++++ .../batchsizewait/DynamicBatchSizeWait.java | 4 ++-- .../batchsizewait/InterfaceBatchSizeWait.java | 3 --- .../batchsizewait/MaxBatchSizeWait.java | 4 ++-- .../iceberg/batchsizewait/NoBatchSizeWait.java | 4 ++-- .../DynamicBatchSizeWaitTest.java | 2 +- .../batchsizewait/MaxBatchSizeWaitTest.java | 2 +- .../MaxBatchSizeWaitTestProfile.java | 2 +- 9 files changed, 45 insertions(+), 12 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index b173b2e5..74dd961d 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -8,6 +8,7 @@ package io.debezium.server.iceberg; +import io.debezium.DebeziumException; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.Json; @@ -24,6 +25,9 @@ import java.util.stream.Collectors; import javax.annotation.PostConstruct; import javax.enterprise.context.Dependent; +import javax.enterprise.inject.Any; +import javax.enterprise.inject.Instance; +import javax.enterprise.inject.literal.NamedLiteral; import javax.inject.Inject; import javax.inject.Named; @@ -87,7 +91,12 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu @ConfigProperty(name = "debezium.sink.iceberg.upsert-source-ts-ms-column", defaultValue = "__source_ts_ms") String sourceTsMsColumn; + @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait") + String batchSizeWaitName; + @Inject + @Any + Instance batchSizeWaitInstances; InterfaceBatchSizeWait batchSizeWait; static ImmutableMap cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4); @@ -115,7 +124,16 @@ void connect() throws InterruptedException { valSerde.configure(Collections.emptyMap(), false); valDeserializer = valSerde.deserializer(); + + Instance instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName)); + if (instance.isAmbiguous()) { + throw new DebeziumException("Multiple batch size wait class named '" + batchSizeWaitName + "' were found"); + } else if (instance.isUnsatisfied()) { + throw new DebeziumException("No batch size wait class named '" + batchSizeWaitName + "' is available"); + } + batchSizeWait = instance.get(); batchSizeWait.initizalize(); + LOGGER.info("Using {}", batchSizeWait.getClass().getName()); } public String map(String destination) { diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java index 2a49ddbb..271a3e7d 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java @@ -8,6 +8,7 @@ package io.debezium.server.iceberg; +import io.debezium.DebeziumException; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.Json; @@ -28,6 +29,9 @@ import java.util.stream.Collectors; import javax.annotation.PostConstruct; import javax.enterprise.context.Dependent; +import javax.enterprise.inject.Any; +import javax.enterprise.inject.Instance; +import javax.enterprise.inject.literal.NamedLiteral; import javax.inject.Inject; import javax.inject.Named; @@ -91,7 +95,12 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D String namespace; @ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default") String catalogName; + @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait") + String batchSizeWaitName; + @Inject + @Any + Instance batchSizeWaitInstances; InterfaceBatchSizeWait batchSizeWait; private TableIdentifier tableIdentifier; @@ -131,7 +140,16 @@ void connect() throws InterruptedException { } // load table eventTable = icebergCatalog.loadTable(tableIdentifier); + + Instance instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName)); + if (instance.isAmbiguous()) { + throw new DebeziumException("Multiple batch size wait class named '" + batchSizeWaitName + "' were found"); + } else if (instance.isUnsatisfied()) { + throw new DebeziumException("No batch size wait class named '" + batchSizeWaitName + "' is available"); + } + batchSizeWait = instance.get(); batchSizeWait.initizalize(); + LOGGER.info("Using {}", batchSizeWait.getClass().getName()); } public GenericRecord getIcebergRecord(String destination, ChangeEvent record, OffsetDateTime batchTime) { diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java index 1ebc4f28..53908a3f 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java @@ -11,7 +11,7 @@ import java.util.IntSummaryStatistics; import java.util.LinkedList; import javax.enterprise.context.Dependent; -import javax.enterprise.inject.Alternative; +import javax.inject.Named; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; @@ -24,7 +24,7 @@ * @author Ismail Simsek */ @Dependent -@Alternative +@Named("DynamicBatchSizeWait") public class DynamicBatchSizeWait implements InterfaceBatchSizeWait { protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java index 2ef2e208..29071f7b 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java @@ -8,14 +8,11 @@ package io.debezium.server.iceberg.batchsizewait; -import javax.enterprise.context.Dependent; - /** * Implementation of the consumer that delivers the messages into Amazon S3 destination. * * @author Ismail Simsek */ -@Dependent public interface InterfaceBatchSizeWait { default void initizalize() { diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java index a98f6075..e71dde01 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java @@ -13,8 +13,8 @@ import io.debezium.server.iceberg.DebeziumMetrics; import javax.enterprise.context.Dependent; -import javax.enterprise.inject.Alternative; import javax.inject.Inject; +import javax.inject.Named; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; @@ -26,7 +26,7 @@ * @author Ismail Simsek */ @Dependent -@Alternative +@Named("MaxBatchSizeWait") public class MaxBatchSizeWait implements InterfaceBatchSizeWait { protected static final Logger LOGGER = LoggerFactory.getLogger(MaxBatchSizeWait.class); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/NoBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/NoBatchSizeWait.java index f3ba9465..5bd8d8f4 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/NoBatchSizeWait.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/NoBatchSizeWait.java @@ -9,7 +9,7 @@ package io.debezium.server.iceberg.batchsizewait; import javax.enterprise.context.Dependent; -import javax.enterprise.inject.Default; +import javax.inject.Named; /** * Optimizes batch size around 85%-90% of max,batch.size using dynamically calculated sleep(ms) @@ -17,7 +17,7 @@ * @author Ismail Simsek */ @Dependent -@Default +@Named("NoBatchSizeWait") public class NoBatchSizeWait implements InterfaceBatchSizeWait { public void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java index 547692d9..2262a7f4 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java @@ -22,7 +22,7 @@ class DynamicBatchSizeWaitTest { @Inject - InterfaceBatchSizeWait waitBatchSize; + DynamicBatchSizeWait waitBatchSize; @ConfigProperty(name = "debezium.source.poll.interval.ms", defaultValue = "1000") Integer pollIntervalMs; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java index 04d1ff6e..297f4897 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java @@ -25,7 +25,7 @@ @TestProfile(MaxBatchSizeWaitTestProfile.class) class MaxBatchSizeWaitTest extends BaseSparkTest { @Inject - InterfaceBatchSizeWait waitBatchSize; + MaxBatchSizeWait waitBatchSize; @ConfigProperty(name = "debezium.source.poll.interval.ms", defaultValue = "1000") Integer pollIntervalMs; @ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "1000") diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTestProfile.java index 98e5fcab..2a99a2a2 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTestProfile.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTestProfile.java @@ -19,7 +19,7 @@ public class MaxBatchSizeWaitTestProfile implements QuarkusTestProfile { public Map getConfigOverrides() { Map config = new HashMap<>(); // wait - config.put("quarkus.arc.selected-alternatives", "MaxBatchSizeWait"); + config.put("debezium.sink.batch.batch-size-wait", "MaxBatchSizeWait"); config.put("debezium.sink.batch.metrics.snapshot-mbean", "debezium.postgres:type=connector-metrics,context=snapshot,server=testc"); config.put("debezium.sink.batch.metrics.streaming-mbean", "debezium.postgres:type=connector-metrics,context=streaming,server=testc"); config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");