Skip to content

Commit

Permalink
Add batch size wait to optimize batch size (#16)
Browse files Browse the repository at this point in the history
* rework batch wait
  • Loading branch information
ismailsimsek committed Aug 15, 2021
1 parent da19218 commit e82f3b7
Show file tree
Hide file tree
Showing 23 changed files with 540 additions and 85 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;

import java.lang.management.ManagementFactory;
import java.util.Optional;
import javax.enterprise.context.Dependent;
import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Optimizes batch size around 85%-90% of max,batch.size using dynamically calculated sleep(ms)
*
* @author Ismail Simsek
*/
@Dependent
public class DebeziumMetrics {
protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumMetrics.class);
final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
@ConfigProperty(name = "debezium.sink.batch.metrics.snapshot-mbean", defaultValue = "")
Optional<String> snapshotMbean;
@ConfigProperty(name = "debezium.sink.batch.metrics.streaming-mbean", defaultValue = "")
Optional<String> streamingMbean;
@ConfigProperty(name = "debezium.source.max.queue.size", defaultValue = CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE + "")
int maxQueueSize;

ObjectName snapshotMetricsObjectName;
ObjectName streamingMetricsObjectName;

public void initizalize() throws DebeziumException {
assert snapshotMbean.isPresent() :
"Snapshot metrics Mbean `debezium.sink.batch.metrics.snapshot-mbean` not provided";
assert streamingMbean.isPresent() :
"Streaming metrics Mbean `debezium.sink.batch.metrics.streaming-mbean` not provided";
try {
snapshotMetricsObjectName = new ObjectName(snapshotMbean.get());
streamingMetricsObjectName = new ObjectName(streamingMbean.get());
} catch (Exception e) {
throw new DebeziumException(e);
}
}

public boolean snapshotRunning() {
try {
return (boolean) mbeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotRunning");
} catch (Exception e) {
throw new DebeziumException(e);
}
}

public boolean snapshotCompleted() {
try {
return (boolean) mbeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotCompleted");
} catch (Exception e) {
throw new DebeziumException(e);
}
}

public int streamingQueueRemainingCapacity() {
try {
return (int) mbeanServer.getAttribute(streamingMetricsObjectName, "QueueRemainingCapacity");
} catch (Exception e) {
throw new DebeziumException(e);
}
}

public int streamingQueueCurrentSize() {
return maxQueueSize - streamingQueueRemainingCapacity();
}

public long streamingMilliSecondsBehindSource() {
try {
return (long) mbeanServer.getAttribute(streamingMetricsObjectName, "MilliSecondsBehindSource");
} catch (Exception e) {
throw new DebeziumException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package io.debezium.server.iceberg;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -23,6 +25,9 @@
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import javax.inject.Inject;
import javax.inject.Named;

Expand Down Expand Up @@ -75,8 +80,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
String catalogName;
@ConfigProperty(name = "debezium.sink.iceberg.dynamic-wait", defaultValue = "true")
boolean dynamicWaitEnabled;

@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsertData;
Expand All @@ -88,8 +91,13 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
@ConfigProperty(name = "debezium.sink.iceberg.upsert-source-ts-ms-column", defaultValue = "__source_ts_ms")
String sourceTsMsColumn;

@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;

@Inject
BatchDynamicWait dynamicWait;
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;
static ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);

Configuration hadoopConf = new Configuration();
Expand All @@ -116,6 +124,16 @@ void connect() throws InterruptedException {

valSerde.configure(Collections.emptyMap(), false);
valDeserializer = valSerde.deserializer();

Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
if (instance.isAmbiguous()) {
throw new DebeziumException("Multiple batch size wait class named '" + batchSizeWaitName + "' were found");
} else if (instance.isUnsatisfied()) {
throw new DebeziumException("No batch size wait class named '" + batchSizeWaitName + "' is available");
}
batchSizeWait = instance.get();
batchSizeWait.initizalize();
LOGGER.info("Using {}", batchSizeWait.getClass().getName());
}

public String map(String destination) {
Expand Down Expand Up @@ -172,9 +190,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
}
committer.markBatchFinished();

if (dynamicWaitEnabled) {
dynamicWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
}
batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());

}

Expand Down Expand Up @@ -257,7 +273,7 @@ private void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object

c.commit();
}
LOGGER.info("Committed events to table! {}", icebergTable.location());
LOGGER.info("Committed {} events to table! {}", events.size(),icebergTable.location());

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@

package io.debezium.server.iceberg;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -27,6 +29,9 @@
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import javax.inject.Inject;
import javax.inject.Named;

Expand Down Expand Up @@ -90,10 +95,13 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
String catalogName;
@ConfigProperty(name = "debezium.sink.iceberg.dynamic-wait", defaultValue = "true")
boolean dynamicWaitEnabled;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;

@Inject
BatchDynamicWait dynamicWait;
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;

Map<String, String> icebergProperties = new ConcurrentHashMap<>();
Catalog icebergCatalog;
Expand Down Expand Up @@ -131,6 +139,16 @@ void connect() throws InterruptedException {
}
// load table
eventTable = icebergCatalog.loadTable(tableIdentifier);

Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
if (instance.isAmbiguous()) {
throw new DebeziumException("Multiple batch size wait class named '" + batchSizeWaitName + "' were found");
} else if (instance.isUnsatisfied()) {
throw new DebeziumException("No batch size wait class named '" + batchSizeWaitName + "' is available");
}
batchSizeWait = instance.get();
batchSizeWait.initizalize();
LOGGER.info("Using {}", batchSizeWait.getClass().getName());
}

public GenericRecord getIcebergRecord(String destination, ChangeEvent<Object, Object> record, OffsetDateTime batchTime) {
Expand Down Expand Up @@ -171,9 +189,8 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
// committer.markProcessed(record);
committer.markBatchFinished();

if (dynamicWaitEnabled) {
dynamicWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
}
batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());

}

private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList<Record> icebergRecords) throws InterruptedException {
Expand Down Expand Up @@ -219,7 +236,7 @@ private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList
eventTable.newAppend()
.appendFile(dataFile)
.commit();
LOGGER.info("Committed events to table! {}", eventTable.location());
LOGGER.info("Committed {} events to table! {}", icebergRecords.size(), eventTable.location());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,38 @@
*
*/

package io.debezium.server.iceberg;
package io.debezium.server.iceberg.batchsizewait;

import java.util.IntSummaryStatistics;
import java.util.LinkedList;
import javax.enterprise.context.Dependent;
import javax.inject.Named;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.debezium.config.CommonConnectorConfig.DEFAULT_MAX_BATCH_SIZE;

/**
* Optimizes batch size around 85%-90% of max,batch.size using dynamically calculated sleep(ms)
*
* @author Ismail Simsek
*/
@Dependent
public class BatchDynamicWait {
protected static final Logger LOGGER = LoggerFactory.getLogger(BatchDynamicWait.class);
@Named("DynamicBatchSizeWait")
public class DynamicBatchSizeWait implements InterfaceBatchSizeWait {
protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class);

@ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "2048")
@ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = DEFAULT_MAX_BATCH_SIZE + "")
Integer maxBatchSize;

@ConfigProperty(name = "debezium.sink.batch.dynamic-wait.max-wait-ms", defaultValue = "300000")
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait.max-wait-ms", defaultValue = "300000")
Integer maxWaitMs;

LinkedList<Integer> batchSizeHistory = new LinkedList<Integer>();
LinkedList<Integer> sleepMsHistory = new LinkedList<Integer>();

public BatchDynamicWait() {
public DynamicBatchSizeWait() {
batchSizeHistory.add(1);
batchSizeHistory.add(1);
batchSizeHistory.add(1);
Expand Down Expand Up @@ -77,19 +80,17 @@ else if ((getAverage(batchSizeHistory) / maxBatchSize) >= 0.90) {
sleepMsHistory.add(Math.min(Math.max(sleepMs, 100), maxWaitMs));
sleepMsHistory.removeFirst();

LOGGER.debug("Calculating Wait delay\nmax.batch.size={}\npoll.interval.ms={}\nbatchSizeHistory{}\nsleepMsHistory" +
"{}\nval{}",
maxBatchSize,
maxWaitMs,
batchSizeHistory, sleepMsHistory, sleepMsHistory.getLast());
LOGGER.debug("Calculating Wait delay\n" +
"max.batch.size={}\npoll.interval.ms={}\nbatchSizeHistory{}\nsleepMsHistory{}\nval{}",
maxBatchSize, maxWaitMs, batchSizeHistory, sleepMsHistory, sleepMsHistory.getLast());

return sleepMsHistory.getLast();
}

public void waitMs(Integer numRecords, Integer processingTimeMs) throws InterruptedException {
int sleepMs = Math.max(getWaitMs(numRecords) - processingTimeMs, 0);
public void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException {
int sleepMs = Math.max(getWaitMs(numRecordsProcessed) - processingTimeMs, 0);
if (sleepMs > 2000) {
LOGGER.info("Waiting {} ms", sleepMs);
LOGGER.debug("Waiting {} ms", sleepMs);
Thread.sleep(sleepMs);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg.batchsizewait;

/**
* Implementation of the consumer that delivers the messages into Amazon S3 destination.
*
* @author Ismail Simsek
*/
public interface InterfaceBatchSizeWait {

default void initizalize() {
}

void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException;

}
Loading

0 comments on commit e82f3b7

Please sign in to comment.