diff --git a/src/plasma/plasma.h b/src/plasma/plasma.h index e452dab31371..ac9f8493af45 100644 --- a/src/plasma/plasma.h +++ b/src/plasma/plasma.h @@ -46,6 +46,8 @@ enum object_status { OBJECT_NOT_FOUND = 0, OBJECT_FOUND = 1 }; typedef enum { OPEN, SEALED } object_state; +typedef enum { NONE, MEMORY, DISK } object_residence; + enum plasma_message_type { /** Create a new object. */ PLASMA_CREATE = 128, @@ -135,6 +137,8 @@ typedef struct { UT_array *clients; /** The state of the object, e.g., whether it is open or sealed. */ object_state state; + /** Whether the object is in memory or on disk **/ + object_residence res; } object_table_entry; /** The plasma store information that is exposed to the eviction policy. */ diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c index 8aae73a725bf..16b9ab99d287 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.c @@ -10,11 +10,13 @@ * just enough to store and SHA1 hash) to memory mapped files. */ #include +#include #include #include #include #include #include +#include #include #include #include @@ -33,6 +35,8 @@ #include "malloc.h" #include "plasma_store.h" +const char *PERSIST_PATH = "/Users/belugajustin/Downloads/misaka/persisted/"; + void *dlmalloc(size_t); void dlfree(void *); @@ -154,7 +158,15 @@ void create_object(client *client_context, require_space(plasma_state->eviction_state, plasma_state->plasma_store_info, data_size + metadata_size, &num_objects_to_evict, &objects_to_evict); + /* Persist objects first */ + if (num_objects_to_evict > 0) { + for (int i = 0; i < num_objects_to_evict; ++i) { + persist_object(client_context, objects_to_evict[i]); + } + } + /* Actually evict */ remove_objects(plasma_state, num_objects_to_evict, objects_to_evict); + /* Allocate space for the new object */ uint8_t *pointer = dlmalloc(data_size + metadata_size); int fd; @@ -173,6 +185,7 @@ void create_object(client *client_context, entry->map_size = map_size; entry->offset = offset; entry->state = OPEN; + 
entry->res = MEMORY; utarray_new(entry->clients, &client_icd); HASH_ADD(handle, plasma_state->plasma_store_info->objects, object_id, sizeof(object_id), entry); @@ -194,13 +207,13 @@ void create_object(client *client_context, /* Get an object from the hash table. */ int get_object(client *client_context, int conn, - object_id object_id, + object_id obj_id, plasma_object *result) { plasma_store_state *plasma_state = client_context->plasma_state; object_table_entry *entry; - HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id, + HASH_FIND(handle, plasma_state->plasma_store_info->objects, &obj_id, sizeof(object_id), entry); - if (entry && entry->state == SEALED) { + if (entry && entry->state == SEALED && entry->res == MEMORY) { result->handle.store_fd = entry->fd; result->handle.mmap_size = entry->map_size; result->data_offset = entry->offset; @@ -211,16 +224,42 @@ int get_object(client *client_context, * where entry == NULL, this will be called from seal_object. */ add_client_to_object_clients(entry, client_context); return OBJECT_FOUND; + } else if (entry && entry->res == DISK) { + /* The object exists, we just have to fetch it from disk again */ + CHECKM(entry->state == SEALED, "Only sealed objects can persist"); + + int64_t num_objects_to_evict; + object_id *objects_to_evict; + require_space(plasma_state->eviction_state, plasma_state->plasma_store_info, + entry->info.data_size + entry->info.metadata_size, &num_objects_to_evict, + &objects_to_evict); + /* Persist objects first */ + if (num_objects_to_evict > 0) { + for (int i = 0; i < num_objects_to_evict; ++i) { + persist_object(client_context, objects_to_evict[i]); + } + } + /* Actually evict */ + remove_objects(plasma_state, num_objects_to_evict, objects_to_evict); + + //have to get space for the object again + entry->pointer = dlmalloc(entry->info.data_size + entry->info.metadata_size); + get_persisted_object(client_context, obj_id); + entry->res = MEMORY; + /* Notify the LRU cache that this 
object is back in business */ + object_created(plasma_state->eviction_state, plasma_state->plasma_store_info, + obj_id); + return get_object(client_context, conn, obj_id, result); } else { object_notify_entry *notify_entry; LOG_DEBUG("object not in hash table of sealed objects"); - HASH_FIND(handle, plasma_state->objects_notify, &object_id, - sizeof(object_id), notify_entry); + HASH_FIND(handle, plasma_state->objects_notify, &obj_id, + sizeof(obj_id), notify_entry); if (!notify_entry) { notify_entry = malloc(sizeof(object_notify_entry)); memset(notify_entry, 0, sizeof(object_notify_entry)); utarray_new(notify_entry->waiting_clients, &client_icd); - memcpy(¬ify_entry->object_id, &object_id, sizeof(object_id)); + memcpy(¬ify_entry->object_id, &obj_id, sizeof(object_id)); HASH_ADD(handle, plasma_state->objects_notify, object_id, sizeof(object_id), notify_entry); } @@ -324,8 +363,7 @@ void seal_object(client *client_context, object_id object_id) { } } -/* Delete an object that has been created in the hash table. This should only - * be called on objects that are returned by the eviction policy to evict. */ +/* Delete an object that has been created in the hash table. */ void delete_object(plasma_store_state *plasma_state, object_id object_id) { LOG_DEBUG("deleting object"); object_table_entry *entry; @@ -346,12 +384,36 @@ void delete_object(plasma_store_state *plasma_state, object_id object_id) { free(entry); } +/* Evict an object that has been created in the hash table. This should only + * be called on objects that have been returned by the eviction policy to evict. 
*/
+void evict_object(plasma_store_state *plasma_state, object_id object_id) {
+  LOG_DEBUG("evicting object");
+  object_table_entry *entry;
+  HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id,
+            sizeof(object_id), entry);
+  CHECKM(entry != NULL, "To evict an object it must be in the object table.");
+  CHECKM(entry->state == SEALED,
+         "To evict an object it must have been sealed.");
+  CHECKM(utarray_len(entry->clients) == 0,
+         "To evict an object, there must be no clients currently using it.");
+  CHECKM(entry->res == MEMORY,
+         "To evict an object, it must be in memory.");
+  /* The table entry and its client list are kept on purpose: the entry is
+   * what records that the object still exists (now on disk), so get_object
+   * can load it back. Only the in-memory payload is released here. */
+  dlfree(entry->pointer);
+  /* Do not leave a dangling pointer behind in the table entry. */
+  entry->pointer = NULL;
+  entry->res = DISK;
+}
+
+/* Should only be called for eviction, not deletion */
 void remove_objects(plasma_store_state *plasma_state,
                     int64_t num_objects_to_evict,
                     object_id *objects_to_evict) {
   if (num_objects_to_evict > 0) {
     for (int i = 0; i < num_objects_to_evict; ++i) {
-      delete_object(plasma_state, objects_to_evict[i]);
+      evict_object(plasma_state, objects_to_evict[i]);
     }
     /* Free the array of objects to evict. This array was originally allocated
      * by the eviction policy. */
@@ -359,6 +421,94 @@ void remove_objects(plasma_store_state *plasma_state,
   }
 }
 
+/* Build the on-disk path for an object: PERSIST_PATH followed by the object
+ * id rendered as exactly two lowercase hex digits per byte. The width must be
+ * fixed: an unpadded conversion emits a single digit plus a NUL for any byte
+ * below 0x10, and with a stride of two that NUL is never overwritten, so the
+ * name is cut short and distinct ids end up sharing one file.
+ * Returns a malloc'd string that the caller must free.
+ * NOTE(review): PERSIST_PATH is a developer-local absolute directory; it
+ * should become a store option before this is merged. */
+char *object_id_to_persist_path(object_id object_id) {
+  size_t prefix_len = strlen(PERSIST_PATH);
+  /* Two hex digits per id byte, plus the terminating NUL. */
+  size_t path_size = prefix_len + 2 * UNIQUE_ID_SIZE + 1;
+  char *file_path = malloc(path_size);
+  CHECKM(file_path != NULL, "Out of memory while building persistence path");
+  memcpy(file_path, PERSIST_PATH, prefix_len);
+  for (size_t j = 0; j < UNIQUE_ID_SIZE; ++j) {
+    /* Each call writes two digits plus a NUL; the NUL is overwritten by the
+     * first digit of the next iteration. */
+    snprintf(&file_path[prefix_len + 2 * j], 3, "%02x",
+             (unsigned int) (object_id.id[j] & 0xFF));
+  }
+  file_path[path_size - 1] = '\0';
+  return file_path;
+}
+
+/* Write a sealed object resident in memory to its own file on disk. The file
+ * receives the data_size + metadata_size bytes that start at entry->pointer.
+ * Every failure is reported through CHECKM rather than ignored, because the
+ * callers in this file evict the in-memory copy right after this returns and
+ * a short or failed write would otherwise lose the object silently. */
+void persist_object(client *client_context, object_id object_id) {
+  LOG_DEBUG("persisting object");
+  plasma_store_state *plasma_state = client_context->plasma_state;
+  object_table_entry *entry;
+  HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id,
+            sizeof(object_id), entry);
+  CHECKM(entry != NULL, "To persist an object it must exist.");
+  CHECKM(entry->state == SEALED, "To persist an object it must be sealed.");
+  CHECKM(entry->res == MEMORY, "To persist an object it must be in memory.");
+  char *file_path = object_id_to_persist_path(object_id);
+  /* Rewriting the file of an object that was persisted before is allowed;
+   * O_TRUNC guarantees that no stale tail survives the rewrite. */
+  int fd = open(file_path, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
+  free(file_path);
+  /* 0 is a valid descriptor; only a negative value signals failure. */
+  CHECKM(fd >= 0, "Something went wrong while creating persistence file");
+  /* write may accept fewer bytes than requested, so loop until done. */
+  const uint8_t *cursor = entry->pointer;
+  int64_t remaining = entry->info.data_size + entry->info.metadata_size;
+  while (remaining > 0) {
+    ssize_t written = write(fd, cursor, (size_t) remaining);
+    CHECKM(written > 0, "Failed to write object to persistence file");
+    cursor += written;
+    remaining -= written;
+  }
+  /* close can report write errors that were deferred by the kernel. */
+  CHECKM(close(fd) == 0, "Failed to close persistence file");
+}
+
+/* Read a persisted object from disk back into memory. The caller must have
+ * pointed entry->pointer at a buffer of at least data_size + metadata_size
+ * bytes before calling; this function only fills that buffer. */
+void get_persisted_object(client *client_context, object_id object_id) {
+  LOG_DEBUG("reading persisted object");
+  plasma_store_state *plasma_state = client_context->plasma_state;
+  object_table_entry *entry;
+  HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id,
+            sizeof(object_id), entry);
+  CHECKM(entry != NULL, "There must be an object entry to read it into.");
+  CHECKM(entry->pointer != NULL, "There must be a buffer to read into.");
+  char *file_path = object_id_to_persist_path(object_id);
+  int fd = open(file_path, O_RDONLY);
+  free(file_path);
+  /* 0 is a valid descriptor; only a negative value signals failure. */
+  CHECKM(fd >= 0, "Cannot read an object that was never persisted.");
+  /* read may return fewer bytes than requested, and 0 means the file ended
+   * early, so loop until the whole object is back. */
+  uint8_t *cursor = entry->pointer;
+  int64_t remaining = entry->info.data_size + entry->info.metadata_size;
+  while (remaining > 0) {
+    ssize_t got = read(fd, cursor, (size_t) remaining);
+    CHECKM(got > 0, "Persisted object is truncated or unreadable");
+    cursor += got;
+    remaining -= got;
+  }
+  close(fd);
+}
+
 /* Send more notifications to a subscriber. 
*/
 void send_notifications(event_loop *loop,
                         int client_sock,
diff --git a/src/plasma/plasma_store.h b/src/plasma/plasma_store.h
index 235c6c171137..4b02727df818 100644
--- a/src/plasma/plasma_store.h
+++ b/src/plasma/plasma_store.h
@@ -89,4 +89,23 @@ void remove_objects(plasma_store_state *plasma_state,
                     int64_t num_objects_to_evict,
                     object_id *objects_to_evict);
 
+/**
+ * Write a sealed, in-memory object to its own file on disk.
+ *
+ * @param client_context The client whose plasma_state holds the object table.
+ * @param object_id The ID of the object to write out.
+ * @return Void.
+ */
+void persist_object(client *client_context, object_id object_id);
+
+/**
+ * Read a previously persisted object from disk into the buffer that the
+ * pointer field of its object table entry refers to.
+ *
+ * @param client_context The client whose plasma_state holds the object table.
+ * @param object_id The ID of the object to read back.
+ * @return Void.
+ */
+void get_persisted_object(client *client_context, object_id object_id);
+
 #endif /* PLASMA_STORE_H */
diff --git a/src/plasma/test/test.py b/src/plasma/test/test.py
index 9c0d01c12e82..94acb741a80f 100644
--- a/src/plasma/test/test.py
+++ b/src/plasma/test/test.py
@@ -71,6 +71,7 @@ def setUp(self):
       time.sleep(2.0)
     else:
       self.p = subprocess.Popen(command)
+      time.sleep(2.0)
     # Connect to Plasma.
     self.plasma_client = plasma.PlasmaClient(store_name, None, 64)
     # For the eviction test
@@ -464,6 +465,48 @@ def test_stresstest(self):
 
     print("it took", b, "seconds to put and transfer the objects")
 
+  def _create_sealed_objects(self, num_objects, size):
+    """Create and seal num_objects objects of size bytes through client1.
+
+    The first byte of each object is set to the first byte of its ID so a
+    later get can check that the right payload came back. Returns the IDs in
+    creation order.
+    """
+    object_ids = []
+    for _ in range(num_objects):
+      object_id = random_object_id()
+      object_ids.append(object_id)
+      buf = self.client1.create(object_id, size)
+      buf[0] = object_id[0]
+      self.client1.seal(object_id)
+    return object_ids
+
+  def _check_first_bytes(self, object_ids):
+    """Get each object in the given order and check its first byte."""
+    for object_id in object_ids:
+      buf = self.client1.get(object_id)
+      self.assertEqual(buf[0], object_id[0])
+
+  def test_persist_test(self):
+    """Create 10001 objects, then read them back newest first."""
+    a = time.time()
+    object_ids = self._create_sealed_objects(10001, 100000)
+    print("Now accessing everything")
+    self._check_first_bytes(reversed(object_ids))
+    b = time.time() - a
+
+    print("it took", b, "seconds to put and transfer the objects")
+
+  def test_LRU_thrash_test(self):
+    """Create 10001 objects, then read them back oldest first."""
+    a = time.time()
+    object_ids = self._create_sealed_objects(10001, 100000)
+    print("Now accessing everything")
+    self._check_first_bytes(object_ids)
+    b = time.time() - a
+
+    print("it took", b, "seconds to put and transfer the objects")
+
 if __name__ == "__main__":
   if len(sys.argv) > 1:
     # pop the argument so we don't mess with unittest's own argument parser