Skip to content

Commit

Permalink
Add option to change case of destination/iceberg table names (#389)
Browse files Browse the repository at this point in the history
* Add option to change case of destination/iceberg table names

* Add option to change case of destination/iceberg table names

* Add option to change case of destination/iceberg table names

* Add option to change case of destination/iceberg table names

* Add option to change case of destination/iceberg table names

* Add option to change case of destination/iceberg table names

---------

Co-authored-by: Ismail Simsek <[email protected]>
  • Loading branch information
ismailsimsek and Ismail Simsek committed Aug 3, 2024
1 parent 623377d commit 9bd04b5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
protected Optional<String> destinationRegexp;
@ConfigProperty(name = "debezium.sink.iceberg.destination-regexp-replace", defaultValue = "")
protected Optional<String> destinationRegexpReplace;
@ConfigProperty(name = "debezium.sink.iceberg.destination-uppercase-table-names", defaultValue = "false")
protected boolean destinationUppercaseTableNames;
@ConfigProperty(name = "debezium.sink.iceberg.destination-lowercase-table-names", defaultValue = "false")
protected boolean destinationLowercaseTableNames;
@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
Optional<String> tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
Expand Down Expand Up @@ -144,7 +148,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
Map<String, List<IcebergChangeEvent>> result =
records.stream()
.map((ChangeEvent<Object, Object> e)
-> new IcebergChangeEvent(e.destination(), getBytes(e.value()), e.key() == null ? null : getBytes(e.key())))
-> new IcebergChangeEvent(e.destination(), getBytes(e.value()), e.key() == null ? null : getBytes(e.key())))
.collect(Collectors.groupingBy(IcebergChangeEvent::destination));

// consume list of events for each destination table
Expand Down Expand Up @@ -178,8 +182,8 @@ public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sample
}
try {
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(createIdentifierFields), writeFormat);
} catch (Exception e){
throw new DebeziumException("Failed to create table from debezium event schema:"+tableId+" Error:" + e.getMessage(), e);
} catch (Exception e) {
throw new DebeziumException("Failed to create table from debezium event schema:" + tableId + " Error:" + e.getMessage(), e);
}
});
}
Expand All @@ -204,6 +208,12 @@ public TableIdentifier mapDestination(String destination) {
.replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse(""))
.replace(".", "_");

return TableIdentifier.of(Namespace.of(namespace), tablePrefix.orElse("") + tableName);
if (destinationUppercaseTableNames) {
return TableIdentifier.of(Namespace.of(namespace), (tablePrefix.orElse("") + tableName).toUpperCase());
} else if (destinationLowercaseTableNames) {
return TableIdentifier.of(Namespace.of(namespace), (tablePrefix.orElse("") + tableName).toLowerCase());
} else {
return TableIdentifier.of(Namespace.of(namespace), tablePrefix.orElse("") + tableName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,15 @@ public void testPartitionedTable() {
public void testMapDestination() {
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table1"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2"));
icebergConsumer.destinationUppercaseTableNames = true;
icebergConsumer.destinationLowercaseTableNames = false;
assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), icebergConsumer.mapDestination("table_name"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), icebergConsumer.mapDestination("Table_Name"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), icebergConsumer.mapDestination("TABLE_NAME"));
icebergConsumer.destinationUppercaseTableNames = false;
icebergConsumer.destinationLowercaseTableNames = true;
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table_name"), icebergConsumer.mapDestination("Table_Name"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table_name"), icebergConsumer.mapDestination("TABLE_NAME"));
}

public static class TestProfile implements QuarkusTestProfile {
Expand Down
2 changes: 2 additions & 0 deletions docs/DOCS.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ tables created automatically with the first start.
| `debezium.sink.iceberg.create-identifier-fields` | `true` | When set to false the consumer will create tables without identifier fields. useful when user wants to consume nested events with append only mode. |
| `debezium.sink.iceberg.destination-regexp` | `` | Regexp to modify destination iceberg table name. For example with this setting, its possible to combine some tables `table_ptt1`,`table_ptt2` to one `table_combined`. |
| `debezium.sink.iceberg.destination-regexp-replace` | `` | Regexp replace part to modify destination iceberg table name |
| `debezium.sink.iceberg.destination-uppercase-table-names` | `false` | Maps debezium destination to uppercase iceberg table names |
| `debezium.sink.iceberg.destination-lowercase-table-names` | `false` | Maps debezium destination to lowercase iceberg table names |
| `debezium.sink.batch.batch-size-wait` | `NoBatchSizeWait` | Batch size wait strategy, Used to optimize data file size and upload interval. explained below. |
| `debezium.sink.iceberg.{iceberg.prop.name}` | | [Iceberg config](https://iceberg.apache.org/docs/latest/configuration/) this settings are passed to Iceberg (without the prefix) |
| `debezium.source.offset.storage` | `io.debezium.server.iceberg.offset.IcebergOffsetBackingStore` | The name of the Java class that is responsible for persistence of connector offsets. see [debezium doc](https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming) |
Expand Down

0 comments on commit 9bd04b5

Please sign in to comment.