Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batch size wait to optimize batch size #16

Merged
merged 8 commits into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;

import java.lang.management.ManagementFactory;
import java.util.Optional;
import javax.enterprise.context.Dependent;
import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads Debezium connector metrics (snapshot state, streaming queue usage, lag) from the
 * platform MBean server.
 *
 * <p>The MBean object names are taken from the configuration properties
 * {@code debezium.sink.batch.metrics.snapshot-mbean} and
 * {@code debezium.sink.batch.metrics.streaming-mbean}; {@link #initizalize()} must be called
 * before any of the metric accessors so that both names are resolved.
 *
 * @author Ismail Simsek
 */
@Dependent
public class DebeziumMetrics {
  protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumMetrics.class);
  final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
  @ConfigProperty(name = "debezium.sink.batch.metrics.snapshot-mbean", defaultValue = "")
  Optional<String> snapshotMbean;
  @ConfigProperty(name = "debezium.sink.batch.metrics.streaming-mbean", defaultValue = "")
  Optional<String> streamingMbean;
  @ConfigProperty(name = "debezium.source.max.queue.size", defaultValue = CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE + "")
  int maxQueueSize;

  ObjectName snapshotMetricsObjectName;
  ObjectName streamingMetricsObjectName;

  /**
   * Resolves the configured snapshot and streaming MBean names into {@link ObjectName}s.
   *
   * <p>The presence check is done with an explicit exception rather than {@code assert}:
   * assertions are skipped unless the JVM runs with {@code -ea}, which would let a missing
   * property slip through and fail later with an unhelpful error.
   *
   * @throws DebeziumException if either MBean name is missing/blank or is not a valid object name
   */
  public void initizalize() throws DebeziumException {
    final String snapshotName = requireMbeanName(snapshotMbean,
        "Snapshot metrics Mbean `debezium.sink.batch.metrics.snapshot-mbean` not provided");
    final String streamingName = requireMbeanName(streamingMbean,
        "Streaming metrics Mbean `debezium.sink.batch.metrics.streaming-mbean` not provided");
    try {
      snapshotMetricsObjectName = new ObjectName(snapshotName);
      streamingMetricsObjectName = new ObjectName(streamingName);
    } catch (Exception e) {
      throw new DebeziumException(e);
    }
  }

  /**
   * Reads the {@code SnapshotRunning} attribute of the snapshot metrics MBean.
   *
   * @return the attribute value
   * @throws DebeziumException if the attribute cannot be read
   */
  public boolean snapshotRunning() {
    return (boolean) readAttribute(snapshotMetricsObjectName, "SnapshotRunning");
  }

  /**
   * Reads the {@code SnapshotCompleted} attribute of the snapshot metrics MBean.
   *
   * @return the attribute value
   * @throws DebeziumException if the attribute cannot be read
   */
  public boolean snapshotCompleted() {
    return (boolean) readAttribute(snapshotMetricsObjectName, "SnapshotCompleted");
  }

  /**
   * Reads the {@code QueueRemainingCapacity} attribute of the streaming metrics MBean.
   *
   * @return the attribute value
   * @throws DebeziumException if the attribute cannot be read
   */
  public int streamingQueueRemainingCapacity() {
    return (int) readAttribute(streamingMetricsObjectName, "QueueRemainingCapacity");
  }

  /**
   * Derives the number of entries currently in the streaming queue as the configured
   * {@code debezium.source.max.queue.size} minus the reported remaining capacity.
   *
   * @return configured maximum queue size minus remaining capacity
   * @throws DebeziumException if the remaining capacity cannot be read
   */
  public int streamingQueueCurrentSize() {
    return maxQueueSize - streamingQueueRemainingCapacity();
  }

  /**
   * Reads the {@code MilliSecondsBehindSource} attribute of the streaming metrics MBean.
   *
   * @return the attribute value
   * @throws DebeziumException if the attribute cannot be read
   */
  public long streamingMilliSecondsBehindSource() {
    return (long) readAttribute(streamingMetricsObjectName, "MilliSecondsBehindSource");
  }

  /** Returns the configured MBean name, or fails with {@code message} when it is absent or blank. */
  private static String requireMbeanName(Optional<String> configured, String message) {
    return configured
        .filter(name -> !name.trim().isEmpty())
        .orElseThrow(() -> new DebeziumException(message));
  }

  /** Fetches one attribute from the MBean server, wrapping any failure with the attribute and MBean name. */
  private Object readAttribute(ObjectName objectName, String attribute) {
    try {
      return mbeanServer.getAttribute(objectName, attribute);
    } catch (Exception e) {
      throw new DebeziumException(
          "Failed to read attribute '" + attribute + "' from MBean '" + objectName + "'", e);
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package io.debezium.server.iceberg;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -23,6 +25,9 @@
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import javax.inject.Inject;
import javax.inject.Named;

Expand Down Expand Up @@ -75,8 +80,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
String catalogName;
@ConfigProperty(name = "debezium.sink.iceberg.dynamic-wait", defaultValue = "true")
boolean dynamicWaitEnabled;

@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsertData;
Expand All @@ -88,8 +91,13 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
@ConfigProperty(name = "debezium.sink.iceberg.upsert-source-ts-ms-column", defaultValue = "__source_ts_ms")
String sourceTsMsColumn;

@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;

@Inject
BatchDynamicWait dynamicWait;
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;
static ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);

Configuration hadoopConf = new Configuration();
Expand All @@ -116,6 +124,16 @@ void connect() throws InterruptedException {

valSerde.configure(Collections.emptyMap(), false);
valDeserializer = valSerde.deserializer();

Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
if (instance.isAmbiguous()) {
throw new DebeziumException("Multiple batch size wait class named '" + batchSizeWaitName + "' were found");
} else if (instance.isUnsatisfied()) {
throw new DebeziumException("No batch size wait class named '" + batchSizeWaitName + "' is available");
}
batchSizeWait = instance.get();
batchSizeWait.initizalize();
LOGGER.info("Using {}", batchSizeWait.getClass().getName());
}

public String map(String destination) {
Expand Down Expand Up @@ -172,9 +190,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
}
committer.markBatchFinished();

if (dynamicWaitEnabled) {
dynamicWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
}
batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());

}

Expand Down Expand Up @@ -257,7 +273,7 @@ private void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object

c.commit();
}
LOGGER.info("Committed events to table! {}", icebergTable.location());
LOGGER.info("Committed {} events to table! {}", events.size(),icebergTable.location());

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@

package io.debezium.server.iceberg;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -27,6 +29,9 @@
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import javax.inject.Inject;
import javax.inject.Named;

Expand Down Expand Up @@ -90,10 +95,13 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
String catalogName;
@ConfigProperty(name = "debezium.sink.iceberg.dynamic-wait", defaultValue = "true")
boolean dynamicWaitEnabled;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;

@Inject
BatchDynamicWait dynamicWait;
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;

private TableIdentifier tableIdentifier;
Map<String, String> icebergProperties = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -132,6 +140,16 @@ void connect() throws InterruptedException {
}
// load table
eventTable = icebergCatalog.loadTable(tableIdentifier);

Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
if (instance.isAmbiguous()) {
throw new DebeziumException("Multiple batch size wait class named '" + batchSizeWaitName + "' were found");
} else if (instance.isUnsatisfied()) {
throw new DebeziumException("No batch size wait class named '" + batchSizeWaitName + "' is available");
}
batchSizeWait = instance.get();
batchSizeWait.initizalize();
LOGGER.info("Using {}", batchSizeWait.getClass().getName());
}

public GenericRecord getIcebergRecord(String destination, ChangeEvent<Object, Object> record, OffsetDateTime batchTime) {
Expand Down Expand Up @@ -172,9 +190,8 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
// committer.markProcessed(record);
committer.markBatchFinished();

if (dynamicWaitEnabled) {
dynamicWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
}
batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());

}

private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList<Record> icebergRecords) throws InterruptedException {
Expand Down Expand Up @@ -220,7 +237,7 @@ private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList
eventTable.newAppend()
.appendFile(dataFile)
.commit();
LOGGER.info("Committed events to table! {}", eventTable.location());
LOGGER.info("Committed {} events to table! {}", icebergRecords.size(), eventTable.location());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,38 @@
*
*/

package io.debezium.server.iceberg;
package io.debezium.server.iceberg.batchsizewait;

import java.util.IntSummaryStatistics;
import java.util.LinkedList;
import javax.enterprise.context.Dependent;
import javax.inject.Named;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.debezium.config.CommonConnectorConfig.DEFAULT_MAX_BATCH_SIZE;

/**
* Optimizes batch size around 85%-90% of max.batch.size using dynamically calculated sleep(ms)
*
* @author Ismail Simsek
*/
@Dependent
public class BatchDynamicWait {
protected static final Logger LOGGER = LoggerFactory.getLogger(BatchDynamicWait.class);
@Named("DynamicBatchSizeWait")
public class DynamicBatchSizeWait implements InterfaceBatchSizeWait {
protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class);

@ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "2048")
@ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = DEFAULT_MAX_BATCH_SIZE + "")
Integer maxBatchSize;

@ConfigProperty(name = "debezium.sink.batch.dynamic-wait.max-wait-ms", defaultValue = "300000")
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait.max-wait-ms", defaultValue = "300000")
Integer maxWaitMs;

LinkedList<Integer> batchSizeHistory = new LinkedList<Integer>();
LinkedList<Integer> sleepMsHistory = new LinkedList<Integer>();

public BatchDynamicWait() {
public DynamicBatchSizeWait() {
batchSizeHistory.add(1);
batchSizeHistory.add(1);
batchSizeHistory.add(1);
Expand Down Expand Up @@ -77,19 +80,17 @@ else if ((getAverage(batchSizeHistory) / maxBatchSize) >= 0.90) {
sleepMsHistory.add(Math.min(Math.max(sleepMs, 100), maxWaitMs));
sleepMsHistory.removeFirst();

LOGGER.debug("Calculating Wait delay\nmax.batch.size={}\npoll.interval.ms={}\nbatchSizeHistory{}\nsleepMsHistory" +
"{}\nval{}",
maxBatchSize,
maxWaitMs,
batchSizeHistory, sleepMsHistory, sleepMsHistory.getLast());
LOGGER.debug("Calculating Wait delay\n" +
"max.batch.size={}\npoll.interval.ms={}\nbatchSizeHistory{}\nsleepMsHistory{}\nval{}",
maxBatchSize, maxWaitMs, batchSizeHistory, sleepMsHistory, sleepMsHistory.getLast());

return sleepMsHistory.getLast();
}

public void waitMs(Integer numRecords, Integer processingTimeMs) throws InterruptedException {
int sleepMs = Math.max(getWaitMs(numRecords) - processingTimeMs, 0);
public void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException {
int sleepMs = Math.max(getWaitMs(numRecordsProcessed) - processingTimeMs, 0);
if (sleepMs > 2000) {
LOGGER.info("Waiting {} ms", sleepMs);
LOGGER.debug("Waiting {} ms", sleepMs);
Thread.sleep(sleepMs);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg.batchsizewait;

/**
 * Contract for a batch-size wait strategy that a change consumer invokes after it has finished
 * handling a batch.
 *
 * <p>The consumers select an implementation by its {@code @Named} value, which is configured
 * through the {@code debezium.sink.batch.batch-size-wait} property.
 *
 * @author Ismail Simsek
 */
public interface InterfaceBatchSizeWait {

/**
 * Optional setup hook; the consumers call it right after selecting the implementation.
 * The default implementation does nothing.
 */
default void initizalize() {
}

/**
 * Invoked by the consumer once a batch has been handled, giving the implementation the chance
 * to wait before the next batch.
 *
 * @param numRecordsProcessed number of records in the batch that was just handled
 * @param processingTimeMs time in milliseconds the consumer spent handling that batch
 * @throws InterruptedException presumably when the calling thread is interrupted while waiting
 */
void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException;

}
Loading