diff --git a/app/sheet.c b/app/sheet.c index 34796540..996b9d25 100644 --- a/app/sheet.c +++ b/app/sheet.c @@ -699,10 +699,6 @@ int ZSV_MAIN_FUNC(ZSV_COMMAND)(int argc, const char *argv[], struct zsv_opts *op pthread_mutex_lock(&ub->mutex); if (ub->status) zsvsheet_priv_set_status(&display_dims, 1, ub->status); - if (ub->write_progressed) { - handler_state.display_info.update_buffer = true; - ub->write_progressed = 0; - } if (ub->index_ready && ub->dimensions.row_count != ub->index->row_count + 1) { ub->dimensions.row_count = ub->index->row_count + 1; handler_state.display_info.update_buffer = true; diff --git a/app/sheet/index.c b/app/sheet/index.c index 56f5bc7a..98a2eb96 100644 --- a/app/sheet/index.c +++ b/app/sheet/index.c @@ -43,6 +43,8 @@ enum zsv_index_status build_memory_index(struct zsvsheet_index_opts *optsp) { if (!ixr.ix) goto out; + optsp->uib->index = ixr.ix; + char cancelled = 0; while (!cancelled && (zst = zsv_parse_more(ixr.parser)) == zsv_status_ok) { pthread_mutex_lock(&optsp->uib->mutex); @@ -50,16 +52,19 @@ enum zsv_index_status build_memory_index(struct zsvsheet_index_opts *optsp) { cancelled = 1; zst = zsv_status_cancelled; } + zsv_index_commit_rows(ixr.ix); + optsp->uib->index_ready = 1; pthread_mutex_unlock(&optsp->uib->mutex); } zsv_finish(ixr.parser); + pthread_mutex_lock(&optsp->uib->mutex); + zsv_index_commit_rows(ixr.ix); + optsp->uib->index_ready = 1; + pthread_mutex_unlock(&optsp->uib->mutex); - if (zst == zsv_status_no_more_input || zst == zsv_status_cancelled) { + if (zst == zsv_status_no_more_input || zst == zsv_status_cancelled) ret = zsv_index_status_ok; - optsp->uib->index = ixr.ix; - } else - zsv_index_delete(ixr.ix); out: if (ixr.parser) diff --git a/app/sheet/read-data.c b/app/sheet/read-data.c index 90504e12..cb51f496 100644 --- a/app/sheet/read-data.c +++ b/app/sheet/read-data.c @@ -204,7 +204,7 @@ static int read_data(struct zsvsheet_ui_buffer **uibufferp, // a new zsvsheet_ return 0; pthread_mutex_lock(&uibuff->mutex); - char need_index = !uibuff->index_started && (!uibuff->write_in_progress || uibuff->write_done); + char need_index = !uibuff->index_started && !uibuff->write_in_progress; pthread_mutex_unlock(&uibuff->mutex); if (need_index) { @@ -253,7 +253,6 @@ static void *get_data_index(void *gdi) { } pthread_mutex_lock(mutexp); - uib->index_ready = 1; uib->status = old_ui_status; uib->ixopts = NULL; pthread_mutex_unlock(mutexp); diff --git a/app/sheet/transformation.c b/app/sheet/transformation.c index 1ebcbe0c..dd50dd83 100644 --- a/app/sheet/transformation.c +++ b/app/sheet/transformation.c @@ -15,7 +15,7 @@ struct zsvsheet_transformation { FILE *output_stream; unsigned char *output_buffer; int output_fileno; - size_t output_count; + char writer_wrote; struct zsvsheet_transformation_opts opts; void *user_context; @@ -28,7 +28,7 @@ static size_t transformation_write(const void *restrict ptr, size_t size, size_t struct zsvsheet_transformation *trn = stream; const size_t count = fwrite(ptr, size, nitems, trn->output_stream); - trn->output_count += count; + trn->writer_wrote = count > 0; return count > 0 ? count : 0; } @@ -134,14 +134,17 @@ static void *zsvsheet_run_buffer_transformation(void *arg) { zsv_parser parser = trn->parser; pthread_mutex_t *mutex = &uib->mutex; enum zsv_status zst; + char *default_status = trn->default_status; - size_t c = trn->output_count; char cancelled = 0; while (!cancelled && (zst = zsv_parse_more(parser)) == zsv_status_ok) { pthread_mutex_lock(mutex); cancelled = uib->worker_cancelled; - if (trn->output_count != c) - uib->write_progressed = 1; + if (trn->writer_wrote) { + trn->writer_wrote = 0; + zsv_index_commit_rows(uib->index); + uib->index_ready = 1; + } pthread_mutex_unlock(mutex); } @@ -151,20 +154,22 @@ static void *zsvsheet_run_buffer_transformation(void *arg) { if (trn->on_done) trn->on_done(trn); + if (trn->user_context) + free(trn->user_context); + + zsvsheet_transformation_delete(trn); + pthread_mutex_lock(mutex); char *buff_status_old = uib->status; - uib->write_progressed = 1; uib->write_done = 1; + zsv_index_commit_rows(uib->index); uib->index_ready = 1; - if (buff_status_old == trn->default_status) + if (buff_status_old == default_status) uib->status = NULL; pthread_mutex_unlock(mutex); - if (buff_status_old == trn->default_status) + if (buff_status_old == default_status) free(buff_status_old); - if (trn->user_context) - free(trn->user_context); - zsvsheet_transformation_delete(trn); return NULL; } @@ -214,9 +219,10 @@ enum zsvsheet_status zsvsheet_push_transformation(zsvsheet_proc_context_t ctx, // TODO: If the transformation is a reduction that doesn't output for some time this will caus a pause zsv_parser parser = zsvsheet_transformation_parser(trn); while ((zst = zsv_parse_more(parser)) == zsv_status_ok) { - if (trn->output_count > 0) + if (trn->writer_wrote) break; } + trn->writer_wrote = 0; switch (zst) { case zsv_status_no_more_input: @@ -242,14 +248,18 @@ enum zsvsheet_status zsvsheet_push_transformation(zsvsheet_proc_context_t ctx, struct zsvsheet_ui_buffer *nbuff = zsvsheet_buffer_current(ctx); trn->ui_buffer = nbuff; - nbuff->write_progressed = 1; + zsv_index_commit_rows(index); nbuff->index_started = 1; nbuff->index = index; if (zst != zsv_status_ok) { nbuff->write_done = 1; nbuff->index_ready = 1; - goto out; + if (trn->on_done) + opts.on_done(trn); + zsvsheet_transformation_delete(trn); + zsv_index_commit_rows(index); + return stat; } asprintf(&trn->default_status, "(working) Press ESC to cancel "); @@ -259,9 +269,8 @@ enum zsvsheet_status zsvsheet_push_transformation(zsvsheet_proc_context_t ctx, return stat; error: - free(index); + zsv_index_delete(index); -out: if (trn && trn->on_done) opts.on_done(trn); if (trn) diff --git a/app/sheet/ui_buffer.c b/app/sheet/ui_buffer.c index 7cc2faff..ebf30d1f 100644 --- a/app/sheet/ui_buffer.c +++ b/app/sheet/ui_buffer.c @@ -44,11 +44,10 @@ struct zsvsheet_ui_buffer { unsigned char has_row_num : 1; unsigned char mutex_inited : 1; unsigned char write_in_progress : 1; - unsigned char write_progressed : 1; unsigned char write_done : 1; unsigned char worker_active : 1; unsigned char worker_cancelled : 1; - unsigned char _ : 6; + unsigned char _ : 7; }; void zsvsheet_ui_buffer_create_worker(struct zsvsheet_ui_buffer *ub, void *(*start_func)(void *), void *arg) { diff --git a/app/test/Makefile b/app/test/Makefile index 5fc7a505..fd039592 100644 --- a/app/test/Makefile +++ b/app/test/Makefile @@ -623,7 +623,7 @@ test-sheet-3: ${BUILD_DIR}/bin/zsv_sheet${EXE} sleep 2 && \ tmux capture-pane -t $@ -p ${REDIRECT1} ${TMP_DIR}/$@.out && \ tmux send-keys -t $@ "q" && \ - ${CMP} ${TMP_DIR}/$@.out expected/$@.out && ${TEST_PASS} || ${TEST_FAIL}) + ${CMP} ${TMP_DIR}/$@.out expected/$@.out && ${TEST_PASS} || (echo 'Incorrect output:' && cat ${TMP_DIR}/$@.out && ${TEST_FAIL})) test-sheet-4: ${BUILD_DIR}/bin/zsv_sheet${EXE} @${TEST_INIT} diff --git a/app/utils/index.c b/app/utils/index.c index 96720969..ef656b6d 100644 --- a/app/utils/index.c +++ b/app/utils/index.c @@ -7,30 +7,34 @@ #include struct zsv_index *zsv_index_new(void) { - struct zsv_index *ix = malloc(sizeof(*ix)); + struct zsv_index *ix = calloc(1, sizeof(*ix)); if (!ix) return ix; - memset(ix, 0, sizeof(*ix)); - - const size_t init_cap = 256; - ix->array = malloc(sizeof(*ix->array) + init_cap * sizeof(ix->array->u64s[0])); - ix->array->capacity = init_cap; - ix->array->len = 0; + const size_t init_cap = 512; + ix->first = calloc(1, sizeof(*ix->first) + init_cap * sizeof(ix->first->u64s[0])); + ix->first->capacity = init_cap; return ix; } void zsv_index_delete(struct zsv_index *ix) { if (ix) { - free(ix->array); + struct zsv_index_array *arr = ix->first; + + while (arr) { + struct zsv_index_array *a = arr; + arr = arr->next; + free(a); + } + free(ix); } } enum zsv_index_status zsv_index_add_row(struct zsv_index *ix, uint64_t line_end) { - struct zsv_index_array *arr = ix->array; + struct zsv_index_array *arr = ix->first; size_t len = arr->len, cap = arr->capacity; if (!ix->header_line_end) { @@ -38,19 +42,27 @@ enum zsv_index_status zsv_index_add_row(struct zsv_index *ix, uint64_t line_end) return zsv_index_status_ok; } - ix->row_count++; + ix->row_count_local++; - if ((ix->row_count & (ZSV_INDEX_ROW_N - 1)) != 0) + if ((ix->row_count_local & (ZSV_INDEX_ROW_N - 1)) != 0) return zsv_index_status_ok; - if (len >= cap) { - cap *= 2; - arr = realloc(arr, sizeof(*arr) + cap * sizeof(arr->u64s[0])); - if (!arr) - return zsv_index_status_memory; - - arr->capacity = cap; - ix->array = arr; + while (len >= cap) { + assert(len == cap); + + if (!arr->next) { + len = 0; + cap *= 2; + arr->next = calloc(1, sizeof(*arr) + cap * sizeof(arr->u64s[0])); + arr = arr->next; + if (!arr) + return zsv_index_status_memory; + arr->capacity = cap; + } else { + arr = arr->next; + len = arr->len; + cap = arr->capacity; + } } arr->u64s[len] = line_end; @@ -59,22 +71,38 @@ enum zsv_index_status zsv_index_add_row(struct zsv_index *ix, uint64_t line_end) return zsv_index_status_ok; } +void zsv_index_commit_rows(struct zsv_index *ix) { + ix->row_count = ix->row_count_local; +} + enum zsv_index_status zsv_index_row_end_offset(const struct zsv_index *ix, uint64_t row, uint64_t *offset_out, uint64_t *remaining_rows_out) { + assert(ix->row_count <= ix->row_count_local); + if (row > ix->row_count) return zsv_index_status_error; if (row < ZSV_INDEX_ROW_N) { *offset_out = ix->header_line_end; *remaining_rows_out = row; - } else { - const size_t i = (row >> ZSV_INDEX_ROW_SHIFT) - 1; - assert(i < ix->array->len); - *offset_out = (long)ix->array->u64s[i]; - *remaining_rows_out = row & (ZSV_INDEX_ROW_N - 1); + return zsv_index_status_ok; + } + + const size_t i = (row >> ZSV_INDEX_ROW_SHIFT) - 1; + struct zsv_index_array *arr = ix->first; + size_t lens = 0; + + while (i >= lens + arr->len) { + assert(arr->next); + + lens += arr->len; + arr = arr->next; } + *offset_out = (long)arr->u64s[i - lens]; + *remaining_rows_out = row & (ZSV_INDEX_ROW_N - 1); + return zsv_index_status_ok; } diff --git a/include/zsv/utils/index.h b/include/zsv/utils/index.h index f5805c94..9735fc40 100644 --- a/include/zsv/utils/index.h +++ b/include/zsv/utils/index.h @@ -23,20 +23,25 @@ enum zsv_index_status { struct zsv_index_array { size_t capacity; size_t len; + struct zsv_index_array *next; uint64_t u64s[]; }; struct zsv_index { uint64_t header_line_end; + // Reading and writing should be protected with a lock uint64_t row_count; + // Should only be updated by the thread building the index + uint64_t row_count_local; // array containing the offsets of every ZSV_INDEX_ROW_N line end - struct zsv_index_array *array; + struct zsv_index_array *first; }; struct zsv_index *zsv_index_new(void); void zsv_index_delete(struct zsv_index *ix); enum zsv_index_status zsv_index_add_row(struct zsv_index *ix, uint64_t line_end); +void zsv_index_commit_rows(struct zsv_index *ix); enum zsv_index_status zsv_index_row_end_offset(const struct zsv_index *ix, uint64_t row, uint64_t *offset_out, uint64_t *remaining_rows_out); enum zsv_index_status zsv_index_seek_row(const struct zsv_index *ix, struct zsv_opts *opts, uint64_t row);