3 changes: 2 additions & 1 deletion src/ray/common/BUILD.bazel
@@ -234,6 +234,7 @@ ray_cc_library(
"//src/ray/common/scheduling:resource_set",
"//src/ray/common/scheduling:scheduling_class_util",
"//src/ray/flatbuffers:node_manager_generated",
"//src/ray/observability:metric_interface",
"//src/ray/util:container_util",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
@@ -425,6 +426,6 @@ ray_cc_library(
name = "metrics",
hdrs = ["metrics.h"],
deps = [
"//src/ray/stats:stats_lib",
"//src/ray/stats:stats_metric",
],
)
13 changes: 13 additions & 0 deletions src/ray/common/metrics.h
@@ -73,4 +73,17 @@ inline ray::stats::Gauge GetObjectStoreMemoryGaugeMetric() {
};
}

inline ray::stats::Histogram GetSchedulerPlacementTimeSHistogramMetric() {
return ray::stats::Histogram{
/*name=*/"scheduler_placement_time_s",
/*description=*/
"The time it takes for a worklod (task, actor, placement group) to "
"be placed. This is the time from when the tasks dependencies are "
"resolved to when it actually reserves resources on a node to run.",
/*unit=*/"s",
/*boundaries=*/{0.1, 1, 10, 100, 1000, 10000},
Contributor:

So I've lately gotten a bit of a crash course in this thing. These buckets seem a little off, right? scheduler_placement_time_s is in seconds (I'm assuming, because of the _s), so we're saying the scheduler placement time falls within 0.1s -> 1s -> 10s -> 100s -> 16 minutes -> 2.5+ hours.

The latter buckets seem absurd, right? What is the actual realistic spread of latency for the scheduler to place something? If the _s is misleading and it's actually not in seconds, then we should fix that. But we should choose our buckets along the lines of:

--> healthy range
--> elevated range
--> error range

Contributor Author:

Makes sense, the series of numbers must've been pulled out of a hat at some point ;) - I'll fix them in a PR on top of this, to keep this PR purely a refactoring.

Contributor Author:

[screenshot of observed scheduler_placement_time_s values]

Yeah, it looks like the value is only meaningful in the range below 10s; most are actually below 0.1s, so a further breakdown of buckets below 0.1s is probably more useful.
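Purely as an illustration of that direction (these boundaries are not part of this PR and the real values were left for the follow-up PR), a finer, mostly sub-second set of buckets might look like:

// Hypothetical boundaries, in seconds: dense below 0.1s where most placements
// land, with a few coarser buckets left to catch elevated and error cases.
inline ray::stats::Histogram GetSchedulerPlacementTimeSHistogramMetric() {
  return ray::stats::Histogram{
      /*name=*/"scheduler_placement_time_s",
      /*description=*/"Time from dependency resolution to resource reservation.",
      /*unit=*/"s",
      /*boundaries=*/{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10},
      /*tag_keys=*/{"WorkloadType"},
  };
}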

Contributor:

I wonder if it even makes sense to have this metric in seconds at all. It seems like we'd want this to usually be sub-second.

Contributor Author:

Makes total sense.

Contributor Author:

There you go: #58217

/*tag_keys=*/{"WorkloadType"},
};
}

} // namespace ray
10 changes: 4 additions & 6 deletions src/ray/common/task/task_spec.cc
@@ -23,7 +23,6 @@

#include "ray/common/ray_config.h"
#include "ray/common/runtime_env_common.h"
#include "ray/stats/metric_defs.h"
#include "ray/util/logging.h"

namespace ray {
@@ -602,17 +601,16 @@ bool TaskSpecification::IsRetriable() const {
return true;
}

void TaskSpecification::EmitTaskMetrics() const {
void TaskSpecification::EmitTaskMetrics(
ray::observability::MetricInterface &scheduler_placement_time_s_histogram) const {
double duration_s = (GetMessage().lease_grant_timestamp_ms() -
GetMessage().dependency_resolution_timestamp_ms()) /
1000;

if (IsActorCreationTask()) {
stats::STATS_scheduler_placement_time_s.Record(duration_s,
{{"WorkloadType", "Actor"}});
scheduler_placement_time_s_histogram.Record(duration_s, {{"WorkloadType", "Actor"}});
} else {
stats::STATS_scheduler_placement_time_s.Record(duration_s,
{{"WorkloadType", "Task"}});
scheduler_placement_time_s_histogram.Record(duration_s, {{"WorkloadType", "Task"}});
}
}

4 changes: 3 additions & 1 deletion src/ray/common/task/task_spec.h
@@ -29,6 +29,7 @@
#include "ray/common/scheduling/resource_set.h"
#include "ray/common/scheduling/scheduling_class_util.h"
#include "ray/common/task/task_common.h"
#include "ray/observability/metric_interface.h"

extern "C" {
#include "ray/thirdparty/sha256.h"
@@ -357,7 +358,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
/// \return true if the task or actor is retriable.
bool IsRetriable() const;

void EmitTaskMetrics() const;
void EmitTaskMetrics(
ray::observability::MetricInterface &scheduler_placement_time_s_histogram) const;

/// \return true if task events from this task should be reported.
bool EnableTaskEvents() const;
18 changes: 14 additions & 4 deletions src/ray/core_worker/core_worker_process.cc
@@ -470,7 +470,8 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
return core_worker->core_worker_client_pool_->GetOrConnect(*addr);
},
gcs_client,
task_by_state_gauge_,
*task_by_state_gauge_,
*total_lineage_bytes_gauge_,
/*free_actor_object_callback=*/
[this](const ObjectID &object_id) {
auto core_worker = GetCoreWorker();
@@ -552,7 +553,8 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
// OBJECT_STORE.
return rpc::TensorTransport::OBJECT_STORE;
},
boost::asio::steady_timer(io_service_));
boost::asio::steady_timer(io_service_),
*scheduler_placement_time_s_histogram_);

auto report_locality_data_callback = [this](
const ObjectID &object_id,
@@ -683,8 +685,8 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
task_execution_service_,
std::move(task_event_buffer),
pid,
task_by_state_gauge_,
actor_by_state_gauge_);
*task_by_state_gauge_,
*actor_by_state_gauge_);
return core_worker;
}

@@ -790,6 +792,14 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)

// We need init stats before using it/spawning threads.
stats::Init(global_tags, options_.metrics_agent_port, worker_id_);
task_by_state_gauge_ = std::unique_ptr<ray::stats::Gauge>(
new ray::stats::Gauge(GetTaskByStateGaugeMetric()));
actor_by_state_gauge_ = std::unique_ptr<ray::stats::Gauge>(
new ray::stats::Gauge(GetActorByStateGaugeMetric()));
total_lineage_bytes_gauge_ = std::unique_ptr<ray::stats::Gauge>(
new ray::stats::Gauge(GetTotalLineageBytesGaugeMetric()));
scheduler_placement_time_s_histogram_ = std::unique_ptr<ray::stats::Histogram>(
new ray::stats::Histogram(GetSchedulerPlacementTimeSHistogramMetric()));

// Initialize event framework before starting up worker.
if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) {
6 changes: 4 additions & 2 deletions src/ray/core_worker/core_worker_process.h
@@ -185,8 +185,10 @@ class CoreWorkerProcessImpl {
/// The client to export metrics to the metrics agent.
std::unique_ptr<ray::rpc::MetricsAgentClient> metrics_agent_client_;

ray::stats::Gauge task_by_state_gauge_{GetTaskByStateGaugeMetric()};
ray::stats::Gauge actor_by_state_gauge_{GetActorByStateGaugeMetric()};
std::unique_ptr<ray::stats::Gauge> task_by_state_gauge_;
Contributor Author:

Note: I changed this to a pointer so the metric initialization can happen after stats::Init inside core_worker_process.cc.
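A minimal sketch of the ordering constraint behind this (generic class name, not code from this PR; it assumes the Gauge and metric helpers introduced here): non-static data members are initialized before the constructor body runs, so an in-class-initialized Gauge would be constructed before stats::Init, while a unique_ptr defers construction into the constructor body.

#include <memory>

// Sketch only: illustrates why the member became a unique_ptr.
class CoreWorkerProcessImplSketch {
 public:
  CoreWorkerProcessImplSketch() {
    // stats::Init(global_tags, metrics_agent_port, worker_id) would run here...
    // ...and only afterwards is the metric object actually constructed:
    task_by_state_gauge_ =
        std::make_unique<ray::stats::Gauge>(GetTaskByStateGaugeMetric());
  }

 private:
  // The old in-class initializer
  //   ray::stats::Gauge task_by_state_gauge_{GetTaskByStateGaugeMetric()};
  // would construct the metric before the constructor body (and thus before
  // stats::Init), which is what the pointer avoids.
  std::unique_ptr<ray::stats::Gauge> task_by_state_gauge_;
};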

std::unique_ptr<ray::stats::Gauge> actor_by_state_gauge_;
std::unique_ptr<ray::stats::Gauge> total_lineage_bytes_gauge_;
std::unique_ptr<ray::stats::Histogram> scheduler_placement_time_s_histogram_;
};
} // namespace core
} // namespace ray
10 changes: 10 additions & 0 deletions src/ray/core_worker/metrics.h
@@ -41,5 +41,15 @@ inline ray::stats::Gauge GetTaskByStateGaugeMetric() {
};
}

inline ray::stats::Gauge GetTotalLineageBytesGaugeMetric() {
return ray::stats::Gauge{
/*name=*/"total_lineage_bytes",
/*description=*/
"Total amount of memory used to store task specs for lineage reconstruction.",
/*unit=*/"",
/*tag_keys=*/{},
};
}

} // namespace core
} // namespace ray
2 changes: 1 addition & 1 deletion src/ray/core_worker/task_manager.cc
@@ -1781,7 +1781,7 @@ void TaskManager::FillTaskInfo(rpc::GetCoreWorkerStatsReply *reply,

void TaskManager::RecordMetrics() {
absl::MutexLock lock(&mu_);
ray::stats::STATS_total_lineage_bytes.Record(total_lineage_footprint_bytes_);
total_lineage_bytes_gauge_.Record(total_lineage_footprint_bytes_);
task_counter_.FlushOnChangeCallbacks();
}

7 changes: 6 additions & 1 deletion src/ray/core_worker/task_manager.h
@@ -33,7 +33,6 @@
#include "ray/core_worker_rpc_client/core_worker_client_interface.h"
#include "ray/gcs_rpc_client/gcs_client.h"
#include "ray/observability/metric_interface.h"
#include "ray/stats/metric_defs.h"
#include "ray/util/counter_map.h"
#include "src/ray/protobuf/common.pb.h"
#include "src/ray/protobuf/core_worker.pb.h"
@@ -187,6 +186,7 @@ class TaskManager : public TaskManagerInterface {
const ActorID &)> get_actor_rpc_client_callback,
std::shared_ptr<gcs::GcsClient> gcs_client,
ray::observability::MetricInterface &task_by_state_counter,
ray::observability::MetricInterface &total_lineage_bytes_gauge,
FreeActorObjectCallback free_actor_object_callback)
: in_memory_store_(in_memory_store),
reference_counter_(reference_counter),
@@ -199,6 +199,7 @@
get_actor_rpc_client_callback_(std::move(get_actor_rpc_client_callback)),
gcs_client_(std::move(gcs_client)),
task_by_state_counter_(task_by_state_counter),
total_lineage_bytes_gauge_(total_lineage_bytes_gauge),
free_actor_object_callback_(std::move(free_actor_object_callback)) {
task_counter_.SetOnChangeCallback(
[this](const std::tuple<std::string, rpc::TaskStatus, bool> &key)
@@ -812,6 +813,10 @@ class TaskManager : public TaskManagerInterface {
// - Source: component reporting, e.g., "core_worker", "executor", or "pull_manager"
observability::MetricInterface &task_by_state_counter_;

/// Metric to track the total amount of memory used to store task specs for lineage
/// reconstruction.
observability::MetricInterface &total_lineage_bytes_gauge_;

/// Callback to free GPU object from the in-actor object store.
FreeActorObjectCallback free_actor_object_callback_;

@@ -177,7 +177,7 @@ void NormalTaskSubmitter::OnWorkerIdle(
scheduling_key_entry.num_busy_workers++;

task_spec.GetMutableMessage().set_lease_grant_timestamp_ms(current_sys_time_ms());
task_spec.EmitTaskMetrics();
task_spec.EmitTaskMetrics(scheduler_placement_time_s_histogram_);

executing_tasks_.emplace(task_spec.TaskId(), addr);
PushNormalTask(
8 changes: 6 additions & 2 deletions src/ray/core_worker/task_submission/normal_task_submitter.h
@@ -95,7 +95,8 @@ class NormalTaskSubmitter {
const JobID &job_id,
std::shared_ptr<LeaseRequestRateLimiter> lease_request_rate_limiter,
const TensorTransportGetter &tensor_transport_getter,
boost::asio::steady_timer cancel_timer)
boost::asio::steady_timer cancel_timer,
ray::observability::MetricInterface &scheduler_placement_time_s_histogram)
: rpc_address_(std::move(rpc_address)),
local_raylet_client_(std::move(local_raylet_client)),
raylet_client_pool_(std::move(raylet_client_pool)),
@@ -109,7 +110,8 @@
core_worker_client_pool_(std::move(core_worker_client_pool)),
job_id_(job_id),
lease_request_rate_limiter_(std::move(lease_request_rate_limiter)),
cancel_retry_timer_(std::move(cancel_timer)) {}
cancel_retry_timer_(std::move(cancel_timer)),
scheduler_placement_time_s_histogram_(scheduler_placement_time_s_histogram) {}

/// Schedule a task for direct submission to a worker.
void SubmitTask(TaskSpecification task_spec);
@@ -362,6 +364,8 @@ class NormalTaskSubmitter {

// Retries cancelation requests if they were not successful.
boost::asio::steady_timer cancel_retry_timer_ ABSL_GUARDED_BY(mu_);

ray::observability::MetricInterface &scheduler_placement_time_s_histogram_;
};

} // namespace core
@@ -32,6 +32,7 @@
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/core_worker_rpc_client/core_worker_client_pool.h"
#include "ray/core_worker_rpc_client/fake_core_worker_client.h"
#include "ray/observability/fake_metric.h"
#include "ray/raylet_rpc_client/fake_raylet_client.h"
#include "ray/raylet_rpc_client/raylet_client_interface.h"

@@ -494,7 +495,8 @@ class NormalTaskSubmitterTest : public testing::Test {
JobID::Nil(),
rate_limiter,
[](const ObjectID &object_id) { return rpc::TensorTransport::OBJECT_STORE; },
boost::asio::steady_timer(io_context));
boost::asio::steady_timer(io_context),
fake_scheduler_placement_time_s_histogram_);
}

NodeID local_node_id;
@@ -511,6 +513,7 @@
std::unique_ptr<MockLeasePolicy> lease_policy;
MockLeasePolicy *lease_policy_ptr = nullptr;
instrumented_io_context io_context;
ray::observability::FakeHistogram fake_scheduler_placement_time_s_histogram_;
};

TEST_F(NormalTaskSubmitterTest, TestLocalityAwareSubmitOneTask) {
@@ -1430,6 +1433,7 @@ void TestSchedulingKey(const std::shared_ptr<CoreWorkerMemoryStore> store,
const TaskSpecification &same2,
const TaskSpecification &different) {
rpc::Address address;
ray::observability::FakeHistogram fake_scheduler_placement_time_s_histogram_;
Bug: Local Variable Passed to Constructor Causes Use-After-Free

Variable fake_scheduler_placement_time_s_histogram_ is declared as a local variable inside the TestSchedulingKey helper function at line 1436, but is then passed by reference to the NormalTaskSubmitter constructor at line 1465. This creates a use-after-free bug because the local variable will be destroyed before NormalTaskSubmitter uses it. This should be a declaration that persists for the lifetime of the submitter, not one at the start of the function scope.

Contributor Author:

It's fine; the NormalTaskSubmitter is also a local object within the TestSchedulingKey function.
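To spell out the lifetime argument (a generic sketch with made-up types, not the actual test code): both objects are locals in the same scope, and locals are destroyed in reverse order of declaration, so the histogram declared first outlives the submitter that references it.

// Reduced sketch of the lifetime relationship in TestSchedulingKey.
struct FakeMetric {};

struct SubmitterSketch {
  explicit SubmitterSketch(FakeMetric &metric) : metric_(metric) {}
  FakeMetric &metric_;  // reference to a longer-lived local, never owned
};

void TestSchedulingKeySketch() {
  FakeMetric histogram;                  // declared first -> destroyed last
  SubmitterSketch submitter(histogram);  // holds a reference to the local above
  // ... submit tasks, run assertions ...
}  // submitter is destroyed before histogram, so the reference never dangles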

auto local_node_id = NodeID::FromRandom();
auto raylet_client = std::make_shared<MockRayletClient>();
auto raylet_client_pool = std::make_shared<rpc::RayletClientPool>(
@@ -1457,7 +1461,8 @@ void TestSchedulingKey(const std::shared_ptr<CoreWorkerMemoryStore> store,
JobID::Nil(),
std::make_shared<StaticLeaseRequestRateLimiter>(1),
[](const ObjectID &object_id) { return rpc::TensorTransport::OBJECT_STORE; },
boost::asio::steady_timer(io_context));
boost::asio::steady_timer(io_context),
fake_scheduler_placement_time_s_histogram_);

submitter.SubmitTask(same1);
submitter.SubmitTask(same2);
6 changes: 5 additions & 1 deletion src/ray/core_worker/tests/core_worker_test.cc
@@ -187,6 +187,7 @@ class CoreWorkerTest : public ::testing::Test {
},
mock_gcs_client_,
fake_task_by_state_gauge_,
fake_total_lineage_bytes_gauge_,
/*free_actor_object_callback=*/[](const ObjectID &object_id) {});

auto object_recovery_manager = std::make_unique<ObjectRecoveryManager>(
@@ -222,7 +223,8 @@
JobID::Nil(),
lease_request_rate_limiter,
[](const ObjectID &object_id) { return rpc::TensorTransport::OBJECT_STORE; },
boost::asio::steady_timer(io_service_));
boost::asio::steady_timer(io_service_),
fake_scheduler_placement_time_s_histogram_);

auto actor_task_submitter = std::make_unique<ActorTaskSubmitter>(
*core_worker_client_pool,
@@ -297,6 +299,8 @@
std::shared_ptr<CoreWorker> core_worker_;
ray::observability::FakeGauge fake_task_by_state_gauge_;
ray::observability::FakeGauge fake_actor_by_state_gauge_;
ray::observability::FakeGauge fake_total_lineage_bytes_gauge_;
ray::observability::FakeHistogram fake_scheduler_placement_time_s_histogram_;
std::unique_ptr<FakePeriodicalRunner> fake_periodical_runner_;

// Controllable time for testing publisher timeouts
7 changes: 7 additions & 0 deletions src/ray/core_worker/tests/task_manager_test.cc
@@ -183,6 +183,7 @@ class TaskManagerTest : public ::testing::Test {
},
mock_gcs_client_,
fake_task_by_state_counter_,
fake_total_lineage_bytes_gauge_,
/*free_actor_object_callback=*/[](const ObjectID &object_id) {}) {}

virtual void TearDown() { AssertNoLeaks(); }
@@ -230,6 +231,7 @@
uint32_t last_delay_ms_ = 0;
std::unordered_set<ObjectID> stored_in_plasma;
ray::observability::FakeGauge fake_task_by_state_counter_;
ray::observability::FakeGauge fake_total_lineage_bytes_gauge_;
};

class TaskManagerLineageTest : public TaskManagerTest {
@@ -1404,6 +1406,7 @@ TEST_F(TaskManagerTest, PlasmaPut_ObjectStoreFull_FailsTaskAndWritesError) {
},
mock_gcs_client_,
fake_task_by_state_counter_,
fake_total_lineage_bytes_gauge_,
/*free_actor_object_callback=*/[](const ObjectID &object_id) {});

rpc::Address caller_address;
@@ -1467,6 +1470,7 @@ TEST_F(TaskManagerTest, PlasmaPut_TransientFull_RetriesThenSucceeds) {
},
mock_gcs_client_,
fake_task_by_state_counter_,
fake_total_lineage_bytes_gauge_,
/*free_actor_object_callback=*/[](const ObjectID &object_id) {});

rpc::Address caller_address;
@@ -1528,6 +1532,7 @@ TEST_F(TaskManagerTest, DynamicReturn_PlasmaPutFailure_FailsTaskImmediately) {
},
mock_gcs_client_,
fake_task_by_state_counter_,
fake_total_lineage_bytes_gauge_,
/*free_actor_object_callback=*/[](const ObjectID &object_id) {});

auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true);
@@ -3016,6 +3021,7 @@ TEST_F(TaskManagerTest, TestRetryErrorMessageSentToCallback) {
-> std::shared_ptr<ray::rpc::CoreWorkerClientInterface> { return nullptr; },
mock_gcs_client_,
fake_task_by_state_counter_,
fake_total_lineage_bytes_gauge_,
/*free_actor_object_callback=*/[](const ObjectID &object_id) {});

// Create a task with retries enabled
@@ -3096,6 +3102,7 @@ TEST_F(TaskManagerTest, TestErrorLogWhenPushErrorCallbackFails) {
-> std::shared_ptr<ray::rpc::CoreWorkerClientInterface> { return nullptr; },
mock_gcs_client_,
fake_task_by_state_counter_,
fake_total_lineage_bytes_gauge_,
/*free_actor_object_callback=*/[](const ObjectID &object_id) {});

// Create a task that will be retried
1 change: 1 addition & 0 deletions src/ray/gcs/BUILD.bazel
@@ -273,6 +273,7 @@ ray_cc_library(
deps = [
"//src/ray/common:bundle_spec",
"//src/ray/common:id",
"//src/ray/common:metrics",
"//src/ray/protobuf:gcs_service_cc_proto",
"//src/ray/stats:stats_lib",
"//src/ray/util:counter_map",
5 changes: 4 additions & 1 deletion src/ray/gcs/gcs_actor_scheduler.cc
@@ -35,6 +35,7 @@ GcsActorScheduler::GcsActorScheduler(
GcsActorSchedulerSuccessCallback schedule_success_handler,
rpc::RayletClientPool &raylet_client_pool,
rpc::CoreWorkerClientPool &worker_client_pool,
ray::observability::MetricInterface &scheduler_placement_time_s_histogram,
std::function<void(const NodeID &, const rpc::ResourcesData &)>
normal_task_resources_changed_callback)
: io_context_(io_context),
@@ -45,6 +46,7 @@
schedule_success_handler_(std::move(schedule_success_handler)),
raylet_client_pool_(raylet_client_pool),
worker_client_pool_(worker_client_pool),
scheduler_placement_time_s_histogram_(scheduler_placement_time_s_histogram),
normal_task_resources_changed_callback_(
std::move(normal_task_resources_changed_callback)) {
RAY_CHECK(schedule_failure_handler_ != nullptr && schedule_success_handler_ != nullptr);
@@ -393,7 +395,8 @@ void GcsActorScheduler::HandleWorkerLeaseGrantedReply(
actor->UpdateAddress(leased_worker->GetAddress());
actor->GetMutableActorTableData()->set_pid(reply.worker_pid());
actor->GetMutableTaskSpec()->set_lease_grant_timestamp_ms(current_sys_time_ms());
actor->GetCreationTaskSpecification().EmitTaskMetrics();
actor->GetCreationTaskSpecification().EmitTaskMetrics(
scheduler_placement_time_s_histogram_);
// Make sure to connect to the client before persisting actor info to GCS.
// Without this, there could be a possible race condition. Related issues:
// https://github.com/ray-project/ray/pull/9215/files#r449469320