From 9543f31a7458d6fabc75f10475735554f5ef7527 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 1 Oct 2022 17:01:38 +0200 Subject: [PATCH] Use upstream DebeziumMetrics (#111) * Use upstream DebeziumMetrics --- .../server/iceberg/DebeziumMetrics.java | 90 ------------------- .../batchsizewait/MaxBatchSizeWait.java | 3 +- .../IcebergChangeConsumerMangodbTest.java | 1 - .../IcebergChangeConsumerMysqlTest.java | 1 - .../iceberg/IcebergChangeConsumerTest.java | 2 +- ...ChangeConsumerUpsertDeleteDeletesTest.java | 1 - .../IcebergChangeConsumerUpsertTest.java | 1 - .../IcebergEventsChangeConsumerTest.java | 1 - .../IcebergTableOperatorTest.java | 1 - pom.xml | 4 +- 10 files changed, 4 insertions(+), 101 deletions(-) delete mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java deleted file mode 100644 index 561b5506..00000000 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg; - -import io.debezium.DebeziumException; -import io.debezium.config.CommonConnectorConfig; - -import java.lang.management.ManagementFactory; -import java.util.Optional; -import javax.enterprise.context.Dependent; -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Ismail Simsek - */ -@Dependent -public class DebeziumMetrics { - protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumMetrics.class); - final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); - @ConfigProperty(name = "debezium.sink.batch.metrics.snapshot-mbean", defaultValue = "") - Optional snapshotMbean; - @ConfigProperty(name = "debezium.sink.batch.metrics.streaming-mbean", defaultValue = "") - Optional streamingMbean; - @ConfigProperty(name = "debezium.source.max.queue.size", defaultValue = CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE + "") - int maxQueueSize; - - ObjectName snapshotMetricsObjectName; - ObjectName streamingMetricsObjectName; - - public void initizalize() throws DebeziumException { - assert snapshotMbean.isPresent() : - "Snapshot metrics Mbean `debezium.sink.batch.metrics.snapshot-mbean` not provided"; - assert streamingMbean.isPresent() : - "Streaming metrics Mbean `debezium.sink.batch.metrics.streaming-mbean` not provided"; - try { - snapshotMetricsObjectName = new ObjectName(snapshotMbean.get()); - streamingMetricsObjectName = new ObjectName(streamingMbean.get()); - } catch (Exception e) { - throw new DebeziumException(e); - } - } - - public boolean snapshotRunning() { - try { - return (boolean) mbeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotRunning"); - } catch (Exception e) { - throw new DebeziumException(e); - } - } - - public boolean snapshotCompleted() { - try { - return (boolean) mbeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotCompleted"); - } catch (Exception e) { - throw new DebeziumException(e); - } - } - - public int streamingQueueRemainingCapacity() { - try { - return (int) mbeanServer.getAttribute(streamingMetricsObjectName, "QueueRemainingCapacity"); - } catch (Exception e) { - throw new DebeziumException(e); - } - } - - public int streamingQueueCurrentSize() { - return maxQueueSize - streamingQueueRemainingCapacity(); - } - - public long streamingMilliSecondsBehindSource() { - try { - return (long) mbeanServer.getAttribute(streamingMetricsObjectName, "MilliSecondsBehindSource"); - } catch (Exception e) { - throw new DebeziumException(e); - } - } - -} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java index bd853de1..3e0f710f 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java @@ -10,7 +10,7 @@ import io.debezium.DebeziumException; import io.debezium.config.CommonConnectorConfig; -import io.debezium.server.iceberg.DebeziumMetrics; +import io.debezium.server.DebeziumMetrics; import javax.enterprise.context.Dependent; import javax.inject.Inject; @@ -45,7 +45,6 @@ public class MaxBatchSizeWait implements InterfaceBatchSizeWait { @Override public void initizalize() throws DebeziumException { assert waitIntervalMs < maxWaitMs : "`wait-interval-ms` cannot be bigger than `max-wait-ms`"; - dbzMetrics.initizalize(); } @Override diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java index 807f707c..ac41c3fe 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java @@ -26,7 +26,6 @@ import org.junit.jupiter.api.Test; /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. * * @author Ismail Simsek */ diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java index 2c53d7a6..9d926f30 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java @@ -26,7 +26,6 @@ import org.junit.jupiter.api.Test; /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. * * @author Ismail Simsek */ diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index aa29b944..03bee516 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -34,7 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. + * Integration test that verifies basic reading from PostgreSQL database and writing to iceberg destination. * * @author Ismail Simsek */ diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java index 4c9c8629..d20eb2de 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java @@ -29,7 +29,6 @@ import org.junit.jupiter.api.Test; /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. * * @author Ismail Simsek */ diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java index 6d42b77a..f8fe0cd7 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java @@ -29,7 +29,6 @@ import org.junit.jupiter.api.Test; /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. * * @author Ismail Simsek */ diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java index f72088ae..355b2fe9 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java @@ -28,7 +28,6 @@ import org.junit.jupiter.api.Test; /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3. * * @author Ismail Simsek */ diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java index 6303c91d..1b48e4ef 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java @@ -32,7 +32,6 @@ /** - * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. * * @author Ismail Simsek */ diff --git a/pom.xml b/pom.xml index e3704456..c8771066 100644 --- a/pom.xml +++ b/pom.xml @@ -38,10 +38,10 @@ 1.17.3 3.2.1 - 1.9.5.Final + 1.9.6.Final 8.0.28 - 2.12.1.Final + 2.12.3.Final 4.8