Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move get or create table behavior to IcebergTableOperations #31

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,27 @@ public class DebeziumToIcebergTable {
private final List<Types.NestedField> tableColumns;
private final List<Types.NestedField> tableRowIdentifierColumns;

public DebeziumToIcebergTable(byte[] eventVal, byte[] eventKey) throws IOException {
public DebeziumToIcebergTable(byte[] eventVal, byte[] eventKey) {
tableColumns = extractSchema(eventVal);
tableRowIdentifierColumns = (eventKey == null) ? null : extractSchema(eventKey);
}

public DebeziumToIcebergTable(byte[] eventVal) throws IOException {
public DebeziumToIcebergTable(byte[] eventVal) {
this(eventVal, null);
}

private List<Types.NestedField> extractSchema(byte[] eventVal) throws IOException {

JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal);
private List<Types.NestedField> extractSchema(byte[] eventVal) {
try {
JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal);
if (IcebergUtil.hasSchema(jsonEvent)) {
return IcebergUtil.getIcebergSchema(jsonEvent.get("schema"));
}

if (IcebergUtil.hasSchema(jsonEvent)) {
return IcebergUtil.getIcebergSchema(jsonEvent.get("schema"));
LOGGER.trace("Event schema not found in the given data:!");
return null;
} catch (IOException e) {
throw new RuntimeException(e);
}

LOGGER.trace("Event schema not found in the given data:!");
return null;
}

public boolean hasSchema() {
Expand All @@ -73,7 +75,7 @@ private SortOrder getSortOrder(Schema schema) {
return so;
}

private Set<Integer> getRowIdentifierFieldIds() throws Exception {
private Set<Integer> getRowIdentifierFieldIds() {

if (this.tableRowIdentifierColumns == null) {
return ImmutableSet.of();
Expand Down Expand Up @@ -107,7 +109,7 @@ private Set<Integer> getRowIdentifierFieldIds() throws Exception {
return identifierFieldIds;
}

public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) throws Exception {
public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) {

Schema schema = new Schema(this.tableColumns, getRowIdentifierFieldIds());

Expand All @@ -122,7 +124,7 @@ public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) thr
return tb.create();
}

throw new Exception("Failed to create table "+ tableIdentifier);
throw new RuntimeException("Failed to create table "+ tableIdentifier);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
Map<String, String> icebergProperties = new ConcurrentHashMap<>();
Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
Deserializer<JsonNode> valDeserializer;
private IcebergTableOperations icebergTableOperations;

@PostConstruct
void connect() throws InterruptedException {
Expand All @@ -117,6 +118,7 @@ void connect() throws InterruptedException {
conf.forEach(this.icebergProperties::put);

icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
icebergTableOperations = new IcebergTableOperations(icebergCatalog);

valSerde.configure(Collections.emptyMap(), false);
valDeserializer = valSerde.deserializer();
Expand All @@ -136,15 +138,15 @@ public String map(String destination) {
return destination.replace(".", "_");
}

private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) throws Exception {
private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) {

if (!eventSchemaEnabled) {
throw new Exception("Table '" + tableIdentifier + "' not found! " +
throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
"Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}

if (event.value() == null) {
throw new Exception("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
}

DebeziumToIcebergTable eventSchema = event.key() == null
Expand All @@ -166,20 +168,9 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
Collectors.toCollection(ArrayList::new))));

for (Map.Entry<String, ArrayList<ChangeEvent<Object, Object>>> event : result.entrySet()) {
Table icebergTable;
final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
try {
// load iceberg table
icebergTable = icebergCatalog.loadTable(tableIdentifier);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
// get schema from an event and create iceberg table
try {
icebergTable = createIcebergTable(tableIdentifier, event.getValue().get(0));
} catch (Exception e2) {
e.printStackTrace();
throw new InterruptedException("Failed to create iceberg table, " + e2.getMessage());
}
}
Table icebergTable = icebergTableOperations.loadTable(tableIdentifier)
.orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0)));
addToTable(icebergTable, event.getValue());
}
// workaround! somehow offset is not saved to file unless we call committer.markProcessed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.debezium.server.iceberg;

import java.util.Optional;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Wrapper to perform operations in iceberg tables
* @author Rafael Acevedo
*/
public class IcebergTableOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperations.class);

private final Catalog catalog;

public IcebergTableOperations(Catalog catalog) {
this.catalog = catalog;
}

public Optional<Table> loadTable(TableIdentifier tableId) {
try {
Table table = catalog.loadTable(tableId);
return Optional.of(table);
} catch (NoSuchTableException e) {
LOGGER.warn("table not found: {}", tableId.toString());
return Optional.empty();
}
}
}