Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved Kafka dissector. #2456

Merged
merged 2 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/include/ndpi_typedefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -835,9 +835,6 @@ struct ndpi_flow_tcp_struct {
/* NDPI_PROTOCOL_SSH */
u_int32_t ssh_stage:3;

/* NDPI_PROTOCOL_KAFKA */
u_int32_t kafka_stage:1;

/* NDPI_PROTOCOL_VNC */
u_int32_t vnc_stage:2; // 0 - 3

Expand Down Expand Up @@ -891,9 +888,6 @@ struct ndpi_flow_tcp_struct {

/* NDPI_PROTOCOL_RADMIN */
u_int32_t radmin_stage:1;

/* NDPI_PROTOCOL_KAFKA */
u_int32_t kafka_correlation_id;
};

/* ************************************************** */
Expand Down
51 changes: 32 additions & 19 deletions src/lib/protocols/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
#include "ndpi_api.h"
#include "ndpi_private.h"

static void ndpi_int_kafka_add_connection(struct ndpi_detection_module_struct *ndpi_struct,
struct ndpi_flow_struct *flow)
{
NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n");
ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA,
NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI);
}

static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct,
struct ndpi_flow_struct *flow)
{
Expand All @@ -41,32 +49,37 @@ static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct,
* API keys: https://kafka.apache.org/protocol.html#protocol_api_keys
* API versions: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+APIs
*/
if (packet->payload_packet_len > 40 &&
ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4))
if (packet->payload_packet_len < 8 /* min. required packet length */ ||
ntohl(get_u_int32_t(packet->payload, 0)) != (uint32_t)(packet->payload_packet_len - 4))
{
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
return;
}

/* Request */
if (ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */
ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */)
{
/* Request */
if (!flow->l4.tcp.kafka_stage &&
current_pkt_from_client_to_server(ndpi_struct, flow) &&
ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */
ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */)
if (packet->payload_packet_len < 14)
{
flow->l4.tcp.kafka_correlation_id = ntohl(get_u_int16_t(packet->payload, 8));
flow->l4.tcp.kafka_stage = 1;
return;
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
return;
}

/* Response */
if (flow->l4.tcp.kafka_stage == 1 &&
current_pkt_from_server_to_client(ndpi_struct, flow))
const uint16_t client_id_len = ntohs(get_u_int16_t(packet->payload, 12));
if (client_id_len + 12 + 2 > packet->payload_packet_len)
{
if (ntohl(get_u_int16_t(packet->payload, 4)) == flow->l4.tcp.kafka_correlation_id)
{
NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n");
ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA,
NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI);
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
return;
}
}
if (ndpi_is_printable_buffer(&packet->payload[14], client_id_len) == 0)
{
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
return;
}

ndpi_int_kafka_add_connection(ndpi_struct, flow);
return;
}

NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
Expand Down
Binary file modified tests/cfgs/default/pcap/kafka.pcapng
Binary file not shown.
31 changes: 21 additions & 10 deletions tests/cfgs/default/result/kafka.pcapng.out
Original file line number Diff line number Diff line change
@@ -1,27 +1,38 @@
DPI Packets (TCP): 6 (6.00 pkts/flow)
Confidence DPI : 1 (flows)
Num dissector calls: 150 (150.00 diss/flow)
Guessed flow protos: 1

DPI Packets (TCP): 16 (1.78 pkts/flow)
Confidence Match by port : 1 (flows)
Confidence DPI : 8 (flows)
Num dissector calls: 222 (24.67 diss/flow)
LRU cache ookla: 0/0/0 (insert/search/found)
LRU cache bittorrent: 0/0/0 (insert/search/found)
LRU cache bittorrent: 0/3/0 (insert/search/found)
LRU cache stun: 0/0/0 (insert/search/found)
LRU cache tls_cert: 0/0/0 (insert/search/found)
LRU cache mining: 0/0/0 (insert/search/found)
LRU cache mining: 0/1/0 (insert/search/found)
LRU cache msteams: 0/0/0 (insert/search/found)
LRU cache stun_zoom: 0/0/0 (insert/search/found)
Automa host: 0/0 (search/found)
Automa domain: 0/0 (search/found)
Automa tls cert: 0/0 (search/found)
Automa risk mask: 0/0 (search/found)
Automa common alpns: 0/0 (search/found)
Patricia risk mask: 0/0 (search/found)
Patricia risk mask: 14/0 (search/found)
Patricia risk mask IPv6: 0/0 (search/found)
Patricia risk: 0/0 (search/found)
Patricia risk IPv6: 0/0 (search/found)
Patricia protocols: 2/0 (search/found)
Patricia protocols: 18/0 (search/found)
Patricia protocols IPv6: 0/0 (search/found)

Kafka 19 2237 1
Kafka 41 7067 9

Acceptable 19 2237 1
Acceptable 41 7067 9

1 TCP 127.0.0.1:46136 <-> 127.0.0.1:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 6][cat: RPC/16][12 pkts/1107 bytes <-> 7 pkts/1130 bytes][Goodput ratio: 28/58][13.63 sec][bytes ratio: -0.010 (Mixed)][IAT c2s/s2c min/avg/max/stddev: 0/0 800/288 6849/1049 2039/441][Pkt Len c2s/s2c min/avg/max/stddev: 66/66 92/161 206/512 42/149][PLAIN TEXT (console)][Plen Bins: 12,38,12,12,12,0,0,0,0,0,0,0,0,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
1 TCP 127.0.0.1:46136 <-> 127.0.0.1:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 4][cat: RPC/16][12 pkts/1107 bytes <-> 7 pkts/1130 bytes][Goodput ratio: 28/58][13.63 sec][bytes ratio: -0.010 (Mixed)][IAT c2s/s2c min/avg/max/stddev: 0/0 800/288 6849/1049 2039/441][Pkt Len c2s/s2c min/avg/max/stddev: 66/66 92/161 206/512 42/149][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: TCP connection with unidirectional traffic][PLAIN TEXT (console)][Plen Bins: 12,38,12,12,12,0,0,0,0,0,0,0,0,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
2 TCP 172.16.17.101:38176 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][3 pkts/1408 bytes <-> 2 pkts/254 bytes][Goodput ratio: 86/48][0.58 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][PLAIN TEXT (timestamp)][Plen Bins: 0,40,0,0,0,0,0,0,0,0,0,40,0,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
3 TCP 172.30.0.237:9092 <-> 172.16.17.101:58052 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: Match by port][DPI packets: 5][cat: RPC/16][4 pkts/974 bytes <-> 1 pkts/110 bytes][Goodput ratio: 73/40][599.70 sec][PLAIN TEXT (172.30.0.237)][Plen Bins: 0,20,0,60,0,0,0,0,0,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
4 TCP 172.16.17.101:49280 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][2 pkts/201 bytes <-> 3 pkts/788 bytes][Goodput ratio: 34/75][899.84 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][PLAIN TEXT (172.30.0.237)][Plen Bins: 20,20,0,40,0,0,0,0,0,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
5 TCP 172.16.17.101:56556 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/91 bytes <-> 1 pkts/416 bytes][Goodput ratio: 27/84][0.03 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 50,0,0,0,0,0,0,0,0,0,50,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
6 TCP 172.16.17.101:40042 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/110 bytes <-> 1 pkts/186 bytes][Goodput ratio: 40/64][0.03 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][PLAIN TEXT (172.30.0.237)][Plen Bins: 0,50,0,50,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
7 TCP 172.16.17.101:53768 -> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/110 bytes -> 0 pkts/0 bytes][Goodput ratio: 40/0][< 1 sec][Risk: ** Unidirectional Traffic **** Probing attempt **][Risk Score: 60][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 0,100,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
8 TCP 172.16.17.101:53052 -> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/91 bytes -> 0 pkts/0 bytes][Goodput ratio: 27/0][< 1 sec][Risk: ** Unidirectional Traffic **** Probing attempt **][Risk Score: 60][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 100,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
9 TCP 172.16.17.101:58300 -> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/91 bytes -> 0 pkts/0 bytes][Goodput ratio: 27/0][< 1 sec][Risk: ** Unidirectional Traffic **** Probing attempt **][Risk Score: 60][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 100,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
Loading