Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

when building images, do chunk compression in worker thread pool #196

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ foreach ident : [
['copy_file_range', '''#define _GNU_SOURCE
#include <sys/syscall.h>
#include <unistd.h>'''],
['reallocarray', '''#include <malloc.h>'''],
]
have = cc.has_function(ident[0], args : '-D_GNU_SOURCE', prefix : ident[1])
conf.set10('HAVE_' + ident[0].to_upper(), have)
Expand Down
39 changes: 39 additions & 0 deletions src/affinity-count.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#include "affinity-count.h"

#include <errno.h>
#include <sched.h>
#include <sys/types.h>

/* Counts the CPUs set in the affinity mask obtained via sched_getaffinity(0, ...).
 *
 * The CPU set is allocated dynamically, starting with room for 16 CPUs. Whenever the kernel call
 * fails with EINVAL (taken here to mean the buffer was too small) the capacity is doubled and the
 * call is retried.
 *
 * Returns the (positive) number of CPUs on success, -ENOMEM if the set cannot be allocated or the
 * capacity would overflow, -EINVAL if the mask turns out to be empty, or the negated errno of
 * sched_getaffinity() for any other failure. */
int cpus_in_affinity_mask(void) {
        size_t capacity = 16;

        for (;;) {
                size_t bytes = CPU_ALLOC_SIZE(capacity);
                cpu_set_t *set = CPU_ALLOC(capacity);
                int saved_errno, count;

                if (set == NULL)
                        return -ENOMEM;

                if (sched_getaffinity(0, bytes, set) < 0) {
                        /* Remember errno before releasing the set, then decide whether to retry. */
                        saved_errno = errno;
                        CPU_FREE(set);

                        if (saved_errno != EINVAL)
                                return -saved_errno;

                        /* Refuse to double if that would wrap around. */
                        if (capacity * 2 < capacity)
                                return -ENOMEM;

                        capacity *= 2;
                        continue;
                }

                count = CPU_COUNT_S(bytes, set);
                CPU_FREE(set);

                return count > 0 ? count : -EINVAL;
        }
}
8 changes: 8 additions & 0 deletions src/affinity-count.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/* SPDX-License-Identifier: LGPL-2.1+ */

#ifndef fooaffinitycounthfoo
#define fooaffinitycounthfoo

/* Returns the number of CPUs set in the affinity mask reported by sched_getaffinity() for PID 0,
 * or a negative errno-style value (e.g. -ENOMEM, -EINVAL) on failure. */
int cpus_in_affinity_mask(void);

#endif
242 changes: 233 additions & 9 deletions src/castore.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
#include <dirent.h>
#include <fcntl.h>
#include <lzma.h>
#include <pthread.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>

#include "affinity-count.h"
#include "castore.h"
#include "def.h"
#include "dirent-util.h"
Expand All @@ -19,6 +22,8 @@
/* #undef EBADMSG */
/* #define EBADMSG __LINE__ */

#define WORKER_THREADS_MAX 64U

struct CaStore {
char *root;
bool is_cache:1;
Expand All @@ -34,6 +39,10 @@ struct CaStore {

uint64_t n_requests;
uint64_t n_request_bytes;

pthread_t worker_threads[WORKER_THREADS_MAX];
size_t n_worker_threads, n_worker_threads_max;
int worker_thread_socket[2];
};

struct CaStoreIterator {
Expand All @@ -47,28 +56,36 @@ struct CaStoreIterator {
/* Allocates a regular (non-cache) chunk store object with default settings.
 *
 * The object is allocated exactly once and then initialised as a whole through a compound literal,
 * so every member not named below starts out zeroed. The worker socket pair is marked as not open
 * (-1), and n_worker_threads_max is set to (size_t) -1, which determine_worker_threads_max() treats
 * as "not determined yet".
 *
 * Returns the new store, or NULL if the allocation failed. */
CaStore* ca_store_new(void) {
        CaStore *store;

        store = new(CaStore, 1);
        if (!store)
                return NULL;

        *store = (CaStore) {
                .digest_type = _CA_DIGEST_TYPE_INVALID,
                .compression = CA_CHUNK_COMPRESSED,
                .compression_type = CA_COMPRESSION_DEFAULT,
                .worker_thread_socket = { -1, -1 },
                .n_worker_threads_max = (size_t) -1,
        };

        return store;
}

/* Allocates a chunk store object that is flagged as a cache (is_cache = true) and stores chunks
 * as-is (CA_CHUNK_AS_IS).
 *
 * The object is allocated exactly once and then initialised as a whole through a compound literal,
 * so every member not named below starts out zeroed. The worker socket pair is marked as not open
 * (-1), and n_worker_threads_max is set to (size_t) -1, which determine_worker_threads_max() treats
 * as "not determined yet".
 *
 * Returns the new store, or NULL if the allocation failed. */
CaStore *ca_store_new_cache(void) {
        CaStore *s;

        s = new(CaStore, 1);
        if (!s)
                return NULL;

        *s = (CaStore) {
                .is_cache = true,
                .compression = CA_CHUNK_AS_IS,
                .compression_type = CA_COMPRESSION_DEFAULT,

                .worker_thread_socket = { -1, -1 },
                .n_worker_threads_max = (size_t) -1,
        };

        return s;
}
Expand All @@ -77,6 +94,8 @@ CaStore* ca_store_unref(CaStore *store) {
if (!store)
return NULL;

(void) ca_store_finalize(store);

if (store->is_cache && store->root)
(void) rm_rf(store->root, REMOVE_ROOT|REMOVE_PHYSICAL);

Expand Down Expand Up @@ -240,6 +259,203 @@ int ca_store_has(CaStore *store, const CaChunkID *chunk_id) {
return ca_chunk_file_test(AT_FDCWD, store->root, chunk_id);
}

/* One unit of work passed from submit_to_worker_thread() to a worker thread. The structure is sent
 * by value as a single message over the store's worker socket pair. */
struct queue_entry {
/* ID of the chunk to store; handed to ca_chunk_file_save() by the worker. */
CaChunkID chunk_id;
/* Compression state of the buffer in 'data' as supplied by the submitter. */
CaChunkCompression effective_compression;
/* Heap copy of the chunk payload made by the submitter; the worker free()s it after saving. */
void *data;
/* Length of 'data' in bytes. */
size_t size;
};

/* Thread entry point of a store worker.
 *
 * Receives struct queue_entry messages from the read end of the store's socket pair
 * (worker_thread_socket[0]), saves each chunk via ca_chunk_file_save() and frees the payload buffer
 * that came with the entry. The loop ends on a zero-sized read (EOF or empty datagram).
 *
 * The thread's exit status is 0 (NULL) on success and otherwise a *positive* errno value packed
 * with INT_TO_PTR(): either the errno of a failed recv(), or the last ca_chunk_file_save() failure
 * other than -EEXIST. ca_store_finalize() negates this value, so the sign must be the same on all
 * exit paths. */
static void* worker_thread(void *p) {
        CaStore *store = p;
        int ret = 0, r;

        assert(store);
        /* Only the read end is used here. The write end may legitimately be closed already if
         * ca_store_finalize() ran before this thread got scheduled. */
        assert(store->worker_thread_socket[0] >= 0);

        (void) pthread_setname_np(pthread_self(), "worker-thread");

        for (;;) {
                struct queue_entry e;
                ssize_t n;

                n = recv(store->worker_thread_socket[0], &e, sizeof(e), 0);
                if (n < 0) {
                        /* Capture errno right away, the logging call below may clobber it. */
                        int saved_errno = errno;

                        if (saved_errno == EINTR)
                                continue;

                        log_debug_errno(saved_errno, "Failed to read from thread pool socket: %m");
                        return INT_TO_PTR(saved_errno);
                }
                if (n == 0) /* Either EOF or zero-sized datagram (Linux doesn't really allow us to
                             * distinguish that), we take both as an indication to exit the worker thread. */
                        break;

                assert(n == sizeof(e));

                r = ca_chunk_file_save(
                                AT_FDCWD, store->root,
                                &e.chunk_id,
                                e.effective_compression, store->compression,
                                store->compression_type,
                                e.data, e.size);
                /* The submitter handed ownership of the copy to us. */
                free(e.data);

                if (r < 0) {
                        log_debug_errno(r, "Failed to store chunk in store: %m");

                        /* A chunk that already exists is not an error worth reporting. */
                        if (r != -EEXIST)
                                ret = r;
                }
        }

        /* ret is 0 or a negative errno; report it as a positive errno like the recv() path does. */
        return INT_TO_PTR(-ret);
}

/* Decides how many worker threads the store may start and caches the result in
 * store->n_worker_threads_max. Does nothing if the value was determined before, i.e. is no longer
 * (size_t) -1.
 *
 * If $CASYNC_WORKER_THREADS is set and parses as an unsigned integer, it is used, clamped to
 * WORKER_THREADS_MAX. If it is unset or unparsable, the number of CPUs in the affinity mask is
 * used instead, also clamped to WORKER_THREADS_MAX.
 *
 * Returns 0 on success, or the negative error from cpus_in_affinity_mask(). */
static int determine_worker_threads_max(CaStore *store) {
        const char *e;
        int r;

        assert(store);

        if (store->n_worker_threads_max != (size_t) -1)
                return 0;

        e = getenv("CASYNC_WORKER_THREADS");
        if (e) {
                unsigned u;

                r = safe_atou(e, &u);
                if (r < 0)
                        log_debug_errno(r, "Failed to parse $CASYNC_WORKER_THREADS, ignoring: %s", e);
                else {
                        if (u > WORKER_THREADS_MAX) {
                                log_debug("$CASYNC_WORKER_THREADS out of range, clamping to %zu: %s", (size_t) WORKER_THREADS_MAX, e);
                                u = WORKER_THREADS_MAX;
                        }

                        /* An explicit setting always wins, including after clamping. Return here
                         * so that the CPU count below does not override it. */
                        store->n_worker_threads_max = u;
                        return 0;
                }
        }

        r = cpus_in_affinity_mask();
        if (r < 0)
                return log_debug_errno(r, "Failed to determine CPUs in affinity mask: %m");

        store->n_worker_threads_max = MIN((size_t) r, WORKER_THREADS_MAX);
        return 0;
}

/* Starts one additional worker thread for the store unless the pool has already reached
 * n_worker_threads_max. The socket pair used to feed the workers is created on demand the first
 * time it is found to be unset.
 *
 * Returns 0 if a thread was started or the pool is already full, and a negative errno-style value
 * if the limit could not be determined, socketpair() failed or pthread_create() failed. */
static int start_worker_thread(CaStore *store) {
        pthread_t *slot;
        int err;

        assert(store);

        err = determine_worker_threads_max(store);
        if (err < 0)
                return err;

        /* Pool is at its limit already, nothing to start. */
        if (store->n_worker_threads >= store->n_worker_threads_max)
                return 0;

        /* Lazily set up the channel the workers receive their jobs on. */
        if (store->worker_thread_socket[0] < 0 &&
            socketpair(AF_UNIX, SOCK_SEQPACKET|SOCK_CLOEXEC, 0, store->worker_thread_socket) < 0)
                return -errno;

        slot = &store->worker_threads[store->n_worker_threads];

        /* pthread_create() reports failures as a positive error number. */
        err = pthread_create(slot, NULL, worker_thread, store);
        if (err != 0)
                return -err;

        store->n_worker_threads++;

        log_debug("Started store worker thread %zu.", store->n_worker_threads);
        return 0;
}

/* Tries to hand a chunk over to the worker thread pool for saving.
 *
 * store:                 the store to save into
 * chunkid:               ID of the chunk
 * effective_compression: compression state of the buffer passed in
 * p, l:                  chunk payload and its length in bytes; the payload is copied, the caller
 *                        keeps ownership of p
 *
 * Returns 0 if the chunk was queued. Any negative return means the chunk was NOT queued:
 *   -ENOANO    no (de)compression is needed, the caller should store the chunk itself
 *   -EEXIST    the chunk is already in the store
 *   -ENETDOWN  no worker threads are available
 *   -ENOMEM    the payload could not be copied
 *   other      error from ca_store_has() or send() */
static int submit_to_worker_thread(
                CaStore *store,
                const CaChunkID *chunkid,
                CaChunkCompression effective_compression,
                const void *p,
                uint64_t l) {

        struct queue_entry e;
        void *copy = NULL;
        ssize_t n;
        int r;

        assert(store);

        /* If there's no need to compress/decompress, then let's do things client side, since the operation
         * is likely IO bound, not CPU bound */
        if (store->compression == CA_CHUNK_AS_IS ||
            store->compression == effective_compression)
                return -ENOANO;

        /* Before we submit the chunk for compression, let's see if it exists already. If so, let's return
         * -EEXIST right away, so that the caller can count reused chunks. Note that this is a bit racy
         * currently, as submitted but not yet processed chunks are not considered. */
        r = ca_store_has(store, chunkid);
        if (r < 0)
                return r;
        if (r > 0)
                return -EEXIST;

        /* Let's start a new worker thread each time we have a new job to process, until we reached all
         * worker threads we need */
        (void) start_worker_thread(store);

        /* If there are no worker threads, do things client side */
        if (store->n_worker_threads <= 0 ||
            store->worker_thread_socket[1] < 0)
                return -ENETDOWN;

        /* The worker runs asynchronously, hence it needs its own copy of the payload. */
        copy = memdup(p, l);
        if (!copy)
                return -ENOMEM;

        e = (struct queue_entry) {
                .chunk_id = *chunkid,
                .effective_compression = effective_compression,
                .data = copy,
                .size = l,
        };

        n = send(store->worker_thread_socket[1], &e, sizeof(e), 0);
        if (n < 0) {
                /* Save errno first: free() is not guaranteed to leave it untouched. */
                r = -errno;
                free(copy);
                return r;
        }

        assert(n == sizeof(e));
        return 0;
}

/* Shuts down the store's worker thread pool and waits for all queued work to finish.
 *
 * Closes the write end of the worker socket pair so that the workers see EOF, joins every started
 * worker thread, resets the thread count and finally closes the read end.
 *
 * Returns 0 if nothing went wrong, otherwise a negative errno-style value: either a failed
 * pthread_join() or the exit status reported by a worker. If several failures occur, the one seen
 * last is returned. */
int ca_store_finalize(CaStore *store) {
        int ret = 0, r;
        size_t i;

        assert(store);

        /* Trigger EOF in all worker threads */
        store->worker_thread_socket[1] = safe_close(store->worker_thread_socket[1]);

        for (i = 0; i < store->n_worker_threads; i++) {
                void *p = NULL;

                r = pthread_join(store->worker_threads[i], &p);
                if (r != 0) {
                        /* No exit status is available if the join itself failed. */
                        ret = -r;
                        continue;
                }

                if (p != NULL) {
                        int e = PTR_TO_INT(p);

                        /* Accept the worker's errno in either sign and always report it negative,
                         * so that callers checking for "< 0" see the failure. */
                        ret = e > 0 ? -e : e;
                }
        }

        store->n_worker_threads = 0;
        store->worker_thread_socket[0] = safe_close(store->worker_thread_socket[0]);

        /* Propagate errors we ran into while processing store requests. This is useful for callers to
         * determine whether the worker threads ran into any problems. */
        return ret;
}

int ca_store_put(
CaStore *store,
const CaChunkID *chunk_id,
Expand Down Expand Up @@ -273,6 +489,14 @@ int ca_store_put(
store->mkdir_done = true;
}

r = submit_to_worker_thread(
store,
chunk_id,
effective_compression,
data, size);
if (r >= 0)
return 0;

return ca_chunk_file_save(
AT_FDCWD, store->root,
chunk_id,
Expand Down
2 changes: 2 additions & 0 deletions src/castore.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ int ca_store_get(CaStore *store, const CaChunkID *chunk_id, CaChunkCompression d
int ca_store_has(CaStore *store, const CaChunkID *chunk_id);
int ca_store_put(CaStore *store, const CaChunkID *chunk_id, CaChunkCompression effective_compression, const void *data, uint64_t size);

/* Closes the store's worker thread socket, joins all worker threads that were started for it and
 * returns 0 if none of them reported a problem, or an error code derived from pthread_join() or
 * from a worker thread's exit status otherwise. Also invoked by ca_store_unref(). */
int ca_store_finalize(CaStore *store);

int ca_store_get_requests(CaStore *s, uint64_t *ret);
int ca_store_get_request_bytes(CaStore *s, uint64_t *ret);

Expand Down
Loading