From 939175f19f6cefd0a832aabc9533c71458893269 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sat, 16 Oct 2021 19:52:01 +0200 Subject: [PATCH] apply code recommendations --- .../server/iceberg/DebeziumToIcebergTable.java | 4 +--- .../server/iceberg/IcebergChangeConsumer.java | 6 +++--- .../server/iceberg/IcebergEventsChangeConsumer.java | 6 +++--- .../iceberg/batchsizewait/DynamicBatchSizeWait.java | 4 ++-- .../tableoperator/AbstractIcebergTableOperator.java | 2 +- .../tableoperator/IcebergTableOperatorUpsert.java | 2 +- .../server/iceberg/IcebergChangeConsumerTest.java | 2 +- .../iceberg/IcebergEventsChangeConsumerTest.java | 2 +- .../batchsizewait/DynamicBatchSizeWaitTest.java | 4 ++-- .../server/iceberg/testresources/BaseSparkTest.java | 2 +- .../server/iceberg/testresources/TestUtil.java | 12 ++++-------- 11 files changed, 20 insertions(+), 26 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java index 78a7981f..0996e7ce 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java @@ -83,9 +83,7 @@ private Set getRowIdentifierFieldIds() { Set identifierFieldIds = new HashSet<>(); - ListIterator idIterator = this.tableRowIdentifierColumns.listIterator(); - while (idIterator.hasNext()) { - Types.NestedField ic = idIterator.next(); + for (Types.NestedField ic : this.tableRowIdentifierColumns) { boolean found = false; ListIterator colsIterator = this.tableColumns.listIterator(); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index ffae1728..4e147ac7 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -78,9 +78,9 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu Instance batchSizeWaitInstances; InterfaceBatchSizeWait batchSizeWait; - Configuration hadoopConf = new Configuration(); + final Configuration hadoopConf = new Configuration(); Catalog icebergCatalog; - Map icebergProperties = new ConcurrentHashMap<>(); + final Map icebergProperties = new ConcurrentHashMap<>(); @Inject @Any @@ -99,7 +99,7 @@ void connect() throws InterruptedException { // pass iceberg properties to iceberg and hadoop Map conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX); conf.forEach(this.hadoopConf::set); - conf.forEach(this.icebergProperties::put); + this.icebergProperties.putAll(conf); icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java index 1d81b270..ae0617e8 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java @@ -88,7 +88,7 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D String valueFormat; @ConfigProperty(name = "debezium.format.key", defaultValue = "json") String keyFormat; - Configuration hadoopConf = new Configuration(); + final Configuration hadoopConf = new Configuration(); @ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS") String defaultFs; @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default") @@ -103,7 +103,7 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D Instance batchSizeWaitInstances; InterfaceBatchSizeWait batchSizeWait; - Map icebergProperties = new ConcurrentHashMap<>(); + final Map icebergProperties = new ConcurrentHashMap<>(); Catalog icebergCatalog; Table eventTable; @@ -123,7 +123,7 @@ void connect() throws InterruptedException { Map conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX); conf.forEach(this.hadoopConf::set); - conf.forEach(this.icebergProperties::put); + this.icebergProperties.putAll(conf); if (warehouseLocation == null || warehouseLocation.trim().isEmpty()) { warehouseLocation = defaultFs + "/iceberg_warehouse"; diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java index c0733014..65637ba1 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java @@ -35,8 +35,8 @@ public class DynamicBatchSizeWait implements InterfaceBatchSizeWait { @ConfigProperty(name = "debezium.sink.batch.batch-size-wait.max-wait-ms", defaultValue = "300000") Integer maxWaitMs; - LinkedList batchSizeHistory = new LinkedList(); - LinkedList sleepMsHistory = new LinkedList(); + final LinkedList batchSizeHistory = new LinkedList<>(); + final LinkedList sleepMsHistory = new LinkedList<>(); public DynamicBatchSizeWait() { batchSizeHistory.add(1); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java index fed67fb4..16054284 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java @@ -47,7 +47,7 @@ abstract class AbstractIcebergTableOperator implements InterfaceIcebergTableOper @ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false") boolean eventSchemaEnabled; - Serde valSerde = DebeziumSerdes.payloadJson(JsonNode.class); + final Serde valSerde = DebeziumSerdes.payloadJson(JsonNode.class); Deserializer valDeserializer; @Override diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java index f19ac465..f7b4d787 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java @@ -44,7 +44,7 @@ public class IcebergTableOperatorUpsert extends AbstractIcebergTableOperator { private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorUpsert.class); - static ImmutableMap cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4); + static final ImmutableMap cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4); @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms") String sourceTsMsColumn; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 8be7c8a0..273d0e68 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -209,7 +209,7 @@ public void testIcebergConsumer() throws Exception { } @Test - public void testSimpleUpload() throws Exception { + public void testSimpleUpload() { Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { try { Dataset ds = getTableData("testc.inventory.customers"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java index 06e42296..6a00fef1 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java @@ -38,7 +38,7 @@ public class IcebergEventsChangeConsumerTest extends BaseSparkTest { String sinkType; @Test - public void testIcebergEvents() throws Exception { + public void testIcebergEvents() { Assertions.assertEquals(sinkType, "icebergevents"); Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { try { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java index 2262a7f4..a2a77e08 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java @@ -29,7 +29,7 @@ class DynamicBatchSizeWaitTest { @Test void shouldIncreaseSleepMs() { - DynamicBatchSizeWait dynamicSleep = (DynamicBatchSizeWait) waitBatchSize; + DynamicBatchSizeWait dynamicSleep = waitBatchSize; // if its consuming small batch sizes, the sleep delay should increase to adjust batch size // sleep size should increase and stay at max (pollIntervalMs) int sleep = 0; @@ -47,7 +47,7 @@ void shouldIncreaseSleepMs() { @Test void shouldDecreaseSleepMs() { - DynamicBatchSizeWait dynamicSleep = (DynamicBatchSizeWait) waitBatchSize; + DynamicBatchSizeWait dynamicSleep = waitBatchSize; // if its consuming large batch sizes, the sleep delay should decrease dynamicSleep.getWaitMs(3); dynamicSleep.getWaitMs(2); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java index a0609777..6c2ac758 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java @@ -85,7 +85,7 @@ public static int PGLoadTestDataTable(int numRows) throws Exception { return PGLoadTestDataTable(numRows, false); } - public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) throws Exception { + public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) { int numInsert = 0; do { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestUtil.java index a1a72b03..556c3dec 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestUtil.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestUtil.java @@ -33,22 +33,18 @@ public static String randomString(int len) { public static DebeziumEngine.RecordCommitter> getCommitter() { return new DebeziumEngine.RecordCommitter() { - public synchronized void markProcessed(SourceRecord record) throws InterruptedException { - return; + public synchronized void markProcessed(SourceRecord record) { } @Override - public void markProcessed(Object record) throws InterruptedException { - return; + public void markProcessed(Object record) { } - public synchronized void markBatchFinished() throws InterruptedException { - return; + public synchronized void markBatchFinished() { } @Override - public void markProcessed(Object record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException { - return; + public void markProcessed(Object record, DebeziumEngine.Offsets sourceOffsets) { } @Override