5 changes: 3 additions & 2 deletions ci/long_running_tests/workloads/many_actor_tasks.py
@@ -46,8 +46,9 @@ def method(self):


actors = [
Actor._remote([], {}, num_cpus=0.1, resources={str(i % num_nodes): 0.1})
for i in range(num_nodes * 5)
Actor.options(num_cpus=0.1, resources={
str(i % num_nodes): 0.1
}).remote() for i in range(num_nodes * 5)
]

iteration = 0
17 changes: 7 additions & 10 deletions ci/long_running_tests/workloads/many_drivers.py
@@ -57,8 +57,8 @@ def method(self):
for _ in range(5):
for i in range(num_nodes):
assert (ray.get(
f._remote(args=[], kwargs={{}}, resources={{str(i): 1}})) == 1)
actor = Actor._remote(args=[], kwargs={{}}, resources={{str(i): 1}})
f.options(resources={{str(i): 1}}).remote()) == 1)
actor = Actor.options(resources={{str(i): 1}}).remote()
assert ray.get(actor.method.remote()) == 1

print("success")
@@ -73,9 +73,9 @@ def run_driver():

iteration = 0
running_ids = [
run_driver._remote(
args=[], kwargs={}, num_cpus=0, resources={str(i): 0.01})
for i in range(num_nodes)
run_driver.options(num_cpus=0, resources={
str(i): 0.01
}).remote() for i in range(num_nodes)
]
start_time = time.time()
previous_time = start_time
@@ -85,11 +85,8 @@ def run_driver():
ray.get(ready_id)

running_ids.append(
run_driver._remote(
args=[],
kwargs={},
num_cpus=0,
resources={str(iteration % num_nodes): 0.01}))
run_driver.options(
    num_cpus=0,
    resources={str(iteration % num_nodes): 0.01}).remote())

new_time = time.time()
print("Iteration {}:\n"
5 changes: 3 additions & 2 deletions ci/long_running_tests/workloads/many_tasks.py
@@ -47,8 +47,9 @@ def f(*xs):
while True:
for _ in range(50):
new_constrained_ids = [
f._remote(args=[*ids], resources={str(i % num_nodes): 1})
for i in range(25)
f.options(resources={
str(i % num_nodes): 1
}).remote(*ids) for i in range(25)
]
new_unconstrained_ids = [f.remote(*ids) for _ in range(25)]
ids = new_constrained_ids + new_unconstrained_ids
10 changes: 6 additions & 4 deletions ci/long_running_tests/workloads/many_tasks_serialized_ids.py
@@ -69,8 +69,9 @@ def f(*xs):
while True:
for _ in range(50):
new_constrained_ids = [
f._remote(args=ids, resources={str(i % num_nodes): 1})
for i in range(25)
f.options(resources={
str(i % num_nodes): 1
}).remote(*ids) for i in range(25)
]
new_unconstrained_ids = [f.remote(*ids) for _ in range(25)]
ids = new_constrained_ids + new_unconstrained_ids
@@ -79,8 +80,9 @@ def f(*xs):
for i in range(num_nodes):
for _ in range(10):
[
churn._remote(args=[], resources={str(i % num_nodes): 1})
for _ in range(10)
churn.options(resources={
str(i % num_nodes): 1
}).remote() for _ in range(10)
]

# Make sure that the objects are still available.
10 changes: 5 additions & 5 deletions ci/performance_tests/test_performance.py
@@ -138,7 +138,7 @@ def warm_up_cluster(num_nodes, object_store_memory):
for i in range(num_nodes):
for _ in range(num_objects):
object_ids += [
create_array._remote(args=[size], resources={str(i): 1})
create_array.options(resources={str(i): 1}).remote(size)
]
size = size // 2
num_objects = min(num_objects * 2, 1000)
@@ -164,7 +164,7 @@ def run_multiple_trials(f, num_trials):
def test_tasks(num_nodes):
def one_thousand_serial_tasks_local_node():
for _ in range(1000):
ray.get(no_op._remote(resources={"0": 1}))
ray.get(no_op.options(resources={"0": 1}))

durations = run_multiple_trials(one_thousand_serial_tasks_local_node, 10)
logger.warning(
@@ -176,7 +176,7 @@ def one_thousand_serial_tasks_local_node():

def one_thousand_serial_tasks_remote_node():
for _ in range(1000):
ray.get(no_op._remote(resources={"1": 1}))
ray.get(no_op.options(resources={"1": 1}))

durations = run_multiple_trials(one_thousand_serial_tasks_remote_node, 10)
logger.warning(
@@ -187,7 +187,7 @@ def one_thousand_serial_tasks_remote_node():
np.std(durations))

def ten_thousand_parallel_tasks_local():
ray.get([no_op._remote(resources={"0": 1}) for _ in range(10000)])
ray.get([no_op.options(resources={"0": 1}).remote() for _ in range(10000)])

durations = run_multiple_trials(ten_thousand_parallel_tasks_local, 5)
logger.warning(
@@ -199,7 +199,7 @@ def ten_thousand_parallel_tasks_local():

def ten_thousand_parallel_tasks_load_balanced():
ray.get([
no_op._remote(resources={str(i % num_nodes): 1})
no_op.options(resources={str(i % num_nodes): 1}).remote()
for i in range(10000)
])

26 changes: 21 additions & 5 deletions python/ray/actor.py
@@ -13,6 +13,7 @@
from ray import ActorID, ActorClassID, Language
from ray._raylet import PythonFunctionDescriptor
from ray import cross_language
from ray.utils import deprecation_warning

logger = logging.getLogger(__name__)

@@ -100,9 +101,16 @@ def __call__(self, *args, **kwargs):
self._method_name))

def remote(self, *args, **kwargs):
return self._remote(args, kwargs)
return self._remote(args, kwargs, internal_called=True)

def _remote(self,
args=None,
kwargs=None,
num_return_vals=None,
internal_called=False):
if not internal_called:
deprecation_warning("_remote", new="remote")

def _remote(self, args=None, kwargs=None, num_return_vals=None):
if num_return_vals is None:
num_return_vals = self._num_return_vals

@@ -376,7 +384,7 @@ def remote(self, *args, **kwargs):
Returns:
A handle to the newly created actor.
"""
return self._remote(args=args, kwargs=kwargs)
return self._remote(args=args, kwargs=kwargs, internal_called=True)

def options(self, **options):
"""Convenience method for creating an actor with options.
@@ -394,7 +402,8 @@ def options(self, **options):

class ActorOptionWrapper:
def remote(self, *args, **kwargs):
return actor_cls._remote(args=args, kwargs=kwargs, **options)
return actor_cls._remote(
args=args, kwargs=kwargs, internal_called=True, **options)

return ActorOptionWrapper()

@@ -409,7 +418,8 @@ def _remote(self,
is_direct_call=None,
max_concurrency=None,
name=None,
detached=False):
detached=False,
internal_called=False):
"""Create an actor.

This method allows more flexibility than the remote method because
@@ -439,6 +449,12 @@ def _remote(self,
Returns:
A handle to the newly created actor.
"""
if not internal_called:
# NOTE: We still want this method; we just don't want users to
# call it directly from outside this class. In an upcoming
# release, adding `error=True` to this call will make it raise
# an exception instead of only warning.
deprecation_warning("_remote", new=".options")
if args is None:
args = []
if kwargs is None:
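The change applied across the workload and test files above is the same mechanical migration everywhere: scheduling options that used to be threaded through `_remote(args=..., kwargs=..., num_cpus=..., resources=...)` now go through `.options(...)`, and the call arguments go through `.remote(...)`. A minimal before/after sketch of the pattern (the `f` and `Actor` definitions below are illustrative, not taken from this PR):

    import ray

    ray.init(ignore_reinit_error=True)

    @ray.remote
    def f(x):
        return x + 1

    @ray.remote
    class Actor:
        def method(self):
            return "ok"

    # Deprecated style: call arguments and scheduling options mixed into _remote().
    #   obj = f._remote(args=[1], kwargs={}, num_cpus=1)
    #   actor = Actor._remote([], {}, num_cpus=1)

    # New style: scheduling options via .options(), call arguments via .remote().
    obj = f.options(num_cpus=1).remote(1)
    actor = Actor.options(num_cpus=1).remote()

    assert ray.get(obj) == 2
    assert ray.get(actor.method.remote()) == "ok"
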
4 changes: 2 additions & 2 deletions python/ray/ray_perf.py
@@ -160,7 +160,7 @@ def actor_concurrent():

n = 5000
n_cpu = multiprocessing.cpu_count() // 2
actors = [Actor._remote() for _ in range(n_cpu)]
actors = [Actor.remote() for _ in range(n_cpu)]
client = Client.remote(actors)

def actor_async_direct():
@@ -181,7 +181,7 @@ def actor_multi2():
timeit("n:n actor calls async", actor_multi2, m * n)

n = 1000
actors = [Actor._remote() for _ in range(n_cpu)]
actors = [Actor.remote() for _ in range(n_cpu)]
clients = [Client.remote(a) for a in actors]

def actor_multi2_direct_arg():
19 changes: 15 additions & 4 deletions python/ray/remote_function.py
@@ -4,6 +4,7 @@
from ray import cloudpickle as pickle
from ray._raylet import PythonFunctionDescriptor
from ray import cross_language, Language
from ray.utils import deprecation_warning
import ray.signature

# Default parameters for remote functions.
@@ -92,7 +93,7 @@ def __init__(self, language, function, function_descriptor, num_cpus,
# Override task.remote's signature and docstring
@wraps(function)
def _remote_proxy(*args, **kwargs):
return self._remote(args=args, kwargs=kwargs)
return self._remote(args=args, kwargs=kwargs, internal_called=True)

self.remote = _remote_proxy
@allenanswerzq (Contributor, Author) commented on Mar 26, 2020:

@edoakes If we remove _remote completely, then this wrapper here, which is used to keep the function's signature, also needs to be removed; for details see #4985. I tried and failed to find a better way to remove the _remote method while also keeping the function signature. Note that we must keep the function signature, otherwise it will raise an exception at this place:

ray/python/ray/signature.py, lines 112 to 118 at 6ce8b63:

reconstructed_signature = inspect.Signature(
parameters=signature_parameters)
try:
reconstructed_signature.bind(*args, **kwargs)
except TypeError as exc:
raise TypeError(str(exc))
list_args = []


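To make the signature point concrete: a wrapper decorated with functools.wraps exposes the wrapped function's signature to inspect, so argument binding of the kind shown above can still validate calls made through the proxy. A small stand-alone sketch of that mechanism (the names here are illustrative, not Ray code):

    import inspect
    from functools import wraps

    def user_function(a, b=2):
        return a + b

    # A forwarding proxy in the spirit of _remote_proxy above.
    @wraps(user_function)
    def remote_proxy(*args, **kwargs):
        return ("submitted", args, kwargs)

    # Because of @wraps, inspect.signature() follows __wrapped__ and reports
    # the user function's signature instead of (*args, **kwargs).
    sig = inspect.signature(remote_proxy)
    print(sig)  # (a, b=2)

    # So binding in the style of signature.py can reject bad calls up front.
    sig.bind(1, b=3)   # fine
    try:
        sig.bind(b=3)  # missing required argument 'a'
    except TypeError as exc:
        print("rejected:", exc)
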
@@ -116,7 +117,8 @@ def _submit(self,
num_return_vals=num_return_vals,
num_cpus=num_cpus,
num_gpus=num_gpus,
resources=resources)
resources=resources,
internal_called=True)

def options(self, **options):
"""Convenience method for executing a task with options.
@@ -134,7 +136,8 @@ def options(self, **options):

class FuncWrapper:
def remote(self, *args, **kwargs):
return func_cls._remote(args=args, kwargs=kwargs, **options)
return func_cls._remote(
args=args, kwargs=kwargs, internal_called=True, **options)

return FuncWrapper()

@@ -148,8 +151,16 @@ def _remote(self,
memory=None,
object_store_memory=None,
resources=None,
max_retries=None):
max_retries=None,
internal_called=False):
Collaborator commented:

Why do we need this internal_called flag? Can we just remove all instances of calling this instead?

@allenanswerzq (Contributor, Author) replied:

Currently, there are two kinds of places that call this _remote method. One is inside the class definition, which is intended and fine; the other is outside the class. We only want to raise a warning message in the latter case, so we need this flag to distinguish them.

Collaborator replied:

I think we can just get rid of _remote() entirely instead. It looks like .remote() just calls ._remote() directly, so we can move the implementation to .remote() and leave a deprecation error in ._remote().

"""Submit the remote function for execution."""
if not internal_called:
# NOTE: We still want this method; we just don't want users to
# call it directly from outside this class. In an upcoming
# release, adding `error=True` to this call will make it raise
# an exception instead of only warning.
deprecation_warning("_remote", new=".options")

worker = ray.worker.global_worker
worker.check_connected()

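A rough sketch of the restructuring suggested in the review thread above, where .remote() owns the implementation and ._remote() survives only as a deprecation shim. This is a hypothetical layout, not code from this PR; the submission path is stubbed out and deprecation_warning below is a stand-in for the helper imported from ray.utils:

    import warnings

    def deprecation_warning(old, new):
        # Stand-in for the ray.utils helper used elsewhere in this PR.
        warnings.warn(
            "{} is deprecated, use {} instead".format(old, new),
            DeprecationWarning)

    class RemoteFunction:
        def remote(self, *args, **kwargs):
            # The full submission logic (formerly the body of _remote) lives here.
            return self._submit(args, kwargs, {})

        def options(self, **task_options):
            outer = self

            class FuncWrapper:
                def remote(self, *args, **kwargs):
                    return outer._submit(args, kwargs, task_options)

            return FuncWrapper()

        def _remote(self, args=None, kwargs=None, **task_options):
            # Kept only so external callers get a deprecation message.
            deprecation_warning("_remote", new=".options")
            return self.options(**task_options).remote(
                *(args or []), **(kwargs or {}))

        def _submit(self, args, kwargs, task_options):
            # Stand-in for the real core-worker submission path.
            return ("submitted", args, kwargs, task_options)
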
4 changes: 2 additions & 2 deletions python/ray/serve/api.py
@@ -265,7 +265,7 @@ def create_backend(func_or_class,
# arg list for a fn is function itself
arg_list = [func_or_class]
# ignore lint on lambda expression
creator = lambda kwrgs: TaskRunnerActor._remote(**kwrgs) # noqa: E731
creator = lambda kwrgs: TaskRunnerActor.remote(**kwrgs) # noqa: E731
elif inspect.isclass(func_or_class):
# Python inheritance order is right-to-left. We put RayServeMixin
# on the left to make sure its methods are not overridden.
@@ -278,7 +278,7 @@ def __init__(self, *args, **kwargs):

arg_list = actor_init_args
# ignore lint on lambda expression
creator = lambda kwargs: CustomActor._remote(**kwargs) # noqa: E731
creator = lambda kwargs: CustomActor.remote(**kwargs) # noqa: E731
else:
raise TypeError(
"Backend must be a function or class, it is {}.".format(
8 changes: 8 additions & 0 deletions python/ray/tests/BUILD
@@ -407,3 +407,11 @@ py_test(
tags = ["exclusive"],
deps = ["//:ray_lib"],
)

py_test(
name = "test_deprecate_remote",
size = "small",
srcs = ["test_deprecate_remote.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
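The new BUILD target points at test_deprecate_remote.py, whose contents are not shown in this diff. A plausible sketch of what such a test could assert, assuming the deprecation helper emits a standard DeprecationWarning through the warnings module (which may not match the actual helper) and reusing the ray_start_regular fixture that other tests in this directory use:

    import pytest
    import ray

    @ray.remote
    def no_op():
        return 1

    def test_remote_function_deprecation(ray_start_regular):
        # Calling the private _remote() from user code should warn...
        with pytest.warns(DeprecationWarning):
            obj = no_op._remote(args=[], kwargs={})
        assert ray.get(obj) == 1

        # ...while the supported .options().remote() path keeps working as before.
        assert ray.get(no_op.options(num_cpus=0).remote()) == 1
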
8 changes: 4 additions & 4 deletions python/ray/tests/test_actor_advanced.py
@@ -659,11 +659,11 @@ def ping(self):
return "pong"

with pytest.raises(Exception, match="Detached actors must be named"):
DetachedActor._remote(detached=True)
DetachedActor.options(detached=True).remote()

with pytest.raises(ValueError, match="Please use a different name"):
_ = DetachedActor._remote(name="d_actor")
DetachedActor._remote(name="d_actor")
_ = DetachedActor.options(name="d_actor").remote()
DetachedActor.options(name="d_actor").remote()

redis_address = ray_start_regular["redis_address"]

@@ -677,7 +677,7 @@ class DetachedActor:
def ping(self):
return "pong"

actor = DetachedActor._remote(name="{}", detached=True)
actor = DetachedActor.options(name="{}", detached=True).remote()
ray.get(actor.ping.remote())
""".format(redis_address, actor_name)

9 changes: 6 additions & 3 deletions python/ray/tests/test_advanced.py
@@ -260,14 +260,17 @@ def f(x):

# These objects will live on different nodes.
object_ids = [
f._remote(args=[1], resources={str(i): 1}) for i in range(num_nodes)
f.options(resources={
str(i): 1
}).remote(1) for i in range(num_nodes)
]

# Broadcast each object from each machine to each other machine.
for object_id in object_ids:
ray.get([
f._remote(args=[object_id], resources={str(i): 1})
for i in range(num_nodes)
f.options(resources={
str(i): 1
}).remote(object_id) for i in range(num_nodes)
])

# The profiling information only flushes once every second.
8 changes: 4 additions & 4 deletions python/ray/tests/test_advanced_2.py
@@ -288,17 +288,17 @@ def method(self):
pass

# Create an actor that requires 0.7 of the custom resource.
f1 = Foo2._remote([], {}, resources={"Custom": 0.7})
f1 = Foo2.options(resources={"Custom": 0.7}).remote()
ray.get(f1.method.remote())
# Make sure that we cannot create an actor that requires 0.7 of the
# custom resource. TODO(rkn): Re-enable this once ray.wait is
# implemented.
f2 = Foo2._remote([], {}, resources={"Custom": 0.7})
f2 = Foo2.options(resources={"Custom": 0.7}).remote()
ready, _ = ray.wait([f2.method.remote()], timeout=0.5)
assert len(ready) == 0
# Make sure we can start an actor that requires only 0.3 of the custom
# resource.
f3 = Foo2._remote([], {}, resources={"Custom": 0.3})
f3 = Foo2.options(resources={"Custom": 0.3}).remote()
ray.get(f3.method.remote())

del f1, f3
@@ -314,7 +314,7 @@ def test():
test.remote()

with pytest.raises(ValueError):
Foo2._remote([], {}, resources={"Custom": 1.5})
Foo2.options(resources={"Custom": 1.5}).remote()


def test_multiple_raylets(ray_start_cluster):