Skip to content

Commit

Permalink
Periodically log number of event consumed (#40)
Browse files Browse the repository at this point in the history
* periodically log number of event consumed

* periodically log number of event consumed
  • Loading branch information
ismailsimsek committed Oct 22, 2021
1 parent 6dbee1a commit 4b575db
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
import io.debezium.server.iceberg.tableoperator.InterfaceIcebergTableOperator;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import io.debezium.util.Threads;

import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -88,6 +91,12 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
Instance<InterfaceIcebergTableOperator> icebergTableOperatorInstances;
InterfaceIcebergTableOperator icebergTableOperator;

protected static final Duration LOG_INTERVAL = Duration.ofMinutes(15);
protected final Clock clock = Clock.system();
protected long consumerStart = clock.currentTimeInMillis();
protected long numConsumedEvents = 0;
protected Threads.Timer logTimer = Threads.timer(clock, LOG_INTERVAL);

@PostConstruct
void connect() {
if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
Expand Down Expand Up @@ -157,11 +166,22 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
committer.markProcessed(record);
}
committer.markBatchFinished();
this.logConsumerProgress(records.size());

batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());

}

protected void logConsumerProgress(long numUploadedEvents) {
numConsumedEvents += numUploadedEvents;
if (logTimer.expired()) {
LOGGER.info("Consumed {} records after {}", numConsumedEvents, Strings.duration(clock.currentTimeInMillis() - consumerStart));
numConsumedEvents = 0;
consumerStart = clock.currentTimeInMillis();
logTimer = Threads.timer(clock, LOG_INTERVAL);
}
}


private Table createIcebergTable(TableIdentifier tableIdentifier,
ChangeEvent<Object, Object> event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin

}

private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList<Record> icebergRecords) throws InterruptedException {
private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList<Record> icebergRecords) {
final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;

PartitionKey pk = new PartitionKey(TABLE_PARTITION, TABLE_SCHEMA);
Expand Down

0 comments on commit 4b575db

Please sign in to comment.