-
Notifications
You must be signed in to change notification settings - Fork 35
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
bccc475
commit 59d8488
Showing
5 changed files
with
290 additions
and
0 deletions.
There are no files selected for viewing
50 changes: 50 additions & 0 deletions
50
...eberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* | ||
* * Copyright memiiso Authors. | ||
* * | ||
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
*/ | ||
|
||
package io.debezium.server.iceberg; | ||
|
||
import io.debezium.server.iceberg.testresources.BaseSparkTest; | ||
import io.debezium.server.iceberg.testresources.S3Minio; | ||
import io.debezium.server.iceberg.testresources.SourceMangoDB; | ||
import io.quarkus.test.common.QuarkusTestResource; | ||
import io.quarkus.test.junit.QuarkusTest; | ||
import io.quarkus.test.junit.TestProfile; | ||
|
||
import java.time.Duration; | ||
|
||
import org.apache.spark.sql.Dataset; | ||
import org.apache.spark.sql.Row; | ||
import org.awaitility.Awaitility; | ||
import org.junit.jupiter.api.Test; | ||
|
||
/** | ||
* Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination. | ||
* | ||
* @author Ismail Simsek | ||
*/ | ||
@QuarkusTest | ||
@QuarkusTestResource(S3Minio.class) | ||
@QuarkusTestResource(SourceMangoDB.class) | ||
@TestProfile(IcebergChangeConsumerMangodbTestProfile.class) | ||
public class IcebergChangeConsumerMangodbTest extends BaseSparkTest { | ||
|
||
@Test | ||
public void testSimpleUpload() { | ||
|
||
Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> { | ||
try { | ||
Dataset<Row> df = getTableData("testc.inventory.products"); | ||
df.show(); | ||
return df.filter("_id is not null").count() >= 4; | ||
} catch (Exception e) { | ||
return false; | ||
} | ||
}); | ||
} | ||
|
||
} |
45 changes: 45 additions & 0 deletions
45
...ink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTestProfile.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* | ||
* * Copyright memiiso Authors. | ||
* * | ||
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
*/ | ||
|
||
package io.debezium.server.iceberg; | ||
|
||
import io.quarkus.test.junit.QuarkusTestProfile; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
public class IcebergChangeConsumerMangodbTestProfile implements QuarkusTestProfile { | ||
|
||
//This method allows us to override configuration properties. | ||
@Override | ||
public Map<String, String> getConfigOverrides() { | ||
Map<String, String> config = new HashMap<>(); | ||
config.put("quarkus.profile", "mongodb"); | ||
config.put("%mongodb.debezium.source.connector.class", "io.debezium.connector.mongodb.MongoDbConnector"); | ||
config.put("%mongodb.debezium.transforms.unwrap.type", "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"); | ||
config.put("%mongodb.debezium.transforms.unwrap.add.fields", "op,source.ts_ms,db"); | ||
config.put("%mongodb.debezium.sink.iceberg.allow-field-addition", "false"); | ||
config.put("%mongodb.debezium.source.mongodb.name", "testc"); | ||
config.put("%mongodb.debezium.source.database.include.list", "inventory"); // ok | ||
config.put("%mongodb.debezium.source.collection.include.list", "inventory.products"); | ||
|
||
// IMPORTANT !!! FIX MongoDbConnector KEY FIELD NAME "id"=>"_id" !!! | ||
config.put("%mongodb.debezium.transforms", "unwrap,renamekeyfield"); | ||
config.put("%mongodb.debezium.transforms.renamekeyfield.type", | ||
"org.apache.kafka.connect.transforms.ReplaceField$Key"); | ||
config.put("%mongodb.debezium.transforms.renamekeyfield.renames", "id:_id"); | ||
|
||
return config; | ||
} | ||
|
||
@Override | ||
public String getConfigProfile() { | ||
return "mongodb"; | ||
} | ||
|
||
} |
54 changes: 54 additions & 0 deletions
54
...er-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMangoDB.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
/* | ||
* | ||
* * Copyright memiiso Authors. | ||
* * | ||
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
*/ | ||
|
||
package io.debezium.server.iceberg.testresources; | ||
|
||
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; | ||
|
||
import java.time.Duration; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
import org.testcontainers.containers.GenericContainer; | ||
import org.testcontainers.containers.wait.strategy.Wait; | ||
import org.testcontainers.images.builder.ImageFromDockerfile; | ||
|
||
public class SourceMangoDB implements QuarkusTestResourceLifecycleManager { | ||
|
||
public static final int MONGODB_PORT = 27017; | ||
public static final GenericContainer<?> container = new GenericContainer( | ||
new ImageFromDockerfile("debezium_mongodb", false) | ||
.withFileFromClasspath("Dockerfile", "mongodb/Dockerfile") | ||
.withFileFromClasspath("start-mongodb.sh", "mongodb/start-mongodb.sh")) | ||
|
||
.waitingFor(Wait.forLogMessage(".*Successfully initialized inventory database.*", 1)) | ||
.withStartupTimeout(Duration.ofSeconds(60L)); | ||
|
||
@Override | ||
public Map<String, String> start() { | ||
container.withExposedPorts(MONGODB_PORT).start(); | ||
|
||
Map<String, String> params = new ConcurrentHashMap<>(); | ||
params.put("%mongodb.debezium.source.mongodb.hosts", | ||
"rs0/" + container.getHost() + ":" + container.getMappedPort(MONGODB_PORT) | ||
); | ||
params.put("%mongodb.debezium.source.mongodb.authsource", "admin"); | ||
params.put("%mongodb.debezium.source.mongodb.user", "debezium"); | ||
params.put("%mongodb.debezium.source.mongodb.password", "dbz"); | ||
//params.put("%mongodb.debezium.source.mongodb.ssl.enabled", "false"); | ||
return params; | ||
} | ||
|
||
@Override | ||
public void stop() { | ||
if (container != null) { | ||
container.stop(); | ||
} | ||
} | ||
|
||
} |
16 changes: 16 additions & 0 deletions
16
debezium-server-iceberg-sink/src/test/resources/mongodb/Dockerfile
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
FROM mongo:5.0 | ||
|
||
LABEL maintainer="Debezium Community" | ||
|
||
ENV REPLICA_SET_HOSTS="localhost" | ||
|
||
# Starting with MongoDB 4.4 the authentication enabled MongoDB requires a key | ||
# for intra-replica set communication | ||
RUN openssl rand -base64 756 > /etc/mongodb.keyfile &&\ | ||
chown mongodb:mongodb /etc/mongodb.keyfile &&\ | ||
chmod 400 /etc/mongodb.keyfile | ||
|
||
COPY start-mongodb.sh /usr/local/bin/ | ||
RUN chmod +x /usr/local/bin/start-mongodb.sh | ||
|
||
ENTRYPOINT ["start-mongodb.sh"] |
125 changes: 125 additions & 0 deletions
125
debezium-server-iceberg-sink/src/test/resources/mongodb/start-mongodb.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
#!/usr/bin/env bash | ||
|
||
# | ||
# /* | ||
# * Copyright memiiso Authors. | ||
# * | ||
# * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | ||
# */ | ||
# | ||
|
||
set -e | ||
# enable job control used fg | ||
set -m | ||
|
||
function wait_mongodb_upandready { | ||
# Wait until Mongo is ready to accept connections, exit if this does not happen within 30 seconds | ||
COUNTER=0 | ||
until mongosh --quiet --eval "use admin; printjson(db.serverStatus());" | ||
do | ||
sleep 1 | ||
COUNTER=$((COUNTER+1)) | ||
if [ "${COUNTER}" -gt "30" ]; then | ||
echo "MongoDB did not initialize within 30 seconds, exiting" | ||
exit 2 | ||
fi | ||
echo "Waiting for MongoDB to initialize... ${COUNTER}/30" | ||
done | ||
} | ||
|
||
function deploy_replica_set { | ||
wait_mongodb_upandready | ||
mongosh -u admin -p admin --authenticationDatabase admin localhost:27017/inventory <<-EOF | ||
var repsetmembers = {_id : "rs0",members: []}; | ||
var arrayhosts = "${REPLICA_SET_HOSTS}".split(','); | ||
for(var i = 0; i < arrayhosts.length; i++) { | ||
repsetmembers['members'].push({ _id: i, host: arrayhosts[i]}); | ||
} | ||
print("Initializing replica set:\n" + JSON.stringify(repsetmembers)); | ||
rs.initiate(repsetmembers); | ||
print('Initiated replica set'); | ||
EOF | ||
echo "Successfully initialized inventory database" | ||
} | ||
|
||
function deploy_inventory_database { | ||
wait_mongodb_upandready | ||
echo "Deploying users and roles" | ||
mongosh localhost:27017/admin <<-EOF | ||
db.createUser({ user: 'admin', pwd: 'admin', roles: [ { role: "root", db: "admin" } ] }); | ||
EOF | ||
|
||
mongosh -u admin -p admin localhost:27017/admin <<-EOF | ||
db.runCommand({ | ||
createRole: "listDatabases", | ||
privileges: [ | ||
{ resource: { cluster : true }, actions: ["listDatabases"]} | ||
], | ||
roles: [] | ||
}); | ||
db.runCommand({ | ||
createRole: "readChangeStream", | ||
privileges: [ | ||
{ resource: { db: "", collection: ""}, actions: [ "find", "changeStream" ] } | ||
], | ||
roles: [] | ||
}); | ||
db.createUser({ | ||
user: 'debezium', | ||
pwd: 'dbz', | ||
roles: [ | ||
{ role: "readWrite", db: "inventory" }, | ||
{ role: "read", db: "local" }, | ||
{ role: "listDatabases", db: "admin" }, | ||
{ role: "readChangeStream", db: "admin" }, | ||
{ role: "read", db: "config" }, | ||
{ role: "read", db: "admin" } | ||
] | ||
}); | ||
EOF | ||
|
||
echo "Created users" | ||
|
||
mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory <<-EOF | ||
db.products.insert([ | ||
{ _id : NumberLong("101"), name : 'scooter', description: 'Small 2-wheel scooter', weight : 3.14, quantity : NumberInt("3") }, | ||
{ _id : NumberLong("102"), name : 'car battery', description: '12V car battery', weight : 8.1, quantity : NumberInt("8") }, | ||
{ _id : NumberLong("103"), name : '12-pack drill bits', description: '12-pack of drill bits with sizes ranging from #40 to #3', weight : 0.8, quantity : NumberInt("18") }, | ||
{ _id : NumberLong("104"), name : 'hammer', description: "12oz carpenter's hammer", weight : 0.75, quantity : NumberInt("4") }, | ||
{ _id : NumberLong("105"), name : 'hammer', description: "14oz carpenter's hammer", weight : 0.875, quantity : NumberInt("5") }, | ||
{ _id : NumberLong("106"), name : 'hammer', description: "16oz carpenter's hammer", weight : 1.0, quantity : NumberInt("0") }, | ||
{ _id : NumberLong("107"), name : 'rocks', description: 'box of assorted rocks', weight : 5.3, quantity : NumberInt("44") }, | ||
{ _id : NumberLong("108"), name : 'jacket', description: 'water resistent black wind breaker', weight : 0.1, quantity : NumberInt("2") }, | ||
{ _id : NumberLong("109"), name : 'spare tire', description: '24 inch spare tire', weight : 22.2, quantity : NumberInt("5") } | ||
]); | ||
db.customers.insert([ | ||
{ _id : NumberLong("1001"), first_name : 'Sally', last_name : 'Thomas', email : '[email protected]' }, | ||
{ _id : NumberLong("1002"), first_name : 'George', last_name : 'Bailey', email : '[email protected]' }, | ||
{ _id : NumberLong("1003"), first_name : 'Edward', last_name : 'Walker', email : '[email protected]' }, | ||
{ _id : NumberLong("1004"), first_name : 'Anne', last_name : 'Kretchmar', email : '[email protected]' } | ||
]); | ||
db.orders.insert([ | ||
{ _id : NumberLong("10001"), order_date : new ISODate("2016-01-16T00:00:00Z"), purchaser_id : NumberLong("1001"), quantity : NumberInt("1"), product_id : NumberLong("102") }, | ||
{ _id : NumberLong("10002"), order_date : new ISODate("2016-01-17T00:00:00Z"), purchaser_id : NumberLong("1002"), quantity : NumberInt("2"), product_id : NumberLong("105") }, | ||
{ _id : NumberLong("10003"), order_date : new ISODate("2016-02-19T00:00:00Z"), purchaser_id : NumberLong("1002"), quantity : NumberInt("2"), product_id : NumberLong("106") }, | ||
{ _id : NumberLong("10004"), order_date : new ISODate("2016-02-21T00:00:00Z"), purchaser_id : NumberLong("1003"), quantity : NumberInt("1"), product_id : NumberLong("107") } | ||
]); | ||
EOF | ||
|
||
echo "Inserted example data" | ||
} | ||
|
||
# echo '' > /var/log/mongodb/mongod.log | ||
mongod --keyFile /etc/mongodb.keyfile --fork --logpath /var/log/mongodb/mongod.log | ||
deploy_inventory_database | ||
|
||
echo "Restarting...." | ||
mongod --shutdown | ||
|
||
mongod --bind_ip_all --replSet rs0 --keyFile /etc/mongodb.keyfile &\ | ||
deploy_replica_set &&\ | ||
fg |