diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java new file mode 100644 index 00000000..d6bdfe33 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java @@ -0,0 +1,50 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg; + +import io.debezium.server.iceberg.testresources.BaseSparkTest; +import io.debezium.server.iceberg.testresources.S3Minio; +import io.debezium.server.iceberg.testresources.SourceMangoDB; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +import java.time.Duration; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; + +/** + * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. + * + * @author Ismail Simsek + */ +@QuarkusTest +@QuarkusTestResource(S3Minio.class) +@QuarkusTestResource(SourceMangoDB.class) +@TestProfile(IcebergChangeConsumerMangodbTestProfile.class) +public class IcebergChangeConsumerMangodbTest extends BaseSparkTest { + + @Test + public void testSimpleUpload() { + + Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> { + try { + Dataset df = getTableData("testc.inventory.products"); + df.show(); + return df.filter("_id is not null").count() >= 4; + } catch (Exception e) { + return false; + } + }); + } + +} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTestProfile.java new file mode 100644 index 00000000..237a31b0 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTestProfile.java @@ -0,0 +1,45 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg; + +import io.quarkus.test.junit.QuarkusTestProfile; + +import java.util.HashMap; +import java.util.Map; + +public class IcebergChangeConsumerMangodbTestProfile implements QuarkusTestProfile { + + //This method allows us to override configuration properties. + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + config.put("quarkus.profile", "mongodb"); + config.put("%mongodb.debezium.source.connector.class", "io.debezium.connector.mongodb.MongoDbConnector"); + config.put("%mongodb.debezium.transforms.unwrap.type", "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"); + config.put("%mongodb.debezium.transforms.unwrap.add.fields", "op,source.ts_ms,db"); + config.put("%mongodb.debezium.sink.iceberg.allow-field-addition", "false"); + config.put("%mongodb.debezium.source.mongodb.name", "testc"); + config.put("%mongodb.debezium.source.database.include.list", "inventory"); // ok + config.put("%mongodb.debezium.source.collection.include.list", "inventory.products"); + + // IMPORTANT !!! FIX MongoDbConnector KEY FIELD NAME "id"=>"_id" !!! + config.put("%mongodb.debezium.transforms", "unwrap,renamekeyfield"); + config.put("%mongodb.debezium.transforms.renamekeyfield.type", + "org.apache.kafka.connect.transforms.ReplaceField$Key"); + config.put("%mongodb.debezium.transforms.renamekeyfield.renames", "id:_id"); + + return config; + } + + @Override + public String getConfigProfile() { + return "mongodb"; + } + +} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMangoDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMangoDB.java new file mode 100644 index 00000000..f713367b --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMangoDB.java @@ -0,0 +1,54 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.testresources; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.ImageFromDockerfile; + +public class SourceMangoDB implements QuarkusTestResourceLifecycleManager { + + public static final int MONGODB_PORT = 27017; + public static final GenericContainer container = new GenericContainer( + new ImageFromDockerfile("debezium_mongodb", false) + .withFileFromClasspath("Dockerfile", "mongodb/Dockerfile") + .withFileFromClasspath("start-mongodb.sh", "mongodb/start-mongodb.sh")) + + .waitingFor(Wait.forLogMessage(".*Successfully initialized inventory database.*", 1)) + .withStartupTimeout(Duration.ofSeconds(60L)); + + @Override + public Map start() { + container.withExposedPorts(MONGODB_PORT).start(); + + Map params = new ConcurrentHashMap<>(); + params.put("%mongodb.debezium.source.mongodb.hosts", + "rs0/" + container.getHost() + ":" + container.getMappedPort(MONGODB_PORT) + ); + params.put("%mongodb.debezium.source.mongodb.authsource", "admin"); + params.put("%mongodb.debezium.source.mongodb.user", "debezium"); + params.put("%mongodb.debezium.source.mongodb.password", "dbz"); + //params.put("%mongodb.debezium.source.mongodb.ssl.enabled", "false"); + return params; + } + + @Override + public void stop() { + if (container != null) { + container.stop(); + } + } + +} diff --git a/debezium-server-iceberg-sink/src/test/resources/mongodb/Dockerfile b/debezium-server-iceberg-sink/src/test/resources/mongodb/Dockerfile new file mode 100644 index 00000000..7ba8a3c9 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/resources/mongodb/Dockerfile @@ -0,0 +1,16 @@ +FROM mongo:5.0 + +LABEL maintainer="Debezium Community" + +ENV REPLICA_SET_HOSTS="localhost" + +# Starting with MongoDB 4.4 the authentication enabled MongoDB requires a key +# for intra-replica set communication +RUN openssl rand -base64 756 > /etc/mongodb.keyfile &&\ + chown mongodb:mongodb /etc/mongodb.keyfile &&\ + chmod 400 /etc/mongodb.keyfile + +COPY start-mongodb.sh /usr/local/bin/ +RUN chmod +x /usr/local/bin/start-mongodb.sh + +ENTRYPOINT ["start-mongodb.sh"] diff --git a/debezium-server-iceberg-sink/src/test/resources/mongodb/start-mongodb.sh b/debezium-server-iceberg-sink/src/test/resources/mongodb/start-mongodb.sh new file mode 100644 index 00000000..e68eb1a7 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/resources/mongodb/start-mongodb.sh @@ -0,0 +1,125 @@ +#!/usr/bin/env bash + +# +# /* +# * Copyright memiiso Authors. +# * +# * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 +# */ +# + +set -e +# enable job control used fg +set -m + +function wait_mongodb_upandready { + # Wait until Mongo is ready to accept connections, exit if this does not happen within 30 seconds + COUNTER=0 + until mongosh --quiet --eval "use admin; printjson(db.serverStatus());" + do + sleep 1 + COUNTER=$((COUNTER+1)) + if [ "${COUNTER}" -gt "30" ]; then + echo "MongoDB did not initialize within 30 seconds, exiting" + exit 2 + fi + echo "Waiting for MongoDB to initialize... ${COUNTER}/30" + done +} + +function deploy_replica_set { + wait_mongodb_upandready + mongosh -u admin -p admin --authenticationDatabase admin localhost:27017/inventory <<-EOF + var repsetmembers = {_id : "rs0",members: []}; + var arrayhosts = "${REPLICA_SET_HOSTS}".split(','); + for(var i = 0; i < arrayhosts.length; i++) { + repsetmembers['members'].push({ _id: i, host: arrayhosts[i]}); + } + print("Initializing replica set:\n" + JSON.stringify(repsetmembers)); + rs.initiate(repsetmembers); + print('Initiated replica set'); +EOF + echo "Successfully initialized inventory database" +} + +function deploy_inventory_database { + wait_mongodb_upandready + echo "Deploying users and roles" + mongosh localhost:27017/admin <<-EOF + db.createUser({ user: 'admin', pwd: 'admin', roles: [ { role: "root", db: "admin" } ] }); +EOF + + mongosh -u admin -p admin localhost:27017/admin <<-EOF + db.runCommand({ + createRole: "listDatabases", + privileges: [ + { resource: { cluster : true }, actions: ["listDatabases"]} + ], + roles: [] + }); + + db.runCommand({ + createRole: "readChangeStream", + privileges: [ + { resource: { db: "", collection: ""}, actions: [ "find", "changeStream" ] } + ], + roles: [] + }); + + db.createUser({ + user: 'debezium', + pwd: 'dbz', + roles: [ + { role: "readWrite", db: "inventory" }, + { role: "read", db: "local" }, + { role: "listDatabases", db: "admin" }, + { role: "readChangeStream", db: "admin" }, + { role: "read", db: "config" }, + { role: "read", db: "admin" } + ] + }); +EOF + + echo "Created users" + + mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory <<-EOF + db.products.insert([ + { _id : NumberLong("101"), name : 'scooter', description: 'Small 2-wheel scooter', weight : 3.14, quantity : NumberInt("3") }, + { _id : NumberLong("102"), name : 'car battery', description: '12V car battery', weight : 8.1, quantity : NumberInt("8") }, + { _id : NumberLong("103"), name : '12-pack drill bits', description: '12-pack of drill bits with sizes ranging from #40 to #3', weight : 0.8, quantity : NumberInt("18") }, + { _id : NumberLong("104"), name : 'hammer', description: "12oz carpenter's hammer", weight : 0.75, quantity : NumberInt("4") }, + { _id : NumberLong("105"), name : 'hammer', description: "14oz carpenter's hammer", weight : 0.875, quantity : NumberInt("5") }, + { _id : NumberLong("106"), name : 'hammer', description: "16oz carpenter's hammer", weight : 1.0, quantity : NumberInt("0") }, + { _id : NumberLong("107"), name : 'rocks', description: 'box of assorted rocks', weight : 5.3, quantity : NumberInt("44") }, + { _id : NumberLong("108"), name : 'jacket', description: 'water resistent black wind breaker', weight : 0.1, quantity : NumberInt("2") }, + { _id : NumberLong("109"), name : 'spare tire', description: '24 inch spare tire', weight : 22.2, quantity : NumberInt("5") } + ]); + + db.customers.insert([ + { _id : NumberLong("1001"), first_name : 'Sally', last_name : 'Thomas', email : 'sally.thomas@acme.com' }, + { _id : NumberLong("1002"), first_name : 'George', last_name : 'Bailey', email : 'gbailey@foobar.com' }, + { _id : NumberLong("1003"), first_name : 'Edward', last_name : 'Walker', email : 'ed@walker.com' }, + { _id : NumberLong("1004"), first_name : 'Anne', last_name : 'Kretchmar', email : 'annek@noanswer.org' } + ]); + + db.orders.insert([ + { _id : NumberLong("10001"), order_date : new ISODate("2016-01-16T00:00:00Z"), purchaser_id : NumberLong("1001"), quantity : NumberInt("1"), product_id : NumberLong("102") }, + { _id : NumberLong("10002"), order_date : new ISODate("2016-01-17T00:00:00Z"), purchaser_id : NumberLong("1002"), quantity : NumberInt("2"), product_id : NumberLong("105") }, + { _id : NumberLong("10003"), order_date : new ISODate("2016-02-19T00:00:00Z"), purchaser_id : NumberLong("1002"), quantity : NumberInt("2"), product_id : NumberLong("106") }, + { _id : NumberLong("10004"), order_date : new ISODate("2016-02-21T00:00:00Z"), purchaser_id : NumberLong("1003"), quantity : NumberInt("1"), product_id : NumberLong("107") } + ]); +EOF + + echo "Inserted example data" +} + +# echo '' > /var/log/mongodb/mongod.log +mongod --keyFile /etc/mongodb.keyfile --fork --logpath /var/log/mongodb/mongod.log +deploy_inventory_database + +echo "Restarting...." +mongod --shutdown + +mongod --bind_ip_all --replSet rs0 --keyFile /etc/mongodb.keyfile &\ +deploy_replica_set &&\ +fg