Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 114 additions & 1 deletion plugins/in_forward/fw_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,55 @@ int fw_prot_secure_forward_handshake(struct flb_input_instance *ins,
return -1;
}

static size_t gzip_concatenated_count(const char *data, size_t len)
{
int i;
size_t count = 0;
const uint8_t *p;

p = (const uint8_t *) data;

/* search other gzip starting bits and method. */
for (i = 2; i < len &&
i + 2 <= len; i++) {
if (p[i] == 0x1F && p[i+1] == 0x8B && p[i+2] == 8) {
count++;
}
}

return count;
}

static size_t gzip_concatenated_borders(const char *data, size_t len, size_t **out_borders, size_t border_count)
{
int i;
size_t count = 0;
const uint8_t *p;
size_t *borders = NULL;

p = (const uint8_t *) data;
borders = (size_t *) flb_calloc(1, sizeof(size_t) * (border_count + 1));
if (borders == NULL) {
flb_errno();
return -1;
}

/* search other gzip starting bits and method. */
for (i = 2; i < len &&
i + 2 <= len; i++) {
if (p[i] == 0x1F && p[i+1] == 0x8B && p[i+2] == 8) {
borders[count] = i;
count++;
}
}
/* The length of the last border refers to the original length. */
borders[border_count] = len;

*out_borders = borders;

return count;
}

int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
{
int ret;
Expand Down Expand Up @@ -1488,13 +1537,49 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
}

if (ret == FLB_TRUE) {
ret = flb_gzip_uncompress((void *) data, len,
size_t prev_pos = 0;
size_t gzip_payloads_count = 0;
size_t loop = 0;
size_t *gzip_borders = NULL;
const size_t original_len = len;

gzip_payloads_count = gzip_concatenated_count(data, len);
flb_plg_debug(ctx->ins, "concatenated gzip payload count is %zd",
gzip_payloads_count);
if (gzip_payloads_count > 0) {
if (gzip_concatenated_borders(data, len, &gzip_borders, gzip_payloads_count) < 0) {
flb_plg_error(ctx->ins,
"failed to traverse boundaries of concatenated gzip payloads");
return -1;
}
}

retry_uncompress:
if (gzip_payloads_count > 0) {
if (loop == 0) {
len = gzip_borders[loop];
}
else if (gzip_borders[loop] == original_len) {
len = original_len - gzip_borders[loop - 1];
}
else if (loop >= 1) {
len = gzip_borders[loop] - gzip_borders[loop - 1];
}
}
flb_plg_trace(ctx->ins,
"[gzip decompression] loop = %zd, len = %zd, original_len = %zd",
loop, len, original_len);

ret = flb_gzip_uncompress((void *) (data + prev_pos), len,
&gz_data, &gz_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "gzip uncompress failure");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
if (gzip_borders != NULL) {
flb_free(gzip_borders);
}
return -1;
}

Expand All @@ -1506,6 +1591,9 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
flb_free(gz_data);
if (gzip_borders != NULL) {
flb_free(gzip_borders);
}
return -1;
}
event_type = ret;
Expand All @@ -1519,9 +1607,34 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
flb_free(gz_data);
if (gzip_borders != NULL) {
flb_free(gzip_borders);
}

return -1;
}
flb_free(gz_data);

/* a valid payload of gzip is larger than 18 bytes. */
if (gzip_payloads_count > 0) {
if ((gzip_payloads_count - loop) > 0 &&
(original_len - gzip_borders[loop]) > 18) {
len = original_len - gzip_borders[loop];
flb_plg_debug(ctx->ins, "left unconsumed %zd byte(s)", len);
prev_pos = gzip_borders[loop];
loop++;
goto retry_uncompress;
}
else {
flb_plg_debug(ctx->ins, "left unconsumed %zd byte(s)",
original_len - gzip_borders[loop]);
}
if (loop == gzip_payloads_count) {
if (gzip_borders != NULL) {
flb_free(gzip_borders);
}
}
}
}
else {
event_type = FLB_EVENT_TYPE_LOGS;
Expand Down