Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/plasma/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ enum object_status { OBJECT_NOT_FOUND = 0, OBJECT_FOUND = 1 };

typedef enum { OPEN, SEALED } object_state;

typedef enum { NONE, MEMORY, DISK } object_residence;

enum plasma_message_type {
/** Create a new object. */
PLASMA_CREATE = 128,
Expand Down Expand Up @@ -135,6 +137,8 @@ typedef struct {
UT_array *clients;
/** The state of the object, e.g., whether it is open or sealed. */
object_state state;
/** Whether the object is in memory or on disk **/
object_residence res;
} object_table_entry;

/** The plasma store information that is exposed to the eviction policy. */
Expand Down
133 changes: 124 additions & 9 deletions src/plasma/plasma_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
* just enough to store and SHA1 hash) to memory mapped files. */

#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <getopt.h>
Expand All @@ -33,6 +35,8 @@
#include "malloc.h"
#include "plasma_store.h"

const char *PERSIST_PATH = "/Users/belugajustin/Downloads/misaka/persisted/";

void *dlmalloc(size_t);
void dlfree(void *);

Expand Down Expand Up @@ -154,7 +158,15 @@ void create_object(client *client_context,
require_space(plasma_state->eviction_state, plasma_state->plasma_store_info,
data_size + metadata_size, &num_objects_to_evict,
&objects_to_evict);
/* Persist objects first */
if (num_objects_to_evict > 0) {
for (int i = 0; i < num_objects_to_evict; ++i) {
persist_object(client_context, objects_to_evict[i]);
}
}
/* Actually evict */
remove_objects(plasma_state, num_objects_to_evict, objects_to_evict);

/* Allocate space for the new object */
uint8_t *pointer = dlmalloc(data_size + metadata_size);
int fd;
Expand All @@ -173,6 +185,7 @@ void create_object(client *client_context,
entry->map_size = map_size;
entry->offset = offset;
entry->state = OPEN;
entry->res = MEMORY;
utarray_new(entry->clients, &client_icd);
HASH_ADD(handle, plasma_state->plasma_store_info->objects, object_id,
sizeof(object_id), entry);
Expand All @@ -194,13 +207,13 @@ void create_object(client *client_context,
/* Get an object from the hash table. */
int get_object(client *client_context,
int conn,
object_id object_id,
object_id obj_id,
plasma_object *result) {
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry;
HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id,
HASH_FIND(handle, plasma_state->plasma_store_info->objects, &obj_id,
sizeof(object_id), entry);
if (entry && entry->state == SEALED) {
if (entry && entry->state == SEALED && entry->res == MEMORY) {
result->handle.store_fd = entry->fd;
result->handle.mmap_size = entry->map_size;
result->data_offset = entry->offset;
Expand All @@ -211,16 +224,42 @@ int get_object(client *client_context,
* where entry == NULL, this will be called from seal_object. */
add_client_to_object_clients(entry, client_context);
return OBJECT_FOUND;
} else if (entry && entry->res == DISK) {
/* The object exists, we just have to fetch it from disk again */
CHECKM(entry->state == SEALED, "Only sealed objects can persist");

int64_t num_objects_to_evict;
object_id *objects_to_evict;
require_space(plasma_state->eviction_state, plasma_state->plasma_store_info,
entry->info.data_size + entry->info.metadata_size, &num_objects_to_evict,
&objects_to_evict);
/* Persist objects first */
if (num_objects_to_evict > 0) {
for (int i = 0; i < num_objects_to_evict; ++i) {
persist_object(client_context, objects_to_evict[i]);
}
}
/* Actually evict */
remove_objects(plasma_state, num_objects_to_evict, objects_to_evict);

//have to get space for the object again
entry->pointer = dlmalloc(entry->info.data_size + entry->info.metadata_size);
get_persisted_object(client_context, obj_id);
entry->res = MEMORY;
/* Notify the LRU cache that this object is back in business */
object_created(plasma_state->eviction_state, plasma_state->plasma_store_info,
obj_id);
return get_object(client_context, conn, obj_id, result);
} else {
object_notify_entry *notify_entry;
LOG_DEBUG("object not in hash table of sealed objects");
HASH_FIND(handle, plasma_state->objects_notify, &object_id,
sizeof(object_id), notify_entry);
HASH_FIND(handle, plasma_state->objects_notify, &obj_id,
sizeof(obj_id), notify_entry);
if (!notify_entry) {
notify_entry = malloc(sizeof(object_notify_entry));
memset(notify_entry, 0, sizeof(object_notify_entry));
utarray_new(notify_entry->waiting_clients, &client_icd);
memcpy(&notify_entry->object_id, &object_id, sizeof(object_id));
memcpy(&notify_entry->object_id, &obj_id, sizeof(object_id));
HASH_ADD(handle, plasma_state->objects_notify, object_id,
sizeof(object_id), notify_entry);
}
Expand Down Expand Up @@ -324,8 +363,7 @@ void seal_object(client *client_context, object_id object_id) {
}
}

/* Delete an object that has been created in the hash table. This should only
* be called on objects that are returned by the eviction policy to evict. */
/* Delete an object that has been created in the hash table. */
void delete_object(plasma_store_state *plasma_state, object_id object_id) {
LOG_DEBUG("deleting object");
object_table_entry *entry;
Expand All @@ -346,19 +384,96 @@ void delete_object(plasma_store_state *plasma_state, object_id object_id) {
free(entry);
}

/* Evict an object that has been created in the hash table. This should only
* be called on objects that have been returned by the eviction policy to evict. */
void evict_object(plasma_store_state *plasma_state, object_id object_id) {
LOG_DEBUG("evicting object");
object_table_entry *entry;
HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id,
sizeof(object_id), entry);
CHECKM(entry != NULL, "To evict an object it must be in the object table.");
CHECKM(entry->state == SEALED,
"To evict an object it must have been sealed.");
CHECKM(utarray_len(entry->clients) == 0,
"To evict an object, there must be no clients currently using it.");
CHECKM(entry->res == MEMORY,
"To evict an object, it must be in memory.")
uint8_t *pointer = entry->pointer;
//HASH_DELETE(handle, plasma_state->plasma_store_info->objects, entry);
/* Free data */
dlfree(pointer);
entry->res = DISK;
//utarray_free(entry->clients);
//free(entry);
}

/* Should only be called for eviction, not deletion */
void remove_objects(plasma_store_state *plasma_state,
int64_t num_objects_to_evict,
object_id *objects_to_evict) {
if (num_objects_to_evict > 0) {
for (int i = 0; i < num_objects_to_evict; ++i) {
delete_object(plasma_state, objects_to_evict[i]);
evict_object(plasma_state, objects_to_evict[i]);
}
/* Free the array of objects to evict. This array was originally allocated
* by the eviction policy. */
free(objects_to_evict);
}
}

/* Convert object_id to a unique file path on-disk */
char *object_id_to_persist_path(object_id object_id) {
// 2x for up to 2-digit-hex
size_t path_size = strlen(PERSIST_PATH) + 2*UNIQUE_ID_SIZE + 1;
char *file_path = malloc(path_size);
strcpy(file_path, PERSIST_PATH);
for (int i = strlen(PERSIST_PATH); i < path_size - 1; i+=2) {
int j = (i-strlen(PERSIST_PATH)) / 2;
sprintf(&file_path[i], "%hhx", (unsigned int)(object_id.id[j] & 0xFF));
}
file_path[path_size - 1] = '\0';
return file_path;
}

/* Write a sealed object resident in memory to its own file on disk. */
void persist_object(client *client_context, object_id object_id) {
LOG_DEBUG("persisting object");
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry;
HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id,
sizeof(object_id), entry);
CHECKM(entry != NULL , "To persist an object it must exist.");
CHECKM(entry->state == SEALED, "To persist an object it must be sealed.");
/* Check if the file containing the object already exists. */
for(int i = 0; i < 20; i++){}
char *file_path = object_id_to_persist_path(object_id);
struct stat buffer;
//CHECKM(stat(file_path, &buffer) != 0, "Cannot persist an object twice");
/* Create the file if it does not exist and write object. */
int fd = open(file_path, O_RDWR|O_CREAT, S_IRUSR | S_IWUSR);
CHECKM(fd > 0, "Something went wrong while creating persistence file");
write(fd, entry->pointer, entry->info.data_size + entry->info.metadata_size);
close(fd);
free(file_path);
}

/* Read a persisted object from disk back into memory. */
void get_persisted_object(client *client_context, object_id object_id) {
LOG_DEBUG("reading persisted object");
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry;
HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id,
sizeof(object_id), entry);
CHECKM(entry != NULL, "There must be an object entry to read it into.");
/* Read object into allocated space. */
char *file_path = object_id_to_persist_path(object_id);
int fd = open(file_path, O_RDONLY);
free(file_path);
CHECKM(fd > 0, "Cannot read an object that was never persisted.");
read(fd, entry->pointer, entry->info.data_size + entry->info.metadata_size);
close(fd);
}

/* Send more notifications to a subscriber. */
void send_notifications(event_loop *loop,
int client_sock,
Expand Down
6 changes: 6 additions & 0 deletions src/plasma/plasma_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,10 @@ void remove_objects(plasma_store_state *plasma_state,
int64_t num_objects_to_evict,
object_id *objects_to_evict);

/* Comment when finalized */
void persist_object(client *client_context,
object_id object_id);
void get_persisted_object(client *client_context,
object_id object_id);

#endif /* PLASMA_STORE_H */
40 changes: 40 additions & 0 deletions src/plasma/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def setUp(self):
time.sleep(2.0)
else:
self.p = subprocess.Popen(command)
time.sleep(2.0)
# Connect to Plasma.
self.plasma_client = plasma.PlasmaClient(store_name, None, 64)
# For the eviction test
Expand Down Expand Up @@ -464,6 +465,45 @@ def test_stresstest(self):

print("it took", b, "seconds to put and transfer the objects")

def test_persist_test(self):
a = time.time()
object_ids = []
for i in range(10001):
object_id = random_object_id()
object_ids.append(object_id)
buf = self.client1.create(object_id, 100000)
temp = 0
buf[0] = object_id[0]

self.client1.seal(object_id)
print("Now accessing everything")
for i in range(len(object_ids)):
buf = self.client1.get(object_ids[len(object_ids) - 1 - i])
self.assertEqual(buf[0], object_ids[len(object_ids) - 1 - i][0])
b = time.time() - a

print("it took", b, "seconds to put and transfer the objects")

def test_LRU_thrash_test(self):
a = time.time()
object_ids = []
for i in range(10001):
object_id = random_object_id()
object_ids.append(object_id)
buf = self.client1.create(object_id, 100000)
temp = 0
buf[0] = object_id[0]

self.client1.seal(object_id)
print("Now accessing everything")

for i in range(0, len(object_ids)):
buf = self.client1.get(object_ids[i])
self.assertEqual(buf[0], object_ids[i][0])
b = time.time() - a

print("it took", b, "seconds to put and transfer the objects")

if __name__ == "__main__":
if len(sys.argv) > 1:
# pop the argument so we don't mess with unittest's own argument parser
Expand Down