From 4e462323644bb1e8a97a4f812d054352cdb42b66 Mon Sep 17 00:00:00 2001
From: Alex Cheema
Date: Mon, 22 Jul 2024 02:38:37 -0700
Subject: [PATCH] add simple prometheus metrics collection, with a prometheus / grafana instance for live dashboard. related: #22
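
Each node can now expose Prometheus metrics. When main.py is started with
--prometheus-client-port, it serves per-node counters for processed prompts
and tensors plus a histogram of tensor processing time, all driven by the
existing opaque-status callbacks (a node now also triggers these for its own
statuses, and broadcasts them with a 15-second per-peer timeout). A small
docker-compose stack (Prometheus + Grafana) provides the live dashboard; the
bundled prometheus.yml scrapes host.docker.internal:8005.

Example usage, a sketch that assumes the metrics port 8005 expected by
prometheus.yml (any free port works if the scrape target is updated to
match):

  python main.py --prometheus-client-port 8005
  docker compose -f exo/stats/docker-compose-stats.yml up -d
  # Prometheus UI on localhost:9090, Grafana on localhost:3000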
---
 exo/orchestration/standard_node.py | 19 +++++++++++++++----
 exo/stats/__init__.py              |  0
 exo/stats/docker-compose-stats.yml | 27 +++++++++++++++++++++++++++
 exo/stats/metrics.py               | 28 ++++++++++++++++++++++++++++
 exo/stats/prometheus.yml           |  7 +++++++
 main.py                            |  5 ++++-
 setup.py                           |  1 +
 7 files changed, 82 insertions(+), 5 deletions(-)
 create mode 100644 exo/stats/__init__.py
 create mode 100644 exo/stats/docker-compose-stats.yml
 create mode 100644 exo/stats/metrics.py
 create mode 100644 exo/stats/prometheus.yml

diff --git a/exo/orchestration/standard_node.py b/exo/orchestration/standard_node.py
index b892be4d5..67af8e717 100644
--- a/exo/orchestration/standard_node.py
+++ b/exo/orchestration/standard_node.py
@@ -28,7 +28,7 @@ def __init__(self, id: str, server: Server, inference_engine: InferenceEngine, d
     self.topology_viz = TopologyViz(chatgpt_api_endpoint=chatgpt_api_endpoint, web_chat_url=web_chat_url) if not disable_tui else None
     self.max_generate_tokens = max_generate_tokens
     self._on_token = AsyncCallbackSystem[str, Tuple[str, List[int], bool]]()
-    self._on_opaque_status = AsyncCallbackSystem[str, str]()
+    self._on_opaque_status = AsyncCallbackSystem[str, Tuple[str, str]]()
     self._on_opaque_status.register("node_status").on_next(self.on_node_status)
 
   def on_node_status(self, request_id, opaque_status):
@@ -275,7 +275,7 @@ def on_token(self) -> AsyncCallbackSystem[str, Tuple[str, List[int], bool]]:
     return self._on_token
 
   @property
-  def on_opaque_status(self) -> AsyncCallbackSystem[str, str]:
+  def on_opaque_status(self) -> AsyncCallbackSystem[str, Tuple[str, str]]:
     return self._on_opaque_status
 
   def trigger_on_token_callbacks(self, request_id: str, tokens: List[int], is_finished: bool) -> None:
@@ -296,8 +296,19 @@ async def send_result_to_peer(peer):
     await asyncio.gather(*[send_result_to_peer(peer) for peer in self.peers], return_exceptions=True)
 
   async def broadcast_opaque_status(self, request_id: str, status: str) -> None:
-    for peer in self.peers:
-      await peer.send_opaque_status(request_id, status)
+    async def send_status_to_peer(peer):
+      try:
+        await asyncio.wait_for(peer.send_opaque_status(request_id, status), timeout=15.0)
+      except asyncio.TimeoutError:
+        print(f"Timeout sending opaque status to {peer.id()}")
+      except Exception as e:
+        print(f"Error sending opaque status to {peer.id()}: {e}")
+        import traceback
+        traceback.print_exc()
+
+    await asyncio.gather(*[send_status_to_peer(peer) for peer in self.peers], return_exceptions=True)
+    # in the case of opaque status, we also want to receive our own opaque statuses
+    self.on_opaque_status.trigger_all(request_id, status)
 
   @property
   def current_topology(self) -> Topology:
diff --git a/exo/stats/__init__.py b/exo/stats/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/exo/stats/docker-compose-stats.yml b/exo/stats/docker-compose-stats.yml
new file mode 100644
index 000000000..a37fda439
--- /dev/null
+++ b/exo/stats/docker-compose-stats.yml
@@ -0,0 +1,27 @@
+version: '3.8'
+
+services:
+  prometheus:
+    image: prom/prometheus:latest
+    container_name: prometheus
+    volumes:
+      - ./prometheus.yml:/etc/prometheus/prometheus.yml
+    command:
+      - '--config.file=/etc/prometheus/prometheus.yml'
+    ports:
+      - "9090:9090"
+    networks:
+      - monitoring
+
+  grafana:
+    image: grafana/grafana:latest
+    container_name: grafana
+    ports:
+      - "3000:3000"
+    networks:
+      - monitoring
+    depends_on:
+      - prometheus
+
+networks:
+  monitoring:
diff --git a/exo/stats/metrics.py b/exo/stats/metrics.py
new file mode 100644
index 000000000..950d59429
--- /dev/null
+++ b/exo/stats/metrics.py
@@ -0,0 +1,28 @@
+from exo.orchestration import Node
+from prometheus_client import start_http_server, Counter, Histogram
+import json
+from typing import List
+
+# Create metrics to track time spent and requests made.
+PROCESS_PROMPT_COUNTER = Counter('process_prompt_total', 'Total number of prompts processed', ['node_id'])
+PROCESS_TENSOR_COUNTER = Counter('process_tensor_total', 'Total number of tensors processed', ['node_id'])
+PROCESS_TENSOR_TIME = Histogram('process_tensor_seconds', 'Time spent processing tensor', ['node_id'])
+
+def start_metrics_server(node: Node, port: int):
+  start_http_server(port)
+
+  def _on_opaque_status(request_id, opaque_status: str):
+    status_data = json.loads(opaque_status)
+    type = status_data.get("type", "")
+    node_id = status_data.get("node_id", "")
+    if type != "node_status": return
+    status = status_data.get("status", "")
+
+    if status == "end_process_prompt":
+      PROCESS_PROMPT_COUNTER.labels(node_id=node_id).inc()
+    elif status == "end_process_tensor":
+      elapsed_time_ns = status_data.get("elapsed_time_ns", 0)
+      PROCESS_TENSOR_COUNTER.labels(node_id=node_id).inc()
+      PROCESS_TENSOR_TIME.labels(node_id=node_id).observe(elapsed_time_ns / 1e9)  # Convert ns to seconds
+
+  node.on_opaque_status.register("stats").on_next(_on_opaque_status)
\ No newline at end of file
diff --git a/exo/stats/prometheus.yml b/exo/stats/prometheus.yml
new file mode 100644
index 000000000..1530f45d0
--- /dev/null
+++ b/exo/stats/prometheus.yml
@@ -0,0 +1,7 @@
+global:
+  scrape_interval: 15s
+
+scrape_configs:
+  - job_name: 'exo-node'
+    static_configs:
+      - targets: ['host.docker.internal:8005']
diff --git a/main.py b/main.py
index 3864a859d..08cd174de 100644
--- a/main.py
+++ b/main.py
@@ -16,6 +16,7 @@
 parser.add_argument("--node-host", type=str, default="0.0.0.0", help="Node host")
 parser.add_argument("--node-port", type=int, default=None, help="Node port")
 parser.add_argument("--listen-port", type=int, default=5678, help="Listening port for discovery")
+parser.add_argument("--prometheus-client-port", type=int, default=None, help="Prometheus client port")
 parser.add_argument("--broadcast-port", type=int, default=5678, help="Broadcast port for discovery")
 parser.add_argument("--wait-for-peers", type=int, default=0, help="Number of peers to wait to connect to before starting")
 parser.add_argument("--chatgpt-api-port", type=int, default=8000, help="ChatGPT API port")
@@ -41,8 +42,10 @@
 server = GRPCServer(node, args.node_host, args.node_port)
 node.server = server
 api = ChatGPTAPI(node, inference_engine.__class__.__name__, response_timeout_secs=args.chatgpt_api_response_timeout_secs)
-
 node.on_token.register("main_log").on_next(lambda _, tokens , __: print(inference_engine.tokenizer.decode(tokens) if hasattr(inference_engine, "tokenizer") else tokens))
+if args.prometheus_client_port:
+  from exo.stats.metrics import start_metrics_server
+  start_metrics_server(node, args.prometheus_client_port)
 
 async def shutdown(signal, loop):
   """Gracefully shutdown the server and close the asyncio loop."""
diff --git a/setup.py b/setup.py
index 4b1db878c..7e3a61dfb 100644
--- a/setup.py
+++ b/setup.py
@@ -11,6 +11,7 @@
   "huggingface-hub==0.23.4",
   "Jinja2==3.1.4",
   "numpy==2.0.0",
"prometheus-client==0.20.0", "protobuf==5.27.1", "psutil==6.0.0", "pynvml==11.5.3",