Skip to content

Commit

Permalink
Use regexp to enable flexible destination mapping (#86)
Browse files Browse the repository at this point in the history
* Use regexp to enable flexible destination mapping
  • Loading branch information
ismailsimsek committed May 15, 2022
1 parent 39255cf commit 6c16b4e
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
Expand Down Expand Up @@ -84,6 +85,10 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
String defaultFs;
@ConfigProperty(name = "debezium.sink.iceberg.destination-regexp", defaultValue = "")
protected Optional<String> destinationRegexp;
@ConfigProperty(name = "debezium.sink.iceberg.destination-regexp-replace", defaultValue = "")
protected Optional<String> destinationRegexpReplace;
@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
Expand Down Expand Up @@ -154,13 +159,12 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
throw new DebeziumException(ex);
}
})
.collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable));
.collect(Collectors.groupingBy(IcebergChangeEvent::destination));

// consume list of events for each destination table
for (Map.Entry<String, List<IcebergChangeEvent>> event : result.entrySet()) {
final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
Table icebergTable = this.loadIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0));
icebergTableOperator.addToTable(icebergTable, event.getValue());
for (Map.Entry<String, List<IcebergChangeEvent>> tableEvents : result.entrySet()) {
Table icebergTable = this.loadIcebergTable(icebergCatalog, mapDestination(tableEvents.getKey()), tableEvents.getValue().get(0));
icebergTableOperator.addToTable(icebergTable, tableEvents.getValue());
}

// workaround! somehow offset is not saved to file unless we call committer.markProcessed
Expand Down Expand Up @@ -203,4 +207,11 @@ protected void logConsumerProgress(long numUploadedEvents) {
}
}

public TableIdentifier mapDestination(String destination) {
final String tableName = destination
.replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse(""))
.replace(".", "_");

return TableIdentifier.of(Namespace.of(namespace), tablePrefix + tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public Schema icebergSchema() {
return jsonSchema.icebergSchema();
}

public String destinationTable() {
return destination.replace(".", "_");
public String destination() {
return destination;
}

public GenericRecord asIcebergRecord(Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import io.quarkus.test.junit.TestProfile;

import java.time.Duration;
import javax.inject.Inject;

import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
Expand All @@ -42,6 +45,11 @@ public class IcebergChangeConsumerTest extends BaseSparkTest {
@ConfigProperty(name = "debezium.sink.type")
String sinkType;

@Inject
IcebergChangeConsumer icebergConsumer;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;

@Test
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
Expand Down Expand Up @@ -277,4 +285,10 @@ public void testPartitionedTable() {
S3Minio.listFiles();
}

@Test
public void testMapDestination() {
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table1"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class IcebergChangeConsumerTestProfile implements QuarkusTestProfile {
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("debezium.sink.iceberg.write.format.default", "orc");
config.put("debezium.sink.iceberg.destination-regexp", "\\d");
//config.put("debezium.sink.iceberg.destination-regexp-replace", "");

return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class IcebergTableOperatorTest extends BaseSparkTest {

public Table createTable(IcebergChangeEvent sampleEvent) {
HadoopCatalog icebergCatalog = getIcebergCatalog();
final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destinationTable());
final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destination());
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class SourceMysqlDB implements QuarkusTestResourceLifecycleManager {
public static final String MYSQL_PASSWORD = "mysqlpw";
public static final String MYSQL_DEBEZIUM_USER = "debezium";
public static final String MYSQL_DEBEZIUM_PASSWORD = "dbz";
public static final String MYSQL_IMAGE = "debezium/example-mysql:1.7.0.Final";
public static final String MYSQL_IMAGE = "debezium/example-mysql:1.9.2.Final";
public static final String MYSQL_HOST = "127.0.0.1";
public static final String MYSQL_DATABASE = "inventory";
public static final Integer MYSQL_PORT_DEFAULT = 3306;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class SourcePostgresqlDB implements QuarkusTestResourceLifecycleManager {
public static final String POSTGRES_USER = "postgres";
public static final String POSTGRES_PASSWORD = "postgres";
public static final String POSTGRES_DBNAME = "postgres";
public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.7.0.Final";
public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.9.2.Final";
public static final String POSTGRES_HOST = "localhost";
public static final Integer POSTGRES_PORT_DEFAULT = 5432;
private static final Logger LOGGER = LoggerFactory.getLogger(SourcePostgresqlDB.class);
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<version.hadoop>3.3.1</version.hadoop>
<version.awssdk>2.17.120</version.awssdk>
<version.parquet>1.12.2</version.parquet>
<version.testcontainers>1.15.3</version.testcontainers>
<version.testcontainers>1.17.1</version.testcontainers>
<version.kafkaclients>3.1.0</version.kafkaclients>
<!-- Debezium -->
<version.debezium>1.9.2.Final</version.debezium>
Expand Down

0 comments on commit 6c16b4e

Please sign in to comment.