Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions python/ray/function_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,20 +509,29 @@ def _publish_actor_class_to_key(self, key, actor_class_info):
"""
# We set the driver ID here because it may not have been available when
# the actor class was defined.
actor_class_info["driver_id"] = self._worker.task_driver_id.id()
self._worker.redis_client.hmset(key, actor_class_info)
self._worker.redis_client.rpush("Exports", key)

def export_actor_class(self, Class, actor_method_names,
checkpoint_interval):
function_descriptor = FunctionDescriptor.from_class(Class)
key = (b"ActorClass:" + self._worker.task_driver_id.id() + b":" +
# `task_driver_id` shouldn't be NIL, unless:
# 1) This worker isn't an actor;
# 2) And a previous task started a background thread, which didn't
# finish before the task finished, and still uses Ray API
# after that.
assert not self._worker.task_driver_id.is_nil(), (
"You might have started a background thread in a non-actor task, "
"please make sure the thread finishes before the task finishes.")
driver_id = self._worker.task_driver_id
key = (b"ActorClass:" + driver_id.id() + b":" +
function_descriptor.function_id.id())
actor_class_info = {
"class_name": Class.__name__,
"module": Class.__module__,
"class": pickle.dumps(Class),
"checkpoint_interval": checkpoint_interval,
"driver_id": driver_id.id(),
"actor_method_names": json.dumps(list(actor_method_names))
}

Expand Down
5 changes: 5 additions & 0 deletions python/ray/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import functools
import hashlib
import inspect
import logging
import numpy as np
import os
import subprocess
Expand All @@ -18,6 +19,8 @@
import ray.raylet
import ray.ray_constants as ray_constants

logger = logging.getLogger(__name__)


def _random_string():
id_hash = hashlib.sha1()
Expand Down Expand Up @@ -69,6 +72,8 @@ def push_error_to_driver(worker,
if driver_id is None:
driver_id = ray_constants.NIL_JOB_ID.id()
data = {} if data is None else data
logging.error("Pushing error to dirver, type: %s, message: %s.",
error_type, message)
worker.raylet_client.push_error(
ray.ObjectID(driver_id), error_type, message, time.time())

Expand Down
42 changes: 26 additions & 16 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,8 @@ def submit_task(self,
task_index = self.task_index
self.task_index += 1
# The parent task must be set for the submitted task.
assert not self.current_task_id.is_nil()
if self.actor_id == NIL_ACTOR_ID:
assert not self.current_task_id.is_nil()
# Submit the task to local scheduler.
function_descriptor_list = (
function_descriptor.get_function_descriptor_list())
Expand Down Expand Up @@ -766,23 +767,30 @@ def _process_task(self, task, function_execution_info):
use the outputs of this task).
"""
with self.state_lock:
assert self.task_driver_id.is_nil()
assert self.current_task_id.is_nil()
assert self.task_index == 0
assert self.put_index == 1
if task.actor_id().is_nil():
# If this worker is not an actor, check that `task_driver_id`
# was reset when the worker finished the previous task.
assert self.task_driver_id.is_nil()
# Set the driver ID of the current running task. This is
# needed so that if the task throws an exception, we propagate
# the error message to the correct driver.
self.task_driver_id = task.driver_id()
else:
# If this worker is an actor, task_driver_id wasn't reset.
# Check that current task's driver ID equals the previous one.
assert self.task_driver_id == task.driver_id()

# The ID of the driver that this task belongs to. This is needed so
# that if the task throws an exception, we propagate the error
# message to the correct driver.
self.task_driver_id = task.driver_id()
self.current_task_id = task.task_id()

function_descriptor = FunctionDescriptor.from_bytes_list(
task.function_descriptor_list())
args = task.arguments()
return_object_ids = task.returns()
if (task.actor_id().id() != NIL_ACTOR_ID
or task.actor_creation_id().id() != NIL_ACTOR_ID):
if (not task.actor_id().is_nil()
or not task.actor_creation_id().is_nil()):
dummy_return_id = return_object_ids.pop()
function_executor = function_execution_info.function
function_name = function_execution_info.function_name
Expand All @@ -809,11 +817,11 @@ def _process_task(self, task, function_execution_info):
# Execute the task.
try:
with profiling.profile("task:execute", worker=self):
if (task.actor_id().id() == NIL_ACTOR_ID
and task.actor_creation_id().id() == NIL_ACTOR_ID):
if (task.actor_id().is_nil()
and task.actor_creation_id().is_nil()):
outputs = function_executor(*arguments)
else:
if task.actor_id().id() != NIL_ACTOR_ID:
if not task.actor_id().is_nil():
key = task.actor_id().id()
else:
key = task.actor_creation_id().id()
Expand All @@ -822,7 +830,7 @@ def _process_task(self, task, function_execution_info):
except Exception as e:
# Determine whether the exception occured during a task, not an
# actor method.
task_exception = task.actor_id().id() == NIL_ACTOR_ID
task_exception = task.actor_id().is_nil()
traceback_str = ray.utils.format_error_message(
traceback.format_exc(), task_exception=task_exception)
self._handle_process_task_failure(
Expand Down Expand Up @@ -881,7 +889,7 @@ def _wait_for_and_process_task(self, task):

# TODO(rkn): It would be preferable for actor creation tasks to share
# more of the code path with regular task execution.
if (task.actor_creation_id() != ray.ObjectID(NIL_ACTOR_ID)):
if not task.actor_creation_id().is_nil():
assert self.actor_id == NIL_ACTOR_ID
self.actor_id = task.actor_creation_id().id()
self.function_actor_manager.load_actor(driver_id,
Expand All @@ -901,8 +909,8 @@ def _wait_for_and_process_task(self, task):
"name": function_name,
"task_id": task.task_id().hex()
}
if task.actor_id().id() == NIL_ACTOR_ID:
if (task.actor_creation_id() == ray.ObjectID(NIL_ACTOR_ID)):
if task.actor_id().is_nil():
if task.actor_creation_id().is_nil():
title = "ray_worker:{}()".format(function_name)
next_title = "ray_worker"
else:
Expand All @@ -920,7 +928,9 @@ def _wait_for_and_process_task(self, task):
self._process_task(task, execution_info)
# Reset the state fields so the next task can run.
with self.state_lock:
self.task_driver_id = ray.ObjectID(NIL_ID)
if self.actor_id == NIL_ACTOR_ID:
# We will keep task_driver_id unchanged for actor.
self.task_driver_id = ray.ObjectID(NIL_ID)
self.current_task_id = ray.ObjectID(NIL_ID)
self.task_index = 0
self.put_index = 1
Expand Down