Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,21 @@ The Couchbase MCP server can also be used as a managed server in your agentic ap
- Check the logs for any errors or warnings that may indicate issues with the MCP server. The location of the logs depend on your MCP client.
- If you are observing issues running your MCP server from source after updating your local MCP server repository, try running `uv sync` to update the [dependencies](https://docs.astral.sh/uv/concepts/projects/sync/#syncing-the-environment).

## Integration testing

We provide high-level MCP integration tests to verify that the server exposes the expected tools and that they can be invoked against a demo Couchbase cluster.

1. Export demo cluster credentials:
- `CB_CONNECTION_STRING`
- `CB_USERNAME`
- `CB_PASSWORD`
- Optional: `CB_MCP_TEST_BUCKET` (a bucket to probe during the tests)
2. Run the tests:

```bash
uv run pytest tests/test_mcp_server_integration.py
```

---

## 👩‍💻 Contributing
Expand Down
7 changes: 7 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ couchbase-mcp-server = "mcp_server:main"
dev = [
"ruff==0.12.5",
"pre-commit==4.2.0",
"pytest==8.3.3",
"pytest-asyncio==0.24.0",
]

# Ruff configuration
Expand Down Expand Up @@ -124,6 +126,11 @@ indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"

# Pytest configuration
[tool.pytest.ini_options]
asyncio_mode = "strict"
asyncio_default_fixture_loop_scope = "function"

# Build system configuration
[build-system]
requires = ["hatchling"]
Expand Down
162 changes: 162 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""
Shared fixtures and utilities for MCP server integration tests.
"""

from __future__ import annotations

import asyncio
import json
import os
import sys
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any

import pytest
from mcp import ClientSession, StdioServerParameters, stdio_client

PROJECT_ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = PROJECT_ROOT / "src"

# Tools we expect to be registered by the server
EXPECTED_TOOLS = {
"get_buckets_in_cluster",
"get_server_configuration_status",
"test_cluster_connection",
"get_scopes_and_collections_in_bucket",
"get_collections_in_scope",
"get_scopes_in_bucket",
"get_document_by_id",
"upsert_document_by_id",
"delete_document_by_id",
"get_schema_for_collection",
"run_sql_plus_plus_query",
"get_index_advisor_recommendations",
"list_indexes",
"get_cluster_health_and_services",
}

# Minimum configuration needed to talk to a demo cluster
REQUIRED_ENV_VARS = ("CB_CONNECTION_STRING", "CB_USERNAME", "CB_PASSWORD")

# Default timeout (seconds) to guard against hangs when the Couchbase cluster
# is unreachable or slow. Override with CB_MCP_TEST_TIMEOUT if needed.
DEFAULT_TIMEOUT = int(os.getenv("CB_MCP_TEST_TIMEOUT", "120"))


def _build_env() -> dict[str, str]:
"""Build the environment passed to the test server process."""
env = os.environ.copy()
missing = [var for var in REQUIRED_ENV_VARS if not env.get(var)]
if missing:
pytest.skip(
"Integration tests require demo cluster credentials. "
f"Missing env vars: {', '.join(missing)}"
)

# Ensure the server module can be imported from the repo's src/ folder
existing_path = env.get("PYTHONPATH")
env["PYTHONPATH"] = (
f"{SRC_DIR}{os.pathsep}{existing_path}" if existing_path else str(SRC_DIR)
)

# Force stdio transport for the test server to match stdio_client
env["CB_MCP_TRANSPORT"] = "stdio"
# Ensure unbuffered output to avoid stdout/stderr buffering surprises
env.setdefault("PYTHONUNBUFFERED", "1")
return env


@asynccontextmanager
async def create_mcp_session() -> AsyncIterator[ClientSession]:
"""Create a fresh MCP client session connected to the server over stdio."""
env = _build_env()
params = StdioServerParameters(
command=sys.executable,
args=["-m", "mcp_server"],
env=env,
)

async with asyncio.timeout(DEFAULT_TIMEOUT):
async with stdio_client(params) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
yield session


def extract_payload(response: Any) -> Any:
"""Extract a usable payload from a tool response.

MCP tool responses can return data in different formats:
- A single content block with JSON-encoded data (dict, list, etc.)
- Multiple content blocks, one per list item (for list returns)

This function handles both cases.
"""
content = getattr(response, "content", None) or []
if not content:
return None

# If there are multiple content blocks, collect them all as a list
# (each item in a list return may be a separate content block)
if len(content) > 1:
items = []
for block in content:
text = getattr(block, "text", None)
if text is not None:
try:
items.append(json.loads(text))
except json.JSONDecodeError:
items.append(text)
return items if items else None

# Single content block - try to parse as JSON
first = content[0]
raw = getattr(first, "text", None)
if raw is None and hasattr(first, "data"):
raw = first.data

if isinstance(raw, str):
try:
return json.loads(raw)
except json.JSONDecodeError:
return raw

return raw


def get_test_bucket() -> str | None:
"""Get the test bucket name from environment, or None if not set."""
return os.getenv("CB_MCP_TEST_BUCKET")


def get_test_scope() -> str:
"""Get the test scope name from environment, defaults to _default."""
return os.getenv("CB_MCP_TEST_SCOPE", "_default")


def get_test_collection() -> str:
"""Get the test collection name from environment, defaults to _default."""
return os.getenv("CB_MCP_TEST_COLLECTION", "_default")


def require_test_bucket() -> str:
"""Get the test bucket name, skipping test if not set."""
bucket = get_test_bucket()
if not bucket:
pytest.skip("CB_MCP_TEST_BUCKET not set")
return bucket


def ensure_list(value: Any) -> list[Any]:
"""Ensure the value is a list.

MCP can return single-item lists as just the item (not wrapped in a list).
This helper wraps single non-list values in a list for consistent handling.
"""
if value is None:
return []
if isinstance(value, list):
return value
return [value]
169 changes: 169 additions & 0 deletions tests/test_index_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""
Integration tests for index.py tools.

Tests for:
- list_indexes
- get_index_advisor_recommendations
"""

from __future__ import annotations

import pytest
from conftest import (
create_mcp_session,
extract_payload,
get_test_collection,
get_test_scope,
require_test_bucket,
)


@pytest.mark.asyncio
async def test_list_indexes_all() -> None:
"""Verify list_indexes returns all indexes in the cluster."""
async with create_mcp_session() as session:
response = await session.call_tool("list_indexes", arguments={})
payload = extract_payload(response)

# Payload can be None/empty if no indexes exist in the cluster
if payload is None:
return # No indexes in cluster, tool executed successfully

assert isinstance(payload, list), f"Expected list, got {type(payload)}"
# Each index should have required fields
if payload:
first_index = payload[0]
assert "name" in first_index
assert "definition" in first_index
assert "status" in first_index
assert "bucket" in first_index


@pytest.mark.asyncio
async def test_list_indexes_filtered_by_bucket() -> None:
"""Verify list_indexes can filter by bucket name."""
bucket = require_test_bucket()

async with create_mcp_session() as session:
response = await session.call_tool(
"list_indexes", arguments={"bucket_name": bucket}
)
payload = extract_payload(response)

# Payload can be None/empty list if no indexes exist for the bucket
if payload is None:
return # No indexes in bucket, which is valid

assert isinstance(payload, list), f"Expected list, got {type(payload)}"
# All returned indexes should belong to the specified bucket
for index in payload:
assert index.get("bucket") == bucket, (
f"Index {index.get('name')} belongs to bucket {index.get('bucket')}, "
f"expected {bucket}"
)


@pytest.mark.asyncio
async def test_list_indexes_filtered_by_scope() -> None:
"""Verify list_indexes can filter by bucket and scope."""
bucket = require_test_bucket()
scope = get_test_scope()

async with create_mcp_session() as session:
response = await session.call_tool(
"list_indexes",
arguments={"bucket_name": bucket, "scope_name": scope},
)
payload = extract_payload(response)

# Payload can be None/empty list if no indexes exist for the scope
if payload is None:
return # No indexes in scope, which is valid

assert isinstance(payload, list), f"Expected list, got {type(payload)}"
# All returned indexes should belong to the specified bucket and scope
for index in payload:
assert index.get("bucket") == bucket
assert index.get("scope") == scope


@pytest.mark.asyncio
async def test_list_indexes_filtered_by_collection() -> None:
"""Verify list_indexes can filter by bucket, scope, and collection."""
bucket = require_test_bucket()
scope = get_test_scope()
collection = get_test_collection()

async with create_mcp_session() as session:
response = await session.call_tool(
"list_indexes",
arguments={
"bucket_name": bucket,
"scope_name": scope,
"collection_name": collection,
},
)
payload = extract_payload(response)

# Payload can be None/empty list if no indexes exist for the collection
# This is valid - we just verify the tool executed successfully
if payload is None:
return # No indexes in collection, which is valid

assert isinstance(payload, list), f"Expected list, got {type(payload)}"
# All returned indexes should belong to the specified collection
for index in payload:
assert index.get("bucket") == bucket
assert index.get("scope") == scope
assert index.get("collection") == collection


@pytest.mark.asyncio
async def test_list_indexes_with_raw_stats() -> None:
"""Verify list_indexes can include raw index stats."""
async with create_mcp_session() as session:
response = await session.call_tool(
"list_indexes", arguments={"include_raw_index_stats": True}
)
payload = extract_payload(response)

assert isinstance(payload, list), f"Expected list, got {type(payload)}"
# When include_raw_index_stats is True, each index should have raw_index_stats
if payload:
first_index = payload[0]
assert "raw_index_stats" in first_index, (
"Expected raw_index_stats when include_raw_index_stats=True"
)


@pytest.mark.asyncio
async def test_get_index_advisor_recommendations() -> None:
"""Verify get_index_advisor_recommendations returns recommendations."""
bucket = require_test_bucket()
scope = get_test_scope()
collection = get_test_collection()

# A query that might benefit from an index (avoid single quotes - they break ADVISOR)
# Use a numeric comparison instead of string literal
query = f"SELECT * FROM `{collection}` WHERE id > 100"

async with create_mcp_session() as session:
response = await session.call_tool(
"get_index_advisor_recommendations",
arguments={
"bucket_name": bucket,
"scope_name": scope,
"query": query,
},
)
payload = extract_payload(response)

assert isinstance(payload, dict), f"Expected dict, got {type(payload)}"
# Response should have the expected structure
assert "current_used_indexes" in payload
assert "recommended_indexes" in payload
assert "recommended_covering_indexes" in payload
# Summary should also be present
assert "summary" in payload
summary = payload["summary"]
assert "has_recommendations" in summary
Loading
Loading