refactor: move get or create table behavior to IcebergTableOperations (#31)

The code related to the Iceberg table "get or create" behavior was
a bit confusing, with several try/catch blocks.

This attempts to simplify it by moving the `loadTable` operation to
IcebergTableOperations and making it return an `Optional<Table>`. To allow
functional-style code, some `Exception` instances were changed to
`RuntimeException`.
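
For illustration, a minimal sketch of the "get or create" pattern this change enables (class and method names are taken from the diff below; the Supplier stands in for DebeziumToIcebergTable.create()):

import java.util.Optional;
import java.util.function.Supplier;

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;

class GetOrCreateSketch {

  // Mirrors IcebergTableOperations.loadTable: a missing table becomes
  // Optional.empty() instead of a thrown NoSuchTableException.
  static Optional<Table> loadTable(Catalog catalog, TableIdentifier tableId) {
    try {
      return Optional.of(catalog.loadTable(tableId));
    } catch (NoSuchTableException e) {
      return Optional.empty();
    }
  }

  // The consumer can then express "get or create" as a single expression,
  // which is why the checked exceptions were converted to RuntimeException.
  static Table getOrCreate(Catalog catalog, TableIdentifier tableId, Supplier<Table> create) {
    return loadTable(catalog, tableId).orElseGet(create);
  }
}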
racevedoo committed Oct 16, 2021
1 parent f5a756a commit 5fb20dd
Showing 3 changed files with 55 additions and 29 deletions.
DebeziumToIcebergTable.java

@@ -34,25 +34,27 @@ public class DebeziumToIcebergTable {
   private final List<Types.NestedField> tableColumns;
   private final List<Types.NestedField> tableRowIdentifierColumns;
 
-  public DebeziumToIcebergTable(byte[] eventVal, byte[] eventKey) throws IOException {
+  public DebeziumToIcebergTable(byte[] eventVal, byte[] eventKey) {
     tableColumns = extractSchema(eventVal);
     tableRowIdentifierColumns = (eventKey == null) ? null : extractSchema(eventKey);
   }
 
-  public DebeziumToIcebergTable(byte[] eventVal) throws IOException {
+  public DebeziumToIcebergTable(byte[] eventVal) {
     this(eventVal, null);
   }
 
-  private List<Types.NestedField> extractSchema(byte[] eventVal) throws IOException {
+  private List<Types.NestedField> extractSchema(byte[] eventVal) {
+    try {
+      JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal);
+      if (IcebergUtil.hasSchema(jsonEvent)) {
+        return IcebergUtil.getIcebergSchema(jsonEvent.get("schema"));
+      }
 
-    JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal);
-
-    if (IcebergUtil.hasSchema(jsonEvent)) {
-      return IcebergUtil.getIcebergSchema(jsonEvent.get("schema"));
+      LOGGER.trace("Event schema not found in the given data:!");
+      return null;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
-
-    LOGGER.trace("Event schema not found in the given data:!");
-    return null;
   }
 
   public boolean hasSchema() {
@@ -73,7 +75,7 @@ private SortOrder getSortOrder(Schema schema) {
     return so;
   }
 
-  private Set<Integer> getRowIdentifierFieldIds() throws Exception {
+  private Set<Integer> getRowIdentifierFieldIds() {
 
     if (this.tableRowIdentifierColumns == null) {
       return ImmutableSet.of();
@@ -107,7 +109,7 @@ private Set<Integer> getRowIdentifierFieldIds() {
     return identifierFieldIds;
  }
 
-  public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) throws Exception {
+  public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) {
 
     Schema schema = new Schema(this.tableColumns, getRowIdentifierFieldIds());
 
@@ -122,7 +124,7 @@ public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) thr
       return tb.create();
     }
 
-    throw new Exception("Failed to create table "+ tableIdentifier);
+    throw new RuntimeException("Failed to create table "+ tableIdentifier);
  }
 
 }
IcebergChangeConsumer.java

@@ -101,6 +101,7 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
   Map<String, String> icebergProperties = new ConcurrentHashMap<>();
   Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
   Deserializer<JsonNode> valDeserializer;
+  private IcebergTableOperations icebergTableOperations;
 
   @PostConstruct
   void connect() throws InterruptedException {
@@ -117,6 +118,7 @@ void connect() throws InterruptedException {
     conf.forEach(this.icebergProperties::put);
 
     icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
+    icebergTableOperations = new IcebergTableOperations(icebergCatalog);
 
     valSerde.configure(Collections.emptyMap(), false);
     valDeserializer = valSerde.deserializer();
@@ -136,15 +138,15 @@ public String map(String destination) {
     return destination.replace(".", "_");
   }
 
-  private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) throws Exception {
+  private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) {
 
     if (!eventSchemaEnabled) {
-      throw new Exception("Table '" + tableIdentifier + "' not found! " +
+      throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
           "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
     }
 
     if (event.value() == null) {
-      throw new Exception("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
+      throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
     }
 
     DebeziumToIcebergTable eventSchema = event.key() == null
@@ -166,20 +168,9 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
            Collectors.toCollection(ArrayList::new))));
 
     for (Map.Entry<String, ArrayList<ChangeEvent<Object, Object>>> event : result.entrySet()) {
-      Table icebergTable;
       final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
-      try {
-        // load iceberg table
-        icebergTable = icebergCatalog.loadTable(tableIdentifier);
-      } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
-        // get schema from an event and create iceberg table
-        try {
-          icebergTable = createIcebergTable(tableIdentifier, event.getValue().get(0));
-        } catch (Exception e2) {
-          e.printStackTrace();
-          throw new InterruptedException("Failed to create iceberg table, " + e2.getMessage());
-        }
-      }
+      Table icebergTable = icebergTableOperations.loadTable(tableIdentifier)
+          .orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0)));
       addToTable(icebergTable, event.getValue());
     }
     // workaround! somehow offset is not saved to file unless we call committer.markProcessed
IcebergTableOperations.java (new file)

@@ -0,0 +1,33 @@
+package io.debezium.server.iceberg;
+
+import java.util.Optional;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper to perform operations in iceberg tables
+ * @author Rafael Acevedo
+ */
+public class IcebergTableOperations {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperations.class);
+
+  private final Catalog catalog;
+
+  public IcebergTableOperations(Catalog catalog) {
+    this.catalog = catalog;
+  }
+
+  public Optional<Table> loadTable(TableIdentifier tableId) {
+    try {
+      Table table = catalog.loadTable(tableId);
+      return Optional.of(table);
+    } catch (NoSuchTableException e) {
+      LOGGER.warn("table not found: {}", tableId.toString());
+      return Optional.empty();
+    }
+  }
+}

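For reference, a minimal standalone usage sketch of the new helper. The Hadoop catalog, warehouse path, namespace, table name, and schema below are illustrative assumptions, not part of this commit; the example lives in the same package so the helper is directly visible.

package io.debezium.server.iceberg;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

public class LoadOrCreateExample {
  public static void main(String[] args) {
    // Path-based catalog pointing at a local warehouse directory (illustrative).
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "/tmp/iceberg-warehouse");
    IcebergTableOperations ops = new IcebergTableOperations(catalog);

    TableIdentifier tableId = TableIdentifier.of(Namespace.of("debeziumevents"), "customers");
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));

    // Load the table if the catalog already has it; otherwise create it from the
    // illustrative schema, mirroring the loadTable(...).orElseGet(...) call in the consumer.
    Table table = ops.loadTable(tableId)
        .orElseGet(() -> catalog.createTable(tableId, schema));
    System.out.println("Using table: " + table.name());
  }
}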