From b3abd77bbd05702f8c99d32135c828a06047dc11 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Tue, 15 Jul 2025 00:58:06 +0300 Subject: [PATCH 01/10] Adding support for multiple buckets. Adding support for index advisor --- src/mcp_server.py | 123 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 91 insertions(+), 32 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 52660f3..170034b 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -26,7 +26,7 @@ class AppContext: """Context for the MCP server.""" cluster: Cluster | None = None - bucket: Any | None = None + #bucket: Any | None = None read_only_query_mode: bool = True @@ -64,12 +64,12 @@ def get_settings() -> dict: help="Couchbase database password", callback=validate_required_param, ) -@click.option( - "--bucket-name", - envvar="CB_BUCKET_NAME", - help="Couchbase bucket name", - callback=validate_required_param, -) +#@click.option( +# "--bucket-name", +# envvar="CB_BUCKET_NAME", +# help="Couchbase bucket name", +# callback=validate_required_param, +#) @click.option( "--read-only-query-mode", envvar="READ_ONLY_QUERY_MODE", @@ -90,7 +90,7 @@ def main( connection_string, username, password, - bucket_name, + #bucket_name, read_only_query_mode, transport, ): @@ -99,7 +99,7 @@ def main( "connection_string": connection_string, "username": username, "password": password, - "bucket_name": bucket_name, + #"bucket_name": bucket_name, "read_only_query_mode": read_only_query_mode, } mcp.run(transport=transport) @@ -114,7 +114,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: connection_string = settings.get("connection_string") username = settings.get("username") password = settings.get("password") - bucket_name = settings.get("bucket_name") + #bucket_name = settings.get("bucket_name") read_only_query_mode = settings.get("read_only_query_mode") # Validate configuration @@ -128,9 +128,9 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: if not password: logger.error("Couchbase database password is not set") missing_vars.append("password") - if not bucket_name: - logger.error("Couchbase bucket name is not set") - missing_vars.append("bucket_name") + #if not bucket_name: + # logger.error("Couchbase bucket name is not set") + # missing_vars.append("bucket_name") if missing_vars: error_msg = f"Missing required configuration: {', '.join(missing_vars)}" @@ -148,9 +148,11 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: cluster.wait_until_ready(timedelta(seconds=5)) logger.info("Successfully connected to Couchbase cluster") - bucket = cluster.bucket(bucket_name) + #bucket = cluster.bucket(bucket_name) yield AppContext( - cluster=cluster, bucket=bucket, read_only_query_mode=read_only_query_mode + cluster=cluster, + #bucket=bucket, + read_only_query_mode=read_only_query_mode ) except Exception as e: @@ -164,11 +166,18 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: # Tools @mcp.tool() -def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]: - """Get the names of all scopes and collections in the bucket. +def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict[str, list[str]]: + """Get the names of all scopes and collections for a specified bucket. Returns a dictionary with scope names as keys and lists of collection names as values. """ - bucket = ctx.request_context.lifespan_context.bucket + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: scopes_collections = {} collection_manager = bucket.collections() @@ -184,14 +193,14 @@ def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]: @mcp.tool() def get_schema_for_collection( - ctx: Context, scope_name: str, collection_name: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str ) -> dict[str, Any]: - """Get the schema for a collection in the specified scope. + """Get the schema for a collection in the specified scope of a specified bucket. Returns a dictionary with the schema returned by running INFER on the Couchbase collection. """ try: query = f"INFER {collection_name}" - result = run_sql_plus_plus_query(ctx, scope_name, query) + result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) return result except Exception as e: logger.error(f"Error getting schema: {e}") @@ -200,10 +209,16 @@ def get_schema_for_collection( @mcp.tool() def get_document_by_id( - ctx: Context, scope_name: str, collection_name: str, document_id: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str ) -> dict[str, Any]: - """Get a document by its ID from the specified scope and collection.""" - bucket = ctx.request_context.lifespan_context.bucket + """Get a document by its ID from the specified bucket, scope and collection.""" + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: collection = bucket.scope(scope_name).collection(collection_name) result = collection.get(document_id) @@ -216,14 +231,21 @@ def get_document_by_id( @mcp.tool() def upsert_document_by_id( ctx: Context, + bucket_name: str, scope_name: str, collection_name: str, document_id: str, document_content: dict[str, Any], ) -> bool: - """Insert or update a document by its ID. + """Insert or update a document in a bucket, scope and collection by its ID. Returns True on success, False on failure.""" - bucket = ctx.request_context.lifespan_context.bucket + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: collection = bucket.scope(scope_name).collection(collection_name) collection.upsert(document_id, document_content) @@ -236,11 +258,17 @@ def upsert_document_by_id( @mcp.tool() def delete_document_by_id( - ctx: Context, scope_name: str, collection_name: str, document_id: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str ) -> bool: - """Delete a document by its ID. + """Delete a document in a bucket, scope and collection by its ID. Returns True on success, False on failure.""" - bucket = ctx.request_context.lifespan_context.bucket + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: collection = bucket.scope(scope_name).collection(collection_name) collection.remove(document_id) @@ -250,13 +278,44 @@ def delete_document_by_id( logger.error(f"Error deleting document {document_id}: {e}") return False +@mcp.tool() +def advise_index_for_sql_plus_plus_query( + ctx: Context, bucket_name: str, scope_name: str, query: str +) -> dict[str, Any]: + """Get an index recommendation from the SQL++ index advisor for a specified query on a specified bucket and scope. + Returns a dictionary with the query advised on, as well as: + 1. an array of the current indexes used and their status (or a string indicating no existing indexes available) + 2. an array of recommended indexes and/or covering indexes with reasoning (or a string indicating no possible index improvements) + """ + response = {} + + try: + query = f"ADVISE {query}" + result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) + advice = result[0].get("advice") + if (advice is not None): + advise_info = advice.get("adviseinfo") + if ( advise_info is not None): + response["current_indexes"] = advise_info.get("current_indexes", "No current indexes") + response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available") + response["query"]=result[0].get("query","Query statement unavailable") + return response + except Exception as e: + logger.error(f"Error running Advise on query: {e}") + raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") @mcp.tool() def run_sql_plus_plus_query( - ctx: Context, scope_name: str, query: str + ctx: Context, bucket_name: str, scope_name: str, query: str ) -> list[dict[str, Any]]: - """Run a SQL++ query on a scope and return the results as a list of JSON objects.""" - bucket = ctx.request_context.lifespan_context.bucket + """Run a SQL++ query on a scope in a specified bucket and return the results as a list of JSON objects.""" + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") read_only_query_mode = ctx.request_context.lifespan_context.read_only_query_mode logger.info(f"Running SQL++ queries in read-only mode: {read_only_query_mode}") From 77ce027ac07eb18a75876549b4c61c96a41e54db Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Tue, 15 Jul 2025 12:41:42 +0300 Subject: [PATCH 02/10] Updated documentation after implementing index advisor and mutltiple bucket access --- README.md | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index b1ad21c..08b1eb8 100644 --- a/README.md +++ b/README.md @@ -6,11 +6,12 @@ An [MCP](https://modelcontextprotocol.io/) server implementation of Couchbase th - Get a list of all the scopes and collections in the specified bucket - Get the structure for a collection -- Get a document by ID from a specified scope and collection -- Upsert a document by ID to a specified scope and collection -- Delete a document by ID from a specified scope and collection -- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified scope +- Get a document by ID from a specified bucket, scope and collection +- Upsert a document by ID to a specified bucket, scope and collection +- Delete a document by ID from a specified bucket, scope and collection +- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified bucket and scope - There is an option in the MCP server, `READ_ONLY_QUERY_MODE` that is set to true by default to disable running SQL++ queries that change the data or the underlying collection structure. Note that the documents can still be updated by ID. +- Retreive Index Advisor advice for a query on a specified bucket and scope. ## Prerequisites @@ -46,7 +47,6 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur "CB_CONNECTION_STRING": "couchbases://connection-string", "CB_USERNAME": "username", "CB_PASSWORD": "password", - "CB_BUCKET_NAME": "bucket_name" } } } @@ -58,7 +58,6 @@ The server can be configured using environment variables. The following variable - `CB_CONNECTION_STRING`: The connection string to the Couchbase cluster - `CB_USERNAME`: The username with access to the bucket to use to connect - `CB_PASSWORD`: The password for the username to connect -- `CB_BUCKET_NAME`: The name of the bucket that the server will access - `READ_ONLY_QUERY_MODE`: Setting to configure whether SQL++ queries that allow data to be modified are allowed. It is set to True by default. - `path/to/cloned/repo/mcp-server-couchbase/` should be the path to the cloned repository on your local machine. Don't forget the trailing slash at the end! @@ -138,7 +137,7 @@ There is an option to run the MCP server in [Server-Sent Events (SSE)](https://m By default, the MCP server will run on port 8080 but this can be configured using the `FASTMCP_PORT` environment variable. -> uv run src/mcp_server.py --connection-string='' --username='' --password='' --bucket-name='' --read-only-query-mode=true --transport=sse +> uv run src/mcp_server.py --connection-string='' --username='' --password='' --read-only-query-mode=true --transport=sse The server will be available on http://localhost:8080/sse. This can be used in MCP clients supporting SSE transport mode. @@ -159,7 +158,6 @@ docker run -i \ -e CB_CONNECTION_STRING='' \ -e CB_USERNAME='' \ -e CB_PASSWORD='' \ - -e CB_BUCKET_NAME='' \ -e MCP_TRANSPORT='stdio/sse' \ -e READ_ONLY_QUERY_MODE="true/false" \ mcp/couchbase From eda8103cc0424ddb0bca8fe254b9b93b638d76a4 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Tue, 15 Jul 2025 12:44:40 +0300 Subject: [PATCH 03/10] Code comment cleanup --- src/mcp_server.py | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 170034b..185e088 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -26,7 +26,6 @@ class AppContext: """Context for the MCP server.""" cluster: Cluster | None = None - #bucket: Any | None = None read_only_query_mode: bool = True @@ -64,12 +63,7 @@ def get_settings() -> dict: help="Couchbase database password", callback=validate_required_param, ) -#@click.option( -# "--bucket-name", -# envvar="CB_BUCKET_NAME", -# help="Couchbase bucket name", -# callback=validate_required_param, -#) + @click.option( "--read-only-query-mode", envvar="READ_ONLY_QUERY_MODE", @@ -90,7 +84,6 @@ def main( connection_string, username, password, - #bucket_name, read_only_query_mode, transport, ): @@ -99,7 +92,6 @@ def main( "connection_string": connection_string, "username": username, "password": password, - #"bucket_name": bucket_name, "read_only_query_mode": read_only_query_mode, } mcp.run(transport=transport) @@ -114,7 +106,6 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: connection_string = settings.get("connection_string") username = settings.get("username") password = settings.get("password") - #bucket_name = settings.get("bucket_name") read_only_query_mode = settings.get("read_only_query_mode") # Validate configuration @@ -128,9 +119,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: if not password: logger.error("Couchbase database password is not set") missing_vars.append("password") - #if not bucket_name: - # logger.error("Couchbase bucket name is not set") - # missing_vars.append("bucket_name") + if missing_vars: error_msg = f"Missing required configuration: {', '.join(missing_vars)}" @@ -148,10 +137,8 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: cluster.wait_until_ready(timedelta(seconds=5)) logger.info("Successfully connected to Couchbase cluster") - #bucket = cluster.bucket(bucket_name) yield AppContext( cluster=cluster, - #bucket=bucket, read_only_query_mode=read_only_query_mode ) @@ -170,7 +157,6 @@ def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict """Get the names of all scopes and collections for a specified bucket. Returns a dictionary with scope names as keys and lists of collection names as values. """ - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: @@ -212,7 +198,6 @@ def get_document_by_id( ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str ) -> dict[str, Any]: """Get a document by its ID from the specified bucket, scope and collection.""" - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: bucket = cluster.bucket(bucket_name) @@ -239,7 +224,6 @@ def upsert_document_by_id( ) -> bool: """Insert or update a document in a bucket, scope and collection by its ID. Returns True on success, False on failure.""" - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: bucket = cluster.bucket(bucket_name) @@ -262,7 +246,6 @@ def delete_document_by_id( ) -> bool: """Delete a document in a bucket, scope and collection by its ID. Returns True on success, False on failure.""" - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: bucket = cluster.bucket(bucket_name) @@ -309,7 +292,6 @@ def run_sql_plus_plus_query( ctx: Context, bucket_name: str, scope_name: str, query: str ) -> list[dict[str, Any]]: """Run a SQL++ query on a scope in a specified bucket and return the results as a list of JSON objects.""" - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: bucket = cluster.bucket(bucket_name) From 03d0d3c241ef88c785789bd379b1820465fe5f8c Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Tue, 15 Jul 2025 12:57:29 +0300 Subject: [PATCH 04/10] Added check in advise_index_for_sql_plus_plus_query for empty response --- src/mcp_server.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 185e088..75e124e 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -275,13 +275,14 @@ def advise_index_for_sql_plus_plus_query( try: query = f"ADVISE {query}" result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) - advice = result[0].get("advice") - if (advice is not None): - advise_info = advice.get("adviseinfo") - if ( advise_info is not None): - response["current_indexes"] = advise_info.get("current_indexes", "No current indexes") - response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available") - response["query"]=result[0].get("query","Query statement unavailable") + + if result and (advice := result[0].get("advice")): + if (advice is not None): + advise_info = advice.get("adviseinfo") + if ( advise_info is not None): + response["current_indexes"] = advise_info.get("current_indexes", "No current indexes") + response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available") + response["query"]=result[0].get("query","Query statement unavailable") return response except Exception as e: logger.error(f"Error running Advise on query: {e}") From 842a09b765db85b37957988e82007e1b79f14dd3 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Wed, 16 Jul 2025 16:39:21 +0300 Subject: [PATCH 05/10] added aggregate_slow_queries_stats_by_pattern function to assist in slow quewry anlaysis. This groups similarly worded queries in the completed_requests catalog and returns a min, max and average runtime for them --- src/mcp_server.py | 66 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/src/mcp_server.py b/src/mcp_server.py index 75e124e..7627e84 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -336,6 +336,72 @@ def run_sql_plus_plus_query( logger.error(f"Error running query: {str(e)}", exc_info=True) raise +#Slow Query tools + +@mcp.tool() +def aggregate_slow_queries_stats_by_pattern(ctx: Context, query_limit:int ) -> list[dict[str, Any]]: + """Run a query on the completed_requests system catalog to discover potential slow running queries. + This query attempts to reduce the statements of logged queries into patterns by removing any values and only looking at the query structure with regard to returned fields, + filtered predicates, sorts and aggregations. For each query pattern it returns the count, min, max and average duration. + Accepts an integer as a limit to the number of results returned. + Returns an array of query patterns with the count, min, max and average duration ordered from most to least found patterns. + """ + + query_template = """ + SELECT query_pattern, + COUNT(*) AS count, + MIN(STR_TO_DURATION(serviceTime))/1000000000 AS min_duration_in_seconds, + MAX(STR_TO_DURATION(serviceTime))/1000000000 AS max_duration_in_seconds, + AVG(STR_TO_DURATION(serviceTime))/1000000000 AS avg_duration_in_seconds + FROM system:completed_requests + LET query_pattern = IFMISSING(preparedText, REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE(statement, + "\\\\s+", " "), + '"(?:[^"]|"")*"', "?"), + "'(?:[^']|'')*'", "?"), + "\\\\b-?\\\\d+\\\\.?\\\\d*\\\\b", "?"), + "(?i)\\\\b(NULL|TRUE|FALSE)\\\\b", "?"), + "(\\\\?\\\\s*,\\\\s*)+\\\\?", "?")) + WHERE UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'INFER %' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'ADVISE %' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'CREATE %' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'CREATE INDEX%' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'ALTER INDEX%' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE '% SYSTEM:%' + GROUP BY query_pattern + ORDER BY count DESC + LIMIT {limit} + """ + + query = query_template.format(limit=query_limit) + try: + result = system_catalog_query(ctx,query) + return result + except Exception as e: + logger.error(f"Error completed_request query: {str(e)}", exc_info=True) + raise e + + + +# Util Functions +def system_catalog_query(ctx: Context, query: str) -> list[dict[str, Any]]: + cluster = ctx.request_context.lifespan_context.cluster + try: + + results = [] + result = cluster.query(query) + for row in result: + results.append(row) + return results + except Exception as e: + logger.error(f"Error running query: {str(e)}", exc_info=True) + raise e + + if __name__ == "__main__": main() From 00a391ca58621c135a01cf61f0de2fac5947b1be Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Wed, 16 Jul 2025 16:55:02 +0300 Subject: [PATCH 06/10] added function for MCP tool to get the list of buckets from the Couchbase cluster, including their bucket settings. --- src/mcp_server.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/mcp_server.py b/src/mcp_server.py index 75e124e..5da4a42 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -151,7 +151,29 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: mcp = FastMCP(MCP_SERVER_NAME, lifespan=app_lifespan) + # Tools + +@mcp.tool() +def get_list_of_buckets_with_settings( + ctx: Context +) -> list[str]: + """Get the list of buckets from the Couchbase cluster, including their bucket settings. + Returns a list of bucket setting objects. + """ + cluster = ctx.request_context.lifespan_context.cluster + result=[] + try: + bucket_manager = cluster.buckets() + buckets = bucket_manager.get_all_buckets() + for b in buckets: + result.append(b) + return result + except Exception as e: + logger.error(f"Error getting bucket names: {e}") + raise e + + @mcp.tool() def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict[str, list[str]]: """Get the names of all scopes and collections for a specified bucket. From 94e6695e9a8a187e1af8a35666e63446e641fa78 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Thu, 17 Jul 2025 00:04:00 +0300 Subject: [PATCH 07/10] Added tools to get summary and specific information on slow running queries from the completed_requests catalog. Summary statistics are grouped by like-queries with different filtered values. Specific information can be requested per query. --- README.md | 1 + src/mcp_server.py | 158 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 137 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 08b1eb8..552c8c0 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ An [MCP](https://modelcontextprotocol.io/) server implementation of Couchbase th - Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified bucket and scope - There is an option in the MCP server, `READ_ONLY_QUERY_MODE` that is set to true by default to disable running SQL++ queries that change the data or the underlying collection structure. Note that the documents can still be updated by ID. - Retreive Index Advisor advice for a query on a specified bucket and scope. +- Get summary and specific information on slow running queries from the completed_requests catalog. ## Prerequisites diff --git a/src/mcp_server.py b/src/mcp_server.py index f20a881..cf1462f 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -361,22 +361,133 @@ def run_sql_plus_plus_query( #Slow Query tools @mcp.tool() -def aggregate_slow_queries_stats_by_pattern(ctx: Context, query_limit:int ) -> list[dict[str, Any]]: - """Run a query on the completed_requests system catalog to discover potential slow running queries. +def advanced_aggregate_slow_queries_stats_by_pattern(ctx: Context, query_limit:int = 10) -> list[dict[str, Any]]: + """Run an in depth analysis query on the completed_requests system catalog to discover potential slow running queries along with index scan and fetch counts and times. This query attempts to reduce the statements of logged queries into patterns by removing any values and only looking at the query structure with regard to returned fields, - filtered predicates, sorts and aggregations. For each query pattern it returns the count, min, max and average duration. - Accepts an integer as a limit to the number of results returned. - Returns an array of query patterns with the count, min, max and average duration ordered from most to least found patterns. + filtered predicates, sorts and aggregations. + For each query pattern it returns execution durations, counts and example queries. + + Accepts an optional integer as a limit to the number of results returned (default 10). + Returns an object array of query patterns with: + 1. total count and count per user running the query. + 2. min, max and average duration of the full execution. + 3. min, max and average duration of each of the sub-operations index scan (or primary scan) and Fetch. + 4. Count of number of documents found in index scans and the total fetched. + 5. An example instance of one of the queries run matching this pattern. """ query_template = """ + SELECT grouped.query_pattern, + grouped.statement_example, + (OBJECT u: ARRAY_LENGTH(ARRAY v FOR v IN grouped.users_agg WHEN v = u END) FOR u IN ARRAY_DISTINCT(grouped.users_agg) END) AS user_query_counts, + grouped.total_count, + ROUND(grouped.min_duration_in_seconds, 3) AS min_duration_in_seconds, + ROUND(grouped.max_duration_in_seconds, 3) AS max_duration_in_seconds, + ROUND(grouped.avg_duration_in_seconds, 3) AS avg_duration_in_seconds, + ROUND((grouped.sorted_durations[FLOOR(grouped.total_count / 2)] + + grouped.sorted_durations[CEIL(grouped.total_count / 2) - 1]) / 2, 3) AS median_duration_in_seconds, + ROUND(grouped.avg_fetch_count, 3) AS avg_fetch_docs_count, + ROUND(grouped.avg_primaryScan_count, 3) AS avg_primaryScan_docs_count, + ROUND(grouped.avg_indexScan_count, 3) AS avg_indexScan_docs_count, + ROUND(grouped.avg_fetch_time, 3) AS avg_fetch_duration_in_seconds, + ROUND(grouped.avg_primaryScan_time, 3) AS avg_primaryScan_duration_in_seconds, + ROUND(grouped.avg_indexScan_time, 3) AS avg_indexScan_duration_in_seconds +FROM ( SELECT query_pattern, - COUNT(*) AS count, - MIN(STR_TO_DURATION(serviceTime))/1000000000 AS min_duration_in_seconds, - MAX(STR_TO_DURATION(serviceTime))/1000000000 AS max_duration_in_seconds, - AVG(STR_TO_DURATION(serviceTime))/1000000000 AS avg_duration_in_seconds - FROM system:completed_requests - LET query_pattern = IFMISSING(preparedText, REGEX_REPLACE( + ARRAY_AGG(sub.users) AS users_agg, + ARRAY_AGG(sub.statement)[0] as statement_example, + COUNT(*) AS total_count, + MIN(duration_in_seconds) AS min_duration_in_seconds, + MAX(duration_in_seconds) AS max_duration_in_seconds, + AVG(duration_in_seconds) AS avg_duration_in_seconds, + ARRAY_SORT(ARRAY_AGG(duration_in_seconds)) AS sorted_durations, + AVG(sub.`fetch_count`) AS avg_fetch_count, + AVG(sub.`primaryScan_count`) AS avg_primaryScan_count, + AVG(sub.`indexScan_count`) AS avg_indexScan_count, + AVG(sub.`fetch_time`) AS avg_fetch_duration_in_seconds, + AVG(sub.`primaryScan_time`) AS avg_primaryScan_duration_in_seconds, + AVG(sub.`indexScan_time`) AS avg_indexScan_duration_in_seconds + FROM ( + SELECT query_pattern, + statement, + users, + STR_TO_DURATION(serviceTime) / 1000000000 AS duration_in_seconds, + phaseCounts.`fetch` AS `fetch_count`, + phaseCounts.`primaryScan` AS `primaryScan_count`, + phaseCounts.`indexScan` AS `indexScan_count`, + STR_TO_DURATION(phaseTimes.`fetch`)/ 1000000000 AS `fetch_duration_in_seconds`, + STR_TO_DURATION(phaseTimes.`primaryScan`)/ 1000000000 AS `primaryScan_duration_in_seconds`, + STR_TO_DURATION(phaseTimes.`indexScan`)/ 1000000000 AS `indexScan_duration_in_seconds` + FROM system:completed_requests + LET query_pattern = IFMISSING(preparedText, REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE(statement, + "\\\\s+", " "), + '"(?:[^"]|"")*"', "?"), + "'(?:[^']|'')*'", "?"), + "\\\\b-?\\\\d+\\\\.?\\\\d*\\\\b", "?"), + "(?i)\\\\b(NULL|TRUE|FALSE)\\\\b", "?"), + "(\\\\?\\\\s*,\\\\s*)+\\\\?", "?")) + WHERE UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'INFER %' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'ADVISE %' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'CREATE %' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'CREATE INDEX%' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'ALTER INDEX%' + AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE '% SYSTEM:%' + ) AS sub + GROUP BY query_pattern +) AS grouped +ORDER BY grouped.total_count DESC + LIMIT {limit} + """ + + query = query_template.format(limit=query_limit) + try: + result = system_catalog_query(ctx,query) + return result + except Exception as e: + logger.error(f"Error completed_request query: {str(e)}", exc_info=True) + raise e + +@mcp.tool() +def retreive_single_slow_query_plan(ctx: Context, query_statement : str, query_limit:int = 10) -> list[dict[str, Any]]: + """Retrieve the query execution report and execution plan of all executions of the given query saved in the completed_requests catalog. + The query statement must be an exact match to the statement executed, including values or placeholders where applicable. + Accepts a query statement and an optional integer as a limit to the number of results returned (default 10). + Returns an object array of execution plans for all instances of the query, ordered by execution time from highest to lowest. + """ + + query_template = """ + SELECT r.*, meta(r).plan + FROM system:completed_requests AS r + WHERE UPPER(IFMISSING(preparedText, statement)) = UPPER('{query_statement}') + ORDER BY STR_TO_DURATION(r.elapsedTime) DESC + LIMIT {limit} + """ + + query = query_template.format(limit=query_limit,query_statement=query_statement) + try: + result = system_catalog_query(ctx,query) + return result + except Exception as e: + logger.error(f"Error completed_request query: {str(e)}", exc_info=True) + raise e + +@mcp.tool() +def retreive_list_of_similar_queries_from_completed_requests_catalog(ctx: Context, query_statement : str, query_limit:int = 10) -> list[dict[str, Any]]: + """Retrieve a list of all recorded query statements matching the pattern of the specified query (i.e. similar queries differing in predicate values and capitalizations). + The query statement must be an exact match to the statement executed, including values or placeholders where applicable. + Accepts a query statement and an optional integer as a limit to the number of results returned (default 10). + Returns a list of query statements matching the pattern. + """ + + query_template = """ + SELECT raw r.statement + FROM system:completed_requests AS r + LET query_pattern = UPPER(IFMISSING(preparedText, REGEX_REPLACE( REGEX_REPLACE( REGEX_REPLACE( REGEX_REPLACE( @@ -387,19 +498,23 @@ def aggregate_slow_queries_stats_by_pattern(ctx: Context, query_limit:int ) -> l "'(?:[^']|'')*'", "?"), "\\\\b-?\\\\d+\\\\.?\\\\d*\\\\b", "?"), "(?i)\\\\b(NULL|TRUE|FALSE)\\\\b", "?"), - "(\\\\?\\\\s*,\\\\s*)+\\\\?", "?")) - WHERE UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'INFER %' - AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'ADVISE %' - AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'CREATE %' - AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'CREATE INDEX%' - AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE 'ALTER INDEX%' - AND UPPER(IFMISSING(preparedText, statement)) NOT LIKE '% SYSTEM:%' - GROUP BY query_pattern - ORDER BY count DESC + "(\\\\?\\\\s*,\\\\s*)+\\\\?", "?"))) + WHERE query_pattern = REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE( + REGEX_REPLACE({query_statement}, + "\\\\s+", " "), + '"(?:[^"]|"")*"', "?"), + "'(?:[^']|'')*'", "?"), + "\\\\b-?\\\\d+\\\\.?\\\\d*\\\\b", "?"), + "(?i)\\\\b(NULL|TRUE|FALSE)\\\\b", "?"), + "(\\\\?\\\\s*,\\\\s*)+\\\\?", "?") LIMIT {limit} """ - query = query_template.format(limit=query_limit) + query = query_template.format(limit=query_limit,query_statement=query_statement) try: result = system_catalog_query(ctx,query) return result @@ -408,7 +523,6 @@ def aggregate_slow_queries_stats_by_pattern(ctx: Context, query_limit:int ) -> l raise e - # Util Functions def system_catalog_query(ctx: Context, query: str) -> list[dict[str, Any]]: cluster = ctx.request_context.lifespan_context.cluster From 9601951b80844bd05770a9c61dda28c619eda1e9 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Sat, 19 Jul 2025 23:42:21 +0300 Subject: [PATCH 08/10] Added int type verification for query_limit values sent to slow query requests --- src/mcp_server.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/mcp_server.py b/src/mcp_server.py index cf1462f..c67a275 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -468,6 +468,8 @@ def retreive_single_slow_query_plan(ctx: Context, query_statement : str, query_l LIMIT {limit} """ + if (not isinstance(query_limit, int)): + raise TypeError(f"Param query_limit must be an integer value, received {type(query_limit)} - {query_limit}") query = query_template.format(limit=query_limit,query_statement=query_statement) try: result = system_catalog_query(ctx,query) @@ -514,6 +516,8 @@ def retreive_list_of_similar_queries_from_completed_requests_catalog(ctx: Contex LIMIT {limit} """ + if (not isinstance(query_limit, int)): + raise TypeError(f"Param query_limit must be an integer value, received {type(query_limit)} - {query_limit}") query = query_template.format(limit=query_limit,query_statement=query_statement) try: result = system_catalog_query(ctx,query) From d05e5fc50c6b954e5ed08a9ece7afe645e467e33 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Sat, 19 Jul 2025 23:52:36 +0300 Subject: [PATCH 09/10] additional query_limit type check --- src/mcp_server.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mcp_server.py b/src/mcp_server.py index c67a275..53079f4 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -444,6 +444,8 @@ def advanced_aggregate_slow_queries_stats_by_pattern(ctx: Context, query_limit:i LIMIT {limit} """ + if (not isinstance(query_limit, int)): + raise TypeError(f"Param query_limit must be an integer value, received {type(query_limit)} - {query_limit}") query = query_template.format(limit=query_limit) try: result = system_catalog_query(ctx,query) From 08a3334db80dd21d78ac4ef6ddd7dc25125f9c36 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Sun, 20 Jul 2025 21:46:41 +0300 Subject: [PATCH 10/10] Fixed ambiguous reference to query param in advise tool function --- src/mcp_server.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 5da4a42..a668db2 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -295,8 +295,8 @@ def advise_index_for_sql_plus_plus_query( response = {} try: - query = f"ADVISE {query}" - result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) + advise_query = f"ADVISE {query}" + result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, advise_query) if result and (advice := result[0].get("advice")): if (advice is not None): @@ -308,7 +308,7 @@ def advise_index_for_sql_plus_plus_query( return response except Exception as e: logger.error(f"Error running Advise on query: {e}") - raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") + raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") from e @mcp.tool() def run_sql_plus_plus_query(