Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion include/fluent-bit/wasm/flb_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_log_event.h>

#define FLB_WASM_DEFAULT_HEAP_SIZE 8192
#define FLB_WASM_DEFAULT_STACK_SIZE 8192

struct flb_wasm_config {
size_t heap_size;
size_t stack_size;
int stdinfd;
int stdoutfd;
int stderrfd;
};

/* WASM Context */
struct flb_wasm {
wasm_module_t module;
Expand All @@ -41,9 +52,11 @@ struct flb_wasm {
};

void flb_wasm_init(struct flb_config *config);
struct flb_wasm_config *flb_wasm_config_init(struct flb_config *config);
void flb_wasm_config_destroy(struct flb_wasm_config *wasm_config);
struct flb_wasm *flb_wasm_instantiate(struct flb_config *config, const char *wasm_path,
struct mk_list *acessible_dir_list,
int stdinfd, int stdoutfd, int stderrfd);
struct flb_wasm_config *wasm_config);

char *flb_wasm_call_function_format_json(struct flb_wasm *fw, const char *function_name,
const char* tag_data, size_t tag_len,
Expand Down
27 changes: 26 additions & 1 deletion plugins/filter_wasm/filter_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ static int cb_wasm_filter(const void *data, size_t bytes,
return FLB_FILTER_NOTOUCH;
}

wasm = flb_wasm_instantiate(config, ctx->wasm_path, ctx->accessible_dir_list, -1, -1, -1);
wasm = flb_wasm_instantiate(config, ctx->wasm_path, ctx->accessible_dir_list,
ctx->wasm_conf);
if (wasm == NULL) {
flb_plg_debug(ctx->ins, "instantiate wasm [%s] failed", ctx->wasm_path);
goto on_error;
Expand Down Expand Up @@ -330,6 +331,7 @@ static int cb_wasm_init(struct flb_filter_instance *f_ins,
struct flb_config *config, void *data)
{
struct flb_filter_wasm *ctx = NULL;
struct flb_wasm_config *wasm_conf = NULL;
int ret = -1;
const char *tmp;
int event_format;
Expand Down Expand Up @@ -363,6 +365,18 @@ static int cb_wasm_init(struct flb_filter_instance *f_ins,
}

flb_wasm_init(config);
wasm_conf = flb_wasm_config_init(config);
if (wasm_conf == NULL) {
goto init_error;
}
ctx->wasm_conf = wasm_conf;

if (ctx->wasm_heap_size > FLB_WASM_DEFAULT_HEAP_SIZE) {
wasm_conf->heap_size = ctx->wasm_heap_size;
}
if (ctx->wasm_stack_size > FLB_WASM_DEFAULT_STACK_SIZE) {
wasm_conf->stack_size = ctx->wasm_stack_size;
}

/* Set context */
flb_filter_set_context(f_ins, ctx);
Expand All @@ -378,6 +392,7 @@ static int cb_wasm_exit(void *data, struct flb_config *config)
{
struct flb_filter_wasm *ctx = data;

flb_wasm_config_destroy(ctx->wasm_conf);
flb_wasm_destroy_all(config);
delete_wasm_config(ctx);
return 0;
Expand Down Expand Up @@ -405,6 +420,16 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_filter_wasm, wasm_function_name),
"Set the function name in wasm to execute"
},
{
FLB_CONFIG_MAP_SIZE, "wasm_heap_size", DEFAULT_WASM_HEAP_SIZE,
0, FLB_TRUE, offsetof(struct flb_filter_wasm, wasm_heap_size),
"Set the heap size of wasm runtime"
},
{
FLB_CONFIG_MAP_SIZE, "wasm_stack_size", DEFAULT_WASM_STACK_SIZE,
0, FLB_TRUE, offsetof(struct flb_filter_wasm, wasm_stack_size),
"Set the stack size of wasm runtime"
},
/* EOF */
{0}
};
Expand Down
6 changes: 6 additions & 0 deletions plugins/filter_wasm/filter_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,18 @@ enum {
#define FLB_FMT_STR_JSON "json"
#define FLB_FMT_STR_MSGPACK "msgpack"

#define DEFAULT_WASM_HEAP_SIZE "8192"
#define DEFAULT_WASM_STACK_SIZE "8192"

struct flb_filter_wasm {
flb_sds_t wasm_path;
struct mk_list *accessible_dir_list; /* list of directories to be
* accesible from WASM */
flb_sds_t wasm_function_name;
int event_format;
size_t wasm_heap_size;
size_t wasm_stack_size;
struct flb_wasm_config *wasm_conf;
struct flb_filter_instance *ins;
struct flb_wasm *wasm;
};
Expand Down
35 changes: 34 additions & 1 deletion plugins/in_exec_wasi/in_exec_wasi.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,14 @@ static int in_exec_wasi_collect(struct flb_input_instance *ins,
}
}

wasm = flb_wasm_instantiate(config, ctx->wasi_path, ctx->accessible_dir_list, -1, fileno(stdoutp), -1);
if (ctx->wasm_conf == NULL) {
flb_plg_error(ctx->ins, "wasm_conf cannot be NULL");
return -1;
}
ctx->wasm_conf->stdoutfd = fileno(stdoutp);

wasm = flb_wasm_instantiate(config, ctx->wasi_path, ctx->accessible_dir_list,
ctx->wasm_conf);
if (wasm == NULL) {
flb_plg_debug(ctx->ins, "instantiate wasm [%s] failed", ctx->wasi_path);
goto collect_end;
Expand Down Expand Up @@ -289,6 +296,7 @@ static int in_exec_wasi_init(struct flb_input_instance *in,
struct flb_config *config, void *data)
{
struct flb_exec_wasi *ctx = NULL;
struct flb_wasm_config *wasm_conf = NULL;
int ret = -1;

/* Allocate space for the configuration */
Expand Down Expand Up @@ -341,6 +349,20 @@ static int in_exec_wasi_init(struct flb_input_instance *in,
flb_plg_error(in, "could not set collector for exec input plugin");
goto init_error;
}

wasm_conf = flb_wasm_config_init(config);
if (wasm_conf == NULL) {
goto init_error;
}
ctx->wasm_conf = wasm_conf;

if (ctx->wasm_heap_size > FLB_WASM_DEFAULT_HEAP_SIZE) {
wasm_conf->heap_size = ctx->wasm_heap_size;
}
if (ctx->wasm_stack_size > FLB_WASM_DEFAULT_STACK_SIZE) {
wasm_conf->stack_size = ctx->wasm_stack_size;
}

ctx->coll_fd = ret;

return 0;
Expand Down Expand Up @@ -391,6 +413,7 @@ static int in_exec_wasi_exit(void *data, struct flb_config *config)
{
struct flb_exec_wasi *ctx = data;

flb_wasm_config_destroy(ctx->wasm_conf);
flb_wasm_destroy_all(config);
delete_exec_wasi_config(ctx);
return 0;
Expand Down Expand Up @@ -433,6 +456,16 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_exec_wasi, oneshot),
"execute the command only once"
},
{
FLB_CONFIG_MAP_SIZE, "wasm_heap_size", DEFAULT_WASM_HEAP_SIZE,
0, FLB_TRUE, offsetof(struct flb_exec_wasi, wasm_heap_size),
"Set the heap size of wasm runtime"
},
{
FLB_CONFIG_MAP_SIZE, "wasm_stack_size", DEFAULT_WASM_STACK_SIZE,
0, FLB_TRUE, offsetof(struct flb_exec_wasi, wasm_stack_size),
"Set the stack size of wasm runtime"
},
/* EOF */
{0}
};
Expand Down
6 changes: 6 additions & 0 deletions plugins/in_exec_wasi/in_exec_wasi.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
#define DEFAULT_INTERVAL_SEC "1"
#define DEFAULT_INTERVAL_NSEC "0"

#define DEFAULT_WASM_HEAP_SIZE "8192"
#define DEFAULT_WASM_STACK_SIZE "8192"

struct flb_exec_wasi {
flb_sds_t wasi_path;
struct mk_list *accessible_dir_list; /* list of directories to be
Expand All @@ -44,10 +47,13 @@ struct flb_exec_wasi {
size_t buf_size;
struct flb_input_instance *ins;
struct flb_wasm *wasm;
struct flb_wasm_config *wasm_conf;
int oneshot;
flb_pipefd_t ch_manager[2];
int interval_sec;
int interval_nsec;
size_t wasm_heap_size;
size_t wasm_stack_size;
struct flb_log_event_encoder log_encoder;
int coll_fd;
};
Expand Down
50 changes: 43 additions & 7 deletions src/wasm/flb_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,32 @@ void flb_wasm_init(struct flb_config *config)
mk_list_init(&config->wasm_list);
}

struct flb_wasm_config *flb_wasm_config_init(struct flb_config *config)
{
struct flb_wasm_config *wasm_config;

wasm_config = flb_calloc(1, sizeof(struct flb_wasm_config));
if (!wasm_config) {
flb_errno();
return NULL;
}

wasm_config->heap_size = FLB_WASM_DEFAULT_HEAP_SIZE;
wasm_config->stack_size = FLB_WASM_DEFAULT_STACK_SIZE;
wasm_config->stdinfd = -1;
wasm_config->stdoutfd = -1;
wasm_config->stderrfd = -1;

return wasm_config;
}

void flb_wasm_config_destroy(struct flb_wasm_config *wasm_config)
{
if (wasm_config != NULL) {
flb_free(wasm_config);
}
}

static int flb_wasm_load_wasm_binary(const char *wasm_path, int8_t **out_buf, uint32_t *out_size)
{
char *buffer;
Expand Down Expand Up @@ -81,10 +107,10 @@ static int flb_wasm_load_wasm_binary(const char *wasm_path, int8_t **out_buf, ui

struct flb_wasm *flb_wasm_instantiate(struct flb_config *config, const char *wasm_path,
struct mk_list *accessible_dir_list,
int stdinfd, int stdoutfd, int stderrfd)
struct flb_wasm_config *wasm_config)
{
struct flb_wasm *fw;
uint32_t buf_size, stack_size = 8 * 1024, heap_size = 8 * 1024;
uint32_t buf_size;
int8_t *buffer = NULL;
char error_buf[128];
#if WASM_ENABLE_LIBC_WASI != 0
Expand All @@ -101,6 +127,14 @@ struct flb_wasm *flb_wasm_instantiate(struct flb_config *config, const char *was

RuntimeInitArgs wasm_args;

if (wasm_config->heap_size < FLB_WASM_DEFAULT_HEAP_SIZE) {
wasm_config->heap_size = FLB_WASM_DEFAULT_HEAP_SIZE;
}

if (wasm_config->stack_size < FLB_WASM_DEFAULT_STACK_SIZE) {
wasm_config->stack_size = FLB_WASM_DEFAULT_STACK_SIZE;
}

fw = flb_malloc(sizeof(struct flb_wasm));
if (!fw) {
flb_errno();
Expand Down Expand Up @@ -150,19 +184,21 @@ struct flb_wasm *flb_wasm_instantiate(struct flb_config *config, const char *was
#if WASM_ENABLE_LIBC_WASI != 0
wasm_runtime_set_wasi_args_ex(module, wasi_dir_list, accessible_dir_list_size, NULL, 0,
NULL, 0, NULL, 0,
(stdinfd != -1) ? stdinfd : STDIN_FILENO,
(stdoutfd != -1) ? stdoutfd : STDOUT_FILENO,
(stderrfd != -1) ? stderrfd : STDERR_FILENO);
(wasm_config->stdinfd != -1) ? wasm_config->stdinfd : STDIN_FILENO,
(wasm_config->stdoutfd != -1) ? wasm_config->stdoutfd : STDOUT_FILENO,
(wasm_config->stderrfd != -1) ? wasm_config->stderrfd : STDERR_FILENO);
#endif

module_inst = wasm_runtime_instantiate(module, stack_size, heap_size,
module_inst = wasm_runtime_instantiate(module,
wasm_config->stack_size,
wasm_config->heap_size,
error_buf, sizeof(error_buf));
if (!module_inst) {
flb_error("Instantiate wasm module failed. error: %s", error_buf);
goto error;
}

exec_env = wasm_runtime_create_exec_env(module_inst, stack_size);
exec_env = wasm_runtime_create_exec_env(module_inst, wasm_config->stack_size);
if (!exec_env) {
flb_error("Create wasm execution environment failed.");
goto error;
Expand Down