Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_downstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ struct flb_downstream {

struct mk_list busy_queue;
struct mk_list destroy_queue;

/* this is a config map reference coming from the plugin net_setup field */
struct flb_net_setup *net_setup;
};

static inline int flb_downstream_is_shutting_down(struct flb_downstream *downstream)
Expand Down
9 changes: 9 additions & 0 deletions src/flb_downstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ struct flb_config_map downstream_net[] = {
"disabled, the timeout is logged as a debug message"
},

{
FLB_CONFIG_MAP_BOOL, "net.keepalive", "true",
0, FLB_TRUE, offsetof(struct flb_net_setup, keepalive),
"Enable or disable Keepalive support"
},

/* EOF */
{0}
};
Expand Down Expand Up @@ -107,6 +113,9 @@ int flb_downstream_setup(struct flb_downstream *stream,
return -1;
}

/* map the net_setup config map coming from the caller */
stream->net_setup = net_setup;

mk_list_init(&stream->busy_queue);
mk_list_init(&stream->destroy_queue);

Expand Down
124 changes: 71 additions & 53 deletions src/http_server/flb_http_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ int uncompress_gzip(char **output_buffer,

/* COMMON */

char *flb_http_server_convert_string_to_lowercase(char *input_buffer,
char *flb_http_server_convert_string_to_lowercase(char *input_buffer,
size_t length)
{
char *output_buffer;
Expand All @@ -75,9 +75,9 @@ char *flb_http_server_convert_string_to_lowercase(char *input_buffer,
}


int flb_http_server_strncasecmp(const uint8_t *first_buffer,
int flb_http_server_strncasecmp(const uint8_t *first_buffer,
size_t first_length,
const char *second_buffer,
const char *second_buffer,
size_t second_length)
{
const char *first_buffer_;
Expand All @@ -89,11 +89,11 @@ int flb_http_server_strncasecmp(const uint8_t *first_buffer,
if (first_length == 0) {
first_length = strlen(first_buffer_);
}

if (second_length == 0) {
second_length = strlen(second_buffer_);
}

if (first_length < second_length) {
return -1;
}
Expand Down Expand Up @@ -123,8 +123,8 @@ static int flb_http_server_session_read(struct flb_http_server_session *session)
return -1;
}

result = (ssize_t) flb_http_server_session_ingest(session,
input_buffer,
result = (ssize_t) flb_http_server_session_ingest(session,
input_buffer,
result);

if (result < 0) {
Expand Down Expand Up @@ -161,11 +161,11 @@ static int flb_http_server_session_write(struct flb_http_server_session *session
}

if (data_sent < data_length) {
memmove(session->outgoing_data,
&session->outgoing_data[data_sent],
memmove(session->outgoing_data,
&session->outgoing_data[data_sent],
data_length - data_sent);

cfl_sds_set_len(session->outgoing_data,
cfl_sds_set_len(session->outgoing_data,
data_length - data_sent);
}
else {
Expand Down Expand Up @@ -204,7 +204,7 @@ static int flb_http_server_inflate_request_body(
}

content_encoding_header_value = flb_http_request_get_header(
request,
request,
"content-encoding");

if (content_encoding_header_value == NULL) {
Expand Down Expand Up @@ -255,13 +255,13 @@ static int flb_http_server_inflate_request_body(

request->body = inflated_body;

snprintf(new_content_length,
sizeof(new_content_length),
snprintf(new_content_length,
sizeof(new_content_length),
"%zu",
output_size);

flb_http_request_unset_header(request, "content-encoding");
flb_http_request_set_header(request,
flb_http_request_set_header(request,
"content-length", strlen("content-length"),
new_content_length, strlen(new_content_length));

Expand All @@ -274,13 +274,24 @@ static int flb_http_server_inflate_request_body(
static int flb_http_server_should_connection_be_closed(
struct flb_http_request *request)
{
char *connection_header_value;
int keepalive = FLB_FALSE;
char *connection_header_value;
struct flb_http_server_session *parent_session;
struct flb_http_server *server;
struct flb_downstream *downstream;

parent_session = (struct flb_http_server_session *) request->stream->parent;

server = parent_session->parent;
downstream = server->downstream;

/*
* user config overrides any protocol defaults, this is set
* with the option 'net.keepalive: off`
*/
if (!downstream->net_setup->keepalive) {
return FLB_TRUE;
}

/* Version behaviors implemented in the following block :
* HTTP/0.9 keep-alive is opt-in
Expand All @@ -290,27 +301,34 @@ static int flb_http_server_should_connection_be_closed(
*/

if (request->protocol_version < HTTP_PROTOCOL_VERSION_20) {
if ((server->flags & FLB_HTTP_SERVER_FLAG_KEEPALIVE) == 0) {
return FLB_TRUE;
}
else {
connection_header_value = flb_http_request_get_header(request,
"connection");
/* HTTP/2 always keeps the connection open */
return FLB_FALSE;
}

if (connection_header_value == NULL) {
if (request->protocol_version < HTTP_PROTOCOL_VERSION_11) {
return FLB_TRUE;
}
}
else {
if (strcasecmp(connection_header_value, "keep-alive") != 0) {
return FLB_TRUE;
}
}
}
/* Set the defaults per protocol version */
if (request->protocol_version == HTTP_PROTOCOL_VERSION_09) {
keepalive = FLB_FALSE;
}
else if (request->protocol_version == HTTP_PROTOCOL_VERSION_10) {
keepalive = FLB_FALSE;
}
else if (request->protocol_version == HTTP_PROTOCOL_VERSION_11) {
keepalive = FLB_TRUE;
}

/* Override protocol defaults by checking connection header */
connection_header_value = flb_http_request_get_header(request,
"connection");
if (connection_header_value &&
strcasecmp(connection_header_value, "keep-alive") == 0) {
keepalive = FLB_TRUE;
}

if (keepalive) {
return FLB_FALSE;
}

return FLB_FALSE;
return FLB_TRUE;
}

static int flb_http_server_client_activity_event_handler(void *data)
Expand All @@ -326,7 +344,7 @@ static int flb_http_server_client_activity_event_handler(void *data)
struct flb_http_stream *stream;
int result;
struct mk_event *event;

connection = (struct flb_connection *) data;

session = (struct flb_http_server_session *) connection->user_data;
Expand All @@ -347,8 +365,8 @@ static int flb_http_server_client_activity_event_handler(void *data)

close_connection = FLB_FALSE;

cfl_list_foreach_safe(iterator,
backup_iterator,
cfl_list_foreach_safe(iterator,
backup_iterator,
&session->request_queue) {
request = cfl_list_entry(iterator, struct flb_http_request, _head);

Expand Down Expand Up @@ -455,7 +473,7 @@ static int flb_http_server_client_connection_event_handler(void *data)

/* HTTP SERVER */

int flb_http_server_init(struct flb_http_server *session,
int flb_http_server_init(struct flb_http_server *session,
int protocol_version,
uint64_t flags,
flb_http_server_request_processor_callback
Expand Down Expand Up @@ -549,23 +567,23 @@ int flb_http_server_stop(struct flb_http_server *server)
}

mk_list_foreach_safe(iterator, iterator_backup, &server->clients) {
session = cfl_list_entry(iterator,
struct flb_http_server_session,
session = cfl_list_entry(iterator,
struct flb_http_server_session,
_head);

flb_http_server_session_destroy(session);
}

server->status = HTTP_SERVER_STOPPED;
}

return 0;
}

int flb_http_server_destroy(struct flb_http_server *server)
{
flb_http_server_stop(server);

if (server->downstream != NULL) {
flb_downstream_destroy(server->downstream);

Expand Down Expand Up @@ -668,17 +686,17 @@ void flb_http_server_session_destroy(struct flb_http_server_session *session)
}
}

int flb_http_server_session_ingest(struct flb_http_server_session *session,
unsigned char *buffer,
int flb_http_server_session_ingest(struct flb_http_server_session *session,
unsigned char *buffer,
size_t length)
{
cfl_sds_t resized_buffer;
int result;

if (session->version == HTTP_PROTOCOL_AUTODETECT ||
if (session->version == HTTP_PROTOCOL_AUTODETECT ||
session->version == HTTP_PROTOCOL_HTTP1) {
resized_buffer = cfl_sds_cat(session->incoming_data,
(const char *) buffer,
resized_buffer = cfl_sds_cat(session->incoming_data,
(const char *) buffer,
length);

if (resized_buffer == NULL) {
Expand All @@ -690,8 +708,8 @@ int flb_http_server_session_ingest(struct flb_http_server_session *session,

if (session->version == HTTP_PROTOCOL_AUTODETECT) {
if (cfl_sds_len(session->incoming_data) >= 24) {
if (strncmp(session->incoming_data,
"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n",
if (strncmp(session->incoming_data,
"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n",
24) == 0) {
session->version = HTTP_PROTOCOL_HTTP2;
}
Expand Down Expand Up @@ -722,13 +740,13 @@ int flb_http_server_session_ingest(struct flb_http_server_session *session,
}

if (session->version == HTTP_PROTOCOL_HTTP1) {
return flb_http1_server_session_ingest(&session->http1,
buffer,
return flb_http1_server_session_ingest(&session->http1,
buffer,
length);
}
else if (session->version == HTTP_PROTOCOL_HTTP2) {
return flb_http2_server_session_ingest(&session->http2,
buffer,
return flb_http2_server_session_ingest(&session->http2,
buffer,
length);
}

Expand Down