Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_kv.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_engine.h>
#include <fluent-bit/flb_downstream.h>
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_network.h>
#include <msgpack.h>

Expand Down Expand Up @@ -153,6 +155,92 @@ static int in_fw_collect(struct flb_input_instance *ins,
return 0;
}

static void delete_users(struct flb_in_fw_config *ctx)
{
struct mk_list *tmp;
struct mk_list *head;
struct flb_in_fw_user *user;

mk_list_foreach_safe(head, tmp, &ctx->users) {
user = mk_list_entry(head, struct flb_in_fw_user, _head);
flb_sds_destroy(user->name);
flb_sds_destroy(user->password);
mk_list_del(&user->_head);
flb_free(user);
}
}

static int setup_users(struct flb_in_fw_config *ctx,
struct flb_input_instance *ins)
{
flb_sds_t tmp;
struct mk_list *head;
struct mk_list *split;
struct flb_split_entry *sentry;
struct flb_kv *kv;
struct flb_in_fw_user *user;

/* Iterate all input properties */
mk_list_foreach(head, &ins->properties) {
kv = mk_list_entry(head, struct flb_kv, _head);

/* Create a new user */
user = flb_malloc(sizeof(struct flb_in_fw_user));
if (!user) {
flb_errno();
return -1;
}

/* Get the type */
if (strcasecmp(kv->key, "security.users") != 0) {
/* Other property. Skip */
flb_free(user);
continue;
}

/* As a value we expect a pair of a username and a passowrd */
split = flb_utils_split(kv->val, ' ', 1);
if (mk_list_size(split) != 2) {
flb_plg_error(ctx->ins,
"invalid value, expected username and password");
delete_users(ctx);
flb_free(user);
flb_utils_split_free(split);
return -1;
}

/* Get first value (user's name) */
sentry = mk_list_entry_first(split, struct flb_split_entry, _head);
tmp = flb_sds_create_len(sentry->value, sentry->len + 1);
if (tmp == NULL) {
delete_users(ctx);
flb_free(user);
flb_utils_split_free(split);
return -1;
}
user->name = tmp;

/* Get remaining content (password) */
sentry = mk_list_entry_last(split, struct flb_split_entry, _head);
tmp = flb_sds_create_len(sentry->value, sentry->len);
if (tmp == NULL) {
delete_users(ctx);
flb_free(user);
flb_utils_split_free(split);
return -1;
}
user->password = tmp;

/* Release split */
flb_utils_split_free(split);

/* Link to parent list */
mk_list_add(&user->_head, &ctx->users);
}

return 0;
}

/* Initialize plugin */
static int in_fw_init(struct flb_input_instance *ins,
struct flb_config *config, void *data)
Expand All @@ -172,6 +260,7 @@ static int in_fw_init(struct flb_input_instance *ins,
ctx->coll_fd = -1;
ctx->ins = ins;
mk_list_init(&ctx->connections);
mk_list_init(&ctx->users);

/* Set the context */
flb_input_set_context(ins, ctx);
Expand Down Expand Up @@ -229,6 +318,13 @@ static int in_fw_init(struct flb_input_instance *ins,
}
}

/* Load users */
ret = setup_users(ctx, ins);
if (ret == -1) {
flb_free(ctx);
return -1;
}

flb_input_downstream_set(ctx->downstream, ctx->ins);

flb_net_socket_nonblocking(ctx->downstream->server_fd);
Expand Down Expand Up @@ -275,6 +371,7 @@ static int in_fw_exit(void *data, struct flb_config *config)
return 0;
}

delete_users(ctx);
fw_conn_del_all(ctx);
fw_config_destroy(ctx);
return 0;
Expand All @@ -287,6 +384,21 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_fw_config, tag_prefix),
"Prefix incoming tag with the defined value."
},
{
FLB_CONFIG_MAP_STR, "shared_key", NULL,
0, FLB_FALSE, 0,
"Shared key for authentication"
},
{
FLB_CONFIG_MAP_STR, "self_hostname", NULL,
0, FLB_FALSE, 0,
"Hostname"
},
{
FLB_CONFIG_MAP_STR, "security.users", NULL,
FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
"Specify username and password pairs."
},
{
FLB_CONFIG_MAP_STR, "unix_path", NULL,
0, FLB_TRUE, offsetof(struct flb_in_fw_config, unix_path),
Expand Down
24 changes: 24 additions & 0 deletions plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,25 @@
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>

enum {
FW_HANDSHAKE_HELO = 1,
FW_HANDSHAKE_PINGPONG = 2,
FW_HANDSHAKE_ESTABLISHED = 3,
};

struct flb_in_fw_helo {
flb_sds_t nonce;
int nonce_len;
flb_sds_t salt;
int salt_len;
};

struct flb_in_fw_user {
flb_sds_t name;
flb_sds_t password;
struct mk_list _head;
};

struct flb_in_fw_config {
size_t buffer_max_size; /* Max Buffer size */
size_t buffer_chunk_size; /* Chunk allocation size */
Expand All @@ -40,6 +59,11 @@ struct flb_in_fw_config {
unsigned int unix_perm; /* Permission for socket */
flb_sds_t unix_perm_str; /* Permission (config map) */

/* secure forward */
flb_sds_t shared_key; /* shared key */
flb_sds_t self_hostname; /* hostname used in certificate */
struct mk_list users; /* username and password pairs */

int coll_fd;
struct flb_downstream *downstream; /* Client manager */
struct mk_list connections; /* List of active connections */
Expand Down
21 changes: 21 additions & 0 deletions plugins/in_forward/fw_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ struct flb_in_fw_config *fw_config_init(struct flb_input_instance *i_ins)
flb_debug("[in_fw] Listen='%s' TCP_Port=%s",
config->listen, config->tcp_port);
}

/* Shared Key */
p = flb_input_get_property("shared_key", i_ins);
if (p) {
config->shared_key = flb_sds_create(p);
}
else {
config->shared_key = NULL;
}

/* Self Hostname */
p = flb_input_get_property("self_hostname", i_ins);
if (p) {
config->self_hostname = flb_sds_create(p);
}
else {
config->self_hostname = flb_sds_create("localhost");
}
return config;
}

Expand Down Expand Up @@ -114,6 +132,9 @@ int fw_config_destroy(struct flb_in_fw_config *config)
flb_free(config->tcp_port);
}

flb_sds_destroy(config->shared_key);
flb_sds_destroy(config->self_hostname);

flb_free(config);

return 0;
Expand Down
48 changes: 46 additions & 2 deletions plugins/in_forward/fw_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,24 @@ int fw_conn_event(void *data)

event = &connection->event;


if (event->mask & MK_EVENT_READ) {
if (conn->handshake_status == FW_HANDSHAKE_PINGPONG) {
flb_plg_trace(ctx->ins, "handshake status = %d", conn->handshake_status);

ret = fw_prot_secure_forward_handshake(ctx->ins, conn);
if (ret == -1) {
flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd);
fw_conn_del(conn);

return -1;
}

conn->handshake_status = FW_HANDSHAKE_ESTABLISHED;
return 0;
}

flb_plg_trace(ctx->ins, "handshake status = %d", conn->handshake_status);

available = (conn->buf_size - conn->buf_len);
if (available < 1) {
if (conn->buf_size >= ctx->buffer_max_size) {
Expand Down Expand Up @@ -116,6 +132,7 @@ struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_
{
struct fw_conn *conn;
int ret;
struct flb_in_fw_helo *helo = NULL;

conn = flb_malloc(sizeof(struct fw_conn));
if (!conn) {
Expand All @@ -124,7 +141,25 @@ struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_
return NULL;
}

conn->handshake_status = FW_HANDSHAKE_ESTABLISHED;
if (ctx->shared_key != NULL) {
conn->handshake_status = FW_HANDSHAKE_HELO;
helo = flb_malloc(sizeof(struct flb_in_fw_helo));
if (!helo) {
flb_errno();

return NULL;
}
ret = fw_prot_secure_forward_handshake_start(ctx->ins, connection, helo);
if (ret != 0) {
return NULL;
}

conn->handshake_status = FW_HANDSHAKE_PINGPONG;
}

conn->connection = connection;
conn->helo = helo;

/* Set data for the event-loop */
connection->user_data = conn;
Expand Down Expand Up @@ -178,6 +213,15 @@ int fw_conn_del(struct fw_conn *conn)
/* Release resources */
mk_list_del(&conn->_head);

if (conn->helo != NULL) {
if (conn->helo->nonce != NULL) {
flb_sds_destroy(conn->helo->nonce);
}
if (conn->helo->salt != NULL) {
flb_sds_destroy(conn->helo->salt);
}
flb_free(conn->helo);
}
flb_free(conn->buf);
flb_free(conn);

Expand All @@ -196,4 +240,4 @@ int fw_conn_del_all(struct flb_in_fw_config *ctx)
}

return 0;
}
}
7 changes: 7 additions & 0 deletions plugins/in_forward/fw_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#define FLB_IN_FW_CHUNK_SIZE "1024000" /* 1MB */
#define FLB_IN_FW_CHUNK_MAX_SIZE "6144000" /* =FLB_IN_FW_CHUNK_SIZE * 6. 6MB */
#define FLB_IN_FW_NONCE_SIZE 16
#define FLB_IN_FW_SALT_SIZE 16

enum {
FW_NEW = 1, /* it's a new connection */
Expand All @@ -33,16 +35,21 @@ struct fw_conn_stream {
size_t tag_len;
};

struct flb_in_fw_helo;

/* Respresents a connection */
struct fw_conn {
int status; /* Connection status */
int handshake_status; /* handshake status */

/* Buffer */
char *buf; /* Buffer data */
int buf_len; /* Data length */
int buf_size; /* Buffer size */
size_t rest; /* Unpacking offset */

struct flb_in_fw_helo *helo; /* secure forward HELO phase */

struct flb_input_instance *in; /* Parent plugin instance */
struct flb_in_fw_config *ctx; /* Plugin configuration context */
struct flb_connection *connection;
Expand Down
Loading