Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ cc_binary(
deps = [
":ray_util",
":raylet_lib",
"@com_github_gflags_gflags//:gflags",
],
)

Expand Down
29 changes: 15 additions & 14 deletions java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,21 @@ private void startRaylet() {
List<String> command = ImmutableList.of(
// The raylet executable file.
getTempFile("/raylet").getAbsolutePath(),
rayConfig.rayletSocketName,
rayConfig.objectStoreSocketName,
"0", // The object manager port.
"0", // The node manager port.
rayConfig.nodeIp,
rayConfig.getRedisIp(),
rayConfig.getRedisPort().toString(),
"0", // number of initial workers
String.valueOf(maximumStartupConcurrency),
ResourceUtil.getResourcesStringFromMap(rayConfig.resources),
String.join(",", rayConfig.rayletConfigParameters), // The internal config list.
buildPythonWorkerCommand(), // python worker command
buildWorkerCommandRaylet(), // java worker command
redisPasswordOption
String.format("--raylet_socket_name=%s", rayConfig.rayletSocketName),
String.format("--store_socket_name=%s", rayConfig.objectStoreSocketName),
String.format("--object_manager_port=%d", 0), // The object manager port.
String.format("--node_manager_port=%d", 0), // The node manager port.
String.format("--node_ip_address=%s",rayConfig.nodeIp),
String.format("--redis_address=%s", rayConfig.getRedisIp()),
String.format("--redis_port=%d", rayConfig.getRedisPort()),
String.format("--num_initial_workers=%d", 0), // number of initial workers
String.format("--maximum_startup_concurrency=%d", maximumStartupConcurrency),
String.format("--static_resource_list=%s",
ResourceUtil.getResourcesStringFromMap(rayConfig.resources)),
String.format("--config_list=%s", String.join(",", rayConfig.rayletConfigParameters)),
String.format("--python_worker_command=%s", buildPythonWorkerCommand()),
String.format("--java_worker_command=%s", buildWorkerCommandRaylet()),
String.format("--redis_password=%s", redisPasswordOption)
);

startProcess(command, null, "raylet");
Expand Down
37 changes: 21 additions & 16 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1177,21 +1177,21 @@ def start_raylet(redis_address,

command = [
RAYLET_EXECUTABLE,
raylet_name,
plasma_store_name,
str(object_manager_port),
str(node_manager_port),
node_ip_address,
gcs_ip_address,
gcs_port,
str(num_initial_workers),
str(maximum_startup_concurrency),
resource_argument,
config_str,
start_worker_command,
java_worker_command,
redis_password or "",
temp_dir,
"--raylet_socket_name={}".format(raylet_name),
"--store_socket_name={}".format(plasma_store_name),
"--object_manager_port={}".format(object_manager_port),
"--node_manager_port={}".format(node_manager_port),
"--node_ip_address={}".format(node_ip_address),
"--redis_address={}".format(gcs_ip_address),
"--redis_port={}".format(gcs_port),
"--num_initial_workers={}".format(num_initial_workers),
"--maximum_startup_concurrency={}".format(maximum_startup_concurrency),
"--static_resource_list={}".format(resource_argument),
"--config_list={}".format(config_str),
"--python_worker_command={}".format(start_worker_command),
"--java_worker_command={}".format(java_worker_command),
"--redis_password={}".format(redis_password or ""),
"--temp_dir={}".format(temp_dir),
]
process_info = start_ray_process(
command,
Expand Down Expand Up @@ -1555,7 +1555,12 @@ def start_raylet_monitor(redis_address,
redis_password = redis_password or ""
config = config or {}
config_str = ",".join(["{},{}".format(*kv) for kv in config.items()])
command = [RAYLET_MONITOR_EXECUTABLE, gcs_ip_address, gcs_port, config_str]
command = [
RAYLET_MONITOR_EXECUTABLE,
"--redis_address={}".format(gcs_ip_address),
"--redis_port={}".format(gcs_port),
"--config_list={}".format(config_str),
]
if redis_password:
command += [redis_password]
process_info = start_ray_process(
Expand Down
70 changes: 45 additions & 25 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,28 @@
#include "ray/stats/stats.h"
#include "ray/status.h"

#include "gflags/gflags.h"

DEFINE_string(raylet_socket_name, "", "The socket name of raylet.");
DEFINE_string(store_socket_name, "", "The socket name of object store.");
DEFINE_int32(object_manager_port, -1, "The port of object manager.");
DEFINE_int32(node_manager_port, -1, "The port of node manager.");
DEFINE_string(node_ip_address, "", "The ip address of this node.");
DEFINE_string(redis_address, "", "The ip address of redis server.");
DEFINE_int32(redis_port, -1, "The port of redis server.");
DEFINE_int32(num_initial_workers, 0, "Number of initial workers.");
DEFINE_int32(maximum_startup_concurrency, 1, "Maximum startup concurrency");
DEFINE_string(static_resource_list, "", "The static resource list of this node.");
DEFINE_string(config_list, "", "The raylet config list of this node.");
DEFINE_string(python_worker_command, "", "Python worker command.");
DEFINE_string(java_worker_command, "", "Java worker command.");
DEFINE_string(redis_password, "", "The password of redis.");
DEFINE_string(temp_dir, "", "Temporary directory.");
DEFINE_bool(disable_stats, false, "Whether disable the stats.");
DEFINE_string(stat_address, "127.0.0.1:8888", "The address that we report metrics to.");
DEFINE_bool(enable_stdout_exporter, false,
"Whether enable the stdout exporter for stats.");

#ifndef RAYLET_TEST

/// A helper function that parse the worker command string into a vector of arguments.
Expand All @@ -21,37 +43,35 @@ int main(int argc, char *argv[]) {
ray::RayLogLevel::INFO,
/*log_dir=*/"");
ray::RayLog::InstallFailureSignalHandler();
RAY_CHECK(argc >= 14 && argc <= 19);

const std::string raylet_socket_name = std::string(argv[1]);
const std::string store_socket_name = std::string(argv[2]);
int object_manager_port = std::stoi(argv[3]);
int node_manager_port = std::stoi(argv[4]);
const std::string node_ip_address = std::string(argv[5]);
const std::string redis_address = std::string(argv[6]);
int redis_port = std::stoi(argv[7]);
int num_initial_workers = std::stoi(argv[8]);
int maximum_startup_concurrency = std::stoi(argv[9]);
const std::string static_resource_list = std::string(argv[10]);
const std::string config_list = std::string(argv[11]);
const std::string python_worker_command = std::string(argv[12]);
const std::string java_worker_command = std::string(argv[13]);
const std::string redis_password = (argc >= 15 ? std::string(argv[14]) : "");
const std::string temp_dir = (argc >= 16 ? std::string(argv[15]) : "/tmp/ray");
const std::string disable_stats_str(argc >= 17 ? std::string(argv[16]) : "false");
const bool disable_stats("true" == disable_stats_str);
const std::string stat_address =
(argc >= 18 ? std::string(argv[17]) : "127.0.0.1:8888");
const std::string disable_stdout_exporter_str(argc >= 19 ? std::string(argv[18])
: "true");
const bool disable_stdout_exporter("true" == disable_stdout_exporter_str);

gflags::ParseCommandLineFlags(&argc, &argv, true);
const std::string raylet_socket_name = FLAGS_raylet_socket_name;
const std::string store_socket_name = FLAGS_store_socket_name;
const int object_manager_port = static_cast<int>(FLAGS_object_manager_port);
const int node_manager_port = static_cast<int>(FLAGS_node_manager_port);
const std::string node_ip_address = FLAGS_node_ip_address;
const std::string redis_address = FLAGS_redis_address;
const int redis_port = static_cast<int>(FLAGS_redis_port);
const int num_initial_workers = static_cast<int>(FLAGS_num_initial_workers);
const int maximum_startup_concurrency =
static_cast<int>(FLAGS_maximum_startup_concurrency);
const std::string static_resource_list = FLAGS_static_resource_list;
const std::string config_list = FLAGS_config_list;
const std::string python_worker_command = FLAGS_python_worker_command;
const std::string java_worker_command = FLAGS_java_worker_command;
const std::string redis_password = FLAGS_redis_password;
const std::string temp_dir = FLAGS_temp_dir;
const bool disable_stats = FLAGS_disable_stats;
const std::string stat_address = FLAGS_stat_address;
const bool enable_stdout_exporter = FLAGS_enable_stdout_exporter;
gflags::ShutDownCommandLineFlags();

// Initialize stats.
const ray::stats::TagsType global_tags = {
{ray::stats::JobNameKey, "raylet"},
{ray::stats::VersionKey, "0.7.0"},
{ray::stats::NodeAddressKey, node_ip_address}};
ray::stats::Init(stat_address, global_tags, disable_stats, disable_stdout_exporter);
ray::stats::Init(stat_address, global_tags, disable_stats, enable_stdout_exporter);

// Configuration for the node manager.
ray::raylet::NodeManagerConfig node_manager_config;
Expand Down
18 changes: 13 additions & 5 deletions src/ray/raylet/monitor_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,25 @@
#include "ray/raylet/monitor.h"
#include "ray/util/util.h"

#include "gflags/gflags.h"

DEFINE_string(redis_address, "", "The ip address of redis.");
DEFINE_int32(redis_port, -1, "The port of redis.");
DEFINE_string(config_list, "", "The config list of raylet.");
DEFINE_string(redis_password, "", "The password of redis.");

int main(int argc, char *argv[]) {
InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
ray::RayLog::ShutDownRayLog, argv[0],
ray::RayLogLevel::INFO, /*log_dir=*/"");
ray::RayLog::InstallFailureSignalHandler();
RAY_CHECK(argc == 4 || argc == 5);

const std::string redis_address = std::string(argv[1]);
int redis_port = std::stoi(argv[2]);
const std::string config_list = std::string(argv[3]);
const std::string redis_password = (argc == 5 ? std::string(argv[4]) : "");
gflags::ParseCommandLineFlags(&argc, &argv, true);
const std::string redis_address = FLAGS_redis_address;
const int redis_port = static_cast<int>(FLAGS_redis_port);
const std::string config_list = FLAGS_config_list;
const std::string redis_password = FLAGS_redis_password;
gflags::ShutDownCommandLineFlags();

std::unordered_map<std::string, int> raylet_config;

Expand Down
4 changes: 2 additions & 2 deletions src/ray/stats/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace stats {

/// Initialize stats.
static void Init(const std::string &address, const TagsType &global_tags,
bool disable_stats = false, bool disable_stdout_exporter = true) {
bool disable_stats = false, bool enable_stdout_exporter = false) {
StatsConfig::instance().SetIsDisableStats(disable_stats);
if (disable_stats) {
RAY_LOG(INFO) << "Disabled stats.";
Expand All @@ -36,7 +36,7 @@ static void Init(const std::string &address, const TagsType &global_tags,
static auto exporter =
std::make_shared<opencensus::exporters::stats::PrometheusExporter>();

if (!disable_stdout_exporter) {
if (enable_stdout_exporter) {
// Enable stdout exporter by default.
opencensus::exporters::stats::StdoutExporter::Register();
}
Expand Down