Query profiling
Wiezzel committed Mar 12, 2024
1 parent e7912fe commit 712395c
Showing 1 changed file with 19 additions and 2 deletions.
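
The diff below instruments the worker's query path with per-stage timings. Every measurement uses the same idiom: read timeit.default_timer() before a step, subtract after it, and log the difference in milliseconds. timeit.default_timer is an alias for time.perf_counter, a monotonic high-resolution clock, so the values are unaffected by wall-clock adjustments. The sketch below is not part of the commit; it only illustrates that idiom with a hypothetical log_duration helper that would fold the repeated start/elapsed pairs into a single with block.

# Illustrative sketch only, not part of this commit. Shows the timing idiom
# the diff repeats inline: timeit.default_timer() is a monotonic,
# high-resolution clock, so elapsed values are robust to clock changes.
import logging
import timeit
from contextlib import contextmanager

LOG = logging.getLogger(__name__)

@contextmanager
def log_duration(label: str):
    start = timeit.default_timer()
    try:
        yield
    finally:
        elapsed = timeit.default_timer() - start
        LOG.info(f"{label}: {int(elapsed * 1000)} ms")

# Usage, equivalent to the inline pattern in the diff:
#     with log_duration("Signature verification time"):
#         await verify_the_signature()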
sqa/worker/p2p/server.py: 21 changes (19 additions & 2 deletions)
@@ -1,8 +1,10 @@
 import argparse
 import asyncio
+import datetime
 import json
 import logging
 import os
+import timeit
 from json import JSONDecodeError
 from typing import AsyncIterator, Optional, Dict, Iterator
 
@@ -157,15 +159,19 @@ async def _handle_query(
             return
 
         # Verify query signature
+        start = timeit.default_timer()
         signature = query.signature
         query.signature = b""
         signed_data = query.SerializeToString()
         if not await self._rpc.verify_signature(signed_data, signature, client_peer_id):
             LOG.warning(f"Query with invalid signature received from {client_peer_id}")
             return
         query.signature = signature
+        elapsed = timeit.default_timer() - start
+        LOG.info(f"Signature verification time: {int(elapsed * 1000)} ms")
 
         # Check if client has sufficient compute units allocated
+        start = timeit.default_timer()
         if not await gateway_allocations.try_to_execute(client_peer_id):
             LOG.warning(f"Not enough allocated for {client_peer_id}")
             envelope = msg_pb.Envelope(
@@ -176,8 +182,12 @@ async def _handle_query(
             )
             await self._rpc.send_msg(envelope, peer_id=client_peer_id)
             return
+        elapsed = timeit.default_timer() - start
+        LOG.info(f"Allocation check time: {int(elapsed * 1000)} ms")
 
-        self._query_info[query.query_id] = QueryInfo(client_peer_id)
+        query_info = QueryInfo(client_peer_id)
+        LOG.info(f"Query {query.query_id} received at {query_info.start_time}")
+        self._query_info[query.query_id] = query_info
         await self._query_tasks.put(query)
 
     async def send_ping(self, state: State, stored_bytes: int, pause=False) -> None:
@@ -214,6 +224,7 @@ async def send_query_result(self, query: msg_pb.Query, result: QueryResult) -> N
             LOG.error(f"Unknown query: {query.query_id}")
             return
         query_info.finished()
+        LOG.info(f"Query {query.query_id} total execution time: {query_info.exec_time_ms} ms")
 
         EXEC_TIME.observe(query_info.exec_time_ms)
         RESULT_SIZE.observe(len(result.compressed_data))
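
QueryInfo itself is defined elsewhere in server.py and is not shown in this diff; the hunks only reveal that it records a start time on construction, keeps the client peer id, and reports exec_time_ms after finished() is called. A minimal sketch consistent with that usage, purely an assumption about its shape rather than the actual implementation, could be:

# Assumed shape of QueryInfo, inferred only from its usage in this diff
# (start_time logged on receipt, finished() called before exec_time_ms is read,
# client_id used as the reply destination). Not the actual implementation.
import datetime
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class QueryInfo:
    client_id: str
    start_time: datetime.datetime = field(default_factory=datetime.datetime.now)
    end_time: Optional[datetime.datetime] = None

    def finished(self) -> None:
        self.end_time = datetime.datetime.now()

    @property
    def exec_time_ms(self) -> int:
        assert self.end_time is not None, "finished() must be called first"
        return int((self.end_time - self.start_time).total_seconds() * 1000)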
@@ -228,7 +239,10 @@ async def send_query_result(self, query: msg_pb.Query, result: QueryResult) -> N
                 )
             )
         )
+        start = timeit.default_timer()
         await self._rpc.send_msg(envelope, peer_id=query_info.client_id)
+        elapsed = timeit.default_timer() - start
+        LOG.info(f"Result sending time: {int(elapsed * 1000)} ms")
         self._logs_storage.query_executed(query, query_info, result)
 
     async def send_query_error(
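
The pre-existing EXEC_TIME.observe and RESULT_SIZE.observe calls show that some metrics are already recorded; their backing library is not visible in this diff. If they are prometheus_client instruments, which is only an assumption, the new per-stage timings could be exported the same way instead of, or in addition to, the log lines:

# Hypothetical alternative, not part of this commit. Assumes prometheus_client,
# which the EXEC_TIME / RESULT_SIZE observers hint at but do not confirm.
from prometheus_client import Histogram

SIG_VERIFY_TIME = Histogram("sig_verify_time_ms", "Signature verification time (ms)")
ALLOCATION_CHECK_TIME = Histogram("allocation_check_time_ms", "Allocation check time (ms)")
RESULT_SEND_TIME = Histogram("result_send_time_ms", "Result sending time (ms)")

# At each measurement site, alongside (or instead of) the LOG.info call:
#     RESULT_SEND_TIME.observe(elapsed * 1000)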
@@ -259,13 +273,16 @@ async def execute_query(transport: P2PTransport, worker: Worker, query_task: msg
     try:
         query = json.loads(query_task.query)
         dataset = dataset_decode(query_task.dataset)
+        LOG.info(f"Query {query_task.query_id} execution started at {datetime.datetime.now()}")
+        start = timeit.default_timer()
         result = await worker.execute_query(
             query,
             dataset,
             compute_data_hash=True,
             profiling=query_task.profiling
         )
-        LOG.info(f"Query {query_task.query_id} success")
+        elapsed = timeit.default_timer() - start
+        LOG.info(f"Query {query_task.query_id} success. Duration: {int(elapsed * 1000)} ms")
         QUERY_OK.inc()
         await transport.send_query_result(query_task, result)
     except (JSONDecodeError, ValidationError, InvalidQuery, MissingData) as e:
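
Two small details of the added logging: int(elapsed * 1000) truncates toward zero, so a 47.9 ms step is reported as 47 ms (round() is the alternative if nearest-millisecond values are preferred), and datetime.datetime.now() yields a naive local timestamp; a timezone-aware UTC stamp is an option if logs from different hosts need to be correlated. A small illustration, not part of the commit:

# Illustration of the conversions used above, not part of the commit.
import datetime

elapsed = 0.0479  # seconds, as returned by a timeit.default_timer() subtraction
print(int(elapsed * 1000))    # 47, truncates toward zero (what the diff logs)
print(round(elapsed * 1000))  # 48, nearest millisecond (possible alternative)

print(datetime.datetime.now())                        # naive local time, as logged in the diff
print(datetime.datetime.now(datetime.timezone.utc))   # timezone-aware UTC alternative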