Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ struct flb_config {
flb_pipefd_t ch_manager[2]; /* channel to administrate fluent bit */
flb_pipefd_t ch_notif[2]; /* channel to receive notifications */

/* Keeps track of output plugin threads */
int os_workers_len; /* number of worker OS threads */
pthread_t *os_workers; /* output worker tids */
flb_pipefd_t *os_workers_ch[2]; /* channels to send data to output workers */
struct mk_event_loop **os_workers_evl; /* event loop for output workers */
struct mk_event *os_workers_event; /* event for output worker pipes */

/* Channel event loop (just for ch_notif) */
struct mk_event_loop *ch_evl;

Expand Down
11 changes: 3 additions & 8 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include <fluent-bit/flb_bits.h>
#include <fluent-bit/flb_pipe.h>
#include <fluent-bit/flb_filter.h>
#include <fluent-bit/flb_thread.h>
#include <fluent-bit/flb_mp.h>

#ifdef FLB_HAVE_METRICS
Expand Down Expand Up @@ -333,7 +332,7 @@ struct flb_thread *flb_input_thread(struct flb_input_instance *i_ins,
struct flb_thread *th;
struct flb_input_thread *in_th;

th = flb_thread_new(sizeof(struct flb_input_thread), NULL);
th = flb_thread_new(sizeof(struct flb_input_thread), NULL, FLB_THREAD_RUN_MAIN_ONLY, config->ch_manager[1]);
if (!th) {
return NULL;
}
Expand Down Expand Up @@ -425,7 +424,6 @@ struct flb_thread *flb_input_thread_collect(struct flb_input_collector *coll,
* will be returned instead.
*/
static inline void flb_input_return(struct flb_thread *th) {
int n;
uint64_t val;
struct flb_input_thread *in_th;

Expand All @@ -441,10 +439,7 @@ static inline void flb_input_return(struct flb_thread *th) {
* We put together the return value with the task_id on the 32 bits at right
*/
val = FLB_BITS_U64_SET(3 /* FLB_ENGINE_IN_THREAD */, in_th->id);
n = flb_pipe_w(in_th->config->ch_manager[1], (void *) &val, sizeof(val));
if (n == -1) {
flb_errno();
}
flb_thread_return(val, th);
}

static inline int flb_input_buf_paused(struct flb_input_instance *i)
Expand All @@ -461,7 +456,7 @@ static inline void FLB_INPUT_RETURN()
struct flb_thread *th;
th = (struct flb_thread *) pthread_getspecific(flb_thread_key);
flb_input_return(th);
flb_thread_return(th);
flb_thread_yield(th, FLB_TRUE);
}

static inline int flb_input_config_map_set(struct flb_input_instance *ins,
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_oauth2.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ struct flb_oauth2 *flb_oauth2_create(struct flb_config *config,
const char *auth_url, int expire_sec);
void flb_oauth2_destroy(struct flb_oauth2 *ctx);
int flb_oauth2_token_len(struct flb_oauth2 *ctx);
void flb_oauth2_payload_clear(struct flb_oauth2 *ctx);
int flb_oauth2_payload_append(struct flb_oauth2 *ctx,
const char *key_str, int key_len,
const char *val_str, int val_len);
Expand Down
41 changes: 10 additions & 31 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@
#endif

/* Output plugin masks */
#define FLB_OUTPUT_NET 32 /* output address may set host and port */
#define FLB_OUTPUT_NET 32 /* output address may set host and port */
#define FLB_OUTPUT_PLUGIN_CORE 0
#define FLB_OUTPUT_PLUGIN_PROXY 1
#define FLB_OUTPUT_NO_MULTIPLEX 512
#define FLB_OUTPUT_NO_MULTIPLEX 512
#define FLB_OUTPUT_MULTITHREAD 1024

/*
* Tests callbacks
Expand Down Expand Up @@ -485,8 +486,12 @@ struct flb_thread *flb_output_thread(struct flb_task *task,
struct flb_thread *th;

/* Create a new thread */
th = flb_thread_new(sizeof(struct flb_output_thread),
cb_output_thread_destroy);
int worker_id = FLB_THREAD_RUN_MAIN_ONLY;
if (o_ins->flags & FLB_OUTPUT_MULTITHREAD) {
worker_id = FLB_THREAD_RUN_ANYWHERE;
}

th = flb_thread_new(sizeof(struct flb_output_thread), cb_output_thread_destroy, worker_id, o_ins->ch_events[1]);
if (!th) {
return NULL;
}
Expand Down Expand Up @@ -543,7 +548,6 @@ struct flb_thread *flb_output_thread(struct flb_task *task,
* a return value. The return value is either FLB_OK, FLB_RETRY or FLB_ERROR.
*/
static inline void flb_output_return(int ret, struct flb_thread *th) {
int n;
uint32_t set;
uint64_t val;
struct flb_task *task;
Expand All @@ -566,32 +570,7 @@ static inline void flb_output_return(int ret, struct flb_thread *th) {
*/
set = FLB_TASK_SET(ret, task->id, out_th->id);
val = FLB_BITS_U64_SET(2 /* FLB_ENGINE_TASK */, set);

n = flb_pipe_w(out_th->o_ins->ch_events[1], (void *) &val, sizeof(val));
if (n == -1) {
flb_errno();
}

#ifdef FLB_HAVE_METRICS
if (out_th->o_ins->metrics) {
if (ret == FLB_OK) {
records = task->records;
flb_metrics_sum(FLB_METRIC_OUT_OK_RECORDS, records,
out_th->o_ins->metrics);
flb_metrics_sum(FLB_METRIC_OUT_OK_BYTES, task->size,
out_th->o_ins->metrics);
}
else if (ret == FLB_ERROR) {
flb_metrics_sum(FLB_METRIC_OUT_ERROR, 1, out_th->o_ins->metrics);
}
else if (ret == FLB_RETRY) {
/*
* Counting retries is happening in the event loop/scheduler side
* since it also needs to count if some retry fails to re-schedule.
*/
}
}
#endif
flb_thread_return(val, th);
}

static inline void flb_output_return_do(int x)
Expand Down
60 changes: 49 additions & 11 deletions include/fluent-bit/flb_thread_libco.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include <valgrind/valgrind.h>
#endif

#define FLB_THREAD_RUN_ANYWHERE (-1)
#define FLB_THREAD_RUN_MAIN_ONLY (-2)
struct flb_thread {

#ifdef FLB_HAVE_VALGRIND
Expand All @@ -46,6 +48,25 @@ struct flb_thread {
cothread_t caller;
cothread_t callee;

/* Worker thread id to run this thread on.
* FLB_THREAD_RUN_ANYWHERE allows the thread to run on any worker
* FLB_THREAD_RUN_MAIN_ONLY requires the thread to run in the main thread
* A non-negative value corresponds to the worker id of the worker
* that can run this thread.
*/
int desired_worker_id;

/* Did this coroutine return? */
int returned;
uint64_t return_val;

/* Is this coroutine able to be scheduled?
* Prevents scheduling a thread multiple times. */
char scheduled;

/* Channel this thread uses for returning */
flb_pipefd_t ret_channel;

void *data;

/*
Expand Down Expand Up @@ -88,28 +109,40 @@ static FLB_INLINE void flb_thread_destroy(struct flb_thread *th)
flb_free(th);
}

#define flb_thread_return(th) co_switch(th->caller)

static FLB_INLINE void flb_thread_resume(struct flb_thread *th)
{
pthread_setspecific(flb_thread_key, (void *) th);

/*
* In the past we used to have a flag to mark when a coroutine
* has finished (th->ended == MK_TRUE), now we let the coroutine
* to submit an event to the event loop indicating what's going on
* through the call FLB_OUTPUT_RETURN(...).
*
* So we just swap context and let the event loop to handle all
* the cleanup required.
*/
if(th->returned) {
flb_error("[thread] running thread=%p that already returned", th);
return;
}

th->caller = co_active();
co_switch(th->callee);

if(th->returned) {
int n = flb_pipe_w(th->ret_channel, (void *) &th->return_val, sizeof(th->return_val));
if (n == -1) {
flb_errno();
}
} else {
__atomic_clear(&th->scheduled, __ATOMIC_SEQ_CST);
}

}


static FLB_INLINE void flb_thread_return(uint64_t val, struct flb_thread *th)
{
th->returned = 1;
th->return_val = val;

}

static FLB_INLINE struct flb_thread *flb_thread_new(size_t data_size,
void (*cb_destroy) (void *))
void (*cb_destroy) (void *), int desired_worker_id, flb_pipefd_t ret_channel)

{
void *p;
Expand All @@ -124,6 +157,11 @@ static FLB_INLINE struct flb_thread *flb_thread_new(size_t data_size,

th = (struct flb_thread *) p;
th->cb_destroy = NULL;
th->desired_worker_id = desired_worker_id;
th->returned = 0;
th->return_val = (uint64_t) 0;
__atomic_clear(&th->scheduled, __ATOMIC_SEQ_CST);
th->ret_channel = ret_channel;

flb_trace("[thread %p] created (custom data at %p, size=%lu",
th, FLB_THREAD_DATA(th), data_size);
Expand Down
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ struct flb_upstream {

struct mk_list destroy_queue;

pthread_mutex_t connection_pool_mutex;

#ifdef FLB_HAVE_TLS
/* context with mbedTLS data to handle certificates and keys */
struct flb_tls *tls;
Expand Down
4 changes: 2 additions & 2 deletions lib/mbedtls-2.24.0/include/mbedtls/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1982,7 +1982,7 @@
*
* Uncomment this to enable pthread mutexes.
*/
//#define MBEDTLS_THREADING_PTHREAD
#define MBEDTLS_THREADING_PTHREAD

/**
* \def MBEDTLS_USE_PSA_CRYPTO
Expand Down Expand Up @@ -3286,7 +3286,7 @@
*
* Enable this layer to allow use of mutexes within mbed TLS
*/
//#define MBEDTLS_THREADING_C
#define MBEDTLS_THREADING_C

/**
* \def MBEDTLS_TIMING_C
Expand Down
41 changes: 28 additions & 13 deletions plugins/out_stackdriver/stackdriver.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
#include "stackdriver_helper.h"
#include <mbedtls/base64.h>
#include <mbedtls/sha256.h>
#include <pthread.h>



/*
* Base64 Encoding in JWT must:
Expand Down Expand Up @@ -202,12 +205,7 @@ static int get_oauth2_token(struct flb_stackdriver *ctx)
time_t expires;
char payload[1024];

/* Create oauth2 context */
ctx->o = flb_oauth2_create(ctx->config, FLB_STD_AUTH_URL, 3000);
if (!ctx->o) {
flb_plg_error(ctx->ins, "cannot create oauth2 context");
return -1;
}
flb_oauth2_payload_clear(ctx->o);

/* In case of using metadata server, fetch token from there */
if (ctx->metadata_server_auth) {
Expand Down Expand Up @@ -263,23 +261,32 @@ static int get_oauth2_token(struct flb_stackdriver *ctx)
return 0;
}

static pthread_mutex_t token_mutex = PTHREAD_MUTEX_INITIALIZER;
static char *get_google_token(struct flb_stackdriver *ctx)
{
int ret = 0;
char* output = NULL;

if (!ctx->o) {
ret = get_oauth2_token(ctx);
if (pthread_mutex_lock(&token_mutex)){
flb_plg_error(ctx->ins, "error locking mutex");
return NULL;
}
else if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
flb_oauth2_destroy(ctx->o);


if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
ret = get_oauth2_token(ctx);
}

if (ret != 0) {
if (ret == 0) {
output = ctx->o->access_token;
}

if (pthread_mutex_unlock(&token_mutex)){
flb_plg_error(ctx->ins, "error locking mutex");
return NULL;
}

return ctx->o->access_token;
return output;
}

static bool validate_msgpack_unpacked_data(msgpack_object root)
Expand Down Expand Up @@ -869,6 +876,13 @@ static int cb_stackdriver_init(struct flb_output_instance *ins,
return -1;
}

/* Create oauth2 context */
ctx->o = flb_oauth2_create(ctx->config, FLB_STD_AUTH_URL, 3000);
if (!ctx->o) {
flb_plg_error(ctx->ins, "cannot create oauth2 context");
return -1;
}

/* Metadata Upstream Sync flags */
ctx->metadata_u->flags &= ~FLB_IO_ASYNC;

Expand Down Expand Up @@ -1867,6 +1881,7 @@ static void set_authorization_header(struct flb_http_client *c,
flb_http_add_header(c, "Authorization", 13, header, len);
}


static void cb_stackdriver_flush(const void *data, size_t bytes,
const char *tag, int tag_len,
struct flb_input_instance *i_ins,
Expand Down Expand Up @@ -1988,5 +2003,5 @@ struct flb_output_plugin out_stackdriver_plugin = {
.test_formatter.callback = stackdriver_format,

/* Plugin flags */
.flags = FLB_OUTPUT_NET | FLB_IO_TLS,
.flags = FLB_OUTPUT_NET | FLB_IO_TLS | FLB_OUTPUT_MULTITHREAD,
};
Loading