Commit 83dafc9

Merge pull request #16 from hartikainen/testheart2
Initial implementation for heartbeat test
2 parents 66661be + ebb49cc commit 83dafc9

File tree

python/ray/autoscaler/autoscaler.py
python/ray/monitor.py
python/ray/tests/test_multi_node_2.py

3 files changed: +73 -19 lines changed

python/ray/autoscaler/autoscaler.py

Lines changed: 12 additions & 6 deletions
@@ -209,15 +209,10 @@ def approx_workers_used(self):
     def num_workers_connected(self):
         return self._info()["NumNodesConnected"]
 
-    def info_string(self):
-        return ", ".join(
-            ["{}={}".format(k, v) for k, v in sorted(self._info().items())])
-
-    def _info(self):
+    def get_resource_usage(self):
         nodes_used = 0.0
         resources_used = {}
         resources_total = {}
-        now = time.time()
         for ip, max_resources in self.static_resources_by_ip.items():
             avail_resources = self.dynamic_resources_by_ip[ip]
             max_frac = 0.0
@@ -234,6 +229,17 @@ def _info(self):
                     if frac > max_frac:
                         max_frac = frac
             nodes_used += max_frac
+
+        return nodes_used, resources_used, resources_total
+
+    def info_string(self):
+        return ", ".join(
+            ["{}={}".format(k, v) for k, v in sorted(self._info().items())])
+
+    def _info(self):
+        nodes_used, resources_used, resources_total = self.get_resource_usage()
+
+        now = time.time()
         idle_times = [now - t for t in self.last_used_time_by_ip.values()]
         heartbeat_times = [
             now - t for t in self.last_heartbeat_time_by_ip.values()
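
For context, this diff splits the usage computation out of _info() into get_resource_usage(), which returns a (nodes_used, resources_used, resources_total) tuple that the new test below reads through monitor.load_metrics. A minimal sketch of consuming that tuple; the helper name utilization and the sample numbers are illustrative, not part of the PR:

# Sketch (not from the PR): derive per-resource utilization from the
# (nodes_used, resources_used, resources_total) tuple returned by the new
# LoadMetrics.get_resource_usage().
def utilization(resource_usage):
    nodes_used, resources_used, resources_total = resource_usage
    return {
        name: resources_used.get(name, 0.0) / total
        for name, total in resources_total.items() if total > 0
    }

# Example with made-up numbers matching the shapes asserted in the new test:
print(utilization((1.0, {"CPU": 1.0}, {"CPU": 1.0})))  # {'CPU': 1.0}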

python/ray/monitor.py

Lines changed: 2 additions & 1 deletion
@@ -122,8 +122,9 @@ def xray_heartbeat_batch_handler(self, unused_channel, data):
             # Update the load metrics for this raylet.
             client_id = ray.utils.binary_to_hex(heartbeat_message.client_id)
             ip = self.raylet_id_to_ip_map.get(client_id)
+            load_metrics_id = ip + "-" + client_id
             if ip:
-                self.load_metrics.update(ip, static_resources,
+                self.load_metrics.update(load_metrics_id, static_resources,
                                          dynamic_resources)
             else:
                 logger.warning(
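
The one-line change above keys LoadMetrics.update() by ip + "-" + client_id instead of the bare IP, presumably so entries stay distinct when several raylets report from the same host. A tiny standalone sketch of that keying; the helper name is hypothetical and only the string composition comes from the diff:

# Hypothetical helper mirroring the load_metrics_id composition in the diff.
def make_load_metrics_id(ip, client_id):
    # "<ip>-<client_id>" keeps one entry per raylet rather than per host.
    return ip + "-" + client_id

assert make_load_metrics_id("172.0.0.1", "abc123") == "172.0.0.1-abc123"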

python/ray/tests/test_multi_node_2.py

Lines changed: 59 additions & 12 deletions
@@ -59,6 +59,26 @@ def test_internal_config(ray_start_cluster_head):
     assert ray.cluster_resources()["CPU"] == 1
 
 
+def verify_load_metrics(monitor, expected_resource_usage=None, timeout=10):
+    while True:
+        monitor.process_messages()
+        resource_usage = monitor.load_metrics.get_resource_usage()
+
+        if expected_resource_usage is None:
+            if all(x for x in resource_usage[1:]):
+                break
+        elif all(x == y for x, y in zip(resource_usage, expected_resource_usage)):
+            break
+        else:
+            timeout -= 1
+            time.sleep(1)
+
+        if timeout <= 0:
+            raise ValueError("Should not be here.")
+
+    return resource_usage
+
+
 def test_heartbeats(ray_start_cluster_head):
     """Unit test for `Cluster.wait_for_nodes`.
 
@@ -67,28 +87,55 @@ def test_heartbeats(ray_start_cluster_head):
     cluster = ray_start_cluster_head
     monitor = Monitor(cluster.redis_address, None)
 
+    work_handles = []
+
     @ray.remote
     class Actor():
-        pass
+        def work(self, timeout=10):
+            time.sleep(timeout)
+            return True
 
     test_actors = [Actor.remote()]
-    # This is only used to update the load metrics for the autoscaler.
 
     monitor.subscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_CHANNEL)
    monitor.subscribe(ray.gcs_utils.XRAY_JOB_CHANNEL)
 
     monitor.update_raylet_map()
     monitor._maybe_flush_gcs()
-    # Process a round of messages.
-    monitor.process_messages()
-    from pprint import pprint; import ipdb; ipdb.set_trace(context=30)
-    pprint(vars(monitor.load_metrics))
-
-    # worker_nodes = [cluster.add_node() for i in range(4)]
-    # for i in range(3):
-    #     test_actors += [Actor.remote()]
-    #     check_resource_usage(monitor.get_heartbeat())
-    # cluster.wait_for_nodes()
+
+    timeout = 5
+
+    verify_load_metrics(monitor, (0.0, {'CPU': 0.0}, {'CPU': 1.0}))
+
+    work_handles += [test_actors[0].work.remote(timeout=timeout * 2)]
+
+    verify_load_metrics(monitor, (1.0, {'CPU': 1.0}, {'CPU': 1.0}))
+
+    ray.get(work_handles)
+
+    num_workers = 4
+    num_nodes_total = float(num_workers + 1)
+    worker_nodes = [cluster.add_node() for i in range(num_workers)]
+
+    cluster.wait_for_nodes()
+    monitor.update_raylet_map()
+    monitor._maybe_flush_gcs()
+
+    verify_load_metrics(monitor, (0.0, {'CPU': 0.0}, {'CPU': num_nodes_total}))
+
+    work_handles = [test_actors[0].work.remote(timeout=timeout * 2)]
+    for i in range(num_workers):
+        new_actor = Actor.remote()
+        work_handles += [new_actor.work.remote(timeout=timeout * 2)]
+        test_actors += [new_actor]
+
+    verify_load_metrics(
+        monitor,
+        (num_nodes_total, {'CPU': num_nodes_total}, {'CPU': num_nodes_total}))
+
+    ray.get(work_handles)
+
+    verify_load_metrics(monitor, (0.0, {'CPU': 0.0}, {'CPU': num_nodes_total}))
 
 
 def test_wait_for_nodes(ray_start_cluster_head):
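
As a reading aid, here is the polling pattern verify_load_metrics implements, reduced to a self-contained sketch: re-read the metrics, stop once they match the expected tuple, and give up after timeout mismatched reads. FakeMetrics and wait_for_usage are stand-ins so the snippet runs without a cluster; the real helper is the one added in the diff above.

import itertools
import time


class FakeMetrics:
    # Stand-in for monitor.load_metrics: first report an idle node, then a
    # fully used one, mimicking the progression the test asserts.
    def __init__(self):
        self._values = itertools.chain(
            [(0.0, {"CPU": 0.0}, {"CPU": 1.0})],
            itertools.repeat((1.0, {"CPU": 1.0}, {"CPU": 1.0})))

    def get_resource_usage(self):
        return next(self._values)


def wait_for_usage(metrics, expected, timeout=10):
    while True:
        usage = metrics.get_resource_usage()
        if usage == expected:
            return usage
        timeout -= 1
        if timeout <= 0:
            raise ValueError("Timed out waiting for {}".format(expected))
        time.sleep(1)


print(wait_for_usage(FakeMetrics(), (1.0, {"CPU": 1.0}, {"CPU": 1.0})))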
