diff --git a/bpf/common/common.h b/bpf/common/common.h index 0179cb086d..dd3b6e9344 100644 --- a/bpf/common/common.h +++ b/bpf/common/common.h @@ -34,6 +34,9 @@ #define MAX_TOPIC_NAME_LEN 64 #define HOST_MAX_LEN 100 #define SCHEME_MAX_LEN 10 +#define HTTP_BODY_MAX_LEN 64 +#define HTTP_HEADER_MAX_LEN 100 +#define HTTP_CONTENT_TYPE_MAX_LEN 16 volatile const u32 mysql_buffer_size = 0; diff --git a/bpf/common/tc_common.h b/bpf/common/tc_common.h index 57cc2a2f63..97400da8d8 100644 --- a/bpf/common/tc_common.h +++ b/bpf/common/tc_common.h @@ -3,7 +3,7 @@ #include #include -enum { MAX_INLINE_LEN = 0x3ff }; +enum { MAX_INLINE_LEN = 0x3ff, MAX_NEEDLE_LEN = 16 }; const char TP[] = "Traceparent: 00-00000000000000000000000000000000-0000000000000000-01\r\n"; const u32 EXTEND_SIZE = sizeof(TP) - 1; @@ -131,3 +131,38 @@ static __always_inline u32 sk_msg_local_port(struct sk_msg_md *ctx) { return data; } + +// find the needle in the haystack, return the position of the first occurrence, return -1 if not found +static __always_inline u32 bpf_memstr(const unsigned char *haystack, + u32 haystack_len, + const unsigned char *needle, + u32 needle_len) { + if (needle_len == 0 || haystack_len < needle_len) { + return INVALID_POS; + } + for (u32 i = 0; i <= haystack_len - needle_len; i++) { + if (i + needle_len > haystack_len) { + return INVALID_POS; + } + u8 found = 1; +#pragma unroll + // max needle length + for (u8 j = 0; j < MAX_NEEDLE_LEN; j++) { + if (j >= needle_len) { + break; + } + if (i + j >= haystack_len) { + found = 0; + break; + } + if (haystack[i + j] != needle[j]) { + found = 0; + break; + } + } + if (found) { + return i; + } + } + return INVALID_POS; +} \ No newline at end of file diff --git a/bpf/gotracer/go_common.h b/bpf/gotracer/go_common.h index 8eec249c3c..9f0be65bc1 100644 --- a/bpf/gotracer/go_common.h +++ b/bpf/gotracer/go_common.h @@ -181,14 +181,14 @@ static __always_inline u64 find_parent_goroutine_in_chain(go_addr_key_t *current return 0; } -static 
__always_inline void decode_go_traceparent(unsigned char *buf, +static __always_inline void decode_go_traceparent(const unsigned char *buf, unsigned char *trace_id, unsigned char *span_id, unsigned char *flags) { - unsigned char *t_id = buf + 2 + 1; // strlen(ver) + strlen("-") - unsigned char *s_id = + const unsigned char *t_id = buf + 2 + 1; // strlen(ver) + strlen("-") + const unsigned char *s_id = buf + 2 + 1 + 32 + 1; // strlen(ver) + strlen("-") + strlen(trace_id) + strlen("-") - unsigned char *f_id = + const unsigned char *f_id = buf + 2 + 1 + 32 + 1 + 16 + 1; // strlen(ver) + strlen("-") + strlen(trace_id) + strlen("-") + strlen(span_id) + strlen("-") diff --git a/bpf/gotracer/go_nethttp.c b/bpf/gotracer/go_nethttp.c index 3ca21e8d27..aa3c4e5647 100644 --- a/bpf/gotracer/go_nethttp.c +++ b/bpf/gotracer/go_nethttp.c @@ -27,6 +27,8 @@ #include #include #include +#include +#include #include @@ -35,6 +37,10 @@ #include +enum { k_tail_jsonrpc = 0 }; +static const char traceparent[] = "traceparent: "; +static const char content_type[] = "content-type: "; + typedef struct http_client_data { s64 content_length; pid_info pid; @@ -64,10 +70,13 @@ typedef struct server_http_func_invocation { u64 content_length; u64 response_length; u64 status; + u64 body_addr; // pointer to the body buffer tp_info_t tp; - unsigned char method[METHOD_MAX_LEN]; - unsigned char path[PATH_MAX_LEN]; - u8 _pad[5]; + unsigned char content_type[HTTP_CONTENT_TYPE_MAX_LEN]; + u8 method[METHOD_MAX_LEN]; + u8 path[PATH_MAX_LEN]; + u8 json_content_type; + u8 _pad[4]; } server_http_func_invocation_t; struct { @@ -77,6 +86,30 @@ struct { __uint(max_entries, MAX_CONCURRENT_REQUESTS); } ongoing_http_server_requests SEC(".maps"); +struct { + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __type(key, u32); + __type(value, unsigned char[HTTP_HEADER_MAX_LEN]); + __uint(max_entries, 1); +} temp_header_mem_store SEC(".maps"); + +static __always_inline unsigned char *temp_header_mem() { + const u32 zero = 0; + 
return bpf_map_lookup_elem(&temp_header_mem_store, &zero); +} + +struct { + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __type(key, u32); + __type(value, unsigned char[HTTP_BODY_MAX_LEN]); + __uint(max_entries, 1); +} temp_body_mem_store SEC(".maps"); + +static __always_inline unsigned char *temp_body_mem() { + const u32 zero = 0; + return bpf_map_lookup_elem(&temp_body_mem_store, &zero); +} + /* HTTP Server */ // This instrumentation attaches uprobe to the following function: @@ -94,12 +127,12 @@ int beyla_uprobe_ServeHTTP(struct pt_regs *ctx) { off_table_t *ot = get_offsets_table(); - // Lookup any traceparent information setup for us by readContinuedLineSlice - server_http_func_invocation_t *tp_inv = + // Lookup any header information setup for us by readContinuedLineSlice + server_http_func_invocation_t *header_inv = bpf_map_lookup_elem(&ongoing_http_server_requests, &g_key); tp_info_t *decoded_tp = 0; - if (tp_inv && valid_trace(tp_inv->tp.trace_id)) { - decoded_tp = &tp_inv->tp; + if (header_inv && valid_trace(header_inv->tp.trace_id)) { + decoded_tp = &header_inv->tp; } server_http_func_invocation_t invocation = { @@ -118,6 +151,15 @@ int beyla_uprobe_ServeHTTP(struct pt_regs *ctx) { // TODO: if context propagation is supported, overwrite the header value in the map with the // new span context and the same thread id. 
+ // get content-type from readContinuedLineSlice + if (header_inv && header_inv->content_type[0]) { + bpf_dbg_printk("Found content type in ongoing request: %s", header_inv->content_type); + if (is_json_content_type((void *)header_inv->content_type, + sizeof(header_inv->content_type))) { + invocation.json_content_type = 1; + } + } + // Get method from Request.Method if (!read_go_str("method", req, @@ -263,31 +305,90 @@ int beyla_uprobe_http2Server_processHeaders(struct pt_regs *ctx) { return 0; } +static __always_inline void update_traceparent(server_http_func_invocation_t *inv, + const unsigned char *header_start) { + decode_go_traceparent(header_start, inv->tp.trace_id, inv->tp.parent_id, &inv->tp.flags); + bpf_dbg_printk("Found traceparent in header %s", header_start); +} + +static __always_inline void update_content_type(server_http_func_invocation_t *inv, + const unsigned char *header_start) { + __builtin_memset(inv->content_type, 0, sizeof(inv->content_type)); + __builtin_memcpy(inv->content_type, header_start, sizeof(inv->content_type)); + bpf_dbg_printk("Found content-type in header %s", inv->content_type); +} + +static __always_inline void handle_traceparent_header(server_http_func_invocation_t *inv, + go_addr_key_t *g_key, + unsigned char *traceparent_start) { + if (inv) { + update_traceparent(inv, traceparent_start); + } else { + server_http_func_invocation_t minimal_inv = {}; + update_traceparent(&minimal_inv, traceparent_start); + bpf_map_update_elem(&ongoing_http_server_requests, g_key, &minimal_inv, BPF_ANY); + } +} + +static __always_inline void handle_content_type_header(server_http_func_invocation_t *inv, + go_addr_key_t *g_key, + unsigned char *content_type_start) { + if (inv) { + update_content_type(inv, content_type_start); + } else { + server_http_func_invocation_t minimal_inv = {}; + update_content_type(&minimal_inv, content_type_start); + bpf_map_update_elem(&ongoing_http_server_requests, g_key, &minimal_inv, BPF_ANY); + } +} + +// Matches 
the header in the buffer and returns a pointer to the value part of the header. +static __always_inline unsigned char *match_header( + const unsigned char *buf, u32 safe_len, const char *header, u32 header_len, u32 value_len) { + if (safe_len >= header_len + value_len && stricmp((const char *)buf, header, header_len)) { + return (unsigned char *)(buf + header_len); + } + return NULL; +} + SEC("uprobe/readContinuedLineSlice") int beyla_uprobe_readContinuedLineSliceReturns(struct pt_regs *ctx) { bpf_dbg_printk("=== uprobe/proc readContinuedLineSlice returns === "); void *goroutine_addr = GOROUTINE_PTR(ctx); + bpf_dbg_printk("goroutine_addr %lx", goroutine_addr); + go_addr_key_t g_key = {}; + go_addr_key_from_id(&g_key, goroutine_addr); + connection_info_t *existing = bpf_map_lookup_elem(&ongoing_server_connections, &g_key); + if (!existing) { + return 0; + } + u64 len = (u64)GO_PARAM2(ctx); const unsigned char *buf = (const unsigned char *)GO_PARAM1(ctx); - if (len >= (W3C_KEY_LENGTH + W3C_VAL_LENGTH + 2)) { - unsigned char temp[W3C_KEY_LENGTH + W3C_VAL_LENGTH + 2]; - bpf_probe_read(temp, sizeof(temp), buf); - bpf_dbg_printk("goroutine_addr %lx", goroutine_addr); - go_addr_key_t g_key = {}; - go_addr_key_from_id(&g_key, goroutine_addr); + unsigned char *temp = temp_header_mem(); + const u32 safe_len = len > HTTP_HEADER_MAX_LEN ? 
HTTP_HEADER_MAX_LEN : len; + if (!temp || bpf_probe_read_user(temp, safe_len, buf) != 0) { + bpf_dbg_printk("failed to read buffer"); + return 0; + }; - connection_info_t *existing = bpf_map_lookup_elem(&ongoing_server_connections, &g_key); - if (existing) { - if (stricmp((const char *)temp, "traceparent: ", W3C_KEY_LENGTH + 2)) { - server_http_func_invocation_t inv = {}; - decode_go_traceparent( - temp + W3C_KEY_LENGTH + 2, inv.tp.trace_id, inv.tp.parent_id, &inv.tp.flags); - bpf_dbg_printk("Found traceparent in header %s", temp); - bpf_map_update_elem(&ongoing_http_server_requests, &g_key, &inv, BPF_ANY); - } - } + const u32 w3c_value_start = sizeof(traceparent) - 1; + const u32 content_type_value_start = sizeof(content_type) - 1; + + server_http_func_invocation_t *inv = bpf_map_lookup_elem(&ongoing_http_server_requests, &g_key); + + unsigned char *traceparent_start = + match_header(temp, safe_len, traceparent, w3c_value_start, W3C_VAL_LENGTH); + if (traceparent_start) { + handle_traceparent_header(inv, &g_key, traceparent_start); + } + + unsigned char *content_type_start = match_header( + temp, safe_len, content_type, content_type_value_start, HTTP_CONTENT_TYPE_MAX_LEN); + if (content_type_start) { + handle_content_type_header(inv, &g_key, content_type_start); } return 0; @@ -1182,6 +1283,102 @@ int beyla_uprobe_netFdRead(struct pt_regs *ctx) { return 0; } +SEC("uprobe/bodyRead") +int beyla_uprobe_bodyRead(struct pt_regs *ctx) { + void *goroutine_addr = GOROUTINE_PTR(ctx); + bpf_dbg_printk("=== uprobe/proc body read goroutine === "); + go_addr_key_t g_key = {}; + go_addr_key_from_id(&g_key, goroutine_addr); + + // Get the address of the slice struct (p) + u64 body_addr = (u64)GO_PARAM2(ctx); + + server_http_func_invocation_t *invocation = + bpf_map_lookup_elem(&ongoing_http_server_requests, &g_key); + if (!invocation) { + return 0; + } + invocation->body_addr = body_addr; + + return 0; +} + +SEC("uprobe/bodyReadRet") +int beyla_uprobe_bodyReadReturn(struct 
pt_regs *ctx) { + bpf_dbg_printk("=== uprobe/proc body read returns goroutine === "); + void *goroutine_addr = GOROUTINE_PTR(ctx); + go_addr_key_t g_key = {}; + go_addr_key_from_id(&g_key, goroutine_addr); + + u64 n = (u64)GO_PARAM1(ctx); + bpf_dbg_printk("n is %llu", n); + + server_http_func_invocation_t *invocation = + bpf_map_lookup_elem(&ongoing_http_server_requests, &g_key); + if (!invocation) { + return 0; + } + if (n <= 0 || !invocation->body_addr) { + return 0; + } + // json_content-type is set in invocation in ServeHTTP + if (invocation->json_content_type != 1) { + bpf_dbg_printk("content type is not json, skipping"); + return 0; + } + + unsigned char *body_buf = temp_body_mem(); + if (!body_buf) { + return 0; + } + + const u32 safe_len = n > HTTP_BODY_MAX_LEN ? HTTP_BODY_MAX_LEN : n; + if (!read_go_str_n("http body", (void *)invocation->body_addr, n, body_buf, safe_len)) { + bpf_dbg_printk("failed to read body, n=%llu, body_addr=%llx", n, invocation->body_addr); + return 0; + } + // bpf_dbg_printk("body is %s", body_buf); + bpf_tail_call(ctx, &jsonrpc_jump_table, k_tail_jsonrpc); + return 0; +} + +//k_tail_jsonrpc +SEC("uprobe/readJsonrpcMethod") +int beyla_read_jsonrpc_method(struct pt_regs *ctx) { + bpf_dbg_printk("=== uprobe/proc read jsonrpc method === "); + void *goroutine_addr = GOROUTINE_PTR(ctx); + go_addr_key_t g_key = {}; + go_addr_key_from_id(&g_key, goroutine_addr); + + server_http_func_invocation_t *invocation = + bpf_map_lookup_elem(&ongoing_http_server_requests, &g_key); + if (!invocation) { + bpf_dbg_printk("can't find invocation info for server call"); + return 0; + } + + // tail call is guaranteed to run on the same CPU as its caller + // so we can shared the same buffer via a per-CPU map + unsigned char *body_buf = temp_body_mem(); + if (!body_buf) { + return 0; + } + if (is_jsonrpc2_body((const unsigned char *)body_buf, HTTP_BODY_MAX_LEN)) { + unsigned char method_buf[JSONRPC_METHOD_BUF_SIZE] = {}; + u32 method_len = 
extract_jsonrpc2_method( + (const unsigned char *)body_buf, HTTP_BODY_MAX_LEN, method_buf, sizeof(method_buf)); + if (method_len > 0) { + bpf_dbg_printk("JSON-RPC method: %s", method_buf); + read_go_str_n("JSON-RPC method", + (void *)method_buf, + method_len, + invocation->method, + sizeof(invocation->method)); + } + } + return 0; +} + SEC("uprobe/connServeRet") int beyla_uprobe_connServeRet(struct pt_regs *ctx) { bpf_dbg_printk("=== uprobe/proc http conn serve ret === "); diff --git a/bpf/gotracer/maps/jsonrpc_jump_table.h b/bpf/gotracer/maps/jsonrpc_jump_table.h new file mode 100644 index 0000000000..02b8e88f75 --- /dev/null +++ b/bpf/gotracer/maps/jsonrpc_jump_table.h @@ -0,0 +1,11 @@ +#pragma once + +#include +#include + +struct { + __uint(type, BPF_MAP_TYPE_PROG_ARRAY); + __type(key, u32); + __type(value, u32); + __uint(max_entries, 1); +} jsonrpc_jump_table SEC(".maps"); diff --git a/bpf/gotracer/protocol_jsonrpc.h b/bpf/gotracer/protocol_jsonrpc.h new file mode 100644 index 0000000000..9a8537a882 --- /dev/null +++ b/bpf/gotracer/protocol_jsonrpc.h @@ -0,0 +1,162 @@ +#pragma once + +#include +#include +#include +#include +#include + +static const char k_jsonrpc_key[] = "\"jsonrpc\""; +static const u32 k_jsonrpc_key_len = sizeof(k_jsonrpc_key) - 1; +static const char k_jsonrpc_val[] = "\"2.0\""; +static const u32 k_jsonrpc_val_len = sizeof(k_jsonrpc_val) - 1; +static const char k_application_json[] = "application/json"; +static const u32 k_application_json_len = sizeof(k_application_json) - 1; +static const char k_method_key[] = "\"method\""; +static const u32 k_method_key_len = sizeof(k_method_key) - 1; + +enum { JSON_MAX_STRING_LEN = 16, JSONRPC_METHOD_BUF_SIZE = 16 }; + +// should match application/json, application/json-rpc, application/jsonrequest +// listed in https://www.jsonrpc.org/historical/json-rpc-over-http.html +static __always_inline u8 is_json_content_type(const char *c, u32 len) { + if (len < k_application_json_len) { + return 0; + } + // Check 
for "application/json" at the start + if (c[0] == 'a' && c[1] == 'p' && c[2] == 'p' && c[3] == 'l' && c[4] == 'i' && c[5] == 'c' && + c[6] == 'a' && c[7] == 't' && c[8] == 'i' && c[9] == 'o' && c[10] == 'n' && c[11] == '/' && + c[12] == 'j' && c[13] == 's' && c[14] == 'o' && c[15] == 'n') { + return 1; + } + return 0; +} + +// ref: https://en.cppreference.com/w/c/string/byte/isspace +static __always_inline u8 bpf_isspace(char c) { + return (c == ' ' || c == '\f' || c == '\n' || c == '\r' || c == '\t' || c == '\v'); +} + +// Returns the offset of the next JSON value after skipping whitespace and colon. +// If not found, returns body_len. +static __always_inline u32 json_value_offset(const unsigned char *body, + u32 body_len, + u32 start_pos) { + u32 pos = start_pos; + while (pos < body_len && (bpf_isspace(body[pos]) || body[pos] == ':')) { + pos++; + } + return pos; +} + +// Returns the position of the first occurrence of a string in a JSON body. +// If not found, returns INVALID_POS. +static __always_inline u32 json_str_value(const unsigned char *body, + u32 body_len, + const unsigned char *str, + u32 str_len) { + return bpf_memstr(body, body_len, str, str_len); +} + +// Compares a JSON value at start with a given value. +static __always_inline bool json_value_eq(const char *start, const char *val, u32 val_len) { + + return stricmp(start, val, val_len); +} + +// Extracts a JSON string value starting at a given position. +// Copies up to buf_len-1 bytes into buf, null-terminated. +// Returns the number of bytes copied (not including null terminator), or 0 on error. 
+static __always_inline u32 extract_json_string( + const unsigned char *body, u32 body_len, u32 value_start, unsigned char *buf, u32 buf_len) { + if (value_start >= body_len || buf_len == 0) { + return 0; + } + + if (body[value_start] != '"') { + return 0; + } + + const u32 str_start = value_start + 1; + u32 value_end = str_start; + while (value_end < body_len && body[value_end] != '"') { + value_end++; + } + const u32 value_len = value_end - str_start; + if (value_len == 0) { + return 0; + } + + const u32 copy_len = value_len < (buf_len - 1) ? value_len : (buf_len - 1); + + for (u32 i = 0; i < buf_len; i++) { + if (i >= copy_len) { + break; + } + buf[i] = body[str_start + i]; + } + buf[copy_len] = '\0'; + return copy_len; +} + +// Looks for '"jsonrpc":"2.0"' +static __always_inline u32 is_jsonrpc2_body(const unsigned char *body, u32 body_len) { + u32 key_pos = + json_str_value(body, body_len, (const unsigned char *)k_jsonrpc_key, k_jsonrpc_key_len); + if (key_pos == INVALID_POS) { + return 0; + } + + bpf_dbg_printk("Found JSON-RPC 2.0 key"); + + u32 val_search_start = key_pos + k_jsonrpc_key_len; + if (val_search_start >= body_len) { + return 0; + } + + val_search_start = json_value_offset(body, body_len, val_search_start); + if (val_search_start >= body_len) { + return 0; + } + + if (val_search_start + k_jsonrpc_val_len >= body_len) { + return 0; + } + + if (!json_value_eq((const char *)(body + val_search_start), + (const char *)k_jsonrpc_val, + k_jsonrpc_val_len)) { + return 0; + } + + bpf_dbg_printk("Found JSON-RPC 2.0 value"); + + return 1; // JSON-RPC 2.0 detected +} + +// Extracts the value of the "method" key from a JSON-RPC 2.0 body. +// Returns the length of the method value, or 0 if not found or error. +// method_buf must be at least method_buf_len bytes. 
+static __always_inline u32 extract_jsonrpc2_method(const unsigned char *body, + u32 body_len, + unsigned char *method_buf, + u32 method_buf_len) { + u32 key_pos = + json_str_value(body, body_len, (const unsigned char *)k_method_key, k_method_key_len); + if (key_pos == INVALID_POS) { + return 0; + } + + bpf_dbg_printk("Found JSON-RPC method key"); + + u32 val_search_start = key_pos + k_method_key_len; + if (val_search_start >= body_len) { + return 0; + } + + val_search_start = json_value_offset(body, body_len, val_search_start); + if (val_search_start >= body_len) { + return 0; + } + return extract_json_string(body, body_len, val_search_start, method_buf, method_buf_len); +} \ No newline at end of file diff --git a/pkg/components/ebpf/gotracer/gotracer.go b/pkg/components/ebpf/gotracer/gotracer.go index 0f7d752874..ec23760ea3 100644 --- a/pkg/components/ebpf/gotracer/gotracer.go +++ b/pkg/components/ebpf/gotracer/gotracer.go @@ -86,7 +86,22 @@ func (p *Tracer) Load() (*ebpf.CollectionSpec, error) { return loader() } -func (p *Tracer) SetupTailCalls() {} +func (p *Tracer) SetupTailCalls() { + for _, tc := range []struct { + index int + prog *ebpf.Program + }{ + { + index: 0, + prog: p.bpfObjects.BeylaReadJsonrpcMethod, + }, + } { + err := p.bpfObjects.JsonrpcJumpTable.Update(uint32(tc.index), uint32(tc.prog.FD()), ebpf.UpdateAny) + if err != nil { + p.log.Error("error loading info tail call jump table", "error", err) + } + } +} func (p *Tracer) Constants() map[string]any { blackBoxCP := uint32(0) @@ -205,6 +220,10 @@ func (p *Tracer) GoProbes() map[string][]*ebpfcommon.ProbeDesc { Start: p.bpfObjects.BeylaUprobeReadRequestStart, End: p.bpfObjects.BeylaUprobeReadRequestReturns, }}, + "net/http.(*body).Read": {{ + Start: p.bpfObjects.BeylaUprobeBodyRead, + End: p.bpfObjects.BeylaUprobeBodyReadReturn, + }}, "net/textproto.(*Reader).readContinuedLineSlice": {{ End: p.bpfObjects.BeylaUprobeReadContinuedLineSliceReturns, }}, diff --git 
a/test/integration/components/testserver/duplicate_testserver.sh b/test/integration/components/testserver/duplicate_testserver.sh index 0a7a7ac9f1..47baf2af24 100644 --- a/test/integration/components/testserver/duplicate_testserver.sh +++ b/test/integration/components/testserver/duplicate_testserver.sh @@ -4,7 +4,7 @@ run_testserver() { # prefix for start ports. E.g. 808 sp=$1 - STD_PORT=${1}0 GIN_PORT=${1}1 GORILLA_PORT=${1}2 GORILLA_MID_PORT=${1}3 GRPC_PORT=${1}4 GRPC_TLS_PORT=${1}5 GORILLA_MID2_PORT=${1}7 STD_TLS_PORT=${1}8 ./duped_service + STD_PORT=${1}0 GIN_PORT=${1}1 GORILLA_PORT=${1}2 GORILLA_MID_PORT=${1}3 GRPC_PORT=${1}4 GRPC_TLS_PORT=${1}5 GORILLA_MID2_PORT=${1}7 STD_TLS_PORT=${1}8 JSRONRPC_PORT=${1}9 ./duped_service } # runs testserver twice diff --git a/test/integration/components/testserver/jsonrpc/body/formated.json b/test/integration/components/testserver/jsonrpc/body/formated.json new file mode 100644 index 0000000000..575e293bdf --- /dev/null +++ b/test/integration/components/testserver/jsonrpc/body/formated.json @@ -0,0 +1,11 @@ +{ + "jsonrpc": "2.0", + "method": "Arith.Multiply", + "params": [ + { + "A": 7, + "B": 8 + } + ], + "id": 1 +} \ No newline at end of file diff --git a/test/integration/components/testserver/jsonrpc/jsonrpc.go b/test/integration/components/testserver/jsonrpc/jsonrpc.go new file mode 100644 index 0000000000..fe9e6cf6d1 --- /dev/null +++ b/test/integration/components/testserver/jsonrpc/jsonrpc.go @@ -0,0 +1,85 @@ +package jsonrpc + +import ( + "fmt" + "io" + "log/slog" + "net/http" + "net/rpc" + "net/rpc/jsonrpc" +) + +// Args defines the arguments for the RPC methods. +type Args struct { + A, B int +} + +// Arith provides methods for arithmetic operations. +type Arith struct { + Logger *slog.Logger +} + +// Multiply multiplies two numbers and returns the result. 
+func (t *Arith) Multiply(args *Args, reply *int) error { + t.Logger.Debug("calling", "method", "Arith.Multiply") + *reply = args.A * args.B + return nil +} + +func (t *Arith) Traceme(args *Args, reply *int) error { + t.Logger.Debug("calling", "method", "Arith.Traceme") + requestURL := "http://pytestserver:8083/tracemetoo" + + t.Logger.Debug("calling", "url", requestURL) + + res, err := http.Get(requestURL) + if err != nil { + slog.Error("error making http request", "error", err) + return err + } + + defer res.Body.Close() + return t.Multiply(args, reply) +} + +// ReadWriteCloserWrapper wraps an io.Reader and io.Writer to implement io.ReadWriteCloser. +type ReadWriteCloserWrapper struct { + io.Reader + io.Writer +} + +// Close is a no-op to satisfy the io.ReadWriteCloser interface. +func (w *ReadWriteCloserWrapper) Close() error { + return nil +} + +func Setup(port int) { + log := slog.With("component", "jsonrpc.Arith") + arith := &Arith{ + Logger: log, + } + _ = rpc.Register(arith) + + address := fmt.Sprintf(":%d", port) + log.Info("starting JSON-RPC server", "address", address) + err := http.ListenAndServe(address, HTTPHandler(log)) + log.Error("JSON-RPC server has unexpectedly stopped", "error", err) +} + +func HTTPHandler(log *slog.Logger) http.HandlerFunc { + return func(rw http.ResponseWriter, req *http.Request) { + log.Debug("received request", "url", req.RequestURI) + if req.RequestURI == "/jsonrpc" { + if req.Method != http.MethodPost { + http.Error(rw, "Only POST method is allowed", http.StatusMethodNotAllowed) + return + } + // Wrap the request body and response writer in a ReadWriteCloser. + conn := &ReadWriteCloserWrapper{Reader: req.Body, Writer: rw} + // Serve the request using JSON-RPC codec. 
+ rpc.ServeCodec(jsonrpc.NewServerCodec(conn)) + } else { + rw.WriteHeader(http.StatusOK) + } + } +} diff --git a/test/integration/components/testserver/std/std.go b/test/integration/components/testserver/std/std.go index 04a6d7e47a..79bf27df4c 100644 --- a/test/integration/components/testserver/std/std.go +++ b/test/integration/components/testserver/std/std.go @@ -1,6 +1,7 @@ package std import ( + "bytes" "context" "crypto/tls" "fmt" @@ -140,11 +141,11 @@ func echoLowPort(rw http.ResponseWriter) { } func echoDist(rw http.ResponseWriter) { - requestURL := "http://pytestserver:8083/tracemetoo" + requestURL := "http://testserver:8088/jsonrpc" slog.Debug("calling", "url", requestURL) - res, err := http.Get(requestURL) + res, err := http.Post(requestURL, "application/json", bytes.NewReader([]byte(`{"jsonrpc":"2.0","method":"Arith.Traceme","params":[{"A":1,"B":2}],"id":1}`))) if err != nil { slog.Error("error making http request", "error", err) rw.WriteHeader(http.StatusInternalServerError) diff --git a/test/integration/components/testserver/testserver.go b/test/integration/components/testserver/testserver.go index c6c64949c3..62c9a7c096 100644 --- a/test/integration/components/testserver/testserver.go +++ b/test/integration/components/testserver/testserver.go @@ -12,6 +12,7 @@ import ( "github.com/open-telemetry/opentelemetry-ebpf-instrumentation/test/integration/components/testserver/gorillamid" "github.com/open-telemetry/opentelemetry-ebpf-instrumentation/test/integration/components/testserver/gorillamid2" grpctest "github.com/open-telemetry/opentelemetry-ebpf-instrumentation/test/integration/components/testserver/grpc/server" + "github.com/open-telemetry/opentelemetry-ebpf-instrumentation/test/integration/components/testserver/jsonrpc" "github.com/open-telemetry/opentelemetry-ebpf-instrumentation/test/integration/components/testserver/std" ) @@ -35,6 +36,7 @@ type config struct { GorillaMid2Port int `env:"GORILLA_MID2_PORT" envDefault:"8087"` GRPCPort int 
`env:"GRPC_PORT" envDefault:"5051"` GRPCTLSPort int `env:"GRPC_TLS_PORT" envDefault:"50051"` + JSONRPCPort int `env:"JSRONRPC_PORT" envDefault:"8088"` LogLevel string `env:"LOG_LEVEL" envDefault:"INFO"` } @@ -88,6 +90,11 @@ func main() { close(wait) }() + go func() { + jsonrpc.Setup(cfg.JSONRPCPort) + close(wait) + }() + // wait indefinitely unless any server crashes <-wait slog.Warn("stopping process") diff --git a/test/integration/docker-compose-multiexec-host.yml b/test/integration/docker-compose-multiexec-host.yml index b544402d12..4af48a1a14 100644 --- a/test/integration/docker-compose-multiexec-host.yml +++ b/test/integration/docker-compose-multiexec-host.yml @@ -8,6 +8,7 @@ services: image: hatest-testserver ports: - "8080:8080" + - "8088:8088" environment: LOG_LEVEL: DEBUG depends_on: diff --git a/test/integration/docker-compose-multiexec.yml b/test/integration/docker-compose-multiexec.yml index 2381a8e52e..8719c91a6c 100644 --- a/test/integration/docker-compose-multiexec.yml +++ b/test/integration/docker-compose-multiexec.yml @@ -7,7 +7,8 @@ services: dockerfile: test/integration/components/testserver/Dockerfile image: hatest-testserver ports: - - "8080:8080" + - "8080:8080" # std http server + - "8088:8088" # jsonrpc server environment: LOG_LEVEL: DEBUG depends_on: diff --git a/test/integration/docker-compose.yml b/test/integration/docker-compose.yml index dca9efbb9c..f2d075581f 100644 --- a/test/integration/docker-compose.yml +++ b/test/integration/docker-compose.yml @@ -12,6 +12,7 @@ services: - "8082:8082" - "8083:8083" - "8087:8087" + - "8088:8088" - "8383:8383" - "5051:5051" - "50051:50051" diff --git a/test/integration/k8s/daemonset_multi_node/k8s_daemonset_multi_node_traces_test.go b/test/integration/k8s/daemonset_multi_node/k8s_daemonset_multi_node_traces_test.go index 4eee6cfe5e..3309ade508 100644 --- a/test/integration/k8s/daemonset_multi_node/k8s_daemonset_multi_node_traces_test.go +++ 
b/test/integration/k8s/daemonset_multi_node/k8s_daemonset_multi_node_traces_test.go @@ -20,19 +20,20 @@ import ( // For the this scenario we run two worker nodes, with the following structure: // - worker 1: // testserver [go app] port: 8080 +// testserver [go jsonrpc app] port: 8088 // - worker 2: // pythonserver [python app] port: 8083 // ruby on rails [ruby app] port: 3040 // // The call flow is as follows: // -// testserver [/gotracemetoo] -> Python server [/tracemetoo] -> Ruby server [/users] +// testserver [/gotracemetoo] -> go jsonrpc [/jsonrpc] -> Python server [/tracemetoo] -> Ruby server [/users] // // They should all have the same traceID. Across nodes the TCP context propagation (OTEL_EBPF_BPF_CONTEXT_PROPAGATION) // connects the dots, while on the same node, the networking is optimized and we rely on black-box context propagation to // connect the services. func TestMultiNodeTracing(t *testing.T) { - feat := features.New("Beyla is able to generate distributed traces go->python->ruby"). + feat := features.New("Beyla is able to generate distributed traces go->go jsonrpc->python->ruby"). 
Assess("it sends connected traces for all services", func(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context { var trace jaeger.Trace @@ -68,6 +69,13 @@ func TestMultiNodeTracing(t *testing.T) { }, trace.Processes[parent.ProcessID].Tags) require.Empty(t, sd, sd.String()) + // Check the information of the Go jsonrpc span + res = trace.FindByOperationName("Arith.T /jsonrpc", "server") + require.Len(t, res, 1) + parent = res[0] + require.NotEmpty(t, parent.TraceID) + require.Equal(t, traceID, parent.TraceID) + // Check the information of the Python span res = trace.FindByOperationName("GET /tracemetoo", "server") require.Len(t, res, 1) diff --git a/test/integration/k8s/manifests/05-uninstrumented-few-services.yml b/test/integration/k8s/manifests/05-uninstrumented-few-services.yml index ae9ffc4ba5..a81eca0139 100644 --- a/test/integration/k8s/manifests/05-uninstrumented-few-services.yml +++ b/test/integration/k8s/manifests/05-uninstrumented-few-services.yml @@ -10,6 +10,9 @@ spec: - port: 8080 name: http0 targetPort: http0 + - port: 8088 + name: http1 + targetPort: http1 --- apiVersion: v1 kind: Service @@ -112,6 +115,9 @@ spec: - containerPort: 8080 hostPort: 8080 name: http0 + - containerPort: 8088 + hostPort: 8088 + name: http1 env: - name: LOG_LEVEL value: "DEBUG" diff --git a/test/integration/k8s/manifests/06-beyla-daemonset-multi-node.yml b/test/integration/k8s/manifests/06-beyla-daemonset-multi-node.yml index 1130216237..dbfc0a1745 100644 --- a/test/integration/k8s/manifests/06-beyla-daemonset-multi-node.yml +++ b/test/integration/k8s/manifests/06-beyla-daemonset-multi-node.yml @@ -20,6 +20,8 @@ data: - exe_path: "/testserver" namespace: integration-test routes: + patterns: + - /jsonrpc unmatched: heuristic otel_metrics_export: endpoint: http://otelcol:4318 diff --git a/test/integration/multiprocess_test.go b/test/integration/multiprocess_test.go index 3853b99160..2569c8636e 100644 --- a/test/integration/multiprocess_test.go +++ 
b/test/integration/multiprocess_test.go @@ -39,6 +39,13 @@ func TestMultiProcess(t *testing.T) { // it doesn't instrument too the process from the other container checkReportedOnlyOnce(t, "http://localhost:8900", "rename1") }) + t.Run("Go RED metrics: JSON RPC", func(t *testing.T) { + waitForTestComponents(t, instrumentedServiceJSONRPCURL) + testREDMetricsForJSONRPCHTTP(t, instrumentedServiceJSONRPCURL, "testserver", "initial-set") + // checks that, instrumenting the process from this container, + // it doesn't instrument too the process from the other container + checkReportedOnlyOnce(t, instrumentedServiceJSONRPCURL, "rename1") + }) t.Run("Go RED metrics: rust service ssl", func(t *testing.T) { waitForTestComponents(t, "https://localhost:8491") @@ -89,7 +96,7 @@ func TestMultiProcess(t *testing.T) { }) if kprobeTracesEnabled() { - t.Run("Nested traces with kprobes: rust -> java -> node -> go -> python -> rails", func(t *testing.T) { + t.Run("Nested traces with kprobes: rust -> java -> node -> go -> go jsonrpc -> python -> rails", func(t *testing.T) { testNestedHTTPTracesKProbes(t) }) @@ -113,7 +120,7 @@ func TestMultiProcessAppCP(t *testing.T) { require.NoError(t, compose.Up()) if kprobeTracesEnabled() { - t.Run("Nested traces with kprobes: rust -> java -> node -> go -> python -> rails", func(t *testing.T) { + t.Run("Nested traces with kprobes: rust -> java -> node -> go -> go jsonrpc -> python -> rails", func(t *testing.T) { testNestedHTTPTracesKProbes(t) }) } @@ -129,7 +136,7 @@ func TestMultiProcessAppCPNoIP(t *testing.T) { require.NoError(t, compose.Up()) if kprobeTracesEnabled() { - t.Run("Nested traces with kprobes: rust -> java -> node -> go -> python -> rails", func(t *testing.T) { + t.Run("Nested traces with kprobes: rust -> java -> node -> go -> go jsonrpc -> python -> rails", func(t *testing.T) { testNestedHTTPTracesKProbes(t) }) } diff --git a/test/integration/red_test.go b/test/integration/red_test.go index fd8c2fa60a..60e2f4384a 100644 --- 
a/test/integration/red_test.go
+++ b/test/integration/red_test.go
@@ -8,6 +8,8 @@ import (
 	"io"
 	"math/rand/v2"
 	"net/http"
+	"os"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"testing"
@@ -28,6 +30,7 @@ const (
 	instrumentedServiceGorillaMidURL  = "http://localhost:8083"
 	instrumentedServiceGorillaMid2URL = "http://localhost:8087"
 	instrumentedServiceStdTLSURL      = "https://localhost:8383"
+	instrumentedServiceJSONRPCURL     = "http://localhost:8088"
 	prometheusHostPort                = "localhost:9090"
 	jaegerQueryURL                    = "http://localhost:16686/api/traces"
 
@@ -93,6 +96,20 @@ func testREDMetricsShortHTTP(t *testing.T) {
 	}
 }
 
+// testREDMetricsJSONRPCHTTP runs the JSON-RPC RED metrics and span metrics
+// checks against each instrumented JSON-RPC test server URL.
+func testREDMetricsJSONRPCHTTP(t *testing.T) {
+	for _, testCaseURL := range []string{
+		instrumentedServiceJSONRPCURL,
+	} {
+		t.Run(testCaseURL, func(t *testing.T) {
+			waitForTestComponents(t, testCaseURL)
+			testREDMetricsForJSONRPCHTTP(t, testCaseURL, "testserver", "integration-test")
+			testSpanMetricsForJSONRPCHTTP(t, "testserver", "integration-test")
+		})
+	}
+}
+
 func testExemplarsExist(t *testing.T) {
 	url := "http://" + prometheusHostPort + "/api/v1/query_exemplars?query=http_server_request_duration_seconds_bucket"
 
@@ -222,6 +239,62 @@ func testSpanMetricsForHTTPLibrary(t *testing.T, svcName, svcNs string) {
 	})
 }
 
+// testSpanMetricsForJSONRPCHTTP waits until Prometheus reports span duration,
+// span call-count and target-info metrics for the "Arith.M /jsonrpc" server span.
+// **IMPORTANT** Tests must first call -> func testREDMetricsForJSONRPCHTTP(t *testing.T, url, svcName, svcNs string) {
+func testSpanMetricsForJSONRPCHTTP(t *testing.T, svcName, svcNs string) {
+	pq := prom.Client{HostPort: prometheusHostPort}
+	var results []prom.Result
+
+	expectedSpanName := "Arith.M /jsonrpc"
+
+	// Test span metrics
+	test.Eventually(t, testTimeout, func(t require.TestingT) {
+		var err error
+		results, err = pq.Query(`traces_span_metrics_duration_seconds_count{` +
+			`span_kind="SPAN_KIND_SERVER",` +
+			`status_code="STATUS_CODE_UNSET",` + // the requests return 200, so the span status stays unset
+			`service_namespace="` + svcNs + `",` +
+			`service_name="` + svcName + `",` +
+			`span_name="` + expectedSpanName + `"` +
+			`}`)
+		require.NoError(t, err)
+		// check span metric latency exists
+		enoughPromResults(t, results)
+		val := totalPromCount(t, results)
+		assert.LessOrEqual(t, 3, val)
+	})
+
+	test.Eventually(t, testTimeout, func(t require.TestingT) {
+		var err error
+		results, err = pq.Query(`traces_span_metrics_calls_total{` +
+			`span_kind="SPAN_KIND_SERVER",` +
+			`status_code="STATUS_CODE_UNSET",` + // the requests return 200, so the span status stays unset
+			`service_namespace="` + svcNs + `",` +
+			`service_name="` + svcName + `",` +
+			`span_name="` + expectedSpanName + `"` +
+			`}`)
+		require.NoError(t, err)
+		// check calls total exists
+		enoughPromResults(t, results)
+		val := totalPromCount(t, results)
+		assert.LessOrEqual(t, 3, val)
+	})
+
+	test.Eventually(t, testTimeout, func(t require.TestingT) {
+		var err error
+		results, err = pq.Query(`traces_target_info{` +
+			`service_namespace="` + svcNs + `",` +
+			`service_name="` + svcName + `",` +
+			`telemetry_sdk_language="go"` +
+			`}`)
+		require.NoError(t, err)
+		enoughPromResults(t, results)
+		val := totalPromCount(t, results)
+		assert.LessOrEqual(t, 1, val) // we report this count for each service, doesn't matter how many calls
+	})
+}
+
 // **IMPORTANT** Tests must first call -> func testREDMetricsForHTTPLibrary(t *testing.T, url, svcName, svcNs string) {
 func testServiceGraphMetricsForHTTPLibrary(t *testing.T, svcNs string) {
 	pq := prom.Client{HostPort: prometheusHostPort}
@@ -261,6 +334,42 @@ func testServiceGraphMetricsForHTTPLibrary(t *testing.T, svcNs string) {
 	assert.Equal(t, 0, val)
 }
 
+// testREDMetricsForJSONRPCHTTP posts the sample JSON-RPC body to url+"/jsonrpc"
+// four times, expecting HTTP 200, and waits until Prometheus reports a total of
+// at least three http_server_request_duration_seconds_count for method "Arith.M".
+func testREDMetricsForJSONRPCHTTP(t *testing.T, url, svcName, svcNs string) {
+	jsonBody, err := os.ReadFile(filepath.Join(pathRoot, "test", "integration", "components", "testserver", "jsonrpc", "body", "formated.json"))
+	require.NoError(t, err)
+	urlPath := "/jsonrpc"
+	expectedMethod := "Arith.M"
+
+	for i := 0; i < 4; i++ {
+		doHTTPPost(t, url+urlPath, 200, jsonBody)
+	}
+
+	// Eventually, Prometheus would make this query visible
+	pq := prom.Client{HostPort: prometheusHostPort}
+	var results []prom.Result
+	test.Eventually(t, testTimeout, func(t require.TestingT) {
+		var err error
+		results, err = pq.Query(`http_server_request_duration_seconds_count{` +
+			`http_request_method="` + expectedMethod + `",` +
+			`http_response_status_code="200",` +
+			`service_namespace="` + svcNs + `",` +
+			`service_name="` + svcName + `",` +
+			`url_path="` + urlPath + `"}`)
+		require.NoError(t, err)
+		enoughPromResults(t, results)
+		val := totalPromCount(t, results)
+		assert.LessOrEqual(t, 3, val)
+		if len(results) > 0 {
+			res := results[0]
+			addr := res.Metric["client_address"]
+			assert.NotNil(t, addr)
+		}
+	})
+}
+
 func testREDMetricsForHTTPLibrary(t *testing.T, url, svcName, svcNs string) {
 	path := "/basic/" + rndStr()
 
diff --git a/test/integration/suites_test.go b/test/integration/suites_test.go
index 6491fbc4d8..ee56925093 100644
--- a/test/integration/suites_test.go
+++ b/test/integration/suites_test.go
@@ -27,6 +27,7 @@ func TestSuite(t *testing.T) {
 	compose.Env = append(compose.Env, `OTEL_EBPF_EXECUTABLE_PATH=(pingclient|testserver)`)
 	require.NoError(t, compose.Up())
 	t.Run("RED metrics", testREDMetricsHTTP)
+	t.Run("RED JSON RPC metrics", testREDMetricsJSONRPCHTTP)
 	t.Run("HTTP traces", testHTTPTraces)
 	t.Run("HTTP traces (no traceID)", testHTTPTracesNoTraceID)
 	t.Run("GRPC traces", testGRPCTraces)
diff --git a/test/integration/traces_test.go b/test/integration/traces_test.go
index 5a164d6e4b..4b47fab42b 100644
--- a/test/integration/traces_test.go
+++ b/test/integration/traces_test.go
@@ -656,6 +656,7 @@ func testNestedHTTPTracesKProbes(t *testing.T) {
 	waitForRubyTestComponents(t, "http://localhost:3041") // ruby
 	waitForTestComponentsSub(t, "http://localhost:8086", "/greeting") // java
 	waitForTestComponents(t, "http://localhost:8091") // rust
+	waitForTestComponents(t, instrumentedServiceJSONRPCURL) // go jsonrpc
 
 	// Add and check for specific trace ID
 	// Run couple of requests to make sure we flush out any transactions that might be
@@ -664,8 +665,8 @@ func
testNestedHTTPTracesKProbes(t *testing.T) { doHTTPGet(t, "http://localhost:8091/dist", 200) } - // rust -> java -> nodejs -> go -> python -> rails - // /dist2 -> /jtrace2 -> /traceme -> /gotracemetoo -> /tracemetoo -> /users + // rust -> java -> nodejs -> go -> go jsonrpc -> python -> rails + // /dist2 -> /jtrace2 -> /traceme -> /gotracemetoo -> /jsonrpc -> /tracemetoo -> /users // Get the first 5 traces var multipleTraces []jaeger.Trace @@ -765,6 +766,26 @@ func testNestedHTTPTracesKProbes(t *testing.T) { ) assert.Empty(t, sd, sd.String()) + // Check the information of the go jsonrpc parent span + res = trace.FindByOperationName("Arith.T /jsonrpc", "server") + require.Len(t, res, 1) + parent = res[0] + require.NotEmpty(t, parent.TraceID) + require.Equal(t, traceID, parent.TraceID) + require.NotEmpty(t, parent.SpanID) + // check duration is at least 2us + assert.Less(t, (2 * time.Microsecond).Microseconds(), parent.Duration) + // check span attributes + sd = parent.Diff( + jaeger.Tag{Key: "http.request.method", Type: "string", Value: "Arith.T"}, + jaeger.Tag{Key: "http.response.status_code", Type: "int64", Value: float64(200)}, + jaeger.Tag{Key: "url.path", Type: "string", Value: "/jsonrpc"}, + jaeger.Tag{Key: "server.port", Type: "int64", Value: float64(8088)}, + jaeger.Tag{Key: "http.route", Type: "string", Value: "/jsonrpc"}, + jaeger.Tag{Key: "span.kind", Type: "string", Value: "server"}, + ) + assert.Empty(t, sd, sd.String()) + // Check the information of the python parent span res = trace.FindByOperationName("GET /tracemetoo", "server") require.Len(t, res, 1)