Skip to content

Commit

Permalink
apply code recommendations (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Oct 18, 2021
1 parent 823a895 commit 772f01d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
Expand All @@ -38,6 +39,7 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
Expand All @@ -54,6 +56,8 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu

private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class);
private static final String PROP_PREFIX = "debezium.sink.iceberg.";
final Configuration hadoopConf = new Configuration();
final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
Expand All @@ -72,16 +76,13 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
boolean upsert;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;

@ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
boolean eventSchemaEnabled;
@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;

final Configuration hadoopConf = new Configuration();
Catalog icebergCatalog;
final Map<String, String> icebergProperties = new ConcurrentHashMap<>();

@Inject
@Any
Instance<InterfaceIcebergTableOperator> icebergTableOperatorInstances;
Expand Down Expand Up @@ -144,10 +145,8 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin

for (Map.Entry<String, ArrayList<ChangeEvent<Object, Object>>> event : result.entrySet()) {
final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
Table icebergTable = icebergTableOperator
.loadIcebergTable(icebergCatalog, tableIdentifier)
.orElseGet(() ->
icebergTableOperator.createIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0)));
Table icebergTable = loadIcebergTable(tableIdentifier)
.orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0)));
//addToTable(icebergTable, event.getValue());
icebergTableOperator.addToTable(icebergTable, event.getValue());
}
Expand All @@ -163,4 +162,35 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin

}


private Table createIcebergTable(TableIdentifier tableIdentifier,
ChangeEvent<Object, Object> event) {

if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
"Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}

if (event.value() == null) {
throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
}

DebeziumToIcebergTable eventSchema = event.key() == null
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));

return eventSchema.create(icebergCatalog, tableIdentifier);
}

private Optional<Table> loadIcebergTable(TableIdentifier tableId) {
try {
Table table = icebergCatalog.loadTable(tableId);
return Optional.of(table);
} catch (NoSuchTableException e) {
LOGGER.warn("table not found: {}", tableId.toString());
return Optional.empty();
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,27 @@
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.iceberg.DebeziumToIcebergTable;
import io.debezium.server.iceberg.IcebergUtil;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,8 +43,6 @@
abstract class AbstractIcebergTableOperator implements InterfaceIcebergTableOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class);

@ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
boolean eventSchemaEnabled;
final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
Deserializer<JsonNode> valDeserializer;

Expand Down Expand Up @@ -122,33 +118,4 @@ protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergReco
.build();
}

public Table createIcebergTable(Catalog catalog,
TableIdentifier tableIdentifier,
ChangeEvent<Object, Object> event) {

if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
"Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}

if (event.value() == null) {
throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
}

DebeziumToIcebergTable eventSchema = event.key() == null
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));

return eventSchema.create(catalog, tableIdentifier);
}

public Optional<Table> loadIcebergTable(Catalog catalog, TableIdentifier tableId) {
try {
Table table = catalog.loadTable(tableId);
return Optional.of(table);
} catch (NoSuchTableException e) {
LOGGER.warn("table not found: {}", tableId.toString());
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@
import io.debezium.engine.ChangeEvent;

import java.util.ArrayList;
import java.util.Optional;
import java.util.function.Predicate;

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;

public interface InterfaceIcebergTableOperator {
Expand All @@ -26,10 +23,4 @@ public interface InterfaceIcebergTableOperator {
void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException;

Predicate<Record> filterEvents();

Table createIcebergTable(Catalog catalog,
TableIdentifier tableIdentifier,
ChangeEvent<Object, Object> event);

Optional<Table> loadIcebergTable(Catalog catalog, TableIdentifier tableId);
}

0 comments on commit 772f01d

Please sign in to comment.