Skip to content

Commit

Permalink
Rework temporary filter file into the index
Browse files Browse the repository at this point in the history
  • Loading branch information
richiejp committed Nov 1, 2024
1 parent 416220b commit 4f5ecac
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 64 deletions.
76 changes: 65 additions & 11 deletions app/sheet/index.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <zsv/utils/prop.h>

#include "index.h"
#include "zsv/utils/file.h"
#include "zsv/utils/writer.h"

static struct zsvsheet_index *add_line_end(struct zsvsheet_index *ix, uint64_t end) {
size_t len = ix->line_end_len, cap = ix->line_end_capacity;
Expand All @@ -28,10 +30,36 @@ static void build_memory_index_row_handler(void *ctx) {
struct zsvsheet_indexer *ixr = ctx;
struct zsvsheet_index *ix = ixr->ix;
uint64_t line_end = zsv_cum_scanned_length(ixr->parser) + 1;

size_t col_count = zsv_cell_count(ixr->parser);

if (ixr->filter) {
if (col_count == 0)
return;

struct zsv_cell first_cell = zsv_get_cell(ixr->parser, 0);
struct zsv_cell last_cell = zsv_get_cell(ixr->parser, col_count - 1);

if (!memmem(first_cell.str, last_cell.str - first_cell.str + last_cell.len,
ixr->filter, ixr->filter_len))
return;

for (size_t i = 0; i < col_count; i++) {
struct zsv_cell cell = zsv_get_cell(ixr->parser, i);
zsv_writer_cell(ixr->writer, i == 0, cell.str, cell.len, cell.quoted);
}
}

if(!ixr->ix->header_line_end) {
ix->header_line_end = line_end;
} else if((ix->row_count & ((1 << LINE_END_SHIFT) - 1)) == 0) {
if (ixr->filter) {
if (zsv_writer_flush(ixr->writer) != zsv_writer_status_ok) {
zsv_abort(ixr->parser);
return;
}
line_end = ftell(ixr->filter_stream);
}

ix = add_line_end(ix, line_end);
if (!ix) {
zsv_abort(ixr->parser);
Expand All @@ -44,17 +72,22 @@ static void build_memory_index_row_handler(void *ctx) {
ix->row_count++;
}

enum zsvsheet_index_status build_memory_index(struct zsvsheet_index_opts *optsp, struct zsvsheet_index **index_out) {
struct zsvsheet_indexer ixr = {0};
enum zsvsheet_index_status build_memory_index(struct zsvsheet_index_opts *optsp) {
struct zsvsheet_indexer ixr = {
.filter = optsp->row_filter,
.filter_len = optsp->row_filter ? strlen(optsp->row_filter) : 0,
};
enum zsvsheet_index_status ret = zsvsheet_index_status_error;
struct zsv_opts *zopts = optsp->zsv_optsp;
struct zsv_opts ix_zopts = {0};
char *temp_filename;
FILE *temp_f = NULL;

memcpy(&ix_zopts, zopts, sizeof(ix_zopts));

FILE *fp = fopen(optsp->filename, "rb");
if (!fp)
goto close_file;
return ret;

ix_zopts.ctx = &ixr;
ix_zopts.stream = fp;
Expand All @@ -64,31 +97,52 @@ enum zsvsheet_index_status build_memory_index(struct zsvsheet_index_opts *optsp,
optsp->filename, optsp->opts_used,
&ixr.parser);
if (zst != zsv_status_ok)
goto free_parser;
goto out;

if (optsp->row_filter) {
zsv_csv_writer temp_file_writer = NULL;
unsigned char temp_buff[8196];

temp_filename = zsv_get_temp_filename("zsvsheet_filter_XXXXXXXX");
if (!temp_filename)
return ret;

*optsp->temp_filename = temp_filename;

struct zsv_csv_writer_options writer_opts = {0};
if (!(writer_opts.stream = temp_f = fopen(temp_filename, "wb")))
return ret;
if (!(temp_file_writer = zsv_writer_new(&writer_opts)))
goto out;

zsv_writer_set_temp_buff(temp_file_writer, temp_buff, sizeof(temp_buff));
ixr.writer = temp_file_writer;
ixr.filter_stream = temp_f;
}

const size_t initial_cap = 256;
ixr.ix = malloc(sizeof(*ixr.ix) + initial_cap * sizeof(size_t));
if (!ixr.ix)
goto free_parser;
goto out;
memset(ixr.ix, 0, sizeof(*ixr.ix));
ixr.ix->line_end_capacity = initial_cap;

while ((zst = zsv_parse_more(ixr.parser)) == zsv_status_ok)
;

zsv_finish(ixr.parser);

if (zst == zsv_status_no_more_input) {
ret = zsvsheet_index_status_ok;
*index_out = ixr.ix;
*optsp->index = ixr.ix;
} else
free(ixr.ix);

free_parser:
out:
zsv_delete(ixr.parser);

close_file:
fclose(fp);
if (temp_f)
fclose(temp_f);

return ret;
}
Expand Down
8 changes: 7 additions & 1 deletion app/sheet/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <pthread.h>

#include "zsv.h"
#include "zsv/utils/writer.h"

// Decides the number of rows we skip when storing the line end
// 1 << 10 = 1024 means that we store every 1024th line end
Expand All @@ -30,11 +31,16 @@ struct zsvsheet_index {
// Working state for one index build. A pointer to this struct is installed as
// the parser's ctx and read back by the row handler on every parsed row.
struct zsvsheet_indexer {
// Parser currently scanning the input; the row handler queries it for the
// cumulative scanned length and for the cells of the current row.
zsv_parser parser;
// Index being populated (line-end offsets and row count).
struct zsvsheet_index *ix;
// Optional row filter text; NULL when no filtering is requested. When set,
// rows whose raw bytes do not contain this text are skipped by the row handler.
const char *filter;
// Length of `filter` in bytes; 0 when `filter` is NULL.
size_t filter_len;
// CSV writer that matching rows are copied to, cell by cell, when a filter is set.
zsv_csv_writer writer;
// Stream backing `writer`. When filtering, the row handler flushes the writer
// and stores this stream's ftell() position as the line end.
FILE *filter_stream;
};

struct zsvsheet_index_opts {
pthread_mutex_t *mutexp;
const char *filename;
char **temp_filename;
const char *row_filter;
struct zsv_opts *zsv_optsp;
struct zsvsheet_index **index;
Expand All @@ -44,7 +50,7 @@ struct zsvsheet_index_opts {
const char *opts_used;
};

enum zsvsheet_index_status build_memory_index(struct zsvsheet_index_opts *optsp, struct zsvsheet_index **index_out);
enum zsvsheet_index_status build_memory_index(struct zsvsheet_index_opts *optsp);
void get_memory_index(struct zsvsheet_index *ix, uint64_t row, off_t *offset_out, size_t *remaining_rows_out);

#endif
74 changes: 22 additions & 52 deletions app/sheet/read-data.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ static void get_data_index_async(struct zsvsheet_ui_buffer *uibuffp, const char
struct zsvsheet_index_opts *gdi = calloc(1, sizeof(*gdi));
gdi->mutexp = mutexp;
gdi->filename = filename;
gdi->temp_filename = &uibuffp->temp_filename;
gdi->zsv_optsp = optsp;
gdi->row_filter = row_filter;
gdi->index = &uibuffp->index;
Expand Down Expand Up @@ -58,9 +59,12 @@ static int read_data(struct zsvsheet_ui_buffer **uibufferp, // a new zsvsheet_
size_t remaining_rows_to_skip = start_row;
size_t remaining_header_to_skip = header_span;
size_t original_row_num = 0;

const char *row_filter = uibuff ? uibuff->row_filter : NULL;
size_t row_filter_len = row_filter ? strlen(row_filter) : 0;
FILE *fp;

assert(filename != NULL);
FILE *fp = fopen(filename, "rb");
fp = fopen(filename, "rb");
if (!fp)
return errno;

Expand All @@ -69,16 +73,28 @@ static int read_data(struct zsvsheet_ui_buffer **uibufferp, // a new zsvsheet_
if (uibuff) {
pthread_mutex_lock(&uibuff->mutex);
if (uibuff->index_ready && start_row > LINE_END_N) {
if (row_filter) {
fclose(fp);
fp = fopen(uibuff->temp_filename, "rb");
if (!fp) {
pthread_mutex_unlock(&uibuff->mutex);
return errno;
}
}

off_t file_start;
get_memory_index(uibuff->index, start_row - LINE_END_N, &file_start, &remaining_rows_to_skip);

if (fseek(fp, file_start, SEEK_SET)) {
return errno;
}

if (!isspace(fgetc(fp))) {
if (fseek(fp, file_start, SEEK_SET))
return errno;
// original csv files can have two char line endings
if (!row_filter) {
if (!isspace(fgetc(fp))) {
if (fseek(fp, file_start, SEEK_SET))
return errno;
}
}

remaining_header_to_skip = 0;
Expand All @@ -98,12 +114,7 @@ static int read_data(struct zsvsheet_ui_buffer **uibufferp, // a new zsvsheet_

size_t find_len = zsvsheet_opts->find ? strlen(zsvsheet_opts->find) : 0;
size_t rows_searched = 0;
const char *row_filter = uibuff ? uibuff->row_filter : NULL;
size_t row_filter_len = row_filter ? strlen(row_filter) : 0;
zsvsheet_buffer_t buffer = uibuff ? uibuff->buffer : NULL;
FILE *temp_f = NULL;
unsigned char temp_buff[4096];
zsv_csv_writer temp_file_writer = NULL;

while (zsv_next_row(parser) == zsv_status_row &&
(rows_read == 0 || rows_read < zsvsheet_buffer_rows(buffer))) { // for each row
Expand Down Expand Up @@ -175,50 +186,11 @@ static int read_data(struct zsvsheet_ui_buffer **uibufferp, // a new zsvsheet_
zsvsheet_buffer_write_cell_w_len(buffer, rows_read, i + rownum_column_offset, c.str, c.len);
}

// if we have a row filter, write it to a temp file
// later if needed this could be optimized in general and where the filtered data is small enough to not need
// indexing
if (row_filter) {
if (rows_read == 0 && uibuff != NULL && uibuff->temp_filename == NULL) {
uibuff->temp_filename = zsv_get_temp_filename("zsvsheet_filter_XXXXXXXX");
if (!uibuff->temp_filename)
; // to do: handle out-of-memory error
else {
struct zsv_csv_writer_options writer_opts = {0};
if (!(writer_opts.stream = temp_f = fopen(uibuff->temp_filename, "wb")))
; // to do: handle fopen error
else if (!(temp_file_writer = zsv_writer_new(&writer_opts)))
; // to do: handle zsv_writer_new error
else {
zsv_writer_set_temp_buff(temp_file_writer, temp_buff, sizeof(temp_buff));
}
}
}
if (temp_file_writer) {
for (size_t i = 0; i < col_count; i++) {
struct zsv_cell cell = zsv_get_cell(parser, i);
zsv_writer_cell(temp_file_writer, i == 0, cell.str, cell.len, cell.quoted);
}
}
}
rows_read++;
}
fclose(fp);
zsv_delete(parser);

if (temp_file_writer) { // finish writing the filtered data to temp file
// to do: do this in a separate thread
while (zsv_next_row(parser) == zsv_status_row) {
size_t col_count = zsv_cell_count(parser);
for (size_t i = 0; i < col_count; i++) {
struct zsv_cell cell = zsv_get_cell(parser, i);
zsv_writer_cell(temp_file_writer, i == 0, cell.str, cell.len, cell.quoted);
}
}
zsv_writer_delete(temp_file_writer);
}
if (temp_f)
fclose(temp_f);

if (uibuff) {
if (!uibuff->index_started) {
Expand Down Expand Up @@ -255,8 +227,7 @@ static void *get_data_index(void *gdi) {
pthread_mutex_t *mutexp = d->mutexp;
int *errp = d->errp;

struct zsvsheet_index *ix;
enum zsvsheet_index_status ix_status = build_memory_index(d, &ix);
enum zsvsheet_index_status ix_status = build_memory_index(d);

if (ix_status != zsvsheet_index_status_ok) {
pthread_mutex_lock(mutexp);
Expand All @@ -267,7 +238,6 @@ static void *get_data_index(void *gdi) {
}

pthread_mutex_lock(mutexp);
*d->index = ix;
*d->index_ready = 1;
free(d);
pthread_mutex_unlock(mutexp);
Expand Down

0 comments on commit 4f5ecac

Please sign in to comment.