Merged
38 commits
ec1c6ee  fix heartbeats (richardliaw, Jul 13, 2019)
4c46617  lint (richardliaw, Jul 13, 2019)
66661be  Fix heartbeats test (hartikainen, Jul 14, 2019)
1d0ad83  Initial implementation for heartbeat test (hartikainen, Jul 14, 2019)
e6a0946  Rename XRAY_DRIVER_CHANNEL -> XRAY_JOB_CHANNEL (hartikainen, Jul 14, 2019)
ebb49cc  Use string keys instead of binary in heartbeat test (hartikainen, Jul 14, 2019)
83dafc9  Merge pull request #16 from hartikainen/testheart2 (richardliaw, Jul 14, 2019)
8f595e6  tests (richardliaw, Jul 14, 2019)
39e4a00  lint (richardliaw, Jul 14, 2019)
60aebdd  try debug (richardliaw, Jul 14, 2019)
86508c3  Revert "try debug" (hartikainen, Jul 15, 2019)
8dc289d  Fix heartbeat data available resources in node manager. (hartikainen, Jul 15, 2019)
fb01987  Merge branch 'master' into testheart (hartikainen, Jul 15, 2019)
cdb5bd6  trytest (richardliaw, Jul 15, 2019)
30192ca  logging (richardliaw, Jul 15, 2019)
af26a60  ok (richardliaw, Jul 15, 2019)
0505c9e  trythis (Jul 16, 2019)
02e4c0d  revert (richardliaw, Jul 16, 2019)
a0b5538  Merge branch 'master' into testheart (richardliaw, Jul 16, 2019)
d0d131d  lint (richardliaw, Jul 16, 2019)
2ecb624  oops (richardliaw, Jul 16, 2019)
8e931a9  proper resource track (richardliaw, Jul 16, 2019)
b4d1d15  food (richardliaw, Jul 16, 2019)
2358370  lint (richardliaw, Jul 16, 2019)
289ac0f  fix (Jul 16, 2019)
74b2c8e  multiexec (richardliaw, Jul 16, 2019)
a6c9ff5  format (richardliaw, Jul 16, 2019)
eae089e  remove (Jul 16, 2019)
1ea36ad  fixup code (richardliaw, Jul 16, 2019)
22900bd  Merge branch 'testheart' of https://github.com/richardliaw/ray into t… (richardliaw, Jul 16, 2019)
af3f1e3  cleanup (richardliaw, Jul 16, 2019)
85cde51  lint (richardliaw, Jul 16, 2019)
be011a8  Merge branch 'master' into testheart (hartikainen, Jul 16, 2019)
16c27d0  Apply suggestions from code review (richardliaw, Jul 16, 2019)
3671e0f  monitor (richardliaw, Jul 16, 2019)
2f7b771  fixtest (richardliaw, Jul 16, 2019)
af6d7ec  fixtest (richardliaw, Jul 17, 2019)
a6afae0  fix (richardliaw, Jul 17, 2019)
18 changes: 12 additions & 6 deletions python/ray/autoscaler/autoscaler.py
@@ -209,15 +209,10 @@ def approx_workers_used(self):
def num_workers_connected(self):
return self._info()["NumNodesConnected"]

def info_string(self):
return ", ".join(
["{}={}".format(k, v) for k, v in sorted(self._info().items())])

def _info(self):
def get_resource_usage(self):
nodes_used = 0.0
resources_used = {}
resources_total = {}
now = time.time()
for ip, max_resources in self.static_resources_by_ip.items():
avail_resources = self.dynamic_resources_by_ip[ip]
max_frac = 0.0
@@ -234,6 +229,17 @@ def _info(self):
if frac > max_frac:
max_frac = frac
nodes_used += max_frac

return nodes_used, resources_used, resources_total

def info_string(self):
return ", ".join(
["{}={}".format(k, v) for k, v in sorted(self._info().items())])

def _info(self):
nodes_used, resources_used, resources_total = self.get_resource_usage()

now = time.time()
idle_times = [now - t for t in self.last_used_time_by_ip.values()]
heartbeat_times = [
now - t for t in self.last_heartbeat_time_by_ip.values()
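The autoscaler change above factors the per-node resource accounting out of `_info()` into a standalone `get_resource_usage()` method, so the new heartbeat tests can read the `(nodes_used, resources_used, resources_total)` triple directly. Below is a minimal sketch of that shape, assuming per-node dicts like those kept in `static_resources_by_ip` / `dynamic_resources_by_ip`; the class is illustrative only, not the real `LoadMetrics`.

class LoadMetricsSketch(object):
    """Illustrative stand-in for the refactored LoadMetrics interface."""

    def __init__(self):
        # {ip: {resource: total}} and {ip: {resource: currently available}}
        self.static_resources_by_ip = {"10.0.0.1": {"CPU": 4.0}}
        self.dynamic_resources_by_ip = {"10.0.0.1": {"CPU": 3.0}}

    def get_resource_usage(self):
        nodes_used = 0.0
        resources_used = {}
        resources_total = {}
        for ip, max_resources in self.static_resources_by_ip.items():
            avail = self.dynamic_resources_by_ip[ip]
            max_frac = 0.0
            for resource, total in max_resources.items():
                used = total - avail.get(resource, 0.0)
                resources_used[resource] = resources_used.get(resource, 0.0) + used
                resources_total[resource] = resources_total.get(resource, 0.0) + total
                if total > 0:
                    max_frac = max(max_frac, used / total)
            # A node counts as "used" in proportion to its busiest resource.
            nodes_used += max_frac
        return nodes_used, resources_used, resources_total


assert LoadMetricsSketch().get_resource_usage() == (0.25, {"CPU": 1.0}, {"CPU": 4.0})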
36 changes: 21 additions & 15 deletions python/ray/monitor.py
@@ -38,7 +38,7 @@ class Monitor(object):
def __init__(self, redis_address, autoscaling_config, redis_password=None):
# Initialize the Redis clients.
ray.state.state._initialize_global_state(
args.redis_address, redis_password=redis_password)
redis_address, redis_password=redis_password)
self.redis = ray.services.create_redis_client(
redis_address, password=redis_password)
# Setup subscriptions to the primary Redis server and the Redis shards.
@@ -106,25 +106,22 @@ def xray_heartbeat_batch_handler(self, unused_channel, data):

message = ray.gcs_utils.HeartbeatBatchTableData.FromString(
heartbeat_data)

for heartbeat_message in message.batch:
num_resources = len(heartbeat_message.resources_available_label)
static_resources = {}
dynamic_resources = {}
for i in range(num_resources):
dyn = heartbeat_message.resources_available_label[i]
static = heartbeat_message.resources_total_label[i]
dynamic_resources[dyn] = (
heartbeat_message.resources_available_capacity[i])
static_resources[static] = (
heartbeat_message.resources_total_capacity[i])
total_resources = dict(
zip(heartbeat_message.resources_total_label,
heartbeat_message.resources_total_capacity))
available_resources = dict(
zip(heartbeat_message.resources_available_label,
heartbeat_message.resources_available_capacity))
for resource in total_resources:
available_resources.setdefault(resource, 0.0)

# Update the load metrics for this raylet.
client_id = ray.utils.binary_to_hex(heartbeat_message.client_id)
ip = self.raylet_id_to_ip_map.get(client_id)
if ip:
self.load_metrics.update(ip, static_resources,
dynamic_resources)
self.load_metrics.update(ip, total_resources,
available_resources)
else:
logger.warning(
"Monitor: "
@@ -250,14 +247,23 @@ def process_messages(self, max_messages=10000):
# Call the handler.
message_handler(channel, data)

def update_raylet_map(self):
def update_raylet_map(self, _append_port=False):
"""Updates internal raylet map.

Args:
_append_port (bool): Defaults to False. Appending the port is
useful in testing, as mock clusters have many nodes with
the same IP and cannot be uniquely identified.
"""
all_raylet_nodes = ray.nodes()
self.raylet_id_to_ip_map = {}
for raylet_info in all_raylet_nodes:
client_id = (raylet_info.get("DBClientID")
or raylet_info["ClientID"])
ip_address = (raylet_info.get("AuxAddress")
or raylet_info["NodeManagerAddress"]).split(":")[0]
if _append_port:
ip_address += ":" + str(raylet_info["NodeManagerPort"])
self.raylet_id_to_ip_map[client_id] = ip_address

def _maybe_flush_gcs(self):
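In the `xray_heartbeat_batch_handler` change above, the index-based loop over the protobuf's parallel label/capacity lists is replaced by `dict(zip(...))`, and any resource present in the totals but missing from the availability lists is filled in as 0.0 (a fully consumed resource is presumably omitted from `resources_available_*`). A self-contained sketch of that pattern, with made-up lists standing in for the protobuf fields:

# Made-up stand-ins for the heartbeat protobuf's parallel lists.
resources_total_label = ["CPU", "GPU"]
resources_total_capacity = [4.0, 1.0]
resources_available_label = ["CPU"]  # GPU fully in use, so it is omitted
resources_available_capacity = [2.5]

total_resources = dict(zip(resources_total_label, resources_total_capacity))
available_resources = dict(
    zip(resources_available_label, resources_available_capacity))
for resource in total_resources:
    # Anything reported in the totals but absent from the availability lists
    # is treated as fully consumed.
    available_resources.setdefault(resource, 0.0)

assert total_resources == {"CPU": 4.0, "GPU": 1.0}
assert available_resources == {"CPU": 2.5, "GPU": 0.0}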
1 change: 1 addition & 0 deletions python/ray/setup-dev.py
@@ -48,6 +48,7 @@ def do_link(package, force=False):
do_link("autoscaler", force=args.yes)
do_link("scripts", force=args.yes)
do_link("internal", force=args.yes)
do_link("tests", force=args.yes)
do_link("experimental", force=args.yes)
print("Created links.\n\nIf you run into issues initializing Ray, please "
"ensure that your local repo and the installed Ray are in sync "
117 changes: 117 additions & 0 deletions python/ray/tests/test_multi_node_2.py
@@ -8,6 +8,7 @@

import ray
import ray.ray_constants as ray_constants
from ray.monitor import Monitor
from ray.tests.cluster_utils import Cluster
from ray.tests.conftest import generate_internal_config_map

@@ -58,6 +59,122 @@ def test_internal_config(ray_start_cluster_head):
assert ray.cluster_resources()["CPU"] == 1


def setup_monitor(redis_address):
monitor = Monitor(redis_address, None)
monitor.subscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_CHANNEL)
monitor.subscribe(ray.gcs_utils.XRAY_JOB_CHANNEL) # TODO: Remove?
monitor.update_raylet_map(_append_port=True)
monitor._maybe_flush_gcs()
return monitor


def verify_load_metrics(monitor, expected_resource_usage=None, timeout=10):
while True:
monitor.process_messages()
resource_usage = monitor.load_metrics.get_resource_usage()

if expected_resource_usage is None:
if all(x for x in resource_usage[1:]):
break
elif all(x == y
for x, y in zip(resource_usage, expected_resource_usage)):
break
else:
timeout -= 1
time.sleep(1)

if timeout <= 0:
raise ValueError("Timeout. {} != {}".format(
resource_usage, expected_resource_usage))

return resource_usage


@pytest.mark.parametrize(
"ray_start_cluster_head", [{
"num_cpus": 1,
}, {
"num_cpus": 2,
}],
indirect=True)
def test_heartbeats_single(ray_start_cluster_head):
"""Unit test for `Cluster.wait_for_nodes`.

Test proper metrics.
"""
cluster = ray_start_cluster_head
timeout = 5
monitor = setup_monitor(cluster.redis_address)
total_cpus = ray.state.cluster_resources()["CPU"]
verify_load_metrics(monitor, (0.0, {"CPU": 0.0}, {"CPU": total_cpus}))

@ray.remote
def work(timeout):
time.sleep(timeout)
return True

work_handle = work.remote(timeout * 2)
verify_load_metrics(monitor, (1.0 / total_cpus, {
"CPU": 1.0
}, {
"CPU": total_cpus
}))
ray.get(work_handle)

@ray.remote
class Actor(object):
def work(self, timeout):
time.sleep(timeout)
return True

test_actor = Actor.remote()
work_handle = test_actor.work.remote(timeout * 2)

verify_load_metrics(monitor, (1.0 / total_cpus, {
"CPU": 1.0
}, {
"CPU": total_cpus
}))

ray.get(work_handle)


def test_heartbeats_cluster(ray_start_cluster_head):
"""Unit test for `Cluster.wait_for_nodes`.

Test proper metrics.
"""
cluster = ray_start_cluster_head
timeout = 5
num_workers_nodes = 4
num_nodes_total = int(num_workers_nodes + 1)
[cluster.add_node() for i in range(num_workers_nodes)]
cluster.wait_for_nodes()
monitor = setup_monitor(cluster.redis_address)

verify_load_metrics(monitor, (0.0, {"CPU": 0.0}, {"CPU": num_nodes_total}))

@ray.remote
class Actor(object):
def work(self, timeout):
time.sleep(timeout)
return True

test_actors = [Actor.remote() for i in range(num_nodes_total)]

work_handles = [actor.work.remote(timeout * 2) for actor in test_actors]

verify_load_metrics(monitor, (num_nodes_total, {
"CPU": num_nodes_total
}, {
"CPU": num_nodes_total
}))

ray.get(work_handles)
verify_load_metrics(monitor, (0.0, {"CPU": 0.0}, {"CPU": num_nodes_total}))
ray.shutdown()


def test_wait_for_nodes(ray_start_cluster_head):
"""Unit test for `Cluster.wait_for_nodes`.

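The new tests above could presumably be run on their own via pytest's node-id selection; a hypothetical invocation from the repository root, using the file path and test names from the diff:

import pytest

# Equivalent to passing these node ids to pytest on the command line.
pytest.main([
    "-v",
    "python/ray/tests/test_multi_node_2.py::test_heartbeats_single",
    "python/ray/tests/test_multi_node_2.py::test_heartbeats_cluster",
])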
4 changes: 2 additions & 2 deletions src/ray/raylet/node_manager.cc
@@ -547,8 +547,8 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id,
SchedulingResources &remote_resources = it->second;

ResourceSet remote_available(
VectorFromProtobuf(heartbeat_data.resources_total_label()),
VectorFromProtobuf(heartbeat_data.resources_total_capacity()));
VectorFromProtobuf(heartbeat_data.resources_available_label()),
VectorFromProtobuf(heartbeat_data.resources_available_capacity()));
ResourceSet remote_load(VectorFromProtobuf(heartbeat_data.resource_load_label()),
VectorFromProtobuf(heartbeat_data.resource_load_capacity()));
// TODO(atumanov): assert that the load is a non-empty ResourceSet.
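The node manager change above is the fix from commit 8dc289d: `remote_available` was previously rebuilt from the `resources_total_*` heartbeat fields, so a remote raylet's available resources always equaled its totals regardless of load. A rough Python analogue of the before/after behavior; the field names are assumed to mirror the heartbeat protobuf and the numbers are made up.

heartbeat_data = {
    "resources_total_label": ["CPU"],
    "resources_total_capacity": [4.0],
    "resources_available_label": ["CPU"],
    "resources_available_capacity": [1.0],  # 3 of 4 CPUs are busy
}

# Before the fix: "available" was rebuilt from the *_total_* fields, so the
# remote node always appeared completely free.
buggy_remote_available = dict(
    zip(heartbeat_data["resources_total_label"],
        heartbeat_data["resources_total_capacity"]))
assert buggy_remote_available == {"CPU": 4.0}

# After the fix: it is built from the *_available_* fields instead.
fixed_remote_available = dict(
    zip(heartbeat_data["resources_available_label"],
        heartbeat_data["resources_available_capacity"]))
assert fixed_remote_available == {"CPU": 1.0}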