Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
a4a046d
[core] Make DrainRaylet + ShutdownRaylet Fault Tolerant (#57861)
Sparks0219 Oct 20, 2025
1c48cf1
[release] Group all hello world tests together (#57920)
khluu Oct 20, 2025
6a10e11
[Data] Callback-based stat computation for preprocessors and ValueCou…
cem-anyscale Oct 20, 2025
f4d81a3
[RLlib] Add Prometheus metrics for various components and improve log…
kamil-kaczmarek Oct 20, 2025
adef7b5
[core] Fix "RayEventRecorder::StartExportingEvents() should be called…
can-anyscale Oct 20, 2025
e61b8d8
[Data] Support serializing zero-length numpy arrays (#57858)
chriso Oct 20, 2025
0d0466b
[ci] fix postmerge tests that require credentials (#57915)
aslonnie Oct 20, 2025
c6ba247
[core] Make ReleaseUnusedBundles Fault Tolerant (#57786)
Sparks0219 Oct 20, 2025
3477a0f
[doc] remove "Note that" in dataset.py documentation (#57884)
MaxVanDijck Oct 20, 2025
2d36487
[codeowners] Reorder `CODEOWNERS` for resolution order + organization…
eicherseiji Oct 20, 2025
88f6e3f
[ci] change macos bisect job to use arm64 (#57914)
aslonnie Oct 20, 2025
50eb1ed
Merge branch 'master' into kk/rllib-prometheus-metrics
kamil-kaczmarek Oct 20, 2025
58e56f0
Merge branch 'master' into kk/rllib-prometheus-metrics
kamil-kaczmarek Oct 20, 2025
4a61803
try docs build
kamil-kaczmarek Oct 21, 2025
7a4b965
Merge remote-tracking branch 'origin/kk/rllib-prometheus-metrics' int…
kamil-kaczmarek Oct 21, 2025
29f4823
try docs build (2)
kamil-kaczmarek Oct 21, 2025
e394756
fix
kamil-kaczmarek Oct 21, 2025
5351fc3
Merge branch 'master' into kk/rllib-prometheus-metrics
kamil-kaczmarek Oct 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,167 changes: 691 additions & 476 deletions rllib/algorithms/algorithm.py

Large diffs are not rendered by default.

102 changes: 70 additions & 32 deletions rllib/algorithms/appo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
Luo et al. 2020
https://arxiv.org/pdf/1912.00167
"""
from collections import deque
import threading
import time
from collections import deque

import numpy as np

from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.utils.annotations import OldAPIStack

from ray.rllib.utils.metrics.ray_metrics import (
DEFAULT_HISTOGRAM_BOUNDARIES_SHORT_EVENTS,
TimerAndPrometheusLogger,
)
from ray.util.metrics import Counter, Histogram

# Module-level string constants naming two scopes.
# NOTE(review): presumably the scope names for the policy and the target-policy
# models - confirm against the usages elsewhere in this module.
POLICY_SCOPE = "func"
TARGET_POLICY_SCOPE = "target_func"
Expand Down Expand Up @@ -43,43 +47,77 @@ def __init__(self, num_batches: int, iterations_per_batch: int):

self._rng = np.random.default_rng()

# Ray metrics
self._metrics_circular_buffer_add_time = Histogram(
name="rllib_utils_circular_buffer_add_time",
description="Time spent in CircularBuffer.add()",
boundaries=DEFAULT_HISTOGRAM_BOUNDARIES_SHORT_EVENTS,
tag_keys=("rllib",),
)
self._metrics_circular_buffer_add_time.set_default_tags(
{"rllib": self.__class__.__name__}
)

self._metrics_circular_buffer_add_ts_dropped = Counter(
name="rllib_utils_circular_buffer_add_ts_dropped_counter",
description="Total number of env steps dropped by the CircularBuffer.",
tag_keys=("rllib",),
)
self._metrics_circular_buffer_add_ts_dropped.set_default_tags(
{"rllib": self.__class__.__name__}
)

self._metrics_circular_buffer_sample_time = Histogram(
name="rllib_utils_circular_buffer_sample_time",
description="Time spent in CircularBuffer.sample()",
boundaries=DEFAULT_HISTOGRAM_BOUNDARIES_SHORT_EVENTS,
tag_keys=("rllib",),
)
self._metrics_circular_buffer_sample_time.set_default_tags(
{"rllib": self.__class__.__name__}
)

def add(self, batch):
# Appends `batch` to the buffer `iterations_per_batch` times and returns the
# env-step count of the entry that sat at the front of the buffer beforehand
# (0 if that slot held None).
#
# NOTE(review): this span is a rendered diff with indentation stripped. The body
# appears twice: first the pre-change statements, then the post-change statements
# wrapped in a TimerAndPrometheusLogger block.
#
# --- Pre-change body ---
# Add buffer and k=0 information to the deque.
with self._lock:
# Remember the front entry before appending. Presumably the deque is bounded, so
# the appends below push this entry out - confirm (constructor not visible here).
dropped_entry = self._buffer[0]
for _ in range(self.iterations_per_batch):
self._buffer.append(batch)
# Register the new entry under the running offset and retire the index that
# lies `_NxK` positions behind it.
self._indices.add(self._offset)
self._indices.discard(self._offset - self._NxK)
self._offset += 1
self._num_added += 1

# A valid entry (w/ a batch whose k has not reached K yet) was dropped.
dropped_ts = 0
if dropped_entry is not None:
dropped_ts = dropped_entry.env_steps()
# --- Post-change body: same logic, timed via the `add_time` histogram ---
with TimerAndPrometheusLogger(self._metrics_circular_buffer_add_time):
with self._lock:
# Remember the front entry before appending (see the note in the pre-change body).
dropped_entry = self._buffer[0]
for _ in range(self.iterations_per_batch):
self._buffer.append(batch)
# Register the new entry under the running offset and retire the index that
# lies `_NxK` positions behind it.
self._indices.add(self._offset)
self._indices.discard(self._offset - self._NxK)
self._offset += 1
self._num_added += 1

# A valid entry (w/ a batch whose k has not reached K yet) was dropped.
dropped_ts = 0
if dropped_entry is not None:
dropped_ts = dropped_entry.env_steps()
# Report dropped env steps to the counter, but only when something was dropped.
# NOTE(review): presumably the guard exists because Counter.inc() rejects
# non-positive values - confirm against the ray.util.metrics API.
if dropped_ts > 0:
self._metrics_circular_buffer_add_ts_dropped.inc(value=dropped_ts)

# Number of env steps in the dropped front entry (0 if none).
return dropped_ts

def sample(self):
# Waits until the buffer reports a non-zero length, then picks a random index
# from `_indices`, takes the batch stored at the matching buffer position,
# clears that slot (sets it to None), forgets the index, and returns the batch.
#
# NOTE(review): this span is a rendered diff with indentation stripped. The body
# appears twice: first the pre-change statements, then the post-change statements
# wrapped in a TimerAndPrometheusLogger block.
#
# --- Pre-change body ---
# Only initially, the buffer may be empty -> Just wait for some time.
# The polling loop runs before the lock is acquired.
while len(self) == 0:
time.sleep(0.0001)

# Sample a random buffer index.
with self._lock:
idx = self._rng.choice(list(self._indices))
# Translate the running index into a position in the buffer: the most recently
# added index (`_offset - 1`) maps to position `_NxK - 1`.
# NOTE(review): assumes the buffer always holds `_NxK` slots - confirm in __init__.
actual_buffer_idx = idx - self._offset + self._NxK
batch = self._buffer[actual_buffer_idx]
# An index still listed in `_indices` must point at a live batch; the tuple below
# is the debugging payload shown if that invariant is violated.
assert batch is not None, (
idx,
actual_buffer_idx,
self._offset,
self._indices,
[b is None for b in self._buffer],
)
# Clear the slot and forget the index so this entry cannot be sampled again.
self._buffer[actual_buffer_idx] = None
self._indices.discard(idx)
# --- Post-change body: same logic, timed via the `sample_time` histogram ---
# The wait loop sits inside the timer block, so the recorded time includes any
# time spent waiting for the buffer to become non-empty.
with TimerAndPrometheusLogger(self._metrics_circular_buffer_sample_time):
while len(self) == 0:
time.sleep(0.0001)

# Sample a random buffer index.
# NOTE(review): the emptiness check above is made without holding the lock; if
# several consumers call sample() concurrently, `_indices` could presumably be
# empty by the time choice() runs - confirm the intended threading model.
with self._lock:
idx = self._rng.choice(list(self._indices))
# Translate the running index into a position in the buffer (see the note in the
# pre-change body).
actual_buffer_idx = idx - self._offset + self._NxK
batch = self._buffer[actual_buffer_idx]
# Invariant check with debugging payload, as in the pre-change body.
assert batch is not None, (
idx,
actual_buffer_idx,
self._offset,
self._indices,
[b is None for b in self._buffer],
)
# Clear the slot and forget the index so this entry cannot be sampled again.
self._buffer[actual_buffer_idx] = None
self._indices.discard(idx)

# Return the sampled batch.
return batch
Expand Down
Loading