diff --git a/bazel/python.bzl b/bazel/python.bzl index 2b6f548a24d0..523792fdc4eb 100644 --- a/bazel/python.bzl +++ b/bazel/python.bzl @@ -1,12 +1,22 @@ -# py_test_module_list creates a py_test target for each +# py_test_module_list creates a py_test target for each # Python file in `files` def py_test_module_list(files, size, deps, extra_srcs, **kwargs): - for file in files: - # remove .py - name = file[:-3] - native.py_test( - name=name, - size=size, - srcs=extra_srcs+[file], - **kwargs - ) + for file in files: + # remove .py + name = file[:-3] + native.py_test( + name = name, + size = size, + srcs = extra_srcs + [file], + **kwargs + ) + +def py_test_run_all_subdirectory(include, exclude, extra_srcs, **kwargs): + for file in native.glob(include = include, exclude = exclude): + print(file) + basename = file.rpartition("/")[-1] + native.py_test( + name = basename[:-3], + srcs = extra_srcs + [file], + **kwargs + ) diff --git a/dashboard/BUILD b/dashboard/BUILD index 15ed537e68be..4a4d0894c510 100644 --- a/dashboard/BUILD +++ b/dashboard/BUILD @@ -1,13 +1,19 @@ +load("//bazel:python.bzl", "py_test_run_all_subdirectory") + # This is a dummy test dependency that causes the above tests to be # re-run if any of these files changes. py_library( name = "dashboard_lib", - srcs = glob(["**/*.py"],exclude=["tests/*"]), + srcs = glob( + ["**/*.py"], + exclude = ["tests/*"], + ), ) -py_test( - name = "test_dashboard", +py_test_run_all_subdirectory( size = "small", - srcs = glob(["tests/*.py"]), - deps = [":dashboard_lib"] + include = ["**/test*.py"], + exclude = ["modules/test/**"], + extra_srcs = [], + tags = ["exclusive"], ) diff --git a/dashboard/agent.py b/dashboard/agent.py index f0fce2f917f7..b7c379da3cb5 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -24,6 +24,11 @@ from ray.core.generated import agent_manager_pb2_grpc import psutil +try: + create_task = asyncio.create_task +except AttributeError: + create_task = asyncio.ensure_future + logger = logging.getLogger(__name__) routes = dashboard_utils.ClassMethodRouteTable @@ -36,6 +41,7 @@ def __init__(self, redis_password=None, temp_dir=None, log_dir=None, + metrics_export_port=None, node_manager_port=None, object_store_name=None, raylet_name=None): @@ -45,6 +51,7 @@ def __init__(self, self.redis_password = redis_password self.temp_dir = temp_dir self.log_dir = log_dir + self.metrics_export_port = metrics_export_port self.node_manager_port = node_manager_port self.object_store_name = object_store_name self.raylet_name = raylet_name @@ -69,7 +76,7 @@ def _load_modules(self): c = cls(self) dashboard_utils.ClassMethodRouteTable.bind(c) modules.append(c) - logger.info("Loaded {} modules.".format(len(modules))) + logger.info("Loaded %d modules.", len(modules)) return modules async def run(self): @@ -86,7 +93,7 @@ async def _check_parent(): dashboard_consts. DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS) - check_parent_task = asyncio.create_task(_check_parent()) + check_parent_task = create_task(_check_parent()) # Create an aioredis client for all modules. 
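An aside on the `create_task` fallback added to `dashboard/agent.py` above: `asyncio.create_task` only exists on Python 3.7+, so binding it with a module-level try/except keeps the agent working on 3.6, where `asyncio.ensure_future` is the equivalent entry point. A minimal, self-contained sketch of the same pattern (the `_tick` coroutine is illustrative):

```python
import asyncio

# asyncio.create_task() was added in Python 3.7; on 3.6 the attribute
# lookup raises AttributeError and we fall back to ensure_future(),
# which accepts the same coroutine argument.
try:
    create_task = asyncio.create_task
except AttributeError:
    create_task = asyncio.ensure_future


async def _tick():
    await asyncio.sleep(0.1)
    return "ok"


async def main():
    task = create_task(_tick())  # same call site on 3.6 and 3.7+
    print(await task)


asyncio.get_event_loop().run_until_complete(main())
```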
try: @@ -172,6 +179,11 @@ async def _check_parent(): required=True, type=str, help="The address to use for Redis.") + parser.add_argument( + "--metrics-export-port", + required=True, + type=int, + help="The port to expose metrics through Prometheus.") parser.add_argument( "--node-manager-port", required=True, @@ -277,6 +289,7 @@ async def _check_parent(): redis_password=args.redis_password, temp_dir=temp_dir, log_dir=log_dir, + metrics_export_port=args.metrics_export_port, node_manager_port=args.node_manager_port, object_store_name=args.object_store_name, raylet_name=args.raylet_name) diff --git a/dashboard/dashboard.py b/dashboard/dashboard.py index f8f19ef212af..6abc9293f88e 100644 --- a/dashboard/dashboard.py +++ b/dashboard/dashboard.py @@ -37,10 +37,10 @@ def setup_static_dir(): errno.ENOENT, "Dashboard build directory not found. If installing " "from source, please follow the additional steps " "required to build the dashboard" - "(cd python/ray/{}/client " + f"(cd python/ray/{module_name}/client " "&& npm install " "&& npm ci " - "&& npm run build)".format(module_name), build_dir) + "&& npm run build)", build_dir) static_dir = os.path.join(build_dir, "static") routes.static("/static", static_dir, follow_symlinks=True) @@ -59,14 +59,21 @@ class Dashboard: port(int): Port number of dashboard aiohttp server. redis_address(str): GCS address of a Ray cluster redis_password(str): Redis password to access GCS + log_dir(str): Log directory of dashboard. """ - def __init__(self, host, port, redis_address, redis_password=None): + def __init__(self, + host, + port, + redis_address, + redis_password=None, + log_dir=None): self.dashboard_head = dashboard_head.DashboardHead( http_host=host, http_port=port, redis_address=redis_address, - redis_password=redis_password) + redis_password=redis_password, + log_dir=log_dir) # Setup Dashboard Routes build_dir = setup_static_dir() @@ -197,7 +204,8 @@ async def run(self): args.host, args.port, args.redis_address, - redis_password=args.redis_password) + redis_password=args.redis_password, + log_dir=log_dir) loop = asyncio.get_event_loop() loop.run_until_complete(dashboard.run()) except Exception as e: diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py index 65e3bb449628..1ee454917671 100644 --- a/dashboard/datacenter.py +++ b/dashboard/datacenter.py @@ -19,7 +19,7 @@ class DataSource: # {actor id hex(str): actor table data(dict of ActorTableData # in gcs.proto)} actors = Dict() - # {ip address(str): dashboard agent grpc server port(int)} + # {ip address(str): dashboard agent [http port(int), grpc port(int)]} agents = Dict() # {ip address(str): gcs node info(dict of GcsNodeInfo in gcs.proto)} nodes = Dict() @@ -56,7 +56,7 @@ async def get_node_actors(cls, hostname): node_worker_id_set.add(worker_stats["workerId"]) node_actors = {} for actor_id, actor_table_data in DataSource.actors.items(): - if actor_table_data["workerId"] in node_worker_id_set: + if actor_table_data["address"]["workerId"] in node_worker_id_set: node_actors[actor_id] = actor_table_data return node_actors @@ -102,6 +102,7 @@ async def get_all_node_summary(cls): for hostname in DataSource.hostname_to_ip.keys(): node_info = await cls.get_node_info(hostname) node_info.pop("workers", None) + node_info.pop("actors", None) node_info["raylet"].pop("workersStats", None) node_info["raylet"].pop("viewData", None) all_nodes_summary.append(node_info) diff --git a/dashboard/head.py b/dashboard/head.py index 7b40ef013064..9a8f273796f3 100644 --- a/dashboard/head.py +++ b/dashboard/head.py @@ 
-28,7 +28,8 @@ def gcs_node_info_to_dict(message): class DashboardHead: - def __init__(self, http_host, http_port, redis_address, redis_password): + def __init__(self, http_host, http_port, redis_address, redis_password, + log_dir): # NodeInfoGcsService self._gcs_node_info_stub = None self._gcs_rpc_error_counter = 0 @@ -37,6 +38,7 @@ def __init__(self, http_host, http_port, redis_address, redis_password): self.http_port = http_port self.redis_address = dashboard_utils.address_tuple(redis_address) self.redis_password = redis_password + self.log_dir = log_dir self.aioredis_client = None self.aiogrpc_gcs_channel = None self.http_session = None @@ -74,6 +76,22 @@ async def _update_nodes(self): while True: try: nodes = await self._get_nodes() + + # Get correct node info by state, + # 1. The node is ALIVE if any ALIVE node info + # of the hostname exists. + # 2. The node is DEAD if all node info of the + # hostname are DEAD. + hostname_to_node_info = {} + for node in nodes: + hostname = node["nodeManagerAddress"] + assert node["state"] in ["ALIVE", "DEAD"] + choose = hostname_to_node_info.get(hostname) + if choose is not None and choose["state"] == "ALIVE": + continue + hostname_to_node_info[hostname] = node + nodes = hostname_to_node_info.values() + self._gcs_rpc_error_counter = 0 node_ips = [node["nodeManagerAddress"] for node in nodes] node_hostnames = [ @@ -83,13 +101,11 @@ async def _update_nodes(self): agents = dict(DataSource.agents) for node in nodes: node_ip = node["nodeManagerAddress"] - if node_ip not in agents: - key = "{}{}".format( - dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX, - node_ip) - agent_port = await self.aioredis_client.get(key) - if agent_port: - agents[node_ip] = json.loads(agent_port) + key = "{}{}".format( + dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX, node_ip) + agent_port = await self.aioredis_client.get(key) + if agent_port: + agents[node_ip] = json.loads(agent_port) for ip in agents.keys() - set(node_ips): agents.pop(ip, None) @@ -99,8 +115,8 @@ async def _update_nodes(self): dict(zip(node_hostnames, node_ips))) DataSource.ip_to_hostname.reset( dict(zip(node_ips, node_hostnames))) - except aiogrpc.AioRpcError as ex: - logger.exception(ex) + except aiogrpc.AioRpcError: + logger.exception("Got AioRpcError when updating nodes.") self._gcs_rpc_error_counter += 1 if self._gcs_rpc_error_counter > \ dashboard_consts.MAX_COUNT_OF_GCS_RPC_ERROR: @@ -109,8 +125,8 @@ async def _update_nodes(self): self._gcs_rpc_error_counter, dashboard_consts.MAX_COUNT_OF_GCS_RPC_ERROR) sys.exit(-1) - except Exception as ex: - logger.exception(ex) + except Exception: + logger.exception("Error updating nodes.") finally: await asyncio.sleep( dashboard_consts.UPDATE_NODES_INTERVAL_SECONDS) @@ -126,7 +142,7 @@ def _load_modules(self): c = cls(self) dashboard_utils.ClassMethodRouteTable.bind(c) modules.append(c) - logger.info("Loaded {} modules.".format(len(modules))) + logger.info("Loaded %d modules.", len(modules)) return modules async def run(self): @@ -183,8 +199,8 @@ async def _async_notify(): co = await dashboard_utils.NotifyQueue.get() try: await co - except Exception as e: - logger.exception(e) + except Exception: + logger.exception(f"Error notifying coroutine {co}") async def _purge_data(): """Purge data in datacenter.""" @@ -193,8 +209,8 @@ async def _purge_data(): dashboard_consts.PURGE_DATA_INTERVAL_SECONDS) try: await DataOrganizer.purge() - except Exception as e: - logger.exception(e) + except Exception: + logger.exception("Error purging data.") modules = self._load_modules() diff 
--git a/dashboard/modules/log/__init__.py b/dashboard/modules/log/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/dashboard/modules/log/log_agent.py b/dashboard/modules/log/log_agent.py new file mode 100644 index 000000000000..8ef31d96852b --- /dev/null +++ b/dashboard/modules/log/log_agent.py @@ -0,0 +1,19 @@ +import logging + +import mimetypes + +import ray.new_dashboard.utils as dashboard_utils + +logger = logging.getLogger(__name__) +routes = dashboard_utils.ClassMethodRouteTable + + +class LogAgent(dashboard_utils.DashboardAgentModule): + def __init__(self, dashboard_agent): + super().__init__(dashboard_agent) + mimetypes.add_type("text/plain", ".err") + mimetypes.add_type("text/plain", ".out") + routes.static("/logs", self._dashboard_agent.log_dir, show_index=True) + + async def run(self, server): + pass diff --git a/dashboard/modules/log/log_head.py b/dashboard/modules/log/log_head.py new file mode 100644 index 000000000000..5f1a1f6dcf0a --- /dev/null +++ b/dashboard/modules/log/log_head.py @@ -0,0 +1,69 @@ +import logging + +import mimetypes + +import aiohttp.web +import ray.new_dashboard.utils as dashboard_utils +from ray.new_dashboard.datacenter import DataSource, GlobalSignals + +logger = logging.getLogger(__name__) +routes = dashboard_utils.ClassMethodRouteTable + + +class LogHead(dashboard_utils.DashboardHeadModule): + LOG_URL_TEMPLATE = "http://{ip}:{port}/logs" + + def __init__(self, dashboard_head): + super().__init__(dashboard_head) + mimetypes.add_type("text/plain", ".err") + mimetypes.add_type("text/plain", ".out") + routes.static("/logs", self._dashboard_head.log_dir, show_index=True) + GlobalSignals.node_info_fetched.append( + self.insert_log_url_to_node_info) + + async def insert_log_url_to_node_info(self, node_info): + ip = node_info.get("ip") + if ip is None: + return + agent_port = DataSource.agents.get(ip) + if agent_port is None: + return + agent_http_port, _ = agent_port + log_url = self.LOG_URL_TEMPLATE.format(ip=ip, port=agent_http_port) + node_info["logUrl"] = log_url + + @routes.get("/log_index") + async def get_log_index(self, req) -> aiohttp.web.Response: + url_list = [] + for ip, ports in DataSource.agents.items(): + url_list.append( + self.LOG_URL_TEMPLATE.format(ip=ip, port=str(ports[0]))) + if self._dashboard_head.ip not in DataSource.agents: + url_list.append( + self.LOG_URL_TEMPLATE.format( + ip=self._dashboard_head.ip, + port=self._dashboard_head.http_port)) + return aiohttp.web.Response( + text=self._directory_as_html(url_list), content_type="text/html") + + @staticmethod + def _directory_as_html(url_list) -> str: + # returns directory's index as html + + index_of = "Index of logs" + h1 = f"
<h1>{index_of}</h1>" + + index_list = [] + for url in sorted(url_list): + index_list.append(f'<li><a href="{url}">{url}</a></li>') + index_list = "\n".join(index_list) + ul = f"<ul>\n{index_list}\n</ul>" + body = f"<body>\n{h1}\n{ul}\n</body>" + + head_str = f"<head>\n<title>{index_of}</title>\n</head>" + html = f"<html>\n{head_str}\n{body}\n</html>" + + return html + + async def run(self, server): + pass diff --git a/dashboard/modules/log/test_log.py b/dashboard/modules/log/test_log.py new file mode 100644 index 000000000000..a40643f9035e --- /dev/null +++ b/dashboard/modules/log/test_log.py @@ -0,0 +1,115 @@ +import os +import sys +import logging +import requests +import socket +import time +import traceback +import html.parser +import urllib.parse + +import pytest +import ray +from ray.new_dashboard.tests.conftest import * # noqa +from ray.test_utils import ( + format_web_url, + wait_until_server_available, +) + +os.environ["RAY_USE_NEW_DASHBOARD"] = "1" + +logger = logging.getLogger(__name__) + + +class LogUrlParser(html.parser.HTMLParser): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._urls = [] + + def handle_starttag(self, tag, attrs): + if tag == "a": + self._urls.append(dict(attrs)["href"]) + + def error(self, message): + logger.error(message) + + def get_urls(self): + return self._urls + + +def test_log(ray_start_with_dashboard): + @ray.remote + def write_log(s): + print(s) + + test_log_text = "test_log_text" + ray.get(write_log.remote(test_log_text)) + assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) + is True) + webui_url = ray_start_with_dashboard["webui_url"] + webui_url = format_web_url(webui_url) + + timeout_seconds = 10 + start_time = time.time() + last_ex = None + while True: + time.sleep(1) + try: + response = requests.get(webui_url + "/log_index") + response.raise_for_status() + parser = LogUrlParser() + parser.feed(response.text) + all_nodes_log_urls = parser.get_urls() + assert len(all_nodes_log_urls) == 1 + + response = requests.get(all_nodes_log_urls[0]) + response.raise_for_status() + parser = LogUrlParser() + parser.feed(response.text) + + # Search test_log_text from all worker logs. + parsed_url = urllib.parse.urlparse(all_nodes_log_urls[0]) + paths = parser.get_urls() + urls = [] + for p in paths: + if "worker" in p: + urls.append(parsed_url._replace(path=p).geturl()) + + for u in urls: + response = requests.get(u) + response.raise_for_status() + if test_log_text in response.text: + break + else: + raise Exception(f"Can't find {test_log_text} from {urls}") + + # Test range request. + response = requests.get( + webui_url + "/logs/dashboard.log", + headers={"Range": "bytes=43-51"}) + response.raise_for_status() + assert response.text == "Dashboard" + + # Test logUrl in node info.
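A note on the range-request assertion above: HTTP `Range` headers are inclusive on both ends, so `bytes=43-51` selects 51 - 43 + 1 = 9 bytes, exactly the length of `"Dashboard"`; the test relies on `dashboard.log` starting with a fixed banner so that offset is stable. A hedged sketch of the same check (host, port, and path are illustrative, not taken from the diff):

```python
import requests

# Byte ranges are inclusive: offsets 43..51 cover 9 bytes.
resp = requests.get(
    "http://127.0.0.1:8265/logs/dashboard.log",  # illustrative URL
    headers={"Range": "bytes=43-51"})
assert resp.status_code == 206  # Partial Content
assert len(resp.content) == 51 - 43 + 1
```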
+ response = requests.get(webui_url + + f"/nodes/{socket.gethostname()}") + response.raise_for_status() + node_info = response.json() + assert node_info["result"] is True + node_info = node_info["data"]["detail"] + assert "logUrl" in node_info + assert node_info["logUrl"] in all_nodes_log_urls + break + except Exception as ex: + last_ex = ex + finally: + if time.time() > start_time + timeout_seconds: + ex_stack = traceback.format_exception( + type(last_ex), last_ex, + last_ex.__traceback__) if last_ex else [] + ex_stack = "".join(ex_stack) + raise Exception(f"Timed out while testing, {ex_stack}") + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__])) diff --git a/dashboard/modules/reporter/reporter_agent.py b/dashboard/modules/reporter/reporter_agent.py index f87569e24e1a..f129a702e4e3 100644 --- a/dashboard/modules/reporter/reporter_agent.py +++ b/dashboard/modules/reporter/reporter_agent.py @@ -6,6 +6,7 @@ import socket import subprocess import sys +import traceback import aioredis @@ -17,6 +18,7 @@ import ray.utils from ray.core.generated import reporter_pb2 from ray.core.generated import reporter_pb2_grpc +from ray.metrics_agent import MetricsAgent import psutil logger = logging.getLogger(__name__) @@ -67,31 +69,46 @@ def __init__(self, dashboard_agent): self._hostname = socket.gethostname() self._workers = set() self._network_stats_hist = [(0, (0.0, 0.0))] # time, (sent, recv) + self._metrics_agent = MetricsAgent(dashboard_agent.metrics_export_port) async def GetProfilingStats(self, request, context): pid = request.pid duration = request.duration profiling_file_path = os.path.join(ray.utils.get_ray_temp_dir(), - "{}_profiling.txt".format(pid)) + f"{pid}_profiling.txt") sudo = "sudo" if ray.utils.get_user() != "root" else "" - process = subprocess.Popen( - (f"{sudo} $(which py-spy) record -o {profiling_file_path} -p {pid}" - f" -d {duration} -f speedscope"), + process = await asyncio.create_subprocess_shell( + f"{sudo} $(which py-spy) record " + f"-o {profiling_file_path} -p {pid} -d {duration} -f speedscope", stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) - stdout, stderr = process.communicate() + stdout, stderr = await process.communicate() if process.returncode != 0: profiling_stats = "" else: with open(profiling_file_path, "r") as f: profiling_stats = f.read() return reporter_pb2.GetProfilingStatsReply( - profiling_stats=profiling_stats, stdout=stdout, stderr=stderr) + profiling_stats=profiling_stats, std_out=stdout, std_err=stderr) async def ReportMetrics(self, request, context): - # TODO(sang): Process metrics here. - return reporter_pb2.ReportMetricsReply() + # NOTE: Exceptions are not propagated properly + # when we don't catch them here. + try: + metrcs_description_required = ( + self._metrics_agent.record_metrics_points( + request.metrics_points)) + except Exception as e: + logger.error(e) + logger.error(traceback.format_exc()) + + # If metrics description is missing, we should notify cpp processes + # that we need them. Cpp processes will then report them to here. + # We need it when (1) a new metric is reported (application metric) + # (2) a reporter goes down and restarted (currently not implemented). 
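Worth calling out in the `reporter_agent.py` hunk above: `subprocess.Popen(...).communicate()` blocks the calling thread, which would stall the agent's event loop for the entire py-spy run, while `asyncio.create_subprocess_shell` returns a process whose `communicate()` is awaitable and yields control while the child runs. A minimal sketch of the non-blocking pattern (the echo command is illustrative):

```python
import asyncio
import subprocess


async def run_cmd(cmd):
    # The event loop stays responsive while the child process runs.
    process = await asyncio.create_subprocess_shell(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = await process.communicate()
    return process.returncode, stdout, stderr


returncode, stdout, _ = asyncio.get_event_loop().run_until_complete(
    run_cmd("echo profiling-done"))
print(returncode, stdout.decode().strip())
```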
+ return reporter_pb2.ReportMetricsReply( + metrcs_description_required=metrcs_description_required) @staticmethod def _get_cpu_percent(): @@ -223,8 +240,8 @@ async def _perform_iteration(self): await aioredis_client.publish( "{}{}".format(reporter_consts.REPORTER_PREFIX, self._hostname), jsonify_asdict(stats)) - except Exception as ex: - logger.exception(ex) + except Exception: + logger.exception("Error publishing node physical stats.") await asyncio.sleep( reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000) diff --git a/dashboard/modules/reporter/reporter_head.py b/dashboard/modules/reporter/reporter_head.py index fdd262a604bd..6046fd325781 100644 --- a/dashboard/modules/reporter/reporter_head.py +++ b/dashboard/modules/reporter/reporter_head.py @@ -1,6 +1,5 @@ import json import logging -import uuid import aiohttp.web from aioredis.pubsub import Receiver @@ -24,58 +23,32 @@ class ReportHead(dashboard_utils.DashboardHeadModule): def __init__(self, dashboard_head): super().__init__(dashboard_head) self._stubs = {} - self._profiling_stats = {} DataSource.agents.signal.append(self._update_stubs) async def _update_stubs(self, change): + if change.old: + ip, port = change.old + self._stubs.pop(ip) if change.new: - ip, ports = next(iter(change.new.items())) - channel = aiogrpc.insecure_channel("{}:{}".format(ip, ports[1])) + ip, ports = change.new + channel = aiogrpc.insecure_channel(f"{ip}:{ports[1]}") stub = reporter_pb2_grpc.ReporterServiceStub(channel) self._stubs[ip] = stub - if change.old: - ip, port = next(iter(change.old.items())) - self._stubs.pop(ip) @routes.get("/api/launch_profiling") async def launch_profiling(self, req) -> aiohttp.web.Response: - node_id = req.query.get("node_id") - pid = int(req.query.get("pid")) - duration = int(req.query.get("duration")) - profiling_id = str(uuid.uuid4()) - reporter_stub = self._stubs[node_id] + ip = req.query["ip"] + pid = int(req.query["pid"]) + duration = int(req.query["duration"]) + reporter_stub = self._stubs[ip] reply = await reporter_stub.GetProfilingStats( reporter_pb2.GetProfilingStatsRequest(pid=pid, duration=duration)) - self._profiling_stats[profiling_id] = reply - return await dashboard_utils.rest_response( - success=True, - message="Profiling launched.", - profiling_id=profiling_id) - - @routes.get("/api/check_profiling_status") - async def check_profiling_status(self, req) -> aiohttp.web.Response: - profiling_id = req.query.get("profiling_id") - is_present = profiling_id in self._profiling_stats - if not is_present: - status = {"status": "pending"} - else: - reply = self._profiling_stats[profiling_id] - if reply.stderr: - status = {"status": "error", "error": reply.stderr} - else: - status = {"status": "finished"} - return await dashboard_utils.rest_response( - success=True, message="Profiling status fetched.", status=status) - - @routes.get("/api/get_profiling_info") - async def get_profiling_info(self, req) -> aiohttp.web.Response: - profiling_id = req.query.get("profiling_id") - profiling_stats = self._profiling_stats.get(profiling_id) - assert profiling_stats, "profiling not finished" + profiling_info = (json.loads(reply.profiling_stats) + if reply.profiling_stats else reply.std_out) return await dashboard_utils.rest_response( success=True, - message="Profiling info fetched.", - profiling_info=json.loads(profiling_stats.profiling_stats)) + message="Profiling success.", + profiling_info=profiling_info) async def run(self, server): aioredis_client = self._dashboard_head.aioredis_client @@ -83,12 +56,13 @@ async def run(self, 
server): reporter_key = "{}*".format(reporter_consts.REPORTER_PREFIX) await aioredis_client.psubscribe(receiver.pattern(reporter_key)) - logger.info("Subscribed to {}".format(reporter_key)) + logger.info(f"Subscribed to {reporter_key}") async for sender, msg in receiver.iter(): try: _, data = msg data = json.loads(ray.utils.decode(data)) DataSource.node_physical_stats[data["ip"]] = data - except Exception as ex: - logger.exception(ex) + except Exception: + logger.exception( + "Error receiving node physical stats from reporter agent.") diff --git a/dashboard/modules/reporter/test_reporter.py b/dashboard/modules/reporter/test_reporter.py new file mode 100644 index 000000000000..0097bc465c3a --- /dev/null +++ b/dashboard/modules/reporter/test_reporter.py @@ -0,0 +1,102 @@ +import os +import sys +import logging +import requests +import time + +import pytest +import ray +from ray.new_dashboard.tests.conftest import * # noqa +from ray.test_utils import ( + format_web_url, + RayTestTimeoutException, + wait_until_server_available, + wait_for_condition, +) + +os.environ["RAY_USE_NEW_DASHBOARD"] = "1" + +logger = logging.getLogger(__name__) + + +def test_profiling(shutdown_only): + addresses = ray.init(include_dashboard=True, num_cpus=6) + + @ray.remote(num_cpus=2) + class Actor: + def getpid(self): + return os.getpid() + + c = Actor.remote() + actor_pid = ray.get(c.getpid.remote()) + + webui_url = addresses["webui_url"] + assert (wait_until_server_available(webui_url) is True) + webui_url = format_web_url(webui_url) + + start_time = time.time() + launch_profiling = None + while True: + # Sometimes some startup time is required + if time.time() - start_time > 10: + raise RayTestTimeoutException( + "Timed out while collecting profiling stats, " + f"launch_profiling: {launch_profiling}") + launch_profiling = requests.get( + webui_url + "/api/launch_profiling", + params={ + "ip": ray.nodes()[0]["NodeManagerAddress"], + "pid": actor_pid, + "duration": 5 + }).json() + if launch_profiling["result"]: + profiling_info = launch_profiling["data"]["profilingInfo"] + break + time.sleep(1) + logger.info(profiling_info) + + +def test_node_physical_stats(enable_test_module, shutdown_only): + addresses = ray.init(include_dashboard=True, num_cpus=6) + + @ray.remote(num_cpus=1) + class Actor: + def getpid(self): + return os.getpid() + + actors = [Actor.remote() for _ in range(6)] + actor_pids = ray.get([actor.getpid.remote() for actor in actors]) + actor_pids = set(actor_pids) + + webui_url = addresses["webui_url"] + assert (wait_until_server_available(webui_url) is True) + webui_url = format_web_url(webui_url) + + def _check_workers(): + try: + resp = requests.get(webui_url + + "/test/dump?key=node_physical_stats") + resp.raise_for_status() + result = resp.json() + assert result["result"] is True + node_physical_stats = result["data"]["nodePhysicalStats"] + assert len(node_physical_stats) == 1 + current_stats = node_physical_stats[addresses["raylet_ip_address"]] + # Check Actor workers + current_actor_pids = set() + for worker in current_stats["workers"]: + if "ray::Actor" in worker["cmdline"][0]: + current_actor_pids.add(worker["pid"]) + assert current_actor_pids == actor_pids + # Check raylet cmdline + assert "raylet" in current_stats["cmdline"][0] + return True + except Exception as ex: + logger.info(ex) + return False + + wait_for_condition(_check_workers, timeout=10) + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__])) diff --git a/dashboard/modules/stats_collector/__init__.py 
b/dashboard/modules/stats_collector/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/dashboard/modules/stats_collector/stats_collector_consts.py b/dashboard/modules/stats_collector/stats_collector_consts.py new file mode 100644 index 000000000000..a0d58d32602f --- /dev/null +++ b/dashboard/modules/stats_collector/stats_collector_consts.py @@ -0,0 +1,3 @@ +NODE_STATS_UPDATE_INTERVAL_SECONDS = 1 +RETRY_GET_ALL_ACTOR_INFO_INTERVAL_SECONDS = 1 +ACTOR_CHANNEL = "ACTOR" diff --git a/dashboard/modules/stats_collector/stats_collector_head.py b/dashboard/modules/stats_collector/stats_collector_head.py new file mode 100644 index 000000000000..5ff21070618b --- /dev/null +++ b/dashboard/modules/stats_collector/stats_collector_head.py @@ -0,0 +1,155 @@ +import asyncio +import logging + +import aiohttp.web +from aioredis.pubsub import Receiver +from grpc.experimental import aio as aiogrpc + +import ray.gcs_utils +import ray.new_dashboard.modules.stats_collector.stats_collector_consts \ + as stats_collector_consts +import ray.new_dashboard.utils as dashboard_utils +from ray.new_dashboard.utils import async_loop_forever +from ray.core.generated import node_manager_pb2 +from ray.core.generated import node_manager_pb2_grpc +from ray.core.generated import gcs_service_pb2 +from ray.core.generated import gcs_service_pb2_grpc +from ray.new_dashboard.datacenter import DataSource, DataOrganizer +from ray.utils import binary_to_hex + +logger = logging.getLogger(__name__) +routes = dashboard_utils.ClassMethodRouteTable + + +def node_stats_to_dict(message): + return dashboard_utils.message_to_dict( + message, { + "actorId", "jobId", "taskId", "parentTaskId", "sourceActorId", + "callerId", "rayletId", "workerId" + }) + + +def actor_table_data_to_dict(message): + return dashboard_utils.message_to_dict( + message, { + "actorId", "parentId", "jobId", "workerId", "rayletId", + "actorCreationDummyObjectId" + }, + including_default_value_fields=True) + + +class StatsCollector(dashboard_utils.DashboardHeadModule): + def __init__(self, dashboard_head): + super().__init__(dashboard_head) + self._stubs = {} + # JobInfoGcsServiceStub + self._gcs_job_info_stub = None + # ActorInfoGcsService + self._gcs_actor_info_stub = None + DataSource.nodes.signal.append(self._update_stubs) + + async def _update_stubs(self, change): + if change.old: + ip, port = change.old + self._stubs.pop(ip) + if change.new: + ip, node_info = change.new + address = "{}:{}".format(ip, int(node_info["nodeManagerPort"])) + channel = aiogrpc.insecure_channel(address) + stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) + self._stubs[ip] = stub + + @routes.get("/nodes") + async def get_all_nodes(self, req) -> aiohttp.web.Response: + view = req.query.get("view") + if view == "summary": + all_node_summary = await DataOrganizer.get_all_node_summary() + return await dashboard_utils.rest_response( + success=True, + message="Node summary fetched.", + summary=all_node_summary) + elif view is not None and view.lower() == "hostNameList".lower(): + return await dashboard_utils.rest_response( + success=True, + message="Node hostname list fetched.", + host_name_list=list(DataSource.hostname_to_ip.keys())) + else: + return await dashboard_utils.rest_response( + success=False, message=f"Unknown view {view}") + + @routes.get("/nodes/{hostname}") + async def get_node(self, req) -> aiohttp.web.Response: + hostname = req.match_info.get("hostname") + node_info = await DataOrganizer.get_node_info(hostname) + return await 
dashboard_utils.rest_response( + success=True, message="Node detail fetched.", detail=node_info) + + async def _update_actors(self): + # Subscribe actor channel. + aioredis_client = self._dashboard_head.aioredis_client + receiver = Receiver() + + key = "{}:*".format(stats_collector_consts.ACTOR_CHANNEL) + pattern = receiver.pattern(key) + await aioredis_client.psubscribe(pattern) + logger.info("Subscribed to %s", key) + + # Get all actor info. + while True: + try: + logger.info("Getting all actor info from GCS.") + request = gcs_service_pb2.GetAllActorInfoRequest() + reply = await self._gcs_actor_info_stub.GetAllActorInfo( + request, timeout=2) + if reply.status.code == 0: + result = {} + for actor_info in reply.actor_table_data: + result[binary_to_hex(actor_info.actor_id)] = \ + actor_table_data_to_dict(actor_info) + DataSource.actors.reset(result) + logger.info("Received %d actor info from GCS.", + len(result)) + break + else: + raise Exception( + f"Failed to GetAllActorInfo: {reply.status.message}") + except Exception: + logger.exception("Error Getting all actor info from GCS.") + await asyncio.sleep(stats_collector_consts. + RETRY_GET_ALL_ACTOR_INFO_INTERVAL_SECONDS) + + # Receive actors from channel. + async for sender, msg in receiver.iter(): + try: + _, data = msg + pubsub_message = ray.gcs_utils.PubSubMessage.FromString(data) + actor_info = ray.gcs_utils.ActorTableData.FromString( + pubsub_message.data) + DataSource.actors[binary_to_hex(actor_info.actor_id)] = \ + actor_table_data_to_dict(actor_info) + except Exception: + logger.exception("Error receiving actor info.") + + @async_loop_forever( + stats_collector_consts.NODE_STATS_UPDATE_INTERVAL_SECONDS) + async def _update_node_stats(self): + for ip, stub in self._stubs.items(): + node_info = DataSource.nodes.get(ip) + if node_info["state"] != "ALIVE": + continue + try: + reply = await stub.GetNodeStats( + node_manager_pb2.GetNodeStatsRequest(), timeout=2) + reply_dict = node_stats_to_dict(reply) + DataSource.node_stats[ip] = reply_dict + except Exception: + logger.exception(f"Error updating node stats of {ip}.") + + async def run(self, server): + gcs_channel = self._dashboard_head.aiogrpc_gcs_channel + self._gcs_job_info_stub = \ + gcs_service_pb2_grpc.JobInfoGcsServiceStub(gcs_channel) + self._gcs_actor_info_stub = \ + gcs_service_pb2_grpc.ActorInfoGcsServiceStub(gcs_channel) + + await asyncio.gather(self._update_node_stats(), self._update_actors()) diff --git a/dashboard/modules/stats_collector/test_stats_collector.py b/dashboard/modules/stats_collector/test_stats_collector.py new file mode 100644 index 000000000000..72e614437d10 --- /dev/null +++ b/dashboard/modules/stats_collector/test_stats_collector.py @@ -0,0 +1,93 @@ +import os +import sys +import logging +import requests +import time +import traceback + +import pytest +import ray +from ray.new_dashboard.tests.conftest import * # noqa +from ray.test_utils import ( + format_web_url, + wait_until_server_available, +) + +os.environ["RAY_USE_NEW_DASHBOARD"] = "1" + +logger = logging.getLogger(__name__) + + +def test_node_info(ray_start_with_dashboard): + @ray.remote + class Actor: + def getpid(self): + return os.getpid() + + actors = [Actor.remote(), Actor.remote()] + actor_pids = [actor.getpid.remote() for actor in actors] + actor_pids = set(ray.get(actor_pids)) + + assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) + is True) + webui_url = ray_start_with_dashboard["webui_url"] + webui_url = format_web_url(webui_url) + + timeout_seconds = 10 + 
start_time = time.time() + last_ex = None + while True: + time.sleep(1) + try: + response = requests.get(webui_url + "/nodes?view=hostnamelist") + response.raise_for_status() + hostname_list = response.json() + assert hostname_list["result"] is True, hostname_list["msg"] + hostname_list = hostname_list["data"]["hostNameList"] + assert len(hostname_list) == 1 + + hostname = hostname_list[0] + response = requests.get(webui_url + f"/nodes/{hostname}") + response.raise_for_status() + detail = response.json() + assert detail["result"] is True, detail["msg"] + detail = detail["data"]["detail"] + assert detail["hostname"] == hostname + assert detail["state"] == "ALIVE" + assert "raylet" in detail["cmdline"][0] + assert len(detail["workers"]) >= 2 + assert len(detail["actors"]) == 2, detail["actors"] + assert len(detail["raylet"]["viewData"]) > 0 + + actor_worker_pids = set() + for worker in detail["workers"]: + if "ray::Actor" in worker["cmdline"][0]: + actor_worker_pids.add(worker["pid"]) + assert actor_worker_pids == actor_pids + + response = requests.get(webui_url + "/nodes?view=summary") + response.raise_for_status() + summary = response.json() + assert summary["result"] is True, summary["msg"] + assert len(summary["data"]["summary"]) == 1 + summary = summary["data"]["summary"][0] + assert summary["hostname"] == hostname + assert summary["state"] == "ALIVE" + assert "raylet" in summary["cmdline"][0] + assert "workers" not in summary + assert "actors" not in summary + assert "viewData" not in summary["raylet"] + break + except Exception as ex: + last_ex = ex + finally: + if time.time() > start_time + timeout_seconds: + ex_stack = traceback.format_exception( + type(last_ex), last_ex, + last_ex.__traceback__) if last_ex else [] + ex_stack = "".join(ex_stack) + raise Exception(f"Timed out while testing, {ex_stack}") + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__])) diff --git a/dashboard/modules/test/test_agent.py b/dashboard/modules/test/test_agent.py index efe9c3285a77..dd4597f7dc16 100644 --- a/dashboard/modules/test/test_agent.py +++ b/dashboard/modules/test/test_agent.py @@ -4,12 +4,16 @@ import ray.new_dashboard.utils as dashboard_utils import ray.new_dashboard.modules.test.test_utils as test_utils +import ray.new_dashboard.modules.test.test_consts as test_consts +from ray.ray_constants import env_bool logger = logging.getLogger(__name__) routes = dashboard_utils.ClassMethodRouteTable -class HeadAgent(dashboard_utils.DashboardAgentModule): +@dashboard_utils.dashboard_module( + enable=env_bool(test_consts.TEST_MODULE_ENVIRONMENT_KEY, False)) +class TestAgent(dashboard_utils.DashboardAgentModule): def __init__(self, dashboard_agent): super().__init__(dashboard_agent) @@ -20,5 +24,17 @@ async def get_url(self, req) -> aiohttp.web.Response: url) return aiohttp.web.json_response(result) + @routes.head("/test/route_head") + async def route_head(self, req) -> aiohttp.web.Response: + pass + + @routes.post("/test/route_post") + async def route_post(self, req) -> aiohttp.web.Response: + pass + + @routes.patch("/test/route_patch") + async def route_patch(self, req) -> aiohttp.web.Response: + pass + async def run(self, server): pass diff --git a/dashboard/modules/test/test_consts.py b/dashboard/modules/test/test_consts.py new file mode 100644 index 000000000000..3c53b928c1ef --- /dev/null +++ b/dashboard/modules/test/test_consts.py @@ -0,0 +1 @@ +TEST_MODULE_ENVIRONMENT_KEY = "RAY_DASHBOARD_MODULE_TEST" diff --git a/dashboard/modules/test/test_head.py 
b/dashboard/modules/test/test_head.py index 61e7635e0e5b..699fc15458cd 100644 --- a/dashboard/modules/test/test_head.py +++ b/dashboard/modules/test/test_head.py @@ -4,12 +4,16 @@ import ray.new_dashboard.utils as dashboard_utils import ray.new_dashboard.modules.test.test_utils as test_utils +import ray.new_dashboard.modules.test.test_consts as test_consts from ray.new_dashboard.datacenter import DataSource +from ray.ray_constants import env_bool logger = logging.getLogger(__name__) routes = dashboard_utils.ClassMethodRouteTable +@dashboard_utils.dashboard_module( + enable=env_bool(test_consts.TEST_MODULE_ENVIRONMENT_KEY, False)) class TestHead(dashboard_utils.DashboardHeadModule): def __init__(self, dashboard_head): super().__init__(dashboard_head) @@ -17,12 +21,28 @@ def __init__(self, dashboard_head): DataSource.agents.signal.append(self._update_notified_agents) async def _update_notified_agents(self, change): - if change.new: - ip, ports = next(iter(change.new.items())) - self._notified_agents[ip] = ports if change.old: - ip, port = next(iter(change.old.items())) + ip, port = change.old self._notified_agents.pop(ip) + if change.new: + ip, ports = change.new + self._notified_agents[ip] = ports + + @routes.get("/test/route_get") + async def route_get(self, req) -> aiohttp.web.Response: + pass + + @routes.put("/test/route_put") + async def route_put(self, req) -> aiohttp.web.Response: + pass + + @routes.delete("/test/route_delete") + async def route_delete(self, req) -> aiohttp.web.Response: + pass + + @routes.view("/test/route_view") + async def route_view(self, req) -> aiohttp.web.Response: + pass @routes.get("/test/dump") async def dump(self, req) -> aiohttp.web.Response: @@ -41,7 +61,7 @@ async def dump(self, req) -> aiohttp.web.Response: data = dict(DataSource.__dict__.get(key)) return await dashboard_utils.rest_response( success=True, - message="Fetch {} from datacenter success.".format(key), + message=f"Fetch {key} from datacenter success.", **{key: data}) @routes.get("/test/notified_agents") diff --git a/dashboard/tests/conftest.py b/dashboard/tests/conftest.py index a60ce1007d3f..030e258069c5 100644 --- a/dashboard/tests/conftest.py +++ b/dashboard/tests/conftest.py @@ -1 +1,10 @@ +import os +import pytest from ray.tests.conftest import * # noqa + + +@pytest.fixture +def enable_test_module(): + os.environ["RAY_DASHBOARD_MODULE_TEST"] = "true" + yield + os.environ.pop("RAY_DASHBOARD_MODULE_TEST", None) diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index a6ebcaa49132..7983a02c174a 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -1,7 +1,11 @@ import os +import sys +import json import time import logging +import asyncio +import aiohttp.web import ray import psutil import pytest @@ -9,13 +13,20 @@ import requests from ray import ray_constants -from ray.test_utils import wait_for_condition, wait_until_server_available +from ray.test_utils import ( + format_web_url, + wait_for_condition, + wait_until_server_available, + run_string_as_driver, +) import ray.new_dashboard.consts as dashboard_consts +import ray.new_dashboard.utils as dashboard_utils import ray.new_dashboard.modules os.environ["RAY_USE_NEW_DASHBOARD"] = "1" logger = logging.getLogger(__name__) +routes = dashboard_utils.ClassMethodRouteTable def cleanup_test_files(): @@ -106,7 +117,9 @@ def _search_agent(processes): logger.info("Test agent register is OK.") wait_for_condition(lambda: _search_agent(raylet_proc.children())) - assert 
dashboard_proc.status() == psutil.STATUS_RUNNING + assert dashboard_proc.status() in [ + psutil.STATUS_RUNNING, psutil.STATUS_SLEEPING + ] agent_proc = _search_agent(raylet_proc.children()) agent_pid = agent_proc.pid @@ -130,13 +143,13 @@ def _search_agent(processes): assert agent_ports is not None -def test_nodes_update(ray_start_with_dashboard): +def test_nodes_update(enable_test_module, ray_start_with_dashboard): assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True) webui_url = ray_start_with_dashboard["webui_url"] - webui_url = webui_url.replace("localhost", "http://127.0.0.1") + webui_url = format_web_url(webui_url) - timeout_seconds = 20 + timeout_seconds = 10 start_time = time.time() while True: time.sleep(1) @@ -146,7 +159,7 @@ def test_nodes_update(ray_start_with_dashboard): try: dump_info = response.json() except Exception as ex: - logger.info("failed response: {}".format(response.text)) + logger.info("failed response: %s", response.text) raise ex assert dump_info["result"] is True dump_data = dump_info["data"] @@ -162,7 +175,7 @@ def test_nodes_update(ray_start_with_dashboard): try: notified_agents = response.json() except Exception as ex: - logger.info("failed response: {}".format(response.text)) + logger.info("failed response: %s", response.text) raise ex assert notified_agents["result"] is True notified_agents = notified_agents["data"] @@ -173,19 +186,18 @@ def test_nodes_update(ray_start_with_dashboard): logger.info("Retry because of %s", e) finally: if time.time() > start_time + timeout_seconds: - raise Exception( - "Timed out while waiting for dashboard to start.") + raise Exception("Timed out while testing.") -def test_http_get(ray_start_with_dashboard): +def test_http_get(enable_test_module, ray_start_with_dashboard): assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True) webui_url = ray_start_with_dashboard["webui_url"] - webui_url = webui_url.replace("localhost", "http://127.0.0.1") + webui_url = format_web_url(webui_url) target_url = webui_url + "/test/dump" - timeout_seconds = 20 + timeout_seconds = 10 start_time = time.time() while True: time.sleep(1) @@ -196,7 +208,7 @@ def test_http_get(ray_start_with_dashboard): try: dump_info = response.json() except Exception as ex: - logger.info("failed response: {}".format(response.text)) + logger.info("failed response: %s", response.text) raise ex assert dump_info["result"] is True dump_data = dump_info["data"] @@ -205,13 +217,13 @@ def test_http_get(ray_start_with_dashboard): http_port, grpc_port = ports response = requests.get( - "http://{}:{}/test/http_get_from_agent?url={}".format( - ip, http_port, target_url)) + f"http://{ip}:{http_port}" + f"/test/http_get_from_agent?url={target_url}") response.raise_for_status() try: dump_info = response.json() except Exception as ex: - logger.info("failed response: {}".format(response.text)) + logger.info("failed response: %s", response.text) raise ex assert dump_info["result"] is True break @@ -219,5 +231,135 @@ def test_http_get(ray_start_with_dashboard): logger.info("Retry because of %s", e) finally: if time.time() > start_time + timeout_seconds: - raise Exception( - "Timed out while waiting for dashboard to start.") + raise Exception("Timed out while testing.") + + +def test_class_method_route_table(enable_test_module): + head_cls_list = dashboard_utils.get_all_modules( + dashboard_utils.DashboardHeadModule) + agent_cls_list = dashboard_utils.get_all_modules( + dashboard_utils.DashboardAgentModule) + test_head_cls = None 
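For context on what the loop below scans: `get_all_modules` (see the `dashboard/utils.py` hunk later in this diff) imports every submodule under `ray.new_dashboard.modules` and then filters `module_type.__subclasses__()` by the `__ray_dashboard_module_enable__` flag that the new `dashboard_module` decorator sets. A condensed, self-contained sketch of that discovery mechanism (class names here are made up):

```python
class DashboardHeadModule:
    pass


def dashboard_module(enable):
    """Mark a dashboard module class as enabled or disabled."""

    def _cls_wrapper(cls):
        cls.__ray_dashboard_module_enable__ = enable
        return cls

    return _cls_wrapper


@dashboard_module(enable=False)
class DisabledModule(DashboardHeadModule):
    pass


class EnabledModule(DashboardHeadModule):
    pass


def get_all_modules(module_type):
    # Classes without the decorator default to enabled.
    return [
        m for m in module_type.__subclasses__()
        if getattr(m, "__ray_dashboard_module_enable__", True)
    ]


assert get_all_modules(DashboardHeadModule) == [EnabledModule]
```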
+ for cls in head_cls_list: + if cls.__name__ == "TestHead": + test_head_cls = cls + break + assert test_head_cls is not None + test_agent_cls = None + for cls in agent_cls_list: + if cls.__name__ == "TestAgent": + test_agent_cls = cls + break + assert test_agent_cls is not None + + def _has_route(route, method, path): + if isinstance(route, aiohttp.web.RouteDef): + if route.method == method and route.path == path: + return True + return False + + def _has_static(route, path, prefix): + if isinstance(route, aiohttp.web.StaticDef): + if route.path == path and route.prefix == prefix: + return True + return False + + all_routes = dashboard_utils.ClassMethodRouteTable.routes() + assert any(_has_route(r, "HEAD", "/test/route_head") for r in all_routes) + assert any(_has_route(r, "GET", "/test/route_get") for r in all_routes) + assert any(_has_route(r, "POST", "/test/route_post") for r in all_routes) + assert any(_has_route(r, "PUT", "/test/route_put") for r in all_routes) + assert any(_has_route(r, "PATCH", "/test/route_patch") for r in all_routes) + assert any( + _has_route(r, "DELETE", "/test/route_delete") for r in all_routes) + assert any(_has_route(r, "*", "/test/route_view") for r in all_routes) + + # Test bind() + bound_routes = dashboard_utils.ClassMethodRouteTable.bound_routes() + assert len(bound_routes) == 0 + dashboard_utils.ClassMethodRouteTable.bind( + test_agent_cls.__new__(test_agent_cls)) + bound_routes = dashboard_utils.ClassMethodRouteTable.bound_routes() + assert any(_has_route(r, "POST", "/test/route_post") for r in bound_routes) + assert all( + not _has_route(r, "PUT", "/test/route_put") for r in bound_routes) + + # Static def should be in bound routes. + routes.static("/test/route_static", "/path") + bound_routes = dashboard_utils.ClassMethodRouteTable.bound_routes() + assert any( + _has_static(r, "/path", "/test/route_static") for r in bound_routes) + + # Test duplicated routes should raise exception. 
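The duplicate-route check exercised just below comes from `ClassMethodRouteTable._register_route` in the `dashboard/utils.py` hunk later in this diff: each registration records the handler's filename and line number, and a second registration of the same path raises with that location in the message. A condensed sketch (single-method table, simplified from the real class):

```python
class RouteTable:
    _routes = {}

    @classmethod
    def get(cls, path):
        def _wrapper(handler):
            if path in cls._routes:
                filename, lineno = cls._routes[path]
                raise Exception(f"Duplicated route path: {path}, "
                                f"previous one registered at "
                                f"{filename}:{lineno}")
            cls._routes[path] = (handler.__code__.co_filename,
                                 handler.__code__.co_firstlineno)
            return handler

        return _wrapper


@RouteTable.get("/test/route_get")
def handler_a(req):
    pass


try:
    @RouteTable.get("/test/route_get")
    def handler_b(req):
        pass
except Exception as ex:
    assert "/test/route_get" in str(ex)
```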
+ try: + + @routes.get("/test/route_get") + def _duplicated_route(req): + pass + + raise Exception("Duplicated routes should raise exception.") + except Exception as ex: + message = str(ex) + assert "/test/route_get" in message + assert "test_head.py" in message + + # Test exception in handler + post_handler = None + for r in bound_routes: + if _has_route(r, "POST", "/test/route_post"): + post_handler = r.handler + break + assert post_handler is not None + + loop = asyncio.get_event_loop() + r = loop.run_until_complete(post_handler()) + assert r.status == 200 + resp = json.loads(r.body) + assert resp["result"] is False + assert "Traceback" in resp["msg"] + + +def test_async_loop_forever(): + counter = [0] + + @dashboard_utils.async_loop_forever(interval_seconds=0.1) + async def foo(): + counter[0] += 1 + raise Exception("Test exception") + + loop = asyncio.get_event_loop() + loop.create_task(foo()) + loop.call_later(1, loop.stop) + loop.run_forever() + assert counter[0] > 2 + + +def test_dashboard_module_decorator(enable_test_module): + head_cls_list = dashboard_utils.get_all_modules( + dashboard_utils.DashboardHeadModule) + agent_cls_list = dashboard_utils.get_all_modules( + dashboard_utils.DashboardAgentModule) + + assert any(cls.__name__ == "TestHead" for cls in head_cls_list) + assert any(cls.__name__ == "TestAgent" for cls in agent_cls_list) + + test_code = """ +import os +import ray.new_dashboard.utils as dashboard_utils + +os.environ.pop("RAY_DASHBOARD_MODULE_TEST") +head_cls_list = dashboard_utils.get_all_modules( + dashboard_utils.DashboardHeadModule) +agent_cls_list = dashboard_utils.get_all_modules( + dashboard_utils.DashboardAgentModule) +print(head_cls_list) +print(agent_cls_list) +assert all(cls.__name__ != "TestHead" for cls in head_cls_list) +assert all(cls.__name__ != "TestAgent" for cls in agent_cls_list) +print("success") +""" + run_string_as_driver(test_code) + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__])) diff --git a/dashboard/utils.py b/dashboard/utils.py index 054fe9fd4839..9fd170890b8e 100644 --- a/dashboard/utils.py +++ b/dashboard/utils.py @@ -13,6 +13,7 @@ import traceback from base64 import b64decode from collections.abc import MutableMapping, Mapping +from collections import namedtuple from typing import Any import aioredis @@ -103,10 +104,9 @@ def _register_route(cls, method, path, **kwargs): def _wrapper(handler): if path in cls._bind_map[method]: bind_info = cls._bind_map[method][path] - raise Exception("Duplicated route path: {}, " - "previous one registered at {}:{}".format( - path, bind_info.filename, - bind_info.lineno)) + raise Exception(f"Duplicated route path: {path}, " + f"previous one registered at " + f"{bind_info.filename}:{bind_info.lineno}") bind_info = cls._BindInfo(handler.__code__.co_filename, handler.__code__.co_firstlineno, None) @@ -174,15 +174,28 @@ def predicate(o): h.__func__.__route_path__].instance = instance +def dashboard_module(enable): + """A decorator for dashboard module.""" + + def _cls_wrapper(cls): + cls.__ray_dashboard_module_enable__ = enable + return cls + + return _cls_wrapper + + def get_all_modules(module_type): - logger.info("Get all modules by type: {}".format(module_type.__name__)) + logger.info(f"Get all modules by type: {module_type.__name__}") import ray.new_dashboard.modules for module_loader, name, ispkg in pkgutil.walk_packages( ray.new_dashboard.modules.__path__, ray.new_dashboard.modules.__name__ + "."): importlib.import_module(name) - return module_type.__subclasses__() + return [ 
+ m for m in module_type.__subclasses__() + if getattr(m, "__ray_dashboard_module_enable__", True) + ] def to_posix_time(dt): @@ -314,8 +327,7 @@ def __init__(self, owner=None, old=None, new=None): self.new = new def __str__(self): - return "Change(owner: {}, old: {}, new: {}".format( - self.owner, self.old, self.new) + return f"Change(owner: {self.owner}, old: {self.old}, new: {self.new}" class NotifyQueue: @@ -338,6 +350,8 @@ class Dict(MutableMapping): :note: Only the first level data report change. """ + ChangeItem = namedtuple("DictChangeItem", ["key", "value"]) + def __init__(self, *args, **kwargs): self._data = dict(*args, **kwargs) self.signal = Signal(self) @@ -347,10 +361,14 @@ def __setitem__(self, key, value): self._data[key] = value if len(self.signal) and old != value: if old is None: - co = self.signal.send(Change(owner=self, new={key: value})) + co = self.signal.send( + Change(owner=self, new=Dict.ChangeItem(key, value))) else: co = self.signal.send( - Change(owner=self, old={key: old}, new={key: value})) + Change( + owner=self, + old=Dict.ChangeItem(key, old), + new=Dict.ChangeItem(key, value))) NotifyQueue.put(co) def __getitem__(self, item): @@ -359,7 +377,8 @@ def __getitem__(self, item): def __delitem__(self, key): old = self._data.pop(key, None) if len(self.signal) and old is not None: - co = self.signal.send(Change(owner=self, old={key: old})) + co = self.signal.send( + Change(owner=self, old=Dict.ChangeItem(key, old))) NotifyQueue.put(co) def __len__(self): @@ -387,3 +406,19 @@ async def get_aioredis_client(redis_address, redis_password, # Raise exception from create_redis_pool return await aioredis.create_redis_pool( address=redis_address, password=redis_password) + + +def async_loop_forever(interval_seconds): + def _wrapper(coro): + @functools.wraps(coro) + async def _looper(*args, **kwargs): + while True: + try: + await coro(*args, **kwargs) + except Exception: + logger.exception(f"Error looping coroutine {coro}.") + await asyncio.sleep(interval_seconds) + + return _looper + + return _wrapper diff --git a/python/ray/services.py b/python/ray/services.py index a52e32ec4873..46f002840708 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1413,6 +1413,7 @@ def start_raylet(redis_address, os.path.dirname(os.path.abspath(__file__)), "new_dashboard/agent.py"), "--redis-address={}".format(redis_address), + "--metrics-export-port={}".format(metrics_export_port), "--node-manager-port={}".format(node_manager_port), "--object-store-name={}".format(plasma_store_name), "--raylet-name={}".format(raylet_name), diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index 99cbd31c49ec..8bb01bf226af 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -428,3 +428,11 @@ def get_error_message(pub_sub, num, error_type=None, timeout=5): time.sleep(0.01) return msgs + + +def format_web_url(url): + """Format web url.""" + url = url.replace("localhost", "http://127.0.0.1") + if not url.startswith("http://"): + return "http://" + url + return url diff --git a/python/ray/utils.py b/python/ray/utils.py index 6cb9fb68ea94..f5071a7f9dbb 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -122,7 +122,7 @@ def push_error_to_driver_through_redis(redis_client, error_data = ray.gcs_utils.construct_error_message(job_id, error_type, message, time.time()) pubsub_msg = ray.gcs_utils.PubSubMessage() - pubsub_msg.id = job_id.hex() + pubsub_msg.id = job_id.binary() pubsub_msg.data = error_data redis_client.publish("ERROR_INFO:" + job_id.hex(), 
pubsub_msg.SerializeAsString()) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 425f967ab1e0..abd3d7712b28 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2146,6 +2146,7 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest & stats->set_current_task_func_desc(current_task_.FunctionDescriptor()->ToString()); stats->set_ip_address(rpc_address_.ip_address()); stats->set_port(rpc_address_.port()); + stats->set_job_id(worker_context_.GetCurrentJobID().Binary()); stats->set_actor_id(actor_id_.Binary()); auto used_resources_map = stats->mutable_used_resources(); for (auto const &it : *resource_ids_) { diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 1c622b602955..fa2cf15a6d70 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -384,6 +384,8 @@ message CoreWorkerStats { string actor_title = 16; // Local reference table. repeated ObjectRefInfo object_refs = 17; + // Job ID. + bytes job_id = 18; } message MetricPoint { diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index c33f0abcbbee..45c22be962b5 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -114,6 +114,10 @@ message WorkerStats { CoreWorkerStats core_worker_stats = 3; // Error string if fetching core worker stats failed. string fetch_error = 4; + // Worker id of core worker. + bytes worker_id = 5; + // Worker language. + Language language = 6; } message GetNodeStatsReply { @@ -122,6 +126,7 @@ message GetNodeStatsReply { uint32 num_workers = 3; repeated TaskSpec infeasible_tasks = 4; repeated TaskSpec ready_tasks = 5; + int32 pid = 6; } message GlobalGCRequest { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 3007da6db279..bce98cba5fd6 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -3147,6 +3147,7 @@ void NodeManager::FlushObjectsToFree() { void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request, rpc::GetNodeStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { + reply->set_pid(getpid()); for (const auto &task : local_queues_.GetTasks(TaskState::INFEASIBLE)) { if (task.GetTaskSpecification().IsActorCreationTask()) { auto infeasible_task = reply->add_infeasible_tasks(); @@ -3211,6 +3212,10 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_ all_workers.push_back(driver); driver_ids.insert(driver->WorkerId()); } + if (all_workers.empty()) { + send_reply_callback(Status::OK(), nullptr, nullptr); + return; + } for (const auto &worker : all_workers) { rpc::GetCoreWorkerStatsRequest request; request.set_intended_worker_id(worker->WorkerId().Binary()); @@ -3220,7 +3225,9 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_ const ray::Status &status, const rpc::GetCoreWorkerStatsReply &r) { auto worker_stats = reply->add_workers_stats(); worker_stats->set_pid(worker->GetProcess().GetId()); + worker_stats->set_worker_id(worker->WorkerId().Binary()); worker_stats->set_is_driver(driver_ids.contains(worker->WorkerId())); + worker_stats->set_language(worker->GetLanguage()); reply->set_num_workers(reply->num_workers() + 1); if (status.ok()) { worker_stats->mutable_core_worker_stats()->MergeFrom(r.core_worker_stats());
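Closing note: the `async_loop_forever` decorator added in `dashboard/utils.py` is what `_update_node_stats` and `test_async_loop_forever` rely on; it catches and logs each iteration's exception so a transient failure never kills the periodic task, then sleeps for the interval. A standalone usage sketch (the failing `flaky` coroutine and the 0.5 s harness are illustrative):

```python
import asyncio
import functools
import logging

logger = logging.getLogger(__name__)


def async_loop_forever(interval_seconds):
    def _wrapper(coro):
        @functools.wraps(coro)
        async def _looper(*args, **kwargs):
            while True:
                try:
                    await coro(*args, **kwargs)
                except Exception:
                    # Log and keep looping; one bad iteration must not
                    # stop the periodic task.
                    logger.exception(f"Error looping coroutine {coro}.")
                await asyncio.sleep(interval_seconds)

        return _looper

    return _wrapper


counter = [0]


@async_loop_forever(interval_seconds=0.1)
async def flaky():
    counter[0] += 1
    raise Exception("boom")


loop = asyncio.new_event_loop()
loop.create_task(flaky())
loop.call_later(0.5, loop.stop)
loop.run_forever()
assert counter[0] >= 3  # the loop survived repeated failures
```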