From 6e1b8cef9b22d29de02f13a3182b4a001831d0b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Tue, 18 Aug 2020 16:31:53 +0800 Subject: [PATCH 01/21] Improve reporter module --- dashboard/agent.py | 8 +++ dashboard/datacenter.py | 2 +- dashboard/head.py | 28 ++++++++--- dashboard/modules/reporter/reporter_agent.py | 27 ++++++++-- dashboard/modules/reporter/reporter_head.py | 45 ++++------------- dashboard/modules/reporter/test_reporter.py | 52 ++++++++++++++++++++ python/ray/services.py | 1 + 7 files changed, 114 insertions(+), 49 deletions(-) create mode 100644 dashboard/modules/reporter/test_reporter.py diff --git a/dashboard/agent.py b/dashboard/agent.py index f0fce2f917f7..9c522a964d01 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -36,6 +36,7 @@ def __init__(self, redis_password=None, temp_dir=None, log_dir=None, + metrics_export_port=None, node_manager_port=None, object_store_name=None, raylet_name=None): @@ -45,6 +46,7 @@ def __init__(self, self.redis_password = redis_password self.temp_dir = temp_dir self.log_dir = log_dir + self.metrics_export_port = metrics_export_port self.node_manager_port = node_manager_port self.object_store_name = object_store_name self.raylet_name = raylet_name @@ -172,6 +174,11 @@ async def _check_parent(): required=True, type=str, help="The address to use for Redis.") + parser.add_argument( + "--metrics-export-port", + required=True, + type=int, + help="The port to expose metrics through Prometheus.") parser.add_argument( "--node-manager-port", required=True, @@ -277,6 +284,7 @@ async def _check_parent(): redis_password=args.redis_password, temp_dir=temp_dir, log_dir=log_dir, + metrics_export_port=args.metrics_export_port, node_manager_port=args.node_manager_port, object_store_name=args.object_store_name, raylet_name=args.raylet_name) diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py index 65e3bb449628..d607cfdc3205 100644 --- a/dashboard/datacenter.py +++ b/dashboard/datacenter.py @@ -19,7 +19,7 @@ class DataSource: # {actor id hex(str): actor table data(dict of ActorTableData # in gcs.proto)} actors = Dict() - # {ip address(str): dashboard agent grpc server port(int)} + # {ip address(str): dashboard agent [http port(int), grpc port(int)]} agents = Dict() # {ip address(str): gcs node info(dict of GcsNodeInfo in gcs.proto)} nodes = Dict() diff --git a/dashboard/head.py b/dashboard/head.py index 7b40ef013064..1b947694c9d1 100644 --- a/dashboard/head.py +++ b/dashboard/head.py @@ -74,6 +74,22 @@ async def _update_nodes(self): while True: try: nodes = await self._get_nodes() + + # Get correct node info by state, + # 1. The node is ALIVE if any ALIVE node info + # of the hostname exists. + # 2. The node is DEAD if all node info of the + # hostname are DEAD. + hostname_to_node_info = {} + for node in nodes: + hostname = node["nodeManagerAddress"] + assert node["state"] in ["ALIVE", "DEAD"] + choose = hostname_to_node_info.get(hostname) + if choose is not None and choose["state"] == "ALIVE": + continue + hostname_to_node_info[hostname] = node + nodes = hostname_to_node_info.values() + self._gcs_rpc_error_counter = 0 node_ips = [node["nodeManagerAddress"] for node in nodes] node_hostnames = [ @@ -83,13 +99,11 @@ async def _update_nodes(self): agents = dict(DataSource.agents) for node in nodes: node_ip = node["nodeManagerAddress"] - if node_ip not in agents: - key = "{}{}".format( - dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX, - node_ip) - agent_port = await self.aioredis_client.get(key) - if agent_port: - agents[node_ip] = json.loads(agent_port) + key = "{}{}".format( + dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX, node_ip) + agent_port = await self.aioredis_client.get(key) + if agent_port: + agents[node_ip] = json.loads(agent_port) for ip in agents.keys() - set(node_ips): agents.pop(ip, None) diff --git a/dashboard/modules/reporter/reporter_agent.py b/dashboard/modules/reporter/reporter_agent.py index 4e3f7cd55d6d..80fecdd8152e 100644 --- a/dashboard/modules/reporter/reporter_agent.py +++ b/dashboard/modules/reporter/reporter_agent.py @@ -6,6 +6,7 @@ import socket import subprocess import sys +import traceback import aioredis @@ -17,6 +18,7 @@ import ray.utils from ray.core.generated import reporter_pb2 from ray.core.generated import reporter_pb2_grpc +from ray.metrics_agent import MetricsAgent import psutil logger = logging.getLogger(__name__) @@ -67,30 +69,45 @@ def __init__(self, dashboard_agent): self._hostname = socket.gethostname() self._workers = set() self._network_stats_hist = [(0, (0.0, 0.0))] # time, (sent, recv) + self._metrics_agent = MetricsAgent(dashboard_agent.metrics_export_port) async def GetProfilingStats(self, request, context): pid = request.pid duration = request.duration profiling_file_path = os.path.join(ray.utils.get_ray_temp_dir(), "{}_profiling.txt".format(pid)) - process = subprocess.Popen( + process = await asyncio.create_subprocess_shell( "sudo $(which py-spy) record -o {} -p {} -d {} -f speedscope" .format(profiling_file_path, pid, duration), stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) - stdout, stderr = process.communicate() + stdout, stderr = await process.communicate() if process.returncode != 0: profiling_stats = "" else: with open(profiling_file_path, "r") as f: profiling_stats = f.read() return reporter_pb2.GetProfilingStatsReply( - profiling_stats=profiling_stats, stdout=stdout, stderr=stderr) + profiling_stats=profiling_stats, std_out=stdout, std_err=stderr) async def ReportMetrics(self, request, context): - # TODO(sang): Process metrics here. - return reporter_pb2.ReportMetricsReply() + # NOTE: Exceptions are not propagated properly + # when we don't catch them here. + try: + metrcs_description_required = ( + self._metrics_agent.record_metrics_points( + request.metrics_points)) + except Exception as e: + logger.error(e) + logger.error(traceback.format_exc()) + + # If metrics description is missing, we should notify cpp processes + # that we need them. Cpp processes will then report them to here. + # We need it when (1) a new metric is reported (application metric) + # (2) a reporter goes down and restarted (currently not implemented). + return reporter_pb2.ReportMetricsReply( + metrcs_description_required=metrcs_description_required) @staticmethod def _get_cpu_percent(): diff --git a/dashboard/modules/reporter/reporter_head.py b/dashboard/modules/reporter/reporter_head.py index fdd262a604bd..ed79aff816b5 100644 --- a/dashboard/modules/reporter/reporter_head.py +++ b/dashboard/modules/reporter/reporter_head.py @@ -1,6 +1,5 @@ import json import logging -import uuid import aiohttp.web from aioredis.pubsub import Receiver @@ -24,58 +23,32 @@ class ReportHead(dashboard_utils.DashboardHeadModule): def __init__(self, dashboard_head): super().__init__(dashboard_head) self._stubs = {} - self._profiling_stats = {} DataSource.agents.signal.append(self._update_stubs) async def _update_stubs(self, change): + if change.old: + ip, port = next(iter(change.old.items())) + self._stubs.pop(ip) if change.new: ip, ports = next(iter(change.new.items())) channel = aiogrpc.insecure_channel("{}:{}".format(ip, ports[1])) stub = reporter_pb2_grpc.ReporterServiceStub(channel) self._stubs[ip] = stub - if change.old: - ip, port = next(iter(change.old.items())) - self._stubs.pop(ip) @routes.get("/api/launch_profiling") async def launch_profiling(self, req) -> aiohttp.web.Response: - node_id = req.query.get("node_id") + ip = req.query.get("ip") pid = int(req.query.get("pid")) duration = int(req.query.get("duration")) - profiling_id = str(uuid.uuid4()) - reporter_stub = self._stubs[node_id] + reporter_stub = self._stubs[ip] reply = await reporter_stub.GetProfilingStats( reporter_pb2.GetProfilingStatsRequest(pid=pid, duration=duration)) - self._profiling_stats[profiling_id] = reply - return await dashboard_utils.rest_response( - success=True, - message="Profiling launched.", - profiling_id=profiling_id) - - @routes.get("/api/check_profiling_status") - async def check_profiling_status(self, req) -> aiohttp.web.Response: - profiling_id = req.query.get("profiling_id") - is_present = profiling_id in self._profiling_stats - if not is_present: - status = {"status": "pending"} - else: - reply = self._profiling_stats[profiling_id] - if reply.stderr: - status = {"status": "error", "error": reply.stderr} - else: - status = {"status": "finished"} - return await dashboard_utils.rest_response( - success=True, message="Profiling status fetched.", status=status) - - @routes.get("/api/get_profiling_info") - async def get_profiling_info(self, req) -> aiohttp.web.Response: - profiling_id = req.query.get("profiling_id") - profiling_stats = self._profiling_stats.get(profiling_id) - assert profiling_stats, "profiling not finished" + print("ccc", reply.std_err, reply.std_out) return await dashboard_utils.rest_response( success=True, - message="Profiling info fetched.", - profiling_info=json.loads(profiling_stats.profiling_stats)) + message="Profiling success.", + profiling_info=json.loads(reply.profiling_stats) + if reply.profiling_stats else reply.std_out) async def run(self, server): aioredis_client = self._dashboard_head.aioredis_client diff --git a/dashboard/modules/reporter/test_reporter.py b/dashboard/modules/reporter/test_reporter.py new file mode 100644 index 000000000000..14e64559d0a8 --- /dev/null +++ b/dashboard/modules/reporter/test_reporter.py @@ -0,0 +1,52 @@ +import os +import logging +import requests +import time + +import ray +from ray.new_dashboard.tests.conftest import * # noqa +from ray.test_utils import ( + RayTestTimeoutException, + wait_until_server_available, +) + +os.environ["RAY_USE_NEW_DASHBOARD"] = "1" + +logger = logging.getLogger(__name__) + + +def test_profiling(shutdown_only): + addresses = ray.init(include_dashboard=True, num_cpus=6) + + @ray.remote(num_cpus=2) + class Actor: + def getpid(self): + return os.getpid() + + c = Actor.remote() + actor_pid = ray.get(c.getpid.remote()) + + webui_url = addresses["webui_url"] + assert (wait_until_server_available(webui_url) is True) + webui_url = webui_url.replace("localhost", "http://127.0.0.1") + + start_time = time.time() + launch_profiling = None + while True: + # Sometimes some startup time is required + if time.time() - start_time > 30: + raise RayTestTimeoutException( + "Timed out while collecting profiling stats, " + "launch_profiling: {}".format(launch_profiling)) + launch_profiling = requests.get( + webui_url + "/api/launch_profiling", + params={ + "ip": ray.nodes()[0]["NodeManagerAddress"], + "pid": actor_pid, + "duration": 5 + }).json() + if launch_profiling["result"]: + profiling_info = launch_profiling["data"]["profilingInfo"] + break + time.sleep(1) + logger.info(profiling_info) diff --git a/python/ray/services.py b/python/ray/services.py index 7acbbf075ca1..c8802673cc81 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1412,6 +1412,7 @@ def start_raylet(redis_address, os.path.dirname(os.path.abspath(__file__)), "new_dashboard/agent.py"), "--redis-address={}".format(redis_address), + "--metrics-export-port={}".format(metrics_export_port), "--node-manager-port={}".format(node_manager_port), "--object-store-name={}".format(plasma_store_name), "--raylet-name={}".format(raylet_name), From ee3615d809cdfc9fa1b9c554d716fde7291de157 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Wed, 19 Aug 2020 17:09:37 +0800 Subject: [PATCH 02/21] Add test_node_physical_stats to test_reporter.py --- dashboard/modules/reporter/test_reporter.py | 43 +++++++++++++++++++++ python/ray/utils.py | 2 +- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/dashboard/modules/reporter/test_reporter.py b/dashboard/modules/reporter/test_reporter.py index 14e64559d0a8..e4db3df46ea0 100644 --- a/dashboard/modules/reporter/test_reporter.py +++ b/dashboard/modules/reporter/test_reporter.py @@ -8,6 +8,7 @@ from ray.test_utils import ( RayTestTimeoutException, wait_until_server_available, + wait_for_condition, ) os.environ["RAY_USE_NEW_DASHBOARD"] = "1" @@ -50,3 +51,45 @@ def getpid(self): break time.sleep(1) logger.info(profiling_info) + + +def test_node_physical_stats(shutdown_only): + addresses = ray.init(include_dashboard=True, num_cpus=6) + + @ray.remote(num_cpus=1) + class Actor: + def getpid(self): + return os.getpid() + + actors = [Actor.remote() for _ in range(6)] + actor_pids = ray.get([actor.getpid.remote() for actor in actors]) + actor_pids = set(actor_pids) + + webui_url = addresses["webui_url"] + assert (wait_until_server_available(webui_url) is True) + webui_url = webui_url.replace("localhost", "http://127.0.0.1") + + def _check_workers(): + try: + resp = requests.get(webui_url + + "/test/dump?key=node_physical_stats") + resp.raise_for_status() + result = resp.json() + assert result["result"] is True + node_physical_stats = result["data"]["nodePhysicalStats"] + assert len(node_physical_stats) == 1 + current_stats = node_physical_stats[addresses["raylet_ip_address"]] + # Check Actor workers + current_actor_pids = set() + for worker in current_stats["workers"]: + if "ray::Actor" in worker["cmdline"][0]: + current_actor_pids.add(worker["pid"]) + assert current_actor_pids == actor_pids + # Check raylet cmdline + assert "raylet" in current_stats["cmdline"][0] + return True + except Exception as ex: + logger.info(ex) + return False + + wait_for_condition(_check_workers, timeout=30) diff --git a/python/ray/utils.py b/python/ray/utils.py index c41c76ba5601..854cff823ffd 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -118,7 +118,7 @@ def push_error_to_driver_through_redis(redis_client, error_data = ray.gcs_utils.construct_error_message(job_id, error_type, message, time.time()) pubsub_msg = ray.gcs_utils.PubSubMessage() - pubsub_msg.id = job_id.hex() + pubsub_msg.id = job_id.binary() pubsub_msg.data = error_data redis_client.publish("ERROR_INFO:" + job_id.hex(), pubsub_msg.SerializeAsString()) From 5da159192246c288c14abbf6e4efb1ec784ac8e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Thu, 20 Aug 2020 16:18:16 +0800 Subject: [PATCH 03/21] Add test_class_method_route_table to test_dashboard.py --- dashboard/modules/test/test_agent.py | 14 ++++- dashboard/modules/test/test_head.py | 16 +++++ dashboard/tests/test_dashboard.py | 88 ++++++++++++++++++++++++++++ 3 files changed, 117 insertions(+), 1 deletion(-) diff --git a/dashboard/modules/test/test_agent.py b/dashboard/modules/test/test_agent.py index efe9c3285a77..c60c2db8a0d5 100644 --- a/dashboard/modules/test/test_agent.py +++ b/dashboard/modules/test/test_agent.py @@ -9,7 +9,7 @@ routes = dashboard_utils.ClassMethodRouteTable -class HeadAgent(dashboard_utils.DashboardAgentModule): +class TestAgent(dashboard_utils.DashboardAgentModule): def __init__(self, dashboard_agent): super().__init__(dashboard_agent) @@ -20,5 +20,17 @@ async def get_url(self, req) -> aiohttp.web.Response: url) return aiohttp.web.json_response(result) + @routes.head("/test/route_head") + async def route_head(self, req) -> aiohttp.web.Response: + pass + + @routes.post("/test/route_post") + async def route_post(self, req) -> aiohttp.web.Response: + pass + + @routes.patch("/test/route_patch") + async def route_patch(self, req) -> aiohttp.web.Response: + pass + async def run(self, server): pass diff --git a/dashboard/modules/test/test_head.py b/dashboard/modules/test/test_head.py index 61e7635e0e5b..aed0a49efc12 100644 --- a/dashboard/modules/test/test_head.py +++ b/dashboard/modules/test/test_head.py @@ -24,6 +24,22 @@ async def _update_notified_agents(self, change): ip, port = next(iter(change.old.items())) self._notified_agents.pop(ip) + @routes.get("/test/route_get") + async def route_get(self, req) -> aiohttp.web.Response: + pass + + @routes.put("/test/route_put") + async def route_put(self, req) -> aiohttp.web.Response: + pass + + @routes.delete("/test/route_delete") + async def route_delete(self, req) -> aiohttp.web.Response: + pass + + @routes.view("/test/route_view") + async def route_view(self, req) -> aiohttp.web.Response: + pass + @routes.get("/test/dump") async def dump(self, req) -> aiohttp.web.Response: key = req.query.get("key") diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 325044ad7257..0dd9f0201418 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -2,7 +2,9 @@ import json import time import logging +import asyncio +import aiohttp.web import ray import psutil import pytest @@ -12,11 +14,13 @@ from ray import ray_constants from ray.test_utils import wait_for_condition, wait_until_server_available import ray.new_dashboard.consts as dashboard_consts +import ray.new_dashboard.utils as dashboard_utils import ray.new_dashboard.modules os.environ["RAY_USE_NEW_DASHBOARD"] = "1" logger = logging.getLogger(__name__) +routes = dashboard_utils.ClassMethodRouteTable def cleanup_test_files(): @@ -222,3 +226,87 @@ def test_http_get(ray_start_with_dashboard): if time.time() > start_time + timeout_seconds: raise Exception( "Timed out while waiting for dashboard to start.") + + +def test_class_method_route_table(): + head_cls_list = dashboard_utils.get_all_modules( + dashboard_utils.DashboardHeadModule) + agent_cls_list = dashboard_utils.get_all_modules( + dashboard_utils.DashboardAgentModule) + test_head_cls = None + for cls in head_cls_list: + if cls.__name__ == "TestHead": + test_head_cls = cls + break + assert test_head_cls is not None + test_agent_cls = None + for cls in agent_cls_list: + if cls.__name__ == "TestAgent": + test_agent_cls = cls + break + assert test_agent_cls is not None + + def _has_route(route, method, path): + if isinstance(route, aiohttp.web.RouteDef): + if route.method == method and route.path == path: + return True + return False + + def _has_static(route, path, prefix): + if isinstance(route, aiohttp.web.StaticDef): + if route.path == path and route.prefix == prefix: + return True + return False + + all_routes = dashboard_utils.ClassMethodRouteTable.routes() + assert any(_has_route(r, "HEAD", "/test/route_head") for r in all_routes) + assert any(_has_route(r, "GET", "/test/route_get") for r in all_routes) + assert any(_has_route(r, "POST", "/test/route_post") for r in all_routes) + assert any(_has_route(r, "PUT", "/test/route_put") for r in all_routes) + assert any(_has_route(r, "PATCH", "/test/route_patch") for r in all_routes) + assert any( + _has_route(r, "DELETE", "/test/route_delete") for r in all_routes) + assert any(_has_route(r, "*", "/test/route_view") for r in all_routes) + + # Test bind() + bound_routes = dashboard_utils.ClassMethodRouteTable.bound_routes() + assert len(bound_routes) == 0 + dashboard_utils.ClassMethodRouteTable.bind( + test_agent_cls.__new__(test_agent_cls)) + bound_routes = dashboard_utils.ClassMethodRouteTable.bound_routes() + assert any(_has_route(r, "POST", "/test/route_post") for r in bound_routes) + assert all( + not _has_route(r, "PUT", "/test/route_put") for r in bound_routes) + + # Static def should be in bound routes. + routes.static("/test/route_static", "/path") + bound_routes = dashboard_utils.ClassMethodRouteTable.bound_routes() + assert any( + _has_static(r, "/path", "/test/route_static") for r in bound_routes) + + # Test duplicated routes should raise exception. + try: + + @routes.get("/test/route_get") + def _duplicated_route(req): + pass + + raise Exception("Duplicated routes should raise exception.") + except Exception as ex: + message = str(ex) + assert "/test/route_get" in message + assert "test_head.py" in message + + # Test exception in handler + post_handler = None + for r in bound_routes: + if _has_route(r, "POST", "/test/route_post"): + post_handler = r.handler + break + assert post_handler is not None + + r = asyncio.run(post_handler()) + assert r.status == 200 + resp = json.loads(r.body) + assert resp["result"] is False + assert "Traceback" in resp["msg"] From 169ec0c9d4b548c30c6ce526078359aca5873bd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Thu, 20 Aug 2020 20:25:24 +0800 Subject: [PATCH 04/21] Add stats_collector module for dashboard --- dashboard/datacenter.py | 3 +- dashboard/modules/stats_collector/__init__.py | 0 .../stats_collector/stats_collector_consts.py | 2 + .../stats_collector/stats_collector_head.py | 123 ++++++++++++++++++ .../stats_collector/test_stats_collector.py | 74 +++++++++++ dashboard/tests/test_dashboard.py | 23 +++- dashboard/utils.py | 16 +++ src/ray/core_worker/core_worker.cc | 1 + src/ray/protobuf/common.proto | 3 + src/ray/protobuf/node_manager.proto | 5 + src/ray/raylet/node_manager.cc | 7 + 11 files changed, 255 insertions(+), 2 deletions(-) create mode 100644 dashboard/modules/stats_collector/__init__.py create mode 100644 dashboard/modules/stats_collector/stats_collector_consts.py create mode 100644 dashboard/modules/stats_collector/stats_collector_head.py create mode 100644 dashboard/modules/stats_collector/test_stats_collector.py diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py index d607cfdc3205..1ee454917671 100644 --- a/dashboard/datacenter.py +++ b/dashboard/datacenter.py @@ -56,7 +56,7 @@ async def get_node_actors(cls, hostname): node_worker_id_set.add(worker_stats["workerId"]) node_actors = {} for actor_id, actor_table_data in DataSource.actors.items(): - if actor_table_data["workerId"] in node_worker_id_set: + if actor_table_data["address"]["workerId"] in node_worker_id_set: node_actors[actor_id] = actor_table_data return node_actors @@ -102,6 +102,7 @@ async def get_all_node_summary(cls): for hostname in DataSource.hostname_to_ip.keys(): node_info = await cls.get_node_info(hostname) node_info.pop("workers", None) + node_info.pop("actors", None) node_info["raylet"].pop("workersStats", None) node_info["raylet"].pop("viewData", None) all_nodes_summary.append(node_info) diff --git a/dashboard/modules/stats_collector/__init__.py b/dashboard/modules/stats_collector/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/dashboard/modules/stats_collector/stats_collector_consts.py b/dashboard/modules/stats_collector/stats_collector_consts.py new file mode 100644 index 000000000000..140dab14981b --- /dev/null +++ b/dashboard/modules/stats_collector/stats_collector_consts.py @@ -0,0 +1,2 @@ +NODE_STATS_UPDATE_INTERVAL_SECONDS = 1 +ACTOR_UPDATE_INTERVAL_SECONDS = 1 diff --git a/dashboard/modules/stats_collector/stats_collector_head.py b/dashboard/modules/stats_collector/stats_collector_head.py new file mode 100644 index 000000000000..9dea2546abc1 --- /dev/null +++ b/dashboard/modules/stats_collector/stats_collector_head.py @@ -0,0 +1,123 @@ +import asyncio +import logging + +import aiohttp.web +from grpc.experimental import aio as aiogrpc + +import ray.new_dashboard.modules.stats_collector.stats_collector_consts \ + as stats_collector_consts +import ray.new_dashboard.utils as dashboard_utils +from ray.new_dashboard.utils import async_loop_forever +from ray.core.generated import node_manager_pb2 +from ray.core.generated import node_manager_pb2_grpc +from ray.core.generated import gcs_service_pb2 +from ray.core.generated import gcs_service_pb2_grpc +from ray.new_dashboard.datacenter import DataSource, DataOrganizer +from ray.utils import binary_to_hex + +logger = logging.getLogger(__name__) +routes = dashboard_utils.ClassMethodRouteTable + + +def node_stats_to_dict(message): + return dashboard_utils.message_to_dict( + message, { + "actorId", "jobId", "taskId", "parentTaskId", "sourceActorId", + "callerId", "rayletId", "workerId" + }) + + +def actor_table_data_to_dict(message): + return dashboard_utils.message_to_dict( + message, { + "actorId", "parentId", "jobId", "workerId", "rayletId", + "actorCreationDummyObjectId" + }, + including_default_value_fields=True) + + +class StatsCollector(dashboard_utils.DashboardHeadModule): + def __init__(self, dashboard_head): + super().__init__(dashboard_head) + self._stubs = {} + # JobInfoGcsServiceStub + self._gcs_job_info_stub = None + # ActorInfoGcsService + self._gcs_actor_info_stub = None + DataSource.nodes.signal.append(self._update_stubs) + + async def _update_stubs(self, change): + if change.old: + ip, port = next(iter(change.old.items())) + self._stubs.pop(ip) + if change.new: + ip, node_info = next(iter(change.new.items())) + address = "{}:{}".format(ip, int(node_info["nodeManagerPort"])) + channel = aiogrpc.insecure_channel(address) + stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) + self._stubs[ip] = stub + + @routes.get("/nodes") + async def get_all_nodes(self, req) -> aiohttp.web.Response: + view = req.query.get("view") + if view == "summary": + all_node_summary = await DataOrganizer.get_all_node_summary() + return await dashboard_utils.rest_response( + success=True, + message="Node summary fetched.", + summary=all_node_summary) + elif view is not None and view.lower() == "hostNameList".lower(): + return await dashboard_utils.rest_response( + success=True, + message="Node hostname list fetched.", + host_name_list=list(DataSource.hostname_to_ip.keys())) + else: + return await dashboard_utils.rest_response( + success=False, message="Unknown view {}".format(view)) + + @routes.get("/nodes/{hostname}") + async def get_node(self, req) -> aiohttp.web.Response: + hostname = req.match_info.get("hostname") + node_info = await DataOrganizer.get_node_info(hostname) + return await dashboard_utils.rest_response( + success=True, message="Node detail fetched.", detail=node_info) + + @async_loop_forever(stats_collector_consts.ACTOR_UPDATE_INTERVAL_SECONDS) + async def _update_actors(self): + """ We don't want to rely on redis pubsub, the performance of current + implementation is poor. + TODO(fyrestone): GCS push actor info to dashboard. + """ + request = gcs_service_pb2.GetAllActorInfoRequest() + reply = await self._gcs_actor_info_stub.GetAllActorInfo( + request, timeout=2) + if reply.status.code == 0: + results = {} + for actor_info in reply.actor_table_data: + results[binary_to_hex(actor_info.actor_id)] = \ + actor_table_data_to_dict(actor_info) + DataSource.actors.reset(results) + else: + raise Exception("Failed to GetAllActorInfo: {}".format( + reply.status.message)) + + @async_loop_forever( + stats_collector_consts.NODE_STATS_UPDATE_INTERVAL_SECONDS) + async def _update_node_stats(self): + for ip, stub in self._stubs.items(): + node_info = DataSource.nodes.get(ip) + if node_info["state"] != "ALIVE": + continue + reply = await stub.GetNodeStats( + node_manager_pb2.GetNodeStatsRequest(), timeout=2) + reply_dict = node_stats_to_dict(reply) + DataSource.node_stats[ip] = reply_dict + + async def run(self, server): + gcs_channel = self._dashboard_head.aiogrpc_gcs_channel + self._gcs_job_info_stub = \ + gcs_service_pb2_grpc.JobInfoGcsServiceStub(gcs_channel) + self._gcs_actor_info_stub = \ + gcs_service_pb2_grpc.ActorInfoGcsServiceStub(gcs_channel) + + await asyncio.gather(self._update_node_stats(), self._update_actors()) diff --git a/dashboard/modules/stats_collector/test_stats_collector.py b/dashboard/modules/stats_collector/test_stats_collector.py new file mode 100644 index 000000000000..1072e927994b --- /dev/null +++ b/dashboard/modules/stats_collector/test_stats_collector.py @@ -0,0 +1,74 @@ +import os +import logging +import requests +import time + +import ray +from ray.new_dashboard.tests.conftest import * # noqa +from ray.test_utils import ( + wait_until_server_available, ) + +os.environ["RAY_USE_NEW_DASHBOARD"] = "1" + +logger = logging.getLogger(__name__) + + +def test_node_info(ray_start_with_dashboard): + @ray.remote + class Actor: + def getpid(self): + return os.getpid() + + actors = [Actor.remote(), Actor.remote()] + actor_pids = [actor.getpid.remote() for actor in actors] + actor_pids = ray.get(actor_pids) + + assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) + is True) + webui_url = ray_start_with_dashboard["webui_url"] + webui_url = webui_url.replace("localhost", "http://127.0.0.1") + + timeout_seconds = 20 + start_time = time.time() + while True: + time.sleep(1) + try: + response = requests.get(webui_url + "/nodes?view=hostnamelist") + response.raise_for_status() + hostname_list = response.json() + assert hostname_list["result"] is True + hostname_list = hostname_list["data"]["hostNameList"] + assert len(hostname_list) == 1 + + hostname = hostname_list[0] + response = requests.get(webui_url + "/nodes/{}".format(hostname)) + response.raise_for_status() + detail = response.json() + assert detail["result"] is True + detail = detail["data"]["detail"] + assert detail["hostname"] == hostname + assert detail["state"] == "ALIVE" + assert "raylet" in detail["cmdline"][0] + assert len(detail["workers"]) >= 2 + assert len(detail["actors"]) == 2 + assert len(detail["raylet"]["viewData"]) > 0 + + response = requests.get(webui_url + "/nodes?view=summary") + response.raise_for_status() + summary = response.json() + assert summary["result"] is True + assert len(summary["data"]["summary"]) == 1 + summary = summary["data"]["summary"][0] + assert summary["hostname"] == hostname + assert summary["state"] == "ALIVE" + assert "raylet" in summary["cmdline"][0] + assert "workers" not in summary + assert "actors" not in summary + assert "viewData" not in summary["raylet"] + break + except Exception as ex: + logger.info(ex) + finally: + if time.time() > start_time + timeout_seconds: + raise Exception( + "Timed out while waiting for dashboard to start.") diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 0dd9f0201418..6533c049ec2f 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -17,6 +17,11 @@ import ray.new_dashboard.utils as dashboard_utils import ray.new_dashboard.modules +try: + create_task = asyncio.create_task +except AttributeError: + create_task = asyncio.ensure_future + os.environ["RAY_USE_NEW_DASHBOARD"] = "1" logger = logging.getLogger(__name__) @@ -305,8 +310,24 @@ def _duplicated_route(req): break assert post_handler is not None - r = asyncio.run(post_handler()) + loop = asyncio.get_event_loop() + r = loop.run_until_complete(post_handler()) assert r.status == 200 resp = json.loads(r.body) assert resp["result"] is False assert "Traceback" in resp["msg"] + + +def test_async_loop_forever(): + counter = [0] + + @dashboard_utils.async_loop_forever(interval_seconds=1) + async def foo(): + counter[0] += 1 + raise Exception("Test exception") + + loop = asyncio.get_event_loop() + loop.create_task(foo()) + loop.call_later(4, loop.stop) + loop.run_forever() + assert counter[0] > 2 diff --git a/dashboard/utils.py b/dashboard/utils.py index 054fe9fd4839..e556b4633814 100644 --- a/dashboard/utils.py +++ b/dashboard/utils.py @@ -387,3 +387,19 @@ async def get_aioredis_client(redis_address, redis_password, # Raise exception from create_redis_pool return await aioredis.create_redis_pool( address=redis_address, password=redis_password) + + +def async_loop_forever(interval_seconds): + def _wrapper(coro): + @functools.wraps(coro) + async def _looper(*args, **kwargs): + while True: + try: + await coro(*args, **kwargs) + except Exception as ex: + logger.exception(ex) + await asyncio.sleep(interval_seconds) + + return _looper + + return _wrapper diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 007822da5919..2326cd638bb7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2176,6 +2176,7 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest & stats->set_current_task_func_desc(current_task_.FunctionDescriptor()->ToString()); stats->set_ip_address(rpc_address_.ip_address()); stats->set_port(rpc_address_.port()); + stats->set_job_id(worker_context_.GetCurrentJobID().Binary()); stats->set_actor_id(actor_id_.Binary()); auto used_resources_map = stats->mutable_used_resources(); for (auto const &it : *resource_ids_) { diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 6e9952093766..19c7d6523f39 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -1,3 +1,4 @@ + // Copyright 2017 The Ray Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -340,6 +341,8 @@ message CoreWorkerStats { string actor_title = 16; // Local reference table. repeated ObjectRefInfo object_refs = 17; + // Job ID. + bytes job_id = 18; } message MetricPoint { diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index c33f0abcbbee..45c22be962b5 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -114,6 +114,10 @@ message WorkerStats { CoreWorkerStats core_worker_stats = 3; // Error string if fetching core worker stats failed. string fetch_error = 4; + // Worker id of core worker. + bytes worker_id = 5; + // Worker language. + Language language = 6; } message GetNodeStatsReply { @@ -122,6 +126,7 @@ message GetNodeStatsReply { uint32 num_workers = 3; repeated TaskSpec infeasible_tasks = 4; repeated TaskSpec ready_tasks = 5; + int32 pid = 6; } message GlobalGCRequest { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 094236e91447..288e9d6088cb 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -3207,6 +3207,7 @@ void NodeManager::FlushObjectsToFree() { void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request, rpc::GetNodeStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { + reply->set_pid(getpid()); for (const auto &task : local_queues_.GetTasks(TaskState::INFEASIBLE)) { if (task.GetTaskSpecification().IsActorCreationTask()) { auto infeasible_task = reply->add_infeasible_tasks(); @@ -3271,6 +3272,10 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_ all_workers.push_back(driver); driver_ids.insert(driver->WorkerId()); } + if (all_workers.empty()) { + send_reply_callback(Status::OK(), nullptr, nullptr); + return; + } for (const auto &worker : all_workers) { rpc::GetCoreWorkerStatsRequest request; request.set_intended_worker_id(worker->WorkerId().Binary()); @@ -3280,7 +3285,9 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_ const ray::Status &status, const rpc::GetCoreWorkerStatsReply &r) { auto worker_stats = reply->add_workers_stats(); worker_stats->set_pid(worker->GetProcess().GetId()); + worker_stats->set_worker_id(worker->WorkerId().Binary()); worker_stats->set_is_driver(driver_ids.contains(worker->WorkerId())); + worker_stats->set_language(worker->GetLanguage()); reply->set_num_workers(reply->num_workers() + 1); if (status.ok()) { worker_stats->mutable_core_worker_stats()->MergeFrom(r.core_worker_stats()); From bef8f0a5eb5f26da818b2b9ccacb11b2ab8fb1df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Fri, 21 Aug 2020 13:55:28 +0800 Subject: [PATCH 05/21] Subscribe actor table data --- .../stats_collector/stats_collector_consts.py | 3 +- .../stats_collector/stats_collector_head.py | 72 +++++++++++++------ .../stats_collector/test_stats_collector.py | 26 +++++-- 3 files changed, 73 insertions(+), 28 deletions(-) diff --git a/dashboard/modules/stats_collector/stats_collector_consts.py b/dashboard/modules/stats_collector/stats_collector_consts.py index 140dab14981b..a0d58d32602f 100644 --- a/dashboard/modules/stats_collector/stats_collector_consts.py +++ b/dashboard/modules/stats_collector/stats_collector_consts.py @@ -1,2 +1,3 @@ NODE_STATS_UPDATE_INTERVAL_SECONDS = 1 -ACTOR_UPDATE_INTERVAL_SECONDS = 1 +RETRY_GET_ALL_ACTOR_INFO_INTERVAL_SECONDS = 1 +ACTOR_CHANNEL = "ACTOR" diff --git a/dashboard/modules/stats_collector/stats_collector_head.py b/dashboard/modules/stats_collector/stats_collector_head.py index 9dea2546abc1..1f270e5ba6c5 100644 --- a/dashboard/modules/stats_collector/stats_collector_head.py +++ b/dashboard/modules/stats_collector/stats_collector_head.py @@ -2,8 +2,10 @@ import logging import aiohttp.web +from aioredis.pubsub import Receiver from grpc.experimental import aio as aiogrpc +import ray.gcs_utils import ray.new_dashboard.modules.stats_collector.stats_collector_consts \ as stats_collector_consts import ray.new_dashboard.utils as dashboard_utils @@ -82,24 +84,51 @@ async def get_node(self, req) -> aiohttp.web.Response: return await dashboard_utils.rest_response( success=True, message="Node detail fetched.", detail=node_info) - @async_loop_forever(stats_collector_consts.ACTOR_UPDATE_INTERVAL_SECONDS) async def _update_actors(self): - """ We don't want to rely on redis pubsub, the performance of current - implementation is poor. - TODO(fyrestone): GCS push actor info to dashboard. - """ - request = gcs_service_pb2.GetAllActorInfoRequest() - reply = await self._gcs_actor_info_stub.GetAllActorInfo( - request, timeout=2) - if reply.status.code == 0: - results = {} - for actor_info in reply.actor_table_data: - results[binary_to_hex(actor_info.actor_id)] = \ + # Subscribe actor channel. + aioredis_client = self._dashboard_head.aioredis_client + receiver = Receiver() + + key = "{}:*".format(stats_collector_consts.ACTOR_CHANNEL) + pattern = receiver.pattern(key) + await aioredis_client.psubscribe(pattern) + logger.info("Subscribed to {}".format(key)) + + # Get all actor info. + while True: + try: + logger.info("Getting all actor info from GCS.") + request = gcs_service_pb2.GetAllActorInfoRequest() + reply = await self._gcs_actor_info_stub.GetAllActorInfo( + request, timeout=2) + if reply.status.code == 0: + result = {} + for actor_info in reply.actor_table_data: + result[binary_to_hex(actor_info.actor_id)] = \ + actor_table_data_to_dict(actor_info) + DataSource.actors.reset(result) + logger.info("Received {} actor info from GCS.".format( + len(result))) + break + else: + raise Exception("Failed to GetAllActorInfo: {}".format( + reply.status.message)) + except Exception as ex: + logger.exception(ex) + await asyncio.sleep(stats_collector_consts. + RETRY_GET_ALL_ACTOR_INFO_INTERVAL_SECONDS) + + # Receive actors from channel. + async for sender, msg in receiver.iter(): + try: + _, data = msg + pubsub_message = ray.gcs_utils.PubSubMessage.FromString(data) + actor_info = ray.gcs_utils.ActorTableData.FromString( + pubsub_message.data) + DataSource.actors[binary_to_hex(actor_info.actor_id)] = \ actor_table_data_to_dict(actor_info) - DataSource.actors.reset(results) - else: - raise Exception("Failed to GetAllActorInfo: {}".format( - reply.status.message)) + except Exception as ex: + logger.exception(ex) @async_loop_forever( stats_collector_consts.NODE_STATS_UPDATE_INTERVAL_SECONDS) @@ -108,10 +137,13 @@ async def _update_node_stats(self): node_info = DataSource.nodes.get(ip) if node_info["state"] != "ALIVE": continue - reply = await stub.GetNodeStats( - node_manager_pb2.GetNodeStatsRequest(), timeout=2) - reply_dict = node_stats_to_dict(reply) - DataSource.node_stats[ip] = reply_dict + try: + reply = await stub.GetNodeStats( + node_manager_pb2.GetNodeStatsRequest(), timeout=2) + reply_dict = node_stats_to_dict(reply) + DataSource.node_stats[ip] = reply_dict + except Exception as ex: + logger.exception(ex) async def run(self, server): gcs_channel = self._dashboard_head.aiogrpc_gcs_channel diff --git a/dashboard/modules/stats_collector/test_stats_collector.py b/dashboard/modules/stats_collector/test_stats_collector.py index 1072e927994b..9a309b152f4c 100644 --- a/dashboard/modules/stats_collector/test_stats_collector.py +++ b/dashboard/modules/stats_collector/test_stats_collector.py @@ -2,6 +2,7 @@ import logging import requests import time +import traceback import ray from ray.new_dashboard.tests.conftest import * # noqa @@ -21,7 +22,7 @@ def getpid(self): actors = [Actor.remote(), Actor.remote()] actor_pids = [actor.getpid.remote() for actor in actors] - actor_pids = ray.get(actor_pids) + actor_pids = set(ray.get(actor_pids)) assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True) @@ -30,13 +31,14 @@ def getpid(self): timeout_seconds = 20 start_time = time.time() + last_ex = None while True: time.sleep(1) try: response = requests.get(webui_url + "/nodes?view=hostnamelist") response.raise_for_status() hostname_list = response.json() - assert hostname_list["result"] is True + assert hostname_list["result"] is True, hostname_list["msg"] hostname_list = hostname_list["data"]["hostNameList"] assert len(hostname_list) == 1 @@ -44,19 +46,25 @@ def getpid(self): response = requests.get(webui_url + "/nodes/{}".format(hostname)) response.raise_for_status() detail = response.json() - assert detail["result"] is True + assert detail["result"] is True, detail["msg"] detail = detail["data"]["detail"] assert detail["hostname"] == hostname assert detail["state"] == "ALIVE" assert "raylet" in detail["cmdline"][0] assert len(detail["workers"]) >= 2 - assert len(detail["actors"]) == 2 + assert len(detail["actors"]) == 2, detail["actors"] assert len(detail["raylet"]["viewData"]) > 0 + actor_worker_pids = set() + for worker in detail["workers"]: + if "ray::Actor" in worker["cmdline"][0]: + actor_worker_pids.add(worker["pid"]) + assert actor_worker_pids == actor_pids + response = requests.get(webui_url + "/nodes?view=summary") response.raise_for_status() summary = response.json() - assert summary["result"] is True + assert summary["result"] is True, summary["msg"] assert len(summary["data"]["summary"]) == 1 summary = summary["data"]["summary"][0] assert summary["hostname"] == hostname @@ -67,8 +75,12 @@ def getpid(self): assert "viewData" not in summary["raylet"] break except Exception as ex: - logger.info(ex) + last_ex = ex finally: if time.time() > start_time + timeout_seconds: + ex_stack = traceback.format_exception( + type(last_ex), last_ex, + last_ex.__traceback__) if last_ex else [] raise Exception( - "Timed out while waiting for dashboard to start.") + "Timed out while waiting for dashboard to start, {}". + format("".join(ex_stack))) From 57ab2b4c35d33b88de6ad46e3c022ab0b5346a0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Fri, 21 Aug 2020 17:51:14 +0800 Subject: [PATCH 06/21] Add log module for dashboard --- dashboard/dashboard.py | 14 +++- dashboard/head.py | 4 +- dashboard/modules/log/__init__.py | 0 dashboard/modules/log/log_agent.py | 19 +++++ dashboard/modules/log/log_head.py | 69 ++++++++++++++++++ dashboard/modules/log/test_log.py | 109 +++++++++++++++++++++++++++++ 6 files changed, 211 insertions(+), 4 deletions(-) create mode 100644 dashboard/modules/log/__init__.py create mode 100644 dashboard/modules/log/log_agent.py create mode 100644 dashboard/modules/log/log_head.py create mode 100644 dashboard/modules/log/test_log.py diff --git a/dashboard/dashboard.py b/dashboard/dashboard.py index f8f19ef212af..dd7fa716a62a 100644 --- a/dashboard/dashboard.py +++ b/dashboard/dashboard.py @@ -59,14 +59,21 @@ class Dashboard: port(int): Port number of dashboard aiohttp server. redis_address(str): GCS address of a Ray cluster redis_password(str): Redis password to access GCS + log_dir(str): Log directory of dashboard. """ - def __init__(self, host, port, redis_address, redis_password=None): + def __init__(self, + host, + port, + redis_address, + redis_password=None, + log_dir=None): self.dashboard_head = dashboard_head.DashboardHead( http_host=host, http_port=port, redis_address=redis_address, - redis_password=redis_password) + redis_password=redis_password, + log_dir=log_dir) # Setup Dashboard Routes build_dir = setup_static_dir() @@ -197,7 +204,8 @@ async def run(self): args.host, args.port, args.redis_address, - redis_password=args.redis_password) + redis_password=args.redis_password, + log_dir=log_dir) loop = asyncio.get_event_loop() loop.run_until_complete(dashboard.run()) except Exception as e: diff --git a/dashboard/head.py b/dashboard/head.py index 1b947694c9d1..5c7ebca16f5e 100644 --- a/dashboard/head.py +++ b/dashboard/head.py @@ -28,7 +28,8 @@ def gcs_node_info_to_dict(message): class DashboardHead: - def __init__(self, http_host, http_port, redis_address, redis_password): + def __init__(self, http_host, http_port, redis_address, redis_password, + log_dir): # NodeInfoGcsService self._gcs_node_info_stub = None self._gcs_rpc_error_counter = 0 @@ -37,6 +38,7 @@ def __init__(self, http_host, http_port, redis_address, redis_password): self.http_port = http_port self.redis_address = dashboard_utils.address_tuple(redis_address) self.redis_password = redis_password + self.log_dir = log_dir self.aioredis_client = None self.aiogrpc_gcs_channel = None self.http_session = None diff --git a/dashboard/modules/log/__init__.py b/dashboard/modules/log/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/dashboard/modules/log/log_agent.py b/dashboard/modules/log/log_agent.py new file mode 100644 index 000000000000..8ef31d96852b --- /dev/null +++ b/dashboard/modules/log/log_agent.py @@ -0,0 +1,19 @@ +import logging + +import mimetypes + +import ray.new_dashboard.utils as dashboard_utils + +logger = logging.getLogger(__name__) +routes = dashboard_utils.ClassMethodRouteTable + + +class LogAgent(dashboard_utils.DashboardAgentModule): + def __init__(self, dashboard_agent): + super().__init__(dashboard_agent) + mimetypes.add_type("text/plain", ".err") + mimetypes.add_type("text/plain", ".out") + routes.static("/logs", self._dashboard_agent.log_dir, show_index=True) + + async def run(self, server): + pass diff --git a/dashboard/modules/log/log_head.py b/dashboard/modules/log/log_head.py new file mode 100644 index 000000000000..0f65f2de89f5 --- /dev/null +++ b/dashboard/modules/log/log_head.py @@ -0,0 +1,69 @@ +import logging + +import mimetypes + +import aiohttp.web +import ray.new_dashboard.utils as dashboard_utils +from ray.new_dashboard.datacenter import DataSource, GlobalSignals + +logger = logging.getLogger(__name__) +routes = dashboard_utils.ClassMethodRouteTable + + +class LogHead(dashboard_utils.DashboardHeadModule): + LOG_URL_TEMPLATE = "http://{ip}:{port}/logs" + + def __init__(self, dashboard_head): + super().__init__(dashboard_head) + mimetypes.add_type("text/plain", ".err") + mimetypes.add_type("text/plain", ".out") + routes.static("/logs", self._dashboard_head.log_dir, show_index=True) + GlobalSignals.node_info_fetched.append( + self.insert_log_url_to_node_info) + + async def insert_log_url_to_node_info(self, node_info): + ip = node_info.get("ip") + if ip is None: + return + agent_port = DataSource.agents.get(ip) + if agent_port is None: + return + agent_http_port, _ = agent_port + log_url = self.LOG_URL_TEMPLATE.format(ip=ip, port=agent_http_port) + node_info["logUrl"] = log_url + + @routes.get("/log_index") + async def get_log_index(self, req) -> aiohttp.web.Response: + url_list = [] + for ip, ports in DataSource.agents.items(): + url_list.append( + self.LOG_URL_TEMPLATE.format(ip=ip, port=str(ports[0]))) + if self._dashboard_head.ip not in DataSource.agents: + url_list.append( + self.LOG_URL_TEMPLATE.format( + ip=self._dashboard_head.ip, + port=self._dashboard_head.http_port)) + return aiohttp.web.Response( + text=self._directory_as_html(url_list), content_type="text/html") + + @staticmethod + def _directory_as_html(url_list) -> str: + # returns directory's index as html + + index_of = "Index of logs" + h1 = "

{}

".format(index_of) + + index_list = [] + for url in sorted(url_list): + index_list.append('
  • {name}
  • '.format( + url=url, name=url)) + ul = "
      \n{}\n
    ".format("\n".join(index_list)) + body = "\n{}\n{}\n".format(h1, ul) + + head_str = "\n{}\n".format(index_of) + html = "\n{}\n{}\n".format(head_str, body) + + return html + + async def run(self, server): + pass diff --git a/dashboard/modules/log/test_log.py b/dashboard/modules/log/test_log.py new file mode 100644 index 000000000000..7d7be608336c --- /dev/null +++ b/dashboard/modules/log/test_log.py @@ -0,0 +1,109 @@ +import os +import logging +import requests +import socket +import time +import traceback +import html.parser +import urllib.parse + +import ray +from ray.new_dashboard.tests.conftest import * # noqa +from ray.test_utils import ( + wait_until_server_available, ) + +os.environ["RAY_USE_NEW_DASHBOARD"] = "1" + +logger = logging.getLogger(__name__) + + +class LogUrlParser(html.parser.HTMLParser): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._urls = [] + + def handle_starttag(self, tag, attrs): + if tag == "a": + self._urls.append(dict(attrs)["href"]) + + def error(self, message): + logger.error(message) + + def get_urls(self): + return self._urls + + +def test_log(ray_start_with_dashboard): + @ray.remote + def write_log(s): + print(s) + + test_log_text = "test_log_text" + ray.get(write_log.remote(test_log_text)) + assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) + is True) + webui_url = ray_start_with_dashboard["webui_url"] + webui_url = webui_url.replace("localhost", "http://127.0.0.1") + + timeout_seconds = 20 + start_time = time.time() + last_ex = None + while True: + time.sleep(1) + try: + response = requests.get(webui_url + "/log_index") + response.raise_for_status() + parser = LogUrlParser() + parser.feed(response.text) + all_nodes_log_urls = parser.get_urls() + assert len(all_nodes_log_urls) == 1 + + response = requests.get(all_nodes_log_urls[0]) + response.raise_for_status() + parser = LogUrlParser() + parser.feed(response.text) + + # Search test_log_text from all worker logs. + parsed_url = urllib.parse.urlparse(all_nodes_log_urls[0]) + paths = parser.get_urls() + urls = [] + for p in paths: + if "worker" in p: + urls.append(parsed_url._replace(path=p).geturl()) + + for u in urls: + response = requests.get(u) + response.raise_for_status() + if test_log_text in response.text: + break + else: + raise Exception("Can't find {} from {}".format( + test_log_text, urls)) + + # Test range request. + response = requests.get( + webui_url + "/logs/dashboard.log", + headers={"Range": "bytes=43-51"}) + response.raise_for_status() + assert response.text == "Dashboard" + + # Test logUrl in node info. + response = requests.get(webui_url + + "/nodes/{}".format(socket.gethostname())) + response.raise_for_status() + node_info = response.json() + assert node_info["result"] is True + node_info = node_info["data"]["detail"] + assert "logUrl" in node_info + assert node_info["logUrl"] in all_nodes_log_urls + break + except Exception as ex: + last_ex = ex + finally: + if time.time() > start_time + timeout_seconds: + ex_stack = traceback.format_exception( + type(last_ex), last_ex, + last_ex.__traceback__) if last_ex else [] + raise Exception( + "Timed out while waiting for dashboard to start, {}". + format("".join(ex_stack))) From c06541ff71cff8808888f02cb61d9753796bda9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Mon, 24 Aug 2020 22:43:19 +0800 Subject: [PATCH 07/21] Only enable test module in some test cases --- dashboard/modules/log/test_log.py | 2 +- dashboard/modules/reporter/test_reporter.py | 4 +- .../stats_collector/test_stats_collector.py | 2 +- dashboard/modules/test/test_agent.py | 4 ++ dashboard/modules/test/test_consts.py | 1 + dashboard/modules/test/test_head.py | 4 ++ dashboard/tests/conftest.py | 12 ++++++ dashboard/tests/test_dashboard.py | 43 ++++++++++++++++--- dashboard/utils.py | 15 ++++++- 9 files changed, 76 insertions(+), 11 deletions(-) create mode 100644 dashboard/modules/test/test_consts.py diff --git a/dashboard/modules/log/test_log.py b/dashboard/modules/log/test_log.py index 7d7be608336c..f2a145ff8ea9 100644 --- a/dashboard/modules/log/test_log.py +++ b/dashboard/modules/log/test_log.py @@ -33,7 +33,7 @@ def get_urls(self): return self._urls -def test_log(ray_start_with_dashboard): +def test_log(disable_test_module, ray_start_with_dashboard): @ray.remote def write_log(s): print(s) diff --git a/dashboard/modules/reporter/test_reporter.py b/dashboard/modules/reporter/test_reporter.py index e4db3df46ea0..759301c32dff 100644 --- a/dashboard/modules/reporter/test_reporter.py +++ b/dashboard/modules/reporter/test_reporter.py @@ -16,7 +16,7 @@ logger = logging.getLogger(__name__) -def test_profiling(shutdown_only): +def test_profiling(disable_test_module, shutdown_only): addresses = ray.init(include_dashboard=True, num_cpus=6) @ray.remote(num_cpus=2) @@ -53,7 +53,7 @@ def getpid(self): logger.info(profiling_info) -def test_node_physical_stats(shutdown_only): +def test_node_physical_stats(enable_test_module, shutdown_only): addresses = ray.init(include_dashboard=True, num_cpus=6) @ray.remote(num_cpus=1) diff --git a/dashboard/modules/stats_collector/test_stats_collector.py b/dashboard/modules/stats_collector/test_stats_collector.py index 9a309b152f4c..67d30e533e61 100644 --- a/dashboard/modules/stats_collector/test_stats_collector.py +++ b/dashboard/modules/stats_collector/test_stats_collector.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) -def test_node_info(ray_start_with_dashboard): +def test_node_info(disable_test_module, ray_start_with_dashboard): @ray.remote class Actor: def getpid(self): diff --git a/dashboard/modules/test/test_agent.py b/dashboard/modules/test/test_agent.py index c60c2db8a0d5..dd4597f7dc16 100644 --- a/dashboard/modules/test/test_agent.py +++ b/dashboard/modules/test/test_agent.py @@ -4,11 +4,15 @@ import ray.new_dashboard.utils as dashboard_utils import ray.new_dashboard.modules.test.test_utils as test_utils +import ray.new_dashboard.modules.test.test_consts as test_consts +from ray.ray_constants import env_bool logger = logging.getLogger(__name__) routes = dashboard_utils.ClassMethodRouteTable +@dashboard_utils.dashboard_module( + enable=env_bool(test_consts.TEST_MODULE_ENVIRONMENT_KEY, False)) class TestAgent(dashboard_utils.DashboardAgentModule): def __init__(self, dashboard_agent): super().__init__(dashboard_agent) diff --git a/dashboard/modules/test/test_consts.py b/dashboard/modules/test/test_consts.py new file mode 100644 index 000000000000..3c53b928c1ef --- /dev/null +++ b/dashboard/modules/test/test_consts.py @@ -0,0 +1 @@ +TEST_MODULE_ENVIRONMENT_KEY = "RAY_DASHBOARD_MODULE_TEST" diff --git a/dashboard/modules/test/test_head.py b/dashboard/modules/test/test_head.py index aed0a49efc12..8c2b86c06412 100644 --- a/dashboard/modules/test/test_head.py +++ b/dashboard/modules/test/test_head.py @@ -4,12 +4,16 @@ import ray.new_dashboard.utils as dashboard_utils import ray.new_dashboard.modules.test.test_utils as test_utils +import ray.new_dashboard.modules.test.test_consts as test_consts from ray.new_dashboard.datacenter import DataSource +from ray.ray_constants import env_bool logger = logging.getLogger(__name__) routes = dashboard_utils.ClassMethodRouteTable +@dashboard_utils.dashboard_module( + enable=env_bool(test_consts.TEST_MODULE_ENVIRONMENT_KEY, False)) class TestHead(dashboard_utils.DashboardHeadModule): def __init__(self, dashboard_head): super().__init__(dashboard_head) diff --git a/dashboard/tests/conftest.py b/dashboard/tests/conftest.py index a60ce1007d3f..d555946fb967 100644 --- a/dashboard/tests/conftest.py +++ b/dashboard/tests/conftest.py @@ -1 +1,13 @@ +import os +import pytest from ray.tests.conftest import * # noqa + + +@pytest.fixture +def enable_test_module(): + os.environ["RAY_DASHBOARD_MODULE_TEST"] = "true" + + +@pytest.fixture +def disable_test_module(): + os.environ.pop("RAY_DASHBOARD_MODULE_TEST", None) diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 6533c049ec2f..954741c87bf6 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -12,7 +12,11 @@ import requests from ray import ray_constants -from ray.test_utils import wait_for_condition, wait_until_server_available +from ray.test_utils import ( + wait_for_condition, + wait_until_server_available, + run_string_as_driver, +) import ray.new_dashboard.consts as dashboard_consts import ray.new_dashboard.utils as dashboard_utils import ray.new_dashboard.modules @@ -56,7 +60,7 @@ def prepare_test_files(): }) }], indirect=True) -def test_basic(ray_start_with_dashboard): +def test_basic(disable_test_module, ray_start_with_dashboard): """Dashboard test that starts a Ray cluster with a dashboard server running, then hits the dashboard API and asserts that it receives sensible data.""" assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) @@ -140,7 +144,7 @@ def _search_agent(processes): assert agent_ports is not None -def test_nodes_update(ray_start_with_dashboard): +def test_nodes_update(enable_test_module, ray_start_with_dashboard): assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True) webui_url = ray_start_with_dashboard["webui_url"] @@ -187,7 +191,7 @@ def test_nodes_update(ray_start_with_dashboard): "Timed out while waiting for dashboard to start.") -def test_http_get(ray_start_with_dashboard): +def test_http_get(enable_test_module, ray_start_with_dashboard): assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True) webui_url = ray_start_with_dashboard["webui_url"] @@ -233,7 +237,7 @@ def test_http_get(ray_start_with_dashboard): "Timed out while waiting for dashboard to start.") -def test_class_method_route_table(): +def test_class_method_route_table(enable_test_module): head_cls_list = dashboard_utils.get_all_modules( dashboard_utils.DashboardHeadModule) agent_cls_list = dashboard_utils.get_all_modules( @@ -318,7 +322,7 @@ def _duplicated_route(req): assert "Traceback" in resp["msg"] -def test_async_loop_forever(): +def test_async_loop_forever(disable_test_module): counter = [0] @dashboard_utils.async_loop_forever(interval_seconds=1) @@ -331,3 +335,30 @@ async def foo(): loop.call_later(4, loop.stop) loop.run_forever() assert counter[0] > 2 + + +def test_dashboard_module_decorator(enable_test_module): + head_cls_list = dashboard_utils.get_all_modules( + dashboard_utils.DashboardHeadModule) + agent_cls_list = dashboard_utils.get_all_modules( + dashboard_utils.DashboardAgentModule) + + assert any(cls.__name__ == "TestHead" for cls in head_cls_list) + assert any(cls.__name__ == "TestAgent" for cls in agent_cls_list) + + test_code = """ +import os +import ray.new_dashboard.utils as dashboard_utils + +os.environ.pop("RAY_DASHBOARD_MODULE_TEST") +head_cls_list = dashboard_utils.get_all_modules( + dashboard_utils.DashboardHeadModule) +agent_cls_list = dashboard_utils.get_all_modules( + dashboard_utils.DashboardAgentModule) +print(head_cls_list) +print(agent_cls_list) +assert all(cls.__name__ != "TestHead" for cls in head_cls_list) +assert all(cls.__name__ != "TestAgent" for cls in agent_cls_list) +print("success") +""" + run_string_as_driver(test_code) diff --git a/dashboard/utils.py b/dashboard/utils.py index e556b4633814..f808e1e38890 100644 --- a/dashboard/utils.py +++ b/dashboard/utils.py @@ -174,6 +174,16 @@ def predicate(o): h.__func__.__route_path__].instance = instance +def dashboard_module(enable): + """A decorator for dashboard module.""" + + def _cls_wrapper(cls): + cls.__ray_dashboard_module_enable__ = enable + return cls + + return _cls_wrapper + + def get_all_modules(module_type): logger.info("Get all modules by type: {}".format(module_type.__name__)) import ray.new_dashboard.modules @@ -182,7 +192,10 @@ def get_all_modules(module_type): ray.new_dashboard.modules.__path__, ray.new_dashboard.modules.__name__ + "."): importlib.import_module(name) - return module_type.__subclasses__() + return [ + m for m in module_type.__subclasses__() + if getattr(m, "__ray_dashboard_module_enable__", True) + ] def to_posix_time(dt): From 093fb212dea8e6cab26b1c52273c90f696be24aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Tue, 25 Aug 2020 17:07:39 +0800 Subject: [PATCH 08/21] CI run all dashboard tests --- bazel/python.bzl | 30 ++++++++++++++++++++---------- dashboard/BUILD | 16 +++++++++++----- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/bazel/python.bzl b/bazel/python.bzl index 2b6f548a24d0..523792fdc4eb 100644 --- a/bazel/python.bzl +++ b/bazel/python.bzl @@ -1,12 +1,22 @@ -# py_test_module_list creates a py_test target for each +# py_test_module_list creates a py_test target for each # Python file in `files` def py_test_module_list(files, size, deps, extra_srcs, **kwargs): - for file in files: - # remove .py - name = file[:-3] - native.py_test( - name=name, - size=size, - srcs=extra_srcs+[file], - **kwargs - ) + for file in files: + # remove .py + name = file[:-3] + native.py_test( + name = name, + size = size, + srcs = extra_srcs + [file], + **kwargs + ) + +def py_test_run_all_subdirectory(include, exclude, extra_srcs, **kwargs): + for file in native.glob(include = include, exclude = exclude): + print(file) + basename = file.rpartition("/")[-1] + native.py_test( + name = basename[:-3], + srcs = extra_srcs + [file], + **kwargs + ) diff --git a/dashboard/BUILD b/dashboard/BUILD index 15ed537e68be..4a4d0894c510 100644 --- a/dashboard/BUILD +++ b/dashboard/BUILD @@ -1,13 +1,19 @@ +load("//bazel:python.bzl", "py_test_run_all_subdirectory") + # This is a dummy test dependency that causes the above tests to be # re-run if any of these files changes. py_library( name = "dashboard_lib", - srcs = glob(["**/*.py"],exclude=["tests/*"]), + srcs = glob( + ["**/*.py"], + exclude = ["tests/*"], + ), ) -py_test( - name = "test_dashboard", +py_test_run_all_subdirectory( size = "small", - srcs = glob(["tests/*.py"]), - deps = [":dashboard_lib"] + include = ["**/test*.py"], + exclude = ["modules/test/**"], + extra_srcs = [], + tags = ["exclusive"], ) From a3bf3e9d39176cca1934a8be4d4802a807032f4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Wed, 26 Aug 2020 15:09:36 +0800 Subject: [PATCH 09/21] Reduce test timeout to 10s --- dashboard/modules/log/test_log.py | 2 +- dashboard/modules/reporter/test_reporter.py | 4 ++-- dashboard/modules/stats_collector/test_stats_collector.py | 2 +- dashboard/tests/test_dashboard.py | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dashboard/modules/log/test_log.py b/dashboard/modules/log/test_log.py index f2a145ff8ea9..6d01f3f240d4 100644 --- a/dashboard/modules/log/test_log.py +++ b/dashboard/modules/log/test_log.py @@ -45,7 +45,7 @@ def write_log(s): webui_url = ray_start_with_dashboard["webui_url"] webui_url = webui_url.replace("localhost", "http://127.0.0.1") - timeout_seconds = 20 + timeout_seconds = 10 start_time = time.time() last_ex = None while True: diff --git a/dashboard/modules/reporter/test_reporter.py b/dashboard/modules/reporter/test_reporter.py index 759301c32dff..c35d050dfcc7 100644 --- a/dashboard/modules/reporter/test_reporter.py +++ b/dashboard/modules/reporter/test_reporter.py @@ -35,7 +35,7 @@ def getpid(self): launch_profiling = None while True: # Sometimes some startup time is required - if time.time() - start_time > 30: + if time.time() - start_time > 10: raise RayTestTimeoutException( "Timed out while collecting profiling stats, " "launch_profiling: {}".format(launch_profiling)) @@ -92,4 +92,4 @@ def _check_workers(): logger.info(ex) return False - wait_for_condition(_check_workers, timeout=30) + wait_for_condition(_check_workers, timeout=10) diff --git a/dashboard/modules/stats_collector/test_stats_collector.py b/dashboard/modules/stats_collector/test_stats_collector.py index 67d30e533e61..90c410bb1d21 100644 --- a/dashboard/modules/stats_collector/test_stats_collector.py +++ b/dashboard/modules/stats_collector/test_stats_collector.py @@ -29,7 +29,7 @@ def getpid(self): webui_url = ray_start_with_dashboard["webui_url"] webui_url = webui_url.replace("localhost", "http://127.0.0.1") - timeout_seconds = 20 + timeout_seconds = 10 start_time = time.time() last_ex = None while True: diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 954741c87bf6..8105ce41dd03 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -150,7 +150,7 @@ def test_nodes_update(enable_test_module, ray_start_with_dashboard): webui_url = ray_start_with_dashboard["webui_url"] webui_url = webui_url.replace("localhost", "http://127.0.0.1") - timeout_seconds = 20 + timeout_seconds = 10 start_time = time.time() while True: time.sleep(1) @@ -199,7 +199,7 @@ def test_http_get(enable_test_module, ray_start_with_dashboard): target_url = webui_url + "/test/dump" - timeout_seconds = 20 + timeout_seconds = 10 start_time = time.time() while True: time.sleep(1) From c1a886f3f6ceafd4ead3811412a2d7f363966bb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Wed, 26 Aug 2020 16:14:26 +0800 Subject: [PATCH 10/21] Use fstring --- dashboard/agent.py | 2 +- dashboard/dashboard.py | 4 ++-- dashboard/head.py | 2 +- dashboard/modules/log/log_head.py | 14 +++++++------- dashboard/modules/log/test_log.py | 10 ++++------ dashboard/modules/reporter/reporter_agent.py | 6 +++--- dashboard/modules/reporter/reporter_head.py | 10 +++++----- dashboard/modules/reporter/test_reporter.py | 2 +- .../stats_collector/stats_collector_head.py | 12 ++++++------ .../stats_collector/test_stats_collector.py | 7 +++---- dashboard/modules/test/test_head.py | 2 +- dashboard/tests/test_dashboard.py | 19 +++++++++---------- dashboard/utils.py | 12 +++++------- 13 files changed, 48 insertions(+), 54 deletions(-) diff --git a/dashboard/agent.py b/dashboard/agent.py index 9c522a964d01..8d5e4b04bab3 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -71,7 +71,7 @@ def _load_modules(self): c = cls(self) dashboard_utils.ClassMethodRouteTable.bind(c) modules.append(c) - logger.info("Loaded {} modules.".format(len(modules))) + logger.info("Loaded %d modules.", len(modules)) return modules async def run(self): diff --git a/dashboard/dashboard.py b/dashboard/dashboard.py index dd7fa716a62a..6abc9293f88e 100644 --- a/dashboard/dashboard.py +++ b/dashboard/dashboard.py @@ -37,10 +37,10 @@ def setup_static_dir(): errno.ENOENT, "Dashboard build directory not found. If installing " "from source, please follow the additional steps " "required to build the dashboard" - "(cd python/ray/{}/client " + f"(cd python/ray/{module_name}/client " "&& npm install " "&& npm ci " - "&& npm run build)".format(module_name), build_dir) + "&& npm run build)", build_dir) static_dir = os.path.join(build_dir, "static") routes.static("/static", static_dir, follow_symlinks=True) diff --git a/dashboard/head.py b/dashboard/head.py index 5c7ebca16f5e..4176c62f7bd7 100644 --- a/dashboard/head.py +++ b/dashboard/head.py @@ -142,7 +142,7 @@ def _load_modules(self): c = cls(self) dashboard_utils.ClassMethodRouteTable.bind(c) modules.append(c) - logger.info("Loaded {} modules.".format(len(modules))) + logger.info("Loaded %d modules.", len(modules)) return modules async def run(self): diff --git a/dashboard/modules/log/log_head.py b/dashboard/modules/log/log_head.py index 0f65f2de89f5..5f1a1f6dcf0a 100644 --- a/dashboard/modules/log/log_head.py +++ b/dashboard/modules/log/log_head.py @@ -51,17 +51,17 @@ def _directory_as_html(url_list) -> str: # returns directory's index as html index_of = "Index of logs" - h1 = "

    {}

    ".format(index_of) + h1 = f"

    {index_of}

    " index_list = [] for url in sorted(url_list): - index_list.append('
  • {name}
  • '.format( - url=url, name=url)) - ul = "
      \n{}\n
    ".format("\n".join(index_list)) - body = "\n{}\n{}\n".format(h1, ul) + index_list.append(f'
  • {url}
  • ') + index_list = "\n".join(index_list) + ul = f"
      \n{index_list}\n
    " + body = f"\n{h1}\n{ul}\n" - head_str = "\n{}\n".format(index_of) - html = "\n{}\n{}\n".format(head_str, body) + head_str = f"\n{index_of}\n" + html = f"\n{head_str}\n{body}\n" return html diff --git a/dashboard/modules/log/test_log.py b/dashboard/modules/log/test_log.py index 6d01f3f240d4..0a2f5dbea795 100644 --- a/dashboard/modules/log/test_log.py +++ b/dashboard/modules/log/test_log.py @@ -77,8 +77,7 @@ def write_log(s): if test_log_text in response.text: break else: - raise Exception("Can't find {} from {}".format( - test_log_text, urls)) + raise Exception(f"Can't find {test_log_text} from {urls}") # Test range request. response = requests.get( @@ -89,7 +88,7 @@ def write_log(s): # Test logUrl in node info. response = requests.get(webui_url + - "/nodes/{}".format(socket.gethostname())) + f"/nodes/{socket.gethostname()}") response.raise_for_status() node_info = response.json() assert node_info["result"] is True @@ -104,6 +103,5 @@ def write_log(s): ex_stack = traceback.format_exception( type(last_ex), last_ex, last_ex.__traceback__) if last_ex else [] - raise Exception( - "Timed out while waiting for dashboard to start, {}". - format("".join(ex_stack))) + ex_stack = "".join(ex_stack) + raise Exception(f"Timed out while testing, {ex_stack}") diff --git a/dashboard/modules/reporter/reporter_agent.py b/dashboard/modules/reporter/reporter_agent.py index 80fecdd8152e..6a43dbbd50d2 100644 --- a/dashboard/modules/reporter/reporter_agent.py +++ b/dashboard/modules/reporter/reporter_agent.py @@ -75,10 +75,10 @@ async def GetProfilingStats(self, request, context): pid = request.pid duration = request.duration profiling_file_path = os.path.join(ray.utils.get_ray_temp_dir(), - "{}_profiling.txt".format(pid)) + f"{pid}_profiling.txt") process = await asyncio.create_subprocess_shell( - "sudo $(which py-spy) record -o {} -p {} -d {} -f speedscope" - .format(profiling_file_path, pid, duration), + f"sudo $(which py-spy) record " + f"-o {profiling_file_path} -p {pid} -d {duration} -f speedscope", stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) diff --git a/dashboard/modules/reporter/reporter_head.py b/dashboard/modules/reporter/reporter_head.py index ed79aff816b5..7c2259cf8afd 100644 --- a/dashboard/modules/reporter/reporter_head.py +++ b/dashboard/modules/reporter/reporter_head.py @@ -31,7 +31,7 @@ async def _update_stubs(self, change): self._stubs.pop(ip) if change.new: ip, ports = next(iter(change.new.items())) - channel = aiogrpc.insecure_channel("{}:{}".format(ip, ports[1])) + channel = aiogrpc.insecure_channel(f"{ip}:{ports[1]}") stub = reporter_pb2_grpc.ReporterServiceStub(channel) self._stubs[ip] = stub @@ -43,12 +43,12 @@ async def launch_profiling(self, req) -> aiohttp.web.Response: reporter_stub = self._stubs[ip] reply = await reporter_stub.GetProfilingStats( reporter_pb2.GetProfilingStatsRequest(pid=pid, duration=duration)) - print("ccc", reply.std_err, reply.std_out) + profiling_info = (json.loads(reply.profiling_stats) + if reply.profiling_stats else reply.std_out) return await dashboard_utils.rest_response( success=True, message="Profiling success.", - profiling_info=json.loads(reply.profiling_stats) - if reply.profiling_stats else reply.std_out) + profiling_info=profiling_info) async def run(self, server): aioredis_client = self._dashboard_head.aioredis_client @@ -56,7 +56,7 @@ async def run(self, server): reporter_key = "{}*".format(reporter_consts.REPORTER_PREFIX) await aioredis_client.psubscribe(receiver.pattern(reporter_key)) - logger.info("Subscribed to {}".format(reporter_key)) + logger.info("Subscribed to %s", reporter_key) async for sender, msg in receiver.iter(): try: diff --git a/dashboard/modules/reporter/test_reporter.py b/dashboard/modules/reporter/test_reporter.py index c35d050dfcc7..e563f8ebd9e5 100644 --- a/dashboard/modules/reporter/test_reporter.py +++ b/dashboard/modules/reporter/test_reporter.py @@ -38,7 +38,7 @@ def getpid(self): if time.time() - start_time > 10: raise RayTestTimeoutException( "Timed out while collecting profiling stats, " - "launch_profiling: {}".format(launch_profiling)) + f"launch_profiling: {launch_profiling}") launch_profiling = requests.get( webui_url + "/api/launch_profiling", params={ diff --git a/dashboard/modules/stats_collector/stats_collector_head.py b/dashboard/modules/stats_collector/stats_collector_head.py index 1f270e5ba6c5..c2a83c5299ec 100644 --- a/dashboard/modules/stats_collector/stats_collector_head.py +++ b/dashboard/modules/stats_collector/stats_collector_head.py @@ -75,7 +75,7 @@ async def get_all_nodes(self, req) -> aiohttp.web.Response: host_name_list=list(DataSource.hostname_to_ip.keys())) else: return await dashboard_utils.rest_response( - success=False, message="Unknown view {}".format(view)) + success=False, message=f"Unknown view {view}") @routes.get("/nodes/{hostname}") async def get_node(self, req) -> aiohttp.web.Response: @@ -92,7 +92,7 @@ async def _update_actors(self): key = "{}:*".format(stats_collector_consts.ACTOR_CHANNEL) pattern = receiver.pattern(key) await aioredis_client.psubscribe(pattern) - logger.info("Subscribed to {}".format(key)) + logger.info("Subscribed to %s", key) # Get all actor info. while True: @@ -107,12 +107,12 @@ async def _update_actors(self): result[binary_to_hex(actor_info.actor_id)] = \ actor_table_data_to_dict(actor_info) DataSource.actors.reset(result) - logger.info("Received {} actor info from GCS.".format( - len(result))) + logger.info("Received %d actor info from GCS.", + len(result)) break else: - raise Exception("Failed to GetAllActorInfo: {}".format( - reply.status.message)) + raise Exception( + f"Failed to GetAllActorInfo: {reply.status.message}") except Exception as ex: logger.exception(ex) await asyncio.sleep(stats_collector_consts. diff --git a/dashboard/modules/stats_collector/test_stats_collector.py b/dashboard/modules/stats_collector/test_stats_collector.py index 90c410bb1d21..6dff16f535c0 100644 --- a/dashboard/modules/stats_collector/test_stats_collector.py +++ b/dashboard/modules/stats_collector/test_stats_collector.py @@ -43,7 +43,7 @@ def getpid(self): assert len(hostname_list) == 1 hostname = hostname_list[0] - response = requests.get(webui_url + "/nodes/{}".format(hostname)) + response = requests.get(webui_url + f"/nodes/{hostname}") response.raise_for_status() detail = response.json() assert detail["result"] is True, detail["msg"] @@ -81,6 +81,5 @@ def getpid(self): ex_stack = traceback.format_exception( type(last_ex), last_ex, last_ex.__traceback__) if last_ex else [] - raise Exception( - "Timed out while waiting for dashboard to start, {}". - format("".join(ex_stack))) + ex_stack = "".join(ex_stack) + raise Exception(f"Timed out while testing, {ex_stack}") diff --git a/dashboard/modules/test/test_head.py b/dashboard/modules/test/test_head.py index 8c2b86c06412..ec5dd0f77ba7 100644 --- a/dashboard/modules/test/test_head.py +++ b/dashboard/modules/test/test_head.py @@ -61,7 +61,7 @@ async def dump(self, req) -> aiohttp.web.Response: data = dict(DataSource.__dict__.get(key)) return await dashboard_utils.rest_response( success=True, - message="Fetch {} from datacenter success.".format(key), + message=f"Fetch {key} from datacenter success.", **{key: data}) @routes.get("/test/notified_agents") diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 8105ce41dd03..fe0f7e298f97 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -160,7 +160,7 @@ def test_nodes_update(enable_test_module, ray_start_with_dashboard): try: dump_info = response.json() except Exception as ex: - logger.info("failed response: {}".format(response.text)) + logger.info("failed response: %s", response.text) raise ex assert dump_info["result"] is True dump_data = dump_info["data"] @@ -176,7 +176,7 @@ def test_nodes_update(enable_test_module, ray_start_with_dashboard): try: notified_agents = response.json() except Exception as ex: - logger.info("failed response: {}".format(response.text)) + logger.info("failed response: %s", response.text) raise ex assert notified_agents["result"] is True notified_agents = notified_agents["data"] @@ -210,7 +210,7 @@ def test_http_get(enable_test_module, ray_start_with_dashboard): try: dump_info = response.json() except Exception as ex: - logger.info("failed response: {}".format(response.text)) + logger.info("failed response: %s", response.text) raise ex assert dump_info["result"] is True dump_data = dump_info["data"] @@ -219,13 +219,13 @@ def test_http_get(enable_test_module, ray_start_with_dashboard): http_port, grpc_port = ports response = requests.get( - "http://{}:{}/test/http_get_from_agent?url={}".format( - ip, http_port, target_url)) + f"http://{ip}:{http_port}" + f"/test/http_get_from_agent?url={target_url}") response.raise_for_status() try: dump_info = response.json() except Exception as ex: - logger.info("failed response: {}".format(response.text)) + logger.info("failed response: %s", response.text) raise ex assert dump_info["result"] is True break @@ -233,8 +233,7 @@ def test_http_get(enable_test_module, ray_start_with_dashboard): logger.info("Retry because of %s", e) finally: if time.time() > start_time + timeout_seconds: - raise Exception( - "Timed out while waiting for dashboard to start.") + raise Exception("Timed out while testing.") def test_class_method_route_table(enable_test_module): @@ -325,14 +324,14 @@ def _duplicated_route(req): def test_async_loop_forever(disable_test_module): counter = [0] - @dashboard_utils.async_loop_forever(interval_seconds=1) + @dashboard_utils.async_loop_forever(interval_seconds=0.1) async def foo(): counter[0] += 1 raise Exception("Test exception") loop = asyncio.get_event_loop() loop.create_task(foo()) - loop.call_later(4, loop.stop) + loop.call_later(1, loop.stop) loop.run_forever() assert counter[0] > 2 diff --git a/dashboard/utils.py b/dashboard/utils.py index f808e1e38890..32e6e36764c3 100644 --- a/dashboard/utils.py +++ b/dashboard/utils.py @@ -103,10 +103,9 @@ def _register_route(cls, method, path, **kwargs): def _wrapper(handler): if path in cls._bind_map[method]: bind_info = cls._bind_map[method][path] - raise Exception("Duplicated route path: {}, " - "previous one registered at {}:{}".format( - path, bind_info.filename, - bind_info.lineno)) + raise Exception(f"Duplicated route path: {path}, " + f"previous one registered at " + f"{bind_info.filename}:{bind_info.lineno}") bind_info = cls._BindInfo(handler.__code__.co_filename, handler.__code__.co_firstlineno, None) @@ -185,7 +184,7 @@ def _cls_wrapper(cls): def get_all_modules(module_type): - logger.info("Get all modules by type: {}".format(module_type.__name__)) + logger.info(f"Get all modules by type: {module_type.__name__}") import ray.new_dashboard.modules for module_loader, name, ispkg in pkgutil.walk_packages( @@ -327,8 +326,7 @@ def __init__(self, owner=None, old=None, new=None): self.new = new def __str__(self): - return "Change(owner: {}, old: {}, new: {}".format( - self.owner, self.old, self.new) + return f"Change(owner: {self.owner}, old: {self.old}, new: {self.new}" class NotifyQueue: From e9226eb977ff02155e485083d2489ce9e0c87231 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Wed, 26 Aug 2020 16:54:20 +0800 Subject: [PATCH 11/21] Remove unused code --- dashboard/tests/test_dashboard.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index fe0f7e298f97..0a886674975a 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -21,11 +21,6 @@ import ray.new_dashboard.utils as dashboard_utils import ray.new_dashboard.modules -try: - create_task = asyncio.create_task -except AttributeError: - create_task = asyncio.ensure_future - os.environ["RAY_USE_NEW_DASHBOARD"] = "1" logger = logging.getLogger(__name__) From 05f43254ddb456f9696d473a39cb0847366cb730 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Wed, 26 Aug 2020 17:00:37 +0800 Subject: [PATCH 12/21] Remove blank line --- src/ray/protobuf/common.proto | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 19c7d6523f39..92b09d4c4697 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -1,4 +1,3 @@ - // Copyright 2017 The Ray Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); From 99d1a4c70d6b0b918eed0a0d2fb0f4a42083c534 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Wed, 26 Aug 2020 17:57:05 +0800 Subject: [PATCH 13/21] Fix dashboard tests --- dashboard/modules/log/test_log.py | 5 +++++ dashboard/modules/reporter/test_reporter.py | 5 +++++ dashboard/modules/stats_collector/test_stats_collector.py | 5 +++++ dashboard/tests/test_dashboard.py | 5 +++++ 4 files changed, 20 insertions(+) diff --git a/dashboard/modules/log/test_log.py b/dashboard/modules/log/test_log.py index 0a2f5dbea795..4cdff7a1db10 100644 --- a/dashboard/modules/log/test_log.py +++ b/dashboard/modules/log/test_log.py @@ -1,4 +1,5 @@ import os +import sys import logging import requests import socket @@ -105,3 +106,7 @@ def write_log(s): last_ex.__traceback__) if last_ex else [] ex_stack = "".join(ex_stack) raise Exception(f"Timed out while testing, {ex_stack}") + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__])) diff --git a/dashboard/modules/reporter/test_reporter.py b/dashboard/modules/reporter/test_reporter.py index e563f8ebd9e5..cd875d92b897 100644 --- a/dashboard/modules/reporter/test_reporter.py +++ b/dashboard/modules/reporter/test_reporter.py @@ -1,4 +1,5 @@ import os +import sys import logging import requests import time @@ -93,3 +94,7 @@ def _check_workers(): return False wait_for_condition(_check_workers, timeout=10) + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__])) diff --git a/dashboard/modules/stats_collector/test_stats_collector.py b/dashboard/modules/stats_collector/test_stats_collector.py index 6dff16f535c0..37c1de90e875 100644 --- a/dashboard/modules/stats_collector/test_stats_collector.py +++ b/dashboard/modules/stats_collector/test_stats_collector.py @@ -1,4 +1,5 @@ import os +import sys import logging import requests import time @@ -83,3 +84,7 @@ def getpid(self): last_ex.__traceback__) if last_ex else [] ex_stack = "".join(ex_stack) raise Exception(f"Timed out while testing, {ex_stack}") + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__])) diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 0a886674975a..858b6c505891 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -1,4 +1,5 @@ import os +import sys import json import time import logging @@ -356,3 +357,7 @@ def test_dashboard_module_decorator(enable_test_module): print("success") """ run_string_as_driver(test_code) + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__])) From 0ac887c7905e0461ef76e29758f8cbec432355fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Wed, 26 Aug 2020 19:11:21 +0800 Subject: [PATCH 14/21] Fix asyncio.create_task not available in py36; Fix lint --- dashboard/agent.py | 7 ++++++- dashboard/modules/log/test_log.py | 1 + dashboard/modules/reporter/test_reporter.py | 1 + dashboard/modules/stats_collector/test_stats_collector.py | 1 + 4 files changed, 9 insertions(+), 1 deletion(-) diff --git a/dashboard/agent.py b/dashboard/agent.py index 8d5e4b04bab3..b7c379da3cb5 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -24,6 +24,11 @@ from ray.core.generated import agent_manager_pb2_grpc import psutil +try: + create_task = asyncio.create_task +except AttributeError: + create_task = asyncio.ensure_future + logger = logging.getLogger(__name__) routes = dashboard_utils.ClassMethodRouteTable @@ -88,7 +93,7 @@ async def _check_parent(): dashboard_consts. DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS) - check_parent_task = asyncio.create_task(_check_parent()) + check_parent_task = create_task(_check_parent()) # Create an aioredis client for all modules. try: diff --git a/dashboard/modules/log/test_log.py b/dashboard/modules/log/test_log.py index 4cdff7a1db10..d6b06b9008da 100644 --- a/dashboard/modules/log/test_log.py +++ b/dashboard/modules/log/test_log.py @@ -8,6 +8,7 @@ import html.parser import urllib.parse +import pytest import ray from ray.new_dashboard.tests.conftest import * # noqa from ray.test_utils import ( diff --git a/dashboard/modules/reporter/test_reporter.py b/dashboard/modules/reporter/test_reporter.py index cd875d92b897..f9778b4bf96b 100644 --- a/dashboard/modules/reporter/test_reporter.py +++ b/dashboard/modules/reporter/test_reporter.py @@ -4,6 +4,7 @@ import requests import time +import pytest import ray from ray.new_dashboard.tests.conftest import * # noqa from ray.test_utils import ( diff --git a/dashboard/modules/stats_collector/test_stats_collector.py b/dashboard/modules/stats_collector/test_stats_collector.py index 37c1de90e875..7de450047661 100644 --- a/dashboard/modules/stats_collector/test_stats_collector.py +++ b/dashboard/modules/stats_collector/test_stats_collector.py @@ -5,6 +5,7 @@ import time import traceback +import pytest import ray from ray.new_dashboard.tests.conftest import * # noqa from ray.test_utils import ( From ca5c93bfef93e7787ee1b61a6fc59a22ef53861a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Wed, 26 Aug 2020 19:27:17 +0800 Subject: [PATCH 15/21] Add format_web_url to ray.test_utils --- dashboard/modules/log/test_log.py | 6 ++++-- dashboard/modules/reporter/test_reporter.py | 5 +++-- dashboard/modules/stats_collector/test_stats_collector.py | 6 ++++-- dashboard/tests/test_dashboard.py | 5 +++-- python/ray/test_utils.py | 8 ++++++++ 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/dashboard/modules/log/test_log.py b/dashboard/modules/log/test_log.py index d6b06b9008da..91fa1a16f3fd 100644 --- a/dashboard/modules/log/test_log.py +++ b/dashboard/modules/log/test_log.py @@ -12,7 +12,9 @@ import ray from ray.new_dashboard.tests.conftest import * # noqa from ray.test_utils import ( - wait_until_server_available, ) + format_web_url, + wait_until_server_available, +) os.environ["RAY_USE_NEW_DASHBOARD"] = "1" @@ -45,7 +47,7 @@ def write_log(s): assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True) webui_url = ray_start_with_dashboard["webui_url"] - webui_url = webui_url.replace("localhost", "http://127.0.0.1") + webui_url = format_web_url(webui_url) timeout_seconds = 10 start_time = time.time() diff --git a/dashboard/modules/reporter/test_reporter.py b/dashboard/modules/reporter/test_reporter.py index f9778b4bf96b..5747c6478cf2 100644 --- a/dashboard/modules/reporter/test_reporter.py +++ b/dashboard/modules/reporter/test_reporter.py @@ -8,6 +8,7 @@ import ray from ray.new_dashboard.tests.conftest import * # noqa from ray.test_utils import ( + format_web_url, RayTestTimeoutException, wait_until_server_available, wait_for_condition, @@ -31,7 +32,7 @@ def getpid(self): webui_url = addresses["webui_url"] assert (wait_until_server_available(webui_url) is True) - webui_url = webui_url.replace("localhost", "http://127.0.0.1") + webui_url = format_web_url(webui_url) start_time = time.time() launch_profiling = None @@ -69,7 +70,7 @@ def getpid(self): webui_url = addresses["webui_url"] assert (wait_until_server_available(webui_url) is True) - webui_url = webui_url.replace("localhost", "http://127.0.0.1") + webui_url = format_web_url(webui_url) def _check_workers(): try: diff --git a/dashboard/modules/stats_collector/test_stats_collector.py b/dashboard/modules/stats_collector/test_stats_collector.py index 7de450047661..06c919133b68 100644 --- a/dashboard/modules/stats_collector/test_stats_collector.py +++ b/dashboard/modules/stats_collector/test_stats_collector.py @@ -9,7 +9,9 @@ import ray from ray.new_dashboard.tests.conftest import * # noqa from ray.test_utils import ( - wait_until_server_available, ) + format_web_url, + wait_until_server_available, +) os.environ["RAY_USE_NEW_DASHBOARD"] = "1" @@ -29,7 +31,7 @@ def getpid(self): assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True) webui_url = ray_start_with_dashboard["webui_url"] - webui_url = webui_url.replace("localhost", "http://127.0.0.1") + webui_url = format_web_url(webui_url) timeout_seconds = 10 start_time = time.time() diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 858b6c505891..3becbbe7d701 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -14,6 +14,7 @@ from ray import ray_constants from ray.test_utils import ( + format_web_url, wait_for_condition, wait_until_server_available, run_string_as_driver, @@ -144,7 +145,7 @@ def test_nodes_update(enable_test_module, ray_start_with_dashboard): assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True) webui_url = ray_start_with_dashboard["webui_url"] - webui_url = webui_url.replace("localhost", "http://127.0.0.1") + webui_url = format_web_url(webui_url) timeout_seconds = 10 start_time = time.time() @@ -191,7 +192,7 @@ def test_http_get(enable_test_module, ray_start_with_dashboard): assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True) webui_url = ray_start_with_dashboard["webui_url"] - webui_url = webui_url.replace("localhost", "http://127.0.0.1") + webui_url = format_web_url(webui_url) target_url = webui_url + "/test/dump" diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index ebc9e36d0d3c..bcdbcb8246b1 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -411,3 +411,11 @@ def get_error_message(pub_sub, num, error_type=None, timeout=5): time.sleep(0.01) return msgs + + +def format_web_url(url): + """Format web url.""" + url = url.replace("localhost", "http://127.0.0.1") + if not url.startswith("http://"): + return "http://" + url + return url From e660ca75f24a036499cd009f43d992181d45da5e Mon Sep 17 00:00:00 2001 From: fyrestone Date: Thu, 27 Aug 2020 11:39:05 +0800 Subject: [PATCH 16/21] Update dashboard/modules/reporter/reporter_head.py Co-authored-by: Max Fitton --- dashboard/modules/reporter/reporter_head.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dashboard/modules/reporter/reporter_head.py b/dashboard/modules/reporter/reporter_head.py index 7c2259cf8afd..6276d9a50fc2 100644 --- a/dashboard/modules/reporter/reporter_head.py +++ b/dashboard/modules/reporter/reporter_head.py @@ -56,7 +56,7 @@ async def run(self, server): reporter_key = "{}*".format(reporter_consts.REPORTER_PREFIX) await aioredis_client.psubscribe(receiver.pattern(reporter_key)) - logger.info("Subscribed to %s", reporter_key) + logger.info(f"Subscribed to {reporter_key}") async for sender, msg in receiver.iter(): try: From d9c331f0ebb25aaac4a3bac9229d192b713c58b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Thu, 27 Aug 2020 15:25:05 +0800 Subject: [PATCH 17/21] Add DictChangeItem type for Dict change --- dashboard/modules/reporter/reporter_head.py | 4 ++-- .../stats_collector/stats_collector_head.py | 4 ++-- dashboard/modules/test/test_head.py | 8 ++++---- dashboard/tests/test_dashboard.py | 3 +-- dashboard/utils.py | 14 +++++++++++--- 5 files changed, 20 insertions(+), 13 deletions(-) diff --git a/dashboard/modules/reporter/reporter_head.py b/dashboard/modules/reporter/reporter_head.py index 6276d9a50fc2..91f7fc695fd4 100644 --- a/dashboard/modules/reporter/reporter_head.py +++ b/dashboard/modules/reporter/reporter_head.py @@ -27,10 +27,10 @@ def __init__(self, dashboard_head): async def _update_stubs(self, change): if change.old: - ip, port = next(iter(change.old.items())) + ip, port = change.old self._stubs.pop(ip) if change.new: - ip, ports = next(iter(change.new.items())) + ip, ports = change.new channel = aiogrpc.insecure_channel(f"{ip}:{ports[1]}") stub = reporter_pb2_grpc.ReporterServiceStub(channel) self._stubs[ip] = stub diff --git a/dashboard/modules/stats_collector/stats_collector_head.py b/dashboard/modules/stats_collector/stats_collector_head.py index c2a83c5299ec..d5affe79946a 100644 --- a/dashboard/modules/stats_collector/stats_collector_head.py +++ b/dashboard/modules/stats_collector/stats_collector_head.py @@ -50,10 +50,10 @@ def __init__(self, dashboard_head): async def _update_stubs(self, change): if change.old: - ip, port = next(iter(change.old.items())) + ip, port = change.old self._stubs.pop(ip) if change.new: - ip, node_info = next(iter(change.new.items())) + ip, node_info = change.new address = "{}:{}".format(ip, int(node_info["nodeManagerPort"])) channel = aiogrpc.insecure_channel(address) stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) diff --git a/dashboard/modules/test/test_head.py b/dashboard/modules/test/test_head.py index ec5dd0f77ba7..699fc15458cd 100644 --- a/dashboard/modules/test/test_head.py +++ b/dashboard/modules/test/test_head.py @@ -21,12 +21,12 @@ def __init__(self, dashboard_head): DataSource.agents.signal.append(self._update_notified_agents) async def _update_notified_agents(self, change): - if change.new: - ip, ports = next(iter(change.new.items())) - self._notified_agents[ip] = ports if change.old: - ip, port = next(iter(change.old.items())) + ip, port = change.old self._notified_agents.pop(ip) + if change.new: + ip, ports = change.new + self._notified_agents[ip] = ports @routes.get("/test/route_get") async def route_get(self, req) -> aiohttp.web.Response: diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 3becbbe7d701..54df32a06e18 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -184,8 +184,7 @@ def test_nodes_update(enable_test_module, ray_start_with_dashboard): logger.info("Retry because of %s", e) finally: if time.time() > start_time + timeout_seconds: - raise Exception( - "Timed out while waiting for dashboard to start.") + raise Exception("Timed out while testing.") def test_http_get(enable_test_module, ray_start_with_dashboard): diff --git a/dashboard/utils.py b/dashboard/utils.py index 32e6e36764c3..3634195d9cf6 100644 --- a/dashboard/utils.py +++ b/dashboard/utils.py @@ -13,6 +13,7 @@ import traceback from base64 import b64decode from collections.abc import MutableMapping, Mapping +from collections import namedtuple from typing import Any import aioredis @@ -349,6 +350,8 @@ class Dict(MutableMapping): :note: Only the first level data report change. """ + ChangeItem = namedtuple("DictChangeItem", ["key", "value"]) + def __init__(self, *args, **kwargs): self._data = dict(*args, **kwargs) self.signal = Signal(self) @@ -358,10 +361,14 @@ def __setitem__(self, key, value): self._data[key] = value if len(self.signal) and old != value: if old is None: - co = self.signal.send(Change(owner=self, new={key: value})) + co = self.signal.send( + Change(owner=self, new=Dict.ChangeItem(key, value))) else: co = self.signal.send( - Change(owner=self, old={key: old}, new={key: value})) + Change( + owner=self, + old=Dict.ChangeItem(key, old), + new=Dict.ChangeItem(key, value))) NotifyQueue.put(co) def __getitem__(self, item): @@ -370,7 +377,8 @@ def __getitem__(self, item): def __delitem__(self, key): old = self._data.pop(key, None) if len(self.signal) and old is not None: - co = self.signal.send(Change(owner=self, old={key: old})) + co = self.signal.send( + Change(owner=self, old=Dict.ChangeItem(key, old))) NotifyQueue.put(co) def __len__(self): From bf87431d1ffb904e33eccfbcb616316780767289 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Thu, 27 Aug 2020 15:38:38 +0800 Subject: [PATCH 18/21] Refine logger.exception --- dashboard/head.py | 16 ++++++++-------- dashboard/modules/reporter/reporter_agent.py | 4 ++-- dashboard/modules/reporter/reporter_head.py | 5 +++-- .../stats_collector/stats_collector_head.py | 12 ++++++------ dashboard/utils.py | 4 ++-- 5 files changed, 21 insertions(+), 20 deletions(-) diff --git a/dashboard/head.py b/dashboard/head.py index 4176c62f7bd7..9a8f273796f3 100644 --- a/dashboard/head.py +++ b/dashboard/head.py @@ -115,8 +115,8 @@ async def _update_nodes(self): dict(zip(node_hostnames, node_ips))) DataSource.ip_to_hostname.reset( dict(zip(node_ips, node_hostnames))) - except aiogrpc.AioRpcError as ex: - logger.exception(ex) + except aiogrpc.AioRpcError: + logger.exception("Got AioRpcError when updating nodes.") self._gcs_rpc_error_counter += 1 if self._gcs_rpc_error_counter > \ dashboard_consts.MAX_COUNT_OF_GCS_RPC_ERROR: @@ -125,8 +125,8 @@ async def _update_nodes(self): self._gcs_rpc_error_counter, dashboard_consts.MAX_COUNT_OF_GCS_RPC_ERROR) sys.exit(-1) - except Exception as ex: - logger.exception(ex) + except Exception: + logger.exception("Error updating nodes.") finally: await asyncio.sleep( dashboard_consts.UPDATE_NODES_INTERVAL_SECONDS) @@ -199,8 +199,8 @@ async def _async_notify(): co = await dashboard_utils.NotifyQueue.get() try: await co - except Exception as e: - logger.exception(e) + except Exception: + logger.exception(f"Error notifying coroutine {co}") async def _purge_data(): """Purge data in datacenter.""" @@ -209,8 +209,8 @@ async def _purge_data(): dashboard_consts.PURGE_DATA_INTERVAL_SECONDS) try: await DataOrganizer.purge() - except Exception as e: - logger.exception(e) + except Exception: + logger.exception("Error purging data.") modules = self._load_modules() diff --git a/dashboard/modules/reporter/reporter_agent.py b/dashboard/modules/reporter/reporter_agent.py index 6a43dbbd50d2..24db53f71268 100644 --- a/dashboard/modules/reporter/reporter_agent.py +++ b/dashboard/modules/reporter/reporter_agent.py @@ -239,8 +239,8 @@ async def _perform_iteration(self): await aioredis_client.publish( "{}{}".format(reporter_consts.REPORTER_PREFIX, self._hostname), jsonify_asdict(stats)) - except Exception as ex: - logger.exception(ex) + except Exception: + logger.exception("Error publishing node physical stats.") await asyncio.sleep( reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000) diff --git a/dashboard/modules/reporter/reporter_head.py b/dashboard/modules/reporter/reporter_head.py index 91f7fc695fd4..dfdfd2281ef3 100644 --- a/dashboard/modules/reporter/reporter_head.py +++ b/dashboard/modules/reporter/reporter_head.py @@ -63,5 +63,6 @@ async def run(self, server): _, data = msg data = json.loads(ray.utils.decode(data)) DataSource.node_physical_stats[data["ip"]] = data - except Exception as ex: - logger.exception(ex) + except Exception: + logger.exception( + "Error receiving node physical stats from reporter agent.") diff --git a/dashboard/modules/stats_collector/stats_collector_head.py b/dashboard/modules/stats_collector/stats_collector_head.py index d5affe79946a..5ff21070618b 100644 --- a/dashboard/modules/stats_collector/stats_collector_head.py +++ b/dashboard/modules/stats_collector/stats_collector_head.py @@ -113,8 +113,8 @@ async def _update_actors(self): else: raise Exception( f"Failed to GetAllActorInfo: {reply.status.message}") - except Exception as ex: - logger.exception(ex) + except Exception: + logger.exception("Error Getting all actor info from GCS.") await asyncio.sleep(stats_collector_consts. RETRY_GET_ALL_ACTOR_INFO_INTERVAL_SECONDS) @@ -127,8 +127,8 @@ async def _update_actors(self): pubsub_message.data) DataSource.actors[binary_to_hex(actor_info.actor_id)] = \ actor_table_data_to_dict(actor_info) - except Exception as ex: - logger.exception(ex) + except Exception: + logger.exception("Error receiving actor info.") @async_loop_forever( stats_collector_consts.NODE_STATS_UPDATE_INTERVAL_SECONDS) @@ -142,8 +142,8 @@ async def _update_node_stats(self): node_manager_pb2.GetNodeStatsRequest(), timeout=2) reply_dict = node_stats_to_dict(reply) DataSource.node_stats[ip] = reply_dict - except Exception as ex: - logger.exception(ex) + except Exception: + logger.exception(f"Error updating node stats of {ip}.") async def run(self, server): gcs_channel = self._dashboard_head.aiogrpc_gcs_channel diff --git a/dashboard/utils.py b/dashboard/utils.py index 3634195d9cf6..9fd170890b8e 100644 --- a/dashboard/utils.py +++ b/dashboard/utils.py @@ -415,8 +415,8 @@ async def _looper(*args, **kwargs): while True: try: await coro(*args, **kwargs) - except Exception as ex: - logger.exception(ex) + except Exception: + logger.exception(f"Error looping coroutine {coro}.") await asyncio.sleep(interval_seconds) return _looper From 6304ed126f08b4d61dd76722cbd96aeb56e1f03b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Thu, 27 Aug 2020 15:42:24 +0800 Subject: [PATCH 19/21] Refine GET /api/launch_profiling --- dashboard/modules/reporter/reporter_head.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dashboard/modules/reporter/reporter_head.py b/dashboard/modules/reporter/reporter_head.py index dfdfd2281ef3..6046fd325781 100644 --- a/dashboard/modules/reporter/reporter_head.py +++ b/dashboard/modules/reporter/reporter_head.py @@ -37,9 +37,9 @@ async def _update_stubs(self, change): @routes.get("/api/launch_profiling") async def launch_profiling(self, req) -> aiohttp.web.Response: - ip = req.query.get("ip") - pid = int(req.query.get("pid")) - duration = int(req.query.get("duration")) + ip = req.query["ip"] + pid = int(req.query["pid"]) + duration = int(req.query["duration"]) reporter_stub = self._stubs[ip] reply = await reporter_stub.GetProfilingStats( reporter_pb2.GetProfilingStatsRequest(pid=pid, duration=duration)) From 52f9a7d7d19a81f336ad392cd14ca9eef1a2fdc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Thu, 27 Aug 2020 16:41:15 +0800 Subject: [PATCH 20/21] Remove disable_test_module fixture --- dashboard/modules/log/test_log.py | 2 +- dashboard/modules/reporter/test_reporter.py | 2 +- dashboard/modules/stats_collector/test_stats_collector.py | 2 +- dashboard/tests/conftest.py | 5 +---- dashboard/tests/test_dashboard.py | 4 ++-- 5 files changed, 6 insertions(+), 9 deletions(-) diff --git a/dashboard/modules/log/test_log.py b/dashboard/modules/log/test_log.py index 91fa1a16f3fd..a40643f9035e 100644 --- a/dashboard/modules/log/test_log.py +++ b/dashboard/modules/log/test_log.py @@ -37,7 +37,7 @@ def get_urls(self): return self._urls -def test_log(disable_test_module, ray_start_with_dashboard): +def test_log(ray_start_with_dashboard): @ray.remote def write_log(s): print(s) diff --git a/dashboard/modules/reporter/test_reporter.py b/dashboard/modules/reporter/test_reporter.py index 5747c6478cf2..0097bc465c3a 100644 --- a/dashboard/modules/reporter/test_reporter.py +++ b/dashboard/modules/reporter/test_reporter.py @@ -19,7 +19,7 @@ logger = logging.getLogger(__name__) -def test_profiling(disable_test_module, shutdown_only): +def test_profiling(shutdown_only): addresses = ray.init(include_dashboard=True, num_cpus=6) @ray.remote(num_cpus=2) diff --git a/dashboard/modules/stats_collector/test_stats_collector.py b/dashboard/modules/stats_collector/test_stats_collector.py index 06c919133b68..72e614437d10 100644 --- a/dashboard/modules/stats_collector/test_stats_collector.py +++ b/dashboard/modules/stats_collector/test_stats_collector.py @@ -18,7 +18,7 @@ logger = logging.getLogger(__name__) -def test_node_info(disable_test_module, ray_start_with_dashboard): +def test_node_info(ray_start_with_dashboard): @ray.remote class Actor: def getpid(self): diff --git a/dashboard/tests/conftest.py b/dashboard/tests/conftest.py index d555946fb967..030e258069c5 100644 --- a/dashboard/tests/conftest.py +++ b/dashboard/tests/conftest.py @@ -6,8 +6,5 @@ @pytest.fixture def enable_test_module(): os.environ["RAY_DASHBOARD_MODULE_TEST"] = "true" - - -@pytest.fixture -def disable_test_module(): + yield os.environ.pop("RAY_DASHBOARD_MODULE_TEST", None) diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 54df32a06e18..1667e32177c8 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -57,7 +57,7 @@ def prepare_test_files(): }) }], indirect=True) -def test_basic(disable_test_module, ray_start_with_dashboard): +def test_basic(ray_start_with_dashboard): """Dashboard test that starts a Ray cluster with a dashboard server running, then hits the dashboard API and asserts that it receives sensible data.""" assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) @@ -317,7 +317,7 @@ def _duplicated_route(req): assert "Traceback" in resp["msg"] -def test_async_loop_forever(disable_test_module): +def test_async_loop_forever(): counter = [0] @dashboard_utils.async_loop_forever(interval_seconds=0.1) From 6b3e788861df085056fdb42fa6db1a0ca21bcf54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=AE=9D?= Date: Fri, 28 Aug 2020 21:38:36 +0800 Subject: [PATCH 21/21] Fix test_basic may fail --- dashboard/tests/test_dashboard.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 7c3651edcca0..7983a02c174a 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -117,7 +117,9 @@ def _search_agent(processes): logger.info("Test agent register is OK.") wait_for_condition(lambda: _search_agent(raylet_proc.children())) - assert dashboard_proc.status() == psutil.STATUS_RUNNING + assert dashboard_proc.status() in [ + psutil.STATUS_RUNNING, psutil.STATUS_SLEEPING + ] agent_proc = _search_agent(raylet_proc.children()) agent_pid = agent_proc.pid