Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions python/ray/serve/_private/deployment_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,18 +735,26 @@ def _get_replicas_to_stop(
for (
pending_launching_recovering_replica
) in pending_launching_recovering_replicas:
replicas_to_stop.add(pending_launching_recovering_replica)
if len(replicas_to_stop) == max_num_to_stop:
return replicas_to_stop
else:
replicas_to_stop.add(pending_launching_recovering_replica)

node_to_running_replicas_of_target_deployment = (
self._get_node_to_running_replicas(deployment_id)
)
node_to_running_replicas_of_all_deployments = (
self._get_node_to_running_replicas()
)

# _running_replicas preserves insertion order (oldest → newest).
# Reverse once so we have newest → oldest, then bucket by node.
ordered_running_replicas = list(self._running_replicas[deployment_id].items())
ordered_running_replicas.reverse()
ordered_running_replicas_of_target_deployment: Dict[
str, List[ReplicaID]
] = defaultdict(list)
for replica_id, replica_node_id in ordered_running_replicas:
ordered_running_replicas_of_target_deployment[replica_node_id].append(
replica_id
)

# Replicas on the head node have the lowest priority for downscaling
# since we cannot relinquish the head node.
def key(node_and_num_running_replicas_of_all_deployments):
Expand All @@ -760,15 +768,14 @@ def key(node_and_num_running_replicas_of_all_deployments):
for node_id, _ in sorted(
node_to_running_replicas_of_all_deployments.items(), key=key
):
if node_id not in node_to_running_replicas_of_target_deployment:
if node_id not in ordered_running_replicas_of_target_deployment:
continue
for running_replica in node_to_running_replicas_of_target_deployment[
node_id
]:

# Newest-first list for this node.
for replica_id in ordered_running_replicas_of_target_deployment[node_id]:
replicas_to_stop.add(replica_id)
if len(replicas_to_stop) == max_num_to_stop:
return replicas_to_stop
else:
replicas_to_stop.add(running_replica)

return replicas_to_stop

Expand Down
6 changes: 3 additions & 3 deletions python/ray/serve/tests/unit/test_deployment_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ def test_downscale_multiple_deployments():
# but it has more replicas of all deployments so
# we should stop replicas from node2.
assert len(deployment_to_replicas_to_stop[d1_id]) == 1
assert deployment_to_replicas_to_stop[d1_id] < {d1_r2_id, d1_r3_id}
assert deployment_to_replicas_to_stop[d1_id].issubset({d1_r2_id, d1_r3_id})

scheduler.on_replica_stopping(d1_r3_id)
scheduler.on_replica_stopping(d2_r3_id)
Expand Down Expand Up @@ -737,7 +737,7 @@ def test_downscale_head_node():
},
)
assert len(deployment_to_replicas_to_stop) == 1
assert deployment_to_replicas_to_stop[dep_id] < {r2_id, r3_id}
assert deployment_to_replicas_to_stop[dep_id].issubset({r2_id, r3_id})
scheduler.on_replica_stopping(deployment_to_replicas_to_stop[dep_id].pop())

deployment_to_replicas_to_stop = scheduler.schedule(
Expand Down Expand Up @@ -861,7 +861,7 @@ def test_downscale_single_deployment():
},
)
assert len(deployment_to_replicas_to_stop) == 1
assert deployment_to_replicas_to_stop[dep_id] == {r1_id, r2_id}
assert deployment_to_replicas_to_stop[dep_id] <= {r1_id, r2_id}
scheduler.on_replica_stopping(r1_id)
scheduler.on_replica_stopping(r2_id)
scheduler.on_deployment_deleted(dep_id)
Expand Down