diff --git a/plugins/in_forward/fw_prot.c b/plugins/in_forward/fw_prot.c index b656a100321..b222ad3620b 100644 --- a/plugins/in_forward/fw_prot.c +++ b/plugins/in_forward/fw_prot.c @@ -451,6 +451,48 @@ static size_t receiver_to_unpacker(struct fw_conn *conn, size_t request_size, return recv_len; } +static int append_log(struct flb_input_instance *ins, struct fw_conn *conn, + int event_type, + flb_sds_t out_tag, const void *data, size_t len) +{ + int ret; + size_t off = 0; + struct cmt *cmt; + struct ctrace *ctr; + + if (event_type == FLB_EVENT_TYPE_LOGS) { + flb_input_log_append(conn->in, + out_tag, flb_sds_len(out_tag), + data, len); + + return 0; + } + else if (event_type == FLB_EVENT_TYPE_METRICS) { + ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &off); + if (ret != CMT_DECODE_MSGPACK_SUCCESS) { + flb_error("cmt_decode_msgpack_create failed. ret=%d", ret); + return -1; + } + flb_input_metrics_append(conn->in, + out_tag, flb_sds_len(out_tag), + cmt); + } + else if (event_type == FLB_EVENT_TYPE_TRACES) { + off = 0; + ret = ctr_decode_msgpack_create(&ctr, (char *) data, len, &off); + if (ret == -1) { + return -1; + } + + flb_input_trace_append(ins, + out_tag, flb_sds_len(out_tag), + ctr); + } + + return 0; +} + + int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) { int ret; @@ -458,7 +500,6 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) int event_type; int contain_options = FLB_FALSE; size_t index = 0; - size_t off = 0; size_t chunk_id = -1; size_t metadata_id = -1; const char *stag; @@ -476,8 +517,6 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) msgpack_unpacker *unp; size_t all_used = 0; struct flb_in_fw_config *ctx = conn->ctx; - struct cmt *cmt; - struct ctrace *ctr; /* * [tag, time, record] @@ -752,13 +791,6 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) return -1; } - /* Append uncompressed data */ - flb_input_log_append(conn->in, - out_tag, flb_sds_len(out_tag), - gz_data, gz_size); - flb_free(gz_data); - } - else { event_type = FLB_EVENT_TYPE_LOGS; if (contain_options) { ret = get_chunk_event_type(ins, root.via.array.ptr[2]); @@ -766,42 +798,45 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); flb_sds_destroy(out_tag); + flb_free(gz_data); return -1; } event_type = ret; } - if (event_type == FLB_EVENT_TYPE_LOGS) { - flb_input_log_append(conn->in, - out_tag, flb_sds_len(out_tag), - data, len); - } - else if (event_type == FLB_EVENT_TYPE_METRICS) { - ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &off); - if (ret != CMT_DECODE_MSGPACK_SUCCESS) { - flb_error("cmt_decode_msgpack_create failed. ret=%d", ret); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - flb_input_metrics_append(conn->in, - out_tag, flb_sds_len(out_tag), - cmt); + ret = append_log(ins, conn, + event_type, + out_tag, gz_data, gz_size); + if (ret == -1) { + msgpack_unpacked_destroy(&result); + msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); + flb_free(gz_data); + return -1; } - else if (event_type == FLB_EVENT_TYPE_TRACES) { - off = 0; - ret = ctr_decode_msgpack_create(&ctr, (char *) data, len, &off); + flb_free(gz_data); + } + else { + event_type = FLB_EVENT_TYPE_LOGS; + if (contain_options) { + ret = get_chunk_event_type(ins, root.via.array.ptr[2]); if (ret == -1) { msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); flb_sds_destroy(out_tag); return -1; } + event_type = ret; + } - flb_input_trace_append(ins, - out_tag, flb_sds_len(out_tag), - ctr); + ret = append_log(ins, conn, + event_type, + out_tag, data, len); + if (ret == -1) { + msgpack_unpacked_destroy(&result); + msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); + return -1; } } diff --git a/plugins/out_forward/forward_format.c b/plugins/out_forward/forward_format.c index 48dedd862d6..00aff93d208 100644 --- a/plugins/out_forward/forward_format.c +++ b/plugins/out_forward/forward_format.c @@ -149,6 +149,17 @@ static int append_options(struct flb_forward *ctx, msgpack_pack_str(mp_pck, 4); msgpack_pack_str_body(mp_pck, "gzip", 4); } + else if (fc->compress == COMPRESS_GZIP && + /* for metrics or traces, we're also able to send as + * gzipped payloads */ + (event_type == FLB_EVENT_TYPE_METRICS || + event_type == FLB_EVENT_TYPE_TRACES)) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, 10); + msgpack_pack_str_body(mp_pck, "compressed", 10); + msgpack_pack_str(mp_pck, 4); + msgpack_pack_str_body(mp_pck, "gzip", 4); + } /* event type (FLB_EVENT_TYPE_LOGS, FLB_EVENT_TYPE_METRICS, FLB_EVENT_TYPE_TRACES) */ flb_mp_map_header_append(&mh);