diff --git a/src/plasma/lib/python/plasma.py b/src/plasma/lib/python/plasma.py index 1162094cb928..af197df74b7c 100644 --- a/src/plasma/lib/python/plasma.py +++ b/src/plasma/lib/python/plasma.py @@ -14,8 +14,10 @@ class PlasmaID(ctypes.Structure): def make_plasma_id(string): if len(string) != PLASMA_ID_SIZE: raise Exception("PlasmaIDs must be {} characters long".format(PLASMA_ID_SIZE)) - object_id = map(ord, string) - return PlasmaID(plasma_id=ID(*object_id)) + return PlasmaID(plasma_id=ID.from_buffer_copy(string)) + +def plasma_id_to_str(plasma_id): + return str(bytearray(plasma_id.plasma_id)) class PlasmaBuffer(object): """This is the type of objects returned by calls to get with a PlasmaClient. @@ -217,6 +219,31 @@ def fetch(self, object_ids): success_array); return [bool(success) for success in success_array] + def wait(self, object_ids, timeout, num_returns): + """Wait until num_returns objects in object_ids are ready. + + Args: + object_ids (List[str]): List of object IDs to wait for. + timeout (int): Return to the caller after timeout milliseconds. + num_returns (int): We are waiting for this number of objects to be ready. + + Returns: + ready_ids, waiting_ids (List[str], List[str]): List of object IDs that + are ready and list of object IDs we might still wait on respectively. + """ + if not self.has_manager_conn: + raise Exception("Not connected to the plasma manager socket") + object_id_array = (len(object_ids) * PlasmaID)() + for i, object_id in enumerate(object_ids): + object_id_array[i] = make_plasma_id(object_id) + return_id_array = (num_returns * PlasmaID)() + num_return_objects = self.client.plasma_wait(self.plasma_conn, + object_id_array._length_, + object_id_array, + timeout, num_returns, return_id_array) + ready_ids = map(plasma_id_to_str, return_id_array[num_returns-num_return_objects:]) + return ready_ids, list(set(object_ids) - set(ready_ids)) + def subscribe(self): """Subscribe to notifications about sealed objects.""" fd = self.client.plasma_subscribe(self.plasma_conn) diff --git a/src/plasma/plasma.h b/src/plasma/plasma.h index 3b7948afe64e..1bceac4558d6 100644 --- a/src/plasma/plasma.h +++ b/src/plasma/plasma.h @@ -62,6 +62,8 @@ enum plasma_message_type { PLASMA_DATA, /** Request a fetch of an object in another store. */ PLASMA_FETCH, + /** Wait until an object becomes available. */ + PLASMA_WAIT }; typedef struct { @@ -69,6 +71,10 @@ typedef struct { int64_t data_size; /** The size of the object's metadata. */ int64_t metadata_size; + /** The timeout of the request. */ + uint64_t timeout; + /** The number of objects we wait for for wait. */ + int num_returns; /** In a transfer request, this is the IP address of the Plasma Manager to * transfer the object to. */ uint8_t addr[4]; @@ -82,8 +88,6 @@ typedef struct { } plasma_request; typedef struct { - /** The object ID that this reply refers to. */ - object_id object_id; /** The object that is returned with this reply. */ plasma_object object; /** This is used only to respond to requests of type @@ -91,6 +95,12 @@ typedef struct { * present and 0 otherwise. Used for plasma_contains and * plasma_fetch. */ int has_object; + /** Number of object IDs a wait is returning. */ + int num_objects_returned; + /** The number of object IDs that will be included in this reply. */ + int num_object_ids; + /** The IDs of the objects that this reply refers to. */ + object_id object_ids[1]; } plasma_reply; #endif diff --git a/src/plasma/plasma_client.c b/src/plasma/plasma_client.c index ed861ef18744..68b4203b81c4 100644 --- a/src/plasma/plasma_client.c +++ b/src/plasma/plasma_client.c @@ -302,6 +302,7 @@ int plasma_subscribe(plasma_connection *conn) { * message because otherwise it seems to hang on Linux. */ char dummy = '\0'; send_fd(conn->store_conn, fd[1], &dummy, 1); + close(fd[1]); /* Return the file descriptor that the client should use to read notifications * about sealed objects. */ return fd[0]; @@ -440,10 +441,12 @@ void plasma_fetch(plasma_connection *conn, CHECK(nbytes == sizeof(reply)); success = reply.has_object; } + CHECK(reply.num_object_ids == 1); /* Update the correct index in is_fetched. */ int i = 0; for (; i < num_object_ids; i++) { - if (memcmp(&object_ids[i], &reply.object_id, sizeof(object_id)) == 0) { + if (memcmp(&object_ids[i], &reply.object_ids[0], sizeof(object_id)) == + 0) { /* Check that this isn't a duplicate response. */ CHECK(!is_fetched[i]); is_fetched[i] = success; @@ -455,6 +458,30 @@ void plasma_fetch(plasma_connection *conn, } } +int plasma_wait(plasma_connection *conn, + int num_object_ids, + object_id object_ids[], + uint64_t timeout, + int num_returns, + object_id return_object_ids[]) { + CHECK(conn->manager_conn >= 0); + plasma_request *req = + make_plasma_multiple_request(num_object_ids, object_ids); + req->num_returns = num_returns; + req->timeout = timeout; + plasma_send_request(conn->manager_conn, PLASMA_WAIT, req); + free(req); + int64_t return_size = + sizeof(plasma_reply) + (num_returns - 1) * sizeof(object_id); + plasma_reply *reply = malloc(return_size); + int nbytes = recv(conn->manager_conn, (uint8_t *) reply, return_size, 0); + CHECK(nbytes == return_size); + memcpy(return_object_ids, reply->object_ids, num_returns * sizeof(object_id)); + int num_objects_returned = reply->num_objects_returned; + free(reply); + return num_objects_returned; +} + int get_manager_fd(plasma_connection *conn) { return conn->manager_conn; } diff --git a/src/plasma/plasma_client.h b/src/plasma/plasma_client.h index 05837b52c1a5..edae26fd52d4 100644 --- a/src/plasma/plasma_client.h +++ b/src/plasma/plasma_client.h @@ -196,6 +196,28 @@ void plasma_fetch(plasma_connection *conn, object_id object_ids[], int is_fetched[]); +/** + * Wait for objects to be created (right now, wait for local objects). + * + * @param conn The object containing the connection state. + * @param num_object_ids Number of object IDs wait is called on. + * @param object_ids Object IDs wait is called on. + * @param timeout Wait will time out and return after this number of ms. + * @param num_returns Number of object IDs wait will return if it doesn't time + * out. + * @param return_object_ids Out parameter for the object IDs returned by wait. + * This is an array of size num_returns. If the number of objects that + * are ready when we time out, the objects will be stored in the last + * slots of the array and the number of objects is returned. + * @return Number of objects that are actually ready. + */ +int plasma_wait(plasma_connection *conn, + int num_object_ids, + object_id object_ids[], + uint64_t timeout, + int num_returns, + object_id return_object_ids[]); + /** * Subscribe to notifications when objects are sealed in the object store. * Whenever an object is sealed, a message will be written to the client socket diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index 63d6f4ceab29..3c06211fd312 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,14 @@ typedef struct client_object_connection client_object_connection; +/* Entry of the hashtable of objects that are available locally. */ +typedef struct { + /** Object id of this object. */ + object_id object_id; + /** Handle for the uthash table. */ + UT_hash_handle hh; +} available_object; + struct plasma_manager_state { /** Event loop. */ event_loop *loop; @@ -54,6 +63,8 @@ struct plasma_manager_state { * object id, value is a list of connections to the clients * who are blocking on a fetch of this object. */ client_object_connection *fetch_connections; + /** Initialize an empty hash map for the cache of local available object. */ + available_object *local_available_objects; }; plasma_manager_state *g_manager_state = NULL; @@ -106,6 +117,13 @@ struct client_connection { /** File descriptor for the socket connected to the other * plasma manager. */ int fd; + /** Timer id for timing out wait (or fetch). */ + int64_t timer_id; + /** True if this client is in a "wait" and false if it is in a "fetch". */ + bool is_wait; + /** If this client is processing a wait, this contains the object ids that + * are already available. */ + plasma_reply *wait_reply; /** The objects that we are waiting for and their callback * contexts, for either a fetch or a wait operation. */ client_object_connection *active_objects; @@ -137,7 +155,8 @@ int send_client_reply(client_connection *conn, plasma_reply *reply) { } int send_client_failure_reply(object_id object_id, client_connection *conn) { - plasma_reply reply = {.object_id = object_id, .has_object = 0}; + plasma_reply reply = { + .object_ids = {object_id}, .num_object_ids = 1, .has_object = 0}; return send_client_reply(conn, &reply); } @@ -160,7 +179,6 @@ client_object_connection *get_object_connection(client_connection *client_conn, client_object_connection *add_object_connection(client_connection *client_conn, object_id object_id) { - /* TODO(swang): Support registration of wait operations. */ /* Create a new context for this client connection and object. */ client_object_connection *object_conn = malloc(sizeof(client_object_connection)); @@ -247,6 +265,13 @@ plasma_manager_state *init_plasma_manager_state(const char *store_socket_name, sscanf(manager_addr, "%hhu.%hhu.%hhu.%hhu", &state->addr[0], &state->addr[1], &state->addr[2], &state->addr[3]); state->port = manager_port; + /* Initialize an empty hash map for the cache of local available objects. */ + state->local_available_objects = NULL; + /* Subscribe to notifications about sealed objects. */ + int plasma_fd = plasma_subscribe(state->plasma_conn); + /* Add the callback that processes the notification to the event loop. */ + event_loop_add_file(state->loop, plasma_fd, EVENT_LOOP_READ, + process_object_notification, state); return state; } @@ -409,23 +434,10 @@ void process_data_chunk(event_loop *loop, /* Seal the object and release it. The release corresponds to the call to * plasma_create that occurred in process_data_request. */ LOG_DEBUG("reading on channel %d finished", data_sock); + /* The following seal also triggers notification of clients for fetch or + * wait requests, see process_object_notification. */ plasma_seal(conn->manager_state->plasma_conn, buf->object_id); plasma_release(conn->manager_state->plasma_conn, buf->object_id); - /* Notify any clients who were waiting on a fetch to this object. */ - client_object_connection *object_conn, *next; - client_connection *client_conn; - HASH_FIND(fetch_hh, conn->manager_state->fetch_connections, &(buf->object_id), - sizeof(buf->object_id), object_conn); - plasma_reply reply = {.object_id = buf->object_id, .has_object = 1}; - while (object_conn) { - next = object_conn->next; - client_conn = object_conn->client_conn; - send_client_reply(client_conn, &reply); - event_loop_remove_timer(client_conn->manager_state->loop, - object_conn->timer); - remove_object_connection(client_conn, object_conn); - object_conn = next; - } /* Remove the request buffer used for reading this object's data. */ LL_DELETE(conn->transfer_queue, buf); free(buf); @@ -586,10 +598,12 @@ int manager_timeout_handler(event_loop *loop, timer_id id, void *context) { object_conn->num_retries--; return MANAGER_TIMEOUT; } - plasma_reply reply = {.object_id = object_conn->object_id, .has_object = 0}; + plasma_reply reply = {.object_ids = {object_conn->object_id}, + .num_object_ids = 1, + .has_object = 0}; send_client_reply(client_conn, &reply); remove_object_connection(client_conn, object_conn); - return AE_NOMORE; + return EVENT_LOOP_TIMER_DONE; } /* TODO(swang): Consolidate transfer requests for same object @@ -614,6 +628,7 @@ void request_transfer(object_id object_id, * register a Redis callback for changes to this object table entry. */ free(manager_vector); send_client_failure_reply(object_id, client_conn); + remove_object_connection(client_conn, object_conn); return; } /* Register the new outstanding fetch with the current client connection. */ @@ -644,7 +659,9 @@ void request_transfer(object_id object_id, void process_fetch_request(client_connection *client_conn, object_id object_id) { - plasma_reply reply = {.object_id = object_id}; + client_conn->is_wait = false; + client_conn->wait_reply = NULL; + plasma_reply reply = {.object_ids = {object_id}, .num_object_ids = 1}; if (client_conn->manager_state->db == NULL) { reply.has_object = 0; send_client_reply(client_conn, &reply); @@ -678,6 +695,118 @@ void process_fetch_requests(client_connection *client_conn, } } +void return_from_wait(client_connection *client_conn) { + CHECK(client_conn->is_wait); + int64_t size = + sizeof(plasma_reply) + + (client_conn->wait_reply->num_object_ids - 1) * sizeof(object_id); + client_conn->wait_reply->num_objects_returned = + client_conn->wait_reply->num_object_ids - client_conn->num_return_objects; + int n = write(client_conn->fd, (uint8_t *) client_conn->wait_reply, size); + CHECK(n == size); + free(client_conn->wait_reply); + /* Clean the remaining object connections. */ + client_object_connection *object_conn, *tmp; + HASH_ITER(active_hh, client_conn->active_objects, object_conn, tmp) { + remove_object_connection(client_conn, object_conn); + } +} + +int wait_timeout_handler(event_loop *loop, timer_id id, void *context) { + client_connection *client_conn = context; + CHECK(client_conn->timer_id == id); + return_from_wait(client_conn); + return EVENT_LOOP_TIMER_DONE; +} + +void process_wait_request(client_connection *client_conn, + int num_object_ids, + object_id object_ids[], + uint64_t timeout, + int num_returns) { + plasma_manager_state *manager_state = client_conn->manager_state; + client_conn->num_return_objects = num_returns; + client_conn->is_wait = true; + client_conn->timer_id = event_loop_add_timer( + manager_state->loop, timeout, wait_timeout_handler, client_conn); + int64_t size = sizeof(plasma_reply) + (num_returns - 1) * sizeof(object_id); + client_conn->wait_reply = malloc(size); + memset(client_conn->wait_reply, 0, size); + client_conn->wait_reply->num_object_ids = num_returns; + for (int i = 0; i < num_object_ids; ++i) { + available_object *entry; + HASH_FIND(hh, manager_state->local_available_objects, &object_ids[i], + sizeof(object_id), entry); + if (entry) { + /* If an object id occurs twice in object_ids, this will count them twice. + * This might not be desirable behavior. */ + client_conn->num_return_objects -= 1; + client_conn->wait_reply->object_ids[client_conn->num_return_objects] = + entry->object_id; + if (client_conn->num_return_objects == 0) { + event_loop_remove_timer(manager_state->loop, client_conn->timer_id); + return_from_wait(client_conn); + return; + } + } else { + add_object_connection(client_conn, object_ids[i]); + } + } +} + +void process_object_notification(event_loop *loop, + int client_sock, + void *context, + int events) { + plasma_manager_state *state = context; + object_id obj_id; + /* Read the notification from Plasma. */ + int n = recv(client_sock, &obj_id, sizeof(object_id), MSG_WAITALL); + if (n == 0) { + /* The store has closed the socket. */ + LOG_DEBUG("The plasma store has closed the object notification socket."); + event_loop_remove_file(loop, client_sock); + close(client_sock); + return; + } + CHECK(n == sizeof(object_id)); + /* Add object to locally available object. */ + /* TODO(pcm): Where is this deallocated? */ + available_object *entry = + (available_object *) malloc(sizeof(available_object)); + entry->object_id = obj_id; + HASH_ADD(hh, state->local_available_objects, object_id, sizeof(object_id), + entry); + /* Notify any clients who were waiting on a fetch to this object and tick + * off objects we are waiting for. */ + client_object_connection *object_conn, *next; + client_connection *client_conn; + HASH_FIND(fetch_hh, state->fetch_connections, &obj_id, sizeof(object_id), + object_conn); + plasma_reply reply = { + .object_ids = {obj_id}, .num_object_ids = 1, .has_object = 1}; + while (object_conn) { + next = object_conn->next; + client_conn = object_conn->client_conn; + if (!client_conn->is_wait) { + event_loop_remove_timer(state->loop, object_conn->timer); + send_client_reply(client_conn, &reply); + } else { + client_conn->num_return_objects -= 1; + client_conn->wait_reply->object_ids[client_conn->num_return_objects] = + obj_id; + if (client_conn->num_return_objects == 0) { + event_loop_remove_timer(loop, client_conn->timer_id); + return_from_wait(client_conn); + object_conn = next; + continue; + } + } + remove_object_connection(client_conn, object_conn); + object_conn = next; + } +} + void process_message(event_loop *loop, int client_sock, void *context, @@ -703,6 +832,10 @@ void process_message(event_loop *loop, LOG_DEBUG("Processing fetch"); process_fetch_requests(conn, req->num_object_ids, req->object_ids); break; + case PLASMA_WAIT: + LOG_DEBUG("Processing wait"); + process_wait_request(conn, req->num_object_ids, req->object_ids, + req->timeout, req->num_returns); case PLASMA_SEAL: LOG_DEBUG("Publishing to object table from DB client %d.", get_client_id(conn->manager_state->db)); diff --git a/src/plasma/plasma_manager.h b/src/plasma/plasma_manager.h index 368dfb05dd40..4d314f88e8ed 100644 --- a/src/plasma/plasma_manager.h +++ b/src/plasma/plasma_manager.h @@ -137,14 +137,47 @@ void process_fetch_request(client_connection *client_conn, object_id object_id); * * @param client_conn The connection context for the client that made the * request. - * @param object_id_count The number of object IDs requested. + * @param num_object_ids The number of object IDs requested. * @param object_ids[] The vector of object IDs requested. * @return Void. */ void process_fetch_requests(client_connection *client_conn, - int object_id_count, + int num_object_ids, object_id object_ids[]); +/** + * Process a wait request from a client. + * + * @param client_conn The connection context for the client that made the + * request. + * @param num_object_ids Number of object IDs wait is called on. + * @param object_ids Object IDs wait is called on. + * @param timeout Wait will time out and return after this number of + * milliseconds. + * @param num_returns Number of object IDs wait will return if it doesn't time + * out. + * @return Void. + */ +void process_wait_request(client_connection *client_conn, + int num_object_ids, + object_id object_ids[], + uint64_t timeout, + int num_returns); + +/** + * Callback that will be called when a new object becomes available. + * + * @param loop This is the event loop of the plasma manager. + * @param client_sock The connection to the plasma store. + * @param context Plasma manager state. + * @param events (unused). + * @return Void. + */ +void process_object_notification(event_loop *loop, + int client_sock, + void *context, + int events); + /** * Send the next request queued for the other plasma manager connected to the * socket "data_sock". This could be a request to either write object data or @@ -237,6 +270,7 @@ void request_transfer(object_id object_id, * * @param client_conn The client connection context. * @param object_id The object ID whose context we want to delete. + * @return Void. */ void remove_object_connection(client_connection *client_conn, client_object_connection *object_conn); @@ -260,9 +294,9 @@ client_connection *get_manager_connection(plasma_manager_state *state, * * @param conn The connection to the client who's sending the data. * @param buf The buffer to write the data into. - * @return An integer representing whether the client is done - * sending this object. 1 means that the client has - * sent all the data, 0 means there is more. + * @return An integer representing whether the client is done sending this + * object. 1 means that the client has sent all the data, 0 means there + * is more. */ int read_object_chunk(client_connection *conn, plasma_request_buffer *buf); diff --git a/src/plasma/test/manager_tests.c b/src/plasma/test/manager_tests.c index 0babf7e9bac3..c714cf465414 100644 --- a/src/plasma/test/manager_tests.c +++ b/src/plasma/test/manager_tests.c @@ -186,7 +186,8 @@ TEST request_transfer_timeout_test(void) { int manager_fd = get_manager_fd(local_mock->plasma_conn); int nbytes = recv(manager_fd, (uint8_t *) &reply, sizeof(reply), MSG_WAITALL); ASSERT_EQ(nbytes, sizeof(reply)); - ASSERT_EQ(memcmp(&oid, &reply.object_id, sizeof(object_id)), 0); + ASSERT_EQ(reply.num_object_ids, 1); + ASSERT_EQ(memcmp(&oid, &reply.object_ids, sizeof(object_id)), 0); ASSERT_EQ(reply.has_object, 0); /* Clean up. */ destroy_plasma_mock(remote_mock); diff --git a/src/plasma/test/test.py b/src/plasma/test/test.py index 01953c377513..e4cc9ae90d36 100644 --- a/src/plasma/test/test.py +++ b/src/plasma/test/test.py @@ -10,6 +10,7 @@ import random import time import tempfile +import threading import plasma @@ -52,6 +53,11 @@ def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffe unit_test.assertEqual(client1.get_metadata(object_id)[:], client2.get_metadata(object_id)[:]) +# Check if the redis-server binary is present. +redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis-3.2.3/src/redis-server") +if not os.path.exists(redis_path): + raise Exception("You do not have the redis-server binary. Run `make test` in the plasma directory to get it.") + class TestPlasmaClient(unittest.TestCase): def setUp(self): @@ -226,17 +232,12 @@ def setUp(self): # Start a Redis server. redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis-3.2.3/src/redis-server") - self.redis_process = None - manager_redis_args = [] - if os.path.exists(redis_path): - redis_port = 6379 - with open(os.devnull, 'w') as FNULL: - self.redis_process = subprocess.Popen([redis_path, - "--port", str(redis_port)], - stdout=FNULL) - time.sleep(0.1) - manager_redis_args = ["-r", "{addr}:{port}".format(addr="127.0.0.1", - port=redis_port)] + redis_port = 6379 + with open(os.devnull, "w") as FNULL: + self.redis_process = subprocess.Popen([redis_path, + "--port", str(redis_port)], + stdout=FNULL) + time.sleep(0.1) # Start two PlasmaManagers. self.port1 = random.randint(10000, 50000) @@ -246,12 +247,16 @@ def setUp(self): "-s", store_name1, "-m", manager_name1, "-h", "127.0.0.1", - "-p", str(self.port1)] + manager_redis_args + "-p", str(self.port1), + "-r", "{addr}:{port}".format(addr="127.0.0.1", + port=redis_port)] plasma_manager_command2 = [plasma_manager_executable, "-s", store_name2, "-m", manager_name2, "-h", "127.0.0.1", - "-p", str(self.port2)] + manager_redis_args + "-p", str(self.port2), + "-r", "{addr}:{port}".format(addr="127.0.0.1", + port=redis_port)] if USE_VALGRIND: self.p4 = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + plasma_manager_command1) @@ -283,8 +288,7 @@ def tearDown(self): self.p3.kill() self.p4.kill() self.p5.kill() - if self.redis_process: - self.redis_process.kill() + self.redis_process.kill() # def test_fetch(self): # if self.redis_process is None: @@ -341,6 +345,57 @@ def tearDown(self): # assert_get_object_equal(self, self.client2, self.client1, object_id2, # memory_buffer=memory_buffer2, metadata=metadata2) + def test_wait(self): + # Test timeout. + obj_id0 = random_object_id() + self.client1.wait([obj_id0], timeout=100, num_returns=1) + # If we get here, the test worked. + + # Test wait if local objects available. + obj_id1 = random_object_id() + self.client1.create(obj_id1, 1000) + self.client1.seal(obj_id1) + ready, waiting = self.client1.wait([obj_id1], timeout=100, num_returns=1) + self.assertEqual(len(ready), 1) + self.assertEqual(ready[0], obj_id1) + self.assertEqual(len(waiting), 0) + + # Test wait if only one object available and only one object waited for. + obj_id2 = random_object_id() + self.client1.create(obj_id2, 1000) + # Don't seal. + ready, waiting = self.client1.wait([obj_id2, obj_id1], timeout=100, num_returns=1) + self.assertEqual(len(ready), 1) + self.assertEqual(ready[0], obj_id1) + self.assertEqual(len(waiting), 1) + self.assertEqual(waiting[0], obj_id2) + + # Test wait if object is sealed later. + obj_id3 = random_object_id() + + def finish(): + self.client2.create(obj_id3, 1000) + self.client2.seal(obj_id3) + self.client2.transfer("127.0.0.1", self.port1, obj_id3) + + t = threading.Timer(0.1, finish) + t.start() + ready, waiting = self.client1.wait([obj_id3, obj_id2, obj_id1], timeout=500, num_returns=2) + self.assertEqual(len(ready), 2) + self.assertTrue((ready[0] == obj_id1 and ready[1] == obj_id3) or (ready[0] == obj_id3 and ready[1] == obj_id1)) + self.assertEqual(len(waiting), 1) + self.assertTrue(waiting[0] == obj_id2) + + # Test if the appropriate number of objects is shown if some objects are not ready + ready, wait = self.client1.wait([obj_id3, obj_id2, obj_id1], 100, 3) + self.assertEqual(len(ready), 2) + self.assertTrue((ready[0] == obj_id1 and ready[1] == obj_id3) or (ready[0] == obj_id3 and ready[1] == obj_id1)) + self.assertEqual(len(waiting), 1) + self.assertTrue(waiting[0] == obj_id2) + + # Don't forget to seal obj_id2. + self.client1.seal(obj_id2) + def test_transfer(self): for _ in range(100): # Create an object.