diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index 3505e310..4a7ab97e 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -50,7 +50,9 @@ from uagents.protocol import Protocol from uagents.registration import ( AgentRegistrationPolicy, + AgentStatusUpdate, DefaultRegistrationPolicy, + update_agent_status, ) from uagents.resolver import GlobalResolver, Resolver from uagents.storage import KeyValueStore, get_or_create_private_keys @@ -346,10 +348,10 @@ def __init__( else: self._mailbox_client = None - almanac_api_url = f"{self._agentverse['http_prefix']}://{self._agentverse['base_url']}/v1/almanac" + self._almanac_api_url = f"{self._agentverse['http_prefix']}://{self._agentverse['base_url']}/v1/almanac" self._resolver = resolve or GlobalResolver( max_endpoints=max_resolver_endpoints, - almanac_api_url=almanac_api_url, + almanac_api_url=self._almanac_api_url, ) self._ledger = get_ledger(test) @@ -378,7 +380,7 @@ def __init__( self._almanac_contract, self._test, logger=self._logger, - almanac_api=almanac_api_url, + almanac_api=self._almanac_api_url, ) self._metadata = self._initialize_metadata(metadata) @@ -1095,6 +1097,13 @@ async def _shutdown(self): Perform shutdown actions. """ + try: + status = AgentStatusUpdate(agent_address=self.address, is_active=False) + status.sign(self._identity) + await update_agent_status(status, self._almanac_api_url) + except Exception as ex: + self._logger.exception(f"Failed to update agent registration status: {ex}") + for handler in self._on_shutdown: try: ctx = self._build_context() diff --git a/python/src/uagents/registration.py b/python/src/uagents/registration.py index dcc722dd..ed98b915 100644 --- a/python/src/uagents/registration.py +++ b/python/src/uagents/registration.py @@ -2,6 +2,7 @@ import hashlib import json import logging +import time from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, Union @@ -23,6 +24,43 @@ from uagents.types import AgentEndpoint +class VerifiableModel(BaseModel): + agent_address: str + signature: Optional[str] = None + timestamp: Optional[int] = None + + def sign(self, identity: Identity): + self.timestamp = int(time.time()) + digest = self._build_digest() + self.signature = identity.sign_digest(digest) + + def verify(self) -> bool: + return self.signature is not None and Identity.verify_digest( + self.agent_address, self._build_digest(), self.signature + ) + + def _build_digest(self) -> bytes: + sha256 = hashlib.sha256() + sha256.update( + json.dumps( + self.model_dump(exclude={"signature"}), + sort_keys=True, + separators=(",", ":"), + ).encode("utf-8") + ) + return sha256.digest() + + +class AgentRegistrationAttestation(VerifiableModel): + protocols: List[str] + endpoints: List[AgentEndpoint] + metadata: Optional[Dict[str, Union[str, Dict[str, str]]]] = None + + +class AgentStatusUpdate(VerifiableModel): + is_active: bool + + def generate_backoff_time(retry: int) -> float: """ Generate a backoff time starting from 0.128 seconds and limited to ~131 seconds @@ -49,6 +87,29 @@ def coerce_metadata_to_str( return out +async def almanac_api_post( + url: str, data: BaseModel, raise_from: bool = True, retries: int = 3 +) -> bool: + async with aiohttp.ClientSession() as session: + for retry in range(retries): + try: + async with session.post( + url, + headers={"content-type": "application/json"}, + data=data.model_dump_json(), + timeout=aiohttp.ClientTimeout(total=ALMANAC_API_TIMEOUT_SECONDS), + ) as resp: + resp.raise_for_status() + return True + except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e: + if retry == retries - 1: + if raise_from: + raise e + return False + await asyncio.sleep(generate_backoff_time(retry)) + return False + + class AgentRegistrationPolicy(ABC): @abstractmethod # pylint: disable=unnecessary-pass @@ -62,43 +123,6 @@ async def register( pass -class AgentRegistrationAttestation(BaseModel): - agent_address: str - protocols: List[str] - endpoints: List[AgentEndpoint] - metadata: Optional[Dict[str, Union[str, Dict[str, str]]]] = None - signature: Optional[str] = None - - def sign(self, identity: Identity): - digest = self._build_digest() - self.signature = identity.sign_digest(digest) - - def verify(self) -> bool: - if self.signature is None: - raise ValueError("Attestation signature is missing") - return Identity.verify_digest( - self.agent_address, self._build_digest(), self.signature - ) - - def _build_digest(self) -> bytes: - normalised_attestation = AgentRegistrationAttestation( - agent_address=self.agent_address, - protocols=sorted(self.protocols), - endpoints=sorted(self.endpoints, key=lambda x: x.url), - metadata=self.metadata, - ) - - sha256 = hashlib.sha256() - sha256.update( - json.dumps( - normalised_attestation.model_dump(exclude={"signature"}), - sort_keys=True, - separators=(",", ":"), - ).encode("utf-8") - ) - return sha256.digest() - - class AlmanacApiRegistrationPolicy(AgentRegistrationPolicy): def __init__( self, @@ -137,25 +161,11 @@ async def register( # sign the attestation attestation.sign(self._identity) - # submit the attestation to the API - async with aiohttp.ClientSession() as session: # noqa: SIM117 - for retry in range(self._max_retries): - try: - async with session.post( - f"{self._almanac_api}/agents", - headers={"content-type": "application/json"}, - data=attestation.model_dump_json(), - timeout=aiohttp.ClientTimeout( - total=ALMANAC_API_TIMEOUT_SECONDS - ), - ) as resp: - resp.raise_for_status() - self._logger.info("Registration on Almanac API successful") - return - except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e: - if retry == self._max_retries - 1: - raise e - await asyncio.sleep(generate_backoff_time(retry)) + success = await almanac_api_post( + f"{self._almanac_api}/agents", attestation, retries=self._max_retries + ) + if success: + self._logger.info("Registration on Almanac API successful") class LedgerBasedRegistrationPolicy(AgentRegistrationPolicy): @@ -293,3 +303,11 @@ async def register( except Exception as e: self._logger.error(f"Failed to register on Almanac contract: {e}") raise + + +async def update_agent_status(status: AgentStatusUpdate, almanac_api: str): + await almanac_api_post( + f"{almanac_api}/agents/{status.agent_address}/status", + status, + raise_from=False, + ) diff --git a/python/tests/test_registration.py b/python/tests/test_registration.py index f83e4126..a3cd78ff 100644 --- a/python/tests/test_registration.py +++ b/python/tests/test_registration.py @@ -75,27 +75,7 @@ def test_recovery_of_attestation(): protocols=TEST_PROTOCOLS, endpoints=TEST_ENDPOINTS, signature=original_attestation.signature, - ) - assert recovered.verify() - - -def test_order_of_protocols_or_endpoints_does_not_matter(): - identity = Identity.generate() - - # create an attestation - original_attestation = AgentRegistrationAttestation( - agent_address=identity.address, - protocols=TEST_PROTOCOLS, - endpoints=TEST_ENDPOINTS, - ) - original_attestation.sign(identity) - - # recover the attestation - recovered = AgentRegistrationAttestation( - agent_address=original_attestation.agent_address, - protocols=TEST_PROTOCOLS, - endpoints=TEST_ENDPOINTS, - signature=original_attestation.signature, + timestamp=original_attestation.timestamp, ) assert recovered.verify()