
Commit 5229aff

feat(protocol): support filter by apikeys and topic
fix: fix test
1 parent 4e1ab3d commit 5229aff

File tree: 10 files changed, +167 -10 lines

.github/workflows/build_verification.yml (+2 -2)

@@ -3,9 +3,9 @@ name: Build Verification
 on:
   workflow_dispatch:
   push:
-    branches: [ "main", "dev","feature/*" ]
+    branches: [ "main", "dev","feature/*","unstable" ]
   pull_request:
-    branches: [ "main", "dev","feature/*" ]
+    branches: [ "main", "dev","feature/*","unstable" ]

 concurrency:
   group: ${{ github.workflow }}-${{ github.ref }}

.github/workflows/test.yml (+2 -2)

@@ -3,9 +3,9 @@ name: Test
 on:
   workflow_dispatch:
   push:
-    branches: [ "main", "dev","feature/*" ]
+    branches: [ "main", "dev","feature/*","unstable" ]
   pull_request:
-    branches: [ "main", "dev","feature/*" ]
+    branches: [ "main", "dev","feature/*","unstable" ]

 env:
   kyanos_log_option: --bpf-event-log-level 5 --conntrack-log-level 5 --agent-log-level 5

agent/protocol/kafka/common/types.go (+4 -1)

@@ -76,6 +76,7 @@ const (
 	_ // Placeholder for 58-59
 	KDescribeCluster
 	KDescribeProducers
+	KApiKeyNone
 )

 // API KEY to string
@@ -433,11 +434,13 @@ type Request struct {
 	ApiVersion int16
 	ClientId   string
 	Msg        string
+	OriginReq  any
 }

 type Response struct {
 	protocol.FrameBase
-	Msg string
+	Msg        string
+	OriginResp any
 }

 func (p *Packet) FormatToString() string {

agent/protocol/kafka/filter.go (new file, +102)

@@ -0,0 +1,102 @@
+package kafka
+
+import (
+	"kyanos/agent/protocol"
+	. "kyanos/agent/protocol/kafka/common"
+	"kyanos/bpf"
+)
+
+type KafkaFilter struct {
+	apiKey   []APIKey
+	topic    string
+	producer bool
+	consumer bool
+}
+
+func NewKafkaFilter(apiKey []int32, topic string, producer bool, consumer bool) *KafkaFilter {
+	apiKeys := make([]APIKey, len(apiKey))
+	for i, key := range apiKey {
+		apiKeys[i] = APIKey(key)
+	}
+
+	return &KafkaFilter{
+		apiKey:   apiKeys,
+		topic:    topic,
+		producer: producer,
+		consumer: consumer,
+	}
+}
+
+func (k *KafkaFilter) Filter(req protocol.ParsedMessage, resp protocol.ParsedMessage) bool {
+	pass := true
+	pass = pass && (len(k.apiKey) == 0 || containsAPIKey(k.apiKey, req.(*Request).Apikey))
+	if k.topic != "" && pass {
+		kafkaReq, ok := req.(*Request)
+		if !ok {
+			return false
+		}
+		if kafkaReq.Apikey == KProduce {
+			if k.producer && kafkaReq.OriginReq != nil {
+				originProduceReq := kafkaReq.OriginReq.(ProduceReq)
+				matched := false
+				for _, topic := range originProduceReq.Topics {
+					if topic.Name == k.topic {
+						matched = true
+						break
+					}
+				}
+				if len(originProduceReq.Topics) == 0 || !matched {
+					pass = false
+				}
+			} else {
+				pass = false
+			}
+		} else if kafkaReq.Apikey == KFetch {
+			kafkaResp, ok := resp.(*Response)
+			if !ok {
+				return false
+			}
+			if k.consumer && kafkaResp.OriginResp != nil {
+				originFetchResp := kafkaResp.OriginResp.(FetchResp)
+				matched := false
+				for _, topic := range originFetchResp.Topics {
+					if topic.Name == k.topic {
+						matched = true
+						break
+					}
+				}
+				if len(originFetchResp.Topics) == 0 || !matched {
+					pass = false
+				}
+			} else {
+				pass = false
+			}
+		} else {
+			pass = false
+		}
+	}
+	return pass
+}
+
+func containsAPIKey(apiKeys []APIKey, key APIKey) bool {
+	for _, apiKey := range apiKeys {
+		if apiKey == key {
+			return true
+		}
+	}
+	return false
+}
+
+func (k *KafkaFilter) FilterByProtocol(p bpf.AgentTrafficProtocolT) bool {
+	return p == bpf.AgentTrafficProtocolTKProtocolKafka
+}
+
+func (k *KafkaFilter) FilterByRequest() bool {
+	return len(k.apiKey) == 0 || (k.producer && k.topic != "")
+}
+
+func (k *KafkaFilter) FilterByResponse() bool {
+	return (k.consumer && k.topic != "")
+}
+
+var _ protocol.ProtocolFilter = &KafkaFilter{}
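To make the intended wiring concrete, here is a minimal hypothetical sketch (not part of this commit: the example package, the shouldEmit helper, and the call site are assumptions) showing how a caller could construct the filter and consult it, using only the constructor and methods added above.

package main // hypothetical example package, for illustration only

import (
	"kyanos/agent/protocol"
	"kyanos/agent/protocol/kafka"
	"kyanos/bpf"
)

// shouldEmit sketches the expected call order: first confirm the traffic is
// Kafka at all, then apply the apikey/topic rules to the parsed pair.
func shouldEmit(f *kafka.KafkaFilter, proto bpf.AgentTrafficProtocolT,
	req protocol.ParsedMessage, resp protocol.ParsedMessage) bool {
	if !f.FilterByProtocol(proto) {
		return false // not Kafka traffic
	}
	return f.Filter(req, resp)
}

func main() {
	// Keep only Produce traffic (API key 0) for topic "quickstart-events",
	// checking the topic on both the producer and the consumer side.
	f := kafka.NewKafkaFilter([]int32{0}, "quickstart-events", true, true)
	_ = f // in kyanos this would be handed to the pipeline via options.MessageFilter
}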

agent/protocol/kafka/kafka.go (+4)

@@ -327,6 +327,7 @@ func ProcessProduceReq(decoder *decoder.PacketDecoder, req *common.Request) error
 		return err
 	}
 	req.Msg = string(jsonData)
+	req.OriginReq = r
 	return nil
 }

@@ -340,6 +341,7 @@ func ProcessProduceResp(decoder *decoder.PacketDecoder, resp *common.Response) error
 		return err
 	}
 	resp.Msg = string(jsonData)
+	resp.OriginResp = r
 	return nil
 }

@@ -353,6 +355,7 @@ func ProcessFetchReq(decoder *decoder.PacketDecoder, req *common.Request) error
 		return err
 	}
 	req.Msg = string(jsonData)
+	req.OriginReq = r
 	return nil
 }

@@ -366,6 +369,7 @@ func ProcessFetchResp(decoder *decoder.PacketDecoder, resp *common.Response) error
 		return err
 	}
 	resp.Msg = string(jsonData)
+	resp.OriginResp = r
 	return nil
 }
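Note that the decoded structs are stashed only on the Produce and Fetch paths, which lines up with the filter above: topic matching inspects OriginReq only for KProduce requests and OriginResp only for KFetch responses, and any other API key fails the check once a topic is specified.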

bpf/common.go (+1)

@@ -9,6 +9,7 @@ var ProtocolNamesMap = map[AgentTrafficProtocolT]string{
 	AgentTrafficProtocolTKProtocolRedis:    "Redis",
 	AgentTrafficProtocolTKProtocolMySQL:    "MySQL",
 	AgentTrafficProtocolTKProtocolRocketMQ: "RocketMQ",
+	AgentTrafficProtocolTKProtocolKafka:    "Kafka",
 }

 var StepCNNames [AgentStepTEnd + 1]string = [AgentStepTEnd + 1]string{"开始", "SSLWrite", "系统调用(出)", "TCP层(出)", "IP层(出)", "QDISC", "DEV层(出)", "网卡(出)", "网卡(进)", "DEV层(进)", "IP层(进)", "TCP层(进)", "用户拷贝", "系统调用(进)", "SSLRead", "结束"}

cmd/kafka.go (+23 -4)

@@ -9,18 +9,37 @@ import (
 var _ = kafka.ProcessFetchReq
 var kafkaCmd *cobra.Command = &cobra.Command{
 	Use:   "kafka",
-	Short: "watch RocketMQ message",
+	Short: "watch Kafka message",
 	Run: func(cmd *cobra.Command, args []string) {
-
+		apikeys, err := cmd.Flags().GetInt32Slice("apikeys")
+		if err != nil {
+			logger.Fatalf("Invalid apikeys: %v\n", err)
+		}
+		topic, err := cmd.Flags().GetString("topic")
+		if err != nil {
+			logger.Fatalf("Invalid topic: %v\n", err)
+		}
+		producer, err := cmd.Flags().GetBool("producer")
+		if err != nil {
+			logger.Fatalf("Invalid producer: %v\n", err)
+		}
+		consumer, err := cmd.Flags().GetBool("consumer")
+		if err != nil {
+			logger.Fatalf("Invalid consumer: %v\n", err)
+		}
+		filter := kafka.NewKafkaFilter(apikeys, topic, producer, consumer)
+		options.MessageFilter = filter
 		options.LatencyFilter = initLatencyFilter(cmd)
 		options.SizeFilter = initSizeFilter(cmd)
 		startAgent()
 	},
 }

 func init() {
-	// kafkaCmd.Flags().Int32Slice("request-codes", []int32{}, "Specify the request codes to monitor (e.g., 10, 11), separated by ','")
-	// kafkaCmd.Flags().StringSlice("languages", []string{}, "Specify the languages to monitor (e.g., Java, Go, Rust, CPP), separated by ','")
+	kafkaCmd.Flags().Int32Slice("apikeys", []int32{}, "Specify the apikeys to monitor (e.g., 0, 1), separated by ','")
+	kafkaCmd.Flags().String("topic", "", "Specify the topic to monitor")
+	kafkaCmd.Flags().Bool("producer", true, "Monitor only producer request")
+	kafkaCmd.Flags().Bool("consumer", true, "Monitor only fetch request")

 	kafkaCmd.PersistentFlags().SortFlags = false
 	copy := *kafkaCmd
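Since --producer and --consumer both default to true, passing only --topic matches the topic in both directions, Produce requests on the producer side and Fetch responses on the consumer side; setting one of the two flags to false narrows topic matching to the other direction.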

docs/cn/watch.md (+14)

@@ -155,6 +155,20 @@ kyanos 支持根据 IP 端口等三/四层信息过滤,可以指定以下选
 > 更多支持的语言,请参阅
 > [这里](https://github.com/apache/rocketmq/blob/develop/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java)

+
+#### Kafka 协议过滤 <Badge type="tip" text="1.5.0" />
+
+| 过滤条件 | 命令行 flag | 示例 |
+| :------- | :---------- | :---------------------------------------------------------------- |
+| 主题名称 | `topic`     | `--topic quickstart-events` |
+| 生产者   | `producer`  | `--producer` 观察 producer 的请求,指定 topic 时有用,默认为 true |
+| 消费者   | `consumer`  | `--consumer` 观察 consumer 的请求,指定 topic 时有用,默认为 true |
+| 请求代码 | `apikeys`   | `--apikeys 10,11` 只观察 Kafka APIKEY 为 10 和 11 的 |
+
+> 有关 API Key 的含义和值,请参阅
+> [这里](https://kafka.apache.org/protocol#protocol_api_keys)
+
+
 #### MYSQL 协议过滤

 > 已支持 MySQL 协议抓取,根据条件过滤仍在实现中...

docs/watch.md (+14)

@@ -183,6 +183,20 @@ Here are the options available for filtering by each protocol:
 > For more supported languages, please refer to
 > [here](https://github.com/apache/rocketmq/blob/develop/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java).

+#### Kafka Protocol Filtering <Badge type="tip" text="1.5.0" />
+
+| Filter Condition | Command Line Flag | Example |
+| :--------------- | :---------------- | :------ |
+| Topic Name       | `topic`           | `--topic quickstart-events` |
+| Producer         | `producer`        | `--producer` Observe producer requests, useful when specifying a topic, default is true |
+| Consumer         | `consumer`        | `--consumer` Observe consumer requests, useful when specifying a topic, default is true |
+| Request Code     | `apikeys`         | `--apikeys 10,11` Only observe Kafka APIKEYs 10 and 11 |
+
+
+> For the meaning and values of API Keys, refer to
+> [here](https://kafka.apache.org/protocol#protocol_api_keys).
+
+
 #### MySQL Protocol Filtering

 > MySQL protocol capturing is supported, but filtering by conditions is still in
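As a usage illustration (assuming these flags attach to the kafka subcommand of kyanos watch, as cmd/kafka.go above suggests), `kyanos watch kafka --topic quickstart-events --consumer=false` would keep only producer-side Produce requests for that topic, while `kyanos watch kafka --apikeys 1` would keep only Fetch traffic.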

testdata/test_filter_by_l4.sh (+1 -1)

@@ -9,7 +9,7 @@ LNAME_REMOTE_PORT="${FILE_PREFIX}_filter_by_remote_port.log"
 LNAME_LOCAL_PORT="${FILE_PREFIX}_filter_by_local_port.log"

 function test_filter_by_remote_ip() {
-    remote_ip=$(dig example.com +short)
+    remote_ip=$(dig example.com +short | tail -n 1)
     timeout 20 ${CMD} watch --debug-output http --remote-ips "$remote_ip" 2>&1 | tee "${LNAME_IP}" &
     sleep 10
     curl http://"$remote_ip" &>/dev/null || true
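The added tail -n 1 presumably guards against dig +short printing more than one line (for example a CNAME target followed by several A records), so a single address is passed to --remote-ips.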
