Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions modules/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
_ "embed"
"errors"
"fmt"
"net"
"net/url"
"time"

"github.com/testcontainers/testcontainers-go"
Expand Down Expand Up @@ -125,10 +127,23 @@ func (c *MongoDBContainer) ConnectionString(ctx context.Context) (string, error)
if err != nil {
return "", err
}
u := url.URL{
Scheme: "mongodb",
Host: net.JoinHostPort(host, port.Port()),
Path: "/",
}

if c.username != "" && c.password != "" {
return fmt.Sprintf("mongodb://%s:%s@%s:%s", c.username, c.password, host, port.Port()), nil
u.User = url.UserPassword(c.username, c.password)
}

if c.replicaSet != "" {
q := url.Values{}
q.Add("replicaSet", c.replicaSet)
u.RawQuery = q.Encode()
}
return c.Endpoint(ctx, "mongodb")

return u.String(), nil
}

func setupEntrypointForAuth(req *testcontainers.GenericContainerRequest) {
Expand Down Expand Up @@ -186,7 +201,6 @@ func initiateReplicaSet(req *testcontainers.GenericContainerRequest, cli mongoCl
replSetName,
ip,
)

return wait.ForExec(cmd).WaitUntilReady(ctx, c)
},
},
Expand Down
46 changes: 41 additions & 5 deletions modules/mongodb/mongodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package mongodb_test

import (
"context"
"fmt"
"net/url"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -125,18 +127,52 @@ func TestMongoDB(t *testing.T) {
endpoint, err := mongodbContainer.ConnectionString(ctx)
require.NoError(tt, err)

// Force direct connection to the container to avoid the replica set
// connection string that is returned by the container itself when
// using the replica set option.
// Force direct connection to the container.
mongoClient, err := mongo.Connect(ctx, options.Client().ApplyURI(endpoint).SetDirect(true))
require.NoError(tt, err)

err = mongoClient.Ping(ctx, nil)
require.NoError(tt, err)
require.Equal(t, "test", mongoClient.Database("test").Name())
require.Equal(tt, "test", mongoClient.Database("test").Name())

_, err = mongoClient.Database("testcontainer").Collection("test").InsertOne(context.Background(), bson.M{})
// Basic insert test.
_, err = mongoClient.Database("testcontainer").Collection("test").InsertOne(ctx, bson.M{})
require.NoError(tt, err)

// If the container is configured with a replica set, run the change stream test.
if hasReplica, _ := hasReplicaSet(endpoint); hasReplica {
coll := mongoClient.Database("test").Collection("changes")
stream, err := coll.Watch(ctx, mongo.Pipeline{})
require.NoError(tt, err)
defer stream.Close(ctx)

doc := bson.M{"message": "hello change streams"}
_, err = coll.InsertOne(ctx, doc)
require.NoError(tt, err)

require.True(tt, stream.Next(ctx))
var changeEvent bson.M
err = stream.Decode(&changeEvent)
require.NoError(tt, err)

opType, ok := changeEvent["operationType"].(string)
require.True(tt, ok, "Expected operationType field")
require.Equal(tt, "insert", opType, "Expected operationType to be 'insert'")

fullDoc, ok := changeEvent["fullDocument"].(bson.M)
require.True(tt, ok, "Expected fullDocument field")
require.Equal(tt, "hello change streams", fullDoc["message"])
}
})
}
}

// hasReplicaSet checks if the connection string includes a replicaSet query parameter.
func hasReplicaSet(connStr string) (bool, error) {
u, err := url.Parse(connStr)
if err != nil {
return false, fmt.Errorf("parse connection string: %w", err)
}
q := u.Query()
return q.Get("replicaSet") != "", nil
}
Loading