Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ jobs:
if: matrix.os == 'ubuntu-24.04'
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: aleph-im/aleph-sdk-python
slug: aleph-im/aleph-client
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ classifiers = [
]

dependencies = [
"aleph-sdk-python>=1.0.0rc1",
"aleph-sdk-python>=1.0.0rc2",
"aleph-message>=0.4.6",
"aiohttp==3.9.5",
"typer==0.12.3",
Expand Down
163 changes: 152 additions & 11 deletions src/aleph_client/commands/instance/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import json
import logging
from pathlib import Path
from typing import List, Optional, Tuple, Union, cast
Expand All @@ -9,14 +10,15 @@
from aiohttp import ClientResponseError, ClientSession
from aleph.sdk import AlephHttpClient, AuthenticatedAlephHttpClient
from aleph.sdk.account import _load_account
from aleph.sdk.client.vm_client import VmClient
from aleph.sdk.conf import settings as sdk_settings
from aleph.sdk.exceptions import (
ForgottenMessageError,
InsufficientFundsError,
MessageNotFoundError,
)
from aleph.sdk.query.filters import MessageFilter
from aleph.sdk.types import AccountFromPrivateKey, StorageEnum
from aleph.sdk.types import Account, AccountFromPrivateKey, StorageEnum
from aleph_message.models import InstanceMessage, StoreMessage
from aleph_message.models.base import Chain, MessageType
from aleph_message.models.execution.base import Payment, PaymentType
Expand All @@ -29,14 +31,15 @@

from aleph_client.commands import help_strings
from aleph_client.commands.instance.display import fetch_crn_info
from aleph_client.commands.node import NodeInfo, _fetch_nodes
from aleph_client.commands.utils import (
get_or_prompt_volumes,
setup_logging,
validated_int_prompt,
validated_prompt,
)
from aleph_client.conf import settings
from aleph_client.utils import AsyncTyper
from aleph_client.utils import AsyncTyper, fetch_json

logger = logging.getLogger(__name__)
app = AsyncTyper(no_args_is_help=True)
Expand Down Expand Up @@ -254,26 +257,39 @@ async def delete(
typer.echo(f"Instance {item_hash} has been deleted. It will be removed by the scheduler in a few minutes.")


async def _get_ipv6_address(message: InstanceMessage) -> Tuple[str, str]:
async def _get_ipv6_address(message: InstanceMessage, node_list: NodeInfo) -> Tuple[str, str]:
async with ClientSession() as session:
try:
resp = await session.get(f"https://scheduler.api.aleph.cloud/api/v0/allocation/{message.item_hash}")
resp.raise_for_status()
status = await resp.json()
return status["vm_hash"], status["vm_ipv6"]
except ClientResponseError:
if not message.content.payment:
# Fetch from the scheduler API directly if no payment
status = await fetch_json(
session,
f"https://scheduler.api.aleph.cloud/api/v0/allocation/{message.item_hash}",
)
return status["vm_hash"], status["vm_ipv6"]
for node in node_list.nodes:
if node["stream_reward"] == message.content.payment.receiver:

# Fetch from the CRN API if payment
executions = await fetch_json(session, f"{node['address']}about/executions/list")
if message.item_hash in executions:
ipv6_address = executions[message.item_hash]["networking"]["ipv6"]
return message.item_hash, ipv6_address

return message.item_hash, "Not available (yet)"
except ClientResponseError as e:
return message.item_hash, f"Not available (yet), server not responding : {e}"


async def _show_instances(messages: List[InstanceMessage]):
async def _show_instances(messages: List[InstanceMessage], node_list: NodeInfo):
table = Table(box=box.SIMPLE_HEAVY)
table.add_column("Item Hash", style="cyan")
table.add_column("Vcpus", style="magenta")
table.add_column("Memory", style="magenta")
table.add_column("Disk size", style="magenta")
table.add_column("IPv6 address", style="yellow")

scheduler_responses = dict(await asyncio.gather(*[_get_ipv6_address(message) for message in messages]))
scheduler_responses = dict(await asyncio.gather(*[_get_ipv6_address(message, node_list) for message in messages]))

for message in messages:
table.add_row(
Expand Down Expand Up @@ -320,4 +336,129 @@ async def list(
else:
# Since we filtered on message type, we can safely cast as InstanceMessage.
messages = cast(List[InstanceMessage], resp.messages)
await _show_instances(messages)
resource_nodes: NodeInfo = await _fetch_nodes()
await _show_instances(messages, resource_nodes)


@app.command()
async def expire(
vm_id: str,
domain: str,
private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY),
private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE),
debug: bool = False,
):
"""expire an instance"""

setup_logging(debug)
account = _load_account(private_key, private_key_file)

async with VmClient(account, domain) as manager:
status, result = await manager.expire_instance(vm_id=vm_id)
if status != 200:
typer.echo(f"Status : {status}")
typer.echo(result)


@app.command()
async def erase(
vm_id: str,
domain: str,
private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY),
private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE),
debug: bool = False,
):
"""erase an instance"""

setup_logging(debug)

account = _load_account(private_key, private_key_file)

async with VmClient(account, domain) as manager:
status, result = await manager.erase_instance(vm_id=vm_id)
if status != 200:
typer.echo(f"Status : {status}")
typer.echo(result)


@app.command()
async def reboot(
vm_id: str,
domain: str,
private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY),
private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE),
debug: bool = False,
):
"""reboot an instance"""

setup_logging(debug)

account = _load_account(private_key, private_key_file)

async with VmClient(account, domain) as manager:
status, result = await manager.reboot_instance(vm_id=vm_id)
if status != 200:
typer.echo(f"Status : {status}")
typer.echo(result)


@app.command()
async def allocate(
vm_id: str,
domain: str,
private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY),
private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE),
debug: bool = False,
):
"""Tell the CRN to start an instance with Pay as you go"""

setup_logging(debug)

account = _load_account(private_key, private_key_file)

async with VmClient(account, domain) as manager:
status, result = await manager.start_instance(vm_id=vm_id)
if status != 200:
typer.echo(f"Status : {status}")
typer.echo(result)


@app.command()
async def logs(
vm_id: str,
domain: str,
private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY),
private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE),
debug: bool = False,
):
"""logs of the instance"""
setup_logging(debug)

account = _load_account(private_key, private_key_file)

async with VmClient(account, domain) as manager:
async for log in manager.get_logs(vm_id=vm_id):
log_data = json.loads(log)
if "message" in log_data:
typer.echo(log_data["message"])


@app.command()
async def stop(
vm_id: str,
domain: str,
private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY),
private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE),
debug: bool = False,
):
"""Stop an instance"""

setup_logging(debug)

account = _load_account(private_key, private_key_file)

async with VmClient(account, domain) as manager:
status, result = await manager.stop_instance(vm_id=vm_id)
if status != 200:
typer.echo(f"Status : {status}")
typer.echo(result)
8 changes: 7 additions & 1 deletion src/aleph_client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from zipfile import BadZipFile, ZipFile

import typer
from aiohttp import ClientResponseError, ClientSession
from aleph.sdk.types import GenericMessage
from aleph_message.models.base import MessageType
from aleph_message.models.execution.base import Encoding
Expand All @@ -17,7 +18,6 @@

logger = logging.getLogger(__name__)


try:
import magic
except ImportError:
Expand Down Expand Up @@ -85,3 +85,9 @@ def callback(self, *args, **kwargs):
def command(self, *args, **kwargs):
decorator = super().command(*args, **kwargs)
return partial(self.maybe_run_async, decorator)


async def fetch_json(session: ClientSession, url: str) -> dict:
async with session.get(url) as resp:
resp.raise_for_status()
return await resp.json()