Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/ray/common/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,29 @@ inline ray::stats::Gauge GetActorByStateGaugeMetric() {
};
}

/// Builds the `object_store_memory` gauge.
///
/// The gauge is declared with an empty unit string and two tag keys,
/// `Location` and `ObjectState`; the values each tag is expected to take are
/// listed in the inline comments below.
inline ray::stats::Gauge GetObjectStoreMemoryGaugeMetric() {
return ray::stats::Gauge{
/*name=*/"object_store_memory",
/*description=*/"Object store memory by various sub-kinds on this node",
/*unit=*/"",
/// Location:
/// - MMAP_SHM: currently in shared memory(e.g. /dev/shm).
/// - MMAP_DISK: memory that's fallback allocated on mmapped disk,
/// e.g. /tmp.
/// - WORKER_HEAP: ray objects smaller than ('max_direct_call_object_size',
/// default 100KiB) stored in process memory, i.e. inlined return
/// values, placeholders for objects stored in plasma store.
/// - SPILLED: current number of bytes from objects spilled
/// to external storage. Note this might be smaller than
/// the physical storage incurred on the external storage because
/// Ray might fuse spilled objects into a single file, so a deleted
/// spill object might still exist in the spilled file. Check
/// spilled object fusing for more details.
/// ObjectState:
/// - SEALED: sealed objects bytes (could be MMAP_SHM or MMAP_DISK)
/// - UNSEALED: unsealed objects bytes (could be MMAP_SHM or MMAP_DISK)
/*tag_keys=*/{"Location", "ObjectState"},
};
}

} // namespace ray
3 changes: 2 additions & 1 deletion src/ray/core_worker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ ray_cc_library(
":reference_counter_interface",
"//src/ray/common:asio",
"//src/ray/common:id",
"//src/ray/common:metrics",
"//src/ray/common:ray_config",
"//src/ray/common:status",
"//src/ray/raylet_ipc_client:raylet_ipc_client_interface",
Expand Down Expand Up @@ -423,6 +424,6 @@ ray_cc_library(
name = "metrics",
hdrs = ["metrics.h"],
deps = [
"//src/ray/stats:stats_lib",
"//src/ray/stats:stats_metric",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,8 @@ MemoryStoreStats CoreWorkerMemoryStore::GetMemoryStoreStatisticalData() {

void CoreWorkerMemoryStore::RecordMetrics() {
absl::MutexLock lock(&mu_);
stats::STATS_object_store_memory.Record(num_local_objects_bytes_,
{{stats::LocationKey, "WORKER_HEAP"}});
object_store_memory_gauge_.Record(num_local_objects_bytes_,
{{stats::LocationKey, "WORKER_HEAP"}});
}

} // namespace core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "absl/synchronization/mutex.h"
#include "ray/common/asio/asio_util.h"
#include "ray/common/id.h"
#include "ray/common/metrics.h"
#include "ray/common/status.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/reference_counter_interface.h"
Expand Down Expand Up @@ -253,6 +254,8 @@ class CoreWorkerMemoryStore {
std::function<std::shared_ptr<RayObject>(const RayObject &object,
const ObjectID &object_id)>
object_allocator_;

ray::stats::Gauge object_store_memory_gauge_{ray::GetObjectStoreMemoryGaugeMetric()};
};

} // namespace core
Expand Down
12 changes: 12 additions & 0 deletions src/ray/object_manager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ ray_cc_library(
hdrs = ["object_manager.h"],
deps = [
":chunk_object_reader",
":metrics",
":object_buffer_pool",
":object_directory",
":object_manager_common",
Expand All @@ -30,6 +31,7 @@ ray_cc_library(
srcs = ["push_manager.cc"],
hdrs = ["push_manager.h"],
deps = [
":metrics",
"//src/ray/common:id",
"//src/ray/stats:stats_metric",
"@com_google_absl//absl/container:flat_hash_map",
Expand All @@ -41,6 +43,7 @@ ray_cc_library(
srcs = ["pull_manager.cc"],
hdrs = ["pull_manager.h"],
deps = [
":metrics",
":object_manager_common",
":ownership_object_directory",
"//src/ray/common:id",
Expand Down Expand Up @@ -162,3 +165,12 @@ ray_cc_library(
hdrs = ["object_reader.h"],
deps = ["//src/ray/protobuf:common_cc_proto"],
)

ray_cc_library(
name = "metrics",
hdrs = ["metrics.h"],
deps = [
"//src/ray/stats:stats_metric",
"//src/ray/util:size_literals",
],
)
177 changes: 177 additions & 0 deletions src/ray/object_manager/metrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "ray/stats/metric.h"
#include "ray/util/size_literals.h"

namespace ray {

using ray::literals::operator""_MiB;

/// Builds the `object_store_dist` histogram of object sizes, tagged by `Source`.
///
/// Samples are object sizes in bytes, as the description states. The bucket
/// boundaries are written with the `_MiB` literal and double from 32 MiB up to
/// 16384 MiB; this assumes `_MiB` expands to a byte count (see
/// ray/util/size_literals.h -- TODO confirm).
///
/// The unit is reported as "bytes" so that it agrees with both the description
/// and the scale of the boundaries; it was previously "MiB", which contradicted
/// them.
inline ray::stats::Histogram GetObjectStoreDistHistogramMetric() {
  return ray::stats::Histogram{
      /*name=*/"object_store_dist",
      /*description=*/"The distribution of object size in bytes",
      /*unit=*/"bytes",
      /*boundaries=*/
      {32_MiB,
       64_MiB,
       128_MiB,
       256_MiB,
       512_MiB,
       1024_MiB,
       2048_MiB,
       4096_MiB,
       8192_MiB,
       16384_MiB},
      /*tag_keys=*/{"Source"},
  };
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Histogram Boundaries Mismatch Object Size Units

The object_store_dist histogram in src/ray/object_manager/metrics.h defines its boundaries using plain numerical values. However, the metric's unit is "MiB" and its description indicates it tracks object sizes in bytes. This scaling mismatch means the boundaries are too small for the actual object sizes, rendering the histogram buckets ineffective.

Fix in Cursor Fix in Web


/// Builds the `object_store_available_memory` gauge (unit "bytes"); no tag
/// keys are passed.
///
/// ObjectManager::RecordMetrics records
/// `config_.object_store_memory - used_memory_ + GetFallbackAllocated()` into
/// it, i.e. fallback allocations are added back because `used_memory_`
/// already includes them.
inline ray::stats::Gauge GetObjectStoreAvailableMemoryGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"object_store_available_memory",
/*description=*/"Amount of memory currently available in the object store.",
/*unit=*/"bytes");
}

/// Builds the `object_store_used_memory` gauge (unit "bytes"); no tag keys
/// are passed.
///
/// ObjectManager::RecordMetrics records `used_memory_` minus the fallback
/// allocated amount into it; fallback memory is reported by a separate gauge.
inline ray::stats::Gauge GetObjectStoreUsedMemoryGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"object_store_used_memory",
/*description=*/"Amount of memory currently occupied in the object store.",
/*unit=*/"bytes");
}

/// Builds the `object_store_fallback_memory` gauge (unit "bytes"); no tag
/// keys are passed.
///
/// ObjectManager::RecordMetrics records
/// `plasma::plasma_store_runner->GetFallbackAllocated()` into it.
inline ray::stats::Gauge GetObjectStoreFallbackMemoryGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"object_store_fallback_memory",
/*description=*/"Amount of memory in fallback allocations in the filesystem.",
/*unit=*/"bytes");
}

/// Builds the `object_store_num_local_objects` gauge (unit "objects"); no tag
/// keys are passed.
///
/// ObjectManager::RecordMetrics records `local_objects_.size()` into it.
inline ray::stats::Gauge GetObjectStoreLocalObjectsGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"object_store_num_local_objects",
/*description=*/"Number of objects currently in the object store.",
/*unit=*/"objects");
}

/// Builds the `object_manager_num_pull_requests` gauge (unit "requests"); no
/// tag keys are passed.
///
/// ObjectManager::RecordMetrics records
/// `pull_manager_->NumObjectPullRequests()` into it.
inline ray::stats::Gauge GetObjectManagerPullRequestsGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"object_manager_num_pull_requests",
/*description=*/"Number of active pull requests for objects.",
/*unit=*/"requests");
}

/// Builds the `object_manager_bytes` gauge (unit "bytes"), keyed by a single
/// `Type` tag.
///
/// ObjectManager::RecordMetrics records it three times, once per `Type`
/// value: "PushedFromLocalPlasma", "PushedFromLocalDisk" and "Received".
inline ray::stats::Gauge GetObjectManagerBytesGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"object_manager_bytes",
/*description=*/
"Number of bytes pushed or received by type {PushedFromLocalPlasma, "
"PushedFromLocalDisk, Received}.",
/*unit=*/"bytes",
/*tag_keys=*/{"Type"});
}

/// Builds the `object_manager_received_chunks` gauge (unit "chunks"), keyed by
/// a single `Type` tag.
///
/// ObjectManager::RecordMetrics records it once per `Type` value: "Total",
/// "FailedTotal", "FailedCancelled" and "FailedPlasmaFull".
inline ray::stats::Gauge GetObjectManagerReceivedChunksGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"object_manager_received_chunks",
/*description=*/
"Number object chunks received broken per type {Total, FailedTotal, "
"FailedCancelled, FailedPlasmaFull}.",
/*unit=*/"chunks",
/*tag_keys=*/{"Type"});
}

/// Builds the `pull_manager_usage_bytes` gauge (unit "bytes"), keyed by a
/// single `Type` tag.
///
/// Per the description, `Type` is expected to be one of Available,
/// BeingPulled or Pinned; the recording call sites are not in this header.
inline ray::stats::Gauge GetPullManagerUsageBytesGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"pull_manager_usage_bytes",
/*description=*/
"The total number of bytes usage broken per type {Available, BeingPulled, Pinned}",
/*unit=*/"bytes",
/*tag_keys=*/{"Type"});
}

/// Builds the `pull_manager_requested_bundles` gauge (unit "bundles"), keyed
/// by a single `Type` tag.
///
/// Per the description, `Type` is expected to be one of Get, Wait or
/// TaskArgs; the recording call sites are not in this header.
inline ray::stats::Gauge GetPullManagerRequestedBundlesGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"pull_manager_requested_bundles",
/*description=*/
"Number of requested bundles broken per type {Get, Wait, TaskArgs}.",
/*unit=*/"bundles",
/*tag_keys=*/{"Type"});
}

/// Builds the `pull_manager_requests` gauge (unit "requests"), keyed by a
/// single `Type` tag.
///
/// Per the description, `Type` is expected to be one of Queued, Active or
/// Pinned; the recording call sites are not in this header.
inline ray::stats::Gauge GetPullManagerRequestsGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"pull_manager_requests",
/*description=*/"Number of pull requests broken per type {Queued, Active, Pinned}.",
/*unit=*/"requests",
/*tag_keys=*/{"Type"});
}

/// Builds the `pull_manager_active_bundles` gauge (unit "bundles"), declared
/// with a single `Type` tag key.
///
/// NOTE(review): unlike the sibling per-type gauges, the description names no
/// set of `Type` values. Confirm that the recording call sites actually
/// supply a `Type` tag; if they record without tags, the tag key declared
/// here is probably unintended.
inline ray::stats::Gauge GetPullManagerActiveBundlesGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"pull_manager_active_bundles",
/*description=*/"Number of active bundle requests",
/*unit=*/"bundles",
/*tag_keys=*/{"Type"});
}

inline ray::stats::Gauge GetPullManagerRetriesTotalGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"pull_manager_retries_total",
/*description=*/"Number of cumulative pull retries.",
/*unit=*/"retries",
/*tag_keys=*/{"Type"});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Metric Tagging Inconsistency

The GetPullManagerActiveBundlesGaugeMetric and GetPullManagerRetriesTotalGaugeMetric functions define tag_keys={"Type"}. However, these metrics are recorded without providing any tags in pull_manager.cc, which is inconsistent with their previous untagged definitions and may lead to unexpected metric behavior.

Fix in Cursor Fix in Web

}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Metric Tag Mismatch Causes Recording Issues

The GetPullManagerRetriesTotalGaugeMetric and GetPushManagerNumPushesRemainingGaugeMetric definitions specify tag_keys={"Type"}. However, their corresponding Record() calls in pull_manager.cc and push_manager.cc do not provide any tags. This mismatch, diverging from their original tag-less definitions, may lead to unexpected metric recording behavior.

Additional Locations (1)

Fix in Cursor Fix in Web


/// Builds the `pull_manager_num_object_pins` gauge (unit "pins"), keyed by a
/// single `Type` tag.
///
/// Per the description, `Type` is expected to be Success or Failure; the
/// recording call sites are not in this header.
inline ray::stats::Gauge GetPullManagerNumObjectPinsGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"pull_manager_num_object_pins",
/*description=*/
"Number of object pin attempts by the pull manager, can be {Success, Failure}.",
/*unit=*/"pins",
/*tag_keys=*/{"Type"});
}

/// Builds the `pull_manager_object_request_time_ms` histogram (unit "ms"),
/// declared with a single `Type` tag key.
///
/// Bucket boundaries are powers of ten from 1 to 10000, in the metric's
/// millisecond unit. The set of `Type` values is not stated in the
/// description -- TODO confirm against the recording call sites.
inline ray::stats::Histogram GetPullManagerObjectRequestTimeMsHistogramMetric() {
return ray::stats::Histogram(
/*name=*/"pull_manager_object_request_time_ms",
/*description=*/
"Time between initial object pull request and local pinning of the object. ",
/*unit=*/"ms",
/*boundaries=*/{1, 10, 100, 1000, 10000},
/*tag_keys=*/{"Type"});
}

/// Builds the `push_manager_num_pushes_remaining` gauge (unit "pushes"),
/// declared with a single `Type` tag key.
///
/// NOTE(review): the description names no set of `Type` values. Confirm that
/// the recording call sites actually supply a `Type` tag; if they record
/// without tags, the tag key declared here is probably unintended.
inline ray::stats::Gauge GetPushManagerNumPushesRemainingGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"push_manager_num_pushes_remaining",
/*description=*/"Number of pushes not completed.",
/*unit=*/"pushes",
/*tag_keys=*/{"Type"});
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Metric Tag Mismatch Causes Recording Issues

The GetPushManagerNumPushesRemainingGaugeMetric is defined with a {"Type"} tag, but its Record() calls in push_manager.cc don't provide tags. This inconsistency, which differs from the original metric's definition, may lead to unexpected metric recording behavior.

Fix in Cursor Fix in Web


/// Builds the `push_manager_chunks` gauge (unit "chunks"), keyed by a single
/// `Type` tag.
///
/// Per the description, `Type` is expected to be InFlight or Remaining; the
/// recording call sites are not in this header.
inline ray::stats::Gauge GetPushManagerChunksGaugeMetric() {
return ray::stats::Gauge(
/*name=*/"push_manager_chunks",
/*description=*/
"Number of object chunks transfer broken per type {InFlight, Remaining}.",
/*unit=*/"chunks",
/*tag_keys=*/{"Type"});
}

} // namespace ray
40 changes: 19 additions & 21 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "ray/common/asio/asio_util.h"
#include "ray/object_manager/plasma/store_runner.h"
#include "ray/object_manager/spilled_object_reader.h"
#include "ray/stats/metric_defs.h"
#include "ray/util/exponential_backoff.h"

namespace ray {
Expand Down Expand Up @@ -770,32 +769,31 @@ void ObjectManager::RecordMetrics() {
push_manager_->RecordMetrics();
// used_memory_ includes the fallback allocation, so we should add it again here
// to calculate the exact available memory.
ray_metric_object_store_available_memory_.Record(
object_store_available_memory_gauge_.Record(
config_.object_store_memory - used_memory_ +
plasma::plasma_store_runner->GetFallbackAllocated());
// Subtract fallback allocated memory. It is tracked separately by
// `ObjectStoreFallbackMemory`.
ray_metric_object_store_used_memory_.Record(
object_store_used_memory_gauge_.Record(
used_memory_ - plasma::plasma_store_runner->GetFallbackAllocated());
ray_metric_object_store_fallback_memory_.Record(
object_store_fallback_memory_gauge_.Record(
plasma::plasma_store_runner->GetFallbackAllocated());
ray_metric_object_store_local_objects_.Record(local_objects_.size());
ray_metric_object_manager_pull_requests_.Record(pull_manager_->NumObjectPullRequests());

ray::stats::STATS_object_manager_bytes.Record(num_bytes_pushed_from_plasma_,
"PushedFromLocalPlasma");
ray::stats::STATS_object_manager_bytes.Record(num_bytes_pushed_from_disk_,
"PushedFromLocalDisk");
ray::stats::STATS_object_manager_bytes.Record(num_bytes_received_total_, "Received");

ray::stats::STATS_object_manager_received_chunks.Record(num_chunks_received_total_,
"Total");
ray::stats::STATS_object_manager_received_chunks.Record(
num_chunks_received_total_failed_, "FailedTotal");
ray::stats::STATS_object_manager_received_chunks.Record(num_chunks_received_cancelled_,
"FailedCancelled");
ray::stats::STATS_object_manager_received_chunks.Record(
num_chunks_received_failed_due_to_plasma_, "FailedPlasmaFull");
object_store_local_objects_gauge_.Record(local_objects_.size());
object_manager_pull_requests_gauge_.Record(pull_manager_->NumObjectPullRequests());

object_manager_bytes_gauge_.Record(num_bytes_pushed_from_plasma_,
{{"Type", "PushedFromLocalPlasma"}});
object_manager_bytes_gauge_.Record(num_bytes_pushed_from_disk_,
{{"Type", "PushedFromLocalDisk"}});
object_manager_bytes_gauge_.Record(num_bytes_received_total_, {{"Type", "Received"}});
object_manager_received_chunks_gauge_.Record(num_chunks_received_total_,
{{"Type", "Total"}});
object_manager_received_chunks_gauge_.Record(num_chunks_received_total_failed_,
{{"Type", "FailedTotal"}});
object_manager_received_chunks_gauge_.Record(num_chunks_received_cancelled_,
{{"Type", "FailedCancelled"}});
object_manager_received_chunks_gauge_.Record(num_chunks_received_failed_due_to_plasma_,
{{"Type", "FailedPlasmaFull"}});
}

void ObjectManager::FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const {
Expand Down
38 changes: 13 additions & 25 deletions src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,31 +521,19 @@ class ObjectManager : public ObjectManagerInterface,
/// plasma.
size_t num_chunks_received_failed_due_to_plasma_ = 0;

/// Metrics
ray::stats::Gauge ray_metric_object_store_available_memory_{
/*name=*/"object_store_available_memory",
/*description=*/"Amount of memory currently available in the object store.",
/*unit=*/"bytes"};

ray::stats::Gauge ray_metric_object_store_used_memory_{
/*name=*/"object_store_used_memory",
/*description=*/"Amount of memory currently occupied in the object store.",
/*unit=*/"bytes"};

ray::stats::Gauge ray_metric_object_store_fallback_memory_{
/*name=*/"object_store_fallback_memory",
/*description=*/"Amount of memory in fallback allocations in the filesystem.",
/*unit=*/"bytes"};

ray::stats::Gauge ray_metric_object_store_local_objects_{
/*name=*/"object_store_num_local_objects",
/*description=*/"Number of objects currently in the object store.",
/*unit=*/"objects"};

ray::stats::Gauge ray_metric_object_manager_pull_requests_{
/*name=*/"object_manager_num_pull_requests",
/*description=*/"Number of active pull requests for objects.",
/*unit=*/"requests"};
ray::stats::Gauge object_store_available_memory_gauge_{
GetObjectStoreAvailableMemoryGaugeMetric()};
ray::stats::Gauge object_store_used_memory_gauge_{
ray::GetObjectStoreUsedMemoryGaugeMetric()};
ray::stats::Gauge object_store_fallback_memory_gauge_{
ray::GetObjectStoreFallbackMemoryGaugeMetric()};
ray::stats::Gauge object_store_local_objects_gauge_{
ray::GetObjectStoreLocalObjectsGaugeMetric()};
ray::stats::Gauge object_manager_pull_requests_gauge_{
ray::GetObjectManagerPullRequestsGaugeMetric()};
ray::stats::Gauge object_manager_bytes_gauge_{ray::GetObjectManagerBytesGaugeMetric()};
ray::stats::Gauge object_manager_received_chunks_gauge_{
ray::GetObjectManagerReceivedChunksGaugeMetric()};
};

} // namespace ray
Loading