diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 7355d533..56faaee9 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -48,4 +48,4 @@ jobs: if: matrix.os == 'ubuntu-24.04' with: token: ${{ secrets.CODECOV_TOKEN }} - slug: aleph-im/aleph-sdk-python + slug: aleph-im/aleph-client diff --git a/pyproject.toml b/pyproject.toml index 62045dcc..291b5ffc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ classifiers = [ ] dependencies = [ - "aleph-sdk-python>=1.0.0rc1", + "aleph-sdk-python>=1.0.0rc2", "aleph-message>=0.4.6", "aiohttp==3.9.5", "typer==0.12.3", diff --git a/src/aleph_client/commands/instance/__init__.py b/src/aleph_client/commands/instance/__init__.py index 12e013a3..2978b15b 100644 --- a/src/aleph_client/commands/instance/__init__.py +++ b/src/aleph_client/commands/instance/__init__.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import json import logging from pathlib import Path from typing import List, Optional, Tuple, Union, cast @@ -9,6 +10,7 @@ from aiohttp import ClientResponseError, ClientSession from aleph.sdk import AlephHttpClient, AuthenticatedAlephHttpClient from aleph.sdk.account import _load_account +from aleph.sdk.client.vm_client import VmClient from aleph.sdk.conf import settings as sdk_settings from aleph.sdk.exceptions import ( ForgottenMessageError, @@ -16,7 +18,7 @@ MessageNotFoundError, ) from aleph.sdk.query.filters import MessageFilter -from aleph.sdk.types import AccountFromPrivateKey, StorageEnum +from aleph.sdk.types import Account, AccountFromPrivateKey, StorageEnum from aleph_message.models import InstanceMessage, StoreMessage from aleph_message.models.base import Chain, MessageType from aleph_message.models.execution.base import Payment, PaymentType @@ -29,6 +31,7 @@ from aleph_client.commands import help_strings from aleph_client.commands.instance.display import fetch_crn_info +from aleph_client.commands.node import NodeInfo, _fetch_nodes from aleph_client.commands.utils import ( get_or_prompt_volumes, setup_logging, @@ -36,7 +39,7 @@ validated_prompt, ) from aleph_client.conf import settings -from aleph_client.utils import AsyncTyper +from aleph_client.utils import AsyncTyper, fetch_json logger = logging.getLogger(__name__) app = AsyncTyper(no_args_is_help=True) @@ -254,18 +257,31 @@ async def delete( typer.echo(f"Instance {item_hash} has been deleted. It will be removed by the scheduler in a few minutes.") -async def _get_ipv6_address(message: InstanceMessage) -> Tuple[str, str]: +async def _get_ipv6_address(message: InstanceMessage, node_list: NodeInfo) -> Tuple[str, str]: async with ClientSession() as session: try: - resp = await session.get(f"https://scheduler.api.aleph.cloud/api/v0/allocation/{message.item_hash}") - resp.raise_for_status() - status = await resp.json() - return status["vm_hash"], status["vm_ipv6"] - except ClientResponseError: + if not message.content.payment: + # Fetch from the scheduler API directly if no payment + status = await fetch_json( + session, + f"https://scheduler.api.aleph.cloud/api/v0/allocation/{message.item_hash}", + ) + return status["vm_hash"], status["vm_ipv6"] + for node in node_list.nodes: + if node["stream_reward"] == message.content.payment.receiver: + + # Fetch from the CRN API if payment + executions = await fetch_json(session, f"{node['address']}about/executions/list") + if message.item_hash in executions: + ipv6_address = executions[message.item_hash]["networking"]["ipv6"] + return message.item_hash, ipv6_address + return message.item_hash, "Not available (yet)" + except ClientResponseError as e: + return message.item_hash, f"Not available (yet), server not responding : {e}" -async def _show_instances(messages: List[InstanceMessage]): +async def _show_instances(messages: List[InstanceMessage], node_list: NodeInfo): table = Table(box=box.SIMPLE_HEAVY) table.add_column("Item Hash", style="cyan") table.add_column("Vcpus", style="magenta") @@ -273,7 +289,7 @@ async def _show_instances(messages: List[InstanceMessage]): table.add_column("Disk size", style="magenta") table.add_column("IPv6 address", style="yellow") - scheduler_responses = dict(await asyncio.gather(*[_get_ipv6_address(message) for message in messages])) + scheduler_responses = dict(await asyncio.gather(*[_get_ipv6_address(message, node_list) for message in messages])) for message in messages: table.add_row( @@ -320,4 +336,129 @@ async def list( else: # Since we filtered on message type, we can safely cast as InstanceMessage. messages = cast(List[InstanceMessage], resp.messages) - await _show_instances(messages) + resource_nodes: NodeInfo = await _fetch_nodes() + await _show_instances(messages, resource_nodes) + + +@app.command() +async def expire( + vm_id: str, + domain: str, + private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + debug: bool = False, +): + """expire an instance""" + + setup_logging(debug) + account = _load_account(private_key, private_key_file) + + async with VmClient(account, domain) as manager: + status, result = await manager.expire_instance(vm_id=vm_id) + if status != 200: + typer.echo(f"Status : {status}") + typer.echo(result) + + +@app.command() +async def erase( + vm_id: str, + domain: str, + private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + debug: bool = False, +): + """erase an instance""" + + setup_logging(debug) + + account = _load_account(private_key, private_key_file) + + async with VmClient(account, domain) as manager: + status, result = await manager.erase_instance(vm_id=vm_id) + if status != 200: + typer.echo(f"Status : {status}") + typer.echo(result) + + +@app.command() +async def reboot( + vm_id: str, + domain: str, + private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + debug: bool = False, +): + """reboot an instance""" + + setup_logging(debug) + + account = _load_account(private_key, private_key_file) + + async with VmClient(account, domain) as manager: + status, result = await manager.reboot_instance(vm_id=vm_id) + if status != 200: + typer.echo(f"Status : {status}") + typer.echo(result) + + +@app.command() +async def allocate( + vm_id: str, + domain: str, + private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + debug: bool = False, +): + """Tell the CRN to start an instance with Pay as you go""" + + setup_logging(debug) + + account = _load_account(private_key, private_key_file) + + async with VmClient(account, domain) as manager: + status, result = await manager.start_instance(vm_id=vm_id) + if status != 200: + typer.echo(f"Status : {status}") + typer.echo(result) + + +@app.command() +async def logs( + vm_id: str, + domain: str, + private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + debug: bool = False, +): + """logs of the instance""" + setup_logging(debug) + + account = _load_account(private_key, private_key_file) + + async with VmClient(account, domain) as manager: + async for log in manager.get_logs(vm_id=vm_id): + log_data = json.loads(log) + if "message" in log_data: + typer.echo(log_data["message"]) + + +@app.command() +async def stop( + vm_id: str, + domain: str, + private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + debug: bool = False, +): + """Stop an instance""" + + setup_logging(debug) + + account = _load_account(private_key, private_key_file) + + async with VmClient(account, domain) as manager: + status, result = await manager.stop_instance(vm_id=vm_id) + if status != 200: + typer.echo(f"Status : {status}") + typer.echo(result) diff --git a/src/aleph_client/utils.py b/src/aleph_client/utils.py index 3aef21c0..c8f78aba 100644 --- a/src/aleph_client/utils.py +++ b/src/aleph_client/utils.py @@ -9,6 +9,7 @@ from zipfile import BadZipFile, ZipFile import typer +from aiohttp import ClientResponseError, ClientSession from aleph.sdk.types import GenericMessage from aleph_message.models.base import MessageType from aleph_message.models.execution.base import Encoding @@ -17,7 +18,6 @@ logger = logging.getLogger(__name__) - try: import magic except ImportError: @@ -85,3 +85,9 @@ def callback(self, *args, **kwargs): def command(self, *args, **kwargs): decorator = super().command(*args, **kwargs) return partial(self.maybe_run_async, decorator) + + +async def fetch_json(session: ClientSession, url: str) -> dict: + async with session.get(url) as resp: + resp.raise_for_status() + return await resp.json()