diff --git a/README.md b/README.md index b1ad21c..552c8c0 100644 --- a/README.md +++ b/README.md @@ -6,11 +6,13 @@ An [MCP](https://modelcontextprotocol.io/) server implementation of Couchbase th - Get a list of all the scopes and collections in the specified bucket - Get the structure for a collection -- Get a document by ID from a specified scope and collection -- Upsert a document by ID to a specified scope and collection -- Delete a document by ID from a specified scope and collection -- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified scope +- Get a document by ID from a specified bucket, scope and collection +- Upsert a document by ID to a specified bucket, scope and collection +- Delete a document by ID from a specified bucket, scope and collection +- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified bucket and scope - There is an option in the MCP server, `READ_ONLY_QUERY_MODE` that is set to true by default to disable running SQL++ queries that change the data or the underlying collection structure. Note that the documents can still be updated by ID. +- Retrieve Index Advisor advice for a query on a specified bucket and scope. +- Get summary and specific information on slow running queries from the completed_requests catalog. ## Prerequisites @@ -46,7 +48,6 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur "CB_CONNECTION_STRING": "couchbases://connection-string", "CB_USERNAME": "username", - "CB_PASSWORD": "password", - "CB_BUCKET_NAME": "bucket_name" + "CB_PASSWORD": "password" } } } @@ -58,7 +59,6 @@ The server can be configured using environment variables. 
The following variable - `CB_CONNECTION_STRING`: The connection string to the Couchbase cluster - `CB_USERNAME`: The username with access to the bucket to use to connect - `CB_PASSWORD`: The password for the username to connect -- `CB_BUCKET_NAME`: The name of the bucket that the server will access - `READ_ONLY_QUERY_MODE`: Setting to configure whether SQL++ queries that allow data to be modified are allowed. It is set to True by default. - `path/to/cloned/repo/mcp-server-couchbase/` should be the path to the cloned repository on your local machine. Don't forget the trailing slash at the end! @@ -138,7 +138,7 @@ There is an option to run the MCP server in [Server-Sent Events (SSE)](https://m By default, the MCP server will run on port 8080 but this can be configured using the `FASTMCP_PORT` environment variable. -> uv run src/mcp_server.py --connection-string='' --username='' --password='' --bucket-name='' --read-only-query-mode=true --transport=sse +> uv run src/mcp_server.py --connection-string='' --username='' --password='' --read-only-query-mode=true --transport=sse The server will be available on http://localhost:8080/sse. This can be used in MCP clients supporting SSE transport mode. 
@@ -159,7 +159,6 @@ docker run -i \ -e CB_CONNECTION_STRING='' \ -e CB_USERNAME='' \ -e CB_PASSWORD='' \ - -e CB_BUCKET_NAME='' \ -e MCP_TRANSPORT='stdio/sse' \ -e READ_ONLY_QUERY_MODE="true/false" \ mcp/couchbase diff --git a/src/mcp_server.py b/src/mcp_server.py index 52660f3..cf59602 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -26,7 +26,6 @@ class AppContext: """Context for the MCP server.""" cluster: Cluster | None = None - bucket: Any | None = None read_only_query_mode: bool = True @@ -64,12 +63,7 @@ def get_settings() -> dict: help="Couchbase database password", callback=validate_required_param, ) -@click.option( - "--bucket-name", - envvar="CB_BUCKET_NAME", - help="Couchbase bucket name", - callback=validate_required_param, -) + @click.option( "--read-only-query-mode", envvar="READ_ONLY_QUERY_MODE", @@ -90,7 +84,6 @@ def main( connection_string, username, password, - bucket_name, read_only_query_mode, transport, ): @@ -99,7 +92,6 @@ def main( "connection_string": connection_string, "username": username, "password": password, - "bucket_name": bucket_name, "read_only_query_mode": read_only_query_mode, } mcp.run(transport=transport) @@ -114,7 +106,6 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: connection_string = settings.get("connection_string") username = settings.get("username") password = settings.get("password") - bucket_name = settings.get("bucket_name") read_only_query_mode = settings.get("read_only_query_mode") # Validate configuration @@ -128,9 +119,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: if not password: logger.error("Couchbase database password is not set") missing_vars.append("password") - if not bucket_name: - logger.error("Couchbase bucket name is not set") - missing_vars.append("bucket_name") + if missing_vars: error_msg = f"Missing required configuration: {', '.join(missing_vars)}" @@ -148,9 +137,9 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: 
cluster.wait_until_ready(timedelta(seconds=5)) logger.info("Successfully connected to Couchbase cluster") - bucket = cluster.bucket(bucket_name) yield AppContext( - cluster=cluster, bucket=bucket, read_only_query_mode=read_only_query_mode + cluster=cluster, + read_only_query_mode=read_only_query_mode ) except Exception as e: @@ -162,13 +151,41 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: mcp = FastMCP(MCP_SERVER_NAME, lifespan=app_lifespan) + # Tools + +@mcp.tool() +def get_list_of_buckets_with_settings( + ctx: Context +) -> list[str]: + """Get the list of buckets from the Couchbase cluster, including their bucket settings. + Returns a list of bucket setting objects. + """ + cluster = ctx.request_context.lifespan_context.cluster + result=[] + try: + bucket_manager = cluster.buckets() + buckets = bucket_manager.get_all_buckets() + for b in buckets: + result.append(b) + return result + except Exception as e: + logger.error(f"Error getting bucket names: {e}") + raise e + + @mcp.tool() -def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]: - """Get the names of all scopes and collections in the bucket. +def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict[str, list[str]]: + """Get the names of all scopes and collections for a specified bucket. Returns a dictionary with scope names as keys and lists of collection names as values. 
""" - bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: scopes_collections = {} collection_manager = bucket.collections() @@ -184,14 +201,14 @@ def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]: @mcp.tool() def get_schema_for_collection( - ctx: Context, scope_name: str, collection_name: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str ) -> dict[str, Any]: - """Get the schema for a collection in the specified scope. + """Get the schema for a collection in the specified scope of a specified bucket. Returns a dictionary with the schema returned by running INFER on the Couchbase collection. """ try: query = f"INFER {collection_name}" - result = run_sql_plus_plus_query(ctx, scope_name, query) + result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) return result except Exception as e: logger.error(f"Error getting schema: {e}") @@ -200,10 +217,15 @@ def get_schema_for_collection( @mcp.tool() def get_document_by_id( - ctx: Context, scope_name: str, collection_name: str, document_id: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str ) -> dict[str, Any]: - """Get a document by its ID from the specified scope and collection.""" - bucket = ctx.request_context.lifespan_context.bucket + """Get a document by its ID from the specified bucket, scope and collection.""" + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: collection = bucket.scope(scope_name).collection(collection_name) result = 
collection.get(document_id) @@ -216,14 +238,20 @@ def get_document_by_id( @mcp.tool() def upsert_document_by_id( ctx: Context, + bucket_name: str, scope_name: str, collection_name: str, document_id: str, document_content: dict[str, Any], ) -> bool: - """Insert or update a document by its ID. + """Insert or update a document in a bucket, scope and collection by its ID. Returns True on success, False on failure.""" - bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: collection = bucket.scope(scope_name).collection(collection_name) collection.upsert(document_id, document_content) @@ -236,11 +264,16 @@ def upsert_document_by_id( @mcp.tool() def delete_document_by_id( - ctx: Context, scope_name: str, collection_name: str, document_id: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str ) -> bool: - """Delete a document by its ID. + """Delete a document in a bucket, scope and collection by its ID. 
Returns True on success, False on failure.""" - bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: collection = bucket.scope(scope_name).collection(collection_name) collection.remove(document_id) @@ -250,13 +283,44 @@ def delete_document_by_id( logger.error(f"Error deleting document {document_id}: {e}") return False +@mcp.tool() +def advise_index_for_sql_plus_plus_query( + ctx: Context, bucket_name: str, scope_name: str, query: str +) -> dict[str, Any]: + """Get an index recommendation from the SQL++ index advisor for a specified query on a specified bucket and scope. + Returns a dictionary with the query advised on, as well as: + 1. an array of the current indexes used and their status (or a string indicating no existing indexes available) + 2. 
an array of recommended indexes and/or covering indexes with reasoning (or a string indicating no possible index improvements) + """ + response = {} + + try: + advise_query = f"ADVISE {query}" + result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, advise_query) + + if result and (advice := result[0].get("advice")): + if (advice is not None): + advise_info = advice.get("adviseinfo") + if ( advise_info is not None): + response["current_indexes"] = advise_info.get("current_indexes", "No current indexes") + response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available") + response["query"]=result[0].get("query","Query statement unavailable") + return response + except Exception as e: + logger.error(f"Error running Advise on query: {e}") + raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") from e @mcp.tool() def run_sql_plus_plus_query( - ctx: Context, scope_name: str, query: str + ctx: Context, bucket_name: str, scope_name: str, query: str ) -> list[dict[str, Any]]: - """Run a SQL++ query on a scope and return the results as a list of JSON objects.""" - bucket = ctx.request_context.lifespan_context.bucket + """Run a SQL++ query on a scope in a specified bucket and return the results as a list of JSON objects.""" + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") read_only_query_mode = ctx.request_context.lifespan_context.read_only_query_mode logger.info(f"Running SQL++ queries in read-only mode: {read_only_query_mode}") @@ -294,6 +358,192 @@ def run_sql_plus_plus_query( logger.error(f"Error running query: {str(e)}", exc_info=True) raise +#Slow Query tools + +@mcp.tool() +def advanced_aggregate_slow_queries_stats_by_pattern(ctx: Context, query_limit:int = 10) -> 
list[dict[str, Any]]: + """Run an in depth analysis query on the completed_requests system catalog to discover potential slow running queries along with index scan and fetch counts and times. + This query attempts to reduce the statements of logged queries into patterns by removing any values and only looking at the query structure with regard to returned fields, + filtered predicates, sorts and aggregations. + For each query pattern it returns execution durations, counts and example queries. + + Accepts an optional integer as a limit to the number of results returned (default 10). + Returns an object array of query patterns with: + 1. total count and count per user running the query. + 2. min, max and average duration of the full execution. + 3. min, max and average duration of each of the sub-operations index scan (or primary scan) and Fetch. + 4. Count of number of documents found in index scans and the total fetched. + 5. An example instance of one of the queries run matching this pattern. 
+ """ + + query_template = """ + SELECT grouped.query_pattern, + grouped.statement_example, + (OBJECT u: ARRAY_LENGTH(ARRAY v FOR v IN grouped.users_agg WHEN v = u END) FOR u IN ARRAY_DISTINCT(grouped.users_agg) END) AS user_query_counts, + grouped.total_count, + ROUND(grouped.min_duration_in_seconds, 3) AS min_duration_in_seconds, + ROUND(grouped.max_duration_in_seconds, 3) AS max_duration_in_seconds, + ROUND(grouped.avg_duration_in_seconds, 3) AS avg_duration_in_seconds, + ROUND((grouped.sorted_durations[FLOOR(grouped.total_count / 2)] + + grouped.sorted_durations[CEIL(grouped.total_count / 2) - 1]) / 2, 3) AS median_duration_in_seconds, + ROUND(grouped.avg_fetch_count, 3) AS avg_fetch_docs_count, + ROUND(grouped.avg_primaryScan_count, 3) AS avg_primaryScan_docs_count, + ROUND(grouped.avg_indexScan_count, 3) AS avg_indexScan_docs_count, + ROUND(grouped.avg_fetch_duration_in_seconds, 3) AS avg_fetch_duration_in_seconds, + ROUND(grouped.avg_primaryScan_duration_in_seconds, 3) AS avg_primaryScan_duration_in_seconds, + ROUND(grouped.avg_indexScan_duration_in_seconds, 3) AS avg_indexScan_duration_in_seconds +FROM ( + SELECT query_pattern, + ARRAY_AGG(sub.users) AS users_agg, + ARRAY_AGG(sub.statement)[0] as statement_example, + COUNT(*) AS total_count, + MIN(duration_in_seconds) AS min_duration_in_seconds, + MAX(duration_in_seconds) AS max_duration_in_seconds, + AVG(duration_in_seconds) AS avg_duration_in_seconds, + ARRAY_SORT(ARRAY_AGG(duration_in_seconds)) AS sorted_durations, + AVG(sub.`fetch_count`) AS avg_fetch_count, + AVG(sub.`primaryScan_count`) AS avg_primaryScan_count, + AVG(sub.`indexScan_count`) AS avg_indexScan_count, + AVG(sub.`fetch_duration_in_seconds`) AS avg_fetch_duration_in_seconds, + AVG(sub.`primaryScan_duration_in_seconds`) AS avg_primaryScan_duration_in_seconds, + AVG(sub.`indexScan_duration_in_seconds`) AS avg_indexScan_duration_in_seconds + FROM ( + SELECT query_pattern, + statement, + users, + STR_TO_DURATION(serviceTime) / 1000000000 AS duration_in_seconds, + phaseCounts.`fetch` AS `fetch_count`, + phaseCounts.`primaryScan` AS 
`primaryScan_count`, + phaseCounts.`indexScan` AS `indexScan_count`, + STR_TO_DURATION(phaseTimes.`fetch`)/ 1000000000 AS `fetch_duration_in_seconds`, + STR_TO_DURATION(phaseTimes.`primaryScan`)/ 1000000000 AS `primaryScan_duration_in_seconds`, + STR_TO_DURATION(phaseTimes.`indexScan`)/ 1000000000 AS `indexScan_duration_in_seconds` + FROM system:completed_requests + LET query_pattern = IFMISSING(preparedText, REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE(statement, + "\\\\s+", " "), + '"(?:[^"]|"")*"', "?"), + "'(?:[^']|'')*'", "?"), + "\\\\b-?\\\\d+\\\\.?\\\\d*\\\\b", "?"), + "(?i)\\\\b(NULL|TRUE|FALSE)\\\\b", "?"), + "(\\\\?\\\\s*,\\\\s*)+\\\\?", "?")) + WHERE UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'INFER %' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'ADVISE %' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'CREATE %' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'CREATE INDEX%' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'ALTER INDEX%' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE '% SYSTEM:%' + ) AS sub + GROUP BY query_pattern +) AS grouped +ORDER BY grouped.total_count DESC + LIMIT {limit} + """ + + if (not isinstance(query_limit, int)): + raise TypeError(f"Param query_limit must be an integer value, received {type(query_limit)} - {query_limit}") + query = query_template.format(limit=query_limit) + try: + result = system_catalog_query(ctx,query) + return result + except Exception as e: + logger.error(f"Error completed_request query: {str(e)}", exc_info=True) + raise e + +@mcp.tool() +def retreive_single_slow_query_plan(ctx: Context, query_statement : str, query_limit:int = 10) -> list[dict[str, Any]]: + """Retrieve the query execution report and execution plan of all executions of the given query saved in the completed_requests catalog. 
+ The query statement must be an exact match to the statement executed, including values or placeholders where applicable. + Accepts a query statement and an optional integer as a limit to the number of results returned (default 10). + Returns an object array of execution plans for all instances of the query, ordered by execution time from highest to lowest. + """ + + query_template = """ + SELECT r.*, meta(r).plan + FROM system:completed_requests AS r + WHERE UPPER(IFMISSING(preparedText, statement)) = UPPER('{query_statement}') + ORDER BY STR_TO_DURATION(r.elapsedTime) DESC + LIMIT {limit} + """ + + if (not isinstance(query_limit, int)): + raise TypeError(f"Param query_limit must be an integer value, received {type(query_limit)} - {query_limit}") + query = query_template.format(limit=query_limit,query_statement=query_statement) + try: + result = system_catalog_query(ctx,query) + return result + except Exception as e: + logger.error(f"Error completed_request query: {str(e)}", exc_info=True) + raise e + +@mcp.tool() +def retreive_list_of_similar_queries_from_completed_requests_catalog(ctx: Context, query_statement : str, query_limit:int = 10) -> list[dict[str, Any]]: + """Retrieve a list of all recorded query statements matching the pattern of the specified query (i.e. similar queries differing in predicate values and capitalizations). + The query statement must be an exact match to the statement executed, including values or placeholders where applicable. + Accepts a query statement and an optional integer as a limit to the number of results returned (default 10). + Returns a list of query statements matching the pattern. 
+ """ + + query_template = """ + SELECT raw r.statement + FROM system:completed_requests AS r + LET query_pattern = UPPER(IFMISSING(preparedText, REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE(statement, + "\\\\s+", " "), + '"(?:[^"]|"")*"', "?"), + "'(?:[^']|'')*'", "?"), + "\\\\b-?\\\\d+\\\\.?\\\\d*\\\\b", "?"), + "(?i)\\\\b(NULL|TRUE|FALSE)\\\\b", "?"), + "(\\\\?\\\\s*,\\\\s*)+\\\\?", "?"))) + WHERE query_pattern = UPPER(REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE('{query_statement}', + "\\\\s+", " "), + '"(?:[^"]|"")*"', "?"), + "'(?:[^']|'')*'", "?"), + "\\\\b-?\\\\d+\\\\.?\\\\d*\\\\b", "?"), + "(?i)\\\\b(NULL|TRUE|FALSE)\\\\b", "?"), + "(\\\\?\\\\s*,\\\\s*)+\\\\?", "?")) + LIMIT {limit} + """ + + if (not isinstance(query_limit, int)): + raise TypeError(f"Param query_limit must be an integer value, received {type(query_limit)} - {query_limit}") + query = query_template.format(limit=query_limit,query_statement=query_statement) + try: + result = system_catalog_query(ctx,query) + return result + except Exception as e: + logger.error(f"Error completed_request query: {str(e)}", exc_info=True) + raise e + + +# Util Functions +def system_catalog_query(ctx: Context, query: str) -> list[dict[str, Any]]: + cluster = ctx.request_context.lifespan_context.cluster + try: + + results = [] + result = cluster.query(query) + for row in result: + results.append(row) + return results + except Exception as e: + logger.error(f"Error running query: {str(e)}", exc_info=True) + raise e + + if __name__ == "__main__": main()