Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Aug 3, 2021
1 parent b842dfb commit 980fc9b
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package io.debezium.server.iceberg;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
Expand All @@ -24,6 +25,9 @@
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import javax.inject.Inject;
import javax.inject.Named;

Expand Down Expand Up @@ -87,7 +91,12 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
@ConfigProperty(name = "debezium.sink.iceberg.upsert-source-ts-ms-column", defaultValue = "__source_ts_ms")
String sourceTsMsColumn;

@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;
static ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);

Expand Down Expand Up @@ -115,7 +124,16 @@ void connect() throws InterruptedException {

valSerde.configure(Collections.emptyMap(), false);
valDeserializer = valSerde.deserializer();

Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
if (instance.isAmbiguous()) {
throw new DebeziumException("Multiple batch size wait class named '" + batchSizeWaitName + "' were found");
} else if (instance.isUnsatisfied()) {
throw new DebeziumException("No batch size wait class named '" + batchSizeWaitName + "' is available");
}
batchSizeWait = instance.get();
batchSizeWait.initizalize();
LOGGER.info("Using {}", batchSizeWait.getClass().getName());
}

public String map(String destination) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package io.debezium.server.iceberg;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
Expand All @@ -28,6 +29,9 @@
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import javax.inject.Inject;
import javax.inject.Named;

Expand Down Expand Up @@ -91,7 +95,12 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
String catalogName;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;

private TableIdentifier tableIdentifier;
Expand Down Expand Up @@ -131,7 +140,16 @@ void connect() throws InterruptedException {
}
// load table
eventTable = icebergCatalog.loadTable(tableIdentifier);

Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
if (instance.isAmbiguous()) {
throw new DebeziumException("Multiple batch size wait class named '" + batchSizeWaitName + "' were found");
} else if (instance.isUnsatisfied()) {
throw new DebeziumException("No batch size wait class named '" + batchSizeWaitName + "' is available");
}
batchSizeWait = instance.get();
batchSizeWait.initizalize();
LOGGER.info("Using {}", batchSizeWait.getClass().getName());
}

public GenericRecord getIcebergRecord(String destination, ChangeEvent<Object, Object> record, OffsetDateTime batchTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.IntSummaryStatistics;
import java.util.LinkedList;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Alternative;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
Expand All @@ -24,7 +23,6 @@
* @author Ismail Simsek
*/
@Dependent
@Alternative
public class DynamicBatchSizeWait implements InterfaceBatchSizeWait {
protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@

package io.debezium.server.iceberg.batchsizewait;

import javax.enterprise.context.Dependent;

/**
 * Contract for batch-size wait strategies that control how long the consumer pauses between batches.
*
* @author Ismail Simsek
*/
@Dependent
public interface InterfaceBatchSizeWait {

default void initizalize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.debezium.server.iceberg.DebeziumMetrics;

import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Alternative;
import javax.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;
Expand All @@ -26,7 +25,6 @@
* @author Ismail Simsek
*/
@Dependent
@Alternative
public class MaxBatchSizeWait implements InterfaceBatchSizeWait {
protected static final Logger LOGGER = LoggerFactory.getLogger(MaxBatchSizeWait.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@
package io.debezium.server.iceberg.batchsizewait;

import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Default;

/**
 * No-op wait strategy: performs no delay between batches (the default when no other strategy is selected).
*
* @author Ismail Simsek
*/
@Dependent
@Default
public class NoBatchSizeWait implements InterfaceBatchSizeWait {

public void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
class DynamicBatchSizeWaitTest {

@Inject
InterfaceBatchSizeWait waitBatchSize;
DynamicBatchSizeWait waitBatchSize;

@ConfigProperty(name = "debezium.source.poll.interval.ms", defaultValue = "1000")
Integer pollIntervalMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
@TestProfile(MaxBatchSizeWaitTestProfile.class)
class MaxBatchSizeWaitTest extends BaseSparkTest {
@Inject
InterfaceBatchSizeWait waitBatchSize;
MaxBatchSizeWait waitBatchSize;
@ConfigProperty(name = "debezium.source.poll.interval.ms", defaultValue = "1000")
Integer pollIntervalMs;
@ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "1000")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class MaxBatchSizeWaitTestProfile implements QuarkusTestProfile {
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
// wait
config.put("quarkus.arc.selected-alternatives", "MaxBatchSizeWait");
config.put("debezium.sink.batch.batch-size-wait", "MaxBatchSizeWait");
config.put("debezium.sink.batch.metrics.snapshot-mbean", "debezium.postgres:type=connector-metrics,context=snapshot,server=testc");
config.put("debezium.sink.batch.metrics.streaming-mbean", "debezium.postgres:type=connector-metrics,context=streaming,server=testc");
config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
Expand Down

0 comments on commit 980fc9b

Please sign in to comment.