Skip to content

Commit

Permalink
Improve exception messages and fix committing to IcebergSchemaHistory (
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Sep 10, 2023
1 parent 6fa7a25 commit abcf2a8
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,13 @@ public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sample
if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat,
!upsert, // partition if its append mode
partitionField);
try {
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat,
!upsert, // partition if its append mode
partitionField);
} catch (Exception e){
throw new DebeziumException("Failed to create table from debezium event schema:"+tableId+" Error:" + e.getMessage(), e);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,13 @@ public int hashCode() {
public Schema icebergSchema() {

if (this.valueSchema == null) {
throw new RuntimeException("Failed to get event schema, event schema is null");
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

final List<Types.NestedField> tableColumns = icebergSchemaFields(valueSchema);

if (tableColumns.isEmpty()) {
throw new RuntimeException("Failed to get event schema, event schema has no fields!");
throw new RuntimeException("Failed to get schema from debezium event, event schema has no fields!");
}

final List<Types.NestedField> keyColumns = icebergSchemaFields(keySchema);
Expand All @@ -338,7 +338,7 @@ public Schema icebergSchema() {
}

if (!found) {
throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
throw new ValidationException("Debezium key/identifier field `" + ic.name() + "` not found in event columns!");
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {

Transaction t = historyTable.newTransaction();
t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
Arrays.stream(files.dataFiles()).forEach(t.newAppend()::appendFile);
Arrays.stream(files.dataFiles()).forEach(f -> t.newAppend().appendFile(f).commit());
t.commitTransaction();
LOG.trace("Successfully saved history data to Iceberg table");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,7 @@ protected void save() {

Transaction t = offsetTable.newTransaction();
t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
AppendFiles tableAppender = t.newAppend();
Arrays.stream(files.dataFiles()).forEach(tableAppender::appendFile);
tableAppender.commit();
Arrays.stream(files.dataFiles()).forEach(f -> t.newAppend().appendFile(f).commit());
t.commitTransaction();
LOG.debug("Successfully saved offset data to iceberg table");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public TestConfigSource() {
config.put("%postgresql.debezium.source.schema.whitelist", "inventory");
config.put("%postgresql.debezium.source.database.whitelist", "inventory");
config.put("debezium.source.table.whitelist", "inventory.*");
config.put("debezium.source.include.schema.changes", "false");

config.put("quarkus.log.level", "INFO");
config.put("quarkus.log.category.\"org.apache.spark\".level", "WARN");
Expand Down

0 comments on commit abcf2a8

Please sign in to comment.