Skip to content

Commit

Permalink
add simple prometheus metrics collection, with a prometheus / grafana…
Browse files Browse the repository at this point in the history
… instance for live dashboard. related: exo-explore#22
  • Loading branch information
AlexCheema committed Jul 22, 2024
1 parent ed35135 commit 7b9c2a0
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 5 deletions.
19 changes: 15 additions & 4 deletions exo/orchestration/standard_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, id: str, server: Server, inference_engine: InferenceEngine, d
self.topology_viz = TopologyViz(chatgpt_api_endpoint=chatgpt_api_endpoint, web_chat_url=web_chat_url) if not disable_tui else None
self.max_generate_tokens = max_generate_tokens
self._on_token = AsyncCallbackSystem[str, Tuple[str, List[int], bool]]()
self._on_opaque_status = AsyncCallbackSystem[str, str]()
self._on_opaque_status = AsyncCallbackSystem[str, Tuple[str, str]]()
self._on_opaque_status.register("node_status").on_next(self.on_node_status)

def on_node_status(self, request_id, opaque_status):
Expand Down Expand Up @@ -275,7 +275,7 @@ def on_token(self) -> AsyncCallbackSystem[str, Tuple[str, List[int], bool]]:
return self._on_token

# Read-only accessor for the opaque-status callback system stored on self._on_opaque_status.
# NOTE(review): two signatures appear below because this listing is a rendered diff; the
# second one (Tuple[str, str]) looks like the post-change version — confirm against the file.
@property
def on_opaque_status(self) -> AsyncCallbackSystem[str, str]:
def on_opaque_status(self) -> AsyncCallbackSystem[str, Tuple[str, str]]:
return self._on_opaque_status

def trigger_on_token_callbacks(self, request_id: str, tokens: List[int], is_finished: bool) -> None:
Expand All @@ -296,8 +296,19 @@ async def send_result_to_peer(peer):
await asyncio.gather(*[send_result_to_peer(peer) for peer in self.peers], return_exceptions=True)

# Send the opaque `status` string for `request_id` to every peer in self.peers, then fire
# this node's own on_opaque_status callbacks with the same (request_id, status) pair.
async def broadcast_opaque_status(self, request_id: str, status: str) -> None:
# NOTE(review): this listing is a rendered diff. The sequential loop on the next two lines
# looks like the removed pre-change implementation, superseded by the concurrent
# send_status_to_peer / asyncio.gather version further down — confirm against the file.
for peer in self.peers:
await peer.send_opaque_status(request_id, status)
# Per-peer helper: failures are printed rather than raised, so one bad peer does not
# abort delivery to the others.
async def send_status_to_peer(peer):
try:
# Bound each send to 15 seconds so an unresponsive peer cannot hold up the broadcast longer than that.
await asyncio.wait_for(peer.send_opaque_status(request_id, status), timeout=15.0)
except asyncio.TimeoutError:
print(f"Timeout sending opaque status to {peer.id()}")
except Exception as e:
print(f"Error sending opaque status to {peer.id()}: {e}")
import traceback
traceback.print_exc()

# Run all peer sends concurrently; return_exceptions=True keeps anything the helper did
# not catch from propagating out of gather.
await asyncio.gather(*[send_status_to_peer(peer) for peer in self.peers], return_exceptions=True)
# in the case of opaque status, we also want to receive our own opaque statuses
self.on_opaque_status.trigger_all(request_id, status)

@property
def current_topology(self) -> Topology:
Expand Down
Empty file added exo/stats/__init__.py
Empty file.
27 changes: 27 additions & 0 deletions exo/stats/docker-compose-stats.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Compose stack for a local monitoring dashboard: one Prometheus container and one
# Grafana container attached to a shared `monitoring` network.
version: '3.8'

services:
prometheus:
image: prom/prometheus:latest
container_name: prometheus
volumes:
# Mount the prometheus.yml that sits next to this file as the container's config file.
- ./prometheus.yml:/etc/prometheus/prometheus.yml
command:
- '--config.file=/etc/prometheus/prometheus.yml'
ports:
# Publish container port 9090 on host port 9090.
- "9090:9090"
networks:
- monitoring

grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
# Publish container port 3000 on host port 3000.
- "3000:3000"
networks:
- monitoring
# Start the prometheus service before grafana.
depends_on:
- prometheus

networks:
monitoring:
28 changes: 28 additions & 0 deletions exo/stats/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from exo.orchestration import Node
from prometheus_client import start_http_server, Counter, Histogram
import json
from typing import List

# Prometheus metrics updated by the status callback registered in start_metrics_server.
# Every metric carries a `node_id` label so that series from different nodes stay separate.
PROCESS_PROMPT_COUNTER = Counter('process_prompt_total', 'Total number of prompts processed', ['node_id'])
PROCESS_TENSOR_COUNTER = Counter('process_tensor_total', 'Total number of tensors processed', ['node_id'])
# Observed in seconds; the callback converts the reported nanoseconds before observing.
PROCESS_TENSOR_TIME = Histogram('process_tensor_seconds', 'Time spent processing tensor', ['node_id'])

def start_metrics_server(node: Node, port: int) -> None:
    """Expose Prometheus metrics over HTTP and feed them from node status events.

    Starts the prometheus_client HTTP server on ``port`` and registers a
    callback named ``"stats"`` on ``node.on_opaque_status``. The callback
    expects each opaque status to be a JSON object and reads these keys:

    * ``type`` -- only ``"node_status"`` messages are counted.
    * ``node_id`` -- used as the ``node_id`` label (defaults to ``""``).
    * ``status`` -- ``"end_process_prompt"`` or ``"end_process_tensor"``.
    * ``elapsed_time_ns`` -- tensor processing time in nanoseconds
      (defaults to ``0``).

    Args:
        node: Node whose opaque-status events drive the metrics.
        port: TCP port the metrics HTTP server listens on.
    """
    start_http_server(port)

    def _on_opaque_status(request_id, opaque_status: str) -> None:
        # Opaque statuses are free-form strings; metrics collection is
        # best-effort, so a payload that is not a JSON object is ignored
        # instead of raising into whoever triggered the callbacks.
        try:
            status_data = json.loads(opaque_status)
        except (json.JSONDecodeError, TypeError):
            return
        if not isinstance(status_data, dict):
            return

        if status_data.get("type", "") != "node_status":
            return

        node_id = status_data.get("node_id", "")
        status = status_data.get("status", "")

        if status == "end_process_prompt":
            PROCESS_PROMPT_COUNTER.labels(node_id=node_id).inc()
        elif status == "end_process_tensor":
            elapsed_time_ns = status_data.get("elapsed_time_ns", 0)
            PROCESS_TENSOR_COUNTER.labels(node_id=node_id).inc()
            # The histogram is declared in seconds; the status reports nanoseconds.
            PROCESS_TENSOR_TIME.labels(node_id=node_id).observe(elapsed_time_ns / 1e9)

    node.on_opaque_status.register("stats").on_next(_on_opaque_status)
7 changes: 7 additions & 0 deletions exo/stats/prometheus.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Prometheus scrape configuration for the exo node metrics endpoint.
global:
# Poll every target once every 15 seconds.
scrape_interval: 15s

scrape_configs:
- job_name: 'exo-node'
static_configs:
# The port here (8005) must match the value passed to main.py's --prometheus-client-port,
# because that flag is what starts the node's metrics HTTP server.
# NOTE(review): host.docker.internal presumably resolves to the Docker host from inside
# the container — verify this holds on the deployment platform.
- targets: ['host.docker.internal:8005']
5 changes: 4 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
parser.add_argument("--node-host", type=str, default="0.0.0.0", help="Node host")
parser.add_argument("--node-port", type=int, default=None, help="Node port")
parser.add_argument("--listen-port", type=int, default=5678, help="Listening port for discovery")
parser.add_argument("--prometheus-client-port", type=int, default=None, help="Prometheus client port")
parser.add_argument("--broadcast-port", type=int, default=5678, help="Broadcast port for discovery")
parser.add_argument("--wait-for-peers", type=int, default=0, help="Number of peers to wait to connect to before starting")
parser.add_argument("--chatgpt-api-port", type=int, default=8000, help="ChatGPT API port")
Expand All @@ -41,8 +42,10 @@
server = GRPCServer(node, args.node_host, args.node_port)
node.server = server
api = ChatGPTAPI(node, inference_engine.__class__.__name__, response_timeout_secs=args.chatgpt_api_response_timeout_secs)

node.on_token.register("main_log").on_next(lambda _, tokens , __: print(inference_engine.tokenizer.decode(tokens) if hasattr(inference_engine, "tokenizer") else tokens))
if args.prometheus_client_port:
from exo.stats.metrics import start_metrics_server
start_metrics_server(node, args.prometheus_client_port)

async def shutdown(signal, loop):
"""Gracefully shutdown the server and close the asyncio loop."""
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"huggingface-hub==0.23.4",
"Jinja2==3.1.4",
"numpy==2.0.0",
"prometheus-client==0.20.0",
"protobuf==5.27.1",
"psutil==6.0.0",
"pynvml==11.5.3",
Expand Down

0 comments on commit 7b9c2a0

Please sign in to comment.