Skip to content

feat(core): add batch almanac api and contract registrations for Bureau #551

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 67 additions & 31 deletions python/src/uagents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from uagents.asgi import ASGIServer
from uagents.communication import Dispenser
from uagents.config import (
ALMANAC_CONTRACT_VERSION,
AVERAGE_BLOCK_INTERVAL,
LEDGER_PREFIX,
MAINNET_PREFIX,
Expand All @@ -50,6 +49,8 @@
from uagents.protocol import Protocol
from uagents.registration import (
AgentRegistrationPolicy,
BatchAlmanacApiRegistrationPolicy,
BatchRegistrationPolicy,
DefaultRegistrationPolicy,
)
from uagents.resolver import GlobalResolver, Resolver
Expand Down Expand Up @@ -756,21 +757,11 @@ async def register(self):
if necessary.

"""
# Check if the deployed contract version matches the supported version
deployed_version = self._almanac_contract.get_contract_version()
if deployed_version != ALMANAC_CONTRACT_VERSION:
self._logger.warning(
"Mismatch in almanac contract versions: supported (%s), deployed (%s). "
"Update uAgents to the latest version for compatibility.",
ALMANAC_CONTRACT_VERSION,
deployed_version,
)

await self._registration_policy.register(
self.address, list(self.protocols.keys()), self._endpoints, self._metadata
)

async def _registration_loop(self):
async def _schedule_registration(self):
"""
Execute the registration loop.

Expand All @@ -784,12 +775,12 @@ async def _registration_loop(self):
except InsufficientFundsError:
time_until_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register on almanac contract: {ex}")
self._logger.exception(f"Failed to register: {ex}")
time_until_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS

# schedule the next registration update
self._loop.create_task(
_delay(self._registration_loop(), time_until_next_registration)
_delay(self._schedule_registration(), time_until_next_registration)
)

def on_interval(
Expand Down Expand Up @@ -1061,18 +1052,18 @@ async def handle_rest(

return await handler(*args) # type: ignore

async def _startup(self):
async def _startup(self, start_registration_loop: bool = True):
"""
Perform startup actions.

"""
if self._endpoints:
await self._registration_loop()

else:
self._logger.warning(
"No endpoints provided. Skipping registration: Agent won't be reachable."
)
if start_registration_loop:
if self._endpoints:
await self._schedule_registration()
else:
self._logger.warning(
"No endpoints provided. Skipping registration: Agent won't be reachable."
)
for handler in self._on_startup:
try:
ctx = self._build_context()
Expand Down Expand Up @@ -1100,13 +1091,13 @@ async def _shutdown(self):
except Exception as ex:
self._logger.exception(f"Exception in shutdown handler: {ex}")

async def setup(self):
async def setup(self, start_registration_loop: bool = True):
"""
Include the internal agent protocol, run startup tasks, and start background tasks.
"""
self.include(self._protocol)
self.start_message_dispenser()
await self._startup()
await self._startup(start_registration_loop)
self.start_message_receivers()
self.start_interval_tasks()

Expand Down Expand Up @@ -1329,6 +1320,8 @@ def __init__(
agents: Optional[List[Agent]] = None,
port: Optional[int] = None,
endpoint: Optional[Union[str, List[str], Dict[str, dict]]] = None,
agentverse: Optional[Union[str, Dict[str, str]]] = None,
registration_policy: Optional[BatchRegistrationPolicy] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
log_level: Union[int, str] = logging.INFO,
):
Expand All @@ -1352,41 +1345,84 @@ def __init__(
queries=self._queries,
logger=self._logger,
)
self._use_mailbox = False
self._agentverse = parse_agentverse_config(agentverse)
self._use_mailbox = self._agentverse["use_mailbox"]
almanac_api_url = f"{self._agentverse['http_prefix']}://{self._agentverse['base_url']}/v1/almanac"
self._registration_policy = (
registration_policy
or BatchAlmanacApiRegistrationPolicy(
almanac_api=almanac_api_url,
logger=self._logger,
)
)

if agents is not None:
for agent in agents:
self.add(agent)

def add(self, agent: Agent):
def _update_agent(self, agent: Agent):
"""
Add an agent to the bureau.
Update the agent to be taken over by the Bureau.

Args:
agent (Agent): The agent to be added.
agent (Agent): The agent to be updated.

"""
if agent in self._agents:
return
agent.update_loop(self._loop)
agent.update_queries(self._queries)
if agent.agentverse["use_mailbox"]:
self._use_mailbox = True
else:
agent.update_endpoints(self._endpoints)
self._server._rest_handler_map.update(agent._server._rest_handler_map)

agent._agentverse = self._agentverse
agent._logger.setLevel(self._logger.level)

def add(self, agent: Agent):
"""
Add an agent to the bureau.

Args:
agent (Agent): The agent to be added.

"""
if agent in self._agents:
return
self._update_agent(agent)
self._registration_policy.add_agent(agent)
self._agents.append(agent)

async def _schedule_registration(self):
"""
Start the batch registration loop.

"""
time_to_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS
try:
await self._registration_policy.register()
except InsufficientFundsError:
time_to_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register: {ex}")
time_to_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS

# schedule the next registration update
self._loop.create_task(
_delay(self._schedule_registration(), time_to_next_registration)
)

async def run_async(self):
"""
Run the agents managed by the bureau.

"""
tasks = [self._server.serve()]
for agent in self._agents:
await agent.setup()
await agent.setup(start_registration_loop=False)
if agent.agentverse["use_mailbox"] and agent.mailbox_client is not None:
tasks.append(agent.mailbox_client.run())
tasks.append(self._schedule_registration())

try:
await asyncio.gather(*tasks)
Expand Down
80 changes: 78 additions & 2 deletions python/src/uagents/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
ALMANAC_API_MAX_RETRIES,
ALMANAC_API_TIMEOUT_SECONDS,
ALMANAC_API_URL,
ALMANAC_CONTRACT_VERSION,
REGISTRATION_FEE,
REGISTRATION_UPDATE_INTERVAL_SECONDS,
)
Expand Down Expand Up @@ -43,6 +44,13 @@ async def register(
pass


class BatchRegistrationPolicy(ABC):
@abstractmethod
# pylint: disable=unnecessary-pass
async def register(self):
pass


class AgentRegistrationAttestation(BaseModel):
agent_address: str
protocols: List[str]
Expand Down Expand Up @@ -133,6 +141,56 @@ async def register(
await asyncio.sleep(generate_backoff_time(retry))


class BatchAlmanacApiRegistrationPolicy(AgentRegistrationPolicy):
def __init__(
self, almanac_api: Optional[str] = None, logger: Optional[logging.Logger] = None
):
self._almanac_api = almanac_api or ALMANAC_API_URL
self._attestations: List[AgentRegistrationAttestation] = []
self._logger = logger or logging.getLogger(__name__)

def add_agent(self, agent: Any):
attestation = AgentRegistrationAttestation(
agent_address=agent.address,
protocols=list(agent.protocols.keys()),
endpoints=agent._endpoints,
metadata=agent.metadata,
)
attestation.sign(agent._identity)
self._attestations.append(attestation)

async def register(self):
if not self._attestations:
return

attestations = [a.model_dump() for a in self._attestations]

async with aiohttp.ClientSession() as session:
for retry in range(ALMANAC_API_MAX_RETRIES):
try:
async with session.post(
f"{self._almanac_api}/agents/batch",
headers={"content-type": "application/json"},
data=json.dumps(attestations),
timeout=aiohttp.ClientTimeout(
total=ALMANAC_API_TIMEOUT_SECONDS
),
) as resp:
resp.raise_for_status()
self._logger.info(
"Batch registration on Almanac API successful"
)
return
except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e:
if retry == ALMANAC_API_MAX_RETRIES - 1:
raise e
time_to_retry = generate_backoff_time(retry)
self._logger.debug(
f"Batch registration failed. Retrying in {time_to_retry} seconds..."
)
await asyncio.sleep(time_to_retry)


class LedgerBasedRegistrationPolicy(AgentRegistrationPolicy):
def __init__(
self,
Expand All @@ -151,15 +209,33 @@ def __init__(
self._almanac_contract = almanac_contract
self._logger = logger or logging.getLogger(__name__)

def check_contract_version(self):
"""
Check the version of the deployed Almanac contract and log a warning
if it is different from the supported version.
"""
deployed_version = self._almanac_contract.get_contract_version()
if deployed_version != ALMANAC_CONTRACT_VERSION:
self._logger.warning(
"Mismatch in almanac contract versions: supported (%s), deployed (%s). "
"Update uAgents to the latest version for compatibility.",
ALMANAC_CONTRACT_VERSION,
deployed_version,
)

async def register(
self,
agent_address: str,
protocols: List[str],
endpoints: List[AgentEndpoint],
metadata: Optional[Dict[str, Any]] = None,
):
# register if not yet registered or registration is about to expire
# or anything has changed from the last registration
"""
Register the agent on the Almanac contract if registration is about to expire or
the registration data has changed.
"""
self.check_contract_version()

if (
not self._almanac_contract.is_registered(agent_address)
or self._almanac_contract.get_expiry(agent_address)
Expand Down
1 change: 1 addition & 0 deletions python/src/uagents/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class AgentInfo(BaseModel):
agent_address: str
endpoints: List[AgentEndpoint]
protocols: List[str]
metadata: Optional[Dict[str, Any]] = None


class RestHandlerDetails(BaseModel):
Expand Down
Loading