diff --git a/src/include/ndpi_typedefs.h b/src/include/ndpi_typedefs.h index ffc98ecd141..a2e10878fdb 100644 --- a/src/include/ndpi_typedefs.h +++ b/src/include/ndpi_typedefs.h @@ -835,9 +835,6 @@ struct ndpi_flow_tcp_struct { /* NDPI_PROTOCOL_SSH */ u_int32_t ssh_stage:3; - /* NDPI_PROTOCOL_KAFKA */ - u_int32_t kafka_stage:1; - /* NDPI_PROTOCOL_VNC */ u_int32_t vnc_stage:2; // 0 - 3 @@ -891,9 +888,6 @@ struct ndpi_flow_tcp_struct { /* NDPI_PROTOCOL_RADMIN */ u_int32_t radmin_stage:1; - - /* NDPI_PROTOCOL_KAFKA */ - u_int32_t kafka_correlation_id; }; /* ************************************************** */ diff --git a/src/lib/protocols/kafka.c b/src/lib/protocols/kafka.c index cffd1f32f6e..abf0ae3ca9a 100644 --- a/src/lib/protocols/kafka.c +++ b/src/lib/protocols/kafka.c @@ -29,6 +29,14 @@ #include "ndpi_api.h" #include "ndpi_private.h" +static void ndpi_int_kafka_add_connection(struct ndpi_detection_module_struct *ndpi_struct, + struct ndpi_flow_struct *flow) +{ + NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n"); + ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA, + NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI); +} + static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct, struct ndpi_flow_struct *flow) { @@ -41,32 +49,37 @@ static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct, * API keys: https://kafka.apache.org/protocol.html#protocol_api_keys * API versions: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+APIs */ - if (packet->payload_packet_len > 40 && - ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4)) + if (packet->payload_packet_len < 8 /* min. required packet length */ || + ntohl(get_u_int32_t(packet->payload, 0)) != (uint32_t)(packet->payload_packet_len - 4)) + { + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); + return; + } + + /* Request */ + if (ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */ + ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */) { - /* Request */ - if (!flow->l4.tcp.kafka_stage && - current_pkt_from_client_to_server(ndpi_struct, flow) && - ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */ - ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */) + if (packet->payload_packet_len < 14) { - flow->l4.tcp.kafka_correlation_id = ntohl(get_u_int16_t(packet->payload, 8)); - flow->l4.tcp.kafka_stage = 1; - return; + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); + return; } - /* Response */ - if (flow->l4.tcp.kafka_stage == 1 && - current_pkt_from_server_to_client(ndpi_struct, flow)) + const uint16_t client_id_len = ntohs(get_u_int16_t(packet->payload, 12)); + if (client_id_len + 12 + 2 > packet->payload_packet_len) { - if (ntohl(get_u_int16_t(packet->payload, 4)) == flow->l4.tcp.kafka_correlation_id) - { - NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n"); - ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA, - NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI); + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); return; - } } + if (ndpi_is_printable_buffer(&packet->payload[14], client_id_len) == 0) + { + NDPI_EXCLUDE_PROTO(ndpi_struct, flow); + return; + } + + ndpi_int_kafka_add_connection(ndpi_struct, flow); + return; } NDPI_EXCLUDE_PROTO(ndpi_struct, flow); diff --git a/tests/cfgs/default/pcap/kafka.pcapng b/tests/cfgs/default/pcap/kafka.pcapng index bcdabe98abe..88ddd53a217 100644 Binary files a/tests/cfgs/default/pcap/kafka.pcapng and b/tests/cfgs/default/pcap/kafka.pcapng differ diff --git a/tests/cfgs/default/result/kafka.pcapng.out b/tests/cfgs/default/result/kafka.pcapng.out index ee0deb38df9..47bdc80f339 100644 --- a/tests/cfgs/default/result/kafka.pcapng.out +++ b/tests/cfgs/default/result/kafka.pcapng.out @@ -1,11 +1,14 @@ -DPI Packets (TCP): 6 (6.00 pkts/flow) -Confidence DPI : 1 (flows) -Num dissector calls: 150 (150.00 diss/flow) +Guessed flow protos: 1 + +DPI Packets (TCP): 16 (1.78 pkts/flow) +Confidence Match by port : 1 (flows) +Confidence DPI : 8 (flows) +Num dissector calls: 222 (24.67 diss/flow) LRU cache ookla: 0/0/0 (insert/search/found) -LRU cache bittorrent: 0/0/0 (insert/search/found) +LRU cache bittorrent: 0/3/0 (insert/search/found) LRU cache stun: 0/0/0 (insert/search/found) LRU cache tls_cert: 0/0/0 (insert/search/found) -LRU cache mining: 0/0/0 (insert/search/found) +LRU cache mining: 0/1/0 (insert/search/found) LRU cache msteams: 0/0/0 (insert/search/found) LRU cache stun_zoom: 0/0/0 (insert/search/found) Automa host: 0/0 (search/found) @@ -13,15 +16,23 @@ Automa domain: 0/0 (search/found) Automa tls cert: 0/0 (search/found) Automa risk mask: 0/0 (search/found) Automa common alpns: 0/0 (search/found) -Patricia risk mask: 0/0 (search/found) +Patricia risk mask: 14/0 (search/found) Patricia risk mask IPv6: 0/0 (search/found) Patricia risk: 0/0 (search/found) Patricia risk IPv6: 0/0 (search/found) -Patricia protocols: 2/0 (search/found) +Patricia protocols: 18/0 (search/found) Patricia protocols IPv6: 0/0 (search/found) -Kafka 19 2237 1 +Kafka 41 7067 9 -Acceptable 19 2237 1 +Acceptable 41 7067 9 - 1 TCP 127.0.0.1:46136 <-> 127.0.0.1:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 6][cat: RPC/16][12 pkts/1107 bytes <-> 7 pkts/1130 bytes][Goodput ratio: 28/58][13.63 sec][bytes ratio: -0.010 (Mixed)][IAT c2s/s2c min/avg/max/stddev: 0/0 800/288 6849/1049 2039/441][Pkt Len c2s/s2c min/avg/max/stddev: 66/66 92/161 206/512 42/149][PLAIN TEXT (console)][Plen Bins: 12,38,12,12,12,0,0,0,0,0,0,0,0,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + 1 TCP 127.0.0.1:46136 <-> 127.0.0.1:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 4][cat: RPC/16][12 pkts/1107 bytes <-> 7 pkts/1130 bytes][Goodput ratio: 28/58][13.63 sec][bytes ratio: -0.010 (Mixed)][IAT c2s/s2c min/avg/max/stddev: 0/0 800/288 6849/1049 2039/441][Pkt Len c2s/s2c min/avg/max/stddev: 66/66 92/161 206/512 42/149][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: TCP connection with unidirectional traffic][PLAIN TEXT (console)][Plen Bins: 12,38,12,12,12,0,0,0,0,0,0,0,0,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + 2 TCP 172.16.17.101:38176 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][3 pkts/1408 bytes <-> 2 pkts/254 bytes][Goodput ratio: 86/48][0.58 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][PLAIN TEXT (timestamp)][Plen Bins: 0,40,0,0,0,0,0,0,0,0,0,40,0,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + 3 TCP 172.30.0.237:9092 <-> 172.16.17.101:58052 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: Match by port][DPI packets: 5][cat: RPC/16][4 pkts/974 bytes <-> 1 pkts/110 bytes][Goodput ratio: 73/40][599.70 sec][PLAIN TEXT (172.30.0.237)][Plen Bins: 0,20,0,60,0,0,0,0,0,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + 4 TCP 172.16.17.101:49280 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][2 pkts/201 bytes <-> 3 pkts/788 bytes][Goodput ratio: 34/75][899.84 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][PLAIN TEXT (172.30.0.237)][Plen Bins: 20,20,0,40,0,0,0,0,0,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + 5 TCP 172.16.17.101:56556 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/91 bytes <-> 1 pkts/416 bytes][Goodput ratio: 27/84][0.03 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 50,0,0,0,0,0,0,0,0,0,50,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + 6 TCP 172.16.17.101:40042 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/110 bytes <-> 1 pkts/186 bytes][Goodput ratio: 40/64][0.03 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][PLAIN TEXT (172.30.0.237)][Plen Bins: 0,50,0,50,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + 7 TCP 172.16.17.101:53768 -> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/110 bytes -> 0 pkts/0 bytes][Goodput ratio: 40/0][< 1 sec][Risk: ** Unidirectional Traffic **** Probing attempt **][Risk Score: 60][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 0,100,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + 8 TCP 172.16.17.101:53052 -> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/91 bytes -> 0 pkts/0 bytes][Goodput ratio: 27/0][< 1 sec][Risk: ** Unidirectional Traffic **** Probing attempt **][Risk Score: 60][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 100,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + 9 TCP 172.16.17.101:58300 -> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/91 bytes -> 0 pkts/0 bytes][Goodput ratio: 27/0][< 1 sec][Risk: ** Unidirectional Traffic **** Probing attempt **][Risk Score: 60][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 100,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]