Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/requirements-doc.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
colorama
click
flatbuffers
funcsigs
mock
numpy
Expand Down
14 changes: 12 additions & 2 deletions doc/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,24 @@
"tensorflow.python.util",
"ray.local_scheduler",
"ray.plasma",
"ray.core",
"ray.core.generated",
"ray.core.generated.DriverTableMessage",
"ray.core.generated.LocalSchedulerInfoMessage",
"ray.core.generated.ResultTableReply",
"ray.core.generated.SubscribeToDBClientTableReply",
"ray.core.generated.SubscribeToNotificationsReply",
"ray.core.generated.TaskInfo",
"ray.core.generated.TaskReply",
"ray.core.generated.ResultTableReply",
"ray.core.generated.TaskExecutionDependencies",
"ray.core.generated.ClientTableData",
"ray.core.generated.GcsTableEntry",
"ray.core.generated.HeartbeatTableData",
"ray.core.generated.ErrorTableData",
"ray.core.generated.ObjectTableData",
"ray.core.generated.ray.protocol.Task"]
"ray.core.generated.ray.protocol.Task",
"ray.core.generated.TablePrefix",
"ray.core.generated.TablePubsub",]
for mod_name in MOCK_MODULES:
sys.modules[mod_name] = mock.Mock()

Expand Down
8 changes: 4 additions & 4 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def save_and_log_checkpoint(worker, actor):
traceback_str = ray.utils.format_error_message(traceback.format_exc())
# Log the error message.
ray.utils.push_error_to_driver(
worker.redis_client,
worker,
ray_constants.CHECKPOINT_PUSH_ERROR,
traceback_str,
driver_id=worker.task_driver_id.id(),
Expand All @@ -188,7 +188,7 @@ def restore_and_log_checkpoint(worker, actor):
traceback_str = ray.utils.format_error_message(traceback.format_exc())
# Log the error message.
ray.utils.push_error_to_driver(
worker.redis_client,
worker,
ray_constants.CHECKPOINT_PUSH_ERROR,
traceback_str,
driver_id=worker.task_driver_id.id(),
Expand Down Expand Up @@ -330,7 +330,7 @@ def temporary_actor_method(*xs):
traceback_str = ray.utils.format_error_message(traceback.format_exc())
# Log the error message.
push_error_to_driver(
worker.redis_client,
worker,
ray_constants.REGISTER_ACTOR_PUSH_ERROR,
traceback_str,
driver_id,
Expand Down Expand Up @@ -402,7 +402,7 @@ def export_actor_class(class_id, Class, actor_method_names,
.format(actor_class_info["class_name"],
len(actor_class_info["class"])))
ray.utils.push_error_to_driver(
worker.redis_client,
worker,
ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR,
warning_message,
driver_id=worker.task_driver_id.id())
Expand Down
45 changes: 20 additions & 25 deletions python/ray/common/redis_module/runtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,9 @@
import time
import unittest

import ray.gcs_utils
import ray.services

# Import flatbuffer bindings.
from ray.core.generated.SubscribeToNotificationsReply \
import SubscribeToNotificationsReply
from ray.core.generated.TaskReply import TaskReply
from ray.core.generated.ResultTableReply import ResultTableReply

OBJECT_INFO_PREFIX = "OI:"
OBJECT_LOCATION_PREFIX = "OL:"
OBJECT_SUBSCRIBE_PREFIX = "OS:"
TASK_PREFIX = "TT:"
OBJECT_CHANNEL_PREFIX = "OC:"


def integerToAsciiHex(num, numbytes):
retstr = b""
Expand Down Expand Up @@ -194,7 +183,7 @@ def testObjectTableSubscribeToNotifications(self):
# notifications.
def check_object_notification(notification_message, object_id,
object_size, manager_ids):
notification_object = (SubscribeToNotificationsReply.
notification_object = (ray.gcs_utils.SubscribeToNotificationsReply.
GetRootAsSubscribeToNotificationsReply(
notification_message, 0))
self.assertEqual(notification_object.ObjectId(), object_id)
Expand All @@ -208,7 +197,8 @@ def check_object_notification(notification_message, object_id,
data_size = 0xf1f0
p = self.redis.pubsub()
# Subscribe to an object ID.
p.psubscribe("{}manager_id1".format(OBJECT_CHANNEL_PREFIX))
p.psubscribe("{}manager_id1".format(
ray.gcs_utils.OBJECT_CHANNEL_PREFIX))
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1",
data_size, "hash1", "manager_id2")
# Receive the acknowledgement message.
Expand Down Expand Up @@ -252,8 +242,9 @@ def check_object_notification(notification_message, object_id,

def testResultTableAddAndLookup(self):
def check_result_table_entry(message, task_id, is_put):
result_table_reply = ResultTableReply.GetRootAsResultTableReply(
message, 0)
result_table_reply = (
ray.gcs_utils.ResultTableReply.GetRootAsResultTableReply(
message, 0))
self.assertEqual(result_table_reply.TaskId(), task_id)
self.assertEqual(result_table_reply.IsPut(), is_put)

Expand Down Expand Up @@ -315,12 +306,13 @@ def testTaskTableAddAndLookup(self):
# make sure somebody will get a notification (checked in the redis
# module)
p = self.redis.pubsub()
p.psubscribe("{prefix}*:*".format(prefix=TASK_PREFIX))
p.psubscribe("{prefix}*:*".format(prefix=ray.gcs_utils.TASK_PREFIX))

def check_task_reply(message, task_args, updated=False):
(task_status, local_scheduler_id, execution_dependencies_string,
spillback_count, task_spec) = task_args
task_reply_object = TaskReply.GetRootAsTaskReply(message, 0)
task_reply_object = ray.gcs_utils.TaskReply.GetRootAsTaskReply(
message, 0)
self.assertEqual(task_reply_object.State(), task_status)
self.assertEqual(task_reply_object.LocalSchedulerId(),
local_scheduler_id)
Expand Down Expand Up @@ -409,7 +401,8 @@ def check_task_subscription(self, p, scheduling_state, local_scheduler_id):
# Receive the data.
message = get_next_message(p)["data"]
# Check that the notification object is correct.
notification_object = TaskReply.GetRootAsTaskReply(message, 0)
notification_object = ray.gcs_utils.TaskReply.GetRootAsTaskReply(
message, 0)
self.assertEqual(notification_object.TaskId(), task_args[0])
self.assertEqual(notification_object.State(), task_args[1])
self.assertEqual(notification_object.LocalSchedulerId(), task_args[2])
Expand All @@ -422,32 +415,34 @@ def testTaskTableSubscribe(self):
local_scheduler_id = "local_scheduler_id"
# Subscribe to the task table.
p = self.redis.pubsub()
p.psubscribe("{prefix}*:*".format(prefix=TASK_PREFIX))
p.psubscribe("{prefix}*:*".format(prefix=ray.gcs_utils.TASK_PREFIX))
# Receive acknowledgment.
self.assertEqual(get_next_message(p)["data"], 1)
self.check_task_subscription(p, scheduling_state, local_scheduler_id)
# unsubscribe to make sure there is only one subscriber at a given time
p.punsubscribe("{prefix}*:*".format(prefix=TASK_PREFIX))
p.punsubscribe("{prefix}*:*".format(prefix=ray.gcs_utils.TASK_PREFIX))
# Receive acknowledgment.
self.assertEqual(get_next_message(p)["data"], 0)

p.psubscribe("{prefix}*:{state}".format(
prefix=TASK_PREFIX, state=scheduling_state))
prefix=ray.gcs_utils.TASK_PREFIX, state=scheduling_state))
# Receive acknowledgment.
self.assertEqual(get_next_message(p)["data"], 1)
self.check_task_subscription(p, scheduling_state, local_scheduler_id)
p.punsubscribe("{prefix}*:{state}".format(
prefix=TASK_PREFIX, state=scheduling_state))
prefix=ray.gcs_utils.TASK_PREFIX, state=scheduling_state))
# Receive acknowledgment.
self.assertEqual(get_next_message(p)["data"], 0)

p.psubscribe("{prefix}{local_scheduler_id}:*".format(
prefix=TASK_PREFIX, local_scheduler_id=local_scheduler_id))
prefix=ray.gcs_utils.TASK_PREFIX,
local_scheduler_id=local_scheduler_id))
# Receive acknowledgment.
self.assertEqual(get_next_message(p)["data"], 1)
self.check_task_subscription(p, scheduling_state, local_scheduler_id)
p.punsubscribe("{prefix}{local_scheduler_id}:*".format(
prefix=TASK_PREFIX, local_scheduler_id=local_scheduler_id))
prefix=ray.gcs_utils.TASK_PREFIX,
local_scheduler_id=local_scheduler_id))
# Receive acknowledgment.
self.assertEqual(get_next_message(p)["data"], 0)

Expand Down
Loading