diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index a4f77e8e..5e5d73c3 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -189,9 +189,13 @@ public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sample if (!eventSchemaEnabled) { throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); } - return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, - !upsert, // partition if its append mode - partitionField); + try { + return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, + !upsert, // partition if its append mode + partitionField); + } catch (Exception e){ + throw new DebeziumException("Failed to create table from debezium event schema:"+tableId+" Error:" + e.getMessage(), e); + } }); } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index 02f526d1..5a4ff956 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -310,13 +310,13 @@ public int hashCode() { public Schema icebergSchema() { if (this.valueSchema == null) { - throw new RuntimeException("Failed to get event schema, event schema is null"); + throw new RuntimeException("Failed to get schema from debezium event, event schema is null"); } final List tableColumns = icebergSchemaFields(valueSchema); if (tableColumns.isEmpty()) { - throw new RuntimeException("Failed to get event schema, event schema has no fields!"); + throw new RuntimeException("Failed to get schema from debezium event, event schema has no fields!"); } final List keyColumns = icebergSchemaFields(keySchema); @@ -338,7 +338,7 @@ public Schema icebergSchema() { } if (!found) { - throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns"); + throw new ValidationException("Debezium key/identifier field `" + ic.name() + "` not found in event columns!"); } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java index 2d0342e3..549f7c9c 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java @@ -151,7 +151,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { Transaction t = historyTable.newTransaction(); t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); - Arrays.stream(files.dataFiles()).forEach(t.newAppend()::appendFile); + Arrays.stream(files.dataFiles()).forEach(f -> t.newAppend().appendFile(f).commit()); t.commitTransaction(); LOG.trace("Successfully saved history data to Iceberg table"); } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java index 57af007f..f6d1e639 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java @@ -165,9 +165,7 @@ protected void save() { Transaction t = offsetTable.newTransaction(); t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); - AppendFiles tableAppender = t.newAppend(); - Arrays.stream(files.dataFiles()).forEach(tableAppender::appendFile); - tableAppender.commit(); + Arrays.stream(files.dataFiles()).forEach(f -> t.newAppend().appendFile(f).commit()); t.commitTransaction(); LOG.debug("Successfully saved offset data to iceberg table"); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java index 6092f0d2..0287a72f 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java @@ -68,7 +68,6 @@ public TestConfigSource() { config.put("%postgresql.debezium.source.schema.whitelist", "inventory"); config.put("%postgresql.debezium.source.database.whitelist", "inventory"); config.put("debezium.source.table.whitelist", "inventory.*"); - config.put("debezium.source.include.schema.changes", "false"); config.put("quarkus.log.level", "INFO"); config.put("quarkus.log.category.\"org.apache.spark\".level", "WARN");