Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 96 additions & 56 deletions src/common/redis_module/ray_redis_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ bool PublishObjectNotification(RedisModuleCtx *ctx,
RedisModuleString *client_id,
RedisModuleString *object_id,
RedisModuleString *data_size,
RedisModuleKey *key) {
RedisModuleKey *key, bool deleted) {
flatbuffers::FlatBufferBuilder fbb;

long long data_size_value;
Expand All @@ -425,15 +425,17 @@ bool PublishObjectNotification(RedisModuleCtx *ctx,
}

std::vector<flatbuffers::Offset<flatbuffers::String>> manager_ids;
CHECK_ERROR(
RedisModule_ZsetFirstInScoreRange(key, REDISMODULE_NEGATIVE_INFINITE,
REDISMODULE_POSITIVE_INFINITE, 1, 1),
"Unable to initialize zset iterator");
/* Loop over the managers in the object table for this object ID. */
do {
RedisModuleString *curr = RedisModule_ZsetRangeCurrentElement(key, NULL);
manager_ids.push_back(RedisStringToFlatbuf(fbb, curr));
} while (RedisModule_ZsetRangeNext(key));
if (!deleted) {
CHECK_ERROR(
RedisModule_ZsetFirstInScoreRange(key, REDISMODULE_NEGATIVE_INFINITE,
REDISMODULE_POSITIVE_INFINITE, 1, 1),
"Unable to initialize zset iterator");
/* Loop over the managers in the object table for this object ID. */
do {
RedisModuleString *curr = RedisModule_ZsetRangeCurrentElement(key, NULL);
manager_ids.push_back(RedisStringToFlatbuf(fbb, curr));
} while (RedisModule_ZsetRangeNext(key));
}

auto message = CreateSubscribeToNotificationsReply(
fbb, RedisStringToFlatbuf(fbb, object_id), data_size_value,
Expand Down Expand Up @@ -926,6 +928,81 @@ int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,
return result;
}

/* send notification to the subscribers */
int sendObjectNotification(RedisModuleCtx *ctx,
RedisModuleString *object_id,
RedisModuleKey *table_key,
bool deleted) {
RedisModuleString *data_size;
if (deleted) {
data_size = RedisModule_CreateStringFromLongLong(ctx, 0);
} else {
RedisModuleKey *info_key;
info_key = OpenPrefixedKey(ctx, OBJECT_INFO_PREFIX, object_id,
REDISMODULE_READ | REDISMODULE_WRITE);
RedisModule_HashGet(info_key, REDISMODULE_HASH_CFIELDS, "data_size",
&data_size, NULL);
RedisModule_CloseKey(info_key);
if (data_size == NULL) {
return REDISMODULE_ERR;
}
}

bool success = true;
/* broadcast channel */
RedisModuleString *bcast_client_str =
RedisModule_CreateString(ctx, OBJECT_BCAST, strlen(OBJECT_BCAST));

success = PublishObjectNotification(ctx, bcast_client_str, object_id,
data_size, table_key, deleted);
RedisModule_FreeString(ctx, bcast_client_str);
if (!success) {
/* The publish failed somehow. */
RedisModule_FreeString(ctx, data_size);
return REDISMODULE_ERR;
}

/* Get the zset of clients that requested a notification about the
* availability of this object. */
RedisModuleKey *object_notification_key =
OpenPrefixedKey(ctx, OBJECT_NOTIFICATION_PREFIX, object_id,
REDISMODULE_READ | REDISMODULE_WRITE);
/* If the zset exists, initialize the key to iterate over the zset. */
if (RedisModule_KeyType(object_notification_key) !=
REDISMODULE_KEYTYPE_EMPTY) {
CHECK_ERROR(RedisModule_ZsetFirstInScoreRange(
object_notification_key, REDISMODULE_NEGATIVE_INFINITE,
REDISMODULE_POSITIVE_INFINITE, 1, 1),
"Unable to initialize zset iterator when send object notification");
/* Iterate over the list of clients that requested notifiations about the
* availability of this object, and publish notifications to their object
* notification channels. */
do {
RedisModuleString *client_id =
RedisModule_ZsetRangeCurrentElement(object_notification_key, NULL);
/* TODO(rkn): Some computation could be saved by batching the string
* constructions in the multiple calls to PublishObjectNotification
* together. */
success = PublishObjectNotification(ctx, client_id, object_id,
data_size, table_key, deleted);
if (!success) {
/* The publish failed somehow. */
RedisModule_CloseKey(object_notification_key);
RedisModule_FreeString(ctx, data_size);
return REDISMODULE_ERR;
}
} while (RedisModule_ZsetRangeNext(object_notification_key));
/* Now that the clients have been notified, remove the zset of clients
* waiting for notifications. */
CHECK_ERROR(RedisModule_DeleteKey(object_notification_key),
"Unable to delete zset key when send object notification.");
RedisModule_CloseKey(object_notification_key);
}

RedisModule_FreeString(ctx, data_size);
return REDISMODULE_OK;
}

/**
* Add a new entry to the object table or update an existing one.
*
Expand Down Expand Up @@ -994,51 +1071,8 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
/* Sets are not implemented yet, so we use ZSETs instead. */
RedisModule_ZsetAdd(table_key, 0.0, manager, NULL);

RedisModuleString *bcast_client_str =
RedisModule_CreateString(ctx, OBJECT_BCAST, strlen(OBJECT_BCAST));
bool success = PublishObjectNotification(ctx, bcast_client_str, object_id,
data_size, table_key);
if (!success) {
/* The publish failed somehow. */
return RedisModule_ReplyWithError(ctx, "PUBLISH BCAST unsuccessful");
}
RedisModule_FreeString(ctx, bcast_client_str);

/* Get the zset of clients that requested a notification about the
* availability of this object. */
RedisModuleKey *object_notification_key =
OpenPrefixedKey(ctx, OBJECT_NOTIFICATION_PREFIX, object_id,
REDISMODULE_READ | REDISMODULE_WRITE);
/* If the zset exists, initialize the key to iterate over the zset. */
if (RedisModule_KeyType(object_notification_key) !=
REDISMODULE_KEYTYPE_EMPTY) {
CHECK_ERROR(RedisModule_ZsetFirstInScoreRange(
object_notification_key, REDISMODULE_NEGATIVE_INFINITE,
REDISMODULE_POSITIVE_INFINITE, 1, 1),
"Unable to initialize zset iterator");
/* Iterate over the list of clients that requested notifiations about the
* availability of this object, and publish notifications to their object
* notification channels. */

do {
RedisModuleString *client_id =
RedisModule_ZsetRangeCurrentElement(object_notification_key, NULL);
/* TODO(rkn): Some computation could be saved by batching the string
* constructions in the multiple calls to PublishObjectNotification
* together. */
bool success = PublishObjectNotification(ctx, client_id, object_id,
data_size, table_key);
if (!success) {
/* The publish failed somehow. */
RedisModule_CloseKey(object_notification_key);
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
}
} while (RedisModule_ZsetRangeNext(object_notification_key));
/* Now that the clients have been notified, remove the zset of clients
* waiting for notifications. */
CHECK_ERROR(RedisModule_DeleteKey(object_notification_key),
"Unable to delete zset key.");
RedisModule_CloseKey(object_notification_key);
if (sendObjectNotification(ctx, object_id, table_key, false) != REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
}

RedisModule_CloseKey(table_key);
Expand Down Expand Up @@ -1085,6 +1119,12 @@ int ObjectTableRemove_RedisCommand(RedisModuleCtx *ctx,
}

RedisModule_ZsetRem(table_key, manager, NULL);
if (RedisModule_ZsetRangeCurrentElement(table_key, NULL) == NULL) {
sendObjectNotification(ctx, object_id, table_key, true);
RedisModule_DeleteKey(table_key);
} else {
sendObjectNotification(ctx, object_id, table_key, false);
}
RedisModule_CloseKey(table_key);

RedisModule_ReplyWithSimpleString(ctx, "OK");
Expand Down Expand Up @@ -1159,7 +1199,7 @@ int ObjectTableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
}

bool success = PublishObjectNotification(ctx, client_id, object_id,
existing_data_size, key);
existing_data_size, key, false);
RedisModule_FreeString(ctx, existing_data_size);
if (!success) {
/* The publish failed somehow. */
Expand Down
18 changes: 18 additions & 0 deletions src/common/test/object_table_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -765,12 +765,18 @@ TEST subscribe_object_not_present_test(void) {
const char *subscribe_object_available_later_context =
"subscribe_object_available_later";
int subscribe_object_available_later_succeeded = 0;
int subscribe_object_unavailable_later_succeeded = 0;

void subscribe_object_available_later_object_available_callback(
ObjectID object_id,
int64_t data_size,
const std::vector<DBClientID> &manager_vector,
void *user_context) {
if (manager_vector.size() == 0) {
/* a notification which indicate object removed */
subscribe_object_unavailable_later_succeeded += 1;
return;
}
subscribe_object_present_context_t *myctx =
(subscribe_object_present_context_t *) user_context;
RAY_CHECK(myctx->data_size == data_size);
Expand Down Expand Up @@ -877,6 +883,16 @@ TEST subscribe_object_available_subscribe_all(void) {
/* Run the event loop to do the object table add. */
event_loop_run(g_loop);
/* At this point we assume that object table add completed. */
ASSERT_EQ(subscribe_object_available_later_succeeded, 1);
ASSERT_EQ(subscribe_object_unavailable_later_succeeded, 0);
object_table_remove(db, id, NULL, &retry, NULL, NULL);
/* Install handler to terminate event loop after 750ms. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
/* Run the event loop to do the object table remove. */
event_loop_run(g_loop);
/* At this point we assume that object table remove completed. */

db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
Expand All @@ -887,8 +903,10 @@ TEST subscribe_object_available_subscribe_all(void) {
subscribe_object_available_later_succeeded);
fflush(stdout);
ASSERT_EQ(subscribe_object_available_later_succeeded, 1);
ASSERT_EQ(subscribe_object_unavailable_later_succeeded, 1);
/* Reset the global variable before exiting this unit test. */
subscribe_object_available_later_succeeded = 0;
subscribe_object_unavailable_later_succeeded = 0;
PASS();
}

Expand Down
3 changes: 3 additions & 0 deletions src/global_scheduler/global_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ void object_table_subscribe_callback(ObjectID object_id,
for (size_t i = 0; i < managers.size(); i++) {
obj_info_entry.object_locations.push_back(managers[i]);
}
if (managers.size() == 0) {
state->scheduler_object_info_table.erase(object_id);
}
}

void local_scheduler_table_handler(DBClientID client_id,
Expand Down