Skip to content

Commit

Permalink
apply code recommendations (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Oct 16, 2021
1 parent f1d8802 commit 823a895
Show file tree
Hide file tree
Showing 11 changed files with 20 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ private Set<Integer> getRowIdentifierFieldIds() {

Set<Integer> identifierFieldIds = new HashSet<>();

ListIterator<Types.NestedField> idIterator = this.tableRowIdentifierColumns.listIterator();
while (idIterator.hasNext()) {
Types.NestedField ic = idIterator.next();
for (Types.NestedField ic : this.tableRowIdentifierColumns) {
boolean found = false;

ListIterator<Types.NestedField> colsIterator = this.tableColumns.listIterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;

Configuration hadoopConf = new Configuration();
final Configuration hadoopConf = new Configuration();
Catalog icebergCatalog;
Map<String, String> icebergProperties = new ConcurrentHashMap<>();
final Map<String, String> icebergProperties = new ConcurrentHashMap<>();

@Inject
@Any
Expand All @@ -99,7 +99,7 @@ void connect() throws InterruptedException {
// pass iceberg properties to iceberg and hadoop
Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
conf.forEach(this.hadoopConf::set);
conf.forEach(this.icebergProperties::put);
this.icebergProperties.putAll(conf);

icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
String keyFormat;
Configuration hadoopConf = new Configuration();
final Configuration hadoopConf = new Configuration();
@ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
String defaultFs;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
Expand All @@ -103,7 +103,7 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;

Map<String, String> icebergProperties = new ConcurrentHashMap<>();
final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
Catalog icebergCatalog;
Table eventTable;

Expand All @@ -123,7 +123,7 @@ void connect() throws InterruptedException {

Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
conf.forEach(this.hadoopConf::set);
conf.forEach(this.icebergProperties::put);
this.icebergProperties.putAll(conf);

if (warehouseLocation == null || warehouseLocation.trim().isEmpty()) {
warehouseLocation = defaultFs + "/iceberg_warehouse";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public class DynamicBatchSizeWait implements InterfaceBatchSizeWait {
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait.max-wait-ms", defaultValue = "300000")
Integer maxWaitMs;

LinkedList<Integer> batchSizeHistory = new LinkedList<Integer>();
LinkedList<Integer> sleepMsHistory = new LinkedList<Integer>();
final LinkedList<Integer> batchSizeHistory = new LinkedList<>();
final LinkedList<Integer> sleepMsHistory = new LinkedList<>();

public DynamicBatchSizeWait() {
batchSizeHistory.add(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ abstract class AbstractIcebergTableOperator implements InterfaceIcebergTableOper

@ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
boolean eventSchemaEnabled;
Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
Deserializer<JsonNode> valDeserializer;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
public class IcebergTableOperatorUpsert extends AbstractIcebergTableOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorUpsert.class);
static ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
static final ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
@ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
String sourceTsMsColumn;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void testIcebergConsumer() throws Exception {
}

@Test
public void testSimpleUpload() throws Exception {
public void testSimpleUpload() {
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.customers");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class IcebergEventsChangeConsumerTest extends BaseSparkTest {
String sinkType;

@Test
public void testIcebergEvents() throws Exception {
public void testIcebergEvents() {
Assertions.assertEquals(sinkType, "icebergevents");
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DynamicBatchSizeWaitTest {

@Test
void shouldIncreaseSleepMs() {
DynamicBatchSizeWait dynamicSleep = (DynamicBatchSizeWait) waitBatchSize;
DynamicBatchSizeWait dynamicSleep = waitBatchSize;
// if its consuming small batch sizes, the sleep delay should increase to adjust batch size
// sleep size should increase and stay at max (pollIntervalMs)
int sleep = 0;
Expand All @@ -47,7 +47,7 @@ void shouldIncreaseSleepMs() {

@Test
void shouldDecreaseSleepMs() {
DynamicBatchSizeWait dynamicSleep = (DynamicBatchSizeWait) waitBatchSize;
DynamicBatchSizeWait dynamicSleep = waitBatchSize;
// if its consuming large batch sizes, the sleep delay should decrease
dynamicSleep.getWaitMs(3);
dynamicSleep.getWaitMs(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static int PGLoadTestDataTable(int numRows) throws Exception {
return PGLoadTestDataTable(numRows, false);
}

public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) throws Exception {
public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) {
int numInsert = 0;
do {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,18 @@ public static String randomString(int len) {

public static DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> getCommitter() {
return new DebeziumEngine.RecordCommitter() {
public synchronized void markProcessed(SourceRecord record) throws InterruptedException {
return;
public synchronized void markProcessed(SourceRecord record) {
}

@Override
public void markProcessed(Object record) throws InterruptedException {
return;
public void markProcessed(Object record) {
}

public synchronized void markBatchFinished() throws InterruptedException {
return;
public synchronized void markBatchFinished() {
}

@Override
public void markProcessed(Object record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException {
return;
public void markProcessed(Object record, DebeziumEngine.Offsets sourceOffsets) {
}

@Override
Expand Down

0 comments on commit 823a895

Please sign in to comment.