Skip to content

Commit

Permalink
sheet: Offload user transformations to background thread
Browse files Browse the repository at this point in the history
  • Loading branch information
richiejp committed Nov 26, 2024
1 parent b5c908c commit 7cbe704
Show file tree
Hide file tree
Showing 13 changed files with 330 additions and 92 deletions.
7 changes: 2 additions & 5 deletions app/ext_example/my_extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,11 @@ void my_transformation_row_handler(void *ctx) {
}

zsvsheet_status my_transformation_command_handler(zsvsheet_proc_context_t ctx) {
struct transformation_context my_ctx = {
.col_count = 0,
.row_count = 0,
};
struct transformation_context *my_ctx = calloc(1, sizeof(*my_ctx));

// TODO: This probably should happen in another worker thread and while that is happening the status should display
// that some work is in progress. The extension author will maybe want to have control over the status message.
return zsv_cb.ext_sheet_push_transformation(ctx, &my_ctx, my_transformation_row_handler);
return zsv_cb.ext_sheet_push_transformation(ctx, my_ctx, my_transformation_row_handler);
}
#endif

Expand Down
47 changes: 24 additions & 23 deletions app/sheet.c
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,18 @@ static zsvsheet_status zsvsheet_open_file_handler(struct zsvsheet_proc_context *
return zsvsheet_status_ok;
}

// TODO: Use same API as transformation extensions
static zsvsheet_status zsvsheet_filter_handler(struct zsvsheet_proc_context *ctx) {
char prompt_buffer[256] = {0};
struct zsvsheet_builtin_proc_state *state = (struct zsvsheet_builtin_proc_state *)ctx->subcommand_context;
struct zsvsheet_display_info *di = &state->display_info;
struct zsvsheet_ui_buffer *current_ui_buffer = *state->display_info.ui_buffers.current;
int prompt_footer_row = (int)(di->dimensions->rows - di->dimensions->footer_span);
int err;
struct zsvsheet_buffer_info binfo = zsvsheet_buffer_get_info(current_ui_buffer);

if (binfo.transform_started && !binfo.transform_done)
return zsvsheet_status_busy;

if (!current_ui_buffer->filename)
goto out;
Expand Down Expand Up @@ -449,6 +454,7 @@ static zsvsheet_status zsvsheet_help_handler(struct zsvsheet_proc_context *ctx)
.buff_opts = &bopts,
.filename = NULL,
.no_rownum_col_offset = 1,
.transform = 0,
};
struct zsvsheet_ui_buffer *uib;
zsvsheet_screen_buffer_t buffer;
Expand Down Expand Up @@ -690,22 +696,9 @@ int ZSV_MAIN_FUNC(ZSV_COMMAND)(int argc, const char *argv[], struct zsv_opts *op
halfdelay(2); // now ncurses getch() will fire every 2-tenths of a second so we can check for status update

while (true) {
char *status_msg = NULL;
ch = getch();

handler_state.display_info.update_buffer = false;

pthread_mutex_lock(&current_ui_buffer->mutex);
if (current_ui_buffer->status)
status_msg = strdup(current_ui_buffer->status);

if (current_ui_buffer->index_ready &&
current_ui_buffer->dimensions.row_count != current_ui_buffer->index->row_count + 1) {
current_ui_buffer->dimensions.row_count = current_ui_buffer->index->row_count + 1;
handler_state.display_info.update_buffer = true;
}
pthread_mutex_unlock(&current_ui_buffer->mutex);

zsvsheet_priv_set_status(&display_dims, 1, "");

if (ch != ERR) {
Expand All @@ -716,21 +709,29 @@ int ZSV_MAIN_FUNC(ZSV_COMMAND)(int argc, const char *argv[], struct zsv_opts *op
continue;
}

if (handler_state.display_info.update_buffer && current_ui_buffer->filename) {
struct zsvsheet_ui_buffer *ub = current_ui_buffer;
pthread_mutex_lock(&ub->mutex);
if (ub->status)
zsvsheet_priv_set_status(&display_dims, 1, ub->status);
if (ub->transform_progressed) {
handler_state.display_info.update_buffer = true;
ub->transform_progressed = 0;
} else if (ub->index_ready && ub->dimensions.row_count != current_ui_buffer->index->row_count + 1) {
ub->dimensions.row_count = current_ui_buffer->index->row_count + 1;
handler_state.display_info.update_buffer = true;
}
pthread_mutex_unlock(&ub->mutex);

if (handler_state.display_info.update_buffer && ub->filename) {
struct zsvsheet_opts zsvsheet_opts = {0};
if (read_data(&current_ui_buffer, NULL, current_ui_buffer->input_offset.row, current_ui_buffer->input_offset.col,
header_span, &zsvsheet_opts, custom_prop_handler)) {
if (read_data(&ub, NULL, current_ui_buffer->input_offset.row, current_ui_buffer->input_offset.col, header_span,
&zsvsheet_opts, custom_prop_handler)) {
zsvsheet_priv_set_status(&display_dims, 1, "Unexpected error!"); // to do: better error message
continue;
}
}

if (status_msg) {
zsvsheet_priv_set_status(&display_dims, 1, status_msg);
free(status_msg);
}

display_buffer_subtable(current_ui_buffer, header_span, &display_dims);
display_buffer_subtable(ub, header_span, &display_dims);
}

endwin();
Expand All @@ -749,7 +750,7 @@ const char *display_cell(struct zsvsheet_screen_buffer *buff, size_t data_row, s
if (attrs)
attron(attrs);
if (len == 0 || has_multibyte_char(str, len < cell_display_width ? len : cell_display_width) == 0)
mvprintw(row, col * cell_display_width, "%-*.*s", cell_display_width, cell_display_width - 1, str);
mvprintw(row, col * cell_display_width, "%-*.*s", (int)cell_display_width, (int)cell_display_width - 1, str);
else {
size_t used_width;
int err = 0;
Expand Down
54 changes: 39 additions & 15 deletions app/sheet/file.c
Original file line number Diff line number Diff line change
@@ -1,30 +1,43 @@
int zsvsheet_ui_buffer_open_file(const char *filename, const struct zsv_opts *zsv_optsp, const char *row_filter,
struct zsv_prop_handler *custom_prop_handler, const char *opts_used,
struct zsvsheet_ui_buffer **ui_buffer_stack_bottom,
struct zsvsheet_ui_buffer **ui_buffer_stack_top) {
struct zsvsheet_screen_buffer_opts bopts = {0};
struct zsvsheet_ui_buffer_opts uibopts = {0};
uibopts.filename = filename;
if (zsv_optsp)
uibopts.zsv_opts = *zsv_optsp;
uibopts.opts_used = opts_used;
uibopts.buff_opts = &bopts;
int zsvsheet_ui_buffer_open_file_opts(struct zsvsheet_ui_buffer_opts *uibopts,
struct zsv_prop_handler *custom_prop_handler,
struct zsvsheet_ui_buffer **ui_buffer_stack_bottom,
struct zsvsheet_ui_buffer **ui_buffer_stack_top) {
struct zsvsheet_opts zsvsheet_opts = {0};
int err = 0;
struct zsvsheet_screen_buffer_opts bopts = {0};
struct zsvsheet_ui_buffer *tmp_ui_buffer = NULL;
uibopts.row_filter = row_filter;
if ((err = read_data(&tmp_ui_buffer, &uibopts, 0, 0, 0, &zsvsheet_opts, custom_prop_handler)) != 0 ||
!tmp_ui_buffer || !tmp_ui_buffer->buff_used_rows) {

if (!uibopts->buff_opts)
uibopts->buff_opts = &bopts;

int err = 0;
if ((err = read_data(&tmp_ui_buffer, uibopts, 0, 0, 0, &zsvsheet_opts, custom_prop_handler)) != 0 || !tmp_ui_buffer ||
!tmp_ui_buffer->buff_used_rows) {
zsvsheet_ui_buffer_delete(tmp_ui_buffer);
if (err)
return err;
return -1;
}
tmp_ui_buffer->cursor_row = 1; // first row is header
zsvsheet_ui_buffer_push(ui_buffer_stack_bottom, ui_buffer_stack_top, tmp_ui_buffer);

return 0;
}

int zsvsheet_ui_buffer_open_file(const char *filename, const struct zsv_opts *zsv_optsp, const char *row_filter,
struct zsv_prop_handler *custom_prop_handler, const char *opts_used,
struct zsvsheet_ui_buffer **ui_buffer_stack_bottom,
struct zsvsheet_ui_buffer **ui_buffer_stack_top) {
struct zsvsheet_ui_buffer_opts uibopts = {0};

uibopts.filename = filename;
if (zsv_optsp)
uibopts.zsv_opts = *zsv_optsp;
uibopts.opts_used = opts_used;
uibopts.row_filter = row_filter;

return zsvsheet_ui_buffer_open_file_opts(&uibopts, custom_prop_handler, ui_buffer_stack_bottom, ui_buffer_stack_top);
}

/****** API ******/

/**
Expand All @@ -41,3 +54,14 @@ zsvsheet_status zsvsheet_open_file(struct zsvsheet_proc_context *ctx, const char
return zsvsheet_status_error;
return zsvsheet_status_ok;
}

zsvsheet_status zsvsheet_open_file_opts(struct zsvsheet_proc_context *ctx, struct zsvsheet_ui_buffer_opts *opts) {
struct zsvsheet_builtin_proc_state *state = (struct zsvsheet_builtin_proc_state *)ctx->subcommand_context;
struct zsvsheet_display_info *di = &state->display_info;
if (!di || !di->ui_buffers.base || !di->ui_buffers.current)
return zsvsheet_status_error;
int err = zsvsheet_ui_buffer_open_file_opts(opts, NULL, di->ui_buffers.base, di->ui_buffers.current);
if (err)
return zsvsheet_status_error;
return zsvsheet_status_ok;
}
2 changes: 2 additions & 0 deletions app/sheet/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ int zsvsheet_ui_buffer_open_file(const char *filename, const struct zsv_opts *zs
struct zsvsheet_ui_buffer **ui_buffer_stack_bottom,
struct zsvsheet_ui_buffer **ui_buffer_stack_top);

zsvsheet_status zsvsheet_open_file_opts(struct zsvsheet_proc_context *ctx, struct zsvsheet_ui_buffer_opts *opts);

#endif
115 changes: 107 additions & 8 deletions app/sheet/handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,82 @@ struct zsv_opts zsvsheet_buffer_get_zsv_opts(zsvsheet_buffer_t h) {
return opts;
}

/**
* Get information about the type and state of the buffer and its backing file.
*
* This returns a copy of the information. Properties relating to the index and transformations
* are updated by background threads and may be stale upon return. However they only ever
* transition from false to true.
*/
struct zsvsheet_buffer_info zsvsheet_buffer_get_info(zsvsheet_buffer_t h) {
struct zsvsheet_buffer_info info = {0};

if (h) {
struct zsvsheet_ui_buffer *b = h;

pthread_mutex_lock(&b->mutex);
info.index_started = b->index_started;
info.index_ready = b->index_ready;
info.transform_started = b->transform_started;
info.transform_done = b->transform_done;
pthread_mutex_unlock(&b->mutex);
}

return info;
}

struct buffer_transform_ctx {
zsvsheet_transformation trn;
struct zsvsheet_ui_buffer *buff;
};

static void *run_buffer_transformation(void *arg) {
struct buffer_transform_ctx *ctx = arg;
struct zsvsheet_ui_buffer *buff = ctx->buff;
struct zsvsheet_transformation *trn = ctx->trn;
zsv_parser parser = trn->parser;
pthread_mutex_t *mutex = &buff->mutex;
enum zsv_status zst;

size_t c = trn->output_count;
char cancelled = 0;
while (!cancelled && (zst = zsv_parse_more(parser)) == zsv_status_ok) {
pthread_mutex_lock(mutex);
cancelled = buff->worker_cancelled;
if (trn->output_count != c)
buff->transform_progressed = 1;
pthread_mutex_unlock(mutex);
}

if (zst == zsv_status_no_more_input || zst == zsv_status_cancelled)
zst = zsv_finish(parser);

pthread_mutex_lock(mutex);
char *buff_status_old = buff->status;
buff->transform_done = 1;
buff->status = NULL;
pthread_mutex_unlock(mutex);
free(buff_status_old);

free(trn->user_context);
zsvsheet_transformation_delete(trn);
free(ctx);

return NULL;
}

enum zsvsheet_status zsvsheet_push_transformation(zsvsheet_proc_context_t ctx, void *user_context,
void (*row_handler)(void *exec_ctx)) {
zsvsheet_buffer_t buff = zsvsheet_buffer_current(ctx);
const char *filename = zsvsheet_buffer_data_filename(buff);
enum zsvsheet_status stat = zsvsheet_status_error;
struct zsvsheet_buffer_info info = zsvsheet_buffer_get_info(buff);

// TODO: Starting a second transformation before the first ends works, but if the second is faster
// than the first then it can end prematurely and read a partially written row.
// We could override the input stream reader to wait for more data when it sees EOF
if (info.transform_started && !info.transform_done)
return zsvsheet_status_busy;

if (!filename)
filename = zsvsheet_buffer_filename(buff);
Expand All @@ -168,27 +239,55 @@ enum zsvsheet_status zsvsheet_push_transformation(zsvsheet_proc_context_t ctx, v
if (zst != zsv_status_ok)
return stat;

// Transform part of the file to initially populate the UI buffer
// TODO: If the transformation is a reduction that doesn't output for some time this will caus a pause
zsv_parser parser = zsvsheet_transformation_parser(trn);

while ((zst = zsv_parse_more(parser)) == zsv_status_ok)
;
while ((zst = zsv_parse_more(parser)) == zsv_status_ok) {
if (trn->output_count > 0)
break;
}

switch (zst) {
case zsv_status_no_more_input:
case zsv_status_cancelled:
if (zsv_finish(parser) != zsv_status_ok)
goto out;
zsv_writer_flush(trn->writer);
break;
case zsv_status_ok:
break;
default:
goto out;
}

zst = zsv_finish(parser);
if (zst != zsv_status_ok)
struct zsvsheet_ui_buffer_opts uibopts = {0};

uibopts.filename = zsvsheet_transformation_filename(trn);
uibopts.transform = 1;

stat = zsvsheet_open_file_opts(ctx, &uibopts);
if (stat != zsvsheet_status_ok)
goto out;

// TODO: need to free filename
stat = zsvsheet_open_file(ctx, strdup(zsvsheet_transformation_filename(trn)), &zopts);
struct zsvsheet_ui_buffer *nbuff = zsvsheet_buffer_current(ctx);

if (zst != zsv_status_ok) {
nbuff->transform_done = 1;
goto out;
}

asprintf(&nbuff->status, "(working) Press ESC to cancel");

struct buffer_transform_ctx *bctx = malloc(sizeof(*bctx));
bctx->trn = trn;
bctx->buff = nbuff;

zsvsheet_ui_buffer_create_worker(nbuff, run_buffer_transformation, bctx);
return stat;

out:
zsvsheet_transformation_delete(trn);
if (trn)
zsvsheet_transformation_delete(trn);

return stat;
}
24 changes: 19 additions & 5 deletions app/sheet/index.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,15 @@ static enum zsv_status filter_file(struct zsvsheet_index_opts *optsp) {

zsv_parser parser = zsvsheet_transformation_parser(trn);

while ((zst = zsv_parse_more(parser)) == zsv_status_ok)
;
char cancelled = 0;
while (!cancelled && (zst = zsv_parse_more(parser)) == zsv_status_ok) {
pthread_mutex_lock(&optsp->uib->mutex);
if (optsp->uib->worker_cancelled) {
cancelled = 1;
zst = zsv_status_cancelled;
}
pthread_mutex_unlock(&optsp->uib->mutex);
}

switch (zst) {
case zsv_status_no_more_input:
Expand Down Expand Up @@ -147,12 +154,19 @@ enum zsv_index_status build_memory_index(struct zsvsheet_index_opts *optsp) {
if (!ixr.ix)
goto out;

while ((zst = zsv_parse_more(ixr.parser)) == zsv_status_ok)
;
char cancelled = 0;
while (!cancelled && (zst = zsv_parse_more(ixr.parser)) == zsv_status_ok) {
pthread_mutex_lock(&optsp->uib->mutex);
if (optsp->uib->worker_cancelled) {
cancelled = 1;
zst = zsv_status_cancelled;
}
pthread_mutex_unlock(&optsp->uib->mutex);
}

zsv_finish(ixr.parser);

if (zst == zsv_status_no_more_input) {
if (zst == zsv_status_no_more_input || zst == zsv_status_cancelled) {
ret = zsv_index_status_ok;
optsp->uib->index = ixr.ix;
} else
Expand Down
Loading

0 comments on commit 7cbe704

Please sign in to comment.