Skip to content

Commit

Permalink
Create index inside writer and transformation
Browse files Browse the repository at this point in the history
This creates the index while writing the file saving an extra pass
  • Loading branch information
richiejp committed Dec 3, 2024
1 parent 486377c commit 2f2858d
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 22 deletions.
2 changes: 1 addition & 1 deletion app/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ ${ZSV_UTIL_A}: ${BUILD_DIR}/objs/utils/util.a
@mkdir -p `dirname $@`
cp -p $< $@

UTIL_A_OBJ:=writer file dirs-no-jq os ${UTIL_A_OBJ_WIN}
UTIL_A_OBJ:=index writer file dirs-no-jq os ${UTIL_A_OBJ_WIN}
UTIL_A_OBJ:=$(addprefix ${BUILD_DIR}/objs/utils/,$(addsuffix .o,${UTIL_A_OBJ}))

${BUILD_DIR}/objs/utils/util.a: ${UTIL_A_OBJ}
Expand Down
7 changes: 2 additions & 5 deletions app/ext_example/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ TEST_PASS=printf "${COLOR_BLUE}$@: ${COLOR_GREEN}Passed${COLOR_NONE}\n"
TEST_FAIL=(printf "${COLOR_BLUE}$@: ${COLOR_RED}Failed!${COLOR_NONE}\n" && exit 1)
TEST_INIT=printf "${COLOR_PINK}$@: ${COLOR_NONE}\n"

UTILS1=
CFLAGS_SHARED=-shared
ifneq ($(findstring emcc,$(CC)),) # emcc
CFLAGS_SHARED=-s SIDE_MODULE=1 -s LINKABLE=1
Expand All @@ -86,9 +85,6 @@ else
INSTALLED_EXTENSION=
endif

UTILS1+=writer
UTILS=$(addprefix ${BUILD_DIR}/objs/utils/,$(addsuffix .o,${UTILS1}))

CFLAGS+= -I${THIS_LIB_BASE}/include -I${PREFIX}/include

all: ${TARGET} ${TARGET_SHEET}
Expand Down Expand Up @@ -218,8 +214,9 @@ YAJL_SRC_DIR=${THIS_MAKEFILE_DIR}/../external/yajl
YAJL_INCLUDE=-I${YAJL_SRC_DIR}/build/yajl-2.1.1/include
YAJL_HELPER_INCLUDE=-I${THIS_MAKEFILE_DIR}/../external/yajl_helper
${TARGET_SHEET}: LIBS="../external/sqlite3/sqlite3.c" -lzsv -lzsvutil -L${PREFIX}/lib -ljq -lutf8proc
${TARGET}: LIBS=-lzsvutil -lzsv -L${PREFIX}/lib
${TARGET} ${TARGET_SHEET}: ${BUILD_DIR}/bin/zsvext%.${SO} : %_extension.c ${UTILS}
@mkdir -p `dirname "$@"`
${CC} ${CFLAGS} ${CFLAGS_SHARED} $< ${UTILS} -o $@ ${LIBS} ${YAJL_INCLUDE} ${YAJL_HELPER_INCLUDE}
${CC} ${CFLAGS} ${CFLAGS_SHARED} $< -o $@ ${LIBS} ${YAJL_INCLUDE} ${YAJL_HELPER_INCLUDE}

.PHONY: all test test-% clean install
3 changes: 2 additions & 1 deletion app/sheet/index.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ static void build_memory_index_row_handler(void *ctx) {
struct zsvsheet_indexer *ixr = ctx;
struct zsv_index *ix = ixr->ix;
zsv_parser parser = ixr->parser;
uint64_t line_end = zsv_cum_scanned_length(parser);

if (zsv_index_add_row(ix, parser) != zsv_index_status_ok)
if (zsv_index_add_row(ix, line_end) != zsv_index_status_ok)
zsv_abort(parser);
}

Expand Down
24 changes: 19 additions & 5 deletions app/sheet/transformation.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "transformation.h"
#include "pthread.h"
#include "zsv/utils/file.h"
#include "zsv/utils/index.h"
#include "zsv/utils/prop.h"

struct zsvsheet_transformation {
Expand Down Expand Up @@ -60,6 +61,7 @@ enum zsv_status zsvsheet_transformation_new(struct zsvsheet_transformation_opts

struct zsv_csv_writer_options writer_opts = {
.with_bom = 0,
.index = opts.index,
.write = transformation_write,
.stream = trn,
.table_init = NULL,
Expand Down Expand Up @@ -153,6 +155,7 @@ static void *zsvsheet_run_buffer_transformation(void *arg) {
char *buff_status_old = uib->status;
uib->write_progressed = 1;
uib->write_done = 1;
uib->index_ready = 1;
if (buff_status_old == trn->default_status)
uib->status = NULL;
pthread_mutex_unlock(mutex);
Expand All @@ -172,19 +175,24 @@ enum zsvsheet_status zsvsheet_push_transformation(zsvsheet_proc_context_t ctx,
const char *filename = zsvsheet_buffer_data_filename(buff);
enum zsvsheet_status stat = zsvsheet_status_error;
struct zsvsheet_buffer_info info = zsvsheet_buffer_get_info(buff);
struct zsv_index *index = NULL;

// TODO: Starting a second transformation before the first ends works, but if the second is faster
// than the first then it can end prematurely and read a partially written row.
// We could override the input stream reader to wait for more data when it sees EOF
if (info.write_in_progress && !info.write_done)
return zsvsheet_status_busy;

if (!(index = zsv_index_new()))
return zsvsheet_status_memory;

// TODO: custom_prop_handler is not passed to extensions?
struct zsvsheet_transformation_opts trn_opts = {
.custom_prop_handler = NULL,
.input_filename = filename,
.on_done = opts.on_done,
.ui_buffer = NULL,
.index = index,
};
zsvsheet_transformation trn = NULL;
struct zsv_opts zopts = zsvsheet_buffer_get_zsv_opts(buff);
Expand All @@ -194,13 +202,13 @@ enum zsvsheet_status zsvsheet_push_transformation(zsvsheet_proc_context_t ctx,
zopts.stream = fopen(filename, "rb");

if (!zopts.stream)
goto out;
goto error;

trn_opts.zsv_opts = zopts;

enum zsv_status zst = zsvsheet_transformation_new(trn_opts, &trn);
if (zst != zsv_status_ok)
return stat;
goto error;

// Transform part of the file to initially populate the UI buffer
// TODO: If the transformation is a reduction that doesn't output for some time this will caus a pause
Expand All @@ -214,13 +222,13 @@ enum zsvsheet_status zsvsheet_push_transformation(zsvsheet_proc_context_t ctx,
case zsv_status_no_more_input:
case zsv_status_cancelled:
if (zsv_finish(parser) != zsv_status_ok)
goto out;
goto error;
zsv_writer_flush(trn->writer);
break;
case zsv_status_ok:
break;
default:
goto out;
goto error;
}

struct zsvsheet_ui_buffer_opts uibopts = {0};
Expand All @@ -230,14 +238,17 @@ enum zsvsheet_status zsvsheet_push_transformation(zsvsheet_proc_context_t ctx,

stat = zsvsheet_open_file_opts(ctx, &uibopts);
if (stat != zsvsheet_status_ok)
goto out;
goto error;

struct zsvsheet_ui_buffer *nbuff = zsvsheet_buffer_current(ctx);
trn->ui_buffer = nbuff;
nbuff->write_progressed = 1;
nbuff->index_started = 1;
nbuff->index = index;

if (zst != zsv_status_ok) {
nbuff->write_done = 1;
nbuff->index_ready = 1;
goto out;
}

Expand All @@ -247,6 +258,9 @@ enum zsvsheet_status zsvsheet_push_transformation(zsvsheet_proc_context_t ctx,
zsvsheet_ui_buffer_create_worker(nbuff, zsvsheet_run_buffer_transformation, trn);
return stat;

error:
free(index);

out:
if (trn && trn->on_done)
opts.on_done(trn);
Expand Down
1 change: 1 addition & 0 deletions app/sheet/transformation.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct zsvsheet_transformation_opts {
const char *input_filename;
void (*on_done)(zsvsheet_transformation trn);
struct zsvsheet_ui_buffer *ui_buffer;
struct zsv_index *index;
};

enum zsv_status zsvsheet_transformation_new(struct zsvsheet_transformation_opts, zsvsheet_transformation *);
Expand Down
4 changes: 1 addition & 3 deletions app/utils/index.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include <assert.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Expand Down Expand Up @@ -30,10 +29,9 @@ void zsv_index_delete(struct zsv_index *ix) {
}
}

enum zsv_index_status zsv_index_add_row(struct zsv_index *ix, zsv_parser parser) {
enum zsv_index_status zsv_index_add_row(struct zsv_index *ix, uint64_t line_end) {
struct zsv_index_array *arr = ix->array;
size_t len = arr->len, cap = arr->capacity;
uint64_t line_end = zsv_cum_scanned_length(parser);

if (!ix->header_line_end) {
ix->header_line_end = line_end;
Expand Down
20 changes: 16 additions & 4 deletions app/utils/writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* https://opensource.org/licenses/MIT
*/

#include "zsv/utils/index.h"
#include <zsv/utils/writer.h>
#include <zsv/utils/compiler.h>
#include <stdio.h>
Expand Down Expand Up @@ -108,6 +109,7 @@ struct zsv_output_buff {
size_t (*write)(const void *restrict, size_t size, size_t nitems, void *restrict stream);
void *stream;
size_t used;
uint64_t written;
unsigned char close_on_delete : 1;
unsigned char _ : 7;
};
Expand All @@ -122,6 +124,7 @@ struct zsv_writer_data {
void *table_init_ctx;

const char *cell_prepend;
struct zsv_index *index;

unsigned char with_bom : 1;
unsigned char started : 1;
Expand All @@ -132,6 +135,7 @@ struct zsv_writer_data {

static inline void zsv_output_buff_flush(struct zsv_output_buff *b) {
b->write(b->buff, b->used, 1, b->stream);
b->written += b->used;
b->used = 0;
}

Expand All @@ -141,6 +145,7 @@ static inline void zsv_output_buff_write(struct zsv_output_buff *b, const unsign
zsv_output_buff_flush(b);
if (n > ZSV_OUTPUT_BUFF_SIZE) { // n too big, so write directly
b->write(s, n, 1, b->stream);
b->written += n;
return;
}
}
Expand Down Expand Up @@ -199,6 +204,7 @@ zsv_csv_writer zsv_writer_new(struct zsv_csv_writer_options *opts) {
w->with_bom = opts->with_bom;
w->table_init = opts->table_init;
w->table_init_ctx = opts->table_init_ctx;
w->index = opts->index;
}
}
return w;
Expand All @@ -223,17 +229,21 @@ enum zsv_writer_status zsv_writer_delete(zsv_csv_writer w) {
if (!w)
return zsv_writer_status_missing_handle;

if (w->started)
zsv_output_buff_write(&w->out, (const unsigned char *)"\n", 1);

if (w->out.stream && w->out.write && w->out.buff)
zsv_output_buff_flush(&w->out);

if (w->started)
w->out.write("\n", 1, 1, w->out.stream);
if (w->started && w->index)
zsv_index_add_row(w->index, w->out.written);

if (w->out.buff)
free(w->out.buff);

if (w->out.close_on_delete && w->out.stream)
fclose(w->out.stream);

free(w);
return zsv_writer_status_ok;
}
Expand Down Expand Up @@ -266,9 +276,11 @@ enum zsv_writer_status zsv_writer_cell(zsv_csv_writer w, char new_row, const uns
if (w->with_bom)
zsv_output_buff_write(&w->out, (const unsigned char *)"\xef\xbb\xbf", 3);
w->started = 1;
} else if (new_row)
} else if (new_row) {
if (w->index)
zsv_index_add_row(w->index, (uint64_t)(w->out.used + w->out.written));
zsv_output_buff_write(&w->out, (const unsigned char *)"\n", 1);
else
} else
zsv_output_buff_write(&w->out, (const unsigned char *)",", 1);

if (VERY_UNLIKELY(w->cell_prepend && *w->cell_prepend)) {
Expand Down
5 changes: 2 additions & 3 deletions include/zsv/utils/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
#define ZSV_UTILS_INDEX_H

#include <stdint.h>
#include <stdio.h>
#include <pthread.h>
#include <stddef.h>

#include "zsv/common.h"

Expand Down Expand Up @@ -37,7 +36,7 @@ struct zsv_index {

struct zsv_index *zsv_index_new(void);
void zsv_index_delete(struct zsv_index *ix);
enum zsv_index_status zsv_index_add_row(struct zsv_index *ix, zsv_parser parser);
enum zsv_index_status zsv_index_add_row(struct zsv_index *ix, uint64_t line_end);
enum zsv_index_status zsv_index_row_end_offset(const struct zsv_index *ix, uint64_t row, uint64_t *offset_out,
uint64_t *remaining_rows_out);
enum zsv_index_status zsv_index_seek_row(const struct zsv_index *ix, struct zsv_opts *opts, uint64_t row);
Expand Down
1 change: 1 addition & 0 deletions include/zsv/utils/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct zsv_csv_writer_options {
void (*table_init)(void *);
void *table_init_ctx;
const char *output_path; // if provided, will be created by zsv_writer_new() and closed by zsv_writer_delete()
struct zsv_index *index;
};

void zsv_writer_set_default_opts(struct zsv_csv_writer_options opts);
Expand Down

0 comments on commit 2f2858d

Please sign in to comment.