Merged
30 changes: 20 additions & 10 deletions bazel/python.bzl
@@ -1,12 +1,22 @@
# py_test_module_list creates a py_test target for each
# Python file in `files`
def py_test_module_list(files, size, deps, extra_srcs, **kwargs):
for file in files:
# remove .py
name = file[:-3]
native.py_test(
name=name,
size=size,
srcs=extra_srcs+[file],
**kwargs
)
for file in files:
# remove .py
name = file[:-3]
native.py_test(
name = name,
size = size,
srcs = extra_srcs + [file],
**kwargs
)

def py_test_run_all_subdirectory(include, exclude, extra_srcs, **kwargs):
for file in native.glob(include = include, exclude = exclude):
print(file)
basename = file.rpartition("/")[-1]
native.py_test(
name = basename[:-3],
srcs = extra_srcs + [file],
**kwargs
)
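For illustration, a minimal Python sketch (not part of the change) of the target-naming logic used by py_test_run_all_subdirectory: each matched file becomes one py_test named after the file's basename with its ".py" suffix stripped. The paths below are hypothetical.

files = ["modules/log/tests/test_log_head.py", "tests/test_dashboard.py"]  # hypothetical paths
for file in files:
    basename = file.rpartition("/")[-1]  # e.g. "test_log_head.py"
    print(basename[:-3])                 # generated target name, e.g. "test_log_head"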
16 changes: 11 additions & 5 deletions dashboard/BUILD
@@ -1,13 +1,19 @@
load("//bazel:python.bzl", "py_test_run_all_subdirectory")

# This is a dummy test dependency that causes the above tests to be
# re-run if any of these files changes.
py_library(
name = "dashboard_lib",
srcs = glob(["**/*.py"],exclude=["tests/*"]),
srcs = glob(
["**/*.py"],
exclude = ["tests/*"],
),
)

py_test(
name = "test_dashboard",
py_test_run_all_subdirectory(
size = "small",
srcs = glob(["tests/*.py"]),
deps = [":dashboard_lib"]
include = ["**/test*.py"],
exclude = ["modules/test/**"],
extra_srcs = [],
tags = ["exclusive"],
)
17 changes: 15 additions & 2 deletions dashboard/agent.py
@@ -24,6 +24,11 @@
from ray.core.generated import agent_manager_pb2_grpc
import psutil

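# asyncio.create_task is only available on Python 3.7+; fall back to ensure_future on 3.6.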
try:
create_task = asyncio.create_task
except AttributeError:
create_task = asyncio.ensure_future

logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable

@@ -36,6 +41,7 @@ def __init__(self,
redis_password=None,
temp_dir=None,
log_dir=None,
metrics_export_port=None,
node_manager_port=None,
object_store_name=None,
raylet_name=None):
@@ -45,6 +51,7 @@ def __init__(self,
self.redis_password = redis_password
self.temp_dir = temp_dir
self.log_dir = log_dir
self.metrics_export_port = metrics_export_port
self.node_manager_port = node_manager_port
self.object_store_name = object_store_name
self.raylet_name = raylet_name
@@ -69,7 +76,7 @@ def _load_modules(self):
c = cls(self)
dashboard_utils.ClassMethodRouteTable.bind(c)
modules.append(c)
logger.info("Loaded {} modules.".format(len(modules)))
logger.info("Loaded %d modules.", len(modules))
return modules

async def run(self):
@@ -86,7 +93,7 @@ async def _check_parent():
dashboard_consts.
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS)

check_parent_task = asyncio.create_task(_check_parent())
check_parent_task = create_task(_check_parent())

# Create an aioredis client for all modules.
try:
@@ -172,6 +179,11 @@ async def _check_parent():
required=True,
type=str,
help="The address to use for Redis.")
parser.add_argument(
"--metrics-export-port",
required=True,
type=int,
help="The port to expose metrics through Prometheus.")
parser.add_argument(
"--node-manager-port",
required=True,
@@ -277,6 +289,7 @@ async def _check_parent():
redis_password=args.redis_password,
temp_dir=temp_dir,
log_dir=log_dir,
metrics_export_port=args.metrics_export_port,
node_manager_port=args.node_manager_port,
object_store_name=args.object_store_name,
raylet_name=args.raylet_name)
18 changes: 13 additions & 5 deletions dashboard/dashboard.py
@@ -37,10 +37,10 @@ def setup_static_dir():
errno.ENOENT, "Dashboard build directory not found. If installing "
"from source, please follow the additional steps "
"required to build the dashboard"
"(cd python/ray/{}/client "
f"(cd python/ray/{module_name}/client "
"&& npm install "
"&& npm ci "
"&& npm run build)".format(module_name), build_dir)
"&& npm run build)", build_dir)

static_dir = os.path.join(build_dir, "static")
routes.static("/static", static_dir, follow_symlinks=True)
@@ -59,14 +59,21 @@ class Dashboard:
port(int): Port number of dashboard aiohttp server.
redis_address(str): GCS address of a Ray cluster
redis_password(str): Redis password to access GCS
log_dir(str): Log directory of dashboard.
"""

def __init__(self, host, port, redis_address, redis_password=None):
def __init__(self,
host,
port,
redis_address,
redis_password=None,
log_dir=None):
self.dashboard_head = dashboard_head.DashboardHead(
http_host=host,
http_port=port,
redis_address=redis_address,
redis_password=redis_password)
redis_password=redis_password,
log_dir=log_dir)

# Setup Dashboard Routes
build_dir = setup_static_dir()
@@ -197,7 +204,8 @@ async def run(self):
args.host,
args.port,
args.redis_address,
redis_password=args.redis_password)
redis_password=args.redis_password,
log_dir=log_dir)
loop = asyncio.get_event_loop()
loop.run_until_complete(dashboard.run())
except Exception as e:
5 changes: 3 additions & 2 deletions dashboard/datacenter.py
@@ -19,7 +19,7 @@ class DataSource:
# {actor id hex(str): actor table data(dict of ActorTableData
# in gcs.proto)}
actors = Dict()
# {ip address(str): dashboard agent grpc server port(int)}
# {ip address(str): dashboard agent [http port(int), grpc port(int)]}
agents = Dict()
# {ip address(str): gcs node info(dict of GcsNodeInfo in gcs.proto)}
nodes = Dict()
@@ -56,7 +56,7 @@ async def get_node_actors(cls, hostname):
node_worker_id_set.add(worker_stats["workerId"])
node_actors = {}
for actor_id, actor_table_data in DataSource.actors.items():
if actor_table_data["workerId"] in node_worker_id_set:
if actor_table_data["address"]["workerId"] in node_worker_id_set:
node_actors[actor_id] = actor_table_data
return node_actors

@@ -102,6 +102,7 @@ async def get_all_node_summary(cls):
for hostname in DataSource.hostname_to_ip.keys():
node_info = await cls.get_node_info(hostname)
node_info.pop("workers", None)
node_info.pop("actors", None)
node_info["raylet"].pop("workersStats", None)
node_info["raylet"].pop("viewData", None)
all_nodes_summary.append(node_info)
50 changes: 33 additions & 17 deletions dashboard/head.py
@@ -28,7 +28,8 @@ def gcs_node_info_to_dict(message):


class DashboardHead:
def __init__(self, http_host, http_port, redis_address, redis_password):
def __init__(self, http_host, http_port, redis_address, redis_password,
log_dir):
# NodeInfoGcsService
self._gcs_node_info_stub = None
self._gcs_rpc_error_counter = 0
@@ -37,6 +38,7 @@ def __init__(self, http_host, http_port, redis_address, redis_password):
self.http_port = http_port
self.redis_address = dashboard_utils.address_tuple(redis_address)
self.redis_password = redis_password
self.log_dir = log_dir
self.aioredis_client = None
self.aiogrpc_gcs_channel = None
self.http_session = None
@@ -74,6 +76,22 @@ async def _update_nodes(self):
while True:
try:
nodes = await self._get_nodes()

# Get correct node info by state,
# 1. The node is ALIVE if any ALIVE node info
Contributor
Can you explain the reason in the comment too?

Contributor Author
@fyrestone fyrestone Aug 26, 2020
GetAllNodeInfo returns info for all nodes, including dead ones. We need to decide how to merge them per hostname. For example, suppose there are two node info entries:

  • {node id: 1, hostname: example}
  • {node id: 2, hostname: example}

Which one do we choose for host example?

For one hostname there will be a list of node info entries, with only two possible cases:

  • All node info entries for the hostname are DEAD
  • Exactly one node info entry for the hostname is ALIVE

So here is the rule:

  • Choose a DEAD one if all node info entries for the hostname are DEAD.
  • Choose the ALIVE one if there is an ALIVE node for the hostname.

Contributor Author

It's better to add a timestamp to GcsNodeInfo.

Contributor

Hmm, I see. So this is due to the case where there could be multiple raylets on a single host. I think this should only be the case when we use cluster_utils, right? Or do you know any other case? It doesn't really make sense to have multiple ray start invocations on a single node.

If this is useful only for the cluster utils, why don't we just group by hostname + node_id and display both in the dashboard? Is there any issue with this?

Contributor Author

If a node fails over many times, there will be a lot of node info entries belonging to one hostname.

Contributor
@rkooo567 rkooo567 Aug 27, 2020
I think we can easily hide the second option's downside by filtering DEAD node ids out of user-facing APIs.

Contributor
@rkooo567 rkooo567 Aug 27, 2020
{
  node_id: {
    ip:
    host:
    state:
  }
}

And in the frontend, we can just filter out DEAD-state node ids when there are duplicate hostname + ip pairs.

Contributor Author

@rkooo567 I see. Using node id as the key of node info will return the full node info to the front-end, and the front-end does the filtering. My concern is that the data will be too large if we run many nodes in one cluster. For a DEAD node, the physical node info is useless. If we choose node id as the key, we need some strategies to reduce the data size:

  • Only include GcsNodeInfo data for DEAD nodes.
  • Have each (IP + hostname) keep a limited number of DEAD nodes, for example the last 5.

Contributor
@rkooo567 rkooo567 Aug 27, 2020
I see your concern (since you are running long-running clusters, it can accumulate lots of information). I like the second solution, but we can let users configure it: for regular usage it could be 0~2, and for cluster_utils cases it could be around 10.

Only contains GcsNodeInfo data for the DEAD nodes.

For this, I am probably not familiar with how the data is represented now (I thought this was already the current behavior). Can you explain a little bit more?

Contributor Author

The current node info is a mixture of node physical stats (from the reporter agent) and node stats (from the GetNodeStats rpc). If a node is dead, its physical stats and node stats will be unreliable; only GcsNodeInfo is correct.

# of the hostname exists.
# 2. The node is DEAD if all node info of the
# hostname are DEAD.
hostname_to_node_info = {}
for node in nodes:
hostname = node["nodeManagerAddress"]
assert node["state"] in ["ALIVE", "DEAD"]
choose = hostname_to_node_info.get(hostname)
if choose is not None and choose["state"] == "ALIVE":
continue
hostname_to_node_info[hostname] = node
nodes = hostname_to_node_info.values()

self._gcs_rpc_error_counter = 0
node_ips = [node["nodeManagerAddress"] for node in nodes]
node_hostnames = [
@@ -83,13 +101,11 @@
agents = dict(DataSource.agents)
for node in nodes:
node_ip = node["nodeManagerAddress"]
if node_ip not in agents:
key = "{}{}".format(
dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX,
node_ip)
agent_port = await self.aioredis_client.get(key)
if agent_port:
agents[node_ip] = json.loads(agent_port)
key = "{}{}".format(
dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX, node_ip)
agent_port = await self.aioredis_client.get(key)
if agent_port:
agents[node_ip] = json.loads(agent_port)
for ip in agents.keys() - set(node_ips):
agents.pop(ip, None)

@@ -99,8 +115,8 @@
dict(zip(node_hostnames, node_ips)))
DataSource.ip_to_hostname.reset(
dict(zip(node_ips, node_hostnames)))
except aiogrpc.AioRpcError as ex:
logger.exception(ex)
except aiogrpc.AioRpcError:
logger.exception("Got AioRpcError when updating nodes.")
self._gcs_rpc_error_counter += 1
if self._gcs_rpc_error_counter > \
dashboard_consts.MAX_COUNT_OF_GCS_RPC_ERROR:
@@ -109,8 +125,8 @@
self._gcs_rpc_error_counter,
dashboard_consts.MAX_COUNT_OF_GCS_RPC_ERROR)
sys.exit(-1)
except Exception as ex:
logger.exception(ex)
except Exception:
logger.exception("Error updating nodes.")
finally:
await asyncio.sleep(
dashboard_consts.UPDATE_NODES_INTERVAL_SECONDS)
@@ -126,7 +142,7 @@ def _load_modules(self):
c = cls(self)
dashboard_utils.ClassMethodRouteTable.bind(c)
modules.append(c)
logger.info("Loaded {} modules.".format(len(modules)))
logger.info("Loaded %d modules.", len(modules))
return modules

async def run(self):
@@ -183,8 +199,8 @@ async def _async_notify():
co = await dashboard_utils.NotifyQueue.get()
try:
await co
except Exception as e:
logger.exception(e)
except Exception:
logger.exception(f"Error notifying coroutine {co}")

async def _purge_data():
"""Purge data in datacenter."""
@@ -193,8 +209,8 @@ async def _purge_data():
dashboard_consts.PURGE_DATA_INTERVAL_SECONDS)
try:
await DataOrganizer.purge()
except Exception as e:
logger.exception(e)
except Exception:
logger.exception("Error purging data.")

modules = self._load_modules()

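For illustration, a standalone sketch (not part of the diff) of the per-hostname merge rule that _update_nodes applies above; the node records below are made up, mirroring the two-node example from the review thread.

def merge_nodes_by_hostname(nodes):
    # Keep the ALIVE entry for a hostname if one exists, otherwise keep a DEAD one.
    hostname_to_node_info = {}
    for node in nodes:
        hostname = node["nodeManagerAddress"]
        chosen = hostname_to_node_info.get(hostname)
        if chosen is not None and chosen["state"] == "ALIVE":
            continue  # an ALIVE entry already represents this hostname
        hostname_to_node_info[hostname] = node
    return list(hostname_to_node_info.values())

nodes = [
    {"nodeId": "1", "nodeManagerAddress": "example", "state": "DEAD"},
    {"nodeId": "2", "nodeManagerAddress": "example", "state": "ALIVE"},
]
assert [n["nodeId"] for n in merge_nodes_by_hostname(nodes)] == ["2"]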
Empty file.
19 changes: 19 additions & 0 deletions dashboard/modules/log/log_agent.py
@@ -0,0 +1,19 @@
import logging

import mimetypes

import ray.new_dashboard.utils as dashboard_utils

logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable


class LogAgent(dashboard_utils.DashboardAgentModule):
def __init__(self, dashboard_agent):
super().__init__(dashboard_agent)
mimetypes.add_type("text/plain", ".err")
mimetypes.add_type("text/plain", ".out")
routes.static("/logs", self._dashboard_agent.log_dir, show_index=True)

async def run(self, server):
pass
69 changes: 69 additions & 0 deletions dashboard/modules/log/log_head.py
@@ -0,0 +1,69 @@
import logging

import mimetypes

import aiohttp.web
import ray.new_dashboard.utils as dashboard_utils
from ray.new_dashboard.datacenter import DataSource, GlobalSignals

logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable


class LogHead(dashboard_utils.DashboardHeadModule):
LOG_URL_TEMPLATE = "http://{ip}:{port}/logs"

def __init__(self, dashboard_head):
super().__init__(dashboard_head)
mimetypes.add_type("text/plain", ".err")
mimetypes.add_type("text/plain", ".out")
routes.static("/logs", self._dashboard_head.log_dir, show_index=True)
GlobalSignals.node_info_fetched.append(
self.insert_log_url_to_node_info)

async def insert_log_url_to_node_info(self, node_info):
ip = node_info.get("ip")
if ip is None:
return
agent_port = DataSource.agents.get(ip)
if agent_port is None:
return
agent_http_port, _ = agent_port
log_url = self.LOG_URL_TEMPLATE.format(ip=ip, port=agent_http_port)
node_info["logUrl"] = log_url

@routes.get("/log_index")
async def get_log_index(self, req) -> aiohttp.web.Response:
url_list = []
for ip, ports in DataSource.agents.items():
url_list.append(
self.LOG_URL_TEMPLATE.format(ip=ip, port=str(ports[0])))
if self._dashboard_head.ip not in DataSource.agents:
url_list.append(
self.LOG_URL_TEMPLATE.format(
ip=self._dashboard_head.ip,
port=self._dashboard_head.http_port))
return aiohttp.web.Response(
text=self._directory_as_html(url_list), content_type="text/html")

@staticmethod
def _directory_as_html(url_list) -> str:
Contributor

I think this rendering logic should be defined on the front-end. I really prefer not shipping HTML to the front-end if it isn't needed, and it is a lot cleaner to write HTML in React and on the front-end than to write it in Python without a library. It seems to me like if someone wanted to change this, it would be difficult.

Contributor Author

This function renders the node log index as HTML from the log head; the HTML structure is the same as the show_index output for static files served from the log agent. The log structure is:

  • /log_index (the _directory_as_html logic renders the HTML from the log head)
    • /logs (static files with show_index=True render the HTML from the log agent)
      • dashboard.log
      • dashboard_agent.log
    • /logs (static files with show_index=True render the HTML from the log agent)
      • dashboard.log
        ...

@mxz96102 will provide the front-end of the log module with more advanced features.

# returns directory's index as html

index_of = "Index of logs"
h1 = f"<h1>{index_of}</h1>"

index_list = []
for url in sorted(url_list):
index_list.append(f'<li><a href="{url}">{url}</a></li>')
index_list = "\n".join(index_list)
ul = f"<ul>\n{index_list}\n</ul>"
body = f"<body>\n{h1}\n{ul}\n</body>"

head_str = f"<head>\n<title>{index_of}</title>\n</head>"
html = f"<html>\n{head_str}\n{body}\n</html>"

return html

async def run(self, server):
pass
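For illustration, a minimal sketch of how the log index URLs are assembled from DataSource.agents, which (per the datacenter.py change above) maps an IP to [http port, grpc port]; the addresses and ports below are made up.

LOG_URL_TEMPLATE = "http://{ip}:{port}/logs"
agents = {"10.0.0.1": [52365, 52366], "10.0.0.2": [52365, 52366]}  # hypothetical data
url_list = [
    LOG_URL_TEMPLATE.format(ip=ip, port=str(ports[0]))
    for ip, ports in agents.items()
]
# -> ["http://10.0.0.1:52365/logs", "http://10.0.0.2:52365/logs"]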