Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ An [MCP](https://modelcontextprotocol.io/) server implementation of Couchbase th

- Get a list of all the scopes and collections in the specified bucket
- Get the structure for a collection
- Get a document by ID from a specified scope and collection
- Upsert a document by ID to a specified scope and collection
- Delete a document by ID from a specified scope and collection
- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified scope
- Get a document by ID from a specified bucket, scope and collection
- Upsert a document by ID to a specified bucket, scope and collection
- Delete a document by ID from a specified bucket, scope and collection
- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified bucket and scope
- There is an option in the MCP server, `READ_ONLY_QUERY_MODE` that is set to true by default to disable running SQL++ queries that change the data or the underlying collection structure. Note that the documents can still be updated by ID.
- Retrieve Index Advisor advice for a query on a specified bucket and scope.
- Retrieve cluster node IPs and health status.
- Retrieve /metrics endpoint data by IP.

## Prerequisites

Expand Down Expand Up @@ -40,13 +43,14 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur
"--directory",
"path/to/cloned/repo/mcp-server-couchbase/",
"run",
"--with",
"requests",
"src/mcp_server.py"
],
"env": {
"CB_CONNECTION_STRING": "couchbases://connection-string",
"CB_USERNAME": "username",
"CB_PASSWORD": "password",
"CB_BUCKET_NAME": "bucket_name"
}
}
}
Expand All @@ -58,7 +62,6 @@ The server can be configured using environment variables. The following variable
- `CB_CONNECTION_STRING`: The connection string to the Couchbase cluster
- `CB_USERNAME`: The username with access to the bucket to use to connect
- `CB_PASSWORD`: The password for the username to connect
- `CB_BUCKET_NAME`: The name of the bucket that the server will access
- `READ_ONLY_QUERY_MODE`: Setting to configure whether SQL++ queries that allow data to be modified are allowed. It is set to True by default.
- `path/to/cloned/repo/mcp-server-couchbase/` should be the path to the cloned repository on your local machine. Don't forget the trailing slash at the end!

Expand Down Expand Up @@ -138,7 +141,7 @@ There is an option to run the MCP server in [Server-Sent Events (SSE)](https://m

By default, the MCP server will run on port 8080 but this can be configured using the `FASTMCP_PORT` environment variable.

> uv run src/mcp_server.py --connection-string='<couchbase_connection_string>' --username='<database_username>' --password='<database_password>' --bucket-name='<couchbase_bucket_to_use>' --read-only-query-mode=true --transport=sse
> uv run src/mcp_server.py --connection-string='<couchbase_connection_string>' --username='<database_username>' --password='<database_password>' --read-only-query-mode=true --transport=sse

The server will be available on http://localhost:8080/sse. This can be used in MCP clients supporting SSE transport mode.

Expand All @@ -159,7 +162,6 @@ docker run -i \
-e CB_CONNECTION_STRING='<couchbase_connection_string>' \
-e CB_USERNAME='<database_user>' \
-e CB_PASSWORD='<database_password>' \
-e CB_BUCKET_NAME='<bucket_name>' \
-e MCP_TRANSPORT='stdio/sse' \
-e READ_ONLY_QUERY_MODE="true/false" \
mcp/couchbase
Expand Down
193 changes: 160 additions & 33 deletions src/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from typing import AsyncIterator
from lark_sqlpp import modifies_data, modifies_structure, parse_sqlpp
import click

import requests
from requests.auth import HTTPBasicAuth
MCP_SERVER_NAME = "couchbase"

# Configure logging
Expand All @@ -26,7 +27,6 @@ class AppContext:
"""Context for the MCP server."""

cluster: Cluster | None = None
bucket: Any | None = None
read_only_query_mode: bool = True


Expand Down Expand Up @@ -64,12 +64,7 @@ def get_settings() -> dict:
help="Couchbase database password",
callback=validate_required_param,
)
@click.option(
"--bucket-name",
envvar="CB_BUCKET_NAME",
help="Couchbase bucket name",
callback=validate_required_param,
)

@click.option(
"--read-only-query-mode",
envvar="READ_ONLY_QUERY_MODE",
Expand All @@ -90,7 +85,6 @@ def main(
connection_string,
username,
password,
bucket_name,
read_only_query_mode,
transport,
):
Expand All @@ -99,7 +93,6 @@ def main(
"connection_string": connection_string,
"username": username,
"password": password,
"bucket_name": bucket_name,
"read_only_query_mode": read_only_query_mode,
}
mcp.run(transport=transport)
Expand All @@ -114,7 +107,6 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
connection_string = settings.get("connection_string")
username = settings.get("username")
password = settings.get("password")
bucket_name = settings.get("bucket_name")
read_only_query_mode = settings.get("read_only_query_mode")

# Validate configuration
Expand All @@ -128,9 +120,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
if not password:
logger.error("Couchbase database password is not set")
missing_vars.append("password")
if not bucket_name:
logger.error("Couchbase bucket name is not set")
missing_vars.append("bucket_name")


if missing_vars:
error_msg = f"Missing required configuration: {', '.join(missing_vars)}"
Expand All @@ -148,9 +138,9 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
cluster.wait_until_ready(timedelta(seconds=5))
logger.info("Successfully connected to Couchbase cluster")

bucket = cluster.bucket(bucket_name)
yield AppContext(
cluster=cluster, bucket=bucket, read_only_query_mode=read_only_query_mode
cluster=cluster,
read_only_query_mode=read_only_query_mode
)

except Exception as e:
Expand All @@ -162,13 +152,103 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
mcp = FastMCP(MCP_SERVER_NAME, lifespan=app_lifespan)



# Tools
@mcp.tool()
def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]:
"""Get the names of all scopes and collections in the bucket.
def get_cluster_health_check(
ctx: Context) -> list[dict[str,Any]]:
"""Runs a healthcheck (ping report) on the Couchbase cluster. Returns an array of json objects with pintreport results and node IPs.
Also useful for discovering available services in the cluster. Multiple services may reside on each node.
Returns: Array of service objects, each having the URL (consisting of IP and port for the service) and state of the service"""
cluster = ctx.request_context.lifespan_context.cluster

services = []
try:
ping_report = cluster.ping()
except Exception as e:
logger.error(f"Unable to reach cluster for health check: {e}")
raise e
try:
services = []
for service_type, endpoints in ping_report.endpoints.items():
for ep in endpoints:
services.append({
"service": service_type.value,
"state": ep.state.value,
"id": ep.id,
"ip": ep.remote,
"latency_ms": ep.latency.total_seconds() * 1000 if ep.latency else None,
"error": str(ep.error) if ep.error else None
})
except Exception as e:
logger.error(f"Unable to parse ping report: {e}")
raise e

return services

def fetch_metrics(ip, username, password) -> str:
url = f"https://{ip}:18091/metrics"

try:
response = requests.get(
url,
auth=HTTPBasicAuth(username, password),
verify=False, # <--- Ignores SSL cert verification
timeout=10
)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
return f"Error fetching metrics: {e}"


@mcp.tool()
def get_cluster_metrics(
ctx: Context, ip: str) -> str:
"""Runs an API call to the metrics endpoint of a given couchbase node by IP or hostname. Metrics contain info on nodes performance,
resources and services.
Returns: String representing the prometheus formatted metrics return by couchbase."""

settings = get_settings()

username = settings.get("username")
password = settings.get("password")

metrics = fetch_metrics(ip, username, password)
return metrics

@mcp.tool()
def get_list_of_buckets_with_settings(
ctx: Context
) -> list[str]:
"""Get the list of buckets from the Couchbase cluster, including their bucket settings.
Returns a list of bucket setting objects.
"""
cluster = ctx.request_context.lifespan_context.cluster
result=[]
try:
bucket_manager = cluster.buckets()
buckets = bucket_manager.get_all_buckets()
for b in buckets:
result.append(b)
return result
except Exception as e:
logger.error(f"Error getting bucket names: {e}")
raise e


@mcp.tool()
def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict[str, list[str]]:
"""Get the names of all scopes and collections for a specified bucket.
Returns a dictionary with scope names as keys and lists of collection names as values.
"""
bucket = ctx.request_context.lifespan_context.bucket
cluster = ctx.request_context.lifespan_context.cluster

try:
bucket = cluster.bucket(bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.")
try:
scopes_collections = {}
collection_manager = bucket.collections()
Expand All @@ -184,14 +264,14 @@ def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]:

@mcp.tool()
def get_schema_for_collection(
ctx: Context, scope_name: str, collection_name: str
ctx: Context, bucket_name: str, scope_name: str, collection_name: str
) -> dict[str, Any]:
"""Get the schema for a collection in the specified scope.
"""Get the schema for a collection in the specified scope of a specified bucket.
Returns a dictionary with the schema returned by running INFER on the Couchbase collection.
"""
try:
query = f"INFER {collection_name}"
result = run_sql_plus_plus_query(ctx, scope_name, query)
result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query)
return result
except Exception as e:
logger.error(f"Error getting schema: {e}")
Expand All @@ -200,10 +280,15 @@ def get_schema_for_collection(

@mcp.tool()
def get_document_by_id(
ctx: Context, scope_name: str, collection_name: str, document_id: str
ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str
) -> dict[str, Any]:
"""Get a document by its ID from the specified scope and collection."""
bucket = ctx.request_context.lifespan_context.bucket
"""Get a document by its ID from the specified bucket, scope and collection."""
cluster = ctx.request_context.lifespan_context.cluster
try:
bucket = cluster.bucket(bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.")
Comment on lines +383 to +387
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for retrieving a bucket object from the cluster and handling potential errors is duplicated across multiple tool functions (e.g., get_scopes_and_collections_in_bucket, get_document_by_id, upsert_document_by_id, etc.). To improve maintainability and reduce code duplication, consider extracting this logic into a private helper function.

Comment on lines +382 to +387
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block of code for retrieving a bucket and handling errors is repeated in several functions (e.g., get_document_by_id, upsert_document_by_id, delete_document_by_id, run_sql_plus_plus_query, get_scopes_and_collections_in_bucket). To improve maintainability and adhere to the DRY (Don't Repeat Yourself) principle, consider extracting this logic into a shared helper function.

try:
collection = bucket.scope(scope_name).collection(collection_name)
result = collection.get(document_id)
Expand All @@ -216,14 +301,20 @@ def get_document_by_id(
@mcp.tool()
def upsert_document_by_id(
ctx: Context,
bucket_name: str,
scope_name: str,
collection_name: str,
document_id: str,
document_content: dict[str, Any],
) -> bool:
"""Insert or update a document by its ID.
"""Insert or update a document in a bucket, scope and collection by its ID.
Returns True on success, False on failure."""
bucket = ctx.request_context.lifespan_context.bucket
cluster = ctx.request_context.lifespan_context.cluster
try:
bucket = cluster.bucket(bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.")
try:
collection = bucket.scope(scope_name).collection(collection_name)
collection.upsert(document_id, document_content)
Expand All @@ -236,11 +327,16 @@ def upsert_document_by_id(

@mcp.tool()
def delete_document_by_id(
ctx: Context, scope_name: str, collection_name: str, document_id: str
ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str
) -> bool:
"""Delete a document by its ID.
"""Delete a document in a bucket, scope and collection by its ID.
Returns True on success, False on failure."""
bucket = ctx.request_context.lifespan_context.bucket
cluster = ctx.request_context.lifespan_context.cluster
try:
bucket = cluster.bucket(bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.")
try:
collection = bucket.scope(scope_name).collection(collection_name)
collection.remove(document_id)
Expand All @@ -250,13 +346,44 @@ def delete_document_by_id(
logger.error(f"Error deleting document {document_id}: {e}")
return False

@mcp.tool()
def advise_index_for_sql_plus_plus_query(
ctx: Context, bucket_name: str, scope_name: str, query: str
) -> dict[str, Any]:
"""Get an index recommendation from the SQL++ index advisor for a specified query on a specified bucket and scope.
Returns a dictionary with the query advised on, as well as:
1. an array of the current indexes used and their status (or a string indicating no existing indexes available)
2. an array of recommended indexes and/or covering indexes with reasoning (or a string indicating no possible index improvements)
"""
response = {}

try:
query = f"ADVISE {query}"
result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query)

if result and (advice := result[0].get("advice")):
if (advice is not None):
advise_info = advice.get("adviseinfo")
if ( advise_info is not None):
response["current_indexes"] = advise_info.get("current_indexes", "No current indexes")
response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available")
response["query"]=result[0].get("query","Query statement unavailable")
Comment on lines +460 to +466
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These nested if statements make the code difficult to read and can be simplified. The walrus operator := in the first if already ensures that advice is not None. The subsequent checks are redundant. You can flatten this structure for better readability and conciseness.

if result and (advice := result[0].get("advice")) and (advise_info := advice.get("adviseinfo")):
    response["current_indexes"] = advise_info.get("current_indexes", "No current indexes")
    response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available")
    response["query"]=result[0].get("query","Query statement unavailable")

return response
Comment on lines +460 to +467
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The nested if checks here are redundant and make the code harder to read. The walrus operator := in the outer if condition already ensures that advice is a truthy value (not None) within the if block. You can simplify this logic by removing the redundant checks and flattening the structure.

        if result and (advice := result[0].get("advice")) and (advise_info := advice.get("adviseinfo")):
            response["current_indexes"] = advise_info.get("current_indexes", "No current indexes")
            response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available")
            response["query"]=result[0].get("query","Query statement unavailable")
        return response

except Exception as e:
logger.error(f"Error running Advise on query: {e}")
raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}")

@mcp.tool()
def run_sql_plus_plus_query(
ctx: Context, scope_name: str, query: str
ctx: Context, bucket_name: str, scope_name: str, query: str
) -> list[dict[str, Any]]:
"""Run a SQL++ query on a scope and return the results as a list of JSON objects."""
bucket = ctx.request_context.lifespan_context.bucket
"""Run a SQL++ query on a scope in a specified bucket and return the results as a list of JSON objects."""
cluster = ctx.request_context.lifespan_context.cluster
try:
bucket = cluster.bucket(bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.")
read_only_query_mode = ctx.request_context.lifespan_context.read_only_query_mode
logger.info(f"Running SQL++ queries in read-only mode: {read_only_query_mode}")

Expand Down