Skip to content

Commit

Permalink
Add option to change case of destination/iceberg table names
Browse files Browse the repository at this point in the history
  • Loading branch information
Ismail Simsek committed Jul 24, 2024
1 parent 06fb519 commit c073eff
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
protected Optional<String> destinationRegexp;
@ConfigProperty(name = "debezium.sink.iceberg.destination-regexp-replace", defaultValue = "")
protected Optional<String> destinationRegexpReplace;
@ConfigProperty(name = "debezium.sink.iceberg.destination-uppercase-names", defaultValue = "false")
protected boolean destinationUppercaseNames;
@ConfigProperty(name = "debezium.sink.iceberg.destination-lowercase-names", defaultValue = "false")
protected boolean destinationLowercaseNames;
@ConfigProperty(name = "debezium.sink.iceberg.destination-uppercase-table-names", defaultValue = "false")
protected boolean destinationUppercaseTableNames;
@ConfigProperty(name = "debezium.sink.iceberg.destination-lowercase-table-names", defaultValue = "false")
protected boolean destinationLowercaseTableNames;
@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
Optional<String> tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
Expand Down Expand Up @@ -208,9 +208,9 @@ public TableIdentifier mapDestination(String destination) {
.replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse(""))
.replace(".", "_");

if (destinationUppercaseNames) {
if (destinationUppercaseTableNames) {
return TableIdentifier.of(Namespace.of(namespace.toUpperCase()), (tablePrefix.orElse("") + tableName).toUpperCase());
} else if (destinationLowercaseNames) {
} else if (destinationLowercaseTableNames) {
return TableIdentifier.of(Namespace.of(namespace.toLowerCase()), (tablePrefix.orElse("") + tableName).toLowerCase());
} else {
return TableIdentifier.of(Namespace.of(namespace), tablePrefix.orElse("") + tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,11 @@ public void testPartitionedTable() {
public void testMapDestination() {
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table1"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2"));
icebergConsumer.destinationUppercaseNames = true;
icebergConsumer.destinationLowercaseNames = false;
icebergConsumer.destinationUppercaseTableNames = true;
icebergConsumer.destinationLowercaseTableNames = false;
assertEquals(TableIdentifier.of(Namespace.of(namespace.toUpperCase()), "DEBEZIUMCDC_TABLE_LOWERCASE"), icebergConsumer.mapDestination("table_lowercase"));
icebergConsumer.destinationUppercaseNames = false;
icebergConsumer.destinationLowercaseNames = true;
icebergConsumer.destinationUppercaseTableNames = false;
icebergConsumer.destinationLowercaseTableNames = true;
assertEquals(TableIdentifier.of(Namespace.of(namespace.toLowerCase()), "debeziumcdc_table_camelcase"), icebergConsumer.mapDestination("table_CamelCase"));
}

Expand Down

0 comments on commit c073eff

Please sign in to comment.