diff --git a/src/aleph_client/__main__.py b/src/aleph_client/__main__.py index c082ab69..60775ed6 100644 --- a/src/aleph_client/__main__.py +++ b/src/aleph_client/__main__.py @@ -23,7 +23,9 @@ name="message", help="Manage messages (post, amend, watch and forget) on aleph.im & twentysix.cloud", ) -app.add_typer(aggregate.app, name="aggregate", help="Manage aggregate messages on aleph.im & twentysix.cloud") +app.add_typer( + aggregate.app, name="aggregate", help="Manage aggregate messages and permissions on aleph.im & twentysix.cloud" +) app.add_typer(files.app, name="file", help="Manage files (upload and pin on IPFS) on aleph.im & twentysix.cloud") app.add_typer(program.app, name="program", help="Manage programs (micro-VMs) on aleph.im & twentysix.cloud") app.add_typer(instance.app, name="instance", help="Manage instances (VMs) on aleph.im & twentysix.cloud") diff --git a/src/aleph_client/commands/aggregate.py b/src/aleph_client/commands/aggregate.py index 5b1cd6f1..322529e5 100644 --- a/src/aleph_client/commands/aggregate.py +++ b/src/aleph_client/commands/aggregate.py @@ -1,79 +1,154 @@ from __future__ import annotations -import json +import inspect +import logging +from json import JSONDecodeError, dumps, loads from pathlib import Path from typing import Optional import typer +from aiohttp import ClientResponseError, ClientSession from aleph.sdk.account import _load_account from aleph.sdk.client import AuthenticatedAlephHttpClient from aleph.sdk.conf import settings -from aleph.sdk.query.filters import MessageFilter from aleph.sdk.types import AccountFromPrivateKey from aleph.sdk.utils import extended_json_encoder -from aleph_message.models.base import MessageType +from aleph_message.models import Chain, MessageType +from aleph_message.status import MessageStatus +from rich.console import Console +from rich.panel import Panel +from rich.text import Text +from typing_extensions import Annotated from aleph_client.commands import help_strings from aleph_client.commands.utils import setup_logging -from aleph_client.utils import AsyncTyper +from aleph_client.utils import AsyncTyper, sanitize_url +logger = logging.getLogger(__name__) app = AsyncTyper(no_args_is_help=True) +def is_same_context(): + caller = inspect.currentframe().f_back.f_back # type: ignore + current_file = __file__ + caller_file = caller.f_code.co_filename # type: ignore + return current_file == caller_file + + @app.command() async def forget( - key: str = typer.Argument(..., help="Aggregate item hash to be removed."), - reason: Optional[str] = typer.Option(None, help="A description of why the messages are being forgotten"), - channel: Optional[str] = typer.Option(default=settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), - private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + key: Annotated[str, typer.Argument(help="Aggregate key to remove")], + subkeys: Annotated[ + Optional[str], + typer.Option( + help="Remove specified subkey(s) only. Must be a comma separated list. E.g. `key1` or `key1,key2`", + ), + ] = None, + address: Annotated[Optional[str], typer.Option(help=help_strings.TARGET_ADDRESS)] = None, + channel: Annotated[Optional[str], typer.Option(help=help_strings.CHANNEL)] = settings.DEFAULT_CHANNEL, + inline: Annotated[bool, typer.Option(help="inline")] = False, + sync: Annotated[bool, typer.Option(help="Sync response")] = False, + private_key: Annotated[Optional[str], typer.Option(help=help_strings.PRIVATE_KEY)] = settings.PRIVATE_KEY_STRING, + private_key_file: Annotated[ + Optional[Path], typer.Option(help=help_strings.PRIVATE_KEY_FILE) + ] = settings.PRIVATE_KEY_FILE, + print_message: bool = False, + verbose: bool = True, debug: bool = False, -): - """Forget all the messages composing an aggregate.""" +) -> bool: + """Delete an aggregate by key or subkeys""" setup_logging(debug) account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + address = account.get_address() if address is None else address + + if key == "security" and not is_same_context(): + typer.echo(help_strings.AGGREGATE_SECURITY_KEY_PROTECTED) + raise typer.Exit(1) async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: - message_response = await client.get_messages( - message_filter=MessageFilter( - addresses=[account.get_address()], - message_types=[MessageType.aggregate], - content_keys=[key], + content = None + if subkeys: + content = {sk: None for sk in subkeys.split(",")} + else: + aggregates = await list_aggregates( + address=address, private_key=private_key, private_key_file=private_key_file, verbose=False, debug=debug ) + if aggregates and key in aggregates.keys(): + content = {k: None for k in aggregates.get(key).keys()} + else: + typer.echo(f"Aggregate `{key}` not found") + raise typer.Exit(1) + + message, status = await client.create_aggregate( + key=key, + content=content, + channel=channel, + sync=sync, + inline=inline, + address=address, ) - hash_list = [message.item_hash for message in message_response.messages] + content = f"{message.json(indent=4)}" - await client.forget(hashes=hash_list, reason=reason, channel=channel) + if status != MessageStatus.REJECTED: + if print_message: + typer.echo(content) + if verbose: + label_subkeys = f" -> {subkeys}" if subkeys else "" + typer.echo(f"Aggregate `{key}{label_subkeys}` has been deleted") + return True + elif verbose: + typer.echo(f"Aggregate deletion has been rejected:\n{content}") + return False @app.command() async def post( - key: str = typer.Argument(..., help="Aggregate key to be created."), - content: str = typer.Argument(..., help="Aggregate content (ex : {'c': 3, 'd': 4})"), - address: Optional[str] = typer.Option(default=None, help="address"), - channel: Optional[str] = typer.Option(default=settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), - inline: bool = typer.Option(False, help="inline"), - sync: bool = typer.Option(False, help="Sync response"), - private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + key: Annotated[str, typer.Argument(help="Aggregate key to create/update")], + content: Annotated[ + str, + typer.Argument( + help='Aggregate content, in json format and between single quotes. E.g. \'{"a": 1, "b": 2}\'. If a subkey is provided, also allow to pass a string content between quotes', + ), + ], + subkey: Annotated[Optional[str], typer.Option(help="Specified subkey where the content will be replaced")] = None, + address: Annotated[Optional[str], typer.Option(help=help_strings.TARGET_ADDRESS)] = None, + channel: Annotated[Optional[str], typer.Option(help=help_strings.CHANNEL)] = settings.DEFAULT_CHANNEL, + inline: Annotated[bool, typer.Option(help="inline")] = False, + sync: Annotated[bool, typer.Option(help="Sync response")] = False, + private_key: Annotated[Optional[str], typer.Option(help=help_strings.PRIVATE_KEY)] = settings.PRIVATE_KEY_STRING, + private_key_file: Annotated[ + Optional[Path], typer.Option(help=help_strings.PRIVATE_KEY_FILE) + ] = settings.PRIVATE_KEY_FILE, + print_message: bool = False, + verbose: bool = True, debug: bool = False, -): - """Create or Update aggregate""" +) -> bool: + """Create or update an aggregate by key or subkey""" setup_logging(debug) account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + address = account.get_address() if address is None else address - try: - content_dict = json.loads(content) - except json.JSONDecodeError: - typer.echo("Invalid JSON for content. Please provide valid JSON.") + if key == "security" and not is_same_context(): + typer.echo(help_strings.AGGREGATE_SECURITY_KEY_PROTECTED) raise typer.Exit(1) + content_dict: dict | str = content + try: + content_dict = loads(content) + except JSONDecodeError as e: + if not subkey: + typer.echo("Invalid JSON for content. Please provide valid JSON") + raise typer.Exit(1) from e + + if subkey: + content_dict = {subkey: content_dict} + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: - message, _ = await client.create_aggregate( + message, status = await client.create_aggregate( key=key, content=content_dict, channel=channel, @@ -81,31 +156,335 @@ async def post( inline=inline, address=address, ) - log_message = json.dumps(message.dict(), indent=4, default=extended_json_encoder) - typer.echo(log_message) + content = f"{message.json(indent=4)}" + + if status != MessageStatus.REJECTED: + if print_message: + typer.echo(content) + if verbose: + label_subkey = f" -> {subkey}" if subkey else "" + typer.echo(f"Aggregate `{key}{label_subkey}` has been created/updated") + return True + elif verbose: + typer.echo(f"Aggregate creation/update has been rejected:\n{content}") + return False @app.command() async def get( - key: str = typer.Argument(..., help="Aggregate key to be fetched."), - address: Optional[str] = typer.Option(default=None, help="Address"), - private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + key: Annotated[str, typer.Argument(help="Aggregate key to fetch")], + subkeys: Annotated[ + Optional[str], + typer.Option( + help="Fetch specified subkey(s) only. Must be a comma separated list. E.g. `key1` or `key1,key2`", + ), + ] = None, + address: Annotated[Optional[str], typer.Option(help=help_strings.TARGET_ADDRESS)] = None, + private_key: Annotated[Optional[str], typer.Option(help=help_strings.PRIVATE_KEY)] = settings.PRIVATE_KEY_STRING, + private_key_file: Annotated[ + Optional[Path], typer.Option(help=help_strings.PRIVATE_KEY_FILE) + ] = settings.PRIVATE_KEY_FILE, + verbose: bool = True, + debug: bool = False, +) -> Optional[dict]: + """Fetch an aggregate by key or subkeys""" + + setup_logging(debug) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + address = account.get_address() if address is None else address + + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: + aggregates = None + try: + aggregates = await client.fetch_aggregate(address=address, key=key) + if subkeys: + aggregates = {k: v for k, v in aggregates.items() if k in subkeys.split(",")} + except ClientResponseError: + pass + + if verbose: + if not aggregates: + typer.echo("No aggregate found for the given key or subkeys") + else: + typer.echo(dumps(aggregates, indent=4, default=extended_json_encoder)) + + return aggregates + + +@app.command(name="list") +async def list_aggregates( + address: Annotated[Optional[str], typer.Option(help=help_strings.TARGET_ADDRESS)] = None, + private_key: Annotated[Optional[str], typer.Option(help=help_strings.PRIVATE_KEY)] = settings.PRIVATE_KEY_STRING, + private_key_file: Annotated[ + Optional[Path], typer.Option(help=help_strings.PRIVATE_KEY_FILE) + ] = settings.PRIVATE_KEY_FILE, + json: Annotated[bool, typer.Option(help="Print as json instead of rich table")] = False, + verbose: bool = True, + debug: bool = False, +) -> Optional[dict]: + """Display all aggregates associated to an account""" + + setup_logging(debug) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + address = account.get_address() if address is None else address + + aggr_link = f"{sanitize_url(settings.API_HOST)}/api/v0/aggregates/{address}.json" + async with ClientSession() as session: + aggregates = None + async with session.get(aggr_link) as resp: + if resp.status == 200: + aggregates = (await resp.json())["data"] + + if verbose: + if not aggregates: + typer.echo(f"Address: {address}\n\nNo aggregate data found\n") + elif json: + typer.echo(dumps(aggregates, indent=4, default=extended_json_encoder)) + else: + infos = [ + Text.from_markup(f"Address: [bright_cyan]{address}[/bright_cyan]\n\nKeys:"), + ] + for key, value in aggregates.items(): + infos.append( + Text.from_markup(f"\n↳ [orange1]{key}[/orange1]:"), + ) + if type(value) == dict and any([v is None for _, v in value.items()]): + infos.append( + Text.from_markup("\n[gray50]x empty[/gray50]"), + ) + else: + for k, v in value.items(): + infos.append( + Text.from_markup( + f"\n• [orchid]{k}[/orchid]: {v if type(v) is str else dumps(v, indent=4)}" + ), + ) + console = Console() + console.print( + Panel( + Text.assemble(*infos), + title="Aggregates", + border_style="bright_cyan", + expand=False, + title_align="left", + ) + ) + + return aggregates + + +@app.command() +async def authorize( + address: Annotated[str, typer.Argument(help=help_strings.TARGET_ADDRESS)], + chain: Annotated[Optional[Chain], typer.Option(help="Only on specified chain")] = None, + types: Annotated[ + Optional[str], typer.Option(help="Only for specified message types (comma separated list)") + ] = None, + channels: Annotated[Optional[str], typer.Option(help="Only on specified channels (comma separated list)")] = None, + post_types: Annotated[ + Optional[str], typer.Option(help="Only for specified post types (comma separated list)") + ] = None, + aggregate_keys: Annotated[ + Optional[str], typer.Option(help="Only for specified aggregate keys (comma separated list)") + ] = None, + private_key: Annotated[Optional[str], typer.Option(help=help_strings.PRIVATE_KEY)] = settings.PRIVATE_KEY_STRING, + private_key_file: Annotated[ + Optional[Path], typer.Option(help=help_strings.PRIVATE_KEY_FILE) + ] = settings.PRIVATE_KEY_FILE, + print_message: bool = False, + verbose: bool = True, debug: bool = False, ): - """Fetch an aggregate by key and content.""" + """Grant specific publishing permissions to an address to act on behalf of this account""" setup_logging(debug) account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - # if no address we load current account as a private key + data = await get( + key="security", + subkeys="authorizations", + address=account.get_address(), + private_key=private_key, + private_key_file=private_key_file, + verbose=False, + debug=debug, + ) + + authorizations = None + if data: + authorizations = data.get("authorizations") + new_auth: dict = {"address": address} + if chain: + new_auth["chain"] = chain.value + if types: + valid_types = [] + for t in types.split(","): + try: + valid_types.append(MessageType(t.upper()).value) + except ValueError as e: + print( + f"Invalid value passed into `--types`: {t}\nValid values: {', '.join([e.value for e in MessageType])}" + ) + raise typer.Exit(1) from e + new_auth["types"] = valid_types + if channels: + new_auth["channels"] = channels.split(",") + if post_types: + new_auth["post_types"] = post_types.split(",") + if aggregate_keys: + new_auth["aggregate_keys"] = aggregate_keys.split(",") + authorizations.append(new_auth) + if authorizations: + success = await post( + key="security", + subkey="authorizations", + content=dumps(authorizations), + address=None, + channel=settings.DEFAULT_CHANNEL, + inline=True, + sync=True, + private_key=private_key, + private_key_file=private_key_file, + print_message=print_message, + verbose=False, + debug=debug, + ) + if verbose: + if success: + typer.echo(f"Permissions has been added for {address}") + else: + typer.echo(f"Failed to add permissions for {address}") + + +@app.command() +async def revoke( + address: Annotated[str, typer.Argument(help=help_strings.TARGET_ADDRESS)], + private_key: Annotated[Optional[str], typer.Option(help=help_strings.PRIVATE_KEY)] = settings.PRIVATE_KEY_STRING, + private_key_file: Annotated[ + Optional[Path], typer.Option(help=help_strings.PRIVATE_KEY_FILE) + ] = settings.PRIVATE_KEY_FILE, + print_message: bool = False, + verbose: bool = True, + debug: bool = False, +): + """Revoke all publishing permissions from an address acting on behalf of this account""" + + setup_logging(debug) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + + data = await get( + key="security", + subkeys="authorizations", + address=account.get_address(), + private_key=private_key, + private_key_file=private_key_file, + verbose=False, + debug=debug, + ) + + authorizations = None + if data: + old_authorizations = data.get("authorizations") + authorizations = [item for item in old_authorizations if item.get("address", "") != address] + if old_authorizations != authorizations: + success = await post( + key="security", + subkey="authorizations", + content=dumps(authorizations), + address=None, + channel=settings.DEFAULT_CHANNEL, + inline=True, + sync=True, + private_key=private_key, + private_key_file=private_key_file, + print_message=print_message, + verbose=False, + debug=debug, + ) + if verbose: + if success: + typer.echo(f"Permissions has been deleted for {address}") + else: + typer.echo(f"Failed to delete permissions for {address}") + elif verbose: + typer.echo(f"No permission found for {address}. Ignored") + + +@app.command() +async def permissions( + address: Annotated[Optional[str], typer.Option(help=help_strings.TARGET_ADDRESS)] = None, + private_key: Annotated[Optional[str], typer.Option(help=help_strings.PRIVATE_KEY)] = settings.PRIVATE_KEY_STRING, + private_key_file: Annotated[ + Optional[Path], typer.Option(help=help_strings.PRIVATE_KEY_FILE) + ] = settings.PRIVATE_KEY_FILE, + json: Annotated[bool, typer.Option(help="Print as json instead of rich table")] = False, + verbose: bool = True, + debug: bool = False, +) -> Optional[dict]: + """Display all permissions emitted by an account""" + + setup_logging(debug) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) address = account.get_address() if address is None else address - async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: - aggregates = await client.fetch_aggregate(address=address, key=key) + data = await get( + key="security", + subkeys="authorizations", + address=address, + private_key=private_key, + private_key_file=private_key_file, + verbose=False, + debug=debug, + ) - if aggregates: - typer.echo(json.dumps(aggregates, indent=4, default=extended_json_encoder)) - else: - typer.echo("No aggregates found for the given key and content.") + authorizations = None + if data: + authorizations = data.get("authorizations") + if authorizations: + if json: + typer.echo(dumps(authorizations, indent=4, default=extended_json_encoder)) + elif verbose: + infos = [ + Text.from_markup(f"Address: [bright_cyan]{address}[/bright_cyan]\n\nAuthorizations by address:"), + ] + auth_addresses: dict = {} + for auth in authorizations: + addr = auth["address"] + auth_address = auth_addresses[addr] = auth_addresses.get(addr, []) + keys = ["chain", "channels", "types", "post_types", "aggregate_keys"] + item = {key: auth.get(key) for key in keys if auth.get(key) is not None} + auth_address.append(item) + for addr, allowances in auth_addresses.items(): + infos.append( + Text.from_markup(f"\n↳ [orange1]{addr}[/orange1]"), + ) + for item in allowances: + display_item = "[green]all[/green]" + if item: + display_item = ", ".join( + [ + f"[orchid]{key}([white]{value if type(value) != list else ', '.join(value)}[/white])[/orchid]" + for key, value in item.items() + ] + ) + infos.append(Text.from_markup(f"\n• {display_item}")) + + console = Console() + console.print( + Panel( + Text.assemble(*infos), + title="Permissions", + border_style="bright_cyan", + expand=False, + title_align="left", + ) + ) + if not authorizations and verbose: + typer.echo(f"Address: {address}\n\nNo permission data found\n") + + return authorizations diff --git a/src/aleph_client/commands/help_strings.py b/src/aleph_client/commands/help_strings.py index fbdc48f7..79d7e2b2 100644 --- a/src/aleph_client/commands/help_strings.py +++ b/src/aleph_client/commands/help_strings.py @@ -5,7 +5,7 @@ REF = "Item hash of the message to update" SIGNABLE_MESSAGE = "Message to sign" CUSTOM_DOMAIN_TARGET_TYPES = "IPFS|PROGRAM|INSTANCE" -CUSTOM_DOMAIN_OWNER_ADDRESS = "Owner address, default current account" +CUSTOM_DOMAIN_OWNER_ADDRESS = "Owner address. Defaults to current account address" CUSTOM_DOMAIN_NAME = "Domain name. ex: aleph.im" CUSTOM_DOMAIN_ITEM_HASH = "Item hash" SKIP_VOLUME = "Skip prompt to attach more volumes" @@ -62,3 +62,7 @@ PROGRAM_UPDATABLE = "Allow program updates. By default, only the source code can be modified without requiring redeployement (same item hash). When enabled (set to True), this option allows to update any other field. However, such modifications will require a program redeployment (new item hash)" PROGRAM_KEEP_CODE = "Keep the source code intact instead of deleting it" PROGRAM_KEEP_PREV = "Keep the previous program intact instead of deleting it" +TARGET_ADDRESS = "Target address. Defaults to current account address" +AGGREGATE_SECURITY_KEY_PROTECTED = ( + "The aggregate key `security` is protected. Use `aleph aggregate [allow|revoke]` to manage it." +) diff --git a/tests/unit/mocks.py b/tests/unit/mocks.py index 3da4c8a8..5fa6a98f 100644 --- a/tests/unit/mocks.py +++ b/tests/unit/mocks.py @@ -6,6 +6,8 @@ from eth_utils.currency import to_wei from pydantic import BaseModel +from aleph_client.commands.node import NodeInfo + # Change to Aleph testnet settings.API_HOST = "https://api.twentysix.testnet.network" @@ -43,3 +45,83 @@ def create_mock_load_account(): mock_loader.return_value.update_flow = AsyncMock(return_value=FAKE_FLOW_HASH) mock_loader.return_value.delete_flow = AsyncMock(return_value=FAKE_FLOW_HASH) return mock_loader + + +async def mock_fetch_nodes() -> NodeInfo: + NODE_AGGREGATE = { + "address": "0xa1B3bb7d2332383D96b7796B908fB7f7F3c2Be10", + "data": { + "corechannel": { + "nodes": [ + { + "hash": "37bcf3b0de2b95168557dccd757e3fb9310f6182eb35173dd929e535dc8d18cc", + "name": "Aleph.Cloud.One", + "time": 1608436347.148, + "owner": "0x13CA00cD3BB1ded822AFF447a6fEC5ed9DaeCD65", + "score": 0.95672722675568, + "banner": "", + "locked": False, + "reward": "0x462b25B706688a7174d675e4787d2DBEE72aB71f", + "status": "active", + "address": "", + "manager": "", + "picture": "81410c35ea8d31569011c091d7c780e83b8e8d44bf292e6f8bf6316b162dda9e", + "stakers": { + "0x160f9C91858940BEBA3bacAD2Fc1c4D32635913b": 21359.3722761429, + "0x161F0F8d70971EB7fE65Fa3558e48442c338EBde": 16778.2001223581, + "0x2BACCdD22C27F84DE8a8EeC0aB7B2a4766E7C02d": 24072.424430756, + }, + "has_bonus": True, + "authorized": [], + "description": "Supporting Aleph from NULS POCM through to running a node. Moshe is a genius!\n\nPowered by Node Forge.", + "performance": 0.915326986415614, + "multiaddress": "/ip4/51.79.82.13/tcp/4025/p2p/QmfKB9q89aCX3wqkiqgis9SHfx2MznGd6LTsqektdKUBg5", + "total_staked": 1032817.18542335, + "score_updated": True, + "stream_reward": "", + "inactive_since": None, + "resource_nodes": [ + "d1401d7f2e4487b1b956acf8de6a48de5bc5ed9637516f901dfe4eb9f74ac214", + "3b06f6fb75902821eeeddf713837f6a2d38aedff8a7c66c7fa3192b461df6e6a", + "3fe5eecb0dc99be68e197d1ccf037aa4274d30b0f94f955cf765545bebad33c3", + "179317d603edf7c005286dcb79968be294218fdd73ccee3bef719006a0db664c", + "936d1ac993deef3b09c06674e05aa742f4270ec337b1d60ec8021fccaf8f6479", + ], + "decentralization": 0.534862998440633, + "registration_url": "", + "terms_and_conditions": "", + }, + ], + "resource_nodes": [ + { + "hash": "cb764fe80f76cd5ec395952263fcbf0f5d2cc0dfe1ed98c90e13734b3fb2df3e", + "name": "Aleph.im Confidential Host 1", + "time": 1723565390.963, + "type": "compute", + "owner": "0xFeF2b33478f906eDE5ee96110b2342861cF1569A", + "score": 0.931334273816828, + "banner": "", + "locked": False, + "parent": "c5a1295c20d5fb1df638e4ff7dee2239ab88c2843899bd26e4b0200a9f5ca82b", + "reward": "0xFeF2b33478f906eDE5ee96110b2342861cF1569A", + "status": "linked", + "address": "https://coco-1.crn.aleph.sh/", + "manager": "", + "picture": "", + "authorized": "", + "description": "", + "performance": 0.867383529585918, + "multiaddress": "", + "score_updated": True, + "stream_reward": "0xFeF2b33478f906eDE5ee96110b2342861cF1569A", + "inactive_since": None, + "decentralization": 0.991886443254677, + "registration_url": "", + "terms_and_conditions": "", + } + ], + } + }, + "info": {}, + } + return NodeInfo(**NODE_AGGREGATE) diff --git a/tests/unit/test_aggregate.py b/tests/unit/test_aggregate.py new file mode 100644 index 00000000..834b8e49 --- /dev/null +++ b/tests/unit/test_aggregate.py @@ -0,0 +1,218 @@ +from __future__ import annotations + +import contextlib +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import aiohttp +import pytest + +from aleph_client.commands.aggregate import ( + authorize, + forget, + get, + list_aggregates, + permissions, + post, + revoke, +) + +from .mocks import FAKE_ADDRESS_EVM, create_mock_load_account + +FAKE_AGGREGATE_DATA = dict( + AI=dict( + subscription="premium", + models=dict( + chatgpt=True, + claude=False, + libertai=True, + ), + active=True, + ), + security=dict(authorizations=[dict(address=FAKE_ADDRESS_EVM, types=["POST"])]), +) + + +@contextlib.asynccontextmanager +async def mock_client_session_get(self, aggr_link): + yield AsyncMock( + status=200, + raise_for_status=MagicMock(), + json=AsyncMock(return_value=dict(data=FAKE_AGGREGATE_DATA)), + ) + + +def create_mock_auth_client(return_fetch=FAKE_AGGREGATE_DATA): + mock_auth_client = AsyncMock( + create_aggregate=AsyncMock(return_value=(MagicMock(), "processed")), + fetch_aggregate=AsyncMock(return_value=return_fetch), + ) + mock_auth_client_class = MagicMock() + mock_auth_client_class.return_value.__aenter__ = AsyncMock(return_value=mock_auth_client) + return mock_auth_client_class, mock_auth_client + + +@pytest.mark.parametrize( + ids=["by_key_only", "by_key_and_subkey", "by_key_and_subkeys"], + argnames="args", + argvalues=[ + dict(key="AI"), # by key only + dict(key="AI", subkeys="models"), # with subkey + dict(key="AI", subkeys="models,subscription"), # with subkeys + ], +) +@pytest.mark.asyncio +async def test_forget(capsys, args): + mock_load_account = create_mock_load_account() + mock_list_aggregates = AsyncMock(return_value=FAKE_AGGREGATE_DATA) + mock_auth_client_class, mock_auth_client = create_mock_auth_client() + + @patch("aleph_client.commands.aggregate._load_account", mock_load_account) + @patch("aleph_client.commands.aggregate.list_aggregates", mock_list_aggregates) + @patch("aleph_client.commands.aggregate.AuthenticatedAlephHttpClient", mock_auth_client_class) + async def run_forget(aggr_spec): + print() # For better display when pytest -v -s + return await forget(**aggr_spec) + + result = await run_forget(args) + assert result is True + mock_load_account.assert_called_once() + if "subkeys" not in args: + mock_list_aggregates.assert_called_once() + mock_auth_client.create_aggregate.assert_called_once() + captured = capsys.readouterr() + assert captured.out.endswith("has been deleted\n") + + +@pytest.mark.parametrize( + ids=["by_key_only", "by_key_and_subkey"], + argnames="args", + argvalues=[ + dict(key="AI", content='{"test": "ok"}'), # by key only + dict(key="AI", subkey="models", content='{"chatgpt": true, "claude": true, "libertai": true}'), # with subkey + ], +) +@pytest.mark.asyncio +async def test_post(capsys, args): + mock_load_account = create_mock_load_account() + mock_auth_client_class, mock_auth_client = create_mock_auth_client() + + @patch("aleph_client.commands.aggregate._load_account", mock_load_account) + @patch("aleph_client.commands.aggregate.AuthenticatedAlephHttpClient", mock_auth_client_class) + async def run_post(aggr_spec): + print() # For better display when pytest -v -s + return await post(**aggr_spec) + + result = await run_post(args) + assert result is True + mock_load_account.assert_called_once() + mock_auth_client.create_aggregate.assert_called_once() + captured = capsys.readouterr() + assert captured.out.endswith("has been created/updated\n") + + +@pytest.mark.parametrize( + ids=["by_key_only", "by_key_and_subkey", "by_key_and_subkeys"], + argnames=["args", "expected"], + argvalues=[ + (dict(key="AI"), FAKE_AGGREGATE_DATA["AI"]), # by key only + ( # with subkey + dict(key="AI", subkeys="subscription"), + dict(subscription=FAKE_AGGREGATE_DATA["AI"]["subscription"]), # type: ignore + ), + ( # with subkeys + dict(key="AI", subkeys="subscription,models"), + dict(subscription=FAKE_AGGREGATE_DATA["AI"]["subscription"], models=FAKE_AGGREGATE_DATA["AI"]["models"]), # type: ignore + ), + ], +) +@pytest.mark.asyncio +async def test_get(capsys, args, expected): + mock_load_account = create_mock_load_account() + mock_auth_client_class, mock_auth_client = create_mock_auth_client(return_fetch=FAKE_AGGREGATE_DATA["AI"]) + + @patch("aleph_client.commands.aggregate._load_account", mock_load_account) + @patch("aleph_client.commands.aggregate.AuthenticatedAlephHttpClient", mock_auth_client_class) + async def run_get(aggr_spec): + print() # For better display when pytest -v -s + return await get(**aggr_spec) + + aggregate = await run_get(args) + mock_load_account.assert_called_once() + mock_auth_client.fetch_aggregate.assert_called_once() + captured = capsys.readouterr() + assert aggregate == expected and expected == json.loads(captured.out) + + +@pytest.mark.asyncio +async def test_list_aggregates(): + mock_load_account = create_mock_load_account() + + @patch("aleph_client.commands.aggregate._load_account", mock_load_account) + @patch.object(aiohttp.ClientSession, "get", mock_client_session_get) + async def run_list_aggregates(): + print() # For better display when pytest -v -s + return await list_aggregates(address=FAKE_ADDRESS_EVM) + + aggregates = await run_list_aggregates() + mock_load_account.assert_called_once() + assert aggregates == FAKE_AGGREGATE_DATA + + +@pytest.mark.asyncio +async def test_authorize(capsys): + mock_load_account = create_mock_load_account() + mock_get = AsyncMock(return_value=FAKE_AGGREGATE_DATA["security"]) + mock_post = AsyncMock(return_value=True) + + @patch("aleph_client.commands.aggregate._load_account", mock_load_account) + @patch("aleph_client.commands.aggregate.get", mock_get) + @patch("aleph_client.commands.aggregate.post", mock_post) + async def run_authorize(): + print() # For better display when pytest -v -s + return await authorize(address=FAKE_ADDRESS_EVM, types="PROGRAM,FORGET") + + await run_authorize() + mock_load_account.assert_called_once() + mock_get.assert_called_once() + mock_post.assert_called_once() + captured = capsys.readouterr() + assert captured.out.endswith(f"Permissions has been added for {FAKE_ADDRESS_EVM}\n") + + +@pytest.mark.asyncio +async def test_revoke(capsys): + mock_load_account = create_mock_load_account() + mock_get = AsyncMock(return_value=FAKE_AGGREGATE_DATA["security"]) + mock_post = AsyncMock(return_value=True) + + @patch("aleph_client.commands.aggregate._load_account", mock_load_account) + @patch("aleph_client.commands.aggregate.get", mock_get) + @patch("aleph_client.commands.aggregate.post", mock_post) + async def run_revoke(): + print() # For better display when pytest -v -s + return await revoke(address=FAKE_ADDRESS_EVM) + + await run_revoke() + mock_load_account.assert_called_once() + mock_get.assert_called_once() + mock_post.assert_called_once() + captured = capsys.readouterr() + assert captured.out.endswith(f"Permissions has been deleted for {FAKE_ADDRESS_EVM}\n") + + +@pytest.mark.asyncio +async def test_permissions(): + mock_load_account = create_mock_load_account() + mock_get = AsyncMock(return_value=FAKE_AGGREGATE_DATA["security"]) + + @patch("aleph_client.commands.aggregate._load_account", mock_load_account) + @patch("aleph_client.commands.aggregate.get", mock_get) + async def run_permissions(): + print() # For better display when pytest -v -s + return await permissions(address=FAKE_ADDRESS_EVM, json=True) + + authorizations = await run_permissions() + mock_load_account.assert_called_once() + mock_get.assert_called_once() + assert authorizations == FAKE_AGGREGATE_DATA["security"]["authorizations"] # type: ignore