Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mangodb test #65

Merged
merged 1 commit into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg;

import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourceMangoDB;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

import java.time.Duration;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

/**
* Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination.
*
* @author Ismail Simsek
*/
@QuarkusTest
@QuarkusTestResource(S3Minio.class)
@QuarkusTestResource(SourceMangoDB.class)
@TestProfile(IcebergChangeConsumerMangodbTestProfile.class)
public class IcebergChangeConsumerMangodbTest extends BaseSparkTest {

@Test
public void testSimpleUpload() {

Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.products");
df.show();
return df.filter("_id is not null").count() >= 4;
} catch (Exception e) {
return false;
}
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg;

import io.quarkus.test.junit.QuarkusTestProfile;

import java.util.HashMap;
import java.util.Map;

public class IcebergChangeConsumerMangodbTestProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("quarkus.profile", "mongodb");
config.put("%mongodb.debezium.source.connector.class", "io.debezium.connector.mongodb.MongoDbConnector");
config.put("%mongodb.debezium.transforms.unwrap.type", "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState");
config.put("%mongodb.debezium.transforms.unwrap.add.fields", "op,source.ts_ms,db");
config.put("%mongodb.debezium.sink.iceberg.allow-field-addition", "false");
config.put("%mongodb.debezium.source.mongodb.name", "testc");
config.put("%mongodb.debezium.source.database.include.list", "inventory"); // ok
config.put("%mongodb.debezium.source.collection.include.list", "inventory.products");

// IMPORTANT !!! FIX MongoDbConnector KEY FIELD NAME "id"=>"_id" !!!
config.put("%mongodb.debezium.transforms", "unwrap,renamekeyfield");
config.put("%mongodb.debezium.transforms.renamekeyfield.type",
"org.apache.kafka.connect.transforms.ReplaceField$Key");
config.put("%mongodb.debezium.transforms.renamekeyfield.renames", "id:_id");

return config;
}

@Override
public String getConfigProfile() {
return "mongodb";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg.testresources;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.ImageFromDockerfile;

public class SourceMangoDB implements QuarkusTestResourceLifecycleManager {

public static final int MONGODB_PORT = 27017;
public static final GenericContainer<?> container = new GenericContainer(
new ImageFromDockerfile("debezium_mongodb", false)
.withFileFromClasspath("Dockerfile", "mongodb/Dockerfile")
.withFileFromClasspath("start-mongodb.sh", "mongodb/start-mongodb.sh"))

.waitingFor(Wait.forLogMessage(".*Successfully initialized inventory database.*", 1))
.withStartupTimeout(Duration.ofSeconds(60L));

@Override
public Map<String, String> start() {
container.withExposedPorts(MONGODB_PORT).start();

Map<String, String> params = new ConcurrentHashMap<>();
params.put("%mongodb.debezium.source.mongodb.hosts",
"rs0/" + container.getHost() + ":" + container.getMappedPort(MONGODB_PORT)
);
params.put("%mongodb.debezium.source.mongodb.authsource", "admin");
params.put("%mongodb.debezium.source.mongodb.user", "debezium");
params.put("%mongodb.debezium.source.mongodb.password", "dbz");
//params.put("%mongodb.debezium.source.mongodb.ssl.enabled", "false");
return params;
}

@Override
public void stop() {
if (container != null) {
container.stop();
}
}

}
16 changes: 16 additions & 0 deletions debezium-server-iceberg-sink/src/test/resources/mongodb/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM mongo:5.0

LABEL maintainer="Debezium Community"

ENV REPLICA_SET_HOSTS="localhost"

# Starting with MongoDB 4.4 the authentication enabled MongoDB requires a key
# for intra-replica set communication
RUN openssl rand -base64 756 > /etc/mongodb.keyfile &&\
chown mongodb:mongodb /etc/mongodb.keyfile &&\
chmod 400 /etc/mongodb.keyfile

COPY start-mongodb.sh /usr/local/bin/
RUN chmod +x /usr/local/bin/start-mongodb.sh

ENTRYPOINT ["start-mongodb.sh"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#!/usr/bin/env bash

#
# /*
# * Copyright memiiso Authors.
# *
# * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
# */
#

set -e
# enable job control used fg
set -m

function wait_mongodb_upandready {
# Wait until Mongo is ready to accept connections, exit if this does not happen within 30 seconds
COUNTER=0
until mongosh --quiet --eval "use admin; printjson(db.serverStatus());"
do
sleep 1
COUNTER=$((COUNTER+1))
if [ "${COUNTER}" -gt "30" ]; then
echo "MongoDB did not initialize within 30 seconds, exiting"
exit 2
fi
echo "Waiting for MongoDB to initialize... ${COUNTER}/30"
done
}

function deploy_replica_set {
wait_mongodb_upandready
mongosh -u admin -p admin --authenticationDatabase admin localhost:27017/inventory <<-EOF
var repsetmembers = {_id : "rs0",members: []};
var arrayhosts = "${REPLICA_SET_HOSTS}".split(',');
for(var i = 0; i < arrayhosts.length; i++) {
repsetmembers['members'].push({ _id: i, host: arrayhosts[i]});
}
print("Initializing replica set:\n" + JSON.stringify(repsetmembers));
rs.initiate(repsetmembers);
print('Initiated replica set');
EOF
echo "Successfully initialized inventory database"
}

function deploy_inventory_database {
wait_mongodb_upandready
echo "Deploying users and roles"
mongosh localhost:27017/admin <<-EOF
db.createUser({ user: 'admin', pwd: 'admin', roles: [ { role: "root", db: "admin" } ] });
EOF

mongosh -u admin -p admin localhost:27017/admin <<-EOF
db.runCommand({
createRole: "listDatabases",
privileges: [
{ resource: { cluster : true }, actions: ["listDatabases"]}
],
roles: []
});

db.runCommand({
createRole: "readChangeStream",
privileges: [
{ resource: { db: "", collection: ""}, actions: [ "find", "changeStream" ] }
],
roles: []
});

db.createUser({
user: 'debezium',
pwd: 'dbz',
roles: [
{ role: "readWrite", db: "inventory" },
{ role: "read", db: "local" },
{ role: "listDatabases", db: "admin" },
{ role: "readChangeStream", db: "admin" },
{ role: "read", db: "config" },
{ role: "read", db: "admin" }
]
});
EOF

echo "Created users"

mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory <<-EOF
db.products.insert([
{ _id : NumberLong("101"), name : 'scooter', description: 'Small 2-wheel scooter', weight : 3.14, quantity : NumberInt("3") },
{ _id : NumberLong("102"), name : 'car battery', description: '12V car battery', weight : 8.1, quantity : NumberInt("8") },
{ _id : NumberLong("103"), name : '12-pack drill bits', description: '12-pack of drill bits with sizes ranging from #40 to #3', weight : 0.8, quantity : NumberInt("18") },
{ _id : NumberLong("104"), name : 'hammer', description: "12oz carpenter's hammer", weight : 0.75, quantity : NumberInt("4") },
{ _id : NumberLong("105"), name : 'hammer', description: "14oz carpenter's hammer", weight : 0.875, quantity : NumberInt("5") },
{ _id : NumberLong("106"), name : 'hammer', description: "16oz carpenter's hammer", weight : 1.0, quantity : NumberInt("0") },
{ _id : NumberLong("107"), name : 'rocks', description: 'box of assorted rocks', weight : 5.3, quantity : NumberInt("44") },
{ _id : NumberLong("108"), name : 'jacket', description: 'water resistent black wind breaker', weight : 0.1, quantity : NumberInt("2") },
{ _id : NumberLong("109"), name : 'spare tire', description: '24 inch spare tire', weight : 22.2, quantity : NumberInt("5") }
]);

db.customers.insert([
{ _id : NumberLong("1001"), first_name : 'Sally', last_name : 'Thomas', email : '[email protected]' },
{ _id : NumberLong("1002"), first_name : 'George', last_name : 'Bailey', email : '[email protected]' },
{ _id : NumberLong("1003"), first_name : 'Edward', last_name : 'Walker', email : '[email protected]' },
{ _id : NumberLong("1004"), first_name : 'Anne', last_name : 'Kretchmar', email : '[email protected]' }
]);

db.orders.insert([
{ _id : NumberLong("10001"), order_date : new ISODate("2016-01-16T00:00:00Z"), purchaser_id : NumberLong("1001"), quantity : NumberInt("1"), product_id : NumberLong("102") },
{ _id : NumberLong("10002"), order_date : new ISODate("2016-01-17T00:00:00Z"), purchaser_id : NumberLong("1002"), quantity : NumberInt("2"), product_id : NumberLong("105") },
{ _id : NumberLong("10003"), order_date : new ISODate("2016-02-19T00:00:00Z"), purchaser_id : NumberLong("1002"), quantity : NumberInt("2"), product_id : NumberLong("106") },
{ _id : NumberLong("10004"), order_date : new ISODate("2016-02-21T00:00:00Z"), purchaser_id : NumberLong("1003"), quantity : NumberInt("1"), product_id : NumberLong("107") }
]);
EOF

echo "Inserted example data"
}

# echo '' > /var/log/mongodb/mongod.log
mongod --keyFile /etc/mongodb.keyfile --fork --logpath /var/log/mongodb/mongod.log
deploy_inventory_database

echo "Restarting...."
mongod --shutdown

mongod --bind_ip_all --replSet rs0 --keyFile /etc/mongodb.keyfile &\
deploy_replica_set &&\
fg