Skip to content

Commit b950cff

Browse files
committed
[core][stats-die/03] kill STATS in core worker component
Signed-off-by: Cuong Nguyen <[email protected]>
1 parent 9e8b291 commit b950cff

29 files changed

+119
-28
lines changed

src/ray/common/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ ray_cc_library(
234234
"//src/ray/common/scheduling:resource_set",
235235
"//src/ray/common/scheduling:scheduling_class_util",
236236
"//src/ray/flatbuffers:node_manager_generated",
237+
"//src/ray/observability:metric_interface",
237238
"//src/ray/util:container_util",
238239
"@com_google_absl//absl/container:flat_hash_map",
239240
"@com_google_absl//absl/container:flat_hash_set",
@@ -425,6 +426,6 @@ ray_cc_library(
425426
name = "metrics",
426427
hdrs = ["metrics.h"],
427428
deps = [
428-
"//src/ray/stats:stats_lib",
429+
"//src/ray/stats:stats_metric",
429430
],
430431
)

src/ray/common/metrics.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,17 @@ inline ray::stats::Gauge GetObjectStoreMemoryGaugeMetric() {
7373
};
7474
}
7575

76+
inline ray::stats::Histogram GetSchedulerPlacementTimeSHistogramMetric() {
77+
return ray::stats::Histogram{
78+
/*name=*/"scheduler_placement_time_s",
79+
/*description=*/
80+
"The time it takes for a worklod (task, actor, placement group) to "
81+
"be placed. This is the time from when the tasks dependencies are "
82+
"resolved to when it actually reserves resources on a node to run.",
83+
/*unit=*/"s",
84+
/*boundaries=*/{0.1, 1, 10, 100, 1000, 10000},
85+
/*tag_keys=*/{"WorkloadType"},
86+
};
87+
}
88+
7689
} // namespace ray

src/ray/common/task/task_spec.cc

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
#include "ray/common/ray_config.h"
2525
#include "ray/common/runtime_env_common.h"
26-
#include "ray/stats/metric_defs.h"
2726
#include "ray/util/logging.h"
2827

2928
namespace ray {
@@ -602,17 +601,16 @@ bool TaskSpecification::IsRetriable() const {
602601
return true;
603602
}
604603

605-
void TaskSpecification::EmitTaskMetrics() const {
604+
void TaskSpecification::EmitTaskMetrics(
605+
ray::observability::MetricInterface &scheduler_placement_time_s_histogram) const {
606606
double duration_s = (GetMessage().lease_grant_timestamp_ms() -
607607
GetMessage().dependency_resolution_timestamp_ms()) /
608608
1000;
609609

610610
if (IsActorCreationTask()) {
611-
stats::STATS_scheduler_placement_time_s.Record(duration_s,
612-
{{"WorkloadType", "Actor"}});
611+
scheduler_placement_time_s_histogram.Record(duration_s, {{"WorkloadType", "Actor"}});
613612
} else {
614-
stats::STATS_scheduler_placement_time_s.Record(duration_s,
615-
{{"WorkloadType", "Task"}});
613+
scheduler_placement_time_s_histogram.Record(duration_s, {{"WorkloadType", "Task"}});
616614
}
617615
}
618616

src/ray/common/task/task_spec.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "ray/common/scheduling/resource_set.h"
3030
#include "ray/common/scheduling/scheduling_class_util.h"
3131
#include "ray/common/task/task_common.h"
32+
#include "ray/observability/metric_interface.h"
3233

3334
extern "C" {
3435
#include "ray/thirdparty/sha256.h"
@@ -357,7 +358,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
357358
/// \return true if the task or actor is retriable.
358359
bool IsRetriable() const;
359360

360-
void EmitTaskMetrics() const;
361+
void EmitTaskMetrics(
362+
ray::observability::MetricInterface &scheduler_placement_time_s_histogram) const;
361363

362364
/// \return true if task events from this task should be reported.
363365
bool EnableTaskEvents() const;

src/ray/core_worker/core_worker_process.cc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,8 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
470470
return core_worker->core_worker_client_pool_->GetOrConnect(*addr);
471471
},
472472
gcs_client,
473-
task_by_state_gauge_,
473+
*task_by_state_gauge_,
474+
*total_lineage_bytes_gauge_,
474475
/*free_actor_object_callback=*/
475476
[this](const ObjectID &object_id) {
476477
auto core_worker = GetCoreWorker();
@@ -552,7 +553,8 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
552553
// OBJECT_STORE.
553554
return rpc::TensorTransport::OBJECT_STORE;
554555
},
555-
boost::asio::steady_timer(io_service_));
556+
boost::asio::steady_timer(io_service_),
557+
*scheduler_placement_time_s_histogram_);
556558

557559
auto report_locality_data_callback = [this](
558560
const ObjectID &object_id,
@@ -683,8 +685,8 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
683685
task_execution_service_,
684686
std::move(task_event_buffer),
685687
pid,
686-
task_by_state_gauge_,
687-
actor_by_state_gauge_);
688+
*task_by_state_gauge_,
689+
*actor_by_state_gauge_);
688690
return core_worker;
689691
}
690692

@@ -790,6 +792,14 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
790792

791793
// We need init stats before using it/spawning threads.
792794
stats::Init(global_tags, options_.metrics_agent_port, worker_id_);
795+
task_by_state_gauge_ = std::unique_ptr<ray::stats::Gauge>(
796+
new ray::stats::Gauge(GetTaskByStateGaugeMetric()));
797+
actor_by_state_gauge_ = std::unique_ptr<ray::stats::Gauge>(
798+
new ray::stats::Gauge(GetActorByStateGaugeMetric()));
799+
total_lineage_bytes_gauge_ = std::unique_ptr<ray::stats::Gauge>(
800+
new ray::stats::Gauge(GetTotalLineageBytesGaugeMetric()));
801+
scheduler_placement_time_s_histogram_ = std::unique_ptr<ray::stats::Histogram>(
802+
new ray::stats::Histogram(GetSchedulerPlacementTimeSHistogramMetric()));
793803

794804
// Initialize event framework before starting up worker.
795805
if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) {

src/ray/core_worker/core_worker_process.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,10 @@ class CoreWorkerProcessImpl {
185185
/// The client to export metrics to the metrics agent.
186186
std::unique_ptr<ray::rpc::MetricsAgentClient> metrics_agent_client_;
187187

188-
ray::stats::Gauge task_by_state_gauge_{GetTaskByStateGaugeMetric()};
189-
ray::stats::Gauge actor_by_state_gauge_{GetActorByStateGaugeMetric()};
188+
std::unique_ptr<ray::stats::Gauge> task_by_state_gauge_;
189+
std::unique_ptr<ray::stats::Gauge> actor_by_state_gauge_;
190+
std::unique_ptr<ray::stats::Gauge> total_lineage_bytes_gauge_;
191+
std::unique_ptr<ray::stats::Histogram> scheduler_placement_time_s_histogram_;
190192
};
191193
} // namespace core
192194
} // namespace ray

src/ray/core_worker/metrics.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,15 @@ inline ray::stats::Gauge GetTaskByStateGaugeMetric() {
4141
};
4242
}
4343

44+
inline ray::stats::Gauge GetTotalLineageBytesGaugeMetric() {
45+
return ray::stats::Gauge{
46+
/*name=*/"total_lineage_bytes",
47+
/*description=*/
48+
"Total amount of memory used to store task specs for lineage reconstruction.",
49+
/*unit=*/"",
50+
/*tag_keys=*/{},
51+
};
52+
}
53+
4454
} // namespace core
4555
} // namespace ray

src/ray/core_worker/task_manager.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1781,7 +1781,7 @@ void TaskManager::FillTaskInfo(rpc::GetCoreWorkerStatsReply *reply,
17811781

17821782
void TaskManager::RecordMetrics() {
17831783
absl::MutexLock lock(&mu_);
1784-
ray::stats::STATS_total_lineage_bytes.Record(total_lineage_footprint_bytes_);
1784+
total_lineage_bytes_gauge_.Record(total_lineage_footprint_bytes_);
17851785
task_counter_.FlushOnChangeCallbacks();
17861786
}
17871787

src/ray/core_worker/task_manager.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
#include "ray/core_worker_rpc_client/core_worker_client_interface.h"
3434
#include "ray/gcs_rpc_client/gcs_client.h"
3535
#include "ray/observability/metric_interface.h"
36-
#include "ray/stats/metric_defs.h"
3736
#include "ray/util/counter_map.h"
3837
#include "src/ray/protobuf/common.pb.h"
3938
#include "src/ray/protobuf/core_worker.pb.h"
@@ -187,6 +186,7 @@ class TaskManager : public TaskManagerInterface {
187186
const ActorID &)> get_actor_rpc_client_callback,
188187
std::shared_ptr<gcs::GcsClient> gcs_client,
189188
ray::observability::MetricInterface &task_by_state_counter,
189+
ray::observability::MetricInterface &total_lineage_bytes_gauge,
190190
FreeActorObjectCallback free_actor_object_callback)
191191
: in_memory_store_(in_memory_store),
192192
reference_counter_(reference_counter),
@@ -199,6 +199,7 @@ class TaskManager : public TaskManagerInterface {
199199
get_actor_rpc_client_callback_(std::move(get_actor_rpc_client_callback)),
200200
gcs_client_(std::move(gcs_client)),
201201
task_by_state_counter_(task_by_state_counter),
202+
total_lineage_bytes_gauge_(total_lineage_bytes_gauge),
202203
free_actor_object_callback_(std::move(free_actor_object_callback)) {
203204
task_counter_.SetOnChangeCallback(
204205
[this](const std::tuple<std::string, rpc::TaskStatus, bool> &key)
@@ -812,6 +813,10 @@ class TaskManager : public TaskManagerInterface {
812813
// - Source: component reporting, e.g., "core_worker", "executor", or "pull_manager"
813814
observability::MetricInterface &task_by_state_counter_;
814815

816+
/// Metric to track the total amount of memory used to store task specs for lineage
817+
/// reconstruction.
818+
observability::MetricInterface &total_lineage_bytes_gauge_;
819+
815820
/// Callback to free GPU object from the in-actor object store.
816821
FreeActorObjectCallback free_actor_object_callback_;
817822

src/ray/core_worker/task_submission/normal_task_submitter.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ void NormalTaskSubmitter::OnWorkerIdle(
177177
scheduling_key_entry.num_busy_workers++;
178178

179179
task_spec.GetMutableMessage().set_lease_grant_timestamp_ms(current_sys_time_ms());
180-
task_spec.EmitTaskMetrics();
180+
task_spec.EmitTaskMetrics(scheduler_placement_time_s_histogram_);
181181

182182
executing_tasks_.emplace(task_spec.TaskId(), addr);
183183
PushNormalTask(

0 commit comments

Comments
 (0)