diff --git a/src/common/redis_module/ray_redis_module.cc b/src/common/redis_module/ray_redis_module.cc index 14d241c246b2..92f3e5631cf1 100644 --- a/src/common/redis_module/ray_redis_module.cc +++ b/src/common/redis_module/ray_redis_module.cc @@ -415,7 +415,7 @@ bool PublishObjectNotification(RedisModuleCtx *ctx, RedisModuleString *client_id, RedisModuleString *object_id, RedisModuleString *data_size, - RedisModuleKey *key) { + RedisModuleKey *key, bool deleted) { flatbuffers::FlatBufferBuilder fbb; long long data_size_value; @@ -425,15 +425,17 @@ bool PublishObjectNotification(RedisModuleCtx *ctx, } std::vector> manager_ids; - CHECK_ERROR( - RedisModule_ZsetFirstInScoreRange(key, REDISMODULE_NEGATIVE_INFINITE, - REDISMODULE_POSITIVE_INFINITE, 1, 1), - "Unable to initialize zset iterator"); - /* Loop over the managers in the object table for this object ID. */ - do { - RedisModuleString *curr = RedisModule_ZsetRangeCurrentElement(key, NULL); - manager_ids.push_back(RedisStringToFlatbuf(fbb, curr)); - } while (RedisModule_ZsetRangeNext(key)); + if (!deleted) { + CHECK_ERROR( + RedisModule_ZsetFirstInScoreRange(key, REDISMODULE_NEGATIVE_INFINITE, + REDISMODULE_POSITIVE_INFINITE, 1, 1), + "Unable to initialize zset iterator"); + /* Loop over the managers in the object table for this object ID. */ + do { + RedisModuleString *curr = RedisModule_ZsetRangeCurrentElement(key, NULL); + manager_ids.push_back(RedisStringToFlatbuf(fbb, curr)); + } while (RedisModule_ZsetRangeNext(key)); + } auto message = CreateSubscribeToNotificationsReply( fbb, RedisStringToFlatbuf(fbb, object_id), data_size_value, @@ -926,6 +928,81 @@ int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx, return result; } +/* send notification to the subscribers */ +int sendObjectNotification(RedisModuleCtx *ctx, + RedisModuleString *object_id, + RedisModuleKey *table_key, + bool deleted) { + RedisModuleString *data_size; + if (deleted) { + data_size = RedisModule_CreateStringFromLongLong(ctx, 0); + } else { + RedisModuleKey *info_key; + info_key = OpenPrefixedKey(ctx, OBJECT_INFO_PREFIX, object_id, + REDISMODULE_READ | REDISMODULE_WRITE); + RedisModule_HashGet(info_key, REDISMODULE_HASH_CFIELDS, "data_size", + &data_size, NULL); + RedisModule_CloseKey(info_key); + if (data_size == NULL) { + return REDISMODULE_ERR; + } + } + + bool success = true; + /* broadcast channel */ + RedisModuleString *bcast_client_str = + RedisModule_CreateString(ctx, OBJECT_BCAST, strlen(OBJECT_BCAST)); + + success = PublishObjectNotification(ctx, bcast_client_str, object_id, + data_size, table_key, deleted); + RedisModule_FreeString(ctx, bcast_client_str); + if (!success) { + /* The publish failed somehow. */ + RedisModule_FreeString(ctx, data_size); + return REDISMODULE_ERR; + } + + /* Get the zset of clients that requested a notification about the + * availability of this object. */ + RedisModuleKey *object_notification_key = + OpenPrefixedKey(ctx, OBJECT_NOTIFICATION_PREFIX, object_id, + REDISMODULE_READ | REDISMODULE_WRITE); + /* If the zset exists, initialize the key to iterate over the zset. */ + if (RedisModule_KeyType(object_notification_key) != + REDISMODULE_KEYTYPE_EMPTY) { + CHECK_ERROR(RedisModule_ZsetFirstInScoreRange( + object_notification_key, REDISMODULE_NEGATIVE_INFINITE, + REDISMODULE_POSITIVE_INFINITE, 1, 1), + "Unable to initialize zset iterator when send object notification"); + /* Iterate over the list of clients that requested notifiations about the + * availability of this object, and publish notifications to their object + * notification channels. */ + do { + RedisModuleString *client_id = + RedisModule_ZsetRangeCurrentElement(object_notification_key, NULL); + /* TODO(rkn): Some computation could be saved by batching the string + * constructions in the multiple calls to PublishObjectNotification + * together. */ + success = PublishObjectNotification(ctx, client_id, object_id, + data_size, table_key, deleted); + if (!success) { + /* The publish failed somehow. */ + RedisModule_CloseKey(object_notification_key); + RedisModule_FreeString(ctx, data_size); + return REDISMODULE_ERR; + } + } while (RedisModule_ZsetRangeNext(object_notification_key)); + /* Now that the clients have been notified, remove the zset of clients + * waiting for notifications. */ + CHECK_ERROR(RedisModule_DeleteKey(object_notification_key), + "Unable to delete zset key when send object notification."); + RedisModule_CloseKey(object_notification_key); + } + + RedisModule_FreeString(ctx, data_size); + return REDISMODULE_OK; +} + /** * Add a new entry to the object table or update an existing one. * @@ -994,51 +1071,8 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx, /* Sets are not implemented yet, so we use ZSETs instead. */ RedisModule_ZsetAdd(table_key, 0.0, manager, NULL); - RedisModuleString *bcast_client_str = - RedisModule_CreateString(ctx, OBJECT_BCAST, strlen(OBJECT_BCAST)); - bool success = PublishObjectNotification(ctx, bcast_client_str, object_id, - data_size, table_key); - if (!success) { - /* The publish failed somehow. */ - return RedisModule_ReplyWithError(ctx, "PUBLISH BCAST unsuccessful"); - } - RedisModule_FreeString(ctx, bcast_client_str); - - /* Get the zset of clients that requested a notification about the - * availability of this object. */ - RedisModuleKey *object_notification_key = - OpenPrefixedKey(ctx, OBJECT_NOTIFICATION_PREFIX, object_id, - REDISMODULE_READ | REDISMODULE_WRITE); - /* If the zset exists, initialize the key to iterate over the zset. */ - if (RedisModule_KeyType(object_notification_key) != - REDISMODULE_KEYTYPE_EMPTY) { - CHECK_ERROR(RedisModule_ZsetFirstInScoreRange( - object_notification_key, REDISMODULE_NEGATIVE_INFINITE, - REDISMODULE_POSITIVE_INFINITE, 1, 1), - "Unable to initialize zset iterator"); - /* Iterate over the list of clients that requested notifiations about the - * availability of this object, and publish notifications to their object - * notification channels. */ - - do { - RedisModuleString *client_id = - RedisModule_ZsetRangeCurrentElement(object_notification_key, NULL); - /* TODO(rkn): Some computation could be saved by batching the string - * constructions in the multiple calls to PublishObjectNotification - * together. */ - bool success = PublishObjectNotification(ctx, client_id, object_id, - data_size, table_key); - if (!success) { - /* The publish failed somehow. */ - RedisModule_CloseKey(object_notification_key); - return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful"); - } - } while (RedisModule_ZsetRangeNext(object_notification_key)); - /* Now that the clients have been notified, remove the zset of clients - * waiting for notifications. */ - CHECK_ERROR(RedisModule_DeleteKey(object_notification_key), - "Unable to delete zset key."); - RedisModule_CloseKey(object_notification_key); + if (sendObjectNotification(ctx, object_id, table_key, false) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful"); } RedisModule_CloseKey(table_key); @@ -1085,6 +1119,12 @@ int ObjectTableRemove_RedisCommand(RedisModuleCtx *ctx, } RedisModule_ZsetRem(table_key, manager, NULL); + if (RedisModule_ZsetRangeCurrentElement(table_key, NULL) == NULL) { + sendObjectNotification(ctx, object_id, table_key, true); + RedisModule_DeleteKey(table_key); + } else { + sendObjectNotification(ctx, object_id, table_key, false); + } RedisModule_CloseKey(table_key); RedisModule_ReplyWithSimpleString(ctx, "OK"); @@ -1159,7 +1199,7 @@ int ObjectTableRequestNotifications_RedisCommand(RedisModuleCtx *ctx, } bool success = PublishObjectNotification(ctx, client_id, object_id, - existing_data_size, key); + existing_data_size, key, false); RedisModule_FreeString(ctx, existing_data_size); if (!success) { /* The publish failed somehow. */ diff --git a/src/common/test/object_table_tests.cc b/src/common/test/object_table_tests.cc index 2c18a89213c1..e458d96aec9b 100644 --- a/src/common/test/object_table_tests.cc +++ b/src/common/test/object_table_tests.cc @@ -765,12 +765,18 @@ TEST subscribe_object_not_present_test(void) { const char *subscribe_object_available_later_context = "subscribe_object_available_later"; int subscribe_object_available_later_succeeded = 0; +int subscribe_object_unavailable_later_succeeded = 0; void subscribe_object_available_later_object_available_callback( ObjectID object_id, int64_t data_size, const std::vector &manager_vector, void *user_context) { + if (manager_vector.size() == 0) { + /* a notification which indicate object removed */ + subscribe_object_unavailable_later_succeeded += 1; + return; + } subscribe_object_present_context_t *myctx = (subscribe_object_present_context_t *) user_context; RAY_CHECK(myctx->data_size == data_size); @@ -877,6 +883,16 @@ TEST subscribe_object_available_subscribe_all(void) { /* Run the event loop to do the object table add. */ event_loop_run(g_loop); /* At this point we assume that object table add completed. */ + ASSERT_EQ(subscribe_object_available_later_succeeded, 1); + ASSERT_EQ(subscribe_object_unavailable_later_succeeded, 0); + object_table_remove(db, id, NULL, &retry, NULL, NULL); + /* Install handler to terminate event loop after 750ms. */ + event_loop_add_timer(g_loop, 750, + (event_loop_timer_handler) terminate_event_loop_callback, + NULL); + /* Run the event loop to do the object table remove. */ + event_loop_run(g_loop); + /* At this point we assume that object table remove completed. */ db_disconnect(db); destroy_outstanding_callbacks(g_loop); @@ -887,8 +903,10 @@ TEST subscribe_object_available_subscribe_all(void) { subscribe_object_available_later_succeeded); fflush(stdout); ASSERT_EQ(subscribe_object_available_later_succeeded, 1); + ASSERT_EQ(subscribe_object_unavailable_later_succeeded, 1); /* Reset the global variable before exiting this unit test. */ subscribe_object_available_later_succeeded = 0; + subscribe_object_unavailable_later_succeeded = 0; PASS(); } diff --git a/src/global_scheduler/global_scheduler.cc b/src/global_scheduler/global_scheduler.cc index db97e76eb38c..b238b715188e 100644 --- a/src/global_scheduler/global_scheduler.cc +++ b/src/global_scheduler/global_scheduler.cc @@ -336,6 +336,9 @@ void object_table_subscribe_callback(ObjectID object_id, for (size_t i = 0; i < managers.size(); i++) { obj_info_entry.object_locations.push_back(managers[i]); } + if (managers.size() == 0) { + state->scheduler_object_info_table.erase(object_id); + } } void local_scheduler_table_handler(DBClientID client_id,