Skip to content

Commit 219fb67

Browse files
authored
[core] Simplify local/global gc logic (#58671)
Signed-off-by: dayshah <[email protected]>
1 parent 24dc069 commit 219fb67

File tree

18 files changed

+132
-234
lines changed

18 files changed

+132
-234
lines changed

python/ray/_private/gc_collect_manager.py

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,14 @@ class PythonGCThread(threading.Thread):
1111
"""A background thread that triggers Python garbage collection.
1212
1313
This thread waits for GC events from CoreWorker and triggers `gc.collect()` when
14-
requested, ensuring that collections are spaced out by at least
15-
`min_interval_s` seconds."""
14+
requested."""
1615

17-
def __init__(
18-
self, *, min_interval_s: int = 5, gc_collect_func: Optional[Callable] = None
19-
):
16+
def __init__(self, *, gc_collect_func: Optional[Callable] = None):
2017
logger.debug("Starting Python GC thread")
2118
super().__init__(name="PythonGCThread", daemon=True)
2219
self._should_exit = False
23-
self._last_gc_time = float("-inf")
24-
self._min_gc_interval = min_interval_s
2520
self._gc_event = threading.Event()
26-
# Set the gc_collect_func for UT, defaulting to gc.collect if None
21+
# Sets the gc_collect_func (only for testing), defaults to gc.collect
2722
self._gc_collect_func = gc_collect_func or gc.collect
2823

2924
def trigger_gc(self) -> None:
@@ -37,26 +32,17 @@ def run(self):
3732
if self._should_exit:
3833
break
3934

40-
time_since_last_gc = time.monotonic() - self._last_gc_time
41-
if time_since_last_gc < self._min_gc_interval:
42-
logger.debug(
43-
f"Skipping GC, only {time_since_last_gc:.2f}s since last GC"
44-
)
45-
continue
46-
4735
try:
4836
start = time.monotonic()
4937
num_freed = self._gc_collect_func()
50-
self._last_gc_time = time.monotonic()
5138
if num_freed > 0:
5239
logger.debug(
5340
"gc.collect() freed {} refs in {} seconds".format(
54-
num_freed, self._last_gc_time - start
41+
num_freed, time.monotonic() - start
5542
)
5643
)
5744
except Exception as e:
5845
logger.error(f"Error during GC: {e}")
59-
self._last_gc_time = time.monotonic()
6046

6147
def stop(self):
6248
logger.debug("Stopping Python GC thread")

python/ray/_private/ray_constants.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -583,5 +583,3 @@ def gcs_actor_scheduling_enabled():
583583
RDT_FETCH_FAIL_TIMEOUT_SECONDS = (
584584
env_integer("RAY_rdt_fetch_fail_timeout_milliseconds", 60000) / 1000
585585
)
586-
587-
RAY_GC_MIN_COLLECT_INTERVAL = env_float("RAY_GC_MIN_COLLECT_INTERVAL_S", 5)

python/ray/_raylet.pyx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2279,7 +2279,7 @@ cdef CRayStatus check_signals() nogil:
22792279
return CRayStatus.OK()
22802280

22812281

2282-
cdef void gc_collect(c_bool triggered_by_global_gc) nogil:
2282+
cdef void gc_collect() nogil:
22832283
with gil:
22842284
if RayConfig.instance().start_python_gc_manager_thread():
22852285
start = time.perf_counter()
@@ -2706,7 +2706,7 @@ cdef class CoreWorker:
27062706

27072707
self._gc_thread = None
27082708
if RayConfig.instance().start_python_gc_manager_thread():
2709-
self._gc_thread = PythonGCThread(min_interval_s=ray_constants.RAY_GC_MIN_COLLECT_INTERVAL)
2709+
self._gc_thread = PythonGCThread()
27102710
self._gc_thread.start()
27112711

27122712
def shutdown_driver(self):

python/ray/autoscaler/v2/tests/test_e2e.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ def foo():
498498
"raylet_report_resources_period_milliseconds": 10000,
499499
"global_gc_min_interval_s": 1,
500500
"local_gc_interval_s": 1,
501-
"high_plasma_storage_usage": 0.2,
501+
"plasma_store_usage_trigger_gc_threshold": 0.2,
502502
"raylet_check_gc_period_milliseconds": 10,
503503
},
504504
)

python/ray/includes/libcoreworker.pxd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
409409
(void(const CObjectID &) nogil) free_actor_object_callback
410410
(function[void()]() nogil) initialize_thread_callback
411411
(CRayStatus() nogil) check_signals
412-
(void(c_bool) nogil) gc_collect
412+
(void() nogil) gc_collect
413413
(c_vector[c_string](
414414
const c_vector[CObjectReference] &) nogil) spill_objects
415415
(int64_t(

python/ray/tests/test_global_gc.py

Lines changed: 40 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from ray._common.test_utils import wait_for_condition
1515
from ray._private.gc_collect_manager import PythonGCThread
1616
from ray._private.internal_api import global_gc
17-
from ray._private.ray_constants import RAY_GC_MIN_COLLECT_INTERVAL
1817

1918
logger = logging.getLogger(__name__)
2019

@@ -248,107 +247,69 @@ def make_garbage(self):
248247
def has_garbage(self):
249248
return self.garbage() is not None
250249

251-
try:
252-
gc.disable()
253-
254-
# 1) Test GC behavior for the local driver.
255-
256-
# 1a) Wait for the first GC to happen to avoid timing issues.
257-
local_ref = weakref.ref(ObjectWithCyclicRef())
258-
wait_for_condition(lambda: local_ref() is None, retry_interval_ms=10)
250+
gc.disable()
259251

260-
# 1b) Check that GC *is not* called within the min interval.
261-
local_ref = weakref.ref(ObjectWithCyclicRef())
262-
time.sleep(RAY_GC_MIN_COLLECT_INTERVAL / 2)
263-
assert local_ref() is not None
252+
# 1) Test GC behavior for the local driver.
264253

265-
# 1c) Check that GC *is* called after the min interval.
266-
wait_for_condition(
267-
lambda: local_ref() is None,
268-
timeout=RAY_GC_MIN_COLLECT_INTERVAL * 2,
269-
)
254+
# 1a) Wait for the first GC to happen to avoid timing issues.
255+
local_ref = weakref.ref(ObjectWithCyclicRef())
256+
wait_for_condition(lambda: local_ref() is None, retry_interval_ms=10)
270257

271-
# 2) Test GC behavior for a remote actor.
272-
a = GarbageHolder.remote()
258+
# 1b) Check that GC is called.
259+
wait_for_condition(
260+
lambda: local_ref() is None,
261+
timeout=5,
262+
)
273263

274-
# 2a) Wait for the first GC to happen to avoid timing issues.
275-
ray.get(a.make_garbage.remote())
276-
wait_for_condition(
277-
lambda: not ray.get(a.has_garbage.remote()), retry_interval_ms=10
278-
)
264+
# 2) Test GC behavior for a remote actor.
265+
a = GarbageHolder.remote()
279266

280-
# 2b) Check that GC *is not* called within the min interval.
281-
ray.get(a.make_garbage.remote())
282-
time.sleep(RAY_GC_MIN_COLLECT_INTERVAL / 2)
283-
assert ray.get(a.has_garbage.remote())
267+
# 2a) Wait for the first GC to happen to avoid timing issues.
268+
ray.get(a.make_garbage.remote())
269+
wait_for_condition(
270+
lambda: not ray.get(a.has_garbage.remote()), retry_interval_ms=10
271+
)
284272

285-
# 2c) Check that GC *is* called after the min interval.
286-
wait_for_condition(
287-
lambda: not ray.get(a.has_garbage.remote()),
288-
timeout=RAY_GC_MIN_COLLECT_INTERVAL * 2,
289-
)
273+
# 2b) Check that GC is called.
274+
wait_for_condition(
275+
lambda: not ray.get(a.has_garbage.remote()),
276+
timeout=5,
277+
)
290278

291-
finally:
292-
gc.enable()
279+
gc.enable()
293280

294281

295282
def test_gc_manager_thread_basic_functionality():
296283
mock_gc_collect = Mock(return_value=10)
297284

298-
gc_thread = PythonGCThread(min_interval_s=1, gc_collect_func=mock_gc_collect)
285+
gc_thread = PythonGCThread(gc_collect_func=mock_gc_collect)
286+
gc_thread.start()
287+
assert gc_thread.is_alive()
299288

300-
try:
301-
gc_thread.start()
302-
assert gc_thread.is_alive()
303-
304-
gc_thread.trigger_gc()
289+
gc_thread.trigger_gc()
290+
wait_for_condition(lambda: mock_gc_collect.call_count == 1, timeout=2)
305291

306-
wait_for_condition(lambda: mock_gc_collect.call_count == 1, timeout=2)
292+
gc_thread.trigger_gc()
293+
wait_for_condition(lambda: mock_gc_collect.call_count == 2, timeout=2)
307294

308-
mock_gc_collect.assert_called_once()
309-
310-
finally:
311-
gc_thread.stop()
312-
assert not gc_thread.is_alive()
313-
314-
315-
def test_gc_manager_thread_min_interval_throttling():
316-
mock_gc_collect = Mock(return_value=5)
317-
318-
gc_thread = PythonGCThread(min_interval_s=2, gc_collect_func=mock_gc_collect)
319-
320-
try:
321-
gc_thread.start()
322-
323-
for _ in range(3):
324-
gc_thread.trigger_gc()
325-
time.sleep(1)
326-
327-
wait_for_condition(lambda: mock_gc_collect.call_count == 2, timeout=2)
328-
329-
assert mock_gc_collect.call_count == 2
330-
331-
finally:
332-
gc_thread.stop()
295+
gc_thread.stop()
296+
assert not gc_thread.is_alive()
333297

334298

335299
def test_gc_manager_thread_exception_handling():
336300
mock_gc_collect = Mock(side_effect=RuntimeError("GC failed"))
337301

338-
gc_thread = PythonGCThread(min_interval_s=5, gc_collect_func=mock_gc_collect)
302+
gc_thread = PythonGCThread(gc_collect_func=mock_gc_collect)
303+
gc_thread.start()
339304

340-
try:
341-
gc_thread.start()
342-
343-
for _ in range(3):
344-
gc_thread.trigger_gc()
345-
time.sleep(0.1)
305+
for _ in range(3):
306+
gc_thread.trigger_gc()
307+
time.sleep(0.1)
346308

347-
assert gc_thread.is_alive()
348-
mock_gc_collect.assert_called_once()
309+
assert gc_thread.is_alive()
310+
assert mock_gc_collect.call_count == 3
349311

350-
finally:
351-
gc_thread.stop()
312+
gc_thread.stop()
352313

353314

354315
if __name__ == "__main__":

src/ray/common/ray_config_def.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ RAY_CONFIG(std::string, gcs_storage, "memory")
396396
RAY_CONFIG(uint32_t, object_store_full_delay_ms, 10)
397397

398398
/// The threshold to trigger a global gc
399-
RAY_CONFIG(double, high_plasma_storage_usage, 0.7)
399+
RAY_CONFIG(double, plasma_store_usage_trigger_gc_threshold, 0.7)
400400

401401
/// The amount of time between automatic local Python GC triggers.
402402
RAY_CONFIG(uint64_t, local_gc_interval_s, 90 * 60)
@@ -405,7 +405,7 @@ RAY_CONFIG(uint64_t, local_gc_interval_s, 90 * 60)
405405
RAY_CONFIG(uint64_t, local_gc_min_interval_s, 10)
406406

407407
/// The min amount of time between triggering global_gc in raylet. This only applies
408-
/// to global GCs triggered due to high_plasma_storage_usage.
408+
/// to global GCs triggered due to plasma_store_usage_trigger_gc_threshold.
409409
RAY_CONFIG(uint64_t, global_gc_min_interval_s, 30)
410410

411411
/// Duration to wait between retries for failed tasks.

src/ray/core_worker/core_worker.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4183,7 +4183,7 @@ void CoreWorker::HandleLocalGC(rpc::LocalGCRequest request,
41834183
rpc::LocalGCReply *reply,
41844184
rpc::SendReplyCallback send_reply_callback) {
41854185
if (options_.gc_collect != nullptr) {
4186-
options_.gc_collect(request.triggered_by_global_gc());
4186+
options_.gc_collect();
41874187
send_reply_callback(Status::OK(), nullptr, nullptr);
41884188
} else {
41894189
send_reply_callback(

src/ray/core_worker/core_worker_options.h

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,10 @@ struct CoreWorkerOptions {
7474
const rpc::TensorTransport &tensor_transport)>;
7575

7676
CoreWorkerOptions()
77-
: store_socket(""),
78-
raylet_socket(""),
79-
enable_logging(false),
80-
log_dir(""),
77+
: enable_logging(false),
8178
install_failure_signal_handler(false),
8279
interactive(false),
83-
node_ip_address(""),
8480
node_manager_port(0),
85-
driver_name(""),
8681
task_execution_callback(nullptr),
8782
free_actor_object_callback(nullptr),
8883
check_signals(nullptr),
@@ -97,15 +92,11 @@ struct CoreWorkerOptions {
9792
cancel_async_actor_task(nullptr),
9893
actor_shutdown_callback(nullptr),
9994
is_local_mode(false),
100-
serialized_job_config(""),
10195
metrics_agent_port(-1),
10296
runtime_env_hash(0),
10397
cluster_id(ClusterID::Nil()),
104-
session_name(""),
105-
entrypoint(""),
10698
worker_launch_time_ms(-1),
107-
worker_launched_time_ms(-1),
108-
debug_source("") {}
99+
worker_launched_time_ms(-1) {}
109100

110101
/// Type of this worker (i.e., DRIVER or WORKER).
111102
WorkerType worker_type;
@@ -150,7 +141,7 @@ struct CoreWorkerOptions {
150141
/// Application-language callback to trigger garbage collection in the language
151142
/// runtime. This is required to free distributed references that may otherwise
152143
/// be held up in garbage objects.
153-
std::function<void(bool triggered_by_global_gc)> gc_collect;
144+
std::function<void()> gc_collect;
154145
/// Application-language callback to spill objects to external storage.
155146
std::function<std::vector<std::string>(const std::vector<rpc::ObjectReference> &)>
156147
spill_objects;

src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -261,16 +261,12 @@ Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(JNIEnv *env,
261261
return Status::OK();
262262
};
263263

264-
auto gc_collect = [](bool triggered_by_global_gc) {
264+
auto gc_collect = []() {
265265
// A Java worker process usually contains more than one worker.
266266
// A LocalGC request is likely to be received by multiple workers in a short time.
267267
// Here we ensure that the 1 second interval of `System.gc()` execution is
268268
// guaranteed no matter how frequently the requests are received and how many workers
269269
// the process has.
270-
if (!triggered_by_global_gc) {
271-
RAY_LOG(DEBUG) << "Skipping non-global GC.";
272-
return;
273-
}
274270

275271
static absl::Mutex mutex;
276272
static int64_t last_gc_time_ms = 0;

0 commit comments

Comments
 (0)