From 65508b6cb53d166e5c0e192b6c5b1f454fe3e97f Mon Sep 17 00:00:00 2001 From: ismail simsek Date: Fri, 18 Jun 2021 13:00:22 +0200 Subject: [PATCH] Adding mysql test (#12) * added mysql test * added mysql test * added mysql test * added mysql test --- ...Table.java => DebeziumToIcebergTable.java} | 42 +++++++------- .../server/iceberg/IcebergChangeConsumer.java | 12 +++- .../conf/application.properties.example | 36 +++++++++--- .../src/main/resources/testdata | 12 ---- .../BatchSparkChangeConsumerMysqlTest.java | 57 +++++++++++++++++++ ...chSparkChangeConsumerMysqlTestProfile.java | 32 +++++++++++ .../debezium/server/iceberg/ConfigSource.java | 11 +++- .../server/testresource/SourceMysqlDB.java | 25 ++++---- .../server/testresource/TestUtil.java | 10 ++++ pom.xml | 4 +- 10 files changed, 180 insertions(+), 61 deletions(-) rename debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/{EventToIcebergTable.java => DebeziumToIcebergTable.java} (68%) delete mode 100644 debezium-server-iceberg-sink/src/main/resources/testdata create mode 100644 debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java create mode 100644 debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTestProfile.java diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/EventToIcebergTable.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java similarity index 68% rename from debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/EventToIcebergTable.java rename to debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java index 716615de..9810ff2b 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/EventToIcebergTable.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java @@ 
-23,20 +23,20 @@ * * @author Ismail Simsek */ -public class EventToIcebergTable { - protected static final Logger LOGGER = LoggerFactory.getLogger(EventToIcebergTable.class); +public class DebeziumToIcebergTable { + protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumToIcebergTable.class); - private final Schema schemaTable; - private final Schema schemaTableRowKeyIdentifier; + private final Schema tableSchema; + private final Schema tableRowIdentifierSchema; - public EventToIcebergTable(byte[] eventKey, byte[] eventVal) throws IOException { - schemaTable = extractSchema(eventVal); - schemaTableRowKeyIdentifier = extractSchema(eventKey); + public DebeziumToIcebergTable(byte[] eventKey, byte[] eventVal) throws IOException { + tableSchema = extractSchema(eventVal); + tableRowIdentifierSchema = extractSchema(eventKey); } - public EventToIcebergTable(byte[] eventVal) throws IOException { - schemaTable = extractSchema(eventVal); - schemaTableRowKeyIdentifier = null; + public DebeziumToIcebergTable(byte[] eventVal) throws IOException { + tableSchema = extractSchema(eventVal); + tableRowIdentifierSchema = null; } private Schema extractSchema(byte[] eventVal) throws IOException { @@ -51,12 +51,12 @@ private Schema extractSchema(byte[] eventVal) throws IOException { return null; } - public Schema getSchemaTable() { - return schemaTable; + public Schema getTableSchema() { + return tableSchema; } - public Schema getSchemaTableRowKeyIdentifier() { - return schemaTableRowKeyIdentifier; + public Schema getTableRowIdentifierSchema() { + return tableRowIdentifierSchema; } private Schema getIcebergSchema(JsonNode eventSchema) { @@ -64,25 +64,25 @@ private Schema getIcebergSchema(JsonNode eventSchema) { } public boolean hasSchema() { - return schemaTable != null; + return tableSchema != null; } public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) { if (this.hasSchema()) { - Catalog.TableBuilder tb = 
icebergCatalog.buildTable(tableIdentifier, this.schemaTable); + Catalog.TableBuilder tb = icebergCatalog.buildTable(tableIdentifier, this.tableSchema); - if (this.schemaTableRowKeyIdentifier != null) { - SortOrder.Builder sob = SortOrder.builderFor(schemaTable); - for (Types.NestedField coll : schemaTableRowKeyIdentifier.columns()) { + if (this.tableRowIdentifierSchema != null) { + SortOrder.Builder sob = SortOrder.builderFor(tableSchema); + for (Types.NestedField coll : tableRowIdentifierSchema.columns()) { sob = sob.asc(coll.name(), NullOrder.NULLS_FIRST); } tb.withSortOrder(sob.build()); // "@TODO waiting spec v2 // use as PK / RowKeyIdentifier } - LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schemaTable, - schemaTableRowKeyIdentifier); + LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, tableSchema, + tableRowIdentifierSchema); Table table = tb.create(); // @TODO remove once spec v2 released return upgradeToFormatVersion2(table); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 217d547b..3b7a3d5e 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -125,9 +125,9 @@ public String map(String destination) { private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent event) { if (eventSchemaEnabled && event.value() != null) { try { - EventToIcebergTable eventSchema = event.key() == null - ? new EventToIcebergTable(getBytes(event.value())) - : new EventToIcebergTable(getBytes(event.key()), getBytes(event.value())); + DebeziumToIcebergTable eventSchema = event.key() == null + ? 
new DebeziumToIcebergTable(getBytes(event.value())) + : new DebeziumToIcebergTable(getBytes(event.key()), getBytes(event.value())); return eventSchema.create(icebergCatalog, tableIdentifier); } catch (Exception e) { @@ -164,6 +164,12 @@ public void handleBatch(List> records, DebeziumEngin } addToTable(icebergTable, event.getValue()); } + // workaround! somehow the offset is not saved to file unless we call committer.markProcessed, + // even though it should be saved to file periodically + for (ChangeEvent record : records) { + LOGGER.trace("Processed event '{}'", record); + committer.markProcessed(record); + } committer.markBatchFinished(); if (dynamicWaitEnabled) { diff --git a/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example b/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example index 944ead5c..8572f6ab 100644 --- a/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example +++ b/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example @@ -11,13 +11,33 @@ debezium.source.database.dbname=postgres debezium.source.database.server.name=tutorial debezium.source.schema.include.list=inventory -# @TODO add defaults!! -debezium.sink.iceberg.fs.defaultFS=xyz -debezium.sink.batch.objectkey.mapper=xyz -debezium.format.schemas.enable=xyz -debezium.sink.batch.batchwriter=xyz -debezium.sink.iceberg.warehouse=xyz -debezium.transforms=xyz +# configure batch behaviour/size (poll interval is in milliseconds: 10000 = 10 seconds) +debezium.source.max.batch.size=2048 +debezium.source.poll.interval.ms=10000 +# enable schemas +debezium.format.value.schemas.enable=true +debezium.format.key.schemas.enable=true -# @TODO add multiple example!! 
\ No newline at end of file +# debezium unwrap message +debezium.transforms=unwrap +debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState +debezium.transforms.unwrap.add.fields=op,table,lsn,source.ts_ms,db +debezium.transforms.unwrap.delete.handling.mode=rewrite + +# iceberg settings +debezium.sink.iceberg.upsert=false +debezium.sink.iceberg.upsert-keep-deletes=true +debezium.sink.iceberg.table-prefix=debeziumcdc_ +debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET +debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse +debezium.sink.iceberg.type=hadoop +debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog + +# set logging levels +quarkus.log.level=INFO +quarkus.log.category."io.debezium.server.iceberg".level=DEBUG +quarkus.log.category."org.apache.hadoop".level=ERROR +quarkus.log.category."org.apache.parquet".level=WARN +quarkus.log.category."org.eclipse.jetty".level=WARN +quarkus.log.category."org.apache.iceberg".level=ERROR \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/main/resources/testdata b/debezium-server-iceberg-sink/src/main/resources/testdata deleted file mode 100644 index 5698c4d7..00000000 --- a/debezium-server-iceberg-sink/src/main/resources/testdata +++ /dev/null @@ -1,12 +0,0 @@ -2020-12-16 20:28:39,040 DEBUG key:{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"testc.inventory.customers.Key"},"payload":{"id":1001}} (io.debezium.server.batch.IcebergChangeConsumer) -2020-12-16 20:28:39,041 DEBUG 
value:{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"int64","optional":true,"field":"__lsn"},{"type":"int64","optional":true,"field":"__null_ts_ms"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"testc.inventory.customers.Value"},"payload":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com","__op":"r","__table":"customers","__lsn":33832960,"__null_ts_ms":1608146918808,"__deleted":"false"}} (io.debezium.server.batch.IcebergChangeConsumer) -2020-12-16 20:28:39,041 DEBUG dest:testc.inventory.customers (io.debezium.server.batch.IcebergChangeConsumer) -2020-12-16 20:28:39,041 DEBUG key:{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"testc.inventory.customers.Key"},"payload":{"id":1002}} (io.debezium.server.batch.IcebergChangeConsumer) -2020-12-16 20:28:39,041 DEBUG value:{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"int64","optional":true,"field":"__lsn"},{"type":"int64","optional":true,"field":"__null_ts_ms"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"testc.inventory.customers.Value"},"payload":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com","__op":"r","__table":"customers","__lsn":33832960,"__null_ts_ms":1608146918820,"__deleted":"false"}} 
(io.debezium.server.batch.IcebergChangeConsumer) -2020-12-16 20:28:39,041 DEBUG dest:testc.inventory.customers (io.debezium.server.batch.IcebergChangeConsumer) -2020-12-16 20:28:39,041 DEBUG key:{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"testc.inventory.customers.Key"},"payload":{"id":1003}} (io.debezium.server.batch.IcebergChangeConsumer) -2020-12-16 20:28:39,041 DEBUG value:{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"int64","optional":true,"field":"__lsn"},{"type":"int64","optional":true,"field":"__null_ts_ms"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"testc.inventory.customers.Value"},"payload":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com","__op":"r","__table":"customers","__lsn":33832960,"__null_ts_ms":1608146918829,"__deleted":"false"}} (io.debezium.server.batch.IcebergChangeConsumer) -2020-12-16 20:28:39,041 DEBUG dest:testc.inventory.customers (io.debezium.server.batch.IcebergChangeConsumer) -2020-12-16 20:28:39,041 DEBUG key:{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"testc.inventory.customers.Key"},"payload":{"id":1004}} (io.debezium.server.batch.IcebergChangeConsumer) -2020-12-16 20:28:39,041 DEBUG 
value:{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"int64","optional":true,"field":"__lsn"},{"type":"int64","optional":true,"field":"__null_ts_ms"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"testc.inventory.customers.Value"},"payload":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org","__op":"r","__table":"customers","__lsn":33832960,"__null_ts_ms":1608146918831,"__deleted":"false"}} (io.debezium.server.batch.IcebergChangeConsumer) -2020-12-16 20:28:39,041 DEBUG dest:testc.inventory.customers (io.debezium.server.batch.IcebergChangeConsumer) \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java new file mode 100644 index 00000000..2ad50d13 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java @@ -0,0 +1,57 @@ +/* + * + * * Copyright memiiso Authors. 
+ * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg; + +import io.debezium.server.testresource.BaseSparkTest; +import io.debezium.server.testresource.S3Minio; +import io.debezium.server.testresource.SourceMysqlDB; +import io.debezium.util.Testing; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +import java.time.Duration; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.awaitility.Awaitility; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.Test; + +/** + * Integration test that verifies basic reading from MySQL database and writing to s3 destination. + * + * @author Ismail Simsek + */ +@QuarkusTest +@QuarkusTestResource(S3Minio.class) +@QuarkusTestResource(SourceMysqlDB.class) +@TestProfile(BatchSparkChangeConsumerMysqlTestProfile.class) +public class BatchSparkChangeConsumerMysqlTest extends BaseSparkTest { + + + @ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "1000") + Integer maxBatchSize; + + @Test + public void testSimpleUpload() { + Testing.Print.enable(); + + Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> { + try { + Dataset df = getTableData("testc.inventory.customers"); + df.show(false); + return df.filter("id is not null").count() >= 4; + } catch (Exception e) { + return false; + } + }); + } + +} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTestProfile.java new file mode 100644 index 00000000..b1107adf --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTestProfile.java @@ -0,0 
+1,32 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg; + +import io.quarkus.test.junit.QuarkusTestProfile; + +import java.util.HashMap; +import java.util.Map; + +public class BatchSparkChangeConsumerMysqlTestProfile implements QuarkusTestProfile { + + //This method allows us to override configuration properties. + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + config.put("quarkus.profile", "mysql"); + config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector"); + return config; + } + + @Override + public String getConfigProfile() { + return "mysql"; + } + +} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java index d5fcac32..e91a286f 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java @@ -9,7 +9,9 @@ package io.debezium.server.iceberg; import io.debezium.server.TestConfigSource; +import io.debezium.util.Testing; +import java.nio.file.Path; import java.util.HashMap; import java.util.Map; @@ -21,8 +23,10 @@ public class ConfigSource extends TestConfigSource { public static final String S3_BUCKET = "test-bucket"; final Map s3Test = new HashMap<>(); + public static final Path HISTORY_FILE = Testing.Files.createTestingPath("dbhistory.txt").toAbsolutePath(); public ConfigSource() { + s3Test.put("quarkus.profile", "postgresql"); // common sink conf s3Test.put("debezium.sink.type", "iceberg"); s3Test.put("debezium.sink.iceberg.upsert", "false"); @@ -48,15 +52,16 @@ public ConfigSource() { // debezium unwrap message s3Test.put("debezium.transforms", "unwrap"); 
s3Test.put("debezium.transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState"); - s3Test.put("debezium.transforms.unwrap.add.fields", "op,table,lsn,source.ts_ms"); - s3Test.put("debezium.transforms.unwrap.add.headers", "db"); + s3Test.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ms,db"); s3Test.put("debezium.transforms.unwrap.delete.handling.mode", "rewrite"); // DEBEZIUM SOURCE conf s3Test.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + s3Test.put("debezium.source.database.history", "io.debezium.relational.history.FileDatabaseHistory"); + s3Test.put("debezium.source.database.history.file.filename", HISTORY_FILE.toAbsolutePath().toString()); s3Test.put("debezium.source.offset.flush.interval.ms", "60000"); s3Test.put("debezium.source.database.server.name", "testc"); - s3Test.put("debezium.source.schema.whitelist", "inventory"); + s3Test.put("%postgresql.debezium.source.schema.whitelist", "inventory"); s3Test.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," + "inventory.geom,inventory.table_datatypes,inventory.test_date_table"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/SourceMysqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/SourceMysqlDB.java index 75bbe3e3..14e83b4c 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/SourceMysqlDB.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/SourceMysqlDB.java @@ -36,7 +36,14 @@ public class SourceMysqlDB implements QuarkusTestResourceLifecycleManager { public static final Integer MYSQL_PORT_DEFAULT = 3306; private static final Logger LOGGER = LoggerFactory.getLogger(SourceMysqlDB.class); - private static GenericContainer container; + static private GenericContainer container; + + @Override + public void 
stop() { + if (container != null) { + container.stop(); + } + } public static void runSQL(String query) throws SQLException, ClassNotFoundException { try { @@ -52,17 +59,6 @@ public static void runSQL(String query) throws SQLException, ClassNotFoundExcept } } - public static Integer getMappedPort() { - return container.getMappedPort(MYSQL_PORT_DEFAULT); - } - - @Override - public void stop() { - if (container != null) { - container.stop(); - } - } - @Override public Map start() { container = new GenericContainer<>(MYSQL_IMAGE) @@ -82,4 +78,9 @@ public Map start() { return params; } + public static Integer getMappedPort() { + return container.getMappedPort(MYSQL_PORT_DEFAULT); + } + + } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/TestUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/TestUtil.java index 268d2e84..4f57e549 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/TestUtil.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/testresource/TestUtil.java @@ -45,6 +45,16 @@ public void markProcessed(Object record) throws InterruptedException { public synchronized void markBatchFinished() throws InterruptedException { return; } + + @Override + public void markProcessed(Object record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException { + return; + } + + @Override + public DebeziumEngine.Offsets buildOffsets() { + return null; + } }; } diff --git a/pom.xml b/pom.xml index 58642c80..a2507ddc 100644 --- a/pom.xml +++ b/pom.xml @@ -37,9 +37,9 @@ 2.16.34 1.11.1 - 1.4.2.Final + 1.5.2.Final - 1.13.2.Final + 1.13.7.Final