Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 68 additions & 33 deletions plugins/in_forward/fw_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -451,14 +451,55 @@ static size_t receiver_to_unpacker(struct fw_conn *conn, size_t request_size,
return recv_len;
}

static int append_log(struct flb_input_instance *ins, struct fw_conn *conn,
int event_type,
flb_sds_t out_tag, const void *data, size_t len)
{
int ret;
size_t off = 0;
struct cmt *cmt;
struct ctrace *ctr;

if (event_type == FLB_EVENT_TYPE_LOGS) {
flb_input_log_append(conn->in,
out_tag, flb_sds_len(out_tag),
data, len);

return 0;
}
else if (event_type == FLB_EVENT_TYPE_METRICS) {
ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &off);
if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
flb_error("cmt_decode_msgpack_create failed. ret=%d", ret);
return -1;
}
flb_input_metrics_append(conn->in,
out_tag, flb_sds_len(out_tag),
cmt);
}
else if (event_type == FLB_EVENT_TYPE_TRACES) {
off = 0;
ret = ctr_decode_msgpack_create(&ctr, (char *) data, len, &off);
if (ret == -1) {
return -1;
}

flb_input_trace_append(ins,
out_tag, flb_sds_len(out_tag),
ctr);
}

return 0;
}


int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
{
int ret;
int stag_len;
int event_type;
int contain_options = FLB_FALSE;
size_t index = 0;
size_t off = 0;
size_t chunk_id = -1;
size_t metadata_id = -1;
const char *stag;
Expand All @@ -476,8 +517,6 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
msgpack_unpacker *unp;
size_t all_used = 0;
struct flb_in_fw_config *ctx = conn->ctx;
struct cmt *cmt;
struct ctrace *ctr;

/*
* [tag, time, record]
Expand Down Expand Up @@ -752,56 +791,52 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
return -1;
}

/* Append uncompressed data */
flb_input_log_append(conn->in,
out_tag, flb_sds_len(out_tag),
gz_data, gz_size);
flb_free(gz_data);
}
else {
event_type = FLB_EVENT_TYPE_LOGS;
if (contain_options) {
ret = get_chunk_event_type(ins, root.via.array.ptr[2]);
if (ret == -1) {
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
flb_free(gz_data);
return -1;
}
event_type = ret;
}

if (event_type == FLB_EVENT_TYPE_LOGS) {
flb_input_log_append(conn->in,
out_tag, flb_sds_len(out_tag),
data, len);
}
else if (event_type == FLB_EVENT_TYPE_METRICS) {
ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &off);
if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
flb_error("cmt_decode_msgpack_create failed. ret=%d", ret);
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}
flb_input_metrics_append(conn->in,
out_tag, flb_sds_len(out_tag),
cmt);
ret = append_log(ins, conn,
event_type,
out_tag, gz_data, gz_size);
if (ret == -1) {
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
flb_free(gz_data);
return -1;
}
else if (event_type == FLB_EVENT_TYPE_TRACES) {
off = 0;
ret = ctr_decode_msgpack_create(&ctr, (char *) data, len, &off);
flb_free(gz_data);
}
else {
event_type = FLB_EVENT_TYPE_LOGS;
if (contain_options) {
ret = get_chunk_event_type(ins, root.via.array.ptr[2]);
if (ret == -1) {
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}
event_type = ret;
}

flb_input_trace_append(ins,
out_tag, flb_sds_len(out_tag),
ctr);
ret = append_log(ins, conn,
event_type,
out_tag, data, len);
if (ret == -1) {
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}
}

Expand Down
11 changes: 11 additions & 0 deletions plugins/out_forward/forward_format.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,17 @@ static int append_options(struct flb_forward *ctx,
msgpack_pack_str(mp_pck, 4);
msgpack_pack_str_body(mp_pck, "gzip", 4);
}
else if (fc->compress == COMPRESS_GZIP &&
/* for metrics or traces, we're also able to send as
* gzipped payloads */
(event_type == FLB_EVENT_TYPE_METRICS ||
event_type == FLB_EVENT_TYPE_TRACES)) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "compressed", 10);
msgpack_pack_str(mp_pck, 4);
msgpack_pack_str_body(mp_pck, "gzip", 4);
}

/* event type (FLB_EVENT_TYPE_LOGS, FLB_EVENT_TYPE_METRICS, FLB_EVENT_TYPE_TRACES) */
flb_mp_map_header_append(&mh);
Expand Down