Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions python/ray/_common/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import asyncio

import ray


@ray.remote(num_cpus=0)
class SignalActor:
def __init__(self):
self.ready_event = asyncio.Event()
self.num_waiters = 0

def send(self, clear: bool = False):
self.ready_event.set()
if clear:
self.ready_event.clear()

async def wait(self, should_wait: bool = True):
if should_wait:
self.num_waiters += 1
await self.ready_event.wait()
self.num_waiters -= 1

async def cur_num_waiters(self) -> int:
return self.num_waiters


@ray.remote(num_cpus=0)
class Semaphore:
def __init__(self, value: int = 1):
self._sema = asyncio.Semaphore(value=value)

async def acquire(self):
await self._sema.acquire()

async def release(self):
self._sema.release()

async def locked(self) -> bool:
return self._sema.locked()
1 change: 1 addition & 0 deletions python/ray/_common/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load("//bazel:python.bzl", "py_test_module_list")
py_test_module_list(
size = "small",
files = [
"test_test_utils.py",
"test_utils.py",
],
tags = [
Expand Down
90 changes: 90 additions & 0 deletions python/ray/_common/tests/test_test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import pytest
import sys
import ray
from ray._common.test_utils import SignalActor, Semaphore
from ray._private.test_utils import wait_for_condition
import time


@pytest.fixture(scope="module")
def ray_init():
ray.init(num_cpus=4)
yield
ray.shutdown()


def test_signal_actor_basic(ray_init):
signal = SignalActor.remote()

# Test initial state
assert ray.get(signal.cur_num_waiters.remote()) == 0

# Test send and wait
ray.get(signal.send.remote())
signal.wait.remote()
assert ray.get(signal.cur_num_waiters.remote()) == 0


def test_signal_actor_multiple_waiters(ray_init):
signal = SignalActor.remote()

# Create multiple waiters
for _ in range(3):
signal.wait.remote()

# Check number of waiters
wait_for_condition(lambda: ray.get(signal.cur_num_waiters.remote()) == 3)

# Send signal and wait for all waiters
ray.get(signal.send.remote())

# Verify all waiters are done
wait_for_condition(lambda: ray.get(signal.cur_num_waiters.remote()) == 0)

# check that .wait() doesn't block if the signal is already sent
ray.get(signal.wait.remote())

assert ray.get(signal.cur_num_waiters.remote()) == 0

# clear the signal
ray.get(signal.send.remote(clear=True))
signal.wait.remote()
# Verify all waiters are done
wait_for_condition(lambda: ray.get(signal.cur_num_waiters.remote()) == 1)

ray.get(signal.send.remote())


def test_semaphore_basic(ray_init):
sema = Semaphore.remote(value=2)

# Test initial state
wait_for_condition(lambda: ray.get(sema.locked.remote()) is False)

# Test acquire and release
ray.get(sema.acquire.remote())
ray.get(sema.acquire.remote())
wait_for_condition(lambda: ray.get(sema.locked.remote()) is True)

ray.get(sema.release.remote())
ray.get(sema.release.remote())
wait_for_condition(lambda: ray.get(sema.locked.remote()) is False)


def test_semaphore_concurrent(ray_init):
sema = Semaphore.remote(value=2)

def worker():
ray.get(sema.acquire.remote())
time.sleep(0.1)
ray.get(sema.release.remote())

# Create multiple workers
_ = [worker() for _ in range(4)]

# Verify semaphore is not locked
wait_for_condition(lambda: ray.get(sema.locked.remote()) is False)


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
36 changes: 0 additions & 36 deletions python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,42 +820,6 @@ def get(self):
return self.items


@ray.remote(num_cpus=0)
class SignalActor:
def __init__(self):
self.ready_event = asyncio.Event()
self.num_waiters = 0

def send(self, clear=False):
self.ready_event.set()
if clear:
self.ready_event.clear()

async def wait(self, should_wait=True):
if should_wait:
self.num_waiters += 1
await self.ready_event.wait()
self.num_waiters -= 1

async def cur_num_waiters(self):
return self.num_waiters


@ray.remote(num_cpus=0)
class Semaphore:
def __init__(self, value=1):
self._sema = asyncio.Semaphore(value=value)

async def acquire(self):
await self._sema.acquire()

async def release(self):
self._sema.release()

async def locked(self):
return self._sema.locked()


def dicts_equal(dict1, dict2, abs_tol=1e-4):
"""Compares to dicts whose values may be floating point numbers."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from ray._private.test_utils import (
run_string_as_driver_nonblocking,
wait_for_pid_to_exit,
SignalActor,
)
from ray._common.test_utils import SignalActor
import signal

from ray.dag.tests.experimental.actor_defs import Actor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
import pytest

import ray
from ray._common.test_utils import SignalActor
from ray._private.ray_constants import (
DEFAULT_DASHBOARD_AGENT_LISTEN_PORT,
KV_HEAD_NODE_ID_KEY,
KV_NAMESPACE_JOB,
RAY_ADDRESS_ENVIRONMENT_VARIABLE,
)
from ray._private.test_utils import (
SignalActor,
async_wait_for_condition,
wait_for_condition,
)
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

import ray
from ray import serve
from ray._private.test_utils import SignalActor, wait_for_condition
from ray._common.test_utils import SignalActor
from ray._private.test_utils import wait_for_condition
from ray._private.usage import usage_lib
from ray.cluster_utils import AutoscalingCluster, Cluster
from ray.serve._private.test_utils import (
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_actor_replica_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import ray
from ray import ObjectRef, ObjectRefGenerator
from ray._common.test_utils import SignalActor
from ray._common.utils import get_or_create_event_loop
from ray._private.test_utils import SignalActor
from ray.serve._private.common import (
DeploymentID,
ReplicaID,
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import ray
from ray import serve
from ray._private.test_utils import SignalActor
from ray._common.test_utils import SignalActor
from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME
from ray.serve.handle import DeploymentHandle

Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@

import ray
from ray import serve
from ray._common.test_utils import SignalActor
from ray._private.pydantic_compat import BaseModel, ValidationError
from ray._private.test_utils import SignalActor, wait_for_condition
from ray._private.test_utils import wait_for_condition
from ray.serve._private.api import call_user_app_builder_with_args_if_necessary
from ray.serve._private.common import DeploymentID
from ray.serve._private.constants import (
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_autoscaling_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import ray
import ray.util.state as state_api
from ray import serve
from ray._private.test_utils import SignalActor, wait_for_condition
from ray._common.test_utils import SignalActor
from ray._private.test_utils import wait_for_condition
from ray.serve._private.common import (
DeploymentID,
DeploymentStatus,
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_backpressure.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

import ray
from ray import serve
from ray._private.test_utils import SignalActor, wait_for_condition
from ray._common.test_utils import SignalActor
from ray._private.test_utils import wait_for_condition
from ray.serve.exceptions import BackPressureError
from ray.serve.generated import serve_pb2, serve_pb2_grpc

Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

import ray
from ray import serve
from ray._private.test_utils import SignalActor, wait_for_condition
from ray._common.test_utils import SignalActor
from ray._private.test_utils import wait_for_condition
from ray.cluster_utils import Cluster
from ray.exceptions import RayActorError
from ray.serve._private.common import DeploymentID, ReplicaState
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_controller_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

import ray
from ray import serve
from ray._private.test_utils import SignalActor, wait_for_condition
from ray._common.test_utils import SignalActor
from ray._private.test_utils import wait_for_condition
from ray.exceptions import RayTaskError
from ray.serve._private.common import DeploymentID, ReplicaState
from ray.serve._private.constants import (
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@

import ray
from ray import serve
from ray._common.test_utils import SignalActor
from ray._private.pydantic_compat import ValidationError
from ray._private.test_utils import SignalActor, wait_for_condition
from ray._private.test_utils import wait_for_condition
from ray.serve._private.utils import get_random_string
from ray.serve.exceptions import RayServeException

Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_deploy_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

import ray
from ray import serve
from ray._private.test_utils import SignalActor, wait_for_condition
from ray._common.test_utils import SignalActor
from ray._private.test_utils import wait_for_condition
from ray.serve._private.common import DeploymentStatus
from ray.serve._private.logging_utils import get_serve_logs_dir
from ray.serve._private.test_utils import check_deployment_status, check_num_replicas_eq
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_deploy_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
import ray._private.state
import ray.actor
from ray import serve
from ray._private.test_utils import SignalActor, wait_for_condition
from ray._common.test_utils import SignalActor
from ray._private.test_utils import wait_for_condition
from ray.serve._private.client import ServeControllerClient
from ray.serve._private.common import DeploymentID, DeploymentStatus, ReplicaID
from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME, SERVE_NAMESPACE
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

import ray
from ray import serve
from ray._private.test_utils import SignalActor, wait_for_condition
from ray._common.test_utils import SignalActor
from ray._private.test_utils import wait_for_condition
from ray.exceptions import RayActorError
from ray.serve._private.common import DeploymentID
from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

import ray
from ray import serve
from ray._private.test_utils import SignalActor, wait_for_condition
from ray._common.test_utils import SignalActor
from ray._private.test_utils import wait_for_condition
from ray.exceptions import GetTimeoutError
from ray.serve._private.client import ServeControllerClient
from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

import ray
from ray import serve
from ray._private.test_utils import SignalActor, wait_for_condition
from ray._common.test_utils import SignalActor
from ray._private.test_utils import wait_for_condition
from ray.cluster_utils import Cluster
from ray.serve._private.constants import SERVE_NAMESPACE
from ray.serve._private.test_utils import (
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_handle_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

import ray
from ray import serve
from ray._common.test_utils import SignalActor
from ray._common.utils import get_or_create_event_loop
from ray._private.test_utils import SignalActor, async_wait_for_condition
from ray._private.test_utils import async_wait_for_condition
from ray.serve._private.constants import (
RAY_SERVE_FORCE_LOCAL_TESTING_MODE,
)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_handle_cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

import ray
from ray import serve
from ray._common.test_utils import SignalActor
from ray._private.test_utils import (
SignalActor,
async_wait_for_condition,
wait_for_condition,
)
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_http_cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

import ray
from ray import serve
from ray._private.test_utils import Collector, SignalActor, wait_for_condition
from ray._common.test_utils import SignalActor
from ray._private.test_utils import Collector, wait_for_condition
from ray.serve._private.test_utils import send_signal_on_cancellation
from ray.serve.exceptions import RequestCancelledError

Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import ray
import ray.util.state as state_api
from ray import serve
from ray._common.test_utils import SignalActor
from ray._private.test_utils import (
SignalActor,
fetch_prometheus_metrics,
wait_for_condition,
)
Expand Down
Loading