From 5fb20dd867b43f8ef6867282600e1efa1289ba4c Mon Sep 17 00:00:00 2001 From: Rafael Acevedo Date: Sat, 16 Oct 2021 04:06:35 -0300 Subject: [PATCH] refactor: move get or create table behavior to IcebergTableOperations (#31) The code related to iceberg table "get or create" behavior was a bit confusing, with some try/catch blocks. This attempts to make it simpler by moving the `loadTable` operation to IcebergTableOperations and making it return an `Optional`. To allow functional-style code, some `Exception` instances were changed to `RuntimeException`. --- .../iceberg/DebeziumToIcebergTable.java | 28 ++++++++-------- .../server/iceberg/IcebergChangeConsumer.java | 23 ++++--------- .../iceberg/IcebergTableOperations.java | 33 +++++++++++++++++++ 3 files changed, 55 insertions(+), 29 deletions(-) create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergTableOperations.java diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java index 80f3c17a..78a7981f 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java @@ -34,25 +34,27 @@ public class DebeziumToIcebergTable { private final List tableColumns; private final List tableRowIdentifierColumns; - public DebeziumToIcebergTable(byte[] eventVal, byte[] eventKey) throws IOException { + public DebeziumToIcebergTable(byte[] eventVal, byte[] eventKey) { tableColumns = extractSchema(eventVal); tableRowIdentifierColumns = (eventKey == null) ? 
null : extractSchema(eventKey); } - public DebeziumToIcebergTable(byte[] eventVal) throws IOException { + public DebeziumToIcebergTable(byte[] eventVal) { this(eventVal, null); } - private List extractSchema(byte[] eventVal) throws IOException { - - JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal); + private List extractSchema(byte[] eventVal) { + try { + JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal); + if (IcebergUtil.hasSchema(jsonEvent)) { + return IcebergUtil.getIcebergSchema(jsonEvent.get("schema")); + } - if (IcebergUtil.hasSchema(jsonEvent)) { - return IcebergUtil.getIcebergSchema(jsonEvent.get("schema")); + LOGGER.trace("Event schema not found in the given data:!"); + return null; + } catch (IOException e) { + throw new RuntimeException(e); } - - LOGGER.trace("Event schema not found in the given data:!"); - return null; } public boolean hasSchema() { @@ -73,7 +75,7 @@ private SortOrder getSortOrder(Schema schema) { return so; } - private Set getRowIdentifierFieldIds() throws Exception { + private Set getRowIdentifierFieldIds() { if (this.tableRowIdentifierColumns == null) { return ImmutableSet.of(); @@ -107,7 +109,7 @@ private Set getRowIdentifierFieldIds() throws Exception { return identifierFieldIds; } - public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) throws Exception { + public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) { Schema schema = new Schema(this.tableColumns, getRowIdentifierFieldIds()); @@ -122,7 +124,7 @@ public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) thr return tb.create(); } - throw new Exception("Failed to create table "+ tableIdentifier); + throw new RuntimeException("Failed to create table "+ tableIdentifier); } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java 
b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index b06a0050..939c594f 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -101,6 +101,7 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu Map icebergProperties = new ConcurrentHashMap<>(); Serde valSerde = DebeziumSerdes.payloadJson(JsonNode.class); Deserializer valDeserializer; + private IcebergTableOperations icebergTableOperations; @PostConstruct void connect() throws InterruptedException { @@ -117,6 +118,7 @@ void connect() throws InterruptedException { conf.forEach(this.icebergProperties::put); icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf); + icebergTableOperations = new IcebergTableOperations(icebergCatalog); valSerde.configure(Collections.emptyMap(), false); valDeserializer = valSerde.deserializer(); @@ -136,15 +138,15 @@ public String map(String destination) { return destination.replace(".", "_"); } - private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent event) throws Exception { + private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent event) { if (!eventSchemaEnabled) { - throw new Exception("Table '" + tableIdentifier + "' not found! " + + throw new RuntimeException("Table '" + tableIdentifier + "' not found! 
" + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); } if (event.value() == null) { - throw new Exception("Failed to get event schema for table '" + tableIdentifier + "' event value is null"); + throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null"); } DebeziumToIcebergTable eventSchema = event.key() == null @@ -166,20 +168,9 @@ public void handleBatch(List> records, DebeziumEngin Collectors.toCollection(ArrayList::new)))); for (Map.Entry>> event : result.entrySet()) { - Table icebergTable; final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey()); - try { - // load iceberg table - icebergTable = icebergCatalog.loadTable(tableIdentifier); - } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { - // get schema from an event and create iceberg table - try { - icebergTable = createIcebergTable(tableIdentifier, event.getValue().get(0)); - } catch (Exception e2) { - e.printStackTrace(); - throw new InterruptedException("Failed to create iceberg table, " + e2.getMessage()); - } - } + Table icebergTable = icebergTableOperations.loadTable(tableIdentifier) + .orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0))); addToTable(icebergTable, event.getValue()); } // workaround! 
somehow offset is not saved to file unless we call committer.markProcessed diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergTableOperations.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergTableOperations.java new file mode 100644 index 00000000..fde020e6 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergTableOperations.java @@ -0,0 +1,33 @@ +package io.debezium.server.iceberg; + +import java.util.Optional; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wrapper to perform operations in iceberg tables + * @author Rafael Acevedo + */ +public class IcebergTableOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperations.class); + + private final Catalog catalog; + + public IcebergTableOperations(Catalog catalog) { + this.catalog = catalog; + } + + public Optional
loadTable(TableIdentifier tableId) { + try { + Table table = catalog.loadTable(tableId); + return Optional.of(table); + } catch (NoSuchTableException e) { + LOGGER.warn("table not found: {}", tableId.toString()); + return Optional.empty(); + } + } +}