Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,21 @@ The Couchbase MCP server can also be used as a managed server in your agentic ap
- Check the logs for any errors or warnings that may indicate issues with the MCP server. The location of the logs depend on your MCP client.
- If you are observing issues running your MCP server from source after updating your local MCP server repository, try running `uv sync` to update the [dependencies](https://docs.astral.sh/uv/concepts/projects/sync/#syncing-the-environment).

## Integration testing

We provide high-level MCP integration tests to verify that the server exposes the expected tools and that they can be invoked against a demo Couchbase cluster.

1. Export demo cluster credentials:
- `CB_CONNECTION_STRING`
- `CB_USERNAME`
- `CB_PASSWORD`
- Optional: `CB_MCP_TEST_BUCKET` (a bucket to probe during the tests)
2. Run the tests:

```bash
uv run pytest tests/ -v
```

---

## 👩‍💻 Contributing
Expand Down
7 changes: 7 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ couchbase-mcp-server = "mcp_server:main"
dev = [
"ruff==0.12.5",
"pre-commit==4.2.0",
"pytest==8.3.3",
"pytest-asyncio==0.24.0",
]

# Ruff configuration
Expand Down Expand Up @@ -124,6 +126,11 @@ indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"

# Pytest configuration
[tool.pytest.ini_options]
asyncio_mode = "strict"
asyncio_default_fixture_loop_scope = "function"

# Build system configuration
[build-system]
requires = ["hatchling"]
Expand Down
217 changes: 217 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
"""
Shared fixtures and utilities for MCP server integration tests.
"""

from __future__ import annotations

import asyncio
import json
import os
import sys
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any

import pytest
from mcp import ClientSession, StdioServerParameters, stdio_client

PROJECT_ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = PROJECT_ROOT / "src"

# Tools we expect to be registered by the server
EXPECTED_TOOLS = {
"get_buckets_in_cluster",
"get_server_configuration_status",
"test_cluster_connection",
"get_scopes_and_collections_in_bucket",
"get_collections_in_scope",
"get_scopes_in_bucket",
"get_document_by_id",
"upsert_document_by_id",
"delete_document_by_id",
"get_schema_for_collection",
"run_sql_plus_plus_query",
"get_index_advisor_recommendations",
"list_indexes",
"get_cluster_health_and_services",
}

# Tools organized by category for validation
TOOLS_BY_CATEGORY = {
"server": {
"get_server_configuration_status",
"test_cluster_connection",
"get_buckets_in_cluster",
"get_scopes_in_bucket",
"get_scopes_and_collections_in_bucket",
"get_collections_in_scope",
"get_cluster_health_and_services",
},
"kv": {
"get_document_by_id",
"upsert_document_by_id",
"delete_document_by_id",
},
"query": {
"get_schema_for_collection",
"run_sql_plus_plus_query",
},
"index": {
"list_indexes",
"get_index_advisor_recommendations",
},
}

# Expected required parameters for tools that need them
TOOL_REQUIRED_PARAMS = {
"get_scopes_in_bucket": ["bucket_name"],
"get_scopes_and_collections_in_bucket": ["bucket_name"],
"get_collections_in_scope": ["bucket_name", "scope_name"],
"get_document_by_id": [
"bucket_name",
"scope_name",
"collection_name",
"document_id",
],
"upsert_document_by_id": [
"bucket_name",
"scope_name",
"collection_name",
"document_id",
"document_content",
],
"delete_document_by_id": [
"bucket_name",
"scope_name",
"collection_name",
"document_id",
],
"get_schema_for_collection": ["bucket_name", "scope_name", "collection_name"],
"run_sql_plus_plus_query": ["bucket_name", "scope_name", "query"],
"get_index_advisor_recommendations": ["bucket_name", "scope_name", "query"],
}

# Minimum configuration needed to talk to a demo cluster
REQUIRED_ENV_VARS = ("CB_CONNECTION_STRING", "CB_USERNAME", "CB_PASSWORD")

# Default timeout (seconds) to guard against hangs when the Couchbase cluster
# is unreachable or slow. Override with CB_MCP_TEST_TIMEOUT if needed.
DEFAULT_TIMEOUT = int(os.getenv("CB_MCP_TEST_TIMEOUT", "120"))


def _build_env() -> dict[str, str]:
"""Build the environment passed to the test server process."""
env = os.environ.copy()
missing = [var for var in REQUIRED_ENV_VARS if not env.get(var)]
if missing:
pytest.skip(
"Integration tests require demo cluster credentials. "
f"Missing env vars: {', '.join(missing)}"
)

# Ensure the server module can be imported from the repo's src/ folder
existing_path = env.get("PYTHONPATH")
env["PYTHONPATH"] = (
f"{SRC_DIR}{os.pathsep}{existing_path}" if existing_path else str(SRC_DIR)
)

# Force stdio transport for the test server to match stdio_client
env["CB_MCP_TRANSPORT"] = "stdio"
# Ensure unbuffered output to avoid stdout/stderr buffering surprises
env.setdefault("PYTHONUNBUFFERED", "1")
return env


@asynccontextmanager
async def create_mcp_session() -> AsyncIterator[ClientSession]:
"""Create a fresh MCP client session connected to the server over stdio."""
env = _build_env()
params = StdioServerParameters(
command=sys.executable,
args=["-m", "mcp_server"],
env=env,
)

async with asyncio.timeout(DEFAULT_TIMEOUT):
async with stdio_client(params) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
yield session


def extract_payload(response: Any) -> Any:
"""Extract a usable payload from a tool response.

MCP tool responses can return data in different formats:
- A single content block with JSON-encoded data (dict, list, etc.)
- Multiple content blocks, one per list item (for list returns)

This function handles both cases.
"""
content = getattr(response, "content", None) or []
if not content:
return None

# If there are multiple content blocks, collect them all as a list
# (each item in a list return may be a separate content block)
if len(content) > 1:
items = []
for block in content:
text = getattr(block, "text", None)
if text is not None:
try:
items.append(json.loads(text))
except json.JSONDecodeError:
items.append(text)
return items if items else None

# Single content block - try to parse as JSON
first = content[0]
raw = getattr(first, "text", None)
if raw is None and hasattr(first, "data"):
raw = first.data

if isinstance(raw, str):
try:
return json.loads(raw)
except json.JSONDecodeError:
return raw

return raw


def get_test_bucket() -> str | None:
"""Get the test bucket name from environment, or None if not set."""
return os.getenv("CB_MCP_TEST_BUCKET")


def get_test_scope() -> str:
"""Get the test scope name from environment, defaults to _default."""
return os.getenv("CB_MCP_TEST_SCOPE", "_default")


def get_test_collection() -> str:
"""Get the test collection name from environment, defaults to _default."""
return os.getenv("CB_MCP_TEST_COLLECTION", "_default")


def require_test_bucket() -> str:
"""Get the test bucket name, skipping test if not set."""
bucket = get_test_bucket()
if not bucket:
pytest.skip("CB_MCP_TEST_BUCKET not set")
return bucket


def ensure_list(value: Any) -> list[Any]:
"""Ensure the value is a list.

MCP can return single-item lists as just the item (not wrapped in a list).
This helper wraps single non-list values in a list for consistent handling.
"""
if value is None:
return []
if isinstance(value, list):
return value
return [value]
Loading
Loading