diff --git a/python/ray/tests/BUILD.bazel b/python/ray/tests/BUILD.bazel index 438e509eb643..fe9bb2e64209 100644 --- a/python/ray/tests/BUILD.bazel +++ b/python/ray/tests/BUILD.bazel @@ -891,7 +891,6 @@ py_test_module_list( "test_actor.py", "test_actor_failures.py", "test_cancel.py", - "test_chaos.py", "test_core_worker_fault_tolerance.py", "test_failure.py", "test_failure_2.py", diff --git a/python/ray/tests/test_chaos.py b/python/ray/tests/test_chaos.py deleted file mode 100644 index 19458773c2f4..000000000000 --- a/python/ray/tests/test_chaos.py +++ /dev/null @@ -1,319 +0,0 @@ -import random -import sys -import time - -import pytest - -import ray -from ray._common.test_utils import wait_for_condition -from ray._private.test_utils import ( - RayletKiller, - WorkerKillerActor, - get_and_run_resource_killer, - get_log_message, -) -from ray.cluster_utils import AutoscalingCluster -from ray.exceptions import ObjectLostError, RayTaskError -from ray.experimental import shuffle -from ray.tests.conftest import _ray_start_chaos_cluster -from ray.util.placement_group import placement_group -from ray.util.state.api import StateApiClient, list_nodes -from ray.util.state.common import ListApiOptions, StateResource - - -def assert_no_system_failure(p, timeout): - # Get all logs for 20 seconds. - logs = get_log_message(p, timeout=timeout) - for log in logs: - assert "SIG" not in log, "There's the segfault or SIGBART reported." - assert "Check failed" not in log, "There's the check failure reported." - - -@pytest.fixture -def set_kill_interval(request): - lineage_reconstruction_enabled, kill_interval = request.param - - request.param = { - "_system_config": { - "lineage_pinning_enabled": lineage_reconstruction_enabled, - "max_direct_call_object_size": 1000, - }, - "kill_interval": kill_interval, - "head_resources": { - "CPU": 0, - }, - "worker_node_types": { - "cpu_node": { - "resources": { - "CPU": 2, - }, - "node_config": { - "object_store_memory": int(200e6), - }, - "min_workers": 0, - "max_workers": 3, - }, - }, - } - cluster_fixture = _ray_start_chaos_cluster(request) - for x in cluster_fixture: - yield (lineage_reconstruction_enabled, kill_interval, cluster_fixture) - - -@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") -@pytest.mark.parametrize( - "set_kill_interval", - [(True, None), (True, 20), (False, None), (False, 20)], - indirect=True, -) -def test_chaos_actor_retry(set_kill_interval): - # Chaos testing. - @ray.remote(num_cpus=0.25, max_restarts=-1, max_task_retries=-1) - class Actor: - def __init__(self): - self.letter_dict = set() - - def add(self, letter): - self.letter_dict.add(letter) - - NUM_CPUS = 16 - TOTAL_TASKS = 300 - - actors = [Actor.remote() for _ in range(NUM_CPUS)] - results = [] - for a in actors: - results.extend([a.add.remote(str(i)) for i in range(TOTAL_TASKS)]) - - start = time.time() - ray.get(results) - runtime_with_failure = time.time() - start - print(f"Runtime when there are many failures: {runtime_with_failure}") - - # TODO(sang): Currently, there are lots of SIGBART with - # plasma client failures. Fix it. 
- # assert_no_system_failure(p, 10) - - -def test_chaos_defer(monkeypatch, ray_start_cluster): - with monkeypatch.context() as m: - # defer for 3s - m.setenv( - "RAY_testing_asio_delay_us", - "NodeManagerService.grpc_client.PrepareBundleResources=2000000:2000000", - ) - m.setenv("RAY_event_stats", "true") - cluster = ray_start_cluster - cluster.add_node(num_cpus=1, object_store_memory=1e9) - cluster.wait_for_nodes() - ray.init(address="auto") # this will connect to gpu nodes - cluster.add_node(num_cpus=0, num_gpus=1) - bundle = [{"GPU": 1}, {"CPU": 1}] - pg = placement_group(bundle) - # PG will not be ready within 3s - with pytest.raises(ray.exceptions.GetTimeoutError): - ray.get(pg.ready(), timeout=1) - # it'll be ready eventually - ray.get(pg.ready()) - - -@ray.remote(num_cpus=0) -class ShuffleStatusTracker: - def __init__(self): - self.num_map = 0 - self.num_reduce = 0 - self.map_refs = [] - self.reduce_refs = [] - - def register_objectrefs(self, map_refs, reduce_refs): - self.map_refs = map_refs - self.reduce_refs = reduce_refs - - def get_progress(self): - if self.map_refs: - ready, self.map_refs = ray.wait( - self.map_refs, - timeout=1, - num_returns=len(self.map_refs), - fetch_local=False, - ) - if ready: - print("Still waiting on map refs", self.map_refs) - self.num_map += len(ready) - elif self.reduce_refs: - ready, self.reduce_refs = ray.wait( - self.reduce_refs, - timeout=1, - num_returns=len(self.reduce_refs), - fetch_local=False, - ) - if ready: - print("Still waiting on reduce refs", self.reduce_refs) - self.num_reduce += len(ready) - return self.num_map, self.num_reduce - - -@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") -@pytest.mark.parametrize( - "set_kill_interval", [(False, None), (False, 60)], indirect=True -) -def test_nonstreaming_shuffle(set_kill_interval): - lineage_reconstruction_enabled, kill_interval, _ = set_kill_interval - try: - # Create our own tracker so that it gets scheduled onto the head node. - tracker = ShuffleStatusTracker.remote() - ray.get(tracker.get_progress.remote()) - assert len(ray.nodes()) == 1, ( - "Tracker actor may have been scheduled to remote node " - "and may get killed during the test" - ) - - shuffle.run( - ray_address="auto", - no_streaming=True, - num_partitions=200, - partition_size=1e6, - tracker=tracker, - ) - except (RayTaskError, ObjectLostError): - assert kill_interval is not None - assert not lineage_reconstruction_enabled - - -@pytest.mark.skip(reason="https://github.com/ray-project/ray/issues/20713") -@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") -@pytest.mark.parametrize( - "set_kill_interval", - [(True, None), (True, 60), (False, None), (False, 60)], - indirect=True, -) -def test_streaming_shuffle(set_kill_interval): - lineage_reconstruction_enabled, kill_interval, _ = set_kill_interval - try: - # Create our own tracker so that it gets scheduled onto the head node. - tracker = ShuffleStatusTracker.remote() - ray.get(tracker.get_progress.remote()) - assert len(ray.nodes()) == 1, ( - "Tracker actor may have been scheduled to remote node " - "and may get killed during the test" - ) - - shuffle.run( - ray_address="auto", - no_streaming=False, - num_partitions=200, - partition_size=1e6, - tracker=tracker, - ) - except (RayTaskError, ObjectLostError): - assert kill_interval is not None - - # TODO(swang): Enable this once we implement support ray.put. 
- # assert not lineage_reconstruction_enabled - - -def test_worker_killer(): - ray.init() - - task_name = "worker_to_kill" - - @ray.remote - def worker_to_kill(): - time.sleep(3) - - # Run WorkerKillerActor to kill 3 tasks, and run remote "worker_to_kill" - # task with max_retries=3. 4 tasks in total (1 initial + 3 retries). - # First 3 tasks will be killed, the last retry will succeed. - worker_killer = get_and_run_resource_killer( - WorkerKillerActor, - 1, - max_to_kill=3, - kill_filter_fn=lambda: lambda task: task.name == task_name, - ) - worker_to_kill.options(name=task_name, max_retries=3).remote() - - def check(): - tasks = StateApiClient().list( - StateResource.TASKS, - options=ListApiOptions(filters=[("name", "=", task_name)]), - raise_on_missing_output=False, - ) - failed = 0 - finished = 0 - for task in tasks: - if task.state == "FAILED": - failed += 1 - elif task.state == "FINISHED": - finished += 1 - return failed == 3 and finished == 1 - - wait_for_condition(check, timeout=20) - - killed_tasks = ray.get(worker_killer.get_total_killed.remote()) - assert len(killed_tasks) == 3 - - tasks = StateApiClient().list( - StateResource.TASKS, - options=ListApiOptions(filters=[("name", "=", task_name)]), - raise_on_missing_output=False, - ) - for task in tasks: - if task.state == "FAILED": - assert (task.task_id, task.worker_pid) in killed_tasks - - ray.shutdown() - - -@pytest.mark.parametrize( - "autoscaler_v2", - [False, True], - ids=["v1", "v2"], -) -def test_node_killer_filter(autoscaler_v2): - # Initialize cluster with 1 head node and 2 worker nodes. - try: - cluster = AutoscalingCluster( - head_resources={"CPU": 0}, - worker_node_types={ - "cpu_node": { - "resources": { - "CPU": 1, - }, - "node_config": {}, - "min_workers": 2, - "max_workers": 2, - }, - }, - autoscaler_v2=autoscaler_v2, - idle_timeout_minutes=999, # it could idle killed before the killer. - ) - cluster.start() - ray.init() - - wait_for_condition(lambda: len(list_nodes()) > 2) - - # Choose random worker node to kill. - worker_nodes = [node for node in list_nodes() if not node["is_head_node"]] - node_to_kill = random.choice(worker_nodes) - node_killer = get_and_run_resource_killer( - RayletKiller, - 1, - max_to_kill=1, - kill_filter_fn=lambda: lambda node: node["NodeID"] == node_to_kill.node_id, - ) - - def check_killed(): - # Check that killed node is consistent across list_nodes() - killed = list(ray.get(node_killer.get_total_killed.remote())) - dead = [node.node_id for node in list_nodes() if node.state == "DEAD"] - if len(killed) != 1 or len(dead) != 1: - return False - return killed[0] == dead[0] == node_to_kill.node_id - - wait_for_condition(check_killed, timeout=100) - finally: - cluster.shutdown() - ray.shutdown() - - -if __name__ == "__main__": - sys.exit(pytest.main(["-sv", __file__])) diff --git a/release/nightly_tests/chaos_test/actor_workload.py b/release/nightly_tests/chaos_test/actor_workload.py new file mode 100644 index 000000000000..ad53976a54b7 --- /dev/null +++ b/release/nightly_tests/chaos_test/actor_workload.py @@ -0,0 +1,82 @@ +import ray +from ray._common.test_utils import wait_for_condition +from ray.data._internal.progress_bar import ProgressBar +from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy + + +def run_actor_workload(total_num_cpus, smoke): + """Run actor-based workload. + + The test checks if actor restart -1 and task_retries -1 works + as expected. It basically requires many actors to report the + seqno to the centralized DB actor while there are failures. 
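+    (The DB actors are pinned to the head node so they survive worker node
+    failures, while the reporting actors use max_restarts=-1 and
+    max_task_retries=-1 so their reports are retried when chaos kills a node.)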
+ If at least once is guaranteed upon failures, this test + shouldn't fail. + """ + + @ray.remote(num_cpus=0) + class DBActor: + def __init__(self): + self.letter_dict = set() + + def add(self, letter): + self.letter_dict.add(letter) + + def get(self): + return self.letter_dict + + @ray.remote(num_cpus=1, max_restarts=-1, max_task_retries=-1) + class ReportActor: + def __init__(self, db_actor): + self.db_actor = db_actor + + def add(self, letter): + ray.get(self.db_actor.add.remote(letter)) + + NUM_CPUS = int(total_num_cpus) + multiplier = 2 + # For smoke mode, run fewer tasks + if smoke: + multiplier = 1 + TOTAL_TASKS = int(300 * multiplier) + head_node_id = ray.get_runtime_context().get_node_id() + db_actors = [ + DBActor.options( + scheduling_strategy=NodeAffinitySchedulingStrategy( + node_id=head_node_id, soft=False + ) + ).remote() + for _ in range(NUM_CPUS) + ] + + pb = ProgressBar("Chaos test", TOTAL_TASKS * NUM_CPUS, "task") + actors = [] + for db_actor in db_actors: + actors.append(ReportActor.remote(db_actor)) + results = [] + highest_reported_num = 0 + for a in actors: + for _ in range(TOTAL_TASKS): + results.append(a.add.remote(str(highest_reported_num))) + highest_reported_num += 1 + pb.fetch_until_complete(results) + pb.close() + for actor in actors: + ray.kill(actor) + + # Consistency check + wait_for_condition( + lambda: ( + ray.cluster_resources().get("CPU", 0) + == ray.available_resources().get("CPU", 0) + ), + timeout=60, + ) + letter_set = set() + for db_actor in db_actors: + letter_set.update(ray.get(db_actor.get.remote())) + # Make sure the DB actor didn't lose any report. + # If this assert fails, that means at least once actor task semantic + # wasn't guaranteed. + for i in range(highest_reported_num): + assert str(i) in letter_set, i diff --git a/release/nightly_tests/chaos_test/object_ref_borrowing_workload.py b/release/nightly_tests/chaos_test/object_ref_borrowing_workload.py new file mode 100644 index 000000000000..2a142632cb5f --- /dev/null +++ b/release/nightly_tests/chaos_test/object_ref_borrowing_workload.py @@ -0,0 +1,73 @@ +import numpy as np + +import ray +from ray._common.test_utils import wait_for_condition + + +def run_object_ref_borrowing_workload(total_num_cpus, smoke): + """Run object ref borrowing workload. + + This test checks that borrowed refs + remain valid even with node failures or transient network failures. 
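+
+    The workload runs in two phases: a sequential phase where each borrowed
+    ref is consumed as soon as it is created, and a concurrent phase where
+    all borrowing tasks are submitted before any result is fetched.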
+ """ + + @ray.remote(num_cpus=1, max_retries=-1) + def create_object(size_mb): + data = np.zeros(size_mb * 1024 * 1024, dtype=np.uint8) + return data + + @ray.remote(num_cpus=1, max_retries=-1) + def borrow_object(borrowed_refs): + data = ray.get(borrowed_refs[0]) + return len(data) + + # For smoke mode, run fewer iterations + if smoke: + NUM_ITERATIONS = 10 + else: + NUM_ITERATIONS = 2000 + OBJECT_SIZE_MB = 10 + + print(f"Starting borrowing test with {NUM_ITERATIONS * 2} total tasks") + print(f"Object size: {OBJECT_SIZE_MB}MB per object") + print(f"Expected total data: {NUM_ITERATIONS * 2 * OBJECT_SIZE_MB / 1024:.2f} GB") + + total_completed = 0 + total_bytes = 0 + + for i in range(NUM_ITERATIONS): + ref = create_object.remote(OBJECT_SIZE_MB) + task_ref = borrow_object.remote([ref]) + size = ray.get(task_ref) + total_completed += 1 + total_bytes += size + + refs = [] + for i in range(NUM_ITERATIONS): + ref = create_object.remote(OBJECT_SIZE_MB) + refs.append(borrow_object.remote([ref])) + sizes = ray.get(refs) + total_completed += len(sizes) + total_bytes += sum(sizes) + + print("All tasks completed:") + print(f" Total tasks completed: {total_completed} (expected {NUM_ITERATIONS * 2})") + print(f" Total data processed: {total_bytes / (1024**3):.2f} GB") + + expected_total_tasks = NUM_ITERATIONS * 2 + assert ( + total_completed == expected_total_tasks + ), f"Expected {expected_total_tasks} completions, got {total_completed}" + expected_total_bytes = expected_total_tasks * OBJECT_SIZE_MB * 1024 * 1024 + assert ( + total_bytes == expected_total_bytes + ), f"Expected {expected_total_bytes} bytes, got {total_bytes}" + + # Consistency check + wait_for_condition( + lambda: ( + ray.cluster_resources().get("CPU", 0) + == ray.available_resources().get("CPU", 0) + ), + timeout=60, + ) diff --git a/release/nightly_tests/chaos_test/streaming_generator_workload.py b/release/nightly_tests/chaos_test/streaming_generator_workload.py new file mode 100644 index 000000000000..d8bf6a917153 --- /dev/null +++ b/release/nightly_tests/chaos_test/streaming_generator_workload.py @@ -0,0 +1,91 @@ +import numpy as np + +import ray +from ray._common.test_utils import wait_for_condition +from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy + + +def run_streaming_generator_workload(total_num_cpus, smoke): + """Run streaming generator workload. + + Spreads streaming generators across the nodes to ensure + chaos events affect the generators. Tests that streaming generators + work correctly with retries when there are node failures or transient + network failures. 
+ """ + + @ray.remote(num_cpus=1, max_retries=-1) + def streaming_generator(num_items, item_size_mb): + for i in range(num_items): + data = np.zeros(item_size_mb * 1024 * 1024, dtype=np.uint8) + yield data + + @ray.remote(num_cpus=1, max_retries=-1) + def consume_streaming_generator(num_items, item_size_mb): + gen = streaming_generator.remote(num_items, item_size_mb) + + count = 0 + total_bytes = 0 + for item_ref in gen: + data = ray.get(item_ref) + count += 1 + total_bytes += data.nbytes + + return (count, total_bytes) + + # Get alive nodes to distribute generators across the cluster + alive_nodes = [n for n in ray.nodes() if n.get("Alive", False)] + NUM_GENERATORS = 2 * len(alive_nodes) + + # For smoke mode, run fewer items + if smoke: + ITEMS_PER_GENERATOR = 10 + else: + ITEMS_PER_GENERATOR = 500 + ITEM_SIZE_MB = 10 + + print( + f"Starting {NUM_GENERATORS} concurrent streaming generators " + f"({ITEMS_PER_GENERATOR} items of {ITEM_SIZE_MB}MB each)" + ) + print( + f"Expected total data: " + f"{NUM_GENERATORS * ITEMS_PER_GENERATOR * ITEM_SIZE_MB / 1024:.2f} GB" + ) + + # Distribute generators across nodes to maximize chaos impact + tasks = [] + for i in range(NUM_GENERATORS): + node = alive_nodes[i % len(alive_nodes)] + node_id = node["NodeID"] + + task = consume_streaming_generator.options( + scheduling_strategy=NodeAffinitySchedulingStrategy( + node_id=node_id, soft=True + ) + ).remote(ITEMS_PER_GENERATOR, ITEM_SIZE_MB) + tasks.append(task) + + results = ray.get(tasks) + + total_items = sum(count for count, _ in results) + total_bytes = sum(bytes_val for _, bytes_val in results) + + print("All generators completed:") + print( + f" Total items: {total_items} (expected {NUM_GENERATORS * ITEMS_PER_GENERATOR})" + ) + print(f" Total data: {total_bytes / (1024**3):.2f} GB") + + assert ( + total_items == NUM_GENERATORS * ITEMS_PER_GENERATOR + ), f"Expected {NUM_GENERATORS * ITEMS_PER_GENERATOR} items, got {total_items}" + + # Consistency check + wait_for_condition( + lambda: ( + ray.cluster_resources().get("CPU", 0) + == ray.available_resources().get("CPU", 0) + ), + timeout=60, + ) diff --git a/release/nightly_tests/chaos_test/task_workload.py b/release/nightly_tests/chaos_test/task_workload.py new file mode 100644 index 000000000000..53a426f7fab8 --- /dev/null +++ b/release/nightly_tests/chaos_test/task_workload.py @@ -0,0 +1,48 @@ +import random +import string +import time + +import numpy as np + +import ray +from ray._common.test_utils import wait_for_condition +from ray.data._internal.progress_bar import ProgressBar + + +def run_task_workload(total_num_cpus, smoke): + """Run task-based workload that doesn't require object reconstruction.""" + + @ray.remote(num_cpus=1, max_retries=-1) + def task(): + def generate_data(size_in_kb=10): + return np.zeros(1024 * size_in_kb, dtype=np.uint8) + + a = "" + for _ in range(100000): + a = a + random.choice(string.ascii_letters) + return generate_data(size_in_kb=50) + + @ray.remote(num_cpus=1, max_retries=-1) + def invoke_nested_task(): + time.sleep(0.8) + return ray.get(task.remote()) + + multiplier = 75 + # For smoke mode, run fewer tasks + if smoke: + multiplier = 1 + TOTAL_TASKS = int(total_num_cpus * 2 * multiplier) + + pb = ProgressBar("Chaos test", TOTAL_TASKS, "task") + results = [invoke_nested_task.remote() for _ in range(TOTAL_TASKS)] + pb.block_until_complete(results) + pb.close() + + # Consistency check. 
+ wait_for_condition( + lambda: ( + ray.cluster_resources().get("CPU", 0) + == ray.available_resources().get("CPU", 0) + ), + timeout=60, + ) diff --git a/release/nightly_tests/chaos_test/test_chaos.py b/release/nightly_tests/chaos_test/test_chaos.py new file mode 100644 index 000000000000..21fd1317437c --- /dev/null +++ b/release/nightly_tests/chaos_test/test_chaos.py @@ -0,0 +1,105 @@ +import argparse +import json +import logging +import os +import time + +import ray +from ray._private.test_utils import monitor_memory_usage + +from task_workload import run_task_workload +from actor_workload import run_actor_workload +from streaming_generator_workload import run_streaming_generator_workload +from object_ref_borrowing_workload import run_object_ref_borrowing_workload + + +def parse_script_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--node-kill-interval", type=int, default=60) + parser.add_argument("--workload", type=str) + parser.add_argument("--smoke", action="store_true") + parser.add_argument("--disable-resource-killer", action="store_true") + return parser.parse_known_args() + + +def main(): + """Test task/actor/streaming generator/object ref borrowing chaos test. + + It tests the following scenarios: + 1. Raylet failures: Done by an actor calling Raylet's Shutdown RPC. + 2. EC2 instance termination: Done by an actor terminating + EC2 instances via AWS SDK. + 3. Network failures: Done by injecting network failures via iptables or env variables. + """ + args, _ = parse_script_args() + logging.info("Received arguments: {}".format(args)) + ray.init(address="auto") + total_num_cpus = ray.cluster_resources()["CPU"] + monitor_actor = monitor_memory_usage() + + # Select the workload based on the argument + workload = None + if args.workload == "tasks": + workload = run_task_workload + elif args.workload == "actors": + workload = run_actor_workload + elif args.workload == "streaming": + workload = run_streaming_generator_workload + elif args.workload == "borrowing": + workload = run_object_ref_borrowing_workload + else: + assert False + + node_killer = None + + if args.disable_resource_killer: + print("ResourceKiller disabled") + else: + node_killer = ray.get_actor( + "ResourceKiller", namespace="release_test_namespace" + ) + node_killer.run.remote() + print("ResourceKiller started") + + start = time.time() + workload(total_num_cpus, args.smoke) + runtime_s = time.time() - start + runtime_s = round(runtime_s, 2) + print(f"Total runtime: {runtime_s}") + + if node_killer is not None: + node_killer.stop_run.remote() + print(f"Total node failures: {ray.get(node_killer.get_total_killed.remote())}") + + used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote()) + used_gb = round(used_gb, 2) + print("Memory usage with failures.") + print(f"Peak memory usage: {used_gb}GB") + print(f"Peak memory usage per processes:\n {usage}") + + ray.get(monitor_actor.stop_run.remote()) + + results = { + "time": runtime_s, + "_peak_memory": used_gb, + "_peak_process_memory": usage, + } + + results["perf_metrics"] = [ + { + "perf_metric_name": f"chaos_{args.workload}_runtime_s", + "perf_metric_value": runtime_s, + "perf_metric_type": "LATENCY", + }, + { + "perf_metric_name": f"chaos_{args.workload}_peak_memory_gb", + "perf_metric_value": used_gb, + "perf_metric_type": "MEMORY", + }, + ] + + with open(os.environ["TEST_OUTPUT_JSON"], "w") as f: + json.dump(results, f) + + +main() diff --git a/release/nightly_tests/chaos_test/test_chaos_basic.py 
b/release/nightly_tests/chaos_test/test_chaos_basic.py deleted file mode 100644 index d6f07c0bde9a..000000000000 --- a/release/nightly_tests/chaos_test/test_chaos_basic.py +++ /dev/null @@ -1,233 +0,0 @@ -import argparse -import json -import logging -import os -import random -import string -import time - -import numpy as np - -import ray -from ray._common.test_utils import wait_for_condition -from ray._private.test_utils import monitor_memory_usage -from ray.data._internal.progress_bar import ProgressBar -from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy - - -def run_task_workload(total_num_cpus, smoke): - """Run task-based workload that doesn't require object reconstruction.""" - - @ray.remote(num_cpus=1, max_retries=-1) - def task(): - def generate_data(size_in_kb=10): - return np.zeros(1024 * size_in_kb, dtype=np.uint8) - - a = "" - for _ in range(100000): - a = a + random.choice(string.ascii_letters) - return generate_data(size_in_kb=50) - - @ray.remote(num_cpus=1, max_retries=-1) - def invoke_nested_task(): - time.sleep(0.8) - return ray.get(task.remote()) - - multiplier = 75 - # For smoke mode, run fewer tasks - if smoke: - multiplier = 1 - TOTAL_TASKS = int(total_num_cpus * 2 * multiplier) - - pb = ProgressBar("Chaos test", TOTAL_TASKS, "task") - results = [invoke_nested_task.remote() for _ in range(TOTAL_TASKS)] - pb.block_until_complete(results) - pb.close() - - # Consistency check. - wait_for_condition( - lambda: ( - ray.cluster_resources().get("CPU", 0) - == ray.available_resources().get("CPU", 0) - ), - timeout=60, - ) - - -def run_actor_workload(total_num_cpus, smoke): - """Run actor-based workload. - - The test checks if actor restart -1 and task_retries -1 works - as expected. It basically requires many actors to report the - seqno to the centralized DB actor while there are failures. - If at least once is guaranteed upon failures, this test - shouldn't fail. - """ - - @ray.remote(num_cpus=0) - class DBActor: - def __init__(self): - self.letter_dict = set() - - def add(self, letter): - self.letter_dict.add(letter) - - def get(self): - return self.letter_dict - - @ray.remote(num_cpus=1, max_restarts=-1, max_task_retries=-1) - class ReportActor: - def __init__(self, db_actor): - self.db_actor = db_actor - - def add(self, letter): - ray.get(self.db_actor.add.remote(letter)) - - NUM_CPUS = int(total_num_cpus) - multiplier = 2 - # For smoke mode, run fewer tasks - if smoke: - multiplier = 1 - TOTAL_TASKS = int(300 * multiplier) - head_node_id = ray.get_runtime_context().get_node_id() - db_actors = [ - DBActor.options( - scheduling_strategy=NodeAffinitySchedulingStrategy( - node_id=head_node_id, soft=False - ) - ).remote() - for _ in range(NUM_CPUS) - ] - - pb = ProgressBar("Chaos test", TOTAL_TASKS * NUM_CPUS, "task") - actors = [] - for db_actor in db_actors: - actors.append(ReportActor.remote(db_actor)) - results = [] - highest_reported_num = 0 - for a in actors: - for _ in range(TOTAL_TASKS): - results.append(a.add.remote(str(highest_reported_num))) - highest_reported_num += 1 - pb.fetch_until_complete(results) - pb.close() - for actor in actors: - ray.kill(actor) - - # Consistency check - wait_for_condition( - lambda: ( - ray.cluster_resources().get("CPU", 0) - == ray.available_resources().get("CPU", 0) - ), - timeout=60, - ) - letter_set = set() - for db_actor in db_actors: - letter_set.update(ray.get(db_actor.get.remote())) - # Make sure the DB actor didn't lose any report. 
- # If this assert fails, that means at least once actor task semantic - # wasn't guaranteed. - for i in range(highest_reported_num): - assert str(i) in letter_set, i - - -def run_placement_group_workload(total_num_cpus, smoke): - raise NotImplementedError - - -def parse_script_args(): - parser = argparse.ArgumentParser() - parser.add_argument("--node-kill-interval", type=int, default=60) - parser.add_argument("--workload", type=str) - parser.add_argument("--smoke", action="store_true") - return parser.parse_known_args() - - -def main(): - """Test task/actor/placement group basic chaos test. - - It tests the following scenarios: - 1. Raylet failures: This is done by an actor calling Raylet's Shutdown RPC. - 2. EC2 instance termination: This is done by an actor terminating - EC2 instances via AWS SDK. - - Currently, the test runs in 3 steps. Each step records the - peak memory usage to observe the memory usage while there - are node failures. - - Step 1: Warm up the cluster. It is needed to pre-start workers - if necessary. - - Step 2: Start the test without a failure. - - Step 3: Start the test with constant node failures. - """ - args, _ = parse_script_args() - logging.info("Received arguments: {}".format(args)) - ray.init(address="auto") - total_num_cpus = ray.cluster_resources()["CPU"] - total_nodes = 0 - for n in ray.nodes(): - if n["Alive"]: - total_nodes += 1 - monitor_actor = monitor_memory_usage() - - workload = None - if args.workload == "tasks": - workload = run_task_workload - elif args.workload == "actors": - workload = run_actor_workload - elif args.workload == "pg": - workload = run_placement_group_workload - else: - assert False - - # Step 1 - print("Warm up... Prestarting workers if necessary.") - start = time.time() - workload(total_num_cpus, args.smoke) - print(f"Runtime when warm up: {time.time() - start}") - - # Step 2 - print("Running without failures") - start = time.time() - workload(total_num_cpus, args.smoke) - print(f"Runtime when there are no failures: {time.time() - start}") - used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote()) - print("Memory usage without failures.") - print(f"Peak memory usage: {round(used_gb, 2)}GB") - print(f"Peak memory usage per processes:\n {usage}") - - # Step 3 - print("Running with failures") - start = time.time() - node_killer = ray.get_actor("ResourceKiller", namespace="release_test_namespace") - node_killer.run.remote() - workload(total_num_cpus, args.smoke) - print(f"Runtime when there are many failures: {time.time() - start}") - print(f"Total node failures: {ray.get(node_killer.get_total_killed.remote())}") - node_killer.stop_run.remote() - used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote()) - print("Memory usage with failures.") - print(f"Peak memory usage: {round(used_gb, 2)}GB") - print(f"Peak memory usage per processes:\n {usage}") - - # Report the result. 
- ray.get(monitor_actor.stop_run.remote()) - print( - "Total number of killed nodes: " - f"{ray.get(node_killer.get_total_killed.remote())}" - ) - with open(os.environ["TEST_OUTPUT_JSON"], "w") as f: - f.write( - json.dumps( - { - "_peak_memory": round(used_gb, 2), - "_peak_process_memory": usage, - } - ) - ) - - -main() diff --git a/release/release_tests.yaml b/release/release_tests.yaml index c568da07e363..5b7c0fda9170 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3319,6 +3319,31 @@ # Core Chaos tests ################## +- name: chaos_many_tasks_baseline + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + script: python chaos_test/test_chaos.py --workload=tasks --disable-resource-killer + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + - name: chaos_many_tasks_kill_raylet python: "3.10" group: core-nightly-test @@ -3335,7 +3360,7 @@ wait_for_nodes: num_nodes: 10 prepare: python setup_chaos.py --no-start - script: python chaos_test/test_chaos_basic.py --workload=tasks + script: python chaos_test/test_chaos.py --workload=tasks variations: - __suffix__: aws @@ -3361,11 +3386,67 @@ wait_for_nodes: num_nodes: 10 prepare: python setup_chaos.py --no-start --chaos TerminateEC2Instance - script: python chaos_test/test_chaos_basic.py --workload=tasks + script: python chaos_test/test_chaos.py --workload=tasks variations: - __suffix__: aws +- name: chaos_many_tasks_iptable_failure_injection + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: + runtime_env: + - RAY_health_check_period_ms=10000 + - RAY_health_check_timeout_ms=100000 + - RAY_health_check_failure_threshold=10 + - RAY_gcs_rpc_server_connect_timeout_s=60 + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + script: > + python simulate_cross_az_network_failure.py --network-failure-interval 60 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=tasks --disable-resource-killer + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_many_actors_baseline + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 4200 + wait_for_nodes: + num_nodes: 10 + script: python chaos_test/test_chaos.py --workload=actors --disable-resource-killer + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + - name: chaos_many_actors_kill_raylet python: "3.10" group: core-nightly-test @@ -3382,7 +3463,7 @@ wait_for_nodes: num_nodes: 10 prepare: python setup_chaos.py --no-start - script: python chaos_test/test_chaos_basic.py --workload=actors + script: python chaos_test/test_chaos.py --workload=actors variations: - __suffix__: aws @@ -3408,10 +3489,247 @@ wait_for_nodes: num_nodes: 10 prepare: python setup_chaos.py --no-start --chaos TerminateEC2Instance - script: python chaos_test/test_chaos_basic.py --workload=actors + 
script: python chaos_test/test_chaos.py --workload=actors + + variations: + - __suffix__: aws + +- name: chaos_many_actors_iptable_failure_injection + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: + runtime_env: + - RAY_health_check_period_ms=10000 + - RAY_health_check_timeout_ms=100000 + - RAY_health_check_failure_threshold=10 + - RAY_gcs_rpc_server_connect_timeout_s=60 + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 4200 + wait_for_nodes: + num_nodes: 10 + script: > + python simulate_cross_az_network_failure.py --network-failure-interval 60 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=actors --disable-resource-killer + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_streaming_generator_baseline + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + script: python chaos_test/test_chaos.py --workload=streaming --disable-resource-killer + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_streaming_generator_kill_raylet + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + prepare: python setup_chaos.py --no-start --max-to-kill 10 + script: python chaos_test/test_chaos.py --workload=streaming + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_streaming_generator_terminate_instance + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + prepare: python setup_chaos.py --no-start --chaos TerminateEC2Instance --max-to-kill 10 + script: python chaos_test/test_chaos.py --workload=streaming + + variations: + - __suffix__: aws + +- name: chaos_streaming_generator_iptable_failure_injection + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: + runtime_env: + - RAY_health_check_period_ms=10000 + - RAY_health_check_timeout_ms=100000 + - RAY_health_check_failure_threshold=10 + - RAY_gcs_rpc_server_connect_timeout_s=60 + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + script: > + python simulate_cross_az_network_failure.py --network-failure-interval 60 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=streaming --disable-resource-killer + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_object_ref_borrowing_baseline + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: 
chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + script: python chaos_test/test_chaos.py --workload=borrowing --disable-resource-killer + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_object_ref_borrowing_kill_raylet + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + prepare: python setup_chaos.py --no-start + script: python chaos_test/test_chaos.py --workload=borrowing + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_object_ref_borrowing_terminate_instance + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + prepare: python setup_chaos.py --no-start --chaos TerminateEC2Instance + script: python chaos_test/test_chaos.py --workload=borrowing + + variations: + - __suffix__: aws + +- name: chaos_object_ref_borrowing_iptable_failure_injection + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: + runtime_env: + - RAY_health_check_period_ms=10000 + - RAY_health_check_timeout_ms=100000 + - RAY_health_check_failure_threshold=10 + - RAY_gcs_rpc_server_connect_timeout_s=60 + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + script: > + python simulate_cross_az_network_failure.py --network-failure-interval 60 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=borrowing --disable-resource-killer variations: - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml - name: chaos_dask_on_ray_large_scale_test_no_spilling python: "3.10"