From 4b575db838e4108f438c75fade12e4dc50a46df1 Mon Sep 17 00:00:00 2001 From: ismail simsek Date: Fri, 22 Oct 2021 18:25:05 +0200 Subject: [PATCH] Periodically log number of event consumed (#40) * periodically log number of event consumed * periodically log number of event consumed --- .../server/iceberg/IcebergChangeConsumer.java | 20 +++++++++++++++++++ .../iceberg/IcebergEventsChangeConsumer.java | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 1d27fc63..819ad266 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -15,6 +15,9 @@ import io.debezium.server.BaseChangeConsumer; import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait; import io.debezium.server.iceberg.tableoperator.InterfaceIcebergTableOperator; +import io.debezium.util.Clock; +import io.debezium.util.Strings; +import io.debezium.util.Threads; import java.time.Duration; import java.time.Instant; @@ -88,6 +91,12 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu Instance icebergTableOperatorInstances; InterfaceIcebergTableOperator icebergTableOperator; + protected static final Duration LOG_INTERVAL = Duration.ofMinutes(15); + protected final Clock clock = Clock.system(); + protected long consumerStart = clock.currentTimeInMillis(); + protected long numConsumedEvents = 0; + protected Threads.Timer logTimer = Threads.timer(clock, LOG_INTERVAL); + @PostConstruct void connect() { if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) { @@ -157,11 +166,22 @@ public void handleBatch(List> records, DebeziumEngin committer.markProcessed(record); } committer.markBatchFinished(); + this.logConsumerProgress(records.size()); batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis()); } + protected void logConsumerProgress(long numUploadedEvents) { + numConsumedEvents += numUploadedEvents; + if (logTimer.expired()) { + LOGGER.info("Consumed {} records after {}", numConsumedEvents, Strings.duration(clock.currentTimeInMillis() - consumerStart)); + numConsumedEvents = 0; + consumerStart = clock.currentTimeInMillis(); + logTimer = Threads.timer(clock, LOG_INTERVAL); + } + } + private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent event) { diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java index e6f0bdd6..f56d16ee 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java @@ -195,7 +195,7 @@ public void handleBatch(List> records, DebeziumEngin } - private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList icebergRecords) throws InterruptedException { + private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList icebergRecords) { final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET; PartitionKey pk = new PartitionKey(TABLE_PARTITION, TABLE_SCHEMA);