Cluster cleanup #246

Merged
merged 4 commits into from
Jun 5, 2024

Changes from 3 commits
2 changes: 0 additions & 2 deletions .github/workflows/integration_tests.yml
@@ -94,7 +94,6 @@ jobs:
run: |
poetry run ray exec golem-cluster.tests.yaml 'free -h'
poetry run ray exec golem-cluster.tests.yaml 'ps -Heo size,command --sort -size'
poetry run ray rsync-down golem-cluster.tests.yaml /root/mem_usage.log mem_usage.log
poetry run ray exec golem-cluster.tests.yaml 'ray cluster-dump --local --debug-state --processes-verbose -o ray_cluster_dump.tar.gz'
poetry run ray rsync-down golem-cluster.tests.yaml ray_cluster_dump.tar.gz ray_cluster_dump.tar.gz

@@ -126,4 +125,3 @@ jobs:
/root/.local/share/ray_on_golem/yagna.log
/tmp/goth-tests
ray_cluster_dump.tar.gz
mem_usage.log
2 changes: 1 addition & 1 deletion .github/workflows/start-goth.sh
@@ -23,7 +23,7 @@ python -m pip install --upgrade setuptools wheel

echo INSTALLING DEPENDENCIES
python -m pip install --extra-index-url https://test.pypi.org/simple/ goth==$GOTH_VERSION
python -m pip install pytest pytest-asyncio pexpect "requests<2.32.0"
python -m pip install pytest pytest-asyncio pexpect "requests<2.32.0" # Remove the requests pin once https://github.com/docker/docker-py/issues/3256 is fixed

echo CREATING ASSETS
python -m goth create-assets .envs/goth/assets
2 changes: 1 addition & 1 deletion Dockerfile
@@ -31,11 +31,11 @@ COPY ray_on_golem/__init__.py /app/ray_on_golem/__init__.py

RUN pip install poetry && \
poetry config virtualenvs.create false
RUN poetry install --no-interaction --no-ansi --only ray

RUN pip config set global.index-url https://pypi.dev.golem.network/simple
RUN pip install pillow
RUN python -m venv --system-site-packages /root/venv
RUN /bin/bash -c "source /root/venv/bin/activate && poetry install --no-interaction --no-ansi --only ray"

COPY ray_on_golem /app/ray_on_golem/

1 change: 0 additions & 1 deletion golem-cluster.mini.yaml
@@ -68,4 +68,3 @@ worker_start_ray_commands: [
# This will be removed by ray-on-golem on the fly.
head_node: true
worker_nodes: true

2 changes: 1 addition & 1 deletion golem-cluster.override-goth.yaml
@@ -9,4 +9,4 @@ provider:
available_node_types:
ray.head.default:
node_config:
priority_subnet_tag: "goth"
priority_subnet_tag: "goth"
3 changes: 1 addition & 2 deletions golem-cluster.yaml
@@ -25,7 +25,7 @@ provider:
#payment_network: "polygon"

# Maximum amount of GLMs that's going to be spent for the whole cluster
total_budget: 1
total_budget: 5
lucekdudek marked this conversation as resolved.

# Common parameters for all node types. Can be overridden in available_node_types
node_config:
@@ -137,4 +137,3 @@ worker_start_ray_commands: [
# This will be removed by ray-on-golem on the fly.
head_node: true
worker_nodes: true

4 changes: 2 additions & 2 deletions ray_on_golem/provider/node_provider.py
@@ -293,10 +293,10 @@ def _apply_config_defaults(config: Dict[str, Any]) -> None:

config["provider"]["parameters"] = provider_parameters

for node_type in config.get("available_node_types", {}).values():
for node_type_name, node_type in config.get("available_node_types", {}).items():
result: Dict = {}

if node_type == "ray.head.default":
if node_type_name == config.get("head_node_type", "ray.head.default"):
result = deepcopy(HEAD_NODE_DEFAULTS)

dpath.merge(
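A note on the `_apply_config_defaults` change above: the old code compared each node-type *dict* against the string `"ray.head.default"`, which could never match, so the head-node defaults were not applied; the new code compares the node-type *name* against `config.get("head_node_type", "ray.head.default")`. A minimal sketch of the corrected lookup, assuming simplified stand-ins for `HEAD_NODE_DEFAULTS` and the merge step (the real code uses `dpath.merge`):

```python
from copy import deepcopy
from typing import Any, Dict

# Illustrative defaults, not the project's real values.
HEAD_NODE_DEFAULTS: Dict[str, Any] = {"node_config": {"priority_head": True}}


def apply_node_type_defaults(config: Dict[str, Any]) -> None:
    # The head node type is identified by its *name*, taken from Ray's
    # `head_node_type` setting with "ray.head.default" as the fallback.
    head_node_type = config.get("head_node_type", "ray.head.default")

    for node_type_name, node_type in config.get("available_node_types", {}).items():
        defaults: Dict[str, Any] = {}
        if node_type_name == head_node_type:
            defaults = deepcopy(HEAD_NODE_DEFAULTS)
        for key, value in defaults.items():  # stand-in for dpath.merge
            node_type.setdefault(key, value)


config = {
    "head_node_type": "ray.head.custom",
    "available_node_types": {"ray.head.custom": {}, "ray.worker.default": {}},
}
apply_node_type_defaults(config)
assert "node_config" in config["available_node_types"]["ray.head.custom"]
```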
97 changes: 58 additions & 39 deletions ray_on_golem/server/cluster/cluster.py
@@ -2,11 +2,24 @@
import logging
from collections import defaultdict
from functools import partial
from typing import DefaultDict, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Type
from typing import (
Callable,
DefaultDict,
Dict,
Iterable,
List,
Mapping,
Optional,
Sequence,
Tuple,
Type,
)

from golem.managers import PaymentManager
from golem.utils.asyncio import create_task_with_logging
from golem.utils.asyncio.tasks import resolve_maybe_awaitable
from golem.utils.logging import get_trace_id_name
from golem.utils.typing import MaybeAwaitable
from ray.autoscaler.tags import NODE_KIND_HEAD, TAG_RAY_NODE_KIND, TAG_RAY_USER_NODE_TYPE

from ray_on_golem.server import utils
@@ -41,13 +54,15 @@ def __init__(
webserver_port: int,
name: str,
provider_parameters: ProviderParametersData,
on_stop: Optional[Callable[["Cluster"], MaybeAwaitable[None]]] = None,
) -> None:
super().__init__()

self._golem_service = golem_service
self._name = name
self._provider_parameters = provider_parameters
self._webserver_port = webserver_port
self._on_stop = on_stop

self._manager_stacks: Dict[StackHash, ManagerStack] = {}
self._manager_stacks_locks: DefaultDict[StackHash, asyncio.Semaphore] = defaultdict(
@@ -64,6 +79,9 @@ def __init__(

self._state: NodeState = NodeState.terminated

def __str__(self) -> str:
return self._name

@property
def nodes(self) -> Mapping[str, ClusterNode]:
"""Read-only map of named nodes.
@@ -85,27 +103,27 @@ async def start(self) -> None:
"""Start the cluster and its internal state."""

if self._state in (NodeState.pending, NodeState.running):
logger.info("Not starting `%s` cluster as it's already running or starting", self._name)
logger.info("Not starting `%s` cluster as it's already running or starting", self)
return

logger.info("Starting `%s` cluster...", self._name)
logger.info("Starting `%s` cluster...", self)

self._state = NodeState.pending

await self._payment_manager.start()

self._state = NodeState.running

logger.info("Starting `%s` cluster done", self._name)
logger.info("Starting `%s` cluster done", self)

async def stop(self, clear: bool = True) -> None:
async def stop(self, call_events: bool = True) -> None:
"""Stop the cluster."""

if self._state in (NodeState.terminating, NodeState.terminated):
logger.info("Not stopping `%s` cluster as it's already stopped or stopping", self._name)
logger.info("Not stopping `%s` cluster as it's already stopped or stopping", self)
return

logger.info("Stopping `%s` cluster...", self._name)
logger.info("Stopping `%s` cluster...", self)

self._state = NodeState.terminating

@@ -116,24 +134,22 @@ async def stop(self, clear: bool = True) -> None:
await self._payment_manager.stop()

self._state = NodeState.terminated

if clear:
self.clear()

logger.info("Stopping `%s` cluster done", self._name)

def clear(self) -> None:
"""Clear the internal state of the cluster."""

if self._state != NodeState.terminated:
logger.info("Not clearing `%s` cluster as it's not stopped", self._name)
return

self._nodes.clear()
self._nodes_id_counter = 0
self._manager_stacks.clear()
self._manager_stacks_locks.clear()

if self._on_stop and call_events:
create_task_with_logging(
resolve_maybe_awaitable(self._on_stop(self)),
trace_id=get_trace_id_name(self, "on-stop"),
)

logger.info("Stopping `%s` cluster done", self)

def is_running(self) -> bool:
return self._state != NodeState.terminated

def get_non_terminated_nodes(self) -> Sequence["ClusterNode"]:
"""Return cluster nodes that are running on the cluster."""

@@ -258,7 +274,7 @@ async def _get_or_create_manager_stack(
return stack

async def _remove_manager_stack(self, stack_hash: StackHash) -> None:
logger.info(f"Removing stack `%s`...", stack_hash)
logger.info("Removing stack `%s`...", stack_hash)

async with self._manager_stacks_locks[stack_hash]:
await self._manager_stacks[stack_hash].stop()
@@ -269,38 +285,41 @@ async def _remove_manager_stack(self, stack_hash: StackHash) -> None:
if not self._manager_stacks_locks[stack_hash].locked():
del self._manager_stacks_locks[stack_hash]

logger.info(f"Removing stack `%s` done", stack_hash)
logger.info("Removing stack `%s` done", stack_hash)

def _on_node_stop(self, node: ClusterNode) -> None:
async def _on_node_stop(self, node: ClusterNode) -> None:
non_terminated_nodes = self.get_non_terminated_nodes()
if not non_terminated_nodes:
logger.debug("No more nodes running on the cluster, scheduling cluster stop")
logger.debug("No more nodes running on the cluster, cluster will stop")

create_task_with_logging(
self.stop(),
trace_id=get_trace_id_name(self, "on-node-stop-cluster-stop"),
)
await self.stop()

return

if isinstance(node, HeadClusterNode):
logger.debug("Head node is not running, cluster will stop")

any_manager_stopped = False
await self.stop()

return

manager_stack_stop_coros = []

# TODO: Consider moving this logic directly to manager stack with more event-based approach
for manager_stack in node.manager_stacks:
if any(manager_stack in n.manager_stacks for n in self.get_non_terminated_nodes()):
continue

logger.debug(
"No more nodes running on the `%s` manager stack, scheduling manager stack stop",
"No more nodes running on the `%s` manager stack, manager stack will stop",
manager_stack,
)

any_manager_stopped = True
stack_hash = node.node_config.get_hash()

create_task_with_logging(
self._manager_stacks[stack_hash].stop(),
trace_id=get_trace_id_name(self, "on-node-stop-manager-stack-stop"),
)
manager_stack_stop_coros.append(manager_stack.stop())

if not any_manager_stopped:
if manager_stack_stop_coros:
await asyncio.gather(*manager_stack_stop_coros)
else:
logger.debug(
"Cluster and manager stack have some nodes still running, nothing to stop."
"Cluster and its manager stacks have some nodes still running, nothing to stop"
)
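Two behavioural changes stand out in this file: `Cluster` now takes an optional `on_stop` callback (sync or async, resolved through `MaybeAwaitable`/`resolve_maybe_awaitable`) and fires it from `stop()`, replacing the removed `clear()` step, and `_on_node_stop` now awaits `self.stop()` directly (also when the head node goes down) and gathers the manager-stack `stop()` coroutines instead of scheduling fire-and-forget tasks. A minimal sketch of the sync-or-async callback pattern, with simplified stand-ins for the golem utility helpers:

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Optional, Union

# Stand-in for golem.utils.typing.MaybeAwaitable.
MaybeAwaitable = Union[None, Awaitable[None]]


class TinyCluster:
    def __init__(
        self,
        name: str,
        on_stop: Optional[Callable[["TinyCluster"], MaybeAwaitable]] = None,
    ) -> None:
        self._name = name
        self._on_stop = on_stop

    def __str__(self) -> str:
        return self._name

    async def stop(self, call_events: bool = True) -> None:
        # ... tear down nodes, manager stacks and payments here ...
        if self._on_stop and call_events:
            result = self._on_stop(self)
            if inspect.isawaitable(result):  # accept both sync and async callbacks
                await result


async def main() -> None:
    async def cleanup(cluster: TinyCluster) -> None:
        print(f"cluster `{cluster}` stopped, releasing its registry entry")

    await TinyCluster("golem-cluster", on_stop=cleanup).stop()


asyncio.run(main())
```

Awaiting the node and manager-stack stops in place, rather than spawning background tasks for them, should make the shutdown ordering deterministic, which seems to be the point of this cleanup.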
51 changes: 15 additions & 36 deletions ray_on_golem/server/cluster/nodes.py
@@ -2,7 +2,7 @@
import logging
from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Collection, List, Optional, Sequence, Tuple
from typing import Callable, Collection, List, Optional, Sequence, Tuple

from golem.exceptions import GolemException
from golem.managers.base import ManagerException, WorkContext
@@ -31,9 +31,6 @@
from ray_on_golem.server.utils import get_provider_desc
from ray_on_golem.utils import get_ssh_command, get_ssh_command_args, run_subprocess_output

if TYPE_CHECKING:
from ray_on_golem.server.cluster import Cluster

RAY_GCS_PORT = 6379
RAY_DASHBOARD_PORT = 8265

@@ -48,7 +45,6 @@ class ClusterNode(WarningMessagesMixin, NodeData):
ssh_public_key_path: Path
ssh_user: str

_cluster: "Cluster"
_golem_service: GolemService
_manager_stack: ManagerStack

@@ -64,7 +60,6 @@ class ClusterNode(WarningMessagesMixin, NodeData):

def __init__(
self,
cluster: "Cluster",
golem_service: GolemService,
manager_stack: ManagerStack,
priority_agreement_timeout: timedelta,
@@ -74,7 +69,6 @@ def __init__(
) -> None:
super().__init__(**kwargs)

self._cluster = cluster
self._golem_service = golem_service
self._manager_stack = manager_stack
self._priority_manager_stack = priority_manager_stack
@@ -211,7 +205,10 @@ async def stop(self, call_events: bool = True) -> None:
self.ssh_proxy_command = None

if self._on_stop and call_events:
await resolve_maybe_awaitable(self._on_stop(self))
create_task_with_logging(
resolve_maybe_awaitable(self._on_stop(self)),
trace_id=get_trace_id_name(self, "on-stop"),
)

logger.info("Stopping `%s` node done", self)

@@ -450,6 +447,16 @@ async def _stop_sidecars(self) -> None:

logger.info("Stopping `%s` node sidecars done", self)

async def _on_monitor_check_failed(self, monitor: MonitorClusterNodeSidecar) -> None:
provider_desc = await get_provider_desc(self.activity)

message = f"Terminating node `%s` %s {monitor.name} is no longer accessible"

logger.warning(message, self.node_id, provider_desc)
self.add_warning_message(message, self.node_id, provider_desc)

await self.stop()


class WorkerClusterNode(ClusterNode):
"""Self-contained element that represents explicitly a Ray worker node."""
@@ -470,20 +477,6 @@ def _prepare_sidecars(self) -> Collection[ClusterNodeSidecar]:
),
)

async def _on_monitor_check_failed(
self, monitor: MonitorClusterNodeSidecar, node: ClusterNode
) -> bool:
provider_desc = await get_provider_desc(self.activity)

message = f"Terminating node as worker `%s` %s {monitor.name} is no longer accessible"

logger.warning(message, self.node_id, provider_desc)
self.add_warning_message(message, self.node_id, provider_desc)

create_task_with_logging(self.stop())

return True


class HeadClusterNode(WorkerClusterNode):
"""Self-contained element that represents explicitly a Ray head node."""
@@ -517,17 +510,3 @@ def _prepare_sidecars(self) -> Collection[ClusterNodeSidecar]:
)

return sidecars

async def _on_monitor_check_failed(
self, monitor: MonitorClusterNodeSidecar, node: ClusterNode
) -> bool:
provider_desc = await get_provider_desc(self.activity)

message = f"Terminating cluster as head `%s` %s {monitor.name} is no longer accessible"

logger.warning(message, self.node_id, provider_desc)
self.add_warning_message(message, self.node_id, provider_desc)

create_task_with_logging(self._cluster.stop(clear=False))

return True
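The net effect in this file is a decoupling: `_on_monitor_check_failed` moves from the worker and head subclasses into `ClusterNode` itself and simply stops the node, so nodes no longer need a back-reference to their `Cluster`; deciding that a lost head node means stopping the whole cluster is now handled by `Cluster._on_node_stop` (see `cluster.py` above). An illustrative sketch of that split, using hypothetical names rather than the project's classes:

```python
import asyncio
from typing import Callable, List, Optional


class Node:
    def __init__(self, name: str, is_head: bool, on_stop: Optional[Callable[["Node"], None]] = None) -> None:
        self.name = name
        self.is_head = is_head
        self.running = True
        self._on_stop = on_stop

    async def on_monitor_check_failed(self) -> None:
        # The node no longer reaches back into the cluster; it only stops itself.
        print(f"{self.name}: monitor check failed, stopping node")
        await self.stop()

    async def stop(self) -> None:
        self.running = False
        if self._on_stop:
            self._on_stop(self)


class Cluster:
    def __init__(self) -> None:
        self.nodes: List[Node] = []

    def add_node(self, name: str, is_head: bool = False) -> Node:
        node = Node(name, is_head, on_stop=self._on_node_stop)
        self.nodes.append(node)
        return node

    def _on_node_stop(self, node: Node) -> None:
        # Cluster-level policy lives here: losing the head (or the last node)
        # means the whole cluster should stop.
        if node.is_head or not any(n.running for n in self.nodes):
            print("head (or last) node stopped, stopping cluster")


async def main() -> None:
    cluster = Cluster()
    head = cluster.add_node("head", is_head=True)
    cluster.add_node("worker-0")
    await head.on_monitor_check_failed()


asyncio.run(main())
```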