diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index e048479867c..44e93ce7e19 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -63,6 +63,13 @@ struct flb_config { flb_pipefd_t ch_manager[2]; /* channel to administrate fluent bit */ flb_pipefd_t ch_notif[2]; /* channel to receive notifications */ + /* Keeps track of output plugin threads */ + int os_workers_len; /* number of worker OS threads */ + pthread_t *os_workers; /* output worker tids */ + flb_pipefd_t *os_workers_ch[2]; /* channels to send data to output workers */ + struct mk_event_loop **os_workers_evl; /* event loop for output workers */ + struct mk_event *os_workers_event; /* event for output worker pipes */ + /* Channel event loop (just for ch_notif) */ struct mk_event_loop *ch_evl; diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index 1b173991ddb..74da5f1e2b8 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -31,7 +31,6 @@ #include #include #include -#include #include #ifdef FLB_HAVE_METRICS @@ -333,7 +332,7 @@ struct flb_thread *flb_input_thread(struct flb_input_instance *i_ins, struct flb_thread *th; struct flb_input_thread *in_th; - th = flb_thread_new(sizeof(struct flb_input_thread), NULL); + th = flb_thread_new(sizeof(struct flb_input_thread), NULL, FLB_THREAD_RUN_MAIN_ONLY, config->ch_manager[1]); if (!th) { return NULL; } @@ -425,7 +424,6 @@ struct flb_thread *flb_input_thread_collect(struct flb_input_collector *coll, * will be returned instead. */ static inline void flb_input_return(struct flb_thread *th) { - int n; uint64_t val; struct flb_input_thread *in_th; @@ -441,10 +439,7 @@ static inline void flb_input_return(struct flb_thread *th) { * We put together the return value with the task_id on the 32 bits at right */ val = FLB_BITS_U64_SET(3 /* FLB_ENGINE_IN_THREAD */, in_th->id); - n = flb_pipe_w(in_th->config->ch_manager[1], (void *) &val, sizeof(val)); - if (n == -1) { - flb_errno(); - } + flb_thread_return(val, th); } static inline int flb_input_buf_paused(struct flb_input_instance *i) @@ -461,7 +456,7 @@ static inline void FLB_INPUT_RETURN() struct flb_thread *th; th = (struct flb_thread *) pthread_getspecific(flb_thread_key); flb_input_return(th); - flb_thread_return(th); + flb_thread_yield(th, FLB_TRUE); } static inline int flb_input_config_map_set(struct flb_input_instance *ins, diff --git a/include/fluent-bit/flb_oauth2.h b/include/fluent-bit/flb_oauth2.h index 48f96ed52ce..5f2b5f1049d 100644 --- a/include/fluent-bit/flb_oauth2.h +++ b/include/fluent-bit/flb_oauth2.h @@ -60,6 +60,7 @@ struct flb_oauth2 *flb_oauth2_create(struct flb_config *config, const char *auth_url, int expire_sec); void flb_oauth2_destroy(struct flb_oauth2 *ctx); int flb_oauth2_token_len(struct flb_oauth2 *ctx); +void flb_oauth2_payload_clear(struct flb_oauth2 *ctx); int flb_oauth2_payload_append(struct flb_oauth2 *ctx, const char *key_str, int key_len, const char *val_str, int val_len); diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 36fce9acf90..26b0098b6e8 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -49,10 +49,11 @@ #endif /* Output plugin masks */ -#define FLB_OUTPUT_NET 32 /* output address may set host and port */ +#define FLB_OUTPUT_NET 32 /* output address may set host and port */ #define FLB_OUTPUT_PLUGIN_CORE 0 #define FLB_OUTPUT_PLUGIN_PROXY 1 -#define FLB_OUTPUT_NO_MULTIPLEX 512 +#define FLB_OUTPUT_NO_MULTIPLEX 512 +#define FLB_OUTPUT_MULTITHREAD 1024 /* * Tests callbacks @@ -485,8 +486,12 @@ struct flb_thread *flb_output_thread(struct flb_task *task, struct flb_thread *th; /* Create a new thread */ - th = flb_thread_new(sizeof(struct flb_output_thread), - cb_output_thread_destroy); + int worker_id = FLB_THREAD_RUN_MAIN_ONLY; + if (o_ins->flags & FLB_OUTPUT_MULTITHREAD) { + worker_id = FLB_THREAD_RUN_ANYWHERE; + } + + th = flb_thread_new(sizeof(struct flb_output_thread), cb_output_thread_destroy, worker_id, o_ins->ch_events[1]); if (!th) { return NULL; } @@ -543,7 +548,6 @@ struct flb_thread *flb_output_thread(struct flb_task *task, * a return value. The return value is either FLB_OK, FLB_RETRY or FLB_ERROR. */ static inline void flb_output_return(int ret, struct flb_thread *th) { - int n; uint32_t set; uint64_t val; struct flb_task *task; @@ -566,32 +570,7 @@ static inline void flb_output_return(int ret, struct flb_thread *th) { */ set = FLB_TASK_SET(ret, task->id, out_th->id); val = FLB_BITS_U64_SET(2 /* FLB_ENGINE_TASK */, set); - - n = flb_pipe_w(out_th->o_ins->ch_events[1], (void *) &val, sizeof(val)); - if (n == -1) { - flb_errno(); - } - -#ifdef FLB_HAVE_METRICS - if (out_th->o_ins->metrics) { - if (ret == FLB_OK) { - records = task->records; - flb_metrics_sum(FLB_METRIC_OUT_OK_RECORDS, records, - out_th->o_ins->metrics); - flb_metrics_sum(FLB_METRIC_OUT_OK_BYTES, task->size, - out_th->o_ins->metrics); - } - else if (ret == FLB_ERROR) { - flb_metrics_sum(FLB_METRIC_OUT_ERROR, 1, out_th->o_ins->metrics); - } - else if (ret == FLB_RETRY) { - /* - * Counting retries is happening in the event loop/scheduler side - * since it also needs to count if some retry fails to re-schedule. - */ - } - } -#endif + flb_thread_return(val, th); } static inline void flb_output_return_do(int x) diff --git a/include/fluent-bit/flb_thread_libco.h b/include/fluent-bit/flb_thread_libco.h index d38cc193b57..ee7bc6c8177 100644 --- a/include/fluent-bit/flb_thread_libco.h +++ b/include/fluent-bit/flb_thread_libco.h @@ -36,6 +36,8 @@ #include #endif +#define FLB_THREAD_RUN_ANYWHERE (-1) +#define FLB_THREAD_RUN_MAIN_ONLY (-2) struct flb_thread { #ifdef FLB_HAVE_VALGRIND @@ -46,6 +48,25 @@ struct flb_thread { cothread_t caller; cothread_t callee; + /* Worker thread id to run this thread on. + * FLB_THREAD_RUN_ANYWHERE allows the thread to run on any worker + * FLB_THREAD_RUN_MAIN_ONLY requires the thread to run in the main thread + * A non-negative value corresponds to the worker id of the worker + * that can run this thread. + */ + int desired_worker_id; + + /* Did this coroutine return? */ + int returned; + uint64_t return_val; + + /* Is this coroutine able to be scheduled? + * Prevents scheduling a thread multiple times. */ + char scheduled; + + /* Channel this thread uses for returning */ + flb_pipefd_t ret_channel; + void *data; /* @@ -88,28 +109,40 @@ static FLB_INLINE void flb_thread_destroy(struct flb_thread *th) flb_free(th); } -#define flb_thread_return(th) co_switch(th->caller) static FLB_INLINE void flb_thread_resume(struct flb_thread *th) { pthread_setspecific(flb_thread_key, (void *) th); - /* - * In the past we used to have a flag to mark when a coroutine - * has finished (th->ended == MK_TRUE), now we let the coroutine - * to submit an event to the event loop indicating what's going on - * through the call FLB_OUTPUT_RETURN(...). - * - * So we just swap context and let the event loop to handle all - * the cleanup required. - */ + if(th->returned) { + flb_error("[thread] running thread=%p that already returned", th); + return; + } th->caller = co_active(); co_switch(th->callee); + + if(th->returned) { + int n = flb_pipe_w(th->ret_channel, (void *) &th->return_val, sizeof(th->return_val)); + if (n == -1) { + flb_errno(); + } + } else { + __atomic_clear(&th->scheduled, __ATOMIC_SEQ_CST); + } + +} + + +static FLB_INLINE void flb_thread_return(uint64_t val, struct flb_thread *th) +{ + th->returned = 1; + th->return_val = val; + } static FLB_INLINE struct flb_thread *flb_thread_new(size_t data_size, - void (*cb_destroy) (void *)) + void (*cb_destroy) (void *), int desired_worker_id, flb_pipefd_t ret_channel) { void *p; @@ -124,6 +157,11 @@ static FLB_INLINE struct flb_thread *flb_thread_new(size_t data_size, th = (struct flb_thread *) p; th->cb_destroy = NULL; + th->desired_worker_id = desired_worker_id; + th->returned = 0; + th->return_val = (uint64_t) 0; + __atomic_clear(&th->scheduled, __ATOMIC_SEQ_CST); + th->ret_channel = ret_channel; flb_trace("[thread %p] created (custom data at %p, size=%lu", th, FLB_THREAD_DATA(th), data_size); diff --git a/include/fluent-bit/flb_upstream.h b/include/fluent-bit/flb_upstream.h index 5246aa538e6..056eb24789e 100644 --- a/include/fluent-bit/flb_upstream.h +++ b/include/fluent-bit/flb_upstream.h @@ -86,6 +86,8 @@ struct flb_upstream { struct mk_list destroy_queue; + pthread_mutex_t connection_pool_mutex; + #ifdef FLB_HAVE_TLS /* context with mbedTLS data to handle certificates and keys */ struct flb_tls *tls; diff --git a/lib/mbedtls-2.24.0/include/mbedtls/config.h b/lib/mbedtls-2.24.0/include/mbedtls/config.h index 1e6e052756d..8f22571b611 100644 --- a/lib/mbedtls-2.24.0/include/mbedtls/config.h +++ b/lib/mbedtls-2.24.0/include/mbedtls/config.h @@ -1982,7 +1982,7 @@ * * Uncomment this to enable pthread mutexes. */ -//#define MBEDTLS_THREADING_PTHREAD +#define MBEDTLS_THREADING_PTHREAD /** * \def MBEDTLS_USE_PSA_CRYPTO @@ -3286,7 +3286,7 @@ * * Enable this layer to allow use of mutexes within mbed TLS */ -//#define MBEDTLS_THREADING_C +#define MBEDTLS_THREADING_C /** * \def MBEDTLS_TIMING_C diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index e2db6f3deb8..b68296cc288 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -38,6 +38,9 @@ #include "stackdriver_helper.h" #include #include +#include + + /* * Base64 Encoding in JWT must: @@ -202,12 +205,7 @@ static int get_oauth2_token(struct flb_stackdriver *ctx) time_t expires; char payload[1024]; - /* Create oauth2 context */ - ctx->o = flb_oauth2_create(ctx->config, FLB_STD_AUTH_URL, 3000); - if (!ctx->o) { - flb_plg_error(ctx->ins, "cannot create oauth2 context"); - return -1; - } + flb_oauth2_payload_clear(ctx->o); /* In case of using metadata server, fetch token from there */ if (ctx->metadata_server_auth) { @@ -263,23 +261,32 @@ static int get_oauth2_token(struct flb_stackdriver *ctx) return 0; } +static pthread_mutex_t token_mutex = PTHREAD_MUTEX_INITIALIZER; static char *get_google_token(struct flb_stackdriver *ctx) { int ret = 0; + char* output = NULL; - if (!ctx->o) { - ret = get_oauth2_token(ctx); + if (pthread_mutex_lock(&token_mutex)){ + flb_plg_error(ctx->ins, "error locking mutex"); + return NULL; } - else if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { - flb_oauth2_destroy(ctx->o); + + + if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { ret = get_oauth2_token(ctx); } - if (ret != 0) { + if (ret == 0) { + output = ctx->o->access_token; + } + + if (pthread_mutex_unlock(&token_mutex)){ + flb_plg_error(ctx->ins, "error locking mutex"); return NULL; } - return ctx->o->access_token; + return output; } static bool validate_msgpack_unpacked_data(msgpack_object root) @@ -869,6 +876,13 @@ static int cb_stackdriver_init(struct flb_output_instance *ins, return -1; } + /* Create oauth2 context */ + ctx->o = flb_oauth2_create(ctx->config, FLB_STD_AUTH_URL, 3000); + if (!ctx->o) { + flb_plg_error(ctx->ins, "cannot create oauth2 context"); + return -1; + } + /* Metadata Upstream Sync flags */ ctx->metadata_u->flags &= ~FLB_IO_ASYNC; @@ -1867,6 +1881,7 @@ static void set_authorization_header(struct flb_http_client *c, flb_http_add_header(c, "Authorization", 13, header, len); } + static void cb_stackdriver_flush(const void *data, size_t bytes, const char *tag, int tag_len, struct flb_input_instance *i_ins, @@ -1988,5 +2003,5 @@ struct flb_output_plugin out_stackdriver_plugin = { .test_formatter.callback = stackdriver_format, /* Plugin flags */ - .flags = FLB_OUTPUT_NET | FLB_IO_TLS, + .flags = FLB_OUTPUT_NET | FLB_IO_TLS | FLB_OUTPUT_MULTITHREAD, }; diff --git a/src/flb_engine.c b/src/flb_engine.c index d0f282e55d3..6055003357f 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -27,6 +27,7 @@ #include #include +#include #include #include #include @@ -105,6 +106,7 @@ static inline int handle_output_event(flb_pipefd_t fd, struct flb_config *config int thread_id; int retries; int retry_seconds; + int records; uint32_t type; uint32_t key; uint64_t val; @@ -161,6 +163,14 @@ static inline int handle_output_event(flb_pipefd_t fd, struct flb_config *config /* A thread has finished, delete it */ if (ret == FLB_OK) { + +#ifdef FLB_HAVE_METRICS + records = task->records; + flb_metrics_sum(FLB_METRIC_OUT_OK_RECORDS, records, + out_th->o_ins->metrics); + flb_metrics_sum(FLB_METRIC_OUT_OK_BYTES, task->size, + out_th->o_ins->metrics); +#endif /* Inform the user if a 'retry' succedeed */ if (mk_list_size(&task->retries) > 0) { retries = flb_task_retry_count(task, out_th->parent); @@ -257,6 +267,9 @@ static inline int handle_output_event(flb_pipefd_t fd, struct flb_config *config } } else if (ret == FLB_ERROR) { +#ifdef FLB_HAVE_METRICS + flb_metrics_sum(FLB_METRIC_OUT_ERROR, 1, out_th->o_ins->metrics); +#endif flb_output_thread_destroy_id(thread_id, task); if (task->users == 0 && mk_list_size(&task->retries) == 0) { flb_task_destroy(task, FLB_TRUE); @@ -416,13 +429,119 @@ static int flb_engine_log_start(struct flb_config *config) return 0; } -int flb_engine_start(struct flb_config *config) +struct flb_engine_worker_argument { + struct flb_config *config; + int worker_id; +}; + +void flb_engine_worker(void *arguments_struct) +{ + int ret; + char *worker_name = "flb-engine-worker"; + struct mk_event *event; + + // Unpack arguments and free struct containing them + struct flb_config *config = ((struct flb_engine_worker_argument*) arguments_struct)->config; + int worker_id = ((struct flb_engine_worker_argument*) arguments_struct)->worker_id; + struct mk_event_loop *evl = config->os_workers_evl[worker_id]; + flb_pipefd_t channel = config->os_workers_ch[0][worker_id]; + flb_free(arguments_struct); + + mk_utils_worker_rename(worker_name); + + + // Sit in the event loop waiting for things to happen + while (1) { + mk_event_wait(evl); + mk_event_foreach(event, evl) { + if (event->type == FLB_ENGINE_EV_THREAD) { + struct flb_upstream_conn *u_conn; + struct flb_thread *th; + + /* + * Check if we have some co-routine associated to this event, + * if so, resume the co-routine + */ + u_conn = (struct flb_upstream_conn *) event; + th = u_conn->thread; + if (th) { + flb_trace("[%s-%d] resuming thread=%p\n", worker_name, worker_id, th); + flb_thread_resume(th); + } + } else { + // Running a thread + struct flb_thread *th; + ret = flb_pipe_r(channel, &th, sizeof(th)); + if (ret == 0){ + // Pipe closed, exit and clean up + // Todo... + return; + } else if (ret < 0){ + flb_errno(); + flb_engine_exit(config); + return; + } + flb_trace("[%s-%d] resuming thread=%p from pipe\n", worker_name, worker_id, th); + flb_thread_resume(th); + } + } + } +} + + +int flb_engine_start_workers(struct flb_config *config) +{ + int ret; + config->os_workers_len = 4; + if (config->os_workers_len <= 0) { + return 0; + } + + config->os_workers = flb_malloc(sizeof(pthread_t) * config->os_workers_len); + config->os_workers_ch[0] = flb_malloc(sizeof(flb_pipefd_t) * config->os_workers_len); + config->os_workers_ch[1] = flb_malloc(sizeof(flb_pipefd_t) * config->os_workers_len); + config->os_workers_evl = flb_malloc(sizeof(struct mk_event_loop *) * config->os_workers_len); + config->os_workers_event = flb_malloc(sizeof(struct mk_event) * config->os_workers_len); + if (config->os_workers == NULL || config->os_workers_ch[0] == NULL || config->os_workers_ch[1] == NULL || config->os_workers_evl == NULL || config->os_workers_event == NULL){ + return -1; + } + + for (int i=0; i < config->os_workers_len; i++) { + config->os_workers_evl[i] = mk_event_loop_create(256); + if (!config->os_workers_evl[i]) { + return -1; + } + ret = mk_event_channel_create(config->os_workers_evl[i], + &config->os_workers_ch[0][i], + &config->os_workers_ch[1][i], + &config->os_workers_event[i]); + if (ret != 0) { + return -1; + } + + struct flb_engine_worker_argument *arg = flb_malloc(sizeof(struct flb_engine_worker_argument)); + if (arg == NULL){ + return -1; + } + arg->worker_id = i; + arg->config = config;; + ret = mk_utils_worker_spawn(flb_engine_worker, arg, &config->os_workers[i]); + if (ret == -1) { + return -1; + } + + } + return 0; +} + + int flb_engine_start(struct flb_config *config) { int ret; char tmp[16]; struct flb_time t_flush; struct mk_event *event; struct mk_event_loop *evl; + int next_out_thread = 0; /* HTTP Server */ #ifdef FLB_HAVE_HTTP @@ -431,6 +550,12 @@ int flb_engine_start(struct flb_config *config) } #endif + /* Start output plugin worker threads */ + ret = flb_engine_start_workers(config); + if (ret == -1){ + return -1; + } + /* Create the event loop and set it in the global configuration */ evl = mk_event_loop_create(256); if (!evl) { @@ -452,6 +577,7 @@ int flb_engine_start(struct flb_config *config) flb_debug("[engine] coroutine stack size: %u bytes (%s)", config->coro_stack_size, tmp); + /* * Create a communication channel: this routine creates a channel to * signal the Engine event loop. It's useful to stop the event loop @@ -510,7 +636,7 @@ int flb_engine_start(struct flb_config *config) flb_utils_error(FLB_ERR_CFG_FLUSH_CREATE); } - /* Initialize the scheduler */ + /* Initialize the scheduler*/ ret = flb_sched_init(config); if (ret == -1) { flb_error("[engine] scheduler could not start"); @@ -642,8 +768,40 @@ int flb_engine_start(struct flb_config *config) u_conn = (struct flb_upstream_conn *) event; th = u_conn->thread; if (th) { - flb_trace("[engine] resuming thread=%p", th); - flb_thread_resume(th); + if (__atomic_test_and_set (&th->scheduled, __ATOMIC_SEQ_CST)){ + flb_trace("[engine] not resuming thread=%p as it is already scheduled", th); + continue; + } + if (config->os_workers_len == 0 || th->desired_worker_id == FLB_THREAD_RUN_MAIN_ONLY){ + flb_trace("[engine] resuming thread=%p on main thread", th); + flb_thread_resume(th); + } else if (th->desired_worker_id >= 0) { + flb_trace("[engine] resuming thread=%p on worker %d", th, th->desired_worker); + ret = flb_pipe_w(config->os_workers_ch[1][th->desired_worker_id], &th, sizeof(struct flb_thread *)); + if (ret == -1) { + flb_errno(); + flb_error("[engine] cannot send work to worker %d but thread %p can only be scheduled there", th->desired_worker_id, th); + } else if (ret == 0){ + flb_error("[engine] cannot send work to worker %d (EOF) but thread %p can only be scheduled there", th->desired_worker_id, th); + } + + } else { + for (int i=0; i < config->os_workers_len; i++){ + flb_trace("[engine] resuming thread=%p on worker %d (attempt %d)", th, next_out_thread, i); + ret = flb_pipe_w(config->os_workers_ch[1][next_out_thread], &th, sizeof(struct flb_thread *)); + if (ret == -1) { + flb_errno(); + flb_error("[engine] cannot send work to worker %d", next_out_thread); + } else if (ret == 0){ + flb_error("[engine] cannot send work to worker %d (EOF)", next_out_thread); + } + next_out_thread = (next_out_thread + 1 ) % config->os_workers_len; + } + if (ret <= 0){ + flb_error("[engine] cannot talk to any worker threads"); + flb_engine_exit(config); + } + } } } else if (event->type == FLB_ENGINE_EV_OUTPUT) { diff --git a/src/flb_io_tls.c b/src/flb_io_tls.c index 7313866d138..fc84d16a92c 100644 --- a/src/flb_io_tls.c +++ b/src/flb_io_tls.c @@ -41,6 +41,23 @@ #define FLB_TLS_CLIENT "Fluent Bit" + +#include + +#define LOCK_OR_RETURN(mutex, ret) { \ + if (pthread_mutex_lock(mutex)){ \ + flb_error("error locking mutex"); \ + return ret; \ + } \ + } +#define UNLOCK_OR_RETURN(mutex, ret) { \ + if (pthread_mutex_unlock(mutex)){ \ + flb_error("error locking mutex"); \ + return ret; \ + } \ + } + + #define io_tls_error(ret) _io_tls_error(ret, __FILE__, __LINE__) void _io_tls_error(int ret, char *file, int line) @@ -329,10 +346,12 @@ int net_io_tls_handshake(void *_u_conn, void *_th) struct flb_upstream *u = u_conn->u; struct flb_thread *th = _th; + LOCK_OR_RETURN(&u->connection_pool_mutex, -1); session = flb_tls_session_new(u->tls->context); if (!session) { flb_error("[io_tls] could not create TLS session for %s:%i", u->tcp_host, u->tcp_port); + UNLOCK_OR_RETURN(&u->connection_pool_mutex, -1); return -1; } if (!u->tls->context->vhost) { @@ -350,6 +369,7 @@ int net_io_tls_handshake(void *_u_conn, void *_th) mbedtls_ssl_set_bio(&session->ssl, &u_conn->tls_net_context, mbedtls_net_send, mbedtls_net_recv, NULL); + UNLOCK_OR_RETURN(&u->connection_pool_mutex, -1); retry_handshake: ret = mbedtls_ssl_handshake(&session->ssl); diff --git a/src/flb_oauth2.c b/src/flb_oauth2.c index f088208e940..79ab85de3b1 100644 --- a/src/flb_oauth2.c +++ b/src/flb_oauth2.c @@ -240,6 +240,13 @@ struct flb_oauth2 *flb_oauth2_create(struct flb_config *config, return NULL; } + +/* Clear the current payload */ +void flb_oauth2_payload_clear(struct flb_oauth2 *ctx) +{ + ctx->payload[0] = '\0'; +} + /* Append a key/value to the request body */ int flb_oauth2_payload_append(struct flb_oauth2 *ctx, const char *key_str, int key_len, diff --git a/src/flb_upstream.c b/src/flb_upstream.c index cce908085da..7333b664aab 100644 --- a/src/flb_upstream.c +++ b/src/flb_upstream.c @@ -29,6 +29,21 @@ #include #include +#include + +#define LOCK_OR_RETURN(mutex, ret) { \ + if (pthread_mutex_lock(mutex)){ \ + flb_error("error locking mutex"); \ + return ret; \ + } \ + } +#define UNLOCK_OR_RETURN(mutex, ret) { \ + if (pthread_mutex_unlock(mutex)){ \ + flb_error("error locking mutex"); \ + return ret; \ + } \ + } + /* Config map for Upstream networking setup */ struct flb_config_map upstream_net[] = { { @@ -140,6 +155,8 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config, u->n_connections = 0; u->flags |= FLB_IO_ASYNC; + pthread_mutex_init(&u->connection_pool_mutex, NULL); + mk_list_init(&u->av_queue); mk_list_init(&u->busy_queue); mk_list_init(&u->destroy_queue); @@ -300,8 +317,10 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u) MK_EVENT_ZERO(&conn->event); + LOCK_OR_RETURN(&u->connection_pool_mutex, NULL); /* Link new connection to the busy queue */ mk_list_add(&conn->_head, &u->busy_queue); + UNLOCK_OR_RETURN(&u->connection_pool_mutex, NULL); /* Increase counter */ u->n_connections++; @@ -353,6 +372,7 @@ int flb_upstream_destroy(struct flb_upstream *u) flb_free(u->proxy_username); flb_free(u->proxy_password); mk_list_del(&u->_head); + pthread_mutex_destroy(&u->connection_pool_mutex); flb_free(u); return 0; @@ -386,8 +406,10 @@ struct flb_upstream_conn *flb_upstream_conn_get(struct flb_upstream *u) u->net.keepalive ? "enabled": "disabled", u->net.keepalive_idle_timeout); + LOCK_OR_RETURN(&u->connection_pool_mutex, NULL); /* On non Keepalive mode, always create a new TCP connection */ if (u->net.keepalive == FLB_FALSE) { + UNLOCK_OR_RETURN(&u->connection_pool_mutex, NULL); return create_conn(u); } @@ -429,14 +451,17 @@ struct flb_upstream_conn *flb_upstream_conn_get(struct flb_upstream *u) * * So just return the connection context. */ + UNLOCK_OR_RETURN(&u->connection_pool_mutex, NULL); return conn; } /* No keepalive connection available, create a new one */ if (!conn) { - conn = create_conn(u); + UNLOCK_OR_RETURN(&u->connection_pool_mutex, NULL); + return create_conn(u); } + UNLOCK_OR_RETURN(&u->connection_pool_mutex, NULL); return conn; } @@ -447,13 +472,17 @@ struct flb_upstream_conn *flb_upstream_conn_get(struct flb_upstream *u) static int cb_upstream_conn_ka_dropped(void *data) { struct flb_upstream_conn *conn; + int ret; conn = (struct flb_upstream_conn *) data; flb_debug("[upstream] KA connection #%i to %s:%i has been disconnected " "by the remote service", conn->fd, conn->u->tcp_host, conn->u->tcp_port); - return destroy_conn(conn); + LOCK_OR_RETURN(&conn->u->connection_pool_mutex, -1); + ret = destroy_conn(conn); + UNLOCK_OR_RETURN(&conn->u->connection_pool_mutex, -1); + return ret; } int flb_upstream_conn_release(struct flb_upstream_conn *conn) @@ -464,6 +493,8 @@ int flb_upstream_conn_release(struct flb_upstream_conn *conn) /* Upstream context */ u = conn->u; + LOCK_OR_RETURN(&u->connection_pool_mutex, -1); + /* If this is a valid KA connection just recycle */ if (conn->u->net.keepalive == FLB_TRUE && conn->recycle == FLB_TRUE && conn->fd > -1) { /* @@ -489,7 +520,10 @@ int flb_upstream_conn_release(struct flb_upstream_conn *conn) flb_debug("[upstream] KA connection #%i to %s:%i could not be " "registered, closing.", conn->fd, conn->u->tcp_host, conn->u->tcp_port); - return destroy_conn(conn); + ret = destroy_conn(conn); + + UNLOCK_OR_RETURN(&u->connection_pool_mutex, -1); + return ret; } flb_debug("[upstream] KA connection #%i to %s:%i is now available", @@ -499,14 +533,23 @@ int flb_upstream_conn_release(struct flb_upstream_conn *conn) /* if we have exceeded our max number of uses of this connection, destroy it */ if (conn->u->net.keepalive_max_recycle > 0 && conn->ka_count > conn->u->net.keepalive_max_recycle) { flb_debug("[upstream] KA count %i exceeded configured limit of %i: closing.", conn->ka_count, conn->u->net.keepalive_max_recycle); - return destroy_conn(conn); + ret = destroy_conn(conn); + UNLOCK_OR_RETURN(&u->connection_pool_mutex, -1); + return ret; } + UNLOCK_OR_RETURN(&u->connection_pool_mutex, -1); + return 0; } /* No keepalive connections must be destroyed */ - return destroy_conn(conn); + ret = destroy_conn(conn); + + + UNLOCK_OR_RETURN(&u->connection_pool_mutex, -1); + return ret; + } int flb_upstream_conn_timeouts(struct flb_config *ctx) @@ -520,10 +563,13 @@ int flb_upstream_conn_timeouts(struct flb_config *ctx) now = time(NULL); + /* Iterate all upstream contexts */ mk_list_foreach(head, &ctx->upstreams) { u = mk_list_entry(head, struct flb_upstream, _head); + LOCK_OR_RETURN(&u->connection_pool_mutex, -1); + /* Iterate every busy connection */ mk_list_foreach(u_head, &u->busy_queue) { u_conn = mk_list_entry(u_head, struct flb_upstream_conn, _head); @@ -563,7 +609,7 @@ int flb_upstream_conn_timeouts(struct flb_config *ctx) u_conn->fd, u->tcp_host, u->tcp_port); } } - + UNLOCK_OR_RETURN(&u->connection_pool_mutex, -1); } return 0;