Skip to content

Commit

Permalink
sheet: Allow index to be used before it is complete
Browse files Browse the repository at this point in the history
Allows the index to be used in a thread safe way so that entries can
be read in the main thread while more are being added in the worker thread.
  • Loading branch information
richiejp committed Dec 5, 2024
1 parent d21fcb6 commit aa43435
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 54 deletions.
4 changes: 0 additions & 4 deletions app/sheet.c
Original file line number Diff line number Diff line change
Expand Up @@ -699,10 +699,6 @@ int ZSV_MAIN_FUNC(ZSV_COMMAND)(int argc, const char *argv[], struct zsv_opts *op
pthread_mutex_lock(&ub->mutex);
if (ub->status)
zsvsheet_priv_set_status(&display_dims, 1, ub->status);
if (ub->write_progressed) {
handler_state.display_info.update_buffer = true;
ub->write_progressed = 0;
}
if (ub->index_ready && ub->dimensions.row_count != ub->index->row_count + 1) {
ub->dimensions.row_count = ub->index->row_count + 1;
handler_state.display_info.update_buffer = true;
Expand Down
13 changes: 9 additions & 4 deletions app/sheet/index.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,28 @@ enum zsv_index_status build_memory_index(struct zsvsheet_index_opts *optsp) {
if (!ixr.ix)
goto out;

optsp->uib->index = ixr.ix;

char cancelled = 0;
while (!cancelled && (zst = zsv_parse_more(ixr.parser)) == zsv_status_ok) {
pthread_mutex_lock(&optsp->uib->mutex);
if (optsp->uib->worker_cancelled) {
cancelled = 1;
zst = zsv_status_cancelled;
}
zsv_index_commit_rows(ixr.ix);
optsp->uib->index_ready = 1;
pthread_mutex_unlock(&optsp->uib->mutex);
}

zsv_finish(ixr.parser);
pthread_mutex_lock(&optsp->uib->mutex);
zsv_index_commit_rows(ixr.ix);
optsp->uib->index_ready = 1;
pthread_mutex_unlock(&optsp->uib->mutex);

if (zst == zsv_status_no_more_input || zst == zsv_status_cancelled) {
if (zst == zsv_status_no_more_input || zst == zsv_status_cancelled)
ret = zsv_index_status_ok;
optsp->uib->index = ixr.ix;
} else
zsv_index_delete(ixr.ix);

out:
if (ixr.parser)
Expand Down
3 changes: 1 addition & 2 deletions app/sheet/read-data.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ static int read_data(struct zsvsheet_ui_buffer **uibufferp, // a new zsvsheet_
return 0;

pthread_mutex_lock(&uibuff->mutex);
char need_index = !uibuff->index_started && (!uibuff->write_in_progress || uibuff->write_done);
char need_index = !uibuff->index_started && !uibuff->write_in_progress;
pthread_mutex_unlock(&uibuff->mutex);

if (need_index) {
Expand Down Expand Up @@ -253,7 +253,6 @@ static void *get_data_index(void *gdi) {
}

pthread_mutex_lock(mutexp);
uib->index_ready = 1;
uib->status = old_ui_status;
uib->ixopts = NULL;
pthread_mutex_unlock(mutexp);
Expand Down
41 changes: 25 additions & 16 deletions app/sheet/transformation.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct zsvsheet_transformation {
FILE *output_stream;
unsigned char *output_buffer;
int output_fileno;
size_t output_count;
char writer_wrote;
struct zsvsheet_transformation_opts opts;
void *user_context;

Expand All @@ -28,7 +28,7 @@ static size_t transformation_write(const void *restrict ptr, size_t size, size_t
struct zsvsheet_transformation *trn = stream;

const size_t count = fwrite(ptr, size, nitems, trn->output_stream);
trn->output_count += count;
trn->writer_wrote = count > 0;

return count > 0 ? count : 0;
}
Expand Down Expand Up @@ -134,14 +134,17 @@ static void *zsvsheet_run_buffer_transformation(void *arg) {
zsv_parser parser = trn->parser;
pthread_mutex_t *mutex = &uib->mutex;
enum zsv_status zst;
char *default_status = trn->default_status;

size_t c = trn->output_count;
char cancelled = 0;
while (!cancelled && (zst = zsv_parse_more(parser)) == zsv_status_ok) {
pthread_mutex_lock(mutex);
cancelled = uib->worker_cancelled;
if (trn->output_count != c)
uib->write_progressed = 1;
if (trn->writer_wrote) {
trn->writer_wrote = 0;
zsv_index_commit_rows(uib->index);
uib->index_ready = 1;
}
pthread_mutex_unlock(mutex);
}

Expand All @@ -151,20 +154,22 @@ static void *zsvsheet_run_buffer_transformation(void *arg) {
if (trn->on_done)
trn->on_done(trn);

if (trn->user_context)
free(trn->user_context);

zsvsheet_transformation_delete(trn);

pthread_mutex_lock(mutex);
char *buff_status_old = uib->status;
uib->write_progressed = 1;
uib->write_done = 1;
zsv_index_commit_rows(uib->index);
uib->index_ready = 1;
if (buff_status_old == trn->default_status)
if (buff_status_old == default_status)
uib->status = NULL;
pthread_mutex_unlock(mutex);

if (buff_status_old == trn->default_status)
if (buff_status_old == default_status)
free(buff_status_old);
if (trn->user_context)
free(trn->user_context);
zsvsheet_transformation_delete(trn);

return NULL;
}
Expand Down Expand Up @@ -214,9 +219,10 @@ enum zsvsheet_status zsvsheet_push_transformation(zsvsheet_proc_context_t ctx,
// TODO: If the transformation is a reduction that doesn't output for some time this will caus a pause
zsv_parser parser = zsvsheet_transformation_parser(trn);
while ((zst = zsv_parse_more(parser)) == zsv_status_ok) {
if (trn->output_count > 0)
if (trn->writer_wrote)
break;
}
trn->writer_wrote = 0;

switch (zst) {
case zsv_status_no_more_input:
Expand All @@ -242,14 +248,18 @@ enum zsvsheet_status zsvsheet_push_transformation(zsvsheet_proc_context_t ctx,

struct zsvsheet_ui_buffer *nbuff = zsvsheet_buffer_current(ctx);
trn->ui_buffer = nbuff;
nbuff->write_progressed = 1;
zsv_index_commit_rows(index);
nbuff->index_started = 1;
nbuff->index = index;

if (zst != zsv_status_ok) {
nbuff->write_done = 1;
nbuff->index_ready = 1;
goto out;
if (trn->on_done)
opts.on_done(trn);
zsvsheet_transformation_delete(trn);
zsv_index_commit_rows(index);
return stat;
}

asprintf(&trn->default_status, "(working) Press ESC to cancel ");
Expand All @@ -259,9 +269,8 @@ enum zsvsheet_status zsvsheet_push_transformation(zsvsheet_proc_context_t ctx,
return stat;

error:
free(index);
zsv_index_delete(index);

out:
if (trn && trn->on_done)
opts.on_done(trn);
if (trn)
Expand Down
3 changes: 1 addition & 2 deletions app/sheet/ui_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ struct zsvsheet_ui_buffer {
unsigned char has_row_num : 1;
unsigned char mutex_inited : 1;
unsigned char write_in_progress : 1;
unsigned char write_progressed : 1;
unsigned char write_done : 1;
unsigned char worker_active : 1;
unsigned char worker_cancelled : 1;
unsigned char _ : 6;
unsigned char _ : 7;
};

void zsvsheet_ui_buffer_create_worker(struct zsvsheet_ui_buffer *ub, void *(*start_func)(void *), void *arg) {
Expand Down
2 changes: 1 addition & 1 deletion app/test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ test-sheet-3: ${BUILD_DIR}/bin/zsv_sheet${EXE}
sleep 2 && \
tmux capture-pane -t $@ -p ${REDIRECT1} ${TMP_DIR}/$@.out && \
tmux send-keys -t $@ "q" && \
${CMP} ${TMP_DIR}/$@.out expected/$@.out && ${TEST_PASS} || ${TEST_FAIL})
${CMP} ${TMP_DIR}/$@.out expected/$@.out && ${TEST_PASS} || (echo 'Incorrect output:' && cat ${TMP_DIR}/$@.out && ${TEST_FAIL}))

test-sheet-4: ${BUILD_DIR}/bin/zsv_sheet${EXE}
@${TEST_INIT}
Expand Down
76 changes: 52 additions & 24 deletions app/utils/index.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,50 +7,62 @@
#include <zsv/utils/index.h>

struct zsv_index *zsv_index_new(void) {
struct zsv_index *ix = malloc(sizeof(*ix));
struct zsv_index *ix = calloc(1, sizeof(*ix));

if (!ix)
return ix;

memset(ix, 0, sizeof(*ix));

const size_t init_cap = 256;
ix->array = malloc(sizeof(*ix->array) + init_cap * sizeof(ix->array->u64s[0]));
ix->array->capacity = init_cap;
ix->array->len = 0;
const size_t init_cap = 512;
ix->first = calloc(1, sizeof(*ix->first) + init_cap * sizeof(ix->first->u64s[0]));
ix->first->capacity = init_cap;

return ix;
}

void zsv_index_delete(struct zsv_index *ix) {
if (ix) {
free(ix->array);
struct zsv_index_array *arr = ix->first;

while (arr) {
struct zsv_index_array *a = arr;
arr = arr->next;
free(a);
}

free(ix);
}
}

enum zsv_index_status zsv_index_add_row(struct zsv_index *ix, uint64_t line_end) {
struct zsv_index_array *arr = ix->array;
struct zsv_index_array *arr = ix->first;
size_t len = arr->len, cap = arr->capacity;

if (!ix->header_line_end) {
ix->header_line_end = line_end;
return zsv_index_status_ok;
}

ix->row_count++;
ix->row_count_local++;

if ((ix->row_count & (ZSV_INDEX_ROW_N - 1)) != 0)
if ((ix->row_count_local & (ZSV_INDEX_ROW_N - 1)) != 0)
return zsv_index_status_ok;

if (len >= cap) {
cap *= 2;
arr = realloc(arr, sizeof(*arr) + cap * sizeof(arr->u64s[0]));
if (!arr)
return zsv_index_status_memory;

arr->capacity = cap;
ix->array = arr;
while (len >= cap) {
assert(len == cap);

if (!arr->next) {
len = 0;
cap *= 2;
arr->next = calloc(1, sizeof(*arr) + cap * sizeof(arr->u64s[0]));
arr = arr->next;
if (!arr)
return zsv_index_status_memory;
arr->capacity = cap;
} else {
arr = arr->next;
len = arr->len;
cap = arr->capacity;
}
}

arr->u64s[len] = line_end;
Expand All @@ -59,22 +71,38 @@ enum zsv_index_status zsv_index_add_row(struct zsv_index *ix, uint64_t line_end)
return zsv_index_status_ok;
}

void zsv_index_commit_rows(struct zsv_index *ix) {
ix->row_count = ix->row_count_local;
}

enum zsv_index_status zsv_index_row_end_offset(const struct zsv_index *ix, uint64_t row, uint64_t *offset_out,
uint64_t *remaining_rows_out) {
assert(ix->row_count <= ix->row_count_local);

if (row > ix->row_count)
return zsv_index_status_error;

if (row < ZSV_INDEX_ROW_N) {
*offset_out = ix->header_line_end;
*remaining_rows_out = row;
} else {
const size_t i = (row >> ZSV_INDEX_ROW_SHIFT) - 1;

assert(i < ix->array->len);
*offset_out = (long)ix->array->u64s[i];
*remaining_rows_out = row & (ZSV_INDEX_ROW_N - 1);
return zsv_index_status_ok;
}

const size_t i = (row >> ZSV_INDEX_ROW_SHIFT) - 1;
struct zsv_index_array *arr = ix->first;
size_t lens = 0;

while (i >= lens + arr->len) {
assert(arr->next);

lens += arr->len;
arr = arr->next;
}

*offset_out = (long)arr->u64s[i - lens];
*remaining_rows_out = row & (ZSV_INDEX_ROW_N - 1);

return zsv_index_status_ok;
}

Expand Down
7 changes: 6 additions & 1 deletion include/zsv/utils/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,25 @@ enum zsv_index_status {
struct zsv_index_array {
size_t capacity;
size_t len;
struct zsv_index_array *next;
uint64_t u64s[];
};

struct zsv_index {
uint64_t header_line_end;
// Reading and writing should be protected with a lock
uint64_t row_count;
// Should only be updated by the thread building the index
uint64_t row_count_local;

// array containing the offsets of every ZSV_INDEX_ROW_N line end
struct zsv_index_array *array;
struct zsv_index_array *first;
};

struct zsv_index *zsv_index_new(void);
void zsv_index_delete(struct zsv_index *ix);
enum zsv_index_status zsv_index_add_row(struct zsv_index *ix, uint64_t line_end);
void zsv_index_commit_rows(struct zsv_index *ix);
enum zsv_index_status zsv_index_row_end_offset(const struct zsv_index *ix, uint64_t row, uint64_t *offset_out,
uint64_t *remaining_rows_out);
enum zsv_index_status zsv_index_seek_row(const struct zsv_index *ix, struct zsv_opts *opts, uint64_t row);
Expand Down

0 comments on commit aa43435

Please sign in to comment.