8 changes: 4 additions & 4 deletions mcp_servers/integration_test_generator/README.md
@@ -8,10 +8,10 @@ This MCP server helps generate OTel integration metric test files similar to `te
- `Test_<Integration>MetricsCollection` - validates metrics received by collector
- `Test_BackendValidity` - validates metrics received by backend
- `Test_Smoke` - generates integration-specific activity and validates basic metrics

- **Uses shared utilities**:
- All tests use the shared `utils/otel_metrics_validator.py`

- **Generates supporting files**:
- `__init__.py`
- Template for metrics JSON file
@@ -153,7 +153,7 @@ def setup_main(self) -> None:
"""When the container spins up, we need some activity."""
scenario: OtelCollectorScenario = context.scenario
container = scenario.redis_container

# Customize these operations for your integration
r = container.exec_run("redis-cli SET test_key test_value")
logger.info(r.output)
@@ -198,7 +198,7 @@ The generator will create:
1. Check the configuration file path is correct
2. Ensure the Python path in configuration matches your system
3. Restart Cursor/Claude Desktop after configuration changes
4. Check logs:
4. Check logs:
- Cursor: Developer Tools → Console
- Claude Desktop: Console logs

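For orientation, here is a minimal sketch of what one of the generated test classes might look like, pieced together from the class names listed above, the scenario decorator, and the set-difference check that appear later in this diff. The `utils` import path, the payload shape, and the metric names are illustrative assumptions, and the sketch omits the shared `OtelMetricsValidator` (whose import statement is shown further down) because its API is not visible here:

```python
# Illustrative sketch only -- not the literal output of generate_test_file().
# The import path for `scenarios`/`interfaces`, the payload shape, and the
# metric names below are assumptions.
from utils import interfaces, scenarios  # assumed system-tests helpers


@scenarios.otel_redis_metrics_e2e  # scenario decorator pattern from the generator prompt
class Test_RedisMetricsCollection:
    def test_main(self) -> None:
        observed_metrics: set[str] = set()
        expected_metrics = {"redis.clients.connected", "redis.commands.processed"}  # illustrative

        # Collect every metric name the collector forwarded to the series endpoint
        # (the exact payload shape here is assumed).
        for data in interfaces.otel_collector.get_data("/api/v2/series"):
            for serie in data["request"]["content"]["series"]:
                observed_metrics.add(serie["metric"])

        # Set-difference check recommended by the generator prompt, rather than a boolean flag loop.
        missing_metrics = expected_metrics - observed_metrics
        assert not missing_metrics, f"Missing metrics: {missing_metrics}"
```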
1 change: 1 addition & 0 deletions mcp_servers/integration_test_generator/__init__.py
@@ -0,0 +1 @@
"""MCP Server for generating OTel integration metric test files."""
125 changes: 60 additions & 65 deletions mcp_servers/integration_test_generator/server.py
@@ -1,22 +1,21 @@
#!/usr/bin/env python3
"""MCP Server for generating OTel integration metric test files.

This server provides tools to generate test files similar to test_postgres_metrics.py
but for different integrations (Redis, MySQL, Kafka, etc.).
"""

import json
import sys
from pathlib import Path
from typing import Any

# MCP SDK imports
try:
from mcp.server import Server
from mcp.types import Tool, TextContent, Resource
from mcp.types import Tool, TextContent, Resource, Prompt, PromptArgument, PromptMessage
import mcp.server.stdio
except ImportError:
print("Error: MCP SDK not installed. Install with: pip install mcp")
exit(1)
sys.exit(1)

# Path to reference test files
SYSTEM_TESTS_ROOT = Path(__file__).parent.parent.parent
@@ -47,7 +46,8 @@
"container_name": "mysql_container",
"smoke_test_operations": [
"r = container.exec_run(\"mysql -u root -ppassword -e 'CREATE DATABASE IF NOT EXISTS test_db;'\")",
"r = container.exec_run(\"mysql -u root -ppassword test_db -e 'CREATE TABLE IF NOT EXISTS test_table (id INT PRIMARY KEY);'\")",
'r = container.exec_run("mysql -u root -ppassword test_db -e '
"'CREATE TABLE IF NOT EXISTS test_table (id INT PRIMARY KEY);'\")",
"r = container.exec_run(\"mysql -u root -ppassword test_db -e 'INSERT INTO test_table VALUES (1);'\")",
"logger.info(r.output)",
"r = container.exec_run(\"mysql -u root -ppassword test_db -e 'SELECT * FROM test_table;'\")",
@@ -76,7 +76,8 @@
"smoke_test_operations": [
'r = container.exec_run("kafka-topics --create --topic test-topic --bootstrap-server localhost:9092")',
"logger.info(r.output)",
'r = container.exec_run("kafka-console-producer --topic test-topic --bootstrap-server localhost:9092", stdin="test message")',
'r = container.exec_run("kafka-console-producer --topic test-topic '
'--bootstrap-server localhost:9092", stdin="test message")',
],
"expected_smoke_metrics": [
"kafka.messages",
@@ -132,7 +133,7 @@ def generate_test_file(
# Format expected smoke metrics
expected_metrics_formatted = ",\n ".join([f'"{m}"' for m in config["expected_smoke_metrics"]])

template = f'''import time
return f'''import time
from pathlib import Path
from typing import TYPE_CHECKING

@@ -235,19 +236,15 @@ def test_main(self) -> None:
observed_metrics.add(metric)
logger.info(f" {{metric}} {{serie['points']}}")

all_metric_has_be_seen = True
for metric in expected_metrics:
if metric not in observed_metrics:
logger.error(f"Metric {{metric}} hasn't been observed")
all_metric_has_be_seen = False
else:
logger.info(f"Metric {{metric}} has been observed")

assert all_metric_has_be_seen
'''

return template


def generate_init_file() -> str:
"""Generate __init__.py file."""
@@ -287,7 +284,10 @@ async def list_tools() -> list[Tool]:
},
"feature_name": {
"type": "string",
"description": "Feature name for the @features decorator (optional, defaults to <integration>_receiver_metrics)",
"description": (
"Feature name for the @features decorator "
"(optional, defaults to <integration>_receiver_metrics)"
),
},
},
"required": ["integration_name", "metrics_json_file"],
@@ -335,27 +335,30 @@ async def list_tools() -> list[Tool]:
async def list_resources() -> list[Resource]:
"""List available reference resources."""
resources = []

if POSTGRES_TEST_PATH.exists():
resources.append(
Resource(
uri=f"file://{POSTGRES_TEST_PATH}",
name="PostgreSQL Metrics Test (Reference)",
description="Reference implementation of OTel metrics test. Use this as the gold standard for structure and patterns.",
mimeType="text/x-python"
description=(
"Reference implementation of OTel metrics test. "
"Use this as the gold standard for structure and patterns."
),
mimeType="text/x-python",
)
)

if MYSQL_TEST_PATH.exists():
resources.append(
Resource(
uri=f"file://{MYSQL_TEST_PATH}",
name="MySQL Metrics Test (Reference)",
description="MySQL metrics test implementation following PostgreSQL patterns",
mimeType="text/x-python"
mimeType="text/x-python",
)
)

# Add OtelMetricsValidator reference
validator_path = SYSTEM_TESTS_ROOT / "utils/otel_metrics_validator.py"
if validator_path.exists():
@@ -364,10 +367,10 @@ async def list_resources() -> list[Resource]:
uri=f"file://{validator_path}",
name="OtelMetricsValidator Utility",
description="Shared utility for validating OTel metrics. All tests should use this.",
mimeType="text/x-python"
mimeType="text/x-python",
)
)

# Add improvements document
improvements_path = Path(__file__).parent / "IMPROVEMENTS.md"
if improvements_path.exists():
@@ -376,10 +379,10 @@ async def list_resources() -> list[Resource]:
uri=f"file://{improvements_path}",
name="Integration Test Improvements",
description="Design document with improvements and patterns for test generation",
mimeType="text/markdown"
mimeType="text/markdown",
)
)

return resources


@@ -389,19 +392,17 @@ async def read_resource(uri: str) -> str:
# Extract path from file:// URI
path = uri.replace("file://", "")
path_obj = Path(path)

if not path_obj.exists():
raise ValueError(f"Resource not found: {uri}")
with open(path_obj, "r", encoding="utf-8") as f:
return f.read()

# Read file synchronously (MCP server context)
return path_obj.read_text(encoding="utf-8")


@app.list_prompts()
async def list_prompts():
async def list_prompts() -> list[Prompt]:
"""List available prompts."""
from mcp.types import Prompt, PromptArgument

return [
Prompt(
name="generate_with_reference",
@@ -410,33 +411,29 @@ async def list_prompts():
PromptArgument(
name="integration_name",
description="Name of the integration (e.g., redis, kafka, mongodb)",
required=True
required=True,
),
PromptArgument(
name="metrics_json_file",
description="Name of the metrics JSON file",
required=True
),
]
PromptArgument(name="metrics_json_file", description="Name of the metrics JSON file", required=True),
],
)
]


@app.get_prompt()
async def get_prompt(name: str, arguments: dict[str, str] | None = None):
async def get_prompt(name: str, arguments: dict[str, str] | None = None) -> PromptMessage:
"""Get a specific prompt."""
from mcp.types import PromptMessage, TextContent as PromptTextContent

if name == "generate_with_reference":
integration_name = arguments.get("integration_name", "example") if arguments else "example"
metrics_json_file = arguments.get("metrics_json_file", "example_metrics.json") if arguments else "example_metrics.json"

metrics_json_file = (
arguments.get("metrics_json_file", "example_metrics.json") if arguments else "example_metrics.json"
)

# Read the PostgreSQL test as reference
postgres_test_content = ""
if POSTGRES_TEST_PATH.exists():
with open(POSTGRES_TEST_PATH, "r", encoding="utf-8") as f:
postgres_test_content = f.read()
# Read file synchronously (MCP server context)
postgres_test_content = POSTGRES_TEST_PATH.read_text(encoding="utf-8")

prompt_text = f"""You are generating an OTel integration metrics test for {integration_name}.

CRITICAL: Use the PostgreSQL test as your REFERENCE TEMPLATE. Follow its structure exactly.
@@ -449,18 +446,18 @@ async def get_prompt(name: str, arguments: dict[str, str] | None = None):

## Requirements for {integration_name} test:

1. **Structure**: Follow PostgreSQL test structure EXACTLY:
1. **Structure**: Follow PostgreSQL test structure EXACTLY:
- Three separate test classes (not one big class)
- Test_{{Integration}}MetricsCollection
- Test_BackendValidity
- Test_BackendValidity
- Test_Smoke

2. **Use OtelMetricsValidator**: Import and use the shared validator
2. **Use OtelMetricsValidator**: Import and use the shared validator
```python
from utils.otel_metrics_validator import OtelMetricsValidator, get_collector_metrics_from_scenario
```

3. **Correct Decorators**:
3. **Correct Decorators**:
- Use scenario-specific decorator: @scenarios.otel_{integration_name}_metrics_e2e

4. **Real Metrics**: Use actual metrics from {integration_name} receiver
Expand All @@ -485,30 +482,24 @@ async def get_prompt(name: str, arguments: dict[str, str] | None = None):
def test_main(self) -> None:
observed_metrics: set[str] = set()
expected_metrics = {{...}}

for data in interfaces.otel_collector.get_data("/api/v2/series"):
# ... collect metrics

missing_metrics = expected_metrics - observed_metrics
assert not missing_metrics, f"Missing metrics: {{missing_metrics}}"
```

Generate the complete test file for {integration_name} with metrics file {metrics_json_file}.
"""

return PromptMessage(
role="user",
content=PromptTextContent(
type="text",
text=prompt_text
)
)

Generate the complete test file for {integration_name} with metrics file {metrics_json_file}.
"""

return PromptMessage(role="user", content=TextContent(type="text", text=prompt_text))

raise ValueError(f"Unknown prompt: {name}")


@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
"""Handle tool calls."""

if name == "generate_integration_test":
@@ -538,7 +529,9 @@ async def call_tool(name: str, arguments: Any) -> list[TextContent]:
"shared_utility": {
"note": "Uses shared OtelMetricsValidator from utils/otel_metrics_validator.py",
"location": "utils/otel_metrics_validator.py",
"import_statement": "from utils.otel_metrics_validator import OtelMetricsValidator, get_collector_metrics_from_scenario",
"import_statement": (
"from utils.otel_metrics_validator import OtelMetricsValidator, get_collector_metrics_from_scenario"
),
},
"directory_structure": f"""
Create the following directory structure:
@@ -601,7 +594,9 @@ async def call_tool(name: str, arguments: Any) -> list[TextContent]:
"shared_utility": {
"location": "utils/otel_metrics_validator.py",
"description": "Reusable metrics validation class for all OTel integration tests",
"import_statement": "from utils.otel_metrics_validator import OtelMetricsValidator, get_collector_metrics_from_scenario",
"import_statement": (
"from utils.otel_metrics_validator import OtelMetricsValidator, get_collector_metrics_from_scenario"
),
},
"classes": {
"OtelMetricsValidator": {
@@ -651,7 +646,7 @@ async def call_tool(name: str, arguments: Any) -> list[TextContent]:
raise ValueError(f"Unknown tool: {name}")


async def main():
async def main() -> None:
"""Main entry point for the MCP server."""
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await app.run(
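The `main()` entry point above is cut off after `await app.run(`. In the MCP Python SDK's usual stdio pattern it completes roughly as sketched below; the server name and the `create_initialization_options()` call are assumptions about this repository's code, not something shown in the diff:

```python
# Hedged sketch of a standard MCP stdio entry point. The actual arguments to
# app.run() are not visible in this diff; the initialization-options call and
# the server name are assumptions.
import asyncio

import mcp.server.stdio
from mcp.server import Server

app = Server("integration-test-generator")  # assumed server name


async def main() -> None:
    """Run the MCP server over stdio."""
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream, app.create_initialization_options())


if __name__ == "__main__":
    asyncio.run(main())
```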
4 changes: 4 additions & 0 deletions tests/otel_mysql_metrics_e2e/__init__.py
@@ -0,0 +1,4 @@
# Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2024 Datadog, Inc.
