From 01610880870640d5441af965b3c541be12520a64 Mon Sep 17 00:00:00 2001 From: joshlee Date: Fri, 21 Nov 2025 01:06:13 +0000 Subject: [PATCH 01/11] Introduce core chaos network release tests + streaming gen and borrowing tests Signed-off-by: joshlee --- .../chaos_test/test_chaos_basic.py | 191 ++++++++++++++- release/release_tests.yaml | 228 ++++++++++++++++++ 2 files changed, 411 insertions(+), 8 deletions(-) diff --git a/release/nightly_tests/chaos_test/test_chaos_basic.py b/release/nightly_tests/chaos_test/test_chaos_basic.py index d6f07c0bde9a..7440361011a0 100644 --- a/release/nightly_tests/chaos_test/test_chaos_basic.py +++ b/release/nightly_tests/chaos_test/test_chaos_basic.py @@ -132,6 +132,162 @@ def add(self, letter): assert str(i) in letter_set, i +def run_streaming_generator_workload(total_num_cpus, smoke): + """Run streaming generator workload. + + The test runs 10 concurrent long-running streaming generators pinned to + different nodes. + This tests that streaming generators work correctly with retries when + there are node failures or transient network failures. + """ + + @ray.remote(num_cpus=1, max_retries=-1) + def streaming_generator(num_items, item_size_mb): + """Generator that yields large plasma objects.""" + for i in range(num_items): + data = np.zeros(item_size_mb * 1024 * 1024, dtype=np.uint8) + yield (i, data) + + @ray.remote(num_cpus=1, max_retries=-1) + def consume_streaming_generator(num_items, item_size_mb, node_name): + """Task that spawns and consumes a streaming generator.""" + print( + f"Starting streaming generator on {node_name}: " + f"{num_items} items of {item_size_mb}MB each" + ) + + gen = streaming_generator.remote(num_items, item_size_mb) + + count = 0 + total_bytes = 0 + for idx, data in gen: + count += 1 + total_bytes += data.nbytes + + print( + f"Completed streaming generator on {node_name}: " + f"{count} items, {total_bytes / (1024**3):.2f} GB" + ) + return (count, total_bytes) + + alive_nodes = [n for n in ray.nodes() if n.get("Alive", False)] + + NUM_GENERATORS = len(alive_nodes) + # For smoke mode, run fewer items + if smoke: + ITEMS_PER_GENERATOR = 10 + else: + ITEMS_PER_GENERATOR = 300 + ITEM_SIZE_MB = 10 + + print( + f"Starting {NUM_GENERATORS} concurrent streaming generators " + f"({ITEMS_PER_GENERATOR} items of {ITEM_SIZE_MB}MB each)" + ) + print( + f"Expected total data: " + f"{NUM_GENERATORS * ITEMS_PER_GENERATOR * ITEM_SIZE_MB / 1024:.2f} GB" + ) + + # Launch generators on different nodes in parallel + tasks = [] + for i in range(NUM_GENERATORS): + node = alive_nodes[i % len(alive_nodes)] + node_id = node["NodeID"] + node_name = node.get("NodeName", node_id[:8]) + + task = consume_streaming_generator.options( + scheduling_strategy=NodeAffinitySchedulingStrategy( + node_id=node_id, soft=False + ) + ).remote(ITEMS_PER_GENERATOR, ITEM_SIZE_MB, node_name) + tasks.append(task) + + results = ray.get(tasks) + + total_items = sum(count for count, _ in results) + total_bytes = sum(bytes_val for _, bytes_val in results) + + print("All generators completed:") + print( + f" Total items: {total_items} (expected {NUM_GENERATORS * ITEMS_PER_GENERATOR})" + ) + print(f" Total data: {total_bytes / (1024**3):.2f} GB") + + # Verify all items were received + assert ( + total_items == NUM_GENERATORS * ITEMS_PER_GENERATOR + ), f"Expected {NUM_GENERATORS * ITEMS_PER_GENERATOR} items, got {total_items}" + + # Consistency check + wait_for_condition( + lambda: ( + ray.cluster_resources().get("CPU", 0) + == ray.available_resources().get("CPU", 0) + ), + timeout=60, + ) + + +def run_object_ref_borrowing_workload(total_num_cpus, smoke): + """Run object ref borrowing workload. + + This test checks that borrowed refs + remain valid even with node failures or transient network failures. + """ + + @ray.remote(num_cpus=1, max_retries=-1) + def create_object(size_mb): + data = np.zeros(size_mb * 1024 * 1024, dtype=np.uint8) + return data + + @ray.remote(num_cpus=1, max_retries=-1) + def borrow_object(borrowed_ref): + data = ray.get(borrowed_ref) + return len(data) + + # For smoke mode, run fewer iterations + if smoke: + NUM_ITERATIONS = 10 + else: + NUM_ITERATIONS = 1000 + OBJECT_SIZE_MB = 10 + + print(f"Starting {NUM_ITERATIONS} task pairs (A creates, B borrows)") + print(f"Object size: {OBJECT_SIZE_MB}MB per object") + print(f"Expected total data: {NUM_ITERATIONS * OBJECT_SIZE_MB / 1024:.2f} GB") + + refs = [] + for i in range(NUM_ITERATIONS): + ref = create_object.remote(OBJECT_SIZE_MB) + refs.append(borrow_object.remote(ref)) + sizes = ray.get(refs) + num_completed = len(sizes) + total_bytes = sum(sizes) + + print("All tasks completed:") + print(f" Tasks completed: {num_completed} (expected {NUM_ITERATIONS})") + print(f" Total data processed: {total_bytes / (1024**3):.2f} GB") + + # Assertions + assert ( + num_completed == NUM_ITERATIONS + ), f"Expected {NUM_ITERATIONS} completions, got {num_completed}" + expected_bytes = NUM_ITERATIONS * OBJECT_SIZE_MB * 1024 * 1024 + assert ( + total_bytes == expected_bytes + ), f"Expected {expected_bytes} bytes, got {total_bytes}" + + # Consistency check + wait_for_condition( + lambda: ( + ray.cluster_resources().get("CPU", 0) + == ray.available_resources().get("CPU", 0) + ), + timeout=60, + ) + + def run_placement_group_workload(total_num_cpus, smoke): raise NotImplementedError @@ -180,6 +336,10 @@ def main(): workload = run_actor_workload elif args.workload == "pg": workload = run_placement_group_workload + elif args.workload == "streaming": + workload = run_streaming_generator_workload + elif args.workload == "borrowing": + workload = run_object_ref_borrowing_workload else: assert False @@ -202,12 +362,26 @@ def main(): # Step 3 print("Running with failures") start = time.time() - node_killer = ray.get_actor("ResourceKiller", namespace="release_test_namespace") - node_killer.run.remote() + node_killer = None + try: + node_killer = ray.get_actor( + "ResourceKiller", namespace="release_test_namespace" + ) + node_killer.run.remote() + print("ResourceKiller found and started") + except ValueError: + print( + "ResourceKiller not found - assuming external chaos injection " + "(e.g., iptables, network failures)" + ) + workload(total_num_cpus, args.smoke) print(f"Runtime when there are many failures: {time.time() - start}") - print(f"Total node failures: {ray.get(node_killer.get_total_killed.remote())}") - node_killer.stop_run.remote() + + if node_killer is not None: + print(f"Total node failures: {ray.get(node_killer.get_total_killed.remote())}") + node_killer.stop_run.remote() + used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote()) print("Memory usage with failures.") print(f"Peak memory usage: {round(used_gb, 2)}GB") @@ -215,10 +389,11 @@ def main(): # Report the result. ray.get(monitor_actor.stop_run.remote()) - print( - "Total number of killed nodes: " - f"{ray.get(node_killer.get_total_killed.remote())}" - ) + if node_killer is not None: + print( + "Total number of killed nodes: " + f"{ray.get(node_killer.get_total_killed.remote())}" + ) with open(os.environ["TEST_OUTPUT_JSON"], "w") as f: f.write( json.dumps( diff --git a/release/release_tests.yaml b/release/release_tests.yaml index c568da07e363..3b8b358cfad8 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3366,6 +3366,37 @@ variations: - __suffix__: aws +- name: chaos_many_tasks_iptable_failure_injection + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: + runtime_env: + - RAY_health_check_period_ms=10000 + - RAY_health_check_timeout_ms=100000 + - RAY_health_check_failure_threshold=10 + - RAY_gcs_rpc_server_connect_timeout_s=60 + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + script: > + python simulate_cross_az_network_failure.py --network-failure-interval 210 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=tasks + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + - name: chaos_many_actors_kill_raylet python: "3.10" group: core-nightly-test @@ -3413,6 +3444,203 @@ variations: - __suffix__: aws +- name: chaos_many_actors_iptable_failure_injection + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: + runtime_env: + - RAY_health_check_period_ms=10000 + - RAY_health_check_timeout_ms=100000 + - RAY_health_check_failure_threshold=10 + - RAY_gcs_rpc_server_connect_timeout_s=60 + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 4200 + wait_for_nodes: + num_nodes: 10 + script: > + python simulate_cross_az_network_failure.py --network-failure-interval 210 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=actors + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_streaming_generator_kill_raylet + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + prepare: python setup_chaos.py --no-start --max-to-kill 5 + script: python chaos_test/test_chaos_basic.py --workload=streaming + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_streaming_generator_terminate_instance + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + prepare: python setup_chaos.py --no-start --chaos TerminateEC2Instance --max-to-kill 5 + script: python chaos_test/test_chaos_basic.py --workload=streaming + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_streaming_generator_iptable_failure_injection + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: + runtime_env: + - RAY_health_check_period_ms=10000 + - RAY_health_check_timeout_ms=100000 + - RAY_health_check_failure_threshold=10 + - RAY_gcs_rpc_server_connect_timeout_s=60 + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + script: > + python simulate_cross_az_network_failure.py --network-failure-interval 210 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=streaming + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_object_ref_borrowing_kill_raylet + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + prepare: python setup_chaos.py --no-start + script: python chaos_test/test_chaos_basic.py --workload=borrowing + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_object_ref_borrowing_terminate_instance + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + prepare: python setup_chaos.py --no-start --chaos TerminateEC2Instance + script: python chaos_test/test_chaos_basic.py --workload=borrowing + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_object_ref_borrowing_iptable_failure_injection + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: + runtime_env: + - RAY_health_check_period_ms=10000 + - RAY_health_check_timeout_ms=100000 + - RAY_health_check_failure_threshold=10 + - RAY_gcs_rpc_server_connect_timeout_s=60 + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + script: > + python simulate_cross_az_network_failure.py --network-failure-interval 210 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=borrowing + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + - name: chaos_dask_on_ray_large_scale_test_no_spilling python: "3.10" group: data-tests From 4767e08016deccc8cf81eb1b9d525aaab2ab022a Mon Sep 17 00:00:00 2001 From: joshlee Date: Fri, 21 Nov 2025 01:58:22 +0000 Subject: [PATCH 02/11] Fix pytest errors Signed-off-by: joshlee --- release/nightly_tests/chaos_test/test_chaos_basic.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/release/nightly_tests/chaos_test/test_chaos_basic.py b/release/nightly_tests/chaos_test/test_chaos_basic.py index 7440361011a0..90f2edd2aa4d 100644 --- a/release/nightly_tests/chaos_test/test_chaos_basic.py +++ b/release/nightly_tests/chaos_test/test_chaos_basic.py @@ -198,7 +198,7 @@ def consume_streaming_generator(num_items, item_size_mb, node_name): task = consume_streaming_generator.options( scheduling_strategy=NodeAffinitySchedulingStrategy( - node_id=node_id, soft=False + node_id=node_id, soft=True ) ).remote(ITEMS_PER_GENERATOR, ITEM_SIZE_MB, node_name) tasks.append(task) @@ -242,8 +242,9 @@ def create_object(size_mb): return data @ray.remote(num_cpus=1, max_retries=-1) - def borrow_object(borrowed_ref): - data = ray.get(borrowed_ref) + def borrow_object(borrowed_refs): + """Receives a list of borrowed refs, gets the first one, returns size.""" + data = ray.get(borrowed_refs[0]) return len(data) # For smoke mode, run fewer iterations @@ -260,7 +261,7 @@ def borrow_object(borrowed_ref): refs = [] for i in range(NUM_ITERATIONS): ref = create_object.remote(OBJECT_SIZE_MB) - refs.append(borrow_object.remote(ref)) + refs.append(borrow_object.remote([ref])) sizes = ray.get(refs) num_completed = len(sizes) total_bytes = sum(sizes) From 40ecc32d4b7bdbb3140479d7b68bc9d0705230a6 Mon Sep 17 00:00:00 2001 From: joshlee Date: Fri, 21 Nov 2025 05:09:52 +0000 Subject: [PATCH 03/11] Fixing tests Signed-off-by: joshlee --- .../chaos_test/test_chaos_basic.py | 22 ++++++------------- release/release_tests.yaml | 8 +++---- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/release/nightly_tests/chaos_test/test_chaos_basic.py b/release/nightly_tests/chaos_test/test_chaos_basic.py index 90f2edd2aa4d..8ef9e23c0d94 100644 --- a/release/nightly_tests/chaos_test/test_chaos_basic.py +++ b/release/nightly_tests/chaos_test/test_chaos_basic.py @@ -146,28 +146,21 @@ def streaming_generator(num_items, item_size_mb): """Generator that yields large plasma objects.""" for i in range(num_items): data = np.zeros(item_size_mb * 1024 * 1024, dtype=np.uint8) - yield (i, data) + yield data @ray.remote(num_cpus=1, max_retries=-1) - def consume_streaming_generator(num_items, item_size_mb, node_name): + def consume_streaming_generator(num_items, item_size_mb): """Task that spawns and consumes a streaming generator.""" - print( - f"Starting streaming generator on {node_name}: " - f"{num_items} items of {item_size_mb}MB each" - ) - gen = streaming_generator.remote(num_items, item_size_mb) count = 0 total_bytes = 0 - for idx, data in gen: + for item_ref in gen: + # Each yielded item is an ObjectRef, need to get it + data = ray.get(item_ref) count += 1 total_bytes += data.nbytes - print( - f"Completed streaming generator on {node_name}: " - f"{count} items, {total_bytes / (1024**3):.2f} GB" - ) return (count, total_bytes) alive_nodes = [n for n in ray.nodes() if n.get("Alive", False)] @@ -194,13 +187,12 @@ def consume_streaming_generator(num_items, item_size_mb, node_name): for i in range(NUM_GENERATORS): node = alive_nodes[i % len(alive_nodes)] node_id = node["NodeID"] - node_name = node.get("NodeName", node_id[:8]) task = consume_streaming_generator.options( scheduling_strategy=NodeAffinitySchedulingStrategy( node_id=node_id, soft=True ) - ).remote(ITEMS_PER_GENERATOR, ITEM_SIZE_MB, node_name) + ).remote(ITEMS_PER_GENERATOR, ITEM_SIZE_MB) tasks.append(task) results = ray.get(tasks) @@ -251,7 +243,7 @@ def borrow_object(borrowed_refs): if smoke: NUM_ITERATIONS = 10 else: - NUM_ITERATIONS = 1000 + NUM_ITERATIONS = 1500 OBJECT_SIZE_MB = 10 print(f"Starting {NUM_ITERATIONS} task pairs (A creates, B borrows)") diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 3b8b358cfad8..566a678a721a 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3387,7 +3387,7 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 210 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=tasks + python simulate_cross_az_network_failure.py --network-failure-interval 180 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=tasks variations: - __suffix__: aws @@ -3465,7 +3465,7 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 210 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=actors + python simulate_cross_az_network_failure.py --network-failure-interval 180 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=actors variations: - __suffix__: aws @@ -3548,7 +3548,7 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 210 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=streaming + python simulate_cross_az_network_failure.py --network-failure-interval 180 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=streaming variations: - __suffix__: aws @@ -3631,7 +3631,7 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 210 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=borrowing + python simulate_cross_az_network_failure.py --network-failure-interval 180 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=borrowing variations: - __suffix__: aws From 3ddef9f673d9efc9c9c52a08bb895fc5a67096c4 Mon Sep 17 00:00:00 2001 From: joshlee Date: Fri, 21 Nov 2025 08:04:50 +0000 Subject: [PATCH 04/11] Fixing python tests Signed-off-by: joshlee --- .../chaos_test/test_chaos_basic.py | 35 ++++++++++++------- release/release_tests.yaml | 8 ++--- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/release/nightly_tests/chaos_test/test_chaos_basic.py b/release/nightly_tests/chaos_test/test_chaos_basic.py index 8ef9e23c0d94..f53329263fa4 100644 --- a/release/nightly_tests/chaos_test/test_chaos_basic.py +++ b/release/nightly_tests/chaos_test/test_chaos_basic.py @@ -170,7 +170,7 @@ def consume_streaming_generator(num_items, item_size_mb): if smoke: ITEMS_PER_GENERATOR = 10 else: - ITEMS_PER_GENERATOR = 300 + ITEMS_PER_GENERATOR = 500 ITEM_SIZE_MB = 10 print( @@ -243,33 +243,44 @@ def borrow_object(borrowed_refs): if smoke: NUM_ITERATIONS = 10 else: - NUM_ITERATIONS = 1500 + NUM_ITERATIONS = 2000 OBJECT_SIZE_MB = 10 - print(f"Starting {NUM_ITERATIONS} task pairs (A creates, B borrows)") + print(f"Starting borrowing test with {NUM_ITERATIONS * 2} total tasks") print(f"Object size: {OBJECT_SIZE_MB}MB per object") - print(f"Expected total data: {NUM_ITERATIONS * OBJECT_SIZE_MB / 1024:.2f} GB") + print(f"Expected total data: {NUM_ITERATIONS * 2 * OBJECT_SIZE_MB / 1024:.2f} GB") + + total_completed = 0 + total_bytes = 0 + + for i in range(NUM_ITERATIONS): + ref = create_object.remote(OBJECT_SIZE_MB) + task_ref = borrow_object.remote([ref]) + size = ray.get(task_ref) + total_completed += 1 + total_bytes += size refs = [] for i in range(NUM_ITERATIONS): ref = create_object.remote(OBJECT_SIZE_MB) refs.append(borrow_object.remote([ref])) sizes = ray.get(refs) - num_completed = len(sizes) - total_bytes = sum(sizes) + total_completed += len(sizes) + total_bytes += sum(sizes) print("All tasks completed:") - print(f" Tasks completed: {num_completed} (expected {NUM_ITERATIONS})") + print(f" Total tasks completed: {total_completed} (expected {NUM_ITERATIONS * 2})") print(f" Total data processed: {total_bytes / (1024**3):.2f} GB") # Assertions + expected_total_tasks = NUM_ITERATIONS * 2 assert ( - num_completed == NUM_ITERATIONS - ), f"Expected {NUM_ITERATIONS} completions, got {num_completed}" - expected_bytes = NUM_ITERATIONS * OBJECT_SIZE_MB * 1024 * 1024 + total_completed == expected_total_tasks + ), f"Expected {expected_total_tasks} completions, got {total_completed}" + expected_total_bytes = expected_total_tasks * OBJECT_SIZE_MB * 1024 * 1024 assert ( - total_bytes == expected_bytes - ), f"Expected {expected_bytes} bytes, got {total_bytes}" + total_bytes == expected_total_bytes + ), f"Expected {expected_total_bytes} bytes, got {total_bytes}" # Consistency check wait_for_condition( diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 566a678a721a..f2f34926abe9 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3490,7 +3490,7 @@ timeout: 3600 wait_for_nodes: num_nodes: 10 - prepare: python setup_chaos.py --no-start --max-to-kill 5 + prepare: python setup_chaos.py --no-start --max-to-kill 10 script: python chaos_test/test_chaos_basic.py --workload=streaming variations: @@ -3516,7 +3516,7 @@ timeout: 3600 wait_for_nodes: num_nodes: 10 - prepare: python setup_chaos.py --no-start --chaos TerminateEC2Instance --max-to-kill 5 + prepare: python setup_chaos.py --no-start --chaos TerminateEC2Instance --max-to-kill 10 script: python chaos_test/test_chaos_basic.py --workload=streaming variations: @@ -3548,7 +3548,7 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 180 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=streaming + python simulate_cross_az_network_failure.py --network-failure-interval 120 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=streaming variations: - __suffix__: aws @@ -3631,7 +3631,7 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 180 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=borrowing + python simulate_cross_az_network_failure.py --network-failure-interval 120 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=borrowing variations: - __suffix__: aws From b811b8159f6b394cbd9f768c1e5ee14fc72a20dd Mon Sep 17 00:00:00 2001 From: joshlee Date: Fri, 21 Nov 2025 09:41:41 +0000 Subject: [PATCH 05/11] Cleaning up python tests Signed-off-by: joshlee --- .../chaos_test/test_chaos_basic.py | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/release/nightly_tests/chaos_test/test_chaos_basic.py b/release/nightly_tests/chaos_test/test_chaos_basic.py index f53329263fa4..852e2dcaf498 100644 --- a/release/nightly_tests/chaos_test/test_chaos_basic.py +++ b/release/nightly_tests/chaos_test/test_chaos_basic.py @@ -143,29 +143,24 @@ def run_streaming_generator_workload(total_num_cpus, smoke): @ray.remote(num_cpus=1, max_retries=-1) def streaming_generator(num_items, item_size_mb): - """Generator that yields large plasma objects.""" for i in range(num_items): data = np.zeros(item_size_mb * 1024 * 1024, dtype=np.uint8) yield data @ray.remote(num_cpus=1, max_retries=-1) def consume_streaming_generator(num_items, item_size_mb): - """Task that spawns and consumes a streaming generator.""" gen = streaming_generator.remote(num_items, item_size_mb) count = 0 total_bytes = 0 for item_ref in gen: - # Each yielded item is an ObjectRef, need to get it data = ray.get(item_ref) count += 1 total_bytes += data.nbytes return (count, total_bytes) - alive_nodes = [n for n in ray.nodes() if n.get("Alive", False)] - - NUM_GENERATORS = len(alive_nodes) + NUM_GENERATORS = 10 # For smoke mode, run fewer items if smoke: ITEMS_PER_GENERATOR = 10 @@ -182,17 +177,10 @@ def consume_streaming_generator(num_items, item_size_mb): f"{NUM_GENERATORS * ITEMS_PER_GENERATOR * ITEM_SIZE_MB / 1024:.2f} GB" ) - # Launch generators on different nodes in parallel + # Launch generators in parallel tasks = [] for i in range(NUM_GENERATORS): - node = alive_nodes[i % len(alive_nodes)] - node_id = node["NodeID"] - - task = consume_streaming_generator.options( - scheduling_strategy=NodeAffinitySchedulingStrategy( - node_id=node_id, soft=True - ) - ).remote(ITEMS_PER_GENERATOR, ITEM_SIZE_MB) + task = consume_streaming_generator.remote(ITEMS_PER_GENERATOR, ITEM_SIZE_MB) tasks.append(task) results = ray.get(tasks) @@ -235,7 +223,6 @@ def create_object(size_mb): @ray.remote(num_cpus=1, max_retries=-1) def borrow_object(borrowed_refs): - """Receives a list of borrowed refs, gets the first one, returns size.""" data = ray.get(borrowed_refs[0]) return len(data) From 6f7264223996e1e97c63e6585291ab06809855a8 Mon Sep 17 00:00:00 2001 From: joshlee Date: Sat, 22 Nov 2025 01:23:00 +0000 Subject: [PATCH 06/11] Addressing comments + refactor Signed-off-by: joshlee --- python/ray/tests/test_chaos.py | 319 -------------- .../chaos_test/actor_workload.py | 82 ++++ .../object_ref_borrowing_workload.py | 73 ++++ .../streaming_generator_workload.py | 91 ++++ .../nightly_tests/chaos_test/task_workload.py | 48 +++ .../nightly_tests/chaos_test/test_chaos.py | 106 +++++ .../chaos_test/test_chaos_basic.py | 399 ------------------ release/release_tests.yaml | 124 +++++- 8 files changed, 512 insertions(+), 730 deletions(-) delete mode 100644 python/ray/tests/test_chaos.py create mode 100644 release/nightly_tests/chaos_test/actor_workload.py create mode 100644 release/nightly_tests/chaos_test/object_ref_borrowing_workload.py create mode 100644 release/nightly_tests/chaos_test/streaming_generator_workload.py create mode 100644 release/nightly_tests/chaos_test/task_workload.py create mode 100644 release/nightly_tests/chaos_test/test_chaos.py delete mode 100644 release/nightly_tests/chaos_test/test_chaos_basic.py diff --git a/python/ray/tests/test_chaos.py b/python/ray/tests/test_chaos.py deleted file mode 100644 index 19458773c2f4..000000000000 --- a/python/ray/tests/test_chaos.py +++ /dev/null @@ -1,319 +0,0 @@ -import random -import sys -import time - -import pytest - -import ray -from ray._common.test_utils import wait_for_condition -from ray._private.test_utils import ( - RayletKiller, - WorkerKillerActor, - get_and_run_resource_killer, - get_log_message, -) -from ray.cluster_utils import AutoscalingCluster -from ray.exceptions import ObjectLostError, RayTaskError -from ray.experimental import shuffle -from ray.tests.conftest import _ray_start_chaos_cluster -from ray.util.placement_group import placement_group -from ray.util.state.api import StateApiClient, list_nodes -from ray.util.state.common import ListApiOptions, StateResource - - -def assert_no_system_failure(p, timeout): - # Get all logs for 20 seconds. - logs = get_log_message(p, timeout=timeout) - for log in logs: - assert "SIG" not in log, "There's the segfault or SIGBART reported." - assert "Check failed" not in log, "There's the check failure reported." - - -@pytest.fixture -def set_kill_interval(request): - lineage_reconstruction_enabled, kill_interval = request.param - - request.param = { - "_system_config": { - "lineage_pinning_enabled": lineage_reconstruction_enabled, - "max_direct_call_object_size": 1000, - }, - "kill_interval": kill_interval, - "head_resources": { - "CPU": 0, - }, - "worker_node_types": { - "cpu_node": { - "resources": { - "CPU": 2, - }, - "node_config": { - "object_store_memory": int(200e6), - }, - "min_workers": 0, - "max_workers": 3, - }, - }, - } - cluster_fixture = _ray_start_chaos_cluster(request) - for x in cluster_fixture: - yield (lineage_reconstruction_enabled, kill_interval, cluster_fixture) - - -@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") -@pytest.mark.parametrize( - "set_kill_interval", - [(True, None), (True, 20), (False, None), (False, 20)], - indirect=True, -) -def test_chaos_actor_retry(set_kill_interval): - # Chaos testing. - @ray.remote(num_cpus=0.25, max_restarts=-1, max_task_retries=-1) - class Actor: - def __init__(self): - self.letter_dict = set() - - def add(self, letter): - self.letter_dict.add(letter) - - NUM_CPUS = 16 - TOTAL_TASKS = 300 - - actors = [Actor.remote() for _ in range(NUM_CPUS)] - results = [] - for a in actors: - results.extend([a.add.remote(str(i)) for i in range(TOTAL_TASKS)]) - - start = time.time() - ray.get(results) - runtime_with_failure = time.time() - start - print(f"Runtime when there are many failures: {runtime_with_failure}") - - # TODO(sang): Currently, there are lots of SIGBART with - # plasma client failures. Fix it. - # assert_no_system_failure(p, 10) - - -def test_chaos_defer(monkeypatch, ray_start_cluster): - with monkeypatch.context() as m: - # defer for 3s - m.setenv( - "RAY_testing_asio_delay_us", - "NodeManagerService.grpc_client.PrepareBundleResources=2000000:2000000", - ) - m.setenv("RAY_event_stats", "true") - cluster = ray_start_cluster - cluster.add_node(num_cpus=1, object_store_memory=1e9) - cluster.wait_for_nodes() - ray.init(address="auto") # this will connect to gpu nodes - cluster.add_node(num_cpus=0, num_gpus=1) - bundle = [{"GPU": 1}, {"CPU": 1}] - pg = placement_group(bundle) - # PG will not be ready within 3s - with pytest.raises(ray.exceptions.GetTimeoutError): - ray.get(pg.ready(), timeout=1) - # it'll be ready eventually - ray.get(pg.ready()) - - -@ray.remote(num_cpus=0) -class ShuffleStatusTracker: - def __init__(self): - self.num_map = 0 - self.num_reduce = 0 - self.map_refs = [] - self.reduce_refs = [] - - def register_objectrefs(self, map_refs, reduce_refs): - self.map_refs = map_refs - self.reduce_refs = reduce_refs - - def get_progress(self): - if self.map_refs: - ready, self.map_refs = ray.wait( - self.map_refs, - timeout=1, - num_returns=len(self.map_refs), - fetch_local=False, - ) - if ready: - print("Still waiting on map refs", self.map_refs) - self.num_map += len(ready) - elif self.reduce_refs: - ready, self.reduce_refs = ray.wait( - self.reduce_refs, - timeout=1, - num_returns=len(self.reduce_refs), - fetch_local=False, - ) - if ready: - print("Still waiting on reduce refs", self.reduce_refs) - self.num_reduce += len(ready) - return self.num_map, self.num_reduce - - -@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") -@pytest.mark.parametrize( - "set_kill_interval", [(False, None), (False, 60)], indirect=True -) -def test_nonstreaming_shuffle(set_kill_interval): - lineage_reconstruction_enabled, kill_interval, _ = set_kill_interval - try: - # Create our own tracker so that it gets scheduled onto the head node. - tracker = ShuffleStatusTracker.remote() - ray.get(tracker.get_progress.remote()) - assert len(ray.nodes()) == 1, ( - "Tracker actor may have been scheduled to remote node " - "and may get killed during the test" - ) - - shuffle.run( - ray_address="auto", - no_streaming=True, - num_partitions=200, - partition_size=1e6, - tracker=tracker, - ) - except (RayTaskError, ObjectLostError): - assert kill_interval is not None - assert not lineage_reconstruction_enabled - - -@pytest.mark.skip(reason="https://github.com/ray-project/ray/issues/20713") -@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") -@pytest.mark.parametrize( - "set_kill_interval", - [(True, None), (True, 60), (False, None), (False, 60)], - indirect=True, -) -def test_streaming_shuffle(set_kill_interval): - lineage_reconstruction_enabled, kill_interval, _ = set_kill_interval - try: - # Create our own tracker so that it gets scheduled onto the head node. - tracker = ShuffleStatusTracker.remote() - ray.get(tracker.get_progress.remote()) - assert len(ray.nodes()) == 1, ( - "Tracker actor may have been scheduled to remote node " - "and may get killed during the test" - ) - - shuffle.run( - ray_address="auto", - no_streaming=False, - num_partitions=200, - partition_size=1e6, - tracker=tracker, - ) - except (RayTaskError, ObjectLostError): - assert kill_interval is not None - - # TODO(swang): Enable this once we implement support ray.put. - # assert not lineage_reconstruction_enabled - - -def test_worker_killer(): - ray.init() - - task_name = "worker_to_kill" - - @ray.remote - def worker_to_kill(): - time.sleep(3) - - # Run WorkerKillerActor to kill 3 tasks, and run remote "worker_to_kill" - # task with max_retries=3. 4 tasks in total (1 initial + 3 retries). - # First 3 tasks will be killed, the last retry will succeed. - worker_killer = get_and_run_resource_killer( - WorkerKillerActor, - 1, - max_to_kill=3, - kill_filter_fn=lambda: lambda task: task.name == task_name, - ) - worker_to_kill.options(name=task_name, max_retries=3).remote() - - def check(): - tasks = StateApiClient().list( - StateResource.TASKS, - options=ListApiOptions(filters=[("name", "=", task_name)]), - raise_on_missing_output=False, - ) - failed = 0 - finished = 0 - for task in tasks: - if task.state == "FAILED": - failed += 1 - elif task.state == "FINISHED": - finished += 1 - return failed == 3 and finished == 1 - - wait_for_condition(check, timeout=20) - - killed_tasks = ray.get(worker_killer.get_total_killed.remote()) - assert len(killed_tasks) == 3 - - tasks = StateApiClient().list( - StateResource.TASKS, - options=ListApiOptions(filters=[("name", "=", task_name)]), - raise_on_missing_output=False, - ) - for task in tasks: - if task.state == "FAILED": - assert (task.task_id, task.worker_pid) in killed_tasks - - ray.shutdown() - - -@pytest.mark.parametrize( - "autoscaler_v2", - [False, True], - ids=["v1", "v2"], -) -def test_node_killer_filter(autoscaler_v2): - # Initialize cluster with 1 head node and 2 worker nodes. - try: - cluster = AutoscalingCluster( - head_resources={"CPU": 0}, - worker_node_types={ - "cpu_node": { - "resources": { - "CPU": 1, - }, - "node_config": {}, - "min_workers": 2, - "max_workers": 2, - }, - }, - autoscaler_v2=autoscaler_v2, - idle_timeout_minutes=999, # it could idle killed before the killer. - ) - cluster.start() - ray.init() - - wait_for_condition(lambda: len(list_nodes()) > 2) - - # Choose random worker node to kill. - worker_nodes = [node for node in list_nodes() if not node["is_head_node"]] - node_to_kill = random.choice(worker_nodes) - node_killer = get_and_run_resource_killer( - RayletKiller, - 1, - max_to_kill=1, - kill_filter_fn=lambda: lambda node: node["NodeID"] == node_to_kill.node_id, - ) - - def check_killed(): - # Check that killed node is consistent across list_nodes() - killed = list(ray.get(node_killer.get_total_killed.remote())) - dead = [node.node_id for node in list_nodes() if node.state == "DEAD"] - if len(killed) != 1 or len(dead) != 1: - return False - return killed[0] == dead[0] == node_to_kill.node_id - - wait_for_condition(check_killed, timeout=100) - finally: - cluster.shutdown() - ray.shutdown() - - -if __name__ == "__main__": - sys.exit(pytest.main(["-sv", __file__])) diff --git a/release/nightly_tests/chaos_test/actor_workload.py b/release/nightly_tests/chaos_test/actor_workload.py new file mode 100644 index 000000000000..ad53976a54b7 --- /dev/null +++ b/release/nightly_tests/chaos_test/actor_workload.py @@ -0,0 +1,82 @@ +import ray +from ray._common.test_utils import wait_for_condition +from ray.data._internal.progress_bar import ProgressBar +from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy + + +def run_actor_workload(total_num_cpus, smoke): + """Run actor-based workload. + + The test checks if actor restart -1 and task_retries -1 works + as expected. It basically requires many actors to report the + seqno to the centralized DB actor while there are failures. + If at least once is guaranteed upon failures, this test + shouldn't fail. + """ + + @ray.remote(num_cpus=0) + class DBActor: + def __init__(self): + self.letter_dict = set() + + def add(self, letter): + self.letter_dict.add(letter) + + def get(self): + return self.letter_dict + + @ray.remote(num_cpus=1, max_restarts=-1, max_task_retries=-1) + class ReportActor: + def __init__(self, db_actor): + self.db_actor = db_actor + + def add(self, letter): + ray.get(self.db_actor.add.remote(letter)) + + NUM_CPUS = int(total_num_cpus) + multiplier = 2 + # For smoke mode, run fewer tasks + if smoke: + multiplier = 1 + TOTAL_TASKS = int(300 * multiplier) + head_node_id = ray.get_runtime_context().get_node_id() + db_actors = [ + DBActor.options( + scheduling_strategy=NodeAffinitySchedulingStrategy( + node_id=head_node_id, soft=False + ) + ).remote() + for _ in range(NUM_CPUS) + ] + + pb = ProgressBar("Chaos test", TOTAL_TASKS * NUM_CPUS, "task") + actors = [] + for db_actor in db_actors: + actors.append(ReportActor.remote(db_actor)) + results = [] + highest_reported_num = 0 + for a in actors: + for _ in range(TOTAL_TASKS): + results.append(a.add.remote(str(highest_reported_num))) + highest_reported_num += 1 + pb.fetch_until_complete(results) + pb.close() + for actor in actors: + ray.kill(actor) + + # Consistency check + wait_for_condition( + lambda: ( + ray.cluster_resources().get("CPU", 0) + == ray.available_resources().get("CPU", 0) + ), + timeout=60, + ) + letter_set = set() + for db_actor in db_actors: + letter_set.update(ray.get(db_actor.get.remote())) + # Make sure the DB actor didn't lose any report. + # If this assert fails, that means at least once actor task semantic + # wasn't guaranteed. + for i in range(highest_reported_num): + assert str(i) in letter_set, i diff --git a/release/nightly_tests/chaos_test/object_ref_borrowing_workload.py b/release/nightly_tests/chaos_test/object_ref_borrowing_workload.py new file mode 100644 index 000000000000..2a142632cb5f --- /dev/null +++ b/release/nightly_tests/chaos_test/object_ref_borrowing_workload.py @@ -0,0 +1,73 @@ +import numpy as np + +import ray +from ray._common.test_utils import wait_for_condition + + +def run_object_ref_borrowing_workload(total_num_cpus, smoke): + """Run object ref borrowing workload. + + This test checks that borrowed refs + remain valid even with node failures or transient network failures. + """ + + @ray.remote(num_cpus=1, max_retries=-1) + def create_object(size_mb): + data = np.zeros(size_mb * 1024 * 1024, dtype=np.uint8) + return data + + @ray.remote(num_cpus=1, max_retries=-1) + def borrow_object(borrowed_refs): + data = ray.get(borrowed_refs[0]) + return len(data) + + # For smoke mode, run fewer iterations + if smoke: + NUM_ITERATIONS = 10 + else: + NUM_ITERATIONS = 2000 + OBJECT_SIZE_MB = 10 + + print(f"Starting borrowing test with {NUM_ITERATIONS * 2} total tasks") + print(f"Object size: {OBJECT_SIZE_MB}MB per object") + print(f"Expected total data: {NUM_ITERATIONS * 2 * OBJECT_SIZE_MB / 1024:.2f} GB") + + total_completed = 0 + total_bytes = 0 + + for i in range(NUM_ITERATIONS): + ref = create_object.remote(OBJECT_SIZE_MB) + task_ref = borrow_object.remote([ref]) + size = ray.get(task_ref) + total_completed += 1 + total_bytes += size + + refs = [] + for i in range(NUM_ITERATIONS): + ref = create_object.remote(OBJECT_SIZE_MB) + refs.append(borrow_object.remote([ref])) + sizes = ray.get(refs) + total_completed += len(sizes) + total_bytes += sum(sizes) + + print("All tasks completed:") + print(f" Total tasks completed: {total_completed} (expected {NUM_ITERATIONS * 2})") + print(f" Total data processed: {total_bytes / (1024**3):.2f} GB") + + expected_total_tasks = NUM_ITERATIONS * 2 + assert ( + total_completed == expected_total_tasks + ), f"Expected {expected_total_tasks} completions, got {total_completed}" + expected_total_bytes = expected_total_tasks * OBJECT_SIZE_MB * 1024 * 1024 + assert ( + total_bytes == expected_total_bytes + ), f"Expected {expected_total_bytes} bytes, got {total_bytes}" + + # Consistency check + wait_for_condition( + lambda: ( + ray.cluster_resources().get("CPU", 0) + == ray.available_resources().get("CPU", 0) + ), + timeout=60, + ) diff --git a/release/nightly_tests/chaos_test/streaming_generator_workload.py b/release/nightly_tests/chaos_test/streaming_generator_workload.py new file mode 100644 index 000000000000..d8bf6a917153 --- /dev/null +++ b/release/nightly_tests/chaos_test/streaming_generator_workload.py @@ -0,0 +1,91 @@ +import numpy as np + +import ray +from ray._common.test_utils import wait_for_condition +from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy + + +def run_streaming_generator_workload(total_num_cpus, smoke): + """Run streaming generator workload. + + Spreads streaming generators across the nodes to ensure + chaos events affect the generators. Tests that streaming generators + work correctly with retries when there are node failures or transient + network failures. + """ + + @ray.remote(num_cpus=1, max_retries=-1) + def streaming_generator(num_items, item_size_mb): + for i in range(num_items): + data = np.zeros(item_size_mb * 1024 * 1024, dtype=np.uint8) + yield data + + @ray.remote(num_cpus=1, max_retries=-1) + def consume_streaming_generator(num_items, item_size_mb): + gen = streaming_generator.remote(num_items, item_size_mb) + + count = 0 + total_bytes = 0 + for item_ref in gen: + data = ray.get(item_ref) + count += 1 + total_bytes += data.nbytes + + return (count, total_bytes) + + # Get alive nodes to distribute generators across the cluster + alive_nodes = [n for n in ray.nodes() if n.get("Alive", False)] + NUM_GENERATORS = 2 * len(alive_nodes) + + # For smoke mode, run fewer items + if smoke: + ITEMS_PER_GENERATOR = 10 + else: + ITEMS_PER_GENERATOR = 500 + ITEM_SIZE_MB = 10 + + print( + f"Starting {NUM_GENERATORS} concurrent streaming generators " + f"({ITEMS_PER_GENERATOR} items of {ITEM_SIZE_MB}MB each)" + ) + print( + f"Expected total data: " + f"{NUM_GENERATORS * ITEMS_PER_GENERATOR * ITEM_SIZE_MB / 1024:.2f} GB" + ) + + # Distribute generators across nodes to maximize chaos impact + tasks = [] + for i in range(NUM_GENERATORS): + node = alive_nodes[i % len(alive_nodes)] + node_id = node["NodeID"] + + task = consume_streaming_generator.options( + scheduling_strategy=NodeAffinitySchedulingStrategy( + node_id=node_id, soft=True + ) + ).remote(ITEMS_PER_GENERATOR, ITEM_SIZE_MB) + tasks.append(task) + + results = ray.get(tasks) + + total_items = sum(count for count, _ in results) + total_bytes = sum(bytes_val for _, bytes_val in results) + + print("All generators completed:") + print( + f" Total items: {total_items} (expected {NUM_GENERATORS * ITEMS_PER_GENERATOR})" + ) + print(f" Total data: {total_bytes / (1024**3):.2f} GB") + + assert ( + total_items == NUM_GENERATORS * ITEMS_PER_GENERATOR + ), f"Expected {NUM_GENERATORS * ITEMS_PER_GENERATOR} items, got {total_items}" + + # Consistency check + wait_for_condition( + lambda: ( + ray.cluster_resources().get("CPU", 0) + == ray.available_resources().get("CPU", 0) + ), + timeout=60, + ) diff --git a/release/nightly_tests/chaos_test/task_workload.py b/release/nightly_tests/chaos_test/task_workload.py new file mode 100644 index 000000000000..53a426f7fab8 --- /dev/null +++ b/release/nightly_tests/chaos_test/task_workload.py @@ -0,0 +1,48 @@ +import random +import string +import time + +import numpy as np + +import ray +from ray._common.test_utils import wait_for_condition +from ray.data._internal.progress_bar import ProgressBar + + +def run_task_workload(total_num_cpus, smoke): + """Run task-based workload that doesn't require object reconstruction.""" + + @ray.remote(num_cpus=1, max_retries=-1) + def task(): + def generate_data(size_in_kb=10): + return np.zeros(1024 * size_in_kb, dtype=np.uint8) + + a = "" + for _ in range(100000): + a = a + random.choice(string.ascii_letters) + return generate_data(size_in_kb=50) + + @ray.remote(num_cpus=1, max_retries=-1) + def invoke_nested_task(): + time.sleep(0.8) + return ray.get(task.remote()) + + multiplier = 75 + # For smoke mode, run fewer tasks + if smoke: + multiplier = 1 + TOTAL_TASKS = int(total_num_cpus * 2 * multiplier) + + pb = ProgressBar("Chaos test", TOTAL_TASKS, "task") + results = [invoke_nested_task.remote() for _ in range(TOTAL_TASKS)] + pb.block_until_complete(results) + pb.close() + + # Consistency check. + wait_for_condition( + lambda: ( + ray.cluster_resources().get("CPU", 0) + == ray.available_resources().get("CPU", 0) + ), + timeout=60, + ) diff --git a/release/nightly_tests/chaos_test/test_chaos.py b/release/nightly_tests/chaos_test/test_chaos.py new file mode 100644 index 000000000000..eff31496442e --- /dev/null +++ b/release/nightly_tests/chaos_test/test_chaos.py @@ -0,0 +1,106 @@ +import argparse +import json +import logging +import os +import time + +import ray +from ray._private.test_utils import monitor_memory_usage + +from task_workload import run_task_workload +from actor_workload import run_actor_workload +from streaming_generator_workload import run_streaming_generator_workload +from object_ref_borrowing_workload import run_object_ref_borrowing_workload + + +def parse_script_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--node-kill-interval", type=int, default=60) + parser.add_argument("--workload", type=str) + parser.add_argument("--smoke", action="store_true") + parser.add_argument("--disable-resource-killer", action="store_true") + return parser.parse_known_args() + + +def main(): + """Test task/actor/streaming generator/object ref borrowing chaos test. + + It tests the following scenarios: + 1. Raylet failures: Done by an actor calling Raylet's Shutdown RPC. + 2. EC2 instance termination: Done by an actor terminating + EC2 instances via AWS SDK. + 3. Network failures: Done by injecting network failures via iptables or env variables. + """ + args, _ = parse_script_args() + logging.info("Received arguments: {}".format(args)) + ray.init(address="auto") + total_num_cpus = ray.cluster_resources()["CPU"] + monitor_actor = monitor_memory_usage() + + # Select the workload based on the argument + workload = None + if args.workload == "tasks": + workload = run_task_workload + elif args.workload == "actors": + workload = run_actor_workload + elif args.workload == "streaming": + workload = run_streaming_generator_workload + elif args.workload == "borrowing": + workload = run_object_ref_borrowing_workload + else: + assert False + + print("Running with failures") + start = time.time() + node_killer = None + + if args.disable_resource_killer: + print("ResourceKiller disabled") + else: + node_killer = ray.get_actor( + "ResourceKiller", namespace="release_test_namespace" + ) + node_killer.run.remote() + print("ResourceKiller started") + + workload(total_num_cpus, args.smoke) + runtime_s = time.time() - start + runtime_s = round(runtime_s, 2) + print(f"Runtime when there are many failures: {runtime_s}") + + if node_killer is not None: + print(f"Total node failures: {ray.get(node_killer.get_total_killed.remote())}") + node_killer.stop_run.remote() + + used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote()) + used_gb = round(used_gb, 2) + print("Memory usage with failures.") + print(f"Peak memory usage: {used_gb}GB") + print(f"Peak memory usage per processes:\n {usage}") + + ray.get(monitor_actor.stop_run.remote()) + + results = { + "time": runtime_s, + "_peak_memory": used_gb, + "_peak_process_memory": usage, + } + + results["perf_metrics"] = [ + { + "perf_metric_name": f"chaos_{args.workload}_runtime_s", + "perf_metric_value": runtime_s, + "perf_metric_type": "LATENCY", + }, + { + "perf_metric_name": f"chaos_{args.workload}_peak_memory_gb", + "perf_metric_value": used_gb, + "perf_metric_type": "MEMORY", + }, + ] + + with open(os.environ["TEST_OUTPUT_JSON"], "w") as f: + json.dump(results, f) + + +main() diff --git a/release/nightly_tests/chaos_test/test_chaos_basic.py b/release/nightly_tests/chaos_test/test_chaos_basic.py deleted file mode 100644 index 852e2dcaf498..000000000000 --- a/release/nightly_tests/chaos_test/test_chaos_basic.py +++ /dev/null @@ -1,399 +0,0 @@ -import argparse -import json -import logging -import os -import random -import string -import time - -import numpy as np - -import ray -from ray._common.test_utils import wait_for_condition -from ray._private.test_utils import monitor_memory_usage -from ray.data._internal.progress_bar import ProgressBar -from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy - - -def run_task_workload(total_num_cpus, smoke): - """Run task-based workload that doesn't require object reconstruction.""" - - @ray.remote(num_cpus=1, max_retries=-1) - def task(): - def generate_data(size_in_kb=10): - return np.zeros(1024 * size_in_kb, dtype=np.uint8) - - a = "" - for _ in range(100000): - a = a + random.choice(string.ascii_letters) - return generate_data(size_in_kb=50) - - @ray.remote(num_cpus=1, max_retries=-1) - def invoke_nested_task(): - time.sleep(0.8) - return ray.get(task.remote()) - - multiplier = 75 - # For smoke mode, run fewer tasks - if smoke: - multiplier = 1 - TOTAL_TASKS = int(total_num_cpus * 2 * multiplier) - - pb = ProgressBar("Chaos test", TOTAL_TASKS, "task") - results = [invoke_nested_task.remote() for _ in range(TOTAL_TASKS)] - pb.block_until_complete(results) - pb.close() - - # Consistency check. - wait_for_condition( - lambda: ( - ray.cluster_resources().get("CPU", 0) - == ray.available_resources().get("CPU", 0) - ), - timeout=60, - ) - - -def run_actor_workload(total_num_cpus, smoke): - """Run actor-based workload. - - The test checks if actor restart -1 and task_retries -1 works - as expected. It basically requires many actors to report the - seqno to the centralized DB actor while there are failures. - If at least once is guaranteed upon failures, this test - shouldn't fail. - """ - - @ray.remote(num_cpus=0) - class DBActor: - def __init__(self): - self.letter_dict = set() - - def add(self, letter): - self.letter_dict.add(letter) - - def get(self): - return self.letter_dict - - @ray.remote(num_cpus=1, max_restarts=-1, max_task_retries=-1) - class ReportActor: - def __init__(self, db_actor): - self.db_actor = db_actor - - def add(self, letter): - ray.get(self.db_actor.add.remote(letter)) - - NUM_CPUS = int(total_num_cpus) - multiplier = 2 - # For smoke mode, run fewer tasks - if smoke: - multiplier = 1 - TOTAL_TASKS = int(300 * multiplier) - head_node_id = ray.get_runtime_context().get_node_id() - db_actors = [ - DBActor.options( - scheduling_strategy=NodeAffinitySchedulingStrategy( - node_id=head_node_id, soft=False - ) - ).remote() - for _ in range(NUM_CPUS) - ] - - pb = ProgressBar("Chaos test", TOTAL_TASKS * NUM_CPUS, "task") - actors = [] - for db_actor in db_actors: - actors.append(ReportActor.remote(db_actor)) - results = [] - highest_reported_num = 0 - for a in actors: - for _ in range(TOTAL_TASKS): - results.append(a.add.remote(str(highest_reported_num))) - highest_reported_num += 1 - pb.fetch_until_complete(results) - pb.close() - for actor in actors: - ray.kill(actor) - - # Consistency check - wait_for_condition( - lambda: ( - ray.cluster_resources().get("CPU", 0) - == ray.available_resources().get("CPU", 0) - ), - timeout=60, - ) - letter_set = set() - for db_actor in db_actors: - letter_set.update(ray.get(db_actor.get.remote())) - # Make sure the DB actor didn't lose any report. - # If this assert fails, that means at least once actor task semantic - # wasn't guaranteed. - for i in range(highest_reported_num): - assert str(i) in letter_set, i - - -def run_streaming_generator_workload(total_num_cpus, smoke): - """Run streaming generator workload. - - The test runs 10 concurrent long-running streaming generators pinned to - different nodes. - This tests that streaming generators work correctly with retries when - there are node failures or transient network failures. - """ - - @ray.remote(num_cpus=1, max_retries=-1) - def streaming_generator(num_items, item_size_mb): - for i in range(num_items): - data = np.zeros(item_size_mb * 1024 * 1024, dtype=np.uint8) - yield data - - @ray.remote(num_cpus=1, max_retries=-1) - def consume_streaming_generator(num_items, item_size_mb): - gen = streaming_generator.remote(num_items, item_size_mb) - - count = 0 - total_bytes = 0 - for item_ref in gen: - data = ray.get(item_ref) - count += 1 - total_bytes += data.nbytes - - return (count, total_bytes) - - NUM_GENERATORS = 10 - # For smoke mode, run fewer items - if smoke: - ITEMS_PER_GENERATOR = 10 - else: - ITEMS_PER_GENERATOR = 500 - ITEM_SIZE_MB = 10 - - print( - f"Starting {NUM_GENERATORS} concurrent streaming generators " - f"({ITEMS_PER_GENERATOR} items of {ITEM_SIZE_MB}MB each)" - ) - print( - f"Expected total data: " - f"{NUM_GENERATORS * ITEMS_PER_GENERATOR * ITEM_SIZE_MB / 1024:.2f} GB" - ) - - # Launch generators in parallel - tasks = [] - for i in range(NUM_GENERATORS): - task = consume_streaming_generator.remote(ITEMS_PER_GENERATOR, ITEM_SIZE_MB) - tasks.append(task) - - results = ray.get(tasks) - - total_items = sum(count for count, _ in results) - total_bytes = sum(bytes_val for _, bytes_val in results) - - print("All generators completed:") - print( - f" Total items: {total_items} (expected {NUM_GENERATORS * ITEMS_PER_GENERATOR})" - ) - print(f" Total data: {total_bytes / (1024**3):.2f} GB") - - # Verify all items were received - assert ( - total_items == NUM_GENERATORS * ITEMS_PER_GENERATOR - ), f"Expected {NUM_GENERATORS * ITEMS_PER_GENERATOR} items, got {total_items}" - - # Consistency check - wait_for_condition( - lambda: ( - ray.cluster_resources().get("CPU", 0) - == ray.available_resources().get("CPU", 0) - ), - timeout=60, - ) - - -def run_object_ref_borrowing_workload(total_num_cpus, smoke): - """Run object ref borrowing workload. - - This test checks that borrowed refs - remain valid even with node failures or transient network failures. - """ - - @ray.remote(num_cpus=1, max_retries=-1) - def create_object(size_mb): - data = np.zeros(size_mb * 1024 * 1024, dtype=np.uint8) - return data - - @ray.remote(num_cpus=1, max_retries=-1) - def borrow_object(borrowed_refs): - data = ray.get(borrowed_refs[0]) - return len(data) - - # For smoke mode, run fewer iterations - if smoke: - NUM_ITERATIONS = 10 - else: - NUM_ITERATIONS = 2000 - OBJECT_SIZE_MB = 10 - - print(f"Starting borrowing test with {NUM_ITERATIONS * 2} total tasks") - print(f"Object size: {OBJECT_SIZE_MB}MB per object") - print(f"Expected total data: {NUM_ITERATIONS * 2 * OBJECT_SIZE_MB / 1024:.2f} GB") - - total_completed = 0 - total_bytes = 0 - - for i in range(NUM_ITERATIONS): - ref = create_object.remote(OBJECT_SIZE_MB) - task_ref = borrow_object.remote([ref]) - size = ray.get(task_ref) - total_completed += 1 - total_bytes += size - - refs = [] - for i in range(NUM_ITERATIONS): - ref = create_object.remote(OBJECT_SIZE_MB) - refs.append(borrow_object.remote([ref])) - sizes = ray.get(refs) - total_completed += len(sizes) - total_bytes += sum(sizes) - - print("All tasks completed:") - print(f" Total tasks completed: {total_completed} (expected {NUM_ITERATIONS * 2})") - print(f" Total data processed: {total_bytes / (1024**3):.2f} GB") - - # Assertions - expected_total_tasks = NUM_ITERATIONS * 2 - assert ( - total_completed == expected_total_tasks - ), f"Expected {expected_total_tasks} completions, got {total_completed}" - expected_total_bytes = expected_total_tasks * OBJECT_SIZE_MB * 1024 * 1024 - assert ( - total_bytes == expected_total_bytes - ), f"Expected {expected_total_bytes} bytes, got {total_bytes}" - - # Consistency check - wait_for_condition( - lambda: ( - ray.cluster_resources().get("CPU", 0) - == ray.available_resources().get("CPU", 0) - ), - timeout=60, - ) - - -def run_placement_group_workload(total_num_cpus, smoke): - raise NotImplementedError - - -def parse_script_args(): - parser = argparse.ArgumentParser() - parser.add_argument("--node-kill-interval", type=int, default=60) - parser.add_argument("--workload", type=str) - parser.add_argument("--smoke", action="store_true") - return parser.parse_known_args() - - -def main(): - """Test task/actor/placement group basic chaos test. - - It tests the following scenarios: - 1. Raylet failures: This is done by an actor calling Raylet's Shutdown RPC. - 2. EC2 instance termination: This is done by an actor terminating - EC2 instances via AWS SDK. - - Currently, the test runs in 3 steps. Each step records the - peak memory usage to observe the memory usage while there - are node failures. - - Step 1: Warm up the cluster. It is needed to pre-start workers - if necessary. - - Step 2: Start the test without a failure. - - Step 3: Start the test with constant node failures. - """ - args, _ = parse_script_args() - logging.info("Received arguments: {}".format(args)) - ray.init(address="auto") - total_num_cpus = ray.cluster_resources()["CPU"] - total_nodes = 0 - for n in ray.nodes(): - if n["Alive"]: - total_nodes += 1 - monitor_actor = monitor_memory_usage() - - workload = None - if args.workload == "tasks": - workload = run_task_workload - elif args.workload == "actors": - workload = run_actor_workload - elif args.workload == "pg": - workload = run_placement_group_workload - elif args.workload == "streaming": - workload = run_streaming_generator_workload - elif args.workload == "borrowing": - workload = run_object_ref_borrowing_workload - else: - assert False - - # Step 1 - print("Warm up... Prestarting workers if necessary.") - start = time.time() - workload(total_num_cpus, args.smoke) - print(f"Runtime when warm up: {time.time() - start}") - - # Step 2 - print("Running without failures") - start = time.time() - workload(total_num_cpus, args.smoke) - print(f"Runtime when there are no failures: {time.time() - start}") - used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote()) - print("Memory usage without failures.") - print(f"Peak memory usage: {round(used_gb, 2)}GB") - print(f"Peak memory usage per processes:\n {usage}") - - # Step 3 - print("Running with failures") - start = time.time() - node_killer = None - try: - node_killer = ray.get_actor( - "ResourceKiller", namespace="release_test_namespace" - ) - node_killer.run.remote() - print("ResourceKiller found and started") - except ValueError: - print( - "ResourceKiller not found - assuming external chaos injection " - "(e.g., iptables, network failures)" - ) - - workload(total_num_cpus, args.smoke) - print(f"Runtime when there are many failures: {time.time() - start}") - - if node_killer is not None: - print(f"Total node failures: {ray.get(node_killer.get_total_killed.remote())}") - node_killer.stop_run.remote() - - used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote()) - print("Memory usage with failures.") - print(f"Peak memory usage: {round(used_gb, 2)}GB") - print(f"Peak memory usage per processes:\n {usage}") - - # Report the result. - ray.get(monitor_actor.stop_run.remote()) - if node_killer is not None: - print( - "Total number of killed nodes: " - f"{ray.get(node_killer.get_total_killed.remote())}" - ) - with open(os.environ["TEST_OUTPUT_JSON"], "w") as f: - f.write( - json.dumps( - { - "_peak_memory": round(used_gb, 2), - "_peak_process_memory": usage, - } - ) - ) - - -main() diff --git a/release/release_tests.yaml b/release/release_tests.yaml index f2f34926abe9..86987c103901 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3319,6 +3319,31 @@ # Core Chaos tests ################## +- name: chaos_many_tasks_baseline + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + script: python chaos_test/test_chaos.py --workload=tasks --disable-resource-killer + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + - name: chaos_many_tasks_kill_raylet python: "3.10" group: core-nightly-test @@ -3335,7 +3360,7 @@ wait_for_nodes: num_nodes: 10 prepare: python setup_chaos.py --no-start - script: python chaos_test/test_chaos_basic.py --workload=tasks + script: python chaos_test/test_chaos.py --workload=tasks variations: - __suffix__: aws @@ -3361,7 +3386,7 @@ wait_for_nodes: num_nodes: 10 prepare: python setup_chaos.py --no-start --chaos TerminateEC2Instance - script: python chaos_test/test_chaos_basic.py --workload=tasks + script: python chaos_test/test_chaos.py --workload=tasks variations: - __suffix__: aws @@ -3387,7 +3412,32 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 180 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=tasks + python simulate_cross_az_network_failure.py --network-failure-interval 180 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=tasks --disable-resource-killer + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_many_actors_baseline + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 4200 + wait_for_nodes: + num_nodes: 10 + script: python chaos_test/test_chaos.py --workload=actors --disable-resource-killer variations: - __suffix__: aws @@ -3413,7 +3463,7 @@ wait_for_nodes: num_nodes: 10 prepare: python setup_chaos.py --no-start - script: python chaos_test/test_chaos_basic.py --workload=actors + script: python chaos_test/test_chaos.py --workload=actors variations: - __suffix__: aws @@ -3439,7 +3489,7 @@ wait_for_nodes: num_nodes: 10 prepare: python setup_chaos.py --no-start --chaos TerminateEC2Instance - script: python chaos_test/test_chaos_basic.py --workload=actors + script: python chaos_test/test_chaos.py --workload=actors variations: - __suffix__: aws @@ -3465,7 +3515,32 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 180 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=actors + python simulate_cross_az_network_failure.py --network-failure-interval 180 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=actors --disable-resource-killer + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_streaming_generator_baseline + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + script: python chaos_test/test_chaos.py --workload=streaming --disable-resource-killer variations: - __suffix__: aws @@ -3491,7 +3566,7 @@ wait_for_nodes: num_nodes: 10 prepare: python setup_chaos.py --no-start --max-to-kill 10 - script: python chaos_test/test_chaos_basic.py --workload=streaming + script: python chaos_test/test_chaos.py --workload=streaming variations: - __suffix__: aws @@ -3517,7 +3592,7 @@ wait_for_nodes: num_nodes: 10 prepare: python setup_chaos.py --no-start --chaos TerminateEC2Instance --max-to-kill 10 - script: python chaos_test/test_chaos_basic.py --workload=streaming + script: python chaos_test/test_chaos.py --workload=streaming variations: - __suffix__: aws @@ -3548,7 +3623,32 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 120 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=streaming + python simulate_cross_az_network_failure.py --network-failure-interval 120 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=streaming --disable-resource-killer + + variations: + - __suffix__: aws + - __suffix__: gce + env: gce + frequency: manual + cluster: + cluster_compute: chaos_test/compute_template_gce.yaml + +- name: chaos_object_ref_borrowing_baseline + python: "3.10" + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + script: python chaos_test/test_chaos.py --workload=borrowing --disable-resource-killer variations: - __suffix__: aws @@ -3574,7 +3674,7 @@ wait_for_nodes: num_nodes: 10 prepare: python setup_chaos.py --no-start - script: python chaos_test/test_chaos_basic.py --workload=borrowing + script: python chaos_test/test_chaos.py --workload=borrowing variations: - __suffix__: aws @@ -3600,7 +3700,7 @@ wait_for_nodes: num_nodes: 10 prepare: python setup_chaos.py --no-start --chaos TerminateEC2Instance - script: python chaos_test/test_chaos_basic.py --workload=borrowing + script: python chaos_test/test_chaos.py --workload=borrowing variations: - __suffix__: aws @@ -3631,7 +3731,7 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 120 --network-failure-duration 5 --command python chaos_test/test_chaos_basic.py --workload=borrowing + python simulate_cross_az_network_failure.py --network-failure-interval 120 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=borrowing --disable-resource-killer variations: - __suffix__: aws From 1b5f2494639864e997a4ce47713f0df6bd1d64c2 Mon Sep 17 00:00:00 2001 From: joshlee Date: Sat, 22 Nov 2025 01:41:51 +0000 Subject: [PATCH 07/11] Update bazel file Signed-off-by: joshlee --- python/ray/tests/BUILD.bazel | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/tests/BUILD.bazel b/python/ray/tests/BUILD.bazel index 438e509eb643..fe9bb2e64209 100644 --- a/python/ray/tests/BUILD.bazel +++ b/python/ray/tests/BUILD.bazel @@ -891,7 +891,6 @@ py_test_module_list( "test_actor.py", "test_actor_failures.py", "test_cancel.py", - "test_chaos.py", "test_core_worker_fault_tolerance.py", "test_failure.py", "test_failure_2.py", From f39bc4a00f9830604db6f131f4def7e2cec1a742 Mon Sep 17 00:00:00 2001 From: joshlee Date: Sat, 22 Nov 2025 01:48:53 +0000 Subject: [PATCH 08/11] AI comment Signed-off-by: joshlee --- release/nightly_tests/chaos_test/test_chaos.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release/nightly_tests/chaos_test/test_chaos.py b/release/nightly_tests/chaos_test/test_chaos.py index eff31496442e..bab7e8f891ae 100644 --- a/release/nightly_tests/chaos_test/test_chaos.py +++ b/release/nightly_tests/chaos_test/test_chaos.py @@ -69,8 +69,8 @@ def main(): print(f"Runtime when there are many failures: {runtime_s}") if node_killer is not None: - print(f"Total node failures: {ray.get(node_killer.get_total_killed.remote())}") node_killer.stop_run.remote() + print(f"Total node failures: {ray.get(node_killer.get_total_killed.remote())}") used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote()) used_gb = round(used_gb, 2) From f633e85bb45a445b24f92ae5325fa2b457496526 Mon Sep 17 00:00:00 2001 From: joshlee Date: Sat, 22 Nov 2025 02:08:06 +0000 Subject: [PATCH 09/11] AI comments Signed-off-by: joshlee --- release/nightly_tests/chaos_test/test_chaos.py | 3 +-- release/release_tests.yaml | 10 ---------- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/release/nightly_tests/chaos_test/test_chaos.py b/release/nightly_tests/chaos_test/test_chaos.py index bab7e8f891ae..d3cfe47cdbde 100644 --- a/release/nightly_tests/chaos_test/test_chaos.py +++ b/release/nightly_tests/chaos_test/test_chaos.py @@ -50,7 +50,6 @@ def main(): else: assert False - print("Running with failures") start = time.time() node_killer = None @@ -66,7 +65,7 @@ def main(): workload(total_num_cpus, args.smoke) runtime_s = time.time() - start runtime_s = round(runtime_s, 2) - print(f"Runtime when there are many failures: {runtime_s}") + print(f"Total runtime: {runtime_s}") if node_killer is not None: node_killer.stop_run.remote() diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 86987c103901..14b090651f7c 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3596,11 +3596,6 @@ variations: - __suffix__: aws - - __suffix__: gce - env: gce - frequency: manual - cluster: - cluster_compute: chaos_test/compute_template_gce.yaml - name: chaos_streaming_generator_iptable_failure_injection python: "3.10" @@ -3704,11 +3699,6 @@ variations: - __suffix__: aws - - __suffix__: gce - env: gce - frequency: manual - cluster: - cluster_compute: chaos_test/compute_template_gce.yaml - name: chaos_object_ref_borrowing_iptable_failure_injection python: "3.10" From 4d9b0837f0bdb88d838ecea2efddba537699f21a Mon Sep 17 00:00:00 2001 From: joshlee Date: Sat, 22 Nov 2025 02:13:33 +0000 Subject: [PATCH 10/11] AI comments Signed-off-by: joshlee --- release/nightly_tests/chaos_test/test_chaos.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release/nightly_tests/chaos_test/test_chaos.py b/release/nightly_tests/chaos_test/test_chaos.py index d3cfe47cdbde..21fd1317437c 100644 --- a/release/nightly_tests/chaos_test/test_chaos.py +++ b/release/nightly_tests/chaos_test/test_chaos.py @@ -50,7 +50,6 @@ def main(): else: assert False - start = time.time() node_killer = None if args.disable_resource_killer: @@ -62,6 +61,7 @@ def main(): node_killer.run.remote() print("ResourceKiller started") + start = time.time() workload(total_num_cpus, args.smoke) runtime_s = time.time() - start runtime_s = round(runtime_s, 2) From b60acec4f3ab576addf4bd1e1285c9c4620fceb1 Mon Sep 17 00:00:00 2001 From: joshlee Date: Sat, 22 Nov 2025 04:32:15 +0000 Subject: [PATCH 11/11] Shorten iptable injection interval Signed-off-by: joshlee --- release/release_tests.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 14b090651f7c..5b7c0fda9170 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3412,7 +3412,7 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 180 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=tasks --disable-resource-killer + python simulate_cross_az_network_failure.py --network-failure-interval 60 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=tasks --disable-resource-killer variations: - __suffix__: aws @@ -3515,7 +3515,7 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 180 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=actors --disable-resource-killer + python simulate_cross_az_network_failure.py --network-failure-interval 60 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=actors --disable-resource-killer variations: - __suffix__: aws @@ -3618,7 +3618,7 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 120 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=streaming --disable-resource-killer + python simulate_cross_az_network_failure.py --network-failure-interval 60 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=streaming --disable-resource-killer variations: - __suffix__: aws @@ -3721,7 +3721,7 @@ wait_for_nodes: num_nodes: 10 script: > - python simulate_cross_az_network_failure.py --network-failure-interval 120 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=borrowing --disable-resource-killer + python simulate_cross_az_network_failure.py --network-failure-interval 60 --network-failure-duration 5 --command python chaos_test/test_chaos.py --workload=borrowing --disable-resource-killer variations: - __suffix__: aws