77from cpython.exc cimport PyErr_CheckSignals
88
99import asyncio
10- from functools import wraps
1110import gc
1211import inspect
1312import logging
@@ -16,13 +15,11 @@ import io
1615import os
1716import pickle
1817import random
19- import signal
2018import sys
2119import threading
2220import time
2321import traceback
2422import _thread
25- import typing
2623from typing import (
2724 Any,
2825 AsyncGenerator,
@@ -43,7 +40,6 @@ import collections
4340from libc.stdint cimport (
4441 int32_t,
4542 int64_t,
46- INT64_MAX,
4743 uint64_t,
4844 uint8_t,
4945)
@@ -96,7 +92,6 @@ from ray.includes.common cimport (
9692 CTaskArgByValue,
9793 CTaskType,
9894 CPlacementStrategy,
99- CPythonFunction,
10095 CSchedulingStrategy,
10196 CPlacementGroupSchedulingStrategy,
10297 CNodeAffinitySchedulingStrategy,
@@ -105,9 +100,6 @@ from ray.includes.common cimport (
105100 CLabelMatchExpression,
106101 CLabelIn,
107102 CLabelNotIn,
108- CLabelExists,
109- CLabelDoesNotExist,
110- CLabelOperator,
111103 CRayFunction,
112104 CWorkerType,
113105 CJobConfig,
@@ -130,10 +122,8 @@ from ray.includes.common cimport (
130122 PLACEMENT_STRATEGY_SPREAD,
131123 PLACEMENT_STRATEGY_STRICT_PACK,
132124 PLACEMENT_STRATEGY_STRICT_SPREAD,
133- CChannelType,
134125 RAY_ERROR_INFO_CHANNEL,
135126 RAY_LOG_CHANNEL,
136- GCS_ACTOR_CHANNEL,
137127 PythonGetLogBatchLines,
138128 WORKER_EXIT_TYPE_USER_ERROR,
139129 WORKER_EXIT_TYPE_SYSTEM_ERROR,
@@ -164,7 +154,6 @@ from ray.includes.libcoreworker cimport (
164154 CTaskOptions,
165155 ResourceMappingType,
166156 CFiberEvent,
167- CActorHandle,
168157 CGeneratorBackpressureWaiter,
169158 CReaderRefInfo,
170159)
@@ -233,7 +222,6 @@ from ray.util.scheduling_strategies import (
233222import ray._private.ray_constants as ray_constants
234223import ray.cloudpickle as ray_pickle
235224from ray.core.generated.common_pb2 import ActorDiedErrorContext
236- from ray.core.generated.gcs_pb2 import JobTableData, GcsNodeInfo, ActorTableData
237225from ray.core.generated.gcs_service_pb2 import GetAllResourceUsageReply
238226from ray._private.async_compat import (
239227 sync_to_async,
@@ -243,8 +231,6 @@ from ray._private.async_compat import (
243231)
244232from ray._private.client_mode_hook import disable_client_hook
245233import ray.core.generated.common_pb2 as common_pb2
246- import ray._private.memory_monitor as memory_monitor
247- import ray._private.profiling as profiling
248234from ray._common.utils import decode
249235from ray._private.utils import DeferSigint
250236from ray._private.object_ref_generator import DynamicObjectRefGenerator
@@ -322,9 +308,7 @@ class ObjectRefGenerator:
322308 self .worker.check_connected()
323309 assert hasattr (worker, " core_worker" )
324310
325- """
326- Public APIs
327- """
311+ # Public APIs
328312
329313 def __iter__ (self ) -> "ObjectRefGenerator":
330314 return self
@@ -438,9 +422,7 @@ class ObjectRefGenerator:
438422 else:
439423 return False
440424
441- """
442- Private APIs
443- """
425+ # Private APIs
444426
445427 def _get_next_ref(self ) -> ObjectRef:
446428 """Return the next reference from a generator.
@@ -533,7 +515,7 @@ class ObjectRefGenerator:
533515
534516 if not is_ready:
535517 # TODO(swang): Avoid fetching the value.
536- ready , unready = await asyncio.wait(
518+ _ready , unready = await asyncio.wait(
537519 [asyncio.create_task(self ._suppress_exceptions(ref))],
538520 timeout = timeout_s
539521 )
@@ -861,7 +843,6 @@ cdef int prepare_resources(
861843 dict resource_dict,
862844 unordered_map[c_string, double ] * resource_map) except - 1 :
863845 cdef:
864- c_string resource_name
865846 list unit_resources
866847
867848 if resource_dict is None :
@@ -892,7 +873,6 @@ cdef int prepare_resources(
892873cdef c_vector[CFunctionDescriptor] prepare_function_descriptors(pyfd_list):
893874 cdef:
894875 c_vector[CFunctionDescriptor] fd_list
895- CRayFunction ray_function
896876
897877 fd_list.reserve(len (pyfd_list))
898878 for pyfd in pyfd_list:
@@ -1856,7 +1836,6 @@ cdef void execute_task(
18561836 JobID job_id = core_worker.get_current_job_id()
18571837 TaskID task_id = core_worker.get_current_task_id()
18581838 uint64_t attempt_number = core_worker.get_current_task_attempt_number()
1859- c_vector[shared_ptr[CRayObject]] dynamic_return_ptrs
18601839
18611840 # Helper method used to exit current asyncio actor.
18621841 # This is called when a KeyboardInterrupt is received by the main thread.
@@ -1894,7 +1873,7 @@ cdef void execute_task(
18941873 next_title = f" ray::{class_name}"
18951874
18961875 def function_executor (*arguments , **kwarguments ):
1897- function = execution_info.function
1876+ func = execution_info.function
18981877
18991878 if core_worker.current_actor_is_asyncio():
19001879 if not has_async_methods(actor.__class__ ):
@@ -1910,15 +1889,15 @@ cdef void execute_task(
19101889 )
19111890 )
19121891
1913- if is_async_func(function .method):
1914- async_function = function
1892+ if is_async_func(func .method):
1893+ async_function = func
19151894 else :
19161895 # Just execute the method if it's ray internal method.
1917- if function .name.startswith(" __ray" ):
1918- return function (actor, * arguments, ** kwarguments)
1919- async_function = sync_to_async(function )
1896+ if func .name.startswith(" __ray" ):
1897+ return func (actor, * arguments, ** kwarguments)
1898+ async_function = sync_to_async(func )
19201899
1921- if inspect.isasyncgenfunction(function .method):
1900+ if inspect.isasyncgenfunction(func .method):
19221901 # The coroutine will be handled separately by
19231902 # execute_dynamic_generator_and_store_task_outputs
19241903 return async_function(actor, * arguments, ** kwarguments)
@@ -1929,7 +1908,7 @@ cdef void execute_task(
19291908 task_name = task_name, func_args = (actor, * arguments),
19301909 func_kwargs = kwarguments)
19311910
1932- return function (actor, * arguments, ** kwarguments)
1911+ return func (actor, * arguments, ** kwarguments)
19331912
19341913 with core_worker.profile_event(b" task::" + name, extra_data = extra_data), \
19351914 ray._private.worker._changeproctitle(title, next_title):
@@ -2226,7 +2205,6 @@ cdef execute_task_with_cancellation_handler(
22262205 CoreWorker core_worker = worker.core_worker
22272206 JobID job_id = core_worker.get_current_job_id()
22282207 TaskID task_id = core_worker.get_current_task_id()
2229- c_vector[shared_ptr[CRayObject]] dynamic_return_ptrs
22302208
22312209 task_name = name.decode(" utf-8" )
22322210 title = f" ray::{task_name}"
@@ -3239,7 +3217,6 @@ cdef class CoreWorker:
32393217 cdef:
32403218 CObjectID c_object_id = object_ref.native()
32413219 shared_ptr[CBuffer] data
3242- unique_ptr[CAddress] null_owner_address
32433220 uint64_t data_size = serialized_object.total_bytes
32443221 int64_t c_num_readers = num_readers
32453222 int64_t c_timeout_ms = timeout_ms
@@ -4453,7 +4430,6 @@ cdef class CoreWorker:
44534430
44544431 cdef:
44554432 CConcurrencyGroup c_concurrency_group
4456- c_vector[CFunctionDescriptor] c_function_descriptors
44574433
44584434 self .cgname_to_eventloop_dict = {}
44594435 self .fd_to_cgname_dict = {}
@@ -4568,7 +4544,7 @@ cdef class CoreWorker:
45684544 # transport with max_concurrency flag.
45694545 increase_recursion_limit()
45704546
4571- eventloop, async_thread = self .get_event_loop(
4547+ eventloop, _ = self .get_event_loop(
45724548 function_descriptor, specified_cgname)
45734549
45744550 async def async_func():
@@ -4911,7 +4887,7 @@ cdef void async_callback(shared_ptr[CRayObject] obj,
49114887 except Exception :
49124888 # Only log the error here because this callback is called from Cpp
49134889 # and Cython will ignore the exception anyway
4914- logger.exception(f " failed to run async callback (user func)" )
4890+ logger.exception(" failed to run async callback (user func)" )
49154891 finally :
49164892 # NOTE: we manually increment the Python reference count of the callback when
49174893 # registering it in the core worker, so we must decrement here to avoid a leak.
0 commit comments