Apply code recommendations #34

Merged: 1 commit, Oct 16, 2021
@@ -83,9 +83,7 @@ private Set<Integer> getRowIdentifierFieldIds() {
 
     Set<Integer> identifierFieldIds = new HashSet<>();
 
-    ListIterator<Types.NestedField> idIterator = this.tableRowIdentifierColumns.listIterator();
-    while (idIterator.hasNext()) {
-      Types.NestedField ic = idIterator.next();
+    for (Types.NestedField ic : this.tableRowIdentifierColumns) {
       boolean found = false;
 
       ListIterator<Types.NestedField> colsIterator = this.tableColumns.listIterator();
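
The change swaps an explicit ListIterator for an enhanced for loop; iteration order and the elements visited are unchanged. A minimal standalone sketch of the equivalence (the list contents are hypothetical, not from this PR):

    import java.util.List;
    import java.util.ListIterator;

    public class ForEachSketch {
      public static void main(String[] args) {
        List<String> columns = List.of("id", "ts", "name");

        // Before: explicit ListIterator, with a mutable local and manual advance.
        ListIterator<String> it = columns.listIterator();
        while (it.hasNext()) {
          String c = it.next();
          System.out.println(c);
        }

        // After: the enhanced for loop visits the same elements in the same order.
        for (String c : columns) {
          System.out.println(c);
        }
      }
    }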
@@ -78,9 +78,9 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
   Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
   InterfaceBatchSizeWait batchSizeWait;
 
-  Configuration hadoopConf = new Configuration();
+  final Configuration hadoopConf = new Configuration();
   Catalog icebergCatalog;
-  Map<String, String> icebergProperties = new ConcurrentHashMap<>();
+  final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
 
   @Inject
   @Any
@@ -99,7 +99,7 @@ void connect() throws InterruptedException {
     // pass iceberg properties to iceberg and hadoop
     Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
     conf.forEach(this.hadoopConf::set);
-    conf.forEach(this.icebergProperties::put);
+    this.icebergProperties.putAll(conf);
 
     icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
 
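
Map.putAll replaces the per-entry forEach copy with a single bulk call that states the intent directly. A small self-contained sketch (the property keys and values are made up for illustration):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class PutAllSketch {
      public static void main(String[] args) {
        // Hypothetical config subset; in the PR this comes from IcebergUtil.getConfigSubset.
        Map<String, String> conf = Map.of("warehouse", "s3a://bucket/warehouse", "type", "hadoop");
        Map<String, String> icebergProperties = new ConcurrentHashMap<>();

        // Equivalent to conf.forEach(icebergProperties::put), but as one bulk operation.
        icebergProperties.putAll(conf);

        System.out.println(icebergProperties.size()); // 2
      }
    }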
@@ -88,7 +88,7 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
   String valueFormat;
   @ConfigProperty(name = "debezium.format.key", defaultValue = "json")
   String keyFormat;
-  Configuration hadoopConf = new Configuration();
+  final Configuration hadoopConf = new Configuration();
   @ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
   String defaultFs;
   @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
@@ -103,7 +103,7 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
   Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
   InterfaceBatchSizeWait batchSizeWait;
 
-  Map<String, String> icebergProperties = new ConcurrentHashMap<>();
+  final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
   Catalog icebergCatalog;
   Table eventTable;
 
@@ -123,7 +123,7 @@ void connect() throws InterruptedException {
 
     Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
     conf.forEach(this.hadoopConf::set);
-    conf.forEach(this.icebergProperties::put);
+    this.icebergProperties.putAll(conf);
 
     if (warehouseLocation == null || warehouseLocation.trim().isEmpty()) {
       warehouseLocation = defaultFs + "/iceberg_warehouse";
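
As in the previous file, marking hadoopConf and icebergProperties final pins the references for the lifetime of the consumer; the objects themselves stay mutable, so connect() can still populate them. A sketch of that distinction (class and key names hypothetical):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class FinalFieldSketch {
      // final forbids reassignment of the field, not mutation of the map it points to.
      final Map<String, String> icebergProperties = new ConcurrentHashMap<>();

      void connect() {
        icebergProperties.put("warehouse", "/tmp/iceberg_warehouse"); // fine
        // icebergProperties = new ConcurrentHashMap<>(); // would not compile
      }

      public static void main(String[] args) {
        FinalFieldSketch c = new FinalFieldSketch();
        c.connect();
        System.out.println(c.icebergProperties);
      }
    }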
@@ -35,8 +35,8 @@ public class DynamicBatchSizeWait implements InterfaceBatchSizeWait {
   @ConfigProperty(name = "debezium.sink.batch.batch-size-wait.max-wait-ms", defaultValue = "300000")
   Integer maxWaitMs;
 
-  LinkedList<Integer> batchSizeHistory = new LinkedList<Integer>();
-  LinkedList<Integer> sleepMsHistory = new LinkedList<Integer>();
+  final LinkedList<Integer> batchSizeHistory = new LinkedList<>();
+  final LinkedList<Integer> sleepMsHistory = new LinkedList<>();
 
   public DynamicBatchSizeWait() {
     batchSizeHistory.add(1);
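
Besides adding final, the right-hand side drops the repeated type argument in favor of the Java 7 diamond operator. A quick sketch:

    import java.util.LinkedList;

    public class DiamondSketch {
      public static void main(String[] args) {
        // Pre-diamond: the type argument is spelled out twice.
        LinkedList<Integer> verbose = new LinkedList<Integer>();

        // Diamond: the compiler infers <Integer> from the declaration.
        LinkedList<Integer> inferred = new LinkedList<>();

        verbose.add(1);
        inferred.add(1);
        System.out.println(verbose.equals(inferred)); // true
      }
    }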
@@ -47,7 +47,7 @@ abstract class AbstractIcebergTableOperator implements InterfaceIcebergTableOper
 
   @ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
   boolean eventSchemaEnabled;
-  Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
+  final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
   Deserializer<JsonNode> valDeserializer;
 
   @Override
@@ -44,7 +44,7 @@
 public class IcebergTableOperatorUpsert extends AbstractIcebergTableOperator {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorUpsert.class);
-  static ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
+  static final ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
   @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
   String sourceTsMsColumn;
 
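
static final turns cdcOperations into a true class-level constant: one shared, unmodifiable instance that no code path can reassign. A dependency-free sketch using java.util.Map.of in place of Guava's ImmutableMap.of (an assumption made to keep the example self-contained):

    import java.util.Map;

    public class ConstantMapSketch {
      // One immutable, shared lookup table for CDC operation precedence.
      static final Map<String, Integer> CDC_OPERATIONS = Map.of("c", 1, "r", 2, "u", 3, "d", 4);

      public static void main(String[] args) {
        System.out.println(CDC_OPERATIONS.get("u")); // 3
      }
    }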
@@ -209,7 +209,7 @@ public void testIcebergConsumer() throws Exception {
   }
 
   @Test
-  public void testSimpleUpload() throws Exception {
+  public void testSimpleUpload() {
     Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
       try {
         Dataset<Row> ds = getTableData("testc.inventory.customers");
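
Dropping throws Exception is safe here (and in the next test file) because the condition polled by Awaitility handles its own exceptions, so nothing checked can escape the test method. A reduced sketch of the pattern, with a hypothetical stand-in for the real polled condition:

    public class NoThrowsSketch {
      // Before: "boolean poll() throws Exception" even though nothing checked escapes.
      // After: no throws clause, because the body catches what it calls.
      static boolean poll() {
        try {
          return "customers".length() > 0; // stand-in for getTableData("testc.inventory.customers")
        } catch (RuntimeException e) {
          return false;
        }
      }

      public static void main(String[] args) {
        System.out.println(poll()); // true
      }
    }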
@@ -38,7 +38,7 @@ public class IcebergEventsChangeConsumerTest extends BaseSparkTest {
   String sinkType;
 
   @Test
-  public void testIcebergEvents() throws Exception {
+  public void testIcebergEvents() {
     Assertions.assertEquals(sinkType, "icebergevents");
     Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
       try {
@@ -29,7 +29,7 @@ class DynamicBatchSizeWaitTest {
 
   @Test
   void shouldIncreaseSleepMs() {
-    DynamicBatchSizeWait dynamicSleep = (DynamicBatchSizeWait) waitBatchSize;
+    DynamicBatchSizeWait dynamicSleep = waitBatchSize;
     // if its consuming small batch sizes, the sleep delay should increase to adjust batch size
     // sleep size should increase and stay at max (pollIntervalMs)
     int sleep = 0;
@@ -47,7 +47,7 @@
 
   @Test
   void shouldDecreaseSleepMs() {
-    DynamicBatchSizeWait dynamicSleep = (DynamicBatchSizeWait) waitBatchSize;
+    DynamicBatchSizeWait dynamicSleep = waitBatchSize;
     // if its consuming large batch sizes, the sleep delay should decrease
     dynamicSleep.getWaitMs(3);
    dynamicSleep.getWaitMs(2);
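
The cast is redundant because waitBatchSize is presumably already declared with the concrete type, so the assignment is between identical static types. A compact sketch of why the compiler flags this (type and method names hypothetical):

    public class RedundantCastSketch {
      static class DynamicWait {
        int getWaitMs(int batchSize) { return Math.max(0, 100 - batchSize); }
      }

      public static void main(String[] args) {
        DynamicWait waitBatchSize = new DynamicWait();

        // Redundant: the static type is already DynamicWait.
        // DynamicWait dynamicSleep = (DynamicWait) waitBatchSize;
        DynamicWait dynamicSleep = waitBatchSize; // no cast needed

        System.out.println(dynamicSleep.getWaitMs(3));
      }
    }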
@@ -85,7 +85,7 @@ public static int PGLoadTestDataTable(int numRows) throws Exception {
     return PGLoadTestDataTable(numRows, false);
   }
 
-  public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) throws Exception {
+  public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) {
     int numInsert = 0;
     do {
 
@@ -33,22 +33,18 @@ public static String randomString(int len) {
 
   public static DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> getCommitter() {
     return new DebeziumEngine.RecordCommitter() {
-      public synchronized void markProcessed(SourceRecord record) throws InterruptedException {
-        return;
+      public synchronized void markProcessed(SourceRecord record) {
       }
 
       @Override
-      public void markProcessed(Object record) throws InterruptedException {
-        return;
+      public void markProcessed(Object record) {
       }
 
-      public synchronized void markBatchFinished() throws InterruptedException {
-        return;
+      public synchronized void markBatchFinished() {
       }
 
       @Override
-      public void markProcessed(Object record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException {
-        return;
+      public void markProcessed(Object record, DebeziumEngine.Offsets sourceOffsets) {
      }
 
      @Override
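
Two cleanups in one hunk: a trailing "return;" in a void method is a no-op and can be deleted, and an override may declare fewer checked exceptions than the method it implements. A minimal sketch of both rules (the one-method interface is hypothetical, standing in for DebeziumEngine.RecordCommitter):

    public class OverrideSketch {
      interface Committer {
        void markProcessed(Object record) throws InterruptedException;
      }

      public static void main(String[] args) throws InterruptedException {
        Committer noop = new Committer() {
          @Override
          public void markProcessed(Object record) {
            // Empty body: the bare "return;" added nothing, and narrowing the
            // throws clause is legal because this override throws nothing checked.
          }
        };
        noop.markProcessed("event");
        System.out.println("committed");
      }
    }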