diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index af3bc1932ca..0d1947444c9 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -34,6 +34,7 @@ #define HTTP_CONTENT_JSON 0 #define HTTP_CONTENT_URLENCODED 1 +#define HTTP_CONTENT_MSGPACK 2 static inline char hex2nibble(char c) { @@ -514,7 +515,6 @@ static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag, return ret; } - /* * We use two backends for HTTP parsing and it depends on the version of the * protocol: @@ -730,7 +730,101 @@ static int http_prot_uncompress(struct flb_http *ctx, else { return -2; } + return 0; +} + +static ssize_t parse_payload_msgpack(struct flb_http *ctx, flb_sds_t tag, + char *payload, size_t size) +{ + int ret = FLB_EVENT_ENCODER_SUCCESS; + struct flb_time tm; + size_t offset = 0; + msgpack_unpacked result; + msgpack_object *record; + msgpack_object *metadata; + msgpack_object *data; + flb_sds_t tag_from_record = NULL; + + + msgpack_unpacked_init(&result); + + while (ret == FLB_EVENT_ENCODER_SUCCESS && + msgpack_unpack_next(&result, payload, size, &offset) == MSGPACK_UNPACK_SUCCESS) { + + if (result.data.type != MSGPACK_OBJECT_ARRAY) { + msgpack_unpacked_destroy(&result); + return -1; + } + + record = &result.data; + metadata = &record->via.array.ptr[0]; + data = &record->via.array.ptr[1]; + + if (ctx->tag_key) { + tag_from_record = tag_key(ctx, data); + } + + ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + ret = flb_time_msgpack_to_time(&tm, &metadata->via.array.ptr[0]); + + if (ret == -1) { + msgpack_unpacked_destroy(&result); + return -1; + } + ret = flb_log_event_encoder_set_timestamp( + &ctx->log_encoder, + &tm); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + ret = flb_log_event_encoder_set_body_from_msgpack_object(&ctx->log_encoder, data); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + if (tag_from_record) { + ret = flb_input_log_append(ctx->ins, tag_from_record, + flb_sds_len(tag_from_record), + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + else if (tag) { + ret = flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + else { + ret = flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + + if (ret != 0) { + msgpack_unpacked_destroy(&result); + return -1; + } + + flb_log_event_encoder_reset(&ctx->log_encoder); + } + + msgpack_unpacked_destroy(&result); return 0; } @@ -766,6 +860,11 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, type = HTTP_CONTENT_URLENCODED; } + if (header->val.len == 19 && + strncasecmp(header->val.data, "application/msgpack", 19) == 0) { + type = HTTP_CONTENT_MSGPACK; + } + if (type == -1) { send_response(conn, 400, "error: invalid 'Content-Type'\n"); return -1; @@ -817,6 +916,9 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, else if (type == HTTP_CONTENT_URLENCODED) { ret = parse_payload_urlencoded(ctx, tag, request->data.data, request->data.len); } + else if (type == HTTP_CONTENT_MSGPACK) { + ret = parse_payload_msgpack(ctx, tag, request->data.data, request->data.len); + } if (uncompressed_data != NULL) { flb_free(uncompressed_data); @@ -1201,6 +1303,10 @@ static int process_payload_ng(flb_sds_t tag, type = HTTP_CONTENT_URLENCODED; } + if (strcasecmp(request->content_type, "application/msgpack") == 0) { + type = HTTP_CONTENT_MSGPACK; + } + if (type == -1) { send_response_ng(response, 400, "error: invalid 'Content-Type'\n"); return -1; @@ -1222,6 +1328,13 @@ static int process_payload_ng(flb_sds_t tag, return parse_payload_urlencoded(ctx, tag, payload, cfl_sds_len(payload)); } } + else if (type == HTTP_CONTENT_MSGPACK) { + ctx = (struct flb_http *) request->stream->user_data; + payload = (char *) request->body; + if (payload) { + return parse_payload_msgpack(ctx, tag, payload, cfl_sds_len(payload)); + } + } return 0; } diff --git a/tests/runtime/in_http.c b/tests/runtime/in_http.c index 66ddaea5230..032694cc440 100644 --- a/tests/runtime/in_http.c +++ b/tests/runtime/in_http.c @@ -29,6 +29,7 @@ #define JSON_CONTENT_TYPE "application/json" #define JSON_CHARSET_CONTENT_TYPE "application/json; charset=utf-8" +#define MSGPACK_CONTENT_TYPE "application/msgpack" struct http_client_ctx { struct flb_upstream *u; @@ -278,6 +279,153 @@ void flb_test_http() flb_upstream_conn_release(ctx->httpc->u_conn); test_ctx_destroy(ctx); } + +void flb_test_msgpack_legacy() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + size_t b_sent; + char buf[] = "\xdd\x00\x00\x00\x02\xdd\x00\x00" + "\x00\x02\xd7\x00\x65\xd3\x9c\x63" + "\x19\x36\xb8\xd5\x80\x81\xa7\x6d" + "\x65\x73\x73\x61\x67\x65\xa5\x64" + "\x75\x6d\x6d\x79\xbe"; + + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"message\":\"dummy\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + flb_input_set(ctx->flb, ctx->i_ffd, "http2", "off", NULL); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, sizeof(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + MSGPACK_CONTENT_TYPE, strlen(MSGPACK_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 201)) { + TEST_MSG("http response code error. expect: 201, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_msgpack() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + size_t b_sent; + char buf[] = "\xdd\x00\x00\x00\x02\xdd\x00\x00" + "\x00\x02\xd7\x00\x65\xd3\x9c\x63" + "\x19\x36\xb8\xd5\x80\x81\xa7\x6d" + "\x65\x73\x73\x61\x67\x65\xa5\x64" + "\x75\x6d\x6d\x79\xbe"; + + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"message\":\"dummy\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, sizeof(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + MSGPACK_CONTENT_TYPE, strlen(MSGPACK_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 201)) { + TEST_MSG("http response code error. expect: 201, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + void flb_test_http_successful_response_code(char *response_code) { struct flb_lib_out_cb cb_data; @@ -673,6 +821,8 @@ void flb_test_http_tag_key_with_array_input() TEST_LIST = { {"http", flb_test_http}, + {"msgpack_legacy", flb_test_msgpack_legacy}, + {"msgpack", flb_test_msgpack}, {"successful_response_code_200", flb_test_http_successful_response_code_200}, {"successful_response_code_204", flb_test_http_successful_response_code_204}, {"failure_response_code_400_bad_json", flb_test_http_failure_400_bad_json},