Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions python/ray/serve/tests/test_autoscaling_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1633,31 +1633,31 @@ def verify_scaling_decisions(self, signal_A, signal_B):

# ---- Deployment A ----
ray.get(signal_A.send.remote(clear=True))
[hA.remote() for _ in range(40)]
results = [hA.remote() for _ in range(40)]
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 40)
wait_for_condition(check_num_replicas_eq, name="A", target=2)

ray.get(signal_A.send.remote(clear=True))
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 0)
[hA.remote() for _ in range(70)]
assert all(result.result(timeout_s=10) for result in results)
results = [hA.remote() for _ in range(70)]
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 70)
wait_for_condition(check_num_replicas_eq, name="A", target=4)
ray.get(signal_A.send.remote())
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 0)
assert all(result.result(timeout_s=10) for result in results)

# ---- Deployment B ----
ray.get(signal_B.send.remote(clear=True))
[hB.remote() for _ in range(50)]
results = [hB.remote() for _ in range(50)]
wait_for_condition(lambda: ray.get(signal_B.cur_num_waiters.remote()) == 50)
wait_for_condition(check_num_replicas_eq, name="B", target=3)

ray.get(signal_B.send.remote(clear=True))
wait_for_condition(lambda: ray.get(signal_B.cur_num_waiters.remote()) == 0)
[hB.remote() for _ in range(120)]
assert all(result.result(timeout_s=10) for result in results)
results = [hB.remote() for _ in range(120)]
wait_for_condition(lambda: ray.get(signal_B.cur_num_waiters.remote()) == 120)
wait_for_condition(check_num_replicas_eq, name="B", target=5)
ray.get(signal_B.send.remote())
wait_for_condition(lambda: ray.get(signal_B.cur_num_waiters.remote()) == 0)
assert all(result.result(timeout_s=10) for result in results)

@pytest.mark.parametrize(
"policy",
Expand Down Expand Up @@ -1748,11 +1748,11 @@ def test_autoscaling_policy_switchback(self, serve_instance_with_two_signal):
wait_for_condition(check_running, timeout=15)

hA = serve.get_deployment_handle("A", app_name=SERVE_DEFAULT_APP_NAME)
[hA.remote() for _ in range(60)]
results = [hA.remote() for _ in range(60)]
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 60)
wait_for_condition(check_num_replicas_eq, name="A", target=3)
ray.get(signal_A.send.remote())
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 0)
assert all(result.result(timeout_s=10) for result in results)
ray.get(signal_A.send.remote(clear=True))

# Switch to app-level policy
Expand Down Expand Up @@ -1797,19 +1797,19 @@ def test_autoscaling_policy_switchback(self, serve_instance_with_two_signal):
wait_for_condition(check_running, timeout=15)

hA = serve.get_deployment_handle("A", app_name=SERVE_DEFAULT_APP_NAME)
[hA.remote() for _ in range(120)]
results = [hA.remote() for _ in range(120)]
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 120)
wait_for_condition(check_num_replicas_eq, name="A", target=4)
ray.get(signal_A.send.remote())
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 0)
assert all(result.result(timeout_s=10) for result in results)
ray.get(signal_A.send.remote(clear=True))

hB = serve.get_deployment_handle("B", app_name=SERVE_DEFAULT_APP_NAME)
[hB.remote() for _ in range(120)]
results = [hB.remote() for _ in range(120)]
wait_for_condition(lambda: ray.get(signal_B.cur_num_waiters.remote()) == 120)
wait_for_condition(check_num_replicas_eq, name="B", target=5)
ray.get(signal_B.send.remote())
wait_for_condition(lambda: ray.get(signal_B.cur_num_waiters.remote()) == 0)
assert all(result.result(timeout_s=10) for result in results)
ray.get(signal_B.send.remote(clear=True))

# Switch back to deployment-level policy
Expand Down Expand Up @@ -1841,14 +1841,14 @@ def test_autoscaling_policy_switchback(self, serve_instance_with_two_signal):
wait_for_condition(check_running, timeout=15)

hA = serve.get_deployment_handle("A", app_name=SERVE_DEFAULT_APP_NAME)
[hA.remote() for _ in range(120)]
results = [hA.remote() for _ in range(120)]
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 120)
wait_for_condition(check_num_replicas_eq, name="A", target=3)
ray.get(signal_A.send.remote())
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 0)
assert all(result.result(timeout_s=10) for result in results)

def test_autoscaling_policy_enable_disable(self, serve_instance_with_two_signal):
client, signal_A, signal_B = serve_instance_with_two_signal
client, signal_A, _ = serve_instance_with_two_signal

config_template = {
"import_path": "ray.serve.tests.test_config_files.get_multi_deployment_signal_app.app",
Expand All @@ -1866,11 +1866,11 @@ def test_autoscaling_policy_enable_disable(self, serve_instance_with_two_signal)
wait_for_condition(check_running, timeout=15)

hA = serve.get_deployment_handle("A", app_name=SERVE_DEFAULT_APP_NAME)
[hA.remote() for _ in range(120)]
results = [hA.remote() for _ in range(120)]
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 120)
wait_for_condition(check_num_replicas_eq, name="A", target=1)
ray.get(signal_A.send.remote(clear=True))
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 0)
assert all(result.result(timeout_s=10) for result in results)

config_template = {
"import_path": "ray.serve.tests.test_config_files.get_multi_deployment_signal_app.app",
Expand Down Expand Up @@ -1899,11 +1899,11 @@ def test_autoscaling_policy_enable_disable(self, serve_instance_with_two_signal)
wait_for_condition(check_running, timeout=15)

hA = serve.get_deployment_handle("A", app_name=SERVE_DEFAULT_APP_NAME)
[hA.remote() for _ in range(120)]
results = [hA.remote() for _ in range(120)]
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 120)
wait_for_condition(check_num_replicas_eq, name="A", target=4)
ray.get(signal_A.send.remote(clear=True))
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 0)
assert all(result.result(timeout_s=10) for result in results)

# Turn off app-level autoscaling policy
config_template = {
Expand All @@ -1922,11 +1922,11 @@ def test_autoscaling_policy_enable_disable(self, serve_instance_with_two_signal)

wait_for_condition(check_num_replicas_eq, name="A", target=1)
hA = serve.get_deployment_handle("A", app_name=SERVE_DEFAULT_APP_NAME)
[hA.remote() for _ in range(120)]
results = [hA.remote() for _ in range(120)]
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 120)
wait_for_condition(check_num_replicas_eq, name="A", target=1)
ray.get(signal_A.send.remote(clear=True))
wait_for_condition(lambda: ray.get(signal_A.cur_num_waiters.remote()) == 0)
assert all(result.result(timeout_s=10) for result in results)


if __name__ == "__main__":
Expand Down