diff --git a/src/ray/common/metrics.h b/src/ray/common/metrics.h index 7fdf7f1a331d..969a30fad555 100644 --- a/src/ray/common/metrics.h +++ b/src/ray/common/metrics.h @@ -48,4 +48,29 @@ inline ray::stats::Gauge GetActorByStateGaugeMetric() { }; } +inline ray::stats::Gauge GetObjectStoreMemoryGaugeMetric() { + return ray::stats::Gauge{ + /*name=*/"object_store_memory", + /*description=*/"Object store memory by various sub-kinds on this node", + /*unit=*/"", + /// Location: + /// - MMAP_SHM: currently in shared memory(e.g. /dev/shm). + /// - MMAP_DISK: memory that's fallback allocated on mmapped disk, + /// e.g. /tmp. + /// - WORKER_HEAP: ray objects smaller than ('max_direct_call_object_size', + /// default 100KiB) stored in process memory, i.e. inlined return + /// values, placeholders for objects stored in plasma store. + /// - SPILLED: current number of bytes from objects spilled + /// to external storage. Note this might be smaller than + /// the physical storage incurred on the external storage because + /// Ray might fuse spilled objects into a single file, so a deleted + /// spill object might still exist in the spilled file. Check + /// spilled object fusing for more details. + /// ObjectState: + /// - SEALED: sealed objects bytes (could be MMAP_SHM or MMAP_DISK) + /// - UNSEALED: unsealed objects bytes (could be MMAP_SHM or MMAP_DISK) + /*tag_keys=*/{"Location", "ObjectState"}, + }; +} + } // namespace ray diff --git a/src/ray/core_worker/BUILD.bazel b/src/ray/core_worker/BUILD.bazel index 9726db9dedaf..a92f4f4323b0 100644 --- a/src/ray/core_worker/BUILD.bazel +++ b/src/ray/core_worker/BUILD.bazel @@ -272,6 +272,7 @@ ray_cc_library( ":reference_counter_interface", "//src/ray/common:asio", "//src/ray/common:id", + "//src/ray/common:metrics", "//src/ray/common:ray_config", "//src/ray/common:status", "//src/ray/raylet_ipc_client:raylet_ipc_client_interface", @@ -423,6 +424,6 @@ ray_cc_library( name = "metrics", hdrs = ["metrics.h"], deps = [ - "//src/ray/stats:stats_lib", + "//src/ray/stats:stats_metric", ], ) diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index bd7e2a63deb5..f3e444cca5c4 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -597,8 +597,8 @@ MemoryStoreStats CoreWorkerMemoryStore::GetMemoryStoreStatisticalData() { void CoreWorkerMemoryStore::RecordMetrics() { absl::MutexLock lock(&mu_); - stats::STATS_object_store_memory.Record(num_local_objects_bytes_, - {{stats::LocationKey, "WORKER_HEAP"}}); + object_store_memory_gauge_.Record(num_local_objects_bytes_, + {{stats::LocationKey, "WORKER_HEAP"}}); } } // namespace core diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index e305a64b0d01..1852a3dc1f7b 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -24,6 +24,7 @@ #include "absl/synchronization/mutex.h" #include "ray/common/asio/asio_util.h" #include "ray/common/id.h" +#include "ray/common/metrics.h" #include "ray/common/status.h" #include "ray/core_worker/context.h" #include "ray/core_worker/reference_counter_interface.h" @@ -253,6 +254,8 @@ class CoreWorkerMemoryStore { std::function(const RayObject &object, const ObjectID &object_id)> object_allocator_; + + ray::stats::Gauge object_store_memory_gauge_{ray::GetObjectStoreMemoryGaugeMetric()}; }; } // namespace core diff --git a/src/ray/object_manager/BUILD.bazel b/src/ray/object_manager/BUILD.bazel index 7674d6aaef20..9bd930e0690f 100644 --- a/src/ray/object_manager/BUILD.bazel +++ b/src/ray/object_manager/BUILD.bazel @@ -6,6 +6,7 @@ ray_cc_library( hdrs = ["object_manager.h"], deps = [ ":chunk_object_reader", + ":metrics", ":object_buffer_pool", ":object_directory", ":object_manager_common", @@ -30,6 +31,7 @@ ray_cc_library( srcs = ["push_manager.cc"], hdrs = ["push_manager.h"], deps = [ + ":metrics", "//src/ray/common:id", "//src/ray/stats:stats_metric", "@com_google_absl//absl/container:flat_hash_map", @@ -41,6 +43,7 @@ ray_cc_library( srcs = ["pull_manager.cc"], hdrs = ["pull_manager.h"], deps = [ + ":metrics", ":object_manager_common", ":ownership_object_directory", "//src/ray/common:id", @@ -162,3 +165,12 @@ ray_cc_library( hdrs = ["object_reader.h"], deps = ["//src/ray/protobuf:common_cc_proto"], ) + +ray_cc_library( + name = "metrics", + hdrs = ["metrics.h"], + deps = [ + "//src/ray/stats:stats_metric", + "//src/ray/util:size_literals", + ], +) diff --git a/src/ray/object_manager/metrics.h b/src/ray/object_manager/metrics.h new file mode 100644 index 000000000000..84b1cb3d473e --- /dev/null +++ b/src/ray/object_manager/metrics.h @@ -0,0 +1,177 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "ray/stats/metric.h" +#include "ray/util/size_literals.h" + +namespace ray { + +using ray::literals::operator""_MiB; + +inline ray::stats::Histogram GetObjectStoreDistHistogramMetric() { + return ray::stats::Histogram{ + /*name=*/"object_store_dist", + /*description=*/"The distribution of object size in bytes", + /*unit=*/"MiB", + /*boundaries=*/ + {32_MiB, + 64_MiB, + 128_MiB, + 256_MiB, + 512_MiB, + 1024_MiB, + 2048_MiB, + 4096_MiB, + 8192_MiB, + 16384_MiB}, + /*tag_keys=*/{"Source"}, + }; +} + +inline ray::stats::Gauge GetObjectStoreAvailableMemoryGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"object_store_available_memory", + /*description=*/"Amount of memory currently available in the object store.", + /*unit=*/"bytes"); +} + +inline ray::stats::Gauge GetObjectStoreUsedMemoryGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"object_store_used_memory", + /*description=*/"Amount of memory currently occupied in the object store.", + /*unit=*/"bytes"); +} + +inline ray::stats::Gauge GetObjectStoreFallbackMemoryGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"object_store_fallback_memory", + /*description=*/"Amount of memory in fallback allocations in the filesystem.", + /*unit=*/"bytes"); +} + +inline ray::stats::Gauge GetObjectStoreLocalObjectsGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"object_store_num_local_objects", + /*description=*/"Number of objects currently in the object store.", + /*unit=*/"objects"); +} + +inline ray::stats::Gauge GetObjectManagerPullRequestsGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"object_manager_num_pull_requests", + /*description=*/"Number of active pull requests for objects.", + /*unit=*/"requests"); +} + +inline ray::stats::Gauge GetObjectManagerBytesGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"object_manager_bytes", + /*description=*/ + "Number of bytes pushed or received by type {PushedFromLocalPlasma, " + "PushedFromLocalDisk, Received}.", + /*unit=*/"bytes", + /*tag_keys=*/{"Type"}); +} + +inline ray::stats::Gauge GetObjectManagerReceivedChunksGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"object_manager_received_chunks", + /*description=*/ + "Number object chunks received broken per type {Total, FailedTotal, " + "FailedCancelled, FailedPlasmaFull}.", + /*unit=*/"chunks", + /*tag_keys=*/{"Type"}); +} + +inline ray::stats::Gauge GetPullManagerUsageBytesGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"pull_manager_usage_bytes", + /*description=*/ + "The total number of bytes usage broken per type {Available, BeingPulled, Pinned}", + /*unit=*/"bytes", + /*tag_keys=*/{"Type"}); +} + +inline ray::stats::Gauge GetPullManagerRequestedBundlesGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"pull_manager_requested_bundles", + /*description=*/ + "Number of requested bundles broken per type {Get, Wait, TaskArgs}.", + /*unit=*/"bundles", + /*tag_keys=*/{"Type"}); +} + +inline ray::stats::Gauge GetPullManagerRequestsGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"pull_manager_requests", + /*description=*/"Number of pull requests broken per type {Queued, Active, Pinned}.", + /*unit=*/"requests", + /*tag_keys=*/{"Type"}); +} + +inline ray::stats::Gauge GetPullManagerActiveBundlesGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"pull_manager_active_bundles", + /*description=*/"Number of active bundle requests", + /*unit=*/"bundles", + /*tag_keys=*/{"Type"}); +} + +inline ray::stats::Gauge GetPullManagerRetriesTotalGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"pull_manager_retries_total", + /*description=*/"Number of cumulative pull retries.", + /*unit=*/"retries", + /*tag_keys=*/{"Type"}); +} + +inline ray::stats::Gauge GetPullManagerNumObjectPinsGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"pull_manager_num_object_pins", + /*description=*/ + "Number of object pin attempts by the pull manager, can be {Success, Failure}.", + /*unit=*/"pins", + /*tag_keys=*/{"Type"}); +} + +inline ray::stats::Histogram GetPullManagerObjectRequestTimeMsHistogramMetric() { + return ray::stats::Histogram( + /*name=*/"pull_manager_object_request_time_ms", + /*description=*/ + "Time between initial object pull request and local pinning of the object. ", + /*unit=*/"ms", + /*boundaries=*/{1, 10, 100, 1000, 10000}, + /*tag_keys=*/{"Type"}); +} + +inline ray::stats::Gauge GetPushManagerNumPushesRemainingGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"push_manager_num_pushes_remaining", + /*description=*/"Number of pushes not completed.", + /*unit=*/"pushes", + /*tag_keys=*/{"Type"}); +} + +inline ray::stats::Gauge GetPushManagerChunksGaugeMetric() { + return ray::stats::Gauge( + /*name=*/"push_manager_chunks", + /*description=*/ + "Number of object chunks transfer broken per type {InFlight, Remaining}.", + /*unit=*/"chunks", + /*tag_keys=*/{"Type"}); +} + +} // namespace ray diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 33442e9ad3bc..646198cce3f8 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -26,7 +26,6 @@ #include "ray/common/asio/asio_util.h" #include "ray/object_manager/plasma/store_runner.h" #include "ray/object_manager/spilled_object_reader.h" -#include "ray/stats/metric_defs.h" #include "ray/util/exponential_backoff.h" namespace ray { @@ -770,32 +769,31 @@ void ObjectManager::RecordMetrics() { push_manager_->RecordMetrics(); // used_memory_ includes the fallback allocation, so we should add it again here // to calculate the exact available memory. - ray_metric_object_store_available_memory_.Record( + object_store_available_memory_gauge_.Record( config_.object_store_memory - used_memory_ + plasma::plasma_store_runner->GetFallbackAllocated()); // Subtract fallback allocated memory. It is tracked separately by // `ObjectStoreFallbackMemory`. - ray_metric_object_store_used_memory_.Record( + object_store_used_memory_gauge_.Record( used_memory_ - plasma::plasma_store_runner->GetFallbackAllocated()); - ray_metric_object_store_fallback_memory_.Record( + object_store_fallback_memory_gauge_.Record( plasma::plasma_store_runner->GetFallbackAllocated()); - ray_metric_object_store_local_objects_.Record(local_objects_.size()); - ray_metric_object_manager_pull_requests_.Record(pull_manager_->NumObjectPullRequests()); - - ray::stats::STATS_object_manager_bytes.Record(num_bytes_pushed_from_plasma_, - "PushedFromLocalPlasma"); - ray::stats::STATS_object_manager_bytes.Record(num_bytes_pushed_from_disk_, - "PushedFromLocalDisk"); - ray::stats::STATS_object_manager_bytes.Record(num_bytes_received_total_, "Received"); - - ray::stats::STATS_object_manager_received_chunks.Record(num_chunks_received_total_, - "Total"); - ray::stats::STATS_object_manager_received_chunks.Record( - num_chunks_received_total_failed_, "FailedTotal"); - ray::stats::STATS_object_manager_received_chunks.Record(num_chunks_received_cancelled_, - "FailedCancelled"); - ray::stats::STATS_object_manager_received_chunks.Record( - num_chunks_received_failed_due_to_plasma_, "FailedPlasmaFull"); + object_store_local_objects_gauge_.Record(local_objects_.size()); + object_manager_pull_requests_gauge_.Record(pull_manager_->NumObjectPullRequests()); + + object_manager_bytes_gauge_.Record(num_bytes_pushed_from_plasma_, + {{"Type", "PushedFromLocalPlasma"}}); + object_manager_bytes_gauge_.Record(num_bytes_pushed_from_disk_, + {{"Type", "PushedFromLocalDisk"}}); + object_manager_bytes_gauge_.Record(num_bytes_received_total_, {{"Type", "Received"}}); + object_manager_received_chunks_gauge_.Record(num_chunks_received_total_, + {{"Type", "Total"}}); + object_manager_received_chunks_gauge_.Record(num_chunks_received_total_failed_, + {{"Type", "FailedTotal"}}); + object_manager_received_chunks_gauge_.Record(num_chunks_received_cancelled_, + {{"Type", "FailedCancelled"}}); + object_manager_received_chunks_gauge_.Record(num_chunks_received_failed_due_to_plasma_, + {{"Type", "FailedPlasmaFull"}}); } void ObjectManager::FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const { diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index ebcec48f7c44..8dadc3fa0caf 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -521,31 +521,19 @@ class ObjectManager : public ObjectManagerInterface, /// plasma. size_t num_chunks_received_failed_due_to_plasma_ = 0; - /// Metrics - ray::stats::Gauge ray_metric_object_store_available_memory_{ - /*name=*/"object_store_available_memory", - /*description=*/"Amount of memory currently available in the object store.", - /*unit=*/"bytes"}; - - ray::stats::Gauge ray_metric_object_store_used_memory_{ - /*name=*/"object_store_used_memory", - /*description=*/"Amount of memory currently occupied in the object store.", - /*unit=*/"bytes"}; - - ray::stats::Gauge ray_metric_object_store_fallback_memory_{ - /*name=*/"object_store_fallback_memory", - /*description=*/"Amount of memory in fallback allocations in the filesystem.", - /*unit=*/"bytes"}; - - ray::stats::Gauge ray_metric_object_store_local_objects_{ - /*name=*/"object_store_num_local_objects", - /*description=*/"Number of objects currently in the object store.", - /*unit=*/"objects"}; - - ray::stats::Gauge ray_metric_object_manager_pull_requests_{ - /*name=*/"object_manager_num_pull_requests", - /*description=*/"Number of active pull requests for objects.", - /*unit=*/"requests"}; + ray::stats::Gauge object_store_available_memory_gauge_{ + GetObjectStoreAvailableMemoryGaugeMetric()}; + ray::stats::Gauge object_store_used_memory_gauge_{ + ray::GetObjectStoreUsedMemoryGaugeMetric()}; + ray::stats::Gauge object_store_fallback_memory_gauge_{ + ray::GetObjectStoreFallbackMemoryGaugeMetric()}; + ray::stats::Gauge object_store_local_objects_gauge_{ + ray::GetObjectStoreLocalObjectsGaugeMetric()}; + ray::stats::Gauge object_manager_pull_requests_gauge_{ + ray::GetObjectManagerPullRequestsGaugeMetric()}; + ray::stats::Gauge object_manager_bytes_gauge_{ray::GetObjectManagerBytesGaugeMetric()}; + ray::stats::Gauge object_manager_received_chunks_gauge_{ + ray::GetObjectManagerReceivedChunksGaugeMetric()}; }; } // namespace ray diff --git a/src/ray/object_manager/ownership_object_directory.cc b/src/ray/object_manager/ownership_object_directory.cc index 544a84ba3158..f62e50a06654 100644 --- a/src/ray/object_manager/ownership_object_directory.cc +++ b/src/ray/object_manager/ownership_object_directory.cc @@ -19,8 +19,6 @@ #include #include -#include "ray/stats/metric_defs.h" - namespace ray { OwnershipBasedObjectDirectory::OwnershipBasedObjectDirectory( diff --git a/src/ray/object_manager/plasma/BUILD.bazel b/src/ray/object_manager/plasma/BUILD.bazel index 3a352425f149..f5c029e97643 100644 --- a/src/ray/object_manager/plasma/BUILD.bazel +++ b/src/ray/object_manager/plasma/BUILD.bazel @@ -181,6 +181,8 @@ ray_cc_library( hdrs = ["stats_collector.h"], deps = [ ":object_manager_plasma_common", + "//src/ray/common:metrics", + "//src/ray/object_manager:metrics", "//src/ray/stats:stats_metric", "//src/ray/util:counter_map", ], diff --git a/src/ray/object_manager/plasma/stats_collector.cc b/src/ray/object_manager/plasma/stats_collector.cc index 219c5c071d4c..f5e3d4adb55b 100644 --- a/src/ray/object_manager/plasma/stats_collector.cc +++ b/src/ray/object_manager/plasma/stats_collector.cc @@ -39,23 +39,23 @@ void ObjectStatsCollector::OnObjectCreated(const LocalObject &obj) { if (kSource == plasma::flatbuf::ObjectSource::CreatedByWorker) { num_objects_created_by_worker_++; num_bytes_created_by_worker_ += kObjectSize; - ray::stats::STATS_object_store_dist.Record( - kObjectSize, {{ray::stats::SourceKey, "CreatedByWorker"}}); + object_store_dist_histogram_.Record(kObjectSize, + {{ray::stats::SourceKey, "CreatedByWorker"}}); } else if (kSource == plasma::flatbuf::ObjectSource::RestoredFromStorage) { num_objects_restored_++; num_bytes_restored_ += kObjectSize; - ray::stats::STATS_object_store_dist.Record( - kObjectSize, {{ray::stats::SourceKey, "RestoredFromStorage"}}); + object_store_dist_histogram_.Record(kObjectSize, + {{ray::stats::SourceKey, "RestoredFromStorage"}}); } else if (kSource == plasma::flatbuf::ObjectSource::ReceivedFromRemoteRaylet) { num_objects_received_++; num_bytes_received_ += kObjectSize; - ray::stats::STATS_object_store_dist.Record( + object_store_dist_histogram_.Record( kObjectSize, {{ray::stats::SourceKey, "ReceivedFromRemoteRaylet"}}); } else if (kSource == plasma::flatbuf::ObjectSource::ErrorStoredByRaylet) { num_objects_errored_++; num_bytes_errored_ += kObjectSize; - ray::stats::STATS_object_store_dist.Record( - kObjectSize, {{ray::stats::SourceKey, "ErrorStoredByRaylet"}}); + object_store_dist_histogram_.Record(kObjectSize, + {{ray::stats::SourceKey, "ErrorStoredByRaylet"}}); } RAY_CHECK(!obj.Sealed()); @@ -206,25 +206,25 @@ void ObjectStatsCollector::RecordMetrics() const { static std::string kObjectLocMmapDisk = "MMAP_DISK"; // Shared memory sealed - ray::stats::STATS_object_store_memory.Record( + object_store_memory_gauge_.Record( bytes_by_loc_seal_.Get({/* fallback_allocated */ false, /* sealed */ true}), {{ray::stats::LocationKey, kObjectLocMmapShm}, {ray::stats::ObjectStateKey, kObjectSealed}}); // Shared memory unsealed - ray::stats::STATS_object_store_memory.Record( + object_store_memory_gauge_.Record( bytes_by_loc_seal_.Get({/* fallback_allocated */ false, /* sealed */ false}), {{ray::stats::LocationKey, kObjectLocMmapShm}, {ray::stats::ObjectStateKey, kObjectUnsealed}}); // Fallback memory sealed - ray::stats::STATS_object_store_memory.Record( + object_store_memory_gauge_.Record( bytes_by_loc_seal_.Get({/* fallback_allocated */ true, /* sealed */ true}), {{ray::stats::LocationKey, kObjectLocMmapDisk}, {ray::stats::ObjectStateKey, kObjectSealed}}); // Fallback memory unsealed - ray::stats::STATS_object_store_memory.Record( + object_store_memory_gauge_.Record( bytes_by_loc_seal_.Get({/* fallback_allocated */ true, /* sealed */ false}), {{ray::stats::LocationKey, kObjectLocMmapDisk}, {ray::stats::ObjectStateKey, kObjectUnsealed}}); diff --git a/src/ray/object_manager/plasma/stats_collector.h b/src/ray/object_manager/plasma/stats_collector.h index 17f41fd0882c..6fafae3616e2 100644 --- a/src/ray/object_manager/plasma/stats_collector.h +++ b/src/ray/object_manager/plasma/stats_collector.h @@ -19,6 +19,8 @@ #include // std::pair +#include "ray/common/metrics.h" +#include "ray/object_manager/metrics.h" #include "ray/object_manager/plasma/common.h" #include "ray/util/counter_map.h" // CounterMap @@ -92,6 +94,11 @@ class ObjectStatsCollector { int64_t num_bytes_errored_ = 0; int64_t num_objects_created_total_ = 0; int64_t num_bytes_created_total_ = 0; + + mutable ray::stats::Gauge object_store_memory_gauge_{ + ray::GetObjectStoreMemoryGaugeMetric()}; + mutable ray::stats::Histogram object_store_dist_histogram_{ + ray::GetObjectStoreDistHistogramMetric()}; }; } // namespace plasma diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index 203a7658ecfb..f9c424d021c1 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -49,7 +49,6 @@ #include "ray/object_manager/plasma/plasma_allocator.h" #include "ray/object_manager/plasma/protocol.h" #include "ray/raylet_ipc_client/client_connection.h" -#include "ray/stats/metric_defs.h" #include "ray/util/network_util.h" namespace ph = boost::placeholders; diff --git a/src/ray/object_manager/pull_manager.cc b/src/ray/object_manager/pull_manager.cc index 3c9af345c527..f44d13fb45e7 100644 --- a/src/ray/object_manager/pull_manager.cc +++ b/src/ray/object_manager/pull_manager.cc @@ -21,7 +21,6 @@ #include #include "ray/common/ray_config.h" -#include "ray/stats/metric_defs.h" namespace ray { @@ -342,9 +341,9 @@ std::vector PullManager::CancelPull(uint64_t request_id) { RAY_LOG(DEBUG) << "Removing an object pull request of id: " << obj_id; it->second.bundle_request_ids.erase(bundle_it->first); if (it->second.bundle_request_ids.empty()) { - ray::stats::STATS_pull_manager_object_request_time_ms.Record( + pull_manager_object_request_time_ms_histogram_.Record( absl::GetCurrentTimeNanos() / 1e3 - it->second.request_start_time_ms, - "StartToCancel"); + {{"Type", "StartToCancel"}}); object_pull_requests_.erase(it); object_ids_to_cancel_subscription.push_back(obj_id); } @@ -608,13 +607,13 @@ bool PullManager::TryPinObject(const ObjectID &object_id) { auto it = object_pull_requests_.find(object_id); RAY_CHECK(it != object_pull_requests_.end()); - ray::stats::STATS_pull_manager_object_request_time_ms.Record( + pull_manager_object_request_time_ms_histogram_.Record( absl::GetCurrentTimeNanos() / 1e3 - it->second.request_start_time_ms, - "StartToPin"); + {{"Type", "StartToPin"}}); if (it->second.activate_time_ms > 0) { - ray::stats::STATS_pull_manager_object_request_time_ms.Record( + pull_manager_object_request_time_ms_histogram_.Record( absl::GetCurrentTimeNanos() / 1e3 - it->second.activate_time_ms, - "MemoryAvailableToPin"); + {{"Type", "MemoryAvailableToPin"}}); } return true; } @@ -727,30 +726,29 @@ int64_t PullManager::NextRequestBundleSize(const BundlePullRequestQueue &bundles void PullManager::RecordMetrics() const { absl::MutexLock lock(&active_objects_mu_); - ray::stats::STATS_pull_manager_usage_bytes.Record(num_bytes_available_, "Available"); - ray::stats::STATS_pull_manager_usage_bytes.Record(num_bytes_being_pulled_, - "BeingPulled"); - ray::stats::STATS_pull_manager_usage_bytes.Record(pinned_objects_size_, "Pinned"); - ray::stats::STATS_pull_manager_requested_bundles.Record( - get_request_bundles_.requests.size(), "Get"); - ray::stats::STATS_pull_manager_requested_bundles.Record( - wait_request_bundles_.requests.size(), "Wait"); - ray::stats::STATS_pull_manager_requested_bundles.Record( - task_argument_bundles_.requests.size(), "TaskArgs"); - ray::stats::STATS_pull_manager_requested_bundles.Record(next_req_id_, - "CumulativeTotal"); - ray::stats::STATS_pull_manager_requests.Record(object_pull_requests_.size(), "Queued"); - ray::stats::STATS_pull_manager_requests.Record(active_object_pull_requests_.size(), - "Active"); - ray::stats::STATS_pull_manager_requests.Record(pinned_objects_.size(), "Pinned"); - ray::stats::STATS_pull_manager_active_bundles.Record(num_active_bundles_); - ray::stats::STATS_pull_manager_retries_total.Record(num_retries_total_); - ray::stats::STATS_pull_manager_retries_total.Record(num_tries_total_); - - ray::stats::STATS_pull_manager_num_object_pins.Record(num_succeeded_pins_total_, - "Success"); - ray::stats::STATS_pull_manager_num_object_pins.Record(num_failed_pins_total_, - "Failure"); + pull_manager_usage_bytes_gauge_.Record(num_bytes_available_, {{"Type", "Available"}}); + pull_manager_usage_bytes_gauge_.Record(num_bytes_being_pulled_, + {{"Type", "BeingPulled"}}); + pull_manager_usage_bytes_gauge_.Record(pinned_objects_size_, {{"Type", "Pinned"}}); + pull_manager_requested_bundles_gauge_.Record(get_request_bundles_.requests.size(), + {{"Type", "Get"}}); + pull_manager_requested_bundles_gauge_.Record(wait_request_bundles_.requests.size(), + {{"Type", "Wait"}}); + pull_manager_requested_bundles_gauge_.Record(task_argument_bundles_.requests.size(), + {{"Type", "TaskArgs"}}); + pull_manager_requested_bundles_gauge_.Record(next_req_id_, + {{"Type", "CumulativeTotal"}}); + pull_manager_requests_gauge_.Record(object_pull_requests_.size(), {{"Type", "Queued"}}); + pull_manager_requests_gauge_.Record(active_object_pull_requests_.size(), + {{"Type", "Active"}}); + pull_manager_requests_gauge_.Record(pinned_objects_.size(), {{"Type", "Pinned"}}); + pull_manager_active_bundles_gauge_.Record(num_active_bundles_); + pull_manager_retries_total_gauge_.Record(num_retries_total_); + pull_manager_retries_total_gauge_.Record(num_tries_total_); + pull_manager_num_object_pins_gauge_.Record(num_succeeded_pins_total_, + {{"Type", "Success"}}); + pull_manager_num_object_pins_gauge_.Record(num_failed_pins_total_, + {{"Type", "Failure"}}); } std::string PullManager::DebugString() const { diff --git a/src/ray/object_manager/pull_manager.h b/src/ray/object_manager/pull_manager.h index c9a7c34f5c70..aa448115903d 100644 --- a/src/ray/object_manager/pull_manager.h +++ b/src/ray/object_manager/pull_manager.h @@ -27,6 +27,7 @@ #include "ray/common/id.h" #include "ray/common/ray_object.h" #include "ray/object_manager/common.h" +#include "ray/object_manager/metrics.h" #include "ray/util/container_util.h" #include "ray/util/counter_map.h" @@ -512,6 +513,21 @@ class PullManager { int64_t num_succeeded_pins_total_ = 0; int64_t num_failed_pins_total_ = 0; + mutable ray::stats::Gauge pull_manager_usage_bytes_gauge_{ + GetPullManagerUsageBytesGaugeMetric()}; + mutable ray::stats::Gauge pull_manager_requested_bundles_gauge_{ + GetPullManagerRequestedBundlesGaugeMetric()}; + mutable ray::stats::Gauge pull_manager_requests_gauge_{ + GetPullManagerRequestsGaugeMetric()}; + mutable ray::stats::Gauge pull_manager_active_bundles_gauge_{ + GetPullManagerActiveBundlesGaugeMetric()}; + mutable ray::stats::Gauge pull_manager_retries_total_gauge_{ + GetPullManagerRetriesTotalGaugeMetric()}; + mutable ray::stats::Gauge pull_manager_num_object_pins_gauge_{ + GetPullManagerNumObjectPinsGaugeMetric()}; + mutable ray::stats::Histogram pull_manager_object_request_time_ms_histogram_{ + GetPullManagerObjectRequestTimeMsHistogramMetric()}; + friend class PullManagerTest; friend class PullManagerTestWithCapacity; friend class PullManagerWithAdmissionControlTest; diff --git a/src/ray/object_manager/push_manager.cc b/src/ray/object_manager/push_manager.cc index 1487c6d5b563..439ff1cd4de1 100644 --- a/src/ray/object_manager/push_manager.cc +++ b/src/ray/object_manager/push_manager.cc @@ -17,8 +17,6 @@ #include #include -#include "ray/stats/metric_defs.h" - namespace ray { void PushManager::StartPush(const NodeID &dest_id, @@ -106,10 +104,9 @@ void PushManager::HandleNodeRemoved(const NodeID &node_id) { } void PushManager::RecordMetrics() const { - ray::stats::STATS_push_manager_num_pushes_remaining.Record( - NumPushRequestsWithChunksToSend()); - ray::stats::STATS_push_manager_chunks.Record(NumChunksInFlight(), "InFlight"); - ray::stats::STATS_push_manager_chunks.Record(NumChunksRemaining(), "Remaining"); + push_manager_num_pushes_remaining_gauge_.Record(NumPushRequestsWithChunksToSend()); + push_manager_chunks_gauge_.Record(NumChunksInFlight(), {{"Type", "InFlight"}}); + push_manager_chunks_gauge_.Record(NumChunksRemaining(), {{"Type", "Remaining"}}); } std::string PushManager::DebugString() const { diff --git a/src/ray/object_manager/push_manager.h b/src/ray/object_manager/push_manager.h index 4149d8f29c30..969f8fa82fcd 100644 --- a/src/ray/object_manager/push_manager.h +++ b/src/ray/object_manager/push_manager.h @@ -20,6 +20,7 @@ #include "absl/container/flat_hash_map.h" #include "ray/common/id.h" +#include "ray/object_manager/metrics.h" namespace ray { @@ -138,6 +139,10 @@ class PushManager { /// The list of push requests with chunks waiting to be sent. std::list push_requests_with_chunks_to_send_; + + mutable ray::stats::Gauge push_manager_num_pushes_remaining_gauge_{ + GetPushManagerNumPushesRemainingGaugeMetric()}; + mutable ray::stats::Gauge push_manager_chunks_gauge_{GetPushManagerChunksGaugeMetric()}; }; } // namespace ray diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index fb2b037dc29d..259cd01db5e4 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -647,8 +647,8 @@ void LocalObjectManager::RecordMetrics() const { ray::stats::STATS_spill_manager_request_total.Record(restored_objects_total_, "Restored"); - stats::STATS_object_store_memory.Record(spilled_bytes_current_, - {{stats::LocationKey, "SPILLED"}}); + object_store_memory_gauge_.Record(spilled_bytes_current_, + {{stats::LocationKey, "SPILLED"}}); ray::stats::STATS_spill_manager_request_total.Record(num_failed_deletion_requests_, "FailedDeletion"); diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index 5a0751947347..12a3fe63248c 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -25,6 +25,7 @@ #include "ray/common/ray_object.h" #include "ray/core_worker_rpc_client/core_worker_client_pool.h" #include "ray/object_manager/object_directory.h" +#include "ray/observability/metric_interface.h" #include "ray/pubsub/subscriber_interface.h" #include "ray/raylet/local_object_manager_interface.h" #include "ray/raylet/worker_pool.h" @@ -56,7 +57,8 @@ class LocalObjectManager : public LocalObjectManagerInterface { std::function &)> on_objects_freed, std::function is_plasma_object_spillable, pubsub::SubscriberInterface *core_worker_subscriber, - IObjectDirectory *object_directory) + IObjectDirectory *object_directory, + ray::observability::MetricInterface &object_store_memory_gauge) : self_node_id_(node_id), self_node_address_(std::move(self_node_address)), self_node_port_(self_node_port), @@ -75,7 +77,8 @@ class LocalObjectManager : public LocalObjectManagerInterface { max_fused_object_count_(max_fused_object_count), next_spill_error_log_bytes_(RayConfig::instance().verbose_spill_logs()), core_worker_subscriber_(core_worker_subscriber), - object_directory_(object_directory) {} + object_directory_(object_directory), + object_store_memory_gauge_(object_store_memory_gauge) {} /// Pin objects. /// @@ -388,6 +391,8 @@ class LocalObjectManager : public LocalObjectManagerInterface { /// The number of failed deletion requests. std::atomic num_failed_deletion_requests_ = 0; + ray::observability::MetricInterface &object_store_memory_gauge_; + friend class LocalObjectManagerTestWithMinSpillingSize; }; diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 4556c643bbc6..8203926bea05 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -31,6 +31,7 @@ #include "ray/common/constants.h" #include "ray/common/id.h" #include "ray/common/lease/lease.h" +#include "ray/common/metrics.h" #include "ray/common/ray_config.h" #include "ray/common/status.h" #include "ray/common/status_or.h" @@ -329,6 +330,7 @@ int main(int argc, char *argv[]) { RAY_CHECK_OK(gcs_client->Connect(main_service)); ray::stats::Gauge task_by_state_counter = ray::core::GetTaskByStateGaugeMetric(); + ray::stats::Gauge object_store_memory_gauge = ray::GetObjectStoreMemoryGaugeMetric(); std::shared_ptr plasma_client; std::unique_ptr placement_group_resource_manager; @@ -797,7 +799,8 @@ int main(int argc, char *argv[]) { return object_manager->IsPlasmaObjectSpillable(object_id); }, /*core_worker_subscriber_=*/core_worker_subscriber.get(), - object_directory.get()); + object_directory.get(), + object_store_memory_gauge); lease_dependency_manager = std::make_unique( *object_manager, task_by_state_counter); diff --git a/src/ray/raylet/scheduling/BUILD.bazel b/src/ray/raylet/scheduling/BUILD.bazel index 01987f04a65f..1ac700c2419c 100644 --- a/src/ray/raylet/scheduling/BUILD.bazel +++ b/src/ray/raylet/scheduling/BUILD.bazel @@ -124,6 +124,7 @@ ray_cc_library( "//src/ray/common:ray_syncer", "//src/ray/common/scheduling:cluster_resource_data", "//src/ray/common/scheduling:placement_group_util", + "//src/ray/observability:metric_interface", "//src/ray/protobuf:gcs_cc_proto", "//src/ray/protobuf:node_manager_cc_proto", "//src/ray/stats:stats_metric", diff --git a/src/ray/raylet/tests/BUILD.bazel b/src/ray/raylet/tests/BUILD.bazel index 6b67595a3777..79265c8cae6c 100644 --- a/src/ray/raylet/tests/BUILD.bazel +++ b/src/ray/raylet/tests/BUILD.bazel @@ -74,6 +74,7 @@ ray_cc_test( "//:ray_mock", "//src/ray/common:id", "//src/ray/common/scheduling:placement_group_util", + "//src/ray/observability:fake_metric", "//src/ray/raylet:placement_group_resource_manager", "@com_google_googletest//:gtest_main", ], diff --git a/src/ray/raylet/tests/local_object_manager_test.cc b/src/ray/raylet/tests/local_object_manager_test.cc index 213c3a8c1cc8..f03b980adb14 100644 --- a/src/ray/raylet/tests/local_object_manager_test.cc +++ b/src/ray/raylet/tests/local_object_manager_test.cc @@ -32,6 +32,7 @@ #include "ray/core_worker_rpc_client/fake_core_worker_client.h" #include "ray/gcs_rpc_client/accessor.h" #include "ray/object_manager/ownership_object_directory.h" +#include "ray/observability/fake_metric.h" #include "ray/pubsub/subscriber.h" #include "ray/raylet/tests/util.h" #include "ray/raylet/worker_pool.h" @@ -349,7 +350,8 @@ class LocalObjectManagerTestWithMinSpillingSize { return unevictable_objects_.count(object_id) == 0; }, /*core_worker_subscriber=*/subscriber_.get(), - object_directory_.get()), + object_directory_.get(), + fake_object_store_memory_gauge_), unpins(std::make_shared>()) { RayConfig::instance().initialize(R"({"object_spilling_config": "dummy"})"); manager.min_spilling_size_ = min_spilling_size; @@ -405,6 +407,7 @@ class LocalObjectManagerTestWithMinSpillingSize { std::unique_ptr gcs_client_; std::unique_ptr object_directory_; LocalObjectManager manager; + ray::observability::FakeGauge fake_object_store_memory_gauge_; std::unordered_set freed; // This hashmap is incremented when objects are unpinned by destroying their diff --git a/src/ray/stats/metric_defs.cc b/src/ray/stats/metric_defs.cc index 6265f7243b68..3009d2ab4287 100644 --- a/src/ray/stats/metric_defs.cc +++ b/src/ray/stats/metric_defs.cc @@ -70,21 +70,6 @@ DEFINE_stats( (), ray::stats::GAUGE); -DEFINE_stats(object_store_dist, - "The distribution of object size in bytes", - ("Source"), - ({32_MiB, - 64_MiB, - 128_MiB, - 256_MiB, - 512_MiB, - 1024_MiB, - 2048_MiB, - 4096_MiB, - 8192_MiB, - 16384_MiB}), - ray::stats::HISTOGRAM); - /// =============================================================================== /// ===================== INTERNAL SYSTEM METRICS ================================= /// =============================================================================== @@ -110,72 +95,6 @@ DEFINE_stats(operation_queue_time_ms, DEFINE_stats( operation_active_count, "active operation number", ("Name"), (), ray::stats::GAUGE); -/// Object Manager. -DEFINE_stats(object_manager_bytes, - "Number of bytes pushed or received by type {PushedFromLocalPlasma, " - "PushedFromLocalDisk, Received}.", - ("Type"), - (), - ray::stats::GAUGE); - -DEFINE_stats(object_manager_received_chunks, - "Number object chunks received broken per type {Total, FailedTotal, " - "FailedCancelled, FailedPlasmaFull}.", - ("Type"), - (), - ray::stats::GAUGE); - -/// Pull Manager -DEFINE_stats( - pull_manager_usage_bytes, - "The total number of bytes usage broken per type {Available, BeingPulled, Pinned}", - ("Type"), - (), - ray::stats::GAUGE); -DEFINE_stats(pull_manager_requested_bundles, - "Number of requested bundles broken per type {Get, Wait, TaskArgs}.", - ("Type"), - (), - ray::stats::GAUGE); -DEFINE_stats(pull_manager_requests, - "Number of pull requests broken per type {Queued, Active, Pinned}.", - ("Type"), - (), - ray::stats::GAUGE); -DEFINE_stats(pull_manager_active_bundles, - "Number of active bundle requests", - (), - (), - ray::stats::GAUGE); -DEFINE_stats(pull_manager_retries_total, - "Number of cumulative pull retries.", - (), - (), - ray::stats::GAUGE); -DEFINE_stats( - pull_manager_num_object_pins, - "Number of object pin attempts by the pull manager, can be {Success, Failure}.", - ("Type"), - (), - ray::stats::GAUGE); -DEFINE_stats(pull_manager_object_request_time_ms, - "Time between initial object pull request and local pinning of the object. ", - ("Type"), - ({1, 10, 100, 1000, 10000}), - ray::stats::HISTOGRAM); - -/// Push Manager -DEFINE_stats(push_manager_num_pushes_remaining, - "Number of pushes not completed.", - (), - (), - ray::stats::GAUGE); -DEFINE_stats(push_manager_chunks, - "Number of object chunks transfer broken per type {InFlight, Remaining}.", - ("Type"), - (), - ray::stats::GAUGE); - /// Scheduler DEFINE_stats( scheduler_tasks, diff --git a/src/ray/stats/metric_defs.h b/src/ray/stats/metric_defs.h index 438bcd7cdc27..b0537b770c7c 100644 --- a/src/ray/stats/metric_defs.h +++ b/src/ray/stats/metric_defs.h @@ -51,25 +51,6 @@ DECLARE_stats(operation_run_time_ms); DECLARE_stats(operation_queue_time_ms); DECLARE_stats(operation_active_count); -/// Object Manager. -DECLARE_stats(object_manager_bytes); -DECLARE_stats(object_manager_received_chunks); - -/// Pull Manager -DECLARE_stats(pull_manager_usage_bytes); -// TODO(sang): Remove pull_manager_active_bundles and -// support active/inactive get/wait/task_args -DECLARE_stats(pull_manager_requested_bundles); -DECLARE_stats(pull_manager_requests); -DECLARE_stats(pull_manager_active_bundles); -DECLARE_stats(pull_manager_retries_total); -DECLARE_stats(pull_manager_num_object_pins); -DECLARE_stats(pull_manager_object_request_time_ms); - -/// Push Manager -DECLARE_stats(push_manager_num_pushes_remaining); -DECLARE_stats(push_manager_chunks); - /// Scheduler DECLARE_stats(scheduler_failed_worker_startup_total); DECLARE_stats(scheduler_tasks); @@ -88,7 +69,6 @@ DECLARE_stats(spill_manager_throughput_mb); /// Object Store DECLARE_stats(object_store_memory); -DECLARE_stats(object_store_dist); /// Memory Manager DECLARE_stats(memory_manager_worker_eviction_total);