diff --git a/src/aleph/vm/conf.py b/src/aleph/vm/conf.py index 264be819e..f33e02f6c 100644 --- a/src/aleph/vm/conf.py +++ b/src/aleph/vm/conf.py @@ -9,7 +9,7 @@ from os.path import abspath, exists, isdir, isfile, join from pathlib import Path from subprocess import CalledProcessError, check_output -from typing import Any, Literal, NewType +from typing import Any, List, Literal, NewType from aleph_message.models import Chain from aleph_message.models.execution.environment import HypervisorType @@ -280,8 +280,10 @@ class Settings(BaseSettings): description="Enable GPU pass-through support to VMs, only allowed for QEmu hypervisor", ) - # Tests on programs + # Settings to get from the network aggregates + SETTINGS_AGGREGATE_ADDRESS: str = "0xFba561a84A537fCaa567bb7A2257e7142701ae2A" + # Tests on programs FAKE_DATA_PROGRAM: Path | None = None BENCHMARK_FAKE_DATA_PROGRAM = Path(abspath(join(__file__, "../../../../examples/example_fastapi"))) diff --git a/src/aleph/vm/orchestrator/tasks.py b/src/aleph/vm/orchestrator/tasks.py index 75fff2364..9819ffcf2 100644 --- a/src/aleph/vm/orchestrator/tasks.py +++ b/src/aleph/vm/orchestrator/tasks.py @@ -4,6 +4,7 @@ import math import time from collections.abc import AsyncIterable +from decimal import Decimal from typing import TypeVar import aiohttp @@ -19,6 +20,10 @@ from yarl import URL from aleph.vm.conf import settings +from aleph.vm.orchestrator.utils import ( + get_community_wallet_address, + is_after_community_wallet_start, +) from aleph.vm.pool import VmPool from aleph.vm.utils import create_task_log_exceptions @@ -35,6 +40,7 @@ logger = logging.getLogger(__name__) Value = TypeVar("Value") +COMMUNITY_STREAM_RATIO = Decimal("0.2") async def retry_generator(generator: AsyncIterable[Value], max_seconds: int = 8) -> AsyncIterable[Value]: @@ -154,6 +160,7 @@ async def monitor_payments(app: web.Application): try: logger.debug("Monitoring balances task running") await check_payment(pool) + logger.debug("Monitoring balances task 
ended") except Exception as e: # Catch all exceptions as to never stop the task. logger.warning(f"check_payment failed {e}", exc_info=True) @@ -191,31 +198,62 @@ async def check_payment(pool: VmPool): logger.debug(f"Stopping {last_execution} due to insufficient balance") await pool.stop_vm(last_execution.vm_hash) required_balance = await compute_required_balance(executions) + community_wallet = await get_community_wallet_address() + if not community_wallet: + logger.error("Monitor payment ERROR: No community wallet set. Cannot check community payment") # Check if the balance held in the wallet is sufficient stream tier resources for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.superfluid).items(): for chain, executions in chains.items(): try: stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain) + logger.debug( - f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}" + f"Stream flow from {sender} to {settings.PAYMENT_RECEIVER_ADDRESS} = {stream} {chain.value}" ) + except ValueError as error: + logger.error(f"Error found getting stream for chain {chain} and sender {sender}: {error}") + continue + try: + community_stream = await get_stream(sender=sender, receiver=community_wallet, chain=chain) + logger.debug(f"Stream flow from {sender} to {community_wallet} (community) : {community_stream} {chain}") + except ValueError as error: logger.error(f"Error found getting stream for chain {chain} and sender {sender}: {error}") continue - required_stream = await compute_required_flow(executions) - logger.debug(f"Required stream for Sender {sender} executions: {required_stream}") - # Stop executions until the required stream is reached - while (stream + settings.PAYMENT_BUFFER) < required_stream: - try: - last_execution = executions.pop(-1) - except IndexError: # Empty list - logger.debug("No execution can be maintained due to insufficient stream") + while executions: + 
executions_with_community = [ + execution + for execution in executions + if await is_after_community_wallet_start(execution.times.started_at) + ] + + required_stream = await compute_required_flow(executions_with_community) + executions_without_community = [ + execution + for execution in executions + if not await is_after_community_wallet_start(execution.times.started_at) + ] + logger.info("flow community %s", executions_with_community) + logger.info("flow without community %s", executions_without_community) + required_stream_without_community = await compute_required_flow(executions_without_community) + + required_crn_stream = required_stream * (1 - COMMUNITY_STREAM_RATIO) + required_stream_without_community + required_community_stream = required_stream * COMMUNITY_STREAM_RATIO + logger.debug( + f"Stream for senders {sender} {len(executions)} executions. CRN : {stream} / {required_crn_stream}." + f"Community: {community_stream} / {required_community_stream}" + ) + # Can pay all executions + if (stream + settings.PAYMENT_BUFFER) >= required_crn_stream and ( + community_stream + settings.PAYMENT_BUFFER + ) >= required_community_stream: break - logger.debug(f"Stopping {last_execution} due to insufficient stream") + # Stop executions until the required stream is reached + last_execution = executions.pop(-1) + logger.info(f"Stopping {last_execution} of {sender} due to insufficient stream") await pool.stop_vm(last_execution.vm_hash) - required_stream = await compute_required_flow(executions) async def start_payment_monitoring_task(app: web.Application): diff --git a/src/aleph/vm/orchestrator/utils.py b/src/aleph/vm/orchestrator/utils.py new file mode 100644 index 000000000..17dcbca03 --- /dev/null +++ b/src/aleph/vm/orchestrator/utils.py @@ -0,0 +1,102 @@ +from datetime import datetime, timedelta, timezone +from logging import getLogger +from typing import Any, TypedDict + +import aiohttp + +from aleph.vm.conf import settings + +logger = getLogger(__name__) + + +class 
AggregateSettingsDict(TypedDict): + compatible_gpus: list[Any] + community_wallet_address: str + community_wallet_timestamp: int + + +LAST_AGGREGATE_SETTINGS: AggregateSettingsDict | None = None +LAST_AGGREGATE_SETTINGS_FETCHED_AT: datetime | None = None + + +async def fetch_aggregate_settings() -> AggregateSettingsDict | None: + """ + Get the settings Aggregate dict from the PyAleph API Aggregate. + + API Endpoint: + GET /api/v0/aggregates/{address}.json?keys=settings + + For more details, see the PyAleph API documentation: + https://github.com/aleph-im/pyaleph/blob/master/src/aleph/web/controllers/routes.py#L62 + """ + async with aiohttp.ClientSession() as session: + url = f"{settings.API_SERVER}/api/v0/aggregates/{settings.SETTINGS_AGGREGATE_ADDRESS}.json?keys=settings" + logger.info(f"Fetching settings aggregate from {url}") + resp = await session.get(url) + + # Raise an error if the request failed + resp.raise_for_status() + + resp_data = await resp.json() + return resp_data["data"]["settings"] + + +async def update_aggregate_settings(): + global LAST_AGGREGATE_SETTINGS # noqa: PLW0603 + global LAST_AGGREGATE_SETTINGS_FETCHED_AT # noqa: PLW0603 + + # Only query the API when the cache is empty or older than one minute + if ( + not LAST_AGGREGATE_SETTINGS + or LAST_AGGREGATE_SETTINGS_FETCHED_AT + and datetime.now(tz=timezone.utc) - LAST_AGGREGATE_SETTINGS_FETCHED_AT > timedelta(minutes=1) + ): + try: + aggregate = await fetch_aggregate_settings() + LAST_AGGREGATE_SETTINGS = aggregate + LAST_AGGREGATE_SETTINGS_FETCHED_AT = datetime.now(tz=timezone.utc) + + except Exception: + logger.exception("Failed to fetch aggregate settings") + + +async def get_aggregate_settings() -> AggregateSettingsDict | None: + """The settings aggregate is a special aggregate used to share some common settings for VM setup + + Ensure the cached version is up to date and return it""" + await update_aggregate_settings() + + if not LAST_AGGREGATE_SETTINGS: + logger.error("No setting aggregate") + 
return LAST_AGGREGATE_SETTINGS + + +async def get_community_wallet_address() -> str | None: + setting_aggr = await get_aggregate_settings() + return setting_aggr and setting_aggr.get("community_wallet_address") + + +async def get_community_wallet_start() -> datetime: + """Community wallet start time. + + After this timestamp. New PAYG must include a payment to the community wallet""" + setting_aggr = await get_aggregate_settings() + if setting_aggr is None or "community_wallet_timestamp" not in setting_aggr: + return datetime.now(tz=timezone.utc) + timestamp = setting_aggr["community_wallet_timestamp"] + start_datetime = datetime.fromtimestamp(timestamp, tz=timezone.utc) + return start_datetime + + +async def is_after_community_wallet_start(dt: datetime | None = None) -> bool: + """Community wallet start time""" + if not dt: + dt = datetime.now(tz=timezone.utc) + start_dt = await get_community_wallet_start() + return dt > start_dt + + +def get_compatible_gpus() -> list[Any]: + if not LAST_AGGREGATE_SETTINGS: + return [] + return LAST_AGGREGATE_SETTINGS["compatible_gpus"] diff --git a/src/aleph/vm/orchestrator/views/__init__.py b/src/aleph/vm/orchestrator/views/__init__.py index 899a038f8..6e9460d1c 100644 --- a/src/aleph/vm/orchestrator/views/__init__.py +++ b/src/aleph/vm/orchestrator/views/__init__.py @@ -1,5 +1,4 @@ import binascii -import contextlib import logging from decimal import Decimal from hashlib import sha256 @@ -8,7 +7,6 @@ from pathlib import Path from secrets import compare_digest from string import Template -from typing import Optional import aiodns import aiohttp @@ -26,7 +24,7 @@ from aleph.vm.controllers.firecracker.program import FileTooLargeError from aleph.vm.hypervisors.firecracker.microvm import MicroVMFailedInitError from aleph.vm.orchestrator import payment, status -from aleph.vm.orchestrator.chain import STREAM_CHAINS, ChainInfo +from aleph.vm.orchestrator.chain import STREAM_CHAINS from aleph.vm.orchestrator.custom_logs import 
set_vm_for_logging from aleph.vm.orchestrator.messages import try_get_message from aleph.vm.orchestrator.metrics import get_execution_records @@ -39,6 +37,12 @@ from aleph.vm.orchestrator.pubsub import PubSub from aleph.vm.orchestrator.resources import Allocation, VMNotification from aleph.vm.orchestrator.run import run_code_on_request, start_persistent_vm +from aleph.vm.orchestrator.tasks import COMMUNITY_STREAM_RATIO +from aleph.vm.orchestrator.utils import ( + get_community_wallet_address, + is_after_community_wallet_start, + update_aggregate_settings, +) from aleph.vm.orchestrator.views.host_status import ( check_dns_ipv4, check_dns_ipv6, @@ -468,6 +472,7 @@ async def update_allocations(request: web.Request): @cors_allow_all async def notify_allocation(request: web.Request): """Notify instance allocation, only used for Pay as you Go feature""" + await update_aggregate_settings() try: data = await request.json() vm_notification = VMNotification.parse_obj(data) @@ -526,16 +531,44 @@ async def notify_allocation(request: web.Request): raise web.HTTPPaymentRequired(reason="Empty payment stream for this instance") required_flow: Decimal = await fetch_execution_flow_price(item_hash) - - if active_flow < required_flow: + community_wallet = await get_community_wallet_address() + required_crn_stream: Decimal + required_community_stream: Decimal + if await is_after_community_wallet_start() and community_wallet: + required_crn_stream = required_flow * (1 - COMMUNITY_STREAM_RATIO) + required_community_stream = required_flow * COMMUNITY_STREAM_RATIO + else: # No community wallet payment + required_crn_stream = required_flow + required_community_stream = Decimal(0) + + if active_flow < (required_crn_stream - settings.PAYMENT_BUFFER): active_flow_per_month = active_flow * 60 * 60 * 24 * (Decimal("30.41666666666923904761904784")) - required_flow_per_month = required_flow * 60 * 60 * 24 * Decimal("30.41666666666923904761904784") + required_flow_per_month = required_crn_stream * 
60 * 60 * 24 * Decimal("30.41666666666923904761904784") return web.HTTPPaymentRequired( reason="Insufficient payment stream", text="Insufficient payment stream for this instance\n\n" - f"Required: {required_flow_per_month} / month (flow = {required_flow})\n" + f"Required: {required_flow_per_month} / month (flow = {required_crn_stream})\n" f"Present: {active_flow_per_month} / month (flow = {active_flow})", ) + + if community_wallet and required_community_stream: + community_flow: Decimal = await get_stream( + sender=message.sender, + receiver=community_wallet, + chain=message.content.payment.chain, + ) + if community_flow < (required_community_stream - settings.PAYMENT_BUFFER): + active_flow_per_month = community_flow * 60 * 60 * 24 * (Decimal("30.41666666666923904761904784")) + required_flow_per_month = ( + required_community_stream * 60 * 60 * 24 * Decimal("30.41666666666923904761904784") + ) + return web.HTTPPaymentRequired( + reason="Insufficient payment stream to community", + text="Insufficient payment stream for community \n\n" + f"Required: {required_flow_per_month} / month (flow = {required_community_stream})\n" + f"Present: {active_flow_per_month} / month (flow = {community_flow})\n" + f"Address: {community_wallet}", + ) else: return web.HTTPBadRequest(reason="Invalid payment method") diff --git a/src/aleph/vm/orchestrator/views/static/helpers.js b/src/aleph/vm/orchestrator/views/static/helpers.js index 8644a11aa..f7e9925dd 100644 --- a/src/aleph/vm/orchestrator/views/static/helpers.js +++ b/src/aleph/vm/orchestrator/views/static/helpers.js @@ -53,6 +53,42 @@ async function fetchHostCheckStatus () { return res; } +async function fetchHostSystemUsage () { + const q = await fetch('/about/usage/system'); + let res = { + status: q.status, + details: [] + } + if(q.ok){ + const answer = await q.json(); + const gpu_devices = answer.gpu.devices; + if (gpu_devices.length <= 0) { + res.status = "No GPUs detected"; + }else{ + res.status = ""; + } + } + else { + 
switch(Number(q.status)){ + case 500: + res.status = "Getting Node usage failed ❌"; + break; + default: + res.status = q.status; + } + } + + return res; +} + function objectToString (obj) { return Object.entries(obj).reduce((acc, [k, v]) => acc + `
  • ${k}: ${v}
  • \n`, ''); } diff --git a/src/aleph/vm/orchestrator/views/static/main.css b/src/aleph/vm/orchestrator/views/static/main.css index bf2cbbf85..2b14d4b60 100644 --- a/src/aleph/vm/orchestrator/views/static/main.css +++ b/src/aleph/vm/orchestrator/views/static/main.css @@ -5,6 +5,10 @@ body { max-width: 800px; } +details { + margin-top: 30px; +} + main { width: 90vw; margin: 2vh auto; diff --git a/src/aleph/vm/orchestrator/views/templates/index.html b/src/aleph/vm/orchestrator/views/templates/index.html index 09715acb5..8222fb8bd 100644 --- a/src/aleph/vm/orchestrator/views/templates/index.html +++ b/src/aleph/vm/orchestrator/views/templates/index.html @@ -3,8 +3,8 @@ Aleph.im Compute Node - - + +
    @@ -112,6 +112,25 @@

    IPv4

    IPv6

    +

    VM Egress IPv6

    +

    + VM Egress IPv6 is a test to check if virtual machines are able to connect to the IPv6 internet. + Enabling VM IPv6 Egress requires a specific configuration that is not applied automatically. It is not yet + required to run programs, but it is required to run instances, so it will be mandatory soon. +

    +
    +

    + VM Egress IPv6 + + is ... + + + + + + +

    +
    +
    +

    GPUs

    +
    + Loading GPU list + + ... + + + + + + +
    +
    +
    ℹ️ More information @@ -138,25 +172,6 @@

    Latest metrics

    -

    VM Egress IPv6

    -

    - VM Egress IPv6 is a test to check if virtual machines are able to connect to the IPv6 internet. - Enabling VM IPv6 Egress requires a specific configuration that is not applied automatically. It is not yet - required to run virtual machines. -

    -
    -

    - VM Egress IPv6 - - is ... - - - - - - -

    -

    APIs

    Host status check API: /status/check/host @@ -214,7 +229,7 @@

    Version

    - +