From ae83bf7d0f58fb84df412d917308f30f7e56050d Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 1 Oct 2022 12:04:54 +0200 Subject: [PATCH 1/3] Use upstream DebeziumMetrics --- .../server/iceberg/DebeziumMetrics.java | 90 ------------------- .../batchsizewait/MaxBatchSizeWait.java | 3 +- pom.xml | 2 +- 3 files changed, 2 insertions(+), 93 deletions(-) delete mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java deleted file mode 100644 index 561b5506..00000000 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg; - -import io.debezium.DebeziumException; -import io.debezium.config.CommonConnectorConfig; - -import java.lang.management.ManagementFactory; -import java.util.Optional; -import javax.enterprise.context.Dependent; -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Ismail Simsek - */ -@Dependent -public class DebeziumMetrics { - protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumMetrics.class); - final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); - @ConfigProperty(name = "debezium.sink.batch.metrics.snapshot-mbean", defaultValue = "") - Optional snapshotMbean; - @ConfigProperty(name = "debezium.sink.batch.metrics.streaming-mbean", defaultValue = "") - Optional streamingMbean; - @ConfigProperty(name = "debezium.source.max.queue.size", defaultValue = CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE + "") - int maxQueueSize; - - ObjectName snapshotMetricsObjectName; - ObjectName streamingMetricsObjectName; - - public void initizalize() throws DebeziumException { - assert snapshotMbean.isPresent() : - "Snapshot metrics Mbean `debezium.sink.batch.metrics.snapshot-mbean` not provided"; - assert streamingMbean.isPresent() : - "Streaming metrics Mbean `debezium.sink.batch.metrics.streaming-mbean` not provided"; - try { - snapshotMetricsObjectName = new ObjectName(snapshotMbean.get()); - streamingMetricsObjectName = new ObjectName(streamingMbean.get()); - } catch (Exception e) { - throw new DebeziumException(e); - } - } - - public boolean snapshotRunning() { - try { - return (boolean) mbeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotRunning"); - } catch (Exception e) { - throw new DebeziumException(e); - } - } - - public boolean snapshotCompleted() { - try { - return (boolean) mbeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotCompleted"); - } catch (Exception e) { - throw new DebeziumException(e); - } - } - - public int streamingQueueRemainingCapacity() { - try { - return (int) mbeanServer.getAttribute(streamingMetricsObjectName, "QueueRemainingCapacity"); - } catch (Exception e) { - throw new DebeziumException(e); - } - } - - public int streamingQueueCurrentSize() { - return maxQueueSize - streamingQueueRemainingCapacity(); - } - - public long streamingMilliSecondsBehindSource() { - try { - return (long) mbeanServer.getAttribute(streamingMetricsObjectName, "MilliSecondsBehindSource"); - } catch (Exception e) { - throw new DebeziumException(e); - } - } - -} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java index bd853de1..3e0f710f 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java @@ -10,7 +10,7 @@ import io.debezium.DebeziumException; import io.debezium.config.CommonConnectorConfig; -import io.debezium.server.iceberg.DebeziumMetrics; +import io.debezium.server.DebeziumMetrics; import javax.enterprise.context.Dependent; import javax.inject.Inject; @@ -45,7 +45,6 @@ public class MaxBatchSizeWait implements InterfaceBatchSizeWait { @Override public void initizalize() throws DebeziumException { assert waitIntervalMs < maxWaitMs : "`wait-interval-ms` cannot be bigger than `max-wait-ms`"; - dbzMetrics.initizalize(); } @Override diff --git a/pom.xml b/pom.xml index e3704456..449bdacb 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ 1.17.3 3.2.1 - 1.9.5.Final + 1.9.6.Final 8.0.28 2.12.1.Final From 4f1cf73b936e2f14a8ac54eb7b8b08a46aa895b2 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 1 Oct 2022 12:10:26 +0200 Subject: [PATCH 2/3] Use upstream DebeziumMetrics --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 449bdacb..c8771066 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,7 @@ 1.9.6.Final 8.0.28 - 2.12.1.Final + 2.12.3.Final 4.8 From 322e732ab50b897d48d03d9da28c5fb63ffde85c Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 1 Oct 2022 12:14:41 +0200 Subject: [PATCH 3/3] Use upstream DebeziumMetrics --- .../server/iceberg/IcebergChangeConsumerMangodbTest.java | 1 - .../debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java | 1 - .../io/debezium/server/iceberg/IcebergChangeConsumerTest.java | 2 +- .../iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java | 1 - .../server/iceberg/IcebergChangeConsumerUpsertTest.java | 1 - .../server/iceberg/IcebergEventsChangeConsumerTest.java | 1 - .../server/iceberg/tableoperator/IcebergTableOperatorTest.java | 1 - 7 files changed, 1 insertion(+), 7 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java index 807f707c..ac41c3fe 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java @@ -26,7 +26,6 @@ import org.junit.jupiter.api.Test; /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. * * @author Ismail Simsek */ diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java index 2c53d7a6..9d926f30 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java @@ -26,7 +26,6 @@ import org.junit.jupiter.api.Test; /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. * * @author Ismail Simsek */ diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index aa29b944..03bee516 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -34,7 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. + * Integration test that verifies basic reading from PostgreSQL database and writing to iceberg destination. * * @author Ismail Simsek */ diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java index 4c9c8629..d20eb2de 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java @@ -29,7 +29,6 @@ import org.junit.jupiter.api.Test; /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. * * @author Ismail Simsek */ diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java index 6d42b77a..f8fe0cd7 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java @@ -29,7 +29,6 @@ import org.junit.jupiter.api.Test; /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. * * @author Ismail Simsek */ diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java index f72088ae..355b2fe9 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java @@ -28,7 +28,6 @@ import org.junit.jupiter.api.Test; /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3. * * @author Ismail Simsek */ diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java index 6303c91d..1b48e4ef 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java @@ -32,7 +32,6 @@ /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. * * @author Ismail Simsek */