Skip to content

Commit

Permalink
Test consumer without event flattening
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jun 16, 2024
1 parent 96a2c94 commit 3b1e479
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
String catalogName;
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg.create-identifier-fields", defaultValue = "true")
boolean createIdentifierFields;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;
@ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
Expand Down Expand Up @@ -175,7 +177,7 @@ public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sample
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
try {
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat);
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(createIdentifierFields), writeFormat);
} catch (Exception e){
throw new DebeziumException("Failed to create table from debezium event schema:"+tableId+" Error:" + e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public ChangeEventSchema changeEventSchema() {
}
}

public Schema icebergSchema() {
return changeEventSchema().icebergSchema();
public Schema icebergSchema(boolean createIdentifierFields) {
return changeEventSchema().icebergSchema(createIdentifierFields);
}

public String destination() {
Expand Down Expand Up @@ -312,14 +312,17 @@ private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaN
return schemaData;
}

private Schema icebergSchema() {
private Schema icebergSchema(boolean createIdentifierFields) {

if (this.valueSchema.isNull()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData();
if (!eventsAreUnwrapped && keySchema != null) {
if (!createIdentifierFields) {
LOGGER.warn("Creating identifier fields is disabled, creating table without identifier field!");
icebergSchemaFields(valueSchema, null, schemaData);
} else if (!eventsAreUnwrapped && keySchema != null) {
// NOTE! Even tough we are able to set nested key field! we cannot user key field for nested(default debezium) events!
// NOTE! because while for insert events only `after` field is populated for delete events only `before` field is populated!
// NOTE! because of this inconsistency we cannot set either of the filed as key field!
Expand All @@ -328,10 +331,10 @@ private Schema icebergSchema() {
nestedKeySchema.put("type", "struct");
nestedKeySchema.putArray("fields").add(((ObjectNode) keySchema).put("field", "after"));
icebergSchemaFields(valueSchema, nestedKeySchema, schemaData);
// @TODO raise error

if (!schemaData.identifierFieldIds().isEmpty()) {
throw new DebeziumException("Events are untested, Identifier fields are not supported for untested events! " +
"Pleas make sure you are using event flattening SMT setting!");
"Pleas make sure you are using event flattening SMT setting! or disabling identifier field creation!");
}
} else {
icebergSchemaFields(valueSchema, keySchema, schemaData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class IcebergTableOperator {
String opColumn;
@ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true")
boolean allowFieldAddition;
@ConfigProperty(name = "debezium.sink.iceberg.create-identifier-fields", defaultValue = "true")
boolean createIdentifierFields;
@Inject
IcebergTableWriterFactory writerFactory;

Expand Down Expand Up @@ -157,7 +159,7 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {

for (Map.Entry<IcebergChangeEvent.ChangeEventSchema, List<IcebergChangeEvent>> schemaEvents : eventsGroupedBySchema.entrySet()) {
// extend table schema if new fields found
applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema());
applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema(createIdentifierFields));
// add set of events to table
addToTablePerSchema(icebergTable, schemaEvents.getValue());
}
Expand Down

0 comments on commit 3b1e479

Please sign in to comment.