Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use regexp to enable flexible destination mapping #86

Merged
merged 12 commits into from
May 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
Expand Down Expand Up @@ -84,6 +85,10 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
String defaultFs;
@ConfigProperty(name = "debezium.sink.iceberg.destination-regexp", defaultValue = "")
protected Optional<String> destinationRegexp;
@ConfigProperty(name = "debezium.sink.iceberg.destination-regexp-replace", defaultValue = "")
protected Optional<String> destinationRegexpReplace;
@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
Expand Down Expand Up @@ -154,13 +159,12 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
throw new DebeziumException(ex);
}
})
.collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable));
.collect(Collectors.groupingBy(IcebergChangeEvent::destination));

// consume list of events for each destination table
for (Map.Entry<String, List<IcebergChangeEvent>> event : result.entrySet()) {
final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
Table icebergTable = this.loadIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0));
icebergTableOperator.addToTable(icebergTable, event.getValue());
for (Map.Entry<String, List<IcebergChangeEvent>> tableEvents : result.entrySet()) {
Table icebergTable = this.loadIcebergTable(icebergCatalog, mapDestination(tableEvents.getKey()), tableEvents.getValue().get(0));
icebergTableOperator.addToTable(icebergTable, tableEvents.getValue());
}

// workaround! somehow offset is not saved to file unless we call committer.markProcessed
Expand Down Expand Up @@ -203,4 +207,11 @@ protected void logConsumerProgress(long numUploadedEvents) {
}
}

public TableIdentifier mapDestination(String destination) {
final String tableName = destination
.replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse(""))
.replace(".", "_");

return TableIdentifier.of(Namespace.of(namespace), tablePrefix + tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public Schema icebergSchema() {
return jsonSchema.icebergSchema();
}

public String destinationTable() {
return destination.replace(".", "_");
public String destination() {
return destination;
}

public GenericRecord asIcebergRecord(Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import io.quarkus.test.junit.TestProfile;

import java.time.Duration;
import javax.inject.Inject;

import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
Expand All @@ -42,6 +45,11 @@ public class IcebergChangeConsumerTest extends BaseSparkTest {
@ConfigProperty(name = "debezium.sink.type")
String sinkType;

@Inject
IcebergChangeConsumer icebergConsumer;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;

@Test
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
Expand Down Expand Up @@ -277,4 +285,10 @@ public void testPartitionedTable() {
S3Minio.listFiles();
}

@Test
public void testMapDestination() {
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table1"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class IcebergChangeConsumerTestProfile implements QuarkusTestProfile {
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("debezium.sink.iceberg.write.format.default", "orc");
config.put("debezium.sink.iceberg.destination-regexp", "\\d");
//config.put("debezium.sink.iceberg.destination-regexp-replace", "");

return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class IcebergTableOperatorTest extends BaseSparkTest {

public Table createTable(IcebergChangeEvent sampleEvent) {
HadoopCatalog icebergCatalog = getIcebergCatalog();
final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destinationTable());
final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destination());
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class SourceMysqlDB implements QuarkusTestResourceLifecycleManager {
public static final String MYSQL_PASSWORD = "mysqlpw";
public static final String MYSQL_DEBEZIUM_USER = "debezium";
public static final String MYSQL_DEBEZIUM_PASSWORD = "dbz";
public static final String MYSQL_IMAGE = "debezium/example-mysql:1.7.0.Final";
public static final String MYSQL_IMAGE = "debezium/example-mysql:1.9.2.Final";
public static final String MYSQL_HOST = "127.0.0.1";
public static final String MYSQL_DATABASE = "inventory";
public static final Integer MYSQL_PORT_DEFAULT = 3306;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class SourcePostgresqlDB implements QuarkusTestResourceLifecycleManager {
public static final String POSTGRES_USER = "postgres";
public static final String POSTGRES_PASSWORD = "postgres";
public static final String POSTGRES_DBNAME = "postgres";
public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.7.0.Final";
public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.9.2.Final";
public static final String POSTGRES_HOST = "localhost";
public static final Integer POSTGRES_PORT_DEFAULT = 5432;
private static final Logger LOGGER = LoggerFactory.getLogger(SourcePostgresqlDB.class);
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<version.hadoop>3.3.1</version.hadoop>
<version.awssdk>2.17.120</version.awssdk>
<version.parquet>1.12.2</version.parquet>
<version.testcontainers>1.15.3</version.testcontainers>
<version.testcontainers>1.17.1</version.testcontainers>
<version.kafkaclients>3.1.0</version.kafkaclients>
<!-- Debezium -->
<version.debezium>1.9.2.Final</version.debezium>
Expand Down