From 09b12064381bbc8595ef653b84d28ea6e82bd0f1 Mon Sep 17 00:00:00 2001 From: mattip Date: Sun, 18 Aug 2024 11:43:36 +0300 Subject: [PATCH 1/3] move test file in preparation for split Signed-off-by: mattip --- python/ray/tests/{test_actor_retry.py => test_actor_retry1.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename python/ray/tests/{test_actor_retry.py => test_actor_retry1.py} (100%) diff --git a/python/ray/tests/test_actor_retry.py b/python/ray/tests/test_actor_retry1.py similarity index 100% rename from python/ray/tests/test_actor_retry.py rename to python/ray/tests/test_actor_retry1.py From e6708376861b6a7ba2e03c86b9ec3daa38cad4aa Mon Sep 17 00:00:00 2001 From: mattip Date: Sun, 18 Aug 2024 11:51:54 +0300 Subject: [PATCH 2/3] split tests over two files Signed-off-by: mattip --- python/ray/tests/test_actor_retry1.py | 256 -------------------- python/ray/tests/test_actor_retry2.py | 327 ++++++++++++++++++++++++++ 2 files changed, 327 insertions(+), 256 deletions(-) create mode 100644 python/ray/tests/test_actor_retry2.py diff --git a/python/ray/tests/test_actor_retry1.py b/python/ray/tests/test_actor_retry1.py index 5ba9c9f7f0e80..efbd9f8ff74e3 100644 --- a/python/ray/tests/test_actor_retry1.py +++ b/python/ray/tests/test_actor_retry1.py @@ -64,54 +64,6 @@ def raise_or_exit(self, counter, actions): return c -@ray.remote(max_task_retries=3) -class AsyncTroubleMaker: - """ - Same as TroubleMaker, just all methods are async. - """ - - @ray.method(max_task_retries=5, retry_exceptions=[MyError]) - async def may_raise_n_times(self, counter, n): - c = ray.get(counter.increment.remote()) - print(f"may_raise_n_times, n = {n}, count = {c}") - if c < n: - print(f"method raises in {c} th call, want {n} times") - raise MyError() - return c - - @ray.method(retry_exceptions=[MyError]) - async def raise_or_exit(self, counter, actions): - c = await counter.increment.remote() - action = "return" if c >= len(actions) else actions[c] - print(f"raise_or_exit, action = {action}, count = {c}") - - if action == "raise": - raise MyError() - elif action == "exit": - # import signal - # sys.exit(1) -> hang - # ray.actor.exit_actor() -> failed, no retry - # os.kill(os.getpid(), signal.SIGTERM) -> ignored, continued to return - # os.kill(os.getpid(), signal.SIGKILL) -> retries - os._exit(0) - return -42 - else: - return c - - @ray.method(num_returns="streaming") # retry_exceptions=None aka False. - async def yield_or_raise(self, counter, actions): - while True: - c = await counter.increment.remote() - a = actions[c] - if isinstance(a, BaseException): - raise a - else: - yield a - if c == len(actions) - 1: - # don't over call counter. Only call #yield and #raise times. - return - - def test_method_raise_5_times(shutdown_only): counter = Counter.remote() trouble_maker = TroubleMaker.remote() @@ -139,93 +91,6 @@ def test_method_no_retry_without_retry_exceptions(shutdown_only): assert ray.get(counter.get_count.remote()) == 1 -def test_generator_method_no_retry_without_retry_exceptions(shutdown_only): - counter = Counter.remote() - trouble_maker = AsyncTroubleMaker.remote() - - gen = trouble_maker.yield_or_raise.remote( - counter, - [ - # First round: 1 then raise - 1, - MyError(), - # No retry, no second round - 1, - 2, - ], - ) - assert ray.get(next(gen)) == 1 - with pytest.raises(MyError): - ray.get(next(gen)) - with pytest.raises(StopIteration): - ray.get(next(gen)) - assert ray.get(counter.get_count.remote()) == 2 - - -def test_generator_method_retry_exact_times(shutdown_only): - counter = Counter.remote() - trouble_maker = AsyncTroubleMaker.remote() - - # Should retry out max_task_retries=3 times - gen = trouble_maker.yield_or_raise.options(retry_exceptions=[MyError]).remote( - counter, - [ - # First round - 1, - MyError(), - # retry 1 - 1, - MyError(), - # retry 2 - 1, - MyError(), - # retry 3 - 1, - 2, - 3, - ], - ) - assert ray.get(next(gen)) == 1 - assert ray.get(next(gen)) == 2 - assert ray.get(next(gen)) == 3 - with pytest.raises(StopIteration): - ray.get(next(gen)) - assert ray.get(counter.get_count.remote()) == 9 - - -def test_generator_method_does_not_over_retry(shutdown_only): - counter = Counter.remote() - trouble_maker = AsyncTroubleMaker.remote() - - # Should retry out max_task_retries=3 times - gen = trouble_maker.yield_or_raise.options(retry_exceptions=[MyError]).remote( - counter, - [ - # First round - 1, - MyError(), - # retry 1 - 1, - MyError(), - # retry 2, - 1, - MyError(), - # retry 3 - 1, - MyError(), - # no retry 4! - 1, - 2, - ], - ) - assert ray.get(next(gen)) == 1 - with pytest.raises(MyError): - ray.get(next(gen)) - with pytest.raises(StopIteration): - ray.get(next(gen)) - assert ray.get(counter.get_count.remote()) == 8 - - def test_options_takes_precedence(shutdown_only): counter = Counter.remote() trouble_maker = TroubleMaker.remote() @@ -253,106 +118,6 @@ def test_options_takes_precedence_no_over_retry(shutdown_only): assert ray.get(counter.get_count.remote()) == 11 -@pytest.mark.parametrize( - "actions", - [ - ["exit", "raise", "raise"], - ["raise", "exit", "raise"], - ["raise", "raise", "exit"], - ["raise", "raise", "raise"], - ], - ids=lambda lst: ",".join(lst), # test case show name -) -@pytest.mark.parametrize("actor_model", ["sync", "async", "threaded"]) -@pytest.mark.parametrize("max_restarts", [-1, 4], ids=lambda r: f"max_restarts({r})") -@pytest.mark.parametrize( - "max_task_retries", [-1, 4], ids=lambda r: f"max_task_retries({r})" -) -def test_method_raise_and_exit( - actions, actor_model, max_restarts, max_task_retries, shutdown_only -): - """ - Test we can endure a mix of raises and exits. Note the number of exits we can endure - is subject to max_restarts. - - The retry behavior should work for Async actors and Threaded actors. - The retry behavior should work if the max_task_retries or max_restarts are -1 - (infinite retry). - """ - if actor_model == "sync": - trouble_maker = TroubleMaker.options(max_restarts=max_restarts).remote() - elif actor_model == "async": - trouble_maker = AsyncTroubleMaker.options(max_restarts=max_restarts).remote() - elif actor_model == "threaded": - trouble_maker = TroubleMaker.options( - max_restarts=max_restarts, max_concurrency=2 - ).remote() - else: - assert False, f"unrecognized actor_model: {actor_model}" - - counter = Counter.remote() - assert ( - ray.get( - trouble_maker.raise_or_exit.options( - max_task_retries=max_task_retries - ).remote(counter, actions) - ) - == 3 - ) - # 4 = 1 initial + 3 retries (with the 1 restart included) - assert ray.get(counter.get_count.remote()) == 4 - - -@pytest.mark.parametrize( - "actions_and_error", - [ - (["raise", "raise", "raise"], MyError), - (["exit", "raise", "raise"], MyError), - (["raise", "exit", "raise"], MyError), - # Last try is exit, the actor restarted. - (["raise", "raise", "exit"], ray.exceptions.ActorUnavailableError), - # Last try is exit, the actor is dead (exceeded max_restarts). - (["raise", "exit", "exit"], ray.exceptions.ActorDiedError), - ], - ids=lambda p: ",".join(p[0]), # test case show name -) -@pytest.mark.parametrize("actor_model", ["sync", "async", "threaded"]) -def test_method_raise_and_exit_no_over_retry( - actions_and_error, actor_model, shutdown_only -): - """ - Test we do not over retry. - - The retry behavior should work for Async actors and Threaded actors. - The retry behavior should work if the max_task_retries or max_restarts are -1 - (infinite retry). - """ - max_restarts = 1 - max_task_retries = 2 - actions, error = actions_and_error - - if actor_model == "sync": - trouble_maker = TroubleMaker.options(max_restarts=max_restarts).remote() - elif actor_model == "async": - trouble_maker = AsyncTroubleMaker.options(max_restarts=max_restarts).remote() - elif actor_model == "threaded": - trouble_maker = TroubleMaker.options( - max_restarts=max_restarts, max_concurrency=2 - ).remote() - else: - assert False, f"unrecognized actor_model: {actor_model}" - - counter = Counter.remote() - with pytest.raises(error): - ray.get( - trouble_maker.raise_or_exit.options( - max_task_retries=max_task_retries - ).remote(counter, actions) - ) - # 3 = 1 initial + 2 retries (with the 1 restart included) - assert ray.get(counter.get_count.remote()) == 3 - - def test_method_exit_no_over_retry_max_restarts(shutdown_only): """ Even if we have enough max_task_retries, we may still raise due to max_restarts. @@ -369,27 +134,6 @@ def test_method_exit_no_over_retry_max_restarts(shutdown_only): assert ray.get(counter.get_count.remote()) == 3 -@pytest.mark.parametrize( - "is_async", [False, True], ids=lambda a: "async" if a else "sync" -) -def test_exit_only(is_async, shutdown_only): - """ - Sanity testing: only do exit-retry works - """ - actor_class = AsyncTroubleMaker if is_async else TroubleMaker - counter = Counter.remote() - trouble_maker = actor_class.options(max_restarts=2).remote() - with pytest.raises(ray.exceptions.RayActorError): - ret = ray.get( - trouble_maker.raise_or_exit.options(max_task_retries=2).remote( - counter, ["exit", "exit", "exit"] - ) - ) - print(f"should not print: ret = {ret}") - # 3 = 1 initial + 2 retries (with the 2 restarts included) - assert ray.get(counter.get_count.remote()) == 3 - - def test_exit_only_no_over_retry(shutdown_only): """ Sanity testing: only do exit-retry works diff --git a/python/ray/tests/test_actor_retry2.py b/python/ray/tests/test_actor_retry2.py new file mode 100644 index 0000000000000..a292359ecc5e5 --- /dev/null +++ b/python/ray/tests/test_actor_retry2.py @@ -0,0 +1,327 @@ +import os +import sys + +import pytest + +import ray + + +class MyError(Exception): + pass + + +@ray.remote +class Counter: + def __init__(self) -> None: + self.count = 0 + + def increment(self) -> int: + c = self.count + self.count += 1 + return c + + def get_count(self) -> int: + return self.count + + +# TODO: also do work for async and threaded actors +@ray.remote(max_task_retries=3) +class TroubleMaker: + @ray.method(max_task_retries=5, retry_exceptions=[MyError]) + def may_raise_n_times(self, counter, n): + """ + Raises if there were n calls before this call. + Returns the number of calls before this call, if it's > n. + """ + c = ray.get(counter.increment.remote()) + print(f"may_raise_n_times, n = {n}, count = {c}") + if c < n: + print(f"method raises in {c} th call, want {n} times") + raise MyError() + return c + + @ray.method(retry_exceptions=[MyError]) + def raise_or_exit(self, counter, actions): + """ + Increments the counter and performs an action based on the param actions[count]. + If count >= len(actions), return the count. + + Note: This method doesn't set `max_task_retries`. Ray expects it to inherit + max_task_retries = 3. + + @param actions: List["raise" | "exit"] + """ + + c = ray.get(counter.increment.remote()) + action = "return" if c >= len(actions) else actions[c] + print(f"raise_or_exit, action = {action}, count = {c}") + + if action == "raise": + raise MyError() + elif action == "exit": + sys.exit(1) + else: + return c + + +@ray.remote(max_task_retries=3) +class AsyncTroubleMaker: + """ + Same as TroubleMaker, just all methods are async. + """ + + @ray.method(max_task_retries=5, retry_exceptions=[MyError]) + async def may_raise_n_times(self, counter, n): + c = ray.get(counter.increment.remote()) + print(f"may_raise_n_times, n = {n}, count = {c}") + if c < n: + print(f"method raises in {c} th call, want {n} times") + raise MyError() + return c + + @ray.method(retry_exceptions=[MyError]) + async def raise_or_exit(self, counter, actions): + c = await counter.increment.remote() + action = "return" if c >= len(actions) else actions[c] + print(f"raise_or_exit, action = {action}, count = {c}") + + if action == "raise": + raise MyError() + elif action == "exit": + # import signal + # sys.exit(1) -> hang + # ray.actor.exit_actor() -> failed, no retry + # os.kill(os.getpid(), signal.SIGTERM) -> ignored, continued to return + # os.kill(os.getpid(), signal.SIGKILL) -> retries + os._exit(0) + return -42 + else: + return c + + @ray.method(num_returns="streaming") # retry_exceptions=None aka False. + async def yield_or_raise(self, counter, actions): + while True: + c = await counter.increment.remote() + a = actions[c] + if isinstance(a, BaseException): + raise a + else: + yield a + if c == len(actions) - 1: + # don't over call counter. Only call #yield and #raise times. + return + + +def test_generator_method_no_retry_without_retry_exceptions(shutdown_only): + counter = Counter.remote() + trouble_maker = AsyncTroubleMaker.remote() + + gen = trouble_maker.yield_or_raise.remote( + counter, + [ + # First round: 1 then raise + 1, + MyError(), + # No retry, no second round + 1, + 2, + ], + ) + assert ray.get(next(gen)) == 1 + with pytest.raises(MyError): + ray.get(next(gen)) + with pytest.raises(StopIteration): + ray.get(next(gen)) + assert ray.get(counter.get_count.remote()) == 2 + + +def test_generator_method_retry_exact_times(shutdown_only): + counter = Counter.remote() + trouble_maker = AsyncTroubleMaker.remote() + + # Should retry out max_task_retries=3 times + gen = trouble_maker.yield_or_raise.options(retry_exceptions=[MyError]).remote( + counter, + [ + # First round + 1, + MyError(), + # retry 1 + 1, + MyError(), + # retry 2 + 1, + MyError(), + # retry 3 + 1, + 2, + 3, + ], + ) + assert ray.get(next(gen)) == 1 + assert ray.get(next(gen)) == 2 + assert ray.get(next(gen)) == 3 + with pytest.raises(StopIteration): + ray.get(next(gen)) + assert ray.get(counter.get_count.remote()) == 9 + + +def test_generator_method_does_not_over_retry(shutdown_only): + counter = Counter.remote() + trouble_maker = AsyncTroubleMaker.remote() + + # Should retry out max_task_retries=3 times + gen = trouble_maker.yield_or_raise.options(retry_exceptions=[MyError]).remote( + counter, + [ + # First round + 1, + MyError(), + # retry 1 + 1, + MyError(), + # retry 2, + 1, + MyError(), + # retry 3 + 1, + MyError(), + # no retry 4! + 1, + 2, + ], + ) + assert ray.get(next(gen)) == 1 + with pytest.raises(MyError): + ray.get(next(gen)) + with pytest.raises(StopIteration): + ray.get(next(gen)) + assert ray.get(counter.get_count.remote()) == 8 + + +@pytest.mark.parametrize( + "actions", + [ + ["exit", "raise", "raise"], + ["raise", "exit", "raise"], + ["raise", "raise", "exit"], + ["raise", "raise", "raise"], + ], + ids=lambda lst: ",".join(lst), # test case show name +) +@pytest.mark.parametrize("actor_model", ["sync", "async", "threaded"]) +@pytest.mark.parametrize("max_restarts", [-1, 4], ids=lambda r: f"max_restarts({r})") +@pytest.mark.parametrize( + "max_task_retries", [-1, 4], ids=lambda r: f"max_task_retries({r})" +) +def test_method_raise_and_exit( + actions, actor_model, max_restarts, max_task_retries, shutdown_only +): + """ + Test we can endure a mix of raises and exits. Note the number of exits we can endure + is subject to max_restarts. + + The retry behavior should work for Async actors and Threaded actors. + The retry behavior should work if the max_task_retries or max_restarts are -1 + (infinite retry). + """ + if actor_model == "sync": + trouble_maker = TroubleMaker.options(max_restarts=max_restarts).remote() + elif actor_model == "async": + trouble_maker = AsyncTroubleMaker.options(max_restarts=max_restarts).remote() + elif actor_model == "threaded": + trouble_maker = TroubleMaker.options( + max_restarts=max_restarts, max_concurrency=2 + ).remote() + else: + assert False, f"unrecognized actor_model: {actor_model}" + + counter = Counter.remote() + assert ( + ray.get( + trouble_maker.raise_or_exit.options( + max_task_retries=max_task_retries + ).remote(counter, actions) + ) + == 3 + ) + # 4 = 1 initial + 3 retries (with the 1 restart included) + assert ray.get(counter.get_count.remote()) == 4 + + +@pytest.mark.parametrize( + "actions_and_error", + [ + (["raise", "raise", "raise"], MyError), + (["exit", "raise", "raise"], MyError), + (["raise", "exit", "raise"], MyError), + # Last try is exit, the actor restarted. + (["raise", "raise", "exit"], ray.exceptions.ActorUnavailableError), + # Last try is exit, the actor is dead (exceeded max_restarts). + (["raise", "exit", "exit"], ray.exceptions.ActorDiedError), + ], + ids=lambda p: ",".join(p[0]), # test case show name +) +@pytest.mark.parametrize("actor_model", ["sync", "async", "threaded"]) +def test_method_raise_and_exit_no_over_retry( + actions_and_error, actor_model, shutdown_only +): + """ + Test we do not over retry. + + The retry behavior should work for Async actors and Threaded actors. + The retry behavior should work if the max_task_retries or max_restarts are -1 + (infinite retry). + """ + max_restarts = 1 + max_task_retries = 2 + actions, error = actions_and_error + + if actor_model == "sync": + trouble_maker = TroubleMaker.options(max_restarts=max_restarts).remote() + elif actor_model == "async": + trouble_maker = AsyncTroubleMaker.options(max_restarts=max_restarts).remote() + elif actor_model == "threaded": + trouble_maker = TroubleMaker.options( + max_restarts=max_restarts, max_concurrency=2 + ).remote() + else: + assert False, f"unrecognized actor_model: {actor_model}" + + counter = Counter.remote() + with pytest.raises(error): + ray.get( + trouble_maker.raise_or_exit.options( + max_task_retries=max_task_retries + ).remote(counter, actions) + ) + # 3 = 1 initial + 2 retries (with the 1 restart included) + assert ray.get(counter.get_count.remote()) == 3 + + +@pytest.mark.parametrize( + "is_async", [False, True], ids=lambda a: "async" if a else "sync" +) +def test_exit_only(is_async, shutdown_only): + """ + Sanity testing: only do exit-retry works + """ + actor_class = AsyncTroubleMaker if is_async else TroubleMaker + counter = Counter.remote() + trouble_maker = actor_class.options(max_restarts=2).remote() + with pytest.raises(ray.exceptions.RayActorError): + ret = ray.get( + trouble_maker.raise_or_exit.options(max_task_retries=2).remote( + counter, ["exit", "exit", "exit"] + ) + ) + print(f"should not print: ret = {ret}") + # 3 = 1 initial + 2 retries (with the 2 restarts included) + assert ray.get(counter.get_count.remote()) == 3 + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) From 33cc1254758b95cbc3922517d46f139b15f7aeeb Mon Sep 17 00:00:00 2001 From: mattip Date: Sun, 18 Aug 2024 11:58:39 +0300 Subject: [PATCH 3/3] adapt CI to the split test files Signed-off-by: mattip --- ci/ray_ci/core.tests.yml | 3 ++- python/ray/tests/BUILD | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/ci/ray_ci/core.tests.yml b/ci/ray_ci/core.tests.yml index d001ed01e42c5..14d4d08ee9647 100644 --- a/ci/ray_ci/core.tests.yml +++ b/ci/ray_ci/core.tests.yml @@ -1,6 +1,7 @@ flaky_tests: - windows://:metric_exporter_grpc_test - - windows://python/ray/tests:test_actor_retry + - windows://python/ray/tests:test_actor_retry1 + - windows://python/ray/tests:test_actor_retry2 - windows://python/ray/tests:test_object_spilling - windows://python/ray/tests:test_object_spilling_asan - windows://python/ray/tests:test_object_spilling_debug_mode diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 686833dc035eb..e9e7dfd51d445 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -445,7 +445,8 @@ py_test_module_list( py_test_module_list( files = [ "test_actor.py", - "test_actor_retry.py", + "test_actor_retry1.py", + "test_actor_retry2.py", "test_actor_failures.py", "test_actor_resources.py", "test_failure.py",