Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion java/test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.20.1</version>
<version>2.21.0</version>
<configuration>
<!-- <properties>
<property>
Expand Down
6 changes: 3 additions & 3 deletions python/ray/dataframe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
pd_major = int(pd_version.split(".")[0])
pd_minor = int(pd_version.split(".")[1])

if pd_major == 0 and pd_minor < 22:
raise Exception("In order to use Pandas on Ray, please upgrade your Pandas"
" version to >= 0.22.")
if pd_major == 0 and pd_minor != 22:
raise Exception("In order to use Pandas on Ray, your pandas version must "
"be 0.22. You can run 'pip install pandas==0.22'")

DEFAULT_NPARTITIONS = 8

Expand Down
42 changes: 28 additions & 14 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2529,6 +2529,11 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
correspond to objects that are stored in the object store. The second list
corresponds to the rest of the object IDs (which may or may not be ready).

Ordering of the input list of object IDs is preserved: if A precedes B in
the input list, and both are in the ready list, then A will precede B in
the ready list. This also holds true if A and B are both in the remaining
list.

Args:
object_ids (List[ObjectID]): List of object IDs for objects that may or
may not be ready. Note that these IDs must be unique.
Expand All @@ -2540,9 +2545,6 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
A list of object IDs that are ready and a list of the remaining object
IDs.
"""
if worker.use_raylet:
print("plasma_client.wait has not been implemented yet")
return

if isinstance(object_ids, ray.ObjectID):
raise TypeError(
Expand Down Expand Up @@ -2574,18 +2576,30 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
if len(object_ids) == 0:
return [], []

object_id_strs = [
plasma.ObjectID(object_id.id()) for object_id in object_ids
]
if len(object_ids) != len(set(object_ids)):
raise Exception("Wait requires a list of unique object IDs.")
if num_returns <= 0:
raise Exception(
"Invalid number of objects to return %d." % num_returns)
if num_returns > len(object_ids):
raise Exception("num_returns cannot be greater than the number "
"of objects provided to ray.wait.")
timeout = timeout if timeout is not None else 2**30
ready_ids, remaining_ids = worker.plasma_client.wait(
object_id_strs, timeout, num_returns)
ready_ids = [
ray.ObjectID(object_id.binary()) for object_id in ready_ids
]
remaining_ids = [
ray.ObjectID(object_id.binary()) for object_id in remaining_ids
]
if worker.use_raylet:
ready_ids, remaining_ids = worker.local_scheduler_client.wait(
object_ids, num_returns, timeout, False)
else:
object_id_strs = [
plasma.ObjectID(object_id.id()) for object_id in object_ids
]
ready_ids, remaining_ids = worker.plasma_client.wait(
object_id_strs, timeout, num_returns)
ready_ids = [
ray.ObjectID(object_id.binary()) for object_id in ready_ids
]
remaining_ids = [
ray.ObjectID(object_id.binary()) for object_id in remaining_ids
]
return ready_ids, remaining_ids


Expand Down
2 changes: 1 addition & 1 deletion src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ add_custom_command(
# flatbuffers message Message, which can be used to store deserialized
# messages in data structures. This is currently used for ObjectInfo for
# example.
COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${COMMON_FBS_SRC} --gen-object-api
COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${COMMON_FBS_SRC} --gen-object-api --scoped-enums
DEPENDS ${FBS_DEPENDS}
COMMENT "Running flatc compiler on ${COMMON_FBS_SRC}"
VERBATIM)
Expand Down
10 changes: 6 additions & 4 deletions src/common/io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ void read_message(int fd, int64_t *type, int64_t *length, uint8_t **bytes) {

disconnected:
/* Handle the case in which the socket is closed. */
*type = DISCONNECT_CLIENT;
*type = static_cast<int64_t>(CommonMessageType::DISCONNECT_CLIENT);
*length = 0;
*bytes = NULL;
return;
Expand Down Expand Up @@ -382,20 +382,22 @@ int64_t read_vector(int fd, int64_t *type, std::vector<uint8_t> &buffer) {
return length;
disconnected:
/* Handle the case in which the socket is closed. */
*type = DISCONNECT_CLIENT;
*type = static_cast<int64_t>(CommonMessageType::DISCONNECT_CLIENT);
return 0;
}

void write_log_message(int fd, const char *message) {
/* Account for the \0 at the end of the string. */
write_message(fd, LOG_MESSAGE, strlen(message) + 1, (uint8_t *) message);
write_message(fd, static_cast<int64_t>(CommonMessageType::LOG_MESSAGE),
strlen(message) + 1, (uint8_t *) message);
}

char *read_log_message(int fd) {
uint8_t *bytes;
int64_t type;
int64_t length;
read_message(fd, &type, &length, &bytes);
RAY_CHECK(type == LOG_MESSAGE);
RAY_CHECK(static_cast<CommonMessageType>(type) ==
CommonMessageType::LOG_MESSAGE);
return (char *) bytes;
}
2 changes: 1 addition & 1 deletion src/common/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
struct aeEventLoop;
typedef aeEventLoop event_loop;

enum common_message_type {
enum class CommonMessageType : int32_t {
/** Disconnect a client. */
DISCONNECT_CLIENT,
/** Log a message from a client. */
Expand Down
50 changes: 27 additions & 23 deletions src/common/redis_module/ray_redis_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ TablePubsub ParseTablePubsub(const RedisModuleString *pubsub_channel_str) {
pubsub_channel_str, &pubsub_channel_long) == REDISMODULE_OK)
<< "Pubsub channel must be a valid TablePubsub";
auto pubsub_channel = static_cast<TablePubsub>(pubsub_channel_long);
RAY_CHECK(pubsub_channel >= TablePubsub_MIN &&
pubsub_channel <= TablePubsub_MAX)
RAY_CHECK(pubsub_channel >= TablePubsub::MIN &&
pubsub_channel <= TablePubsub::MAX)
<< "Pubsub channel must be a valid TablePubsub";
return pubsub_channel;
}
Expand All @@ -90,8 +90,9 @@ RedisModuleString *FormatPubsubChannel(
const RedisModuleString *id) {
// Format the pubsub channel enum to a string. TablePubsub_MAX should be more
// than enough digits, but add 1 just in case for the null terminator.
char pubsub_channel[TablePubsub_MAX + 1];
sprintf(pubsub_channel, "%d", ParseTablePubsub(pubsub_channel_str));
char pubsub_channel[static_cast<int>(TablePubsub::MAX) + 1];
sprintf(pubsub_channel, "%d",
static_cast<int>(ParseTablePubsub(pubsub_channel_str)));
return RedisString_Format(ctx, "%s:%S", pubsub_channel, id);
}

Expand Down Expand Up @@ -123,12 +124,12 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx,
REDISMODULE_OK)
<< "Prefix must be a valid TablePrefix";
auto prefix = static_cast<TablePrefix>(prefix_long);
RAY_CHECK(prefix != TablePrefix_UNUSED)
RAY_CHECK(prefix != TablePrefix::UNUSED)
<< "This table has no prefix registered";
RAY_CHECK(prefix >= TablePrefix_MIN && prefix <= TablePrefix_MAX)
RAY_CHECK(prefix >= TablePrefix::MIN && prefix <= TablePrefix::MAX)
<< "Prefix must be a valid TablePrefix";
return OpenPrefixedKey(ctx, table_prefixes[prefix], keyname, mode,
mutated_key_str);
return OpenPrefixedKey(ctx, table_prefixes[static_cast<long long>(prefix)],
keyname, mode, mutated_key_str);
}

RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx,
Expand Down Expand Up @@ -486,27 +487,29 @@ int PublishTaskTableAdd(RedisModuleCtx *ctx,
auto message = flatbuffers::GetRoot<TaskTableData>(buf);
RAY_CHECK(message != nullptr);

if (message->scheduling_state() == SchedulingState_WAITING ||
message->scheduling_state() == SchedulingState_SCHEDULED) {
if (message->scheduling_state() == SchedulingState::WAITING ||
message->scheduling_state() == SchedulingState::SCHEDULED) {
/* Build the PUBLISH topic and message for task table subscribers. The
* topic
* is a string in the format "TASK_PREFIX:<local scheduler ID>:<state>".
* The
* message is a serialized SubscribeToTasksReply flatbuffer object. */
std::string state = std::to_string(message->scheduling_state());
std::string state =
std::to_string(static_cast<int>(message->scheduling_state()));
RedisModuleString *publish_topic = RedisString_Format(
ctx, "%s%b:%s", TASK_PREFIX, message->scheduler_id()->str().data(),
sizeof(DBClientID), state.c_str());

/* Construct the flatbuffers object for the payload. */
flatbuffers::FlatBufferBuilder fbb;
/* Create the flatbuffers message. */
auto msg = CreateTaskReply(
fbb, RedisStringToFlatbuf(fbb, id), message->scheduling_state(),
fbb.CreateString(message->scheduler_id()),
fbb.CreateString(message->execution_dependencies()),
fbb.CreateString(message->task_info()), message->spillback_count(),
true /* not used */);
auto msg =
CreateTaskReply(fbb, RedisStringToFlatbuf(fbb, id),
static_cast<long long>(message->scheduling_state()),
fbb.CreateString(message->scheduler_id()),
fbb.CreateString(message->execution_dependencies()),
fbb.CreateString(message->task_info()),
message->spillback_count(), true /* not used */);
fbb.Finish(msg);

RedisModuleString *publish_message = RedisModule_CreateString(
Expand Down Expand Up @@ -613,12 +616,12 @@ int TableAdd_DoPublish(RedisModuleCtx *ctx,

TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str);

if (pubsub_channel == TablePubsub_TASK) {
if (pubsub_channel == TablePubsub::TASK) {
// Publish the task to its subscribers.
// TODO(swang): This is only necessary for legacy Ray and should be removed
// once we switch to using the new GCS API for the task table.
return PublishTaskTableAdd(ctx, id, data);
} else if (pubsub_channel != TablePubsub_NO_PUBLISH) {
} else if (pubsub_channel != TablePubsub::NO_PUBLISH) {
// All other pubsub channels write the data back directly onto the channel.
return PublishTableAdd(ctx, pubsub_channel_str, id, data);
} else {
Expand Down Expand Up @@ -723,7 +726,7 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx,
RAY_CHECK(flags == REDISMODULE_ZADD_ADDED) << "Appended a duplicate entry";
// Publish a message on the requested pubsub channel if necessary.
TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str);
if (pubsub_channel != TablePubsub_NO_PUBLISH) {
if (pubsub_channel != TablePubsub::NO_PUBLISH) {
// All other pubsub channels write the data back directly onto the
// channel.
return PublishTableAdd(ctx, pubsub_channel_str, id, data);
Expand Down Expand Up @@ -956,7 +959,8 @@ int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,

auto update = flatbuffers::GetRoot<TaskTableTestAndUpdate>(update_buf);

bool do_update = data->scheduling_state() & update->test_state_bitmask();
bool do_update = static_cast<int>(data->scheduling_state()) &
static_cast<int>(update->test_state_bitmask());

if (!is_nil(update->test_scheduler_id()->str())) {
do_update =
Expand Down Expand Up @@ -1460,8 +1464,8 @@ int TaskTableWrite(RedisModuleCtx *ctx,
"TaskSpec", task_spec, "spillback_count", spillback_count, NULL);
}

if (state_value == TASK_STATUS_WAITING ||
state_value == TASK_STATUS_SCHEDULED) {
if (static_cast<TaskStatus>(state_value) == TaskStatus::WAITING ||
static_cast<TaskStatus>(state_value) == TaskStatus::SCHEDULED) {
/* Build the PUBLISH topic and message for task table subscribers. The
* topic is a string in the format
* "TASK_PREFIX:<local scheduler ID>:<state>". The message is a serialized
Expand Down
2 changes: 1 addition & 1 deletion src/common/state/error_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const char *error_types[] = {"object_hash_mismatch", "put_reconstruction",

void push_error(DBHandle *db_handle,
DBClientID driver_id,
int error_type,
ErrorIndex error_type,
const std::string &error_message) {
int64_t message_size = error_message.size();

Expand Down
34 changes: 17 additions & 17 deletions src/common/state/error_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,26 @@
#include "db.h"
#include "table.h"

/// An ErrorIndex may be used as an index into error_types.
enum class ErrorIndex : int32_t {
/// An object was added with a different hash from the existing one.
OBJECT_HASH_MISMATCH = 0,
/// An object that was created through a ray.put is lost.
PUT_RECONSTRUCTION,
/// A worker died or was killed while executing a task.
WORKER_DIED,
/// An actor hasn't been created for a while.
ACTOR_NOT_CREATED,
/// The total number of error types.
MAX
};

/// Data that is needed to push an error.
typedef struct {
/// The ID of the driver to push the error to.
DBClientID driver_id;
/// An index into the error_types array indicating the type of the error.
int error_type;
ErrorIndex error_type;
/// The key to use for the error message in Redis.
UniqueID error_key;
/// The length of the error message.
Expand All @@ -18,33 +32,19 @@ typedef struct {
uint8_t error_message[0];
} ErrorInfo;

/// An error_index may be used as an index into error_types.
typedef enum {
/// An object was added with a different hash from the existing one.
OBJECT_HASH_MISMATCH_ERROR_INDEX = 0,
/// An object that was created through a ray.put is lost.
PUT_RECONSTRUCTION_ERROR_INDEX,
/// A worker died or was killed while executing a task.
WORKER_DIED_ERROR_INDEX,
/// An actor hasn't been created for a while.
ACTOR_NOT_CREATED_ERROR_INDEX,
/// The total number of error types.
MAX_ERROR_INDEX
} error_index;

extern const char *error_types[];

/// Push an error to the given Python driver.
///
/// \param db_handle Database handle.
/// \param driver_id The ID of the Python driver to push the error to.
/// \param error_type An index specifying the type of the error. This should
/// be a value from the error_index enum.
/// be a value from the ErrorIndex enum.
/// \param error_message The error message to print.
/// \return Void.
void push_error(DBHandle *db_handle,
DBClientID driver_id,
int error_type,
ErrorIndex error_type,
const std::string &error_message);

#endif
13 changes: 7 additions & 6 deletions src/common/state/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ Task *parse_and_construct_task_from_redis_reply(redisReply *reply) {
flatbuffers::GetRoot<TaskExecutionDependencies>(
message->execution_dependencies()->data());
task = Task_alloc(
spec, task_spec_size, message->state(),
spec, task_spec_size, static_cast<TaskStatus>(message->state()),
from_flatbuf(*message->local_scheduler_id()),
from_flatbuf(*execution_dependencies->execution_dependencies()));
} else {
Expand Down Expand Up @@ -932,7 +932,7 @@ void redis_task_table_add_task(TableCallbackData *callback_data) {
TaskID task_id = Task_task_id(task);
DBClientID local_scheduler_id = Task_local_scheduler(task);
redisAsyncContext *context = get_redis_context(db, task_id);
int state = Task_state(task);
int state = static_cast<int>(Task_state(task));

TaskExecutionSpec *execution_spec = Task_task_execution_spec(task);
TaskSpec *spec = execution_spec->Spec();
Expand Down Expand Up @@ -998,7 +998,7 @@ void redis_task_table_update(TableCallbackData *callback_data) {
TaskID task_id = Task_task_id(task);
redisAsyncContext *context = get_redis_context(db, task_id);
DBClientID local_scheduler_id = Task_local_scheduler(task);
int state = Task_state(task);
int state = static_cast<int>(Task_state(task));

TaskExecutionSpec *execution_spec = Task_task_execution_spec(task);
flatbuffers::FlatBufferBuilder fbb;
Expand Down Expand Up @@ -1108,7 +1108,7 @@ void redis_task_table_subscribe_callback(redisAsyncContext *c,
/* Handle a task table event. Parse the payload and call the callback. */
auto message = flatbuffers::GetRoot<TaskReply>(payload->str);
/* Extract the scheduling state. */
int64_t state = message->state();
TaskStatus state = static_cast<TaskStatus>(message->state());
/* Extract the local scheduler ID. */
DBClientID local_scheduler_id =
from_flatbuf(*message->local_scheduler_id());
Expand Down Expand Up @@ -1673,9 +1673,10 @@ void redis_push_error_hmset_callback(redisAsyncContext *c,
void redis_push_error(TableCallbackData *callback_data) {
DBHandle *db = callback_data->db_handle;
ErrorInfo *info = (ErrorInfo *) callback_data->data->Get();
RAY_CHECK(info->error_type < MAX_ERROR_INDEX && info->error_type >= 0);
RAY_CHECK(info->error_type < ErrorIndex::MAX &&
info->error_type >= ErrorIndex::OBJECT_HASH_MISMATCH);
/// Look up the error type.
const char *error_type = error_types[info->error_type];
const char *error_type = error_types[static_cast<int>(info->error_type)];

/* Set the error information. */
int status = redisAsyncCommand(
Expand Down
Loading