Skip to content

Commit

Permalink
feat(protocol): support filter by apikeys and topic
Browse files Browse the repository at this point in the history
fix: fix test
  • Loading branch information
hengyoush committed Jan 15, 2025
1 parent 4efc8eb commit 1129e6a
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 9 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build_verification.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ name: Build Verification
on:
workflow_dispatch:
push:
branches: [ "main", "dev","feature/*" ]
branches: [ "main", "dev","feature/*","unstable" ]
pull_request:
branches: [ "main", "dev","feature/*" ]
branches: [ "main", "dev","feature/*","unstable" ]

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ name: Test
on:
workflow_dispatch:
push:
branches: [ "main", "dev","feature/*" ]
branches: [ "main", "dev","feature/*","unstable" ]
pull_request:
branches: [ "main", "dev","feature/*" ]
branches: [ "main", "dev","feature/*","unstable" ]

env:
kyanos_log_option: --bpf-event-log-level 5 --conntrack-log-level 5 --agent-log-level 5
Expand Down
5 changes: 4 additions & 1 deletion agent/protocol/kafka/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const (
_ // Placeholder for 58-59
KDescribeCluster
KDescribeProducers
KApiKeyNone
)

// API KEY to string
Expand Down Expand Up @@ -433,11 +434,13 @@ type Request struct {
ApiVersion int16
ClientId string
Msg string
OriginReq any
}

type Response struct {
protocol.FrameBase
Msg string
Msg string
OriginResp any
}

func (p *Packet) FormatToString() string {
Expand Down
102 changes: 102 additions & 0 deletions agent/protocol/kafka/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package kafka

import (
"kyanos/agent/protocol"
. "kyanos/agent/protocol/kafka/common"
"kyanos/bpf"
)

type KafkaFilter struct {
apiKey []APIKey
topic string
producer bool
consumer bool
}

func NewKafkaFilter(apiKey []int32, topic string, producer bool, consumer bool) *KafkaFilter {
apiKeys := make([]APIKey, len(apiKey))
for i, key := range apiKey {
apiKeys[i] = APIKey(key)
}

return &KafkaFilter{
apiKey: apiKeys,
topic: topic,
producer: producer,
consumer: consumer,
}
}

func (k *KafkaFilter) Filter(req protocol.ParsedMessage, resp protocol.ParsedMessage) bool {
pass := true
pass = pass && (len(k.apiKey) == 0 || containsAPIKey(k.apiKey, req.(*Request).Apikey))
if k.topic != "" && pass {
kafkaReq, ok := req.(*Request)
if !ok {
return false
}
if kafkaReq.Apikey == KProduce {
if k.producer && kafkaReq.OriginReq != nil {
originProduceReq := kafkaReq.OriginReq.(ProduceReq)
matched := false
for _, topic := range originProduceReq.Topics {
if topic.Name == k.topic {
matched = true
break
}
}
if len(originProduceReq.Topics) == 0 || !matched {
pass = false
}
} else {
pass = false
}
} else if kafkaReq.Apikey == KFetch {
kafkaResp, ok := resp.(*Response)
if !ok {
return false
}
if k.consumer && kafkaResp.OriginResp != nil {
originFetchResp := kafkaResp.OriginResp.(FetchResp)
matched := false
for _, topic := range originFetchResp.Topics {
if topic.Name == k.topic {
matched = true
break
}
}
if len(originFetchResp.Topics) == 0 || !matched {
pass = false
}
} else {
pass = false
}
} else {
pass = false
}
}
return pass
}

func containsAPIKey(apiKeys []APIKey, key APIKey) bool {
for _, apiKey := range apiKeys {
if apiKey == key {
return true
}
}
return false
}

func (k *KafkaFilter) FilterByProtocol(p bpf.AgentTrafficProtocolT) bool {
return p == bpf.AgentTrafficProtocolTKProtocolKafka
}

func (k *KafkaFilter) FilterByRequest() bool {
return len(k.apiKey) == 0 || (k.producer && k.topic != "")
}

func (k *KafkaFilter) FilterByResponse() bool {
return (k.consumer && k.topic != "")
}

var _ protocol.ProtocolFilter = &KafkaFilter{}
4 changes: 4 additions & 0 deletions agent/protocol/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ func ProcessProduceReq(decoder *decoder.PacketDecoder, req *common.Request) erro
return err
}
req.Msg = string(jsonData)
req.OriginReq = r
return nil
}

Expand All @@ -340,6 +341,7 @@ func ProcessProduceResp(decoder *decoder.PacketDecoder, resp *common.Response) e
return err
}
resp.Msg = string(jsonData)
resp.OriginResp = r
return nil
}

Expand All @@ -353,6 +355,7 @@ func ProcessFetchReq(decoder *decoder.PacketDecoder, req *common.Request) error
return err
}
req.Msg = string(jsonData)
req.OriginReq = r
return nil
}

Expand All @@ -366,6 +369,7 @@ func ProcessFetchResp(decoder *decoder.PacketDecoder, resp *common.Response) err
return err
}
resp.Msg = string(jsonData)
resp.OriginResp = r
return nil
}

Expand Down
1 change: 1 addition & 0 deletions bpf/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var ProtocolNamesMap = map[AgentTrafficProtocolT]string{
AgentTrafficProtocolTKProtocolRedis: "Redis",
AgentTrafficProtocolTKProtocolMySQL: "MySQL",
AgentTrafficProtocolTKProtocolRocketMQ: "RocketMQ",
AgentTrafficProtocolTKProtocolKafka: "Kafka",
}

var StepCNNames [AgentStepTEnd + 1]string = [AgentStepTEnd + 1]string{"开始", "SSLWrite", "系统调用(出)", "TCP层(出)", "IP层(出)", "QDISC", "DEV层(出)", "网卡(出)", "网卡(进)", "DEV层(进)", "IP层(进)", "TCP层(进)", "用户拷贝", "系统调用(进)", "SSLRead", "结束"}
Expand Down
27 changes: 23 additions & 4 deletions cmd/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,37 @@ import (
var _ = kafka.ProcessFetchReq
var kafkaCmd *cobra.Command = &cobra.Command{
Use: "kafka",
Short: "watch RocketMQ message",
Short: "watch Kafka message",
Run: func(cmd *cobra.Command, args []string) {

apikeys, err := cmd.Flags().GetInt32Slice("apikeys")
if err != nil {
logger.Fatalf("Invalid apikeys: %v\n", err)
}
topic, err := cmd.Flags().GetString("topic")
if err != nil {
logger.Fatalf("Invalid topic: %v\n", err)
}
producer, err := cmd.Flags().GetBool("producer")
if err != nil {
logger.Fatalf("Invalid producer: %v\n", err)
}
consumer, err := cmd.Flags().GetBool("consumer")
if err != nil {
logger.Fatalf("Invalid consumer: %v\n", err)
}
filter := kafka.NewKafkaFilter(apikeys, topic, producer, consumer)
options.MessageFilter = filter
options.LatencyFilter = initLatencyFilter(cmd)
options.SizeFilter = initSizeFilter(cmd)
startAgent()
},
}

func init() {
// kafkaCmd.Flags().Int32Slice("request-codes", []int32{}, "Specify the request codes to monitor (e.g., 10, 11), separated by ','")
// kafkaCmd.Flags().StringSlice("languages", []string{}, "Specify the languages to monitor (e.g., Java, Go, Rust, CPP), separated by ','")
kafkaCmd.Flags().Int32Slice("apikeys", []int32{}, "Specify the apikeys to monitor (e.g., 0, 1), separated by ','")
kafkaCmd.Flags().String("topic", "", "Specify the topic to monitor")
kafkaCmd.Flags().Bool("producer", true, "Monitor only producer request")
kafkaCmd.Flags().Bool("consumer", true, "Monitor only fetch request")

kafkaCmd.PersistentFlags().SortFlags = false
copy := *kafkaCmd
Expand Down
14 changes: 14 additions & 0 deletions docs/cn/watch.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,20 @@ kyanos 支持根据 IP 端口等三/四层信息过滤,可以指定以下选
> 更多支持的语言,请参阅
> [这里](https://github.com/apache/rocketmq/blob/develop/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java)

#### Kafka 协议过滤 <Badge type="tip" text="1.5.0" />

| 过滤条件 | 命令行 flag | 示例 |
| :------- | :-------------- | :---------------------------------------------------------------------- |
| 主题名称 | `topic` | `--topic quickstart-events` |
| 生产者 | `producer` | `--producer` 观察 producer 的请求,指定topic时有用,默认为true |
| 消费者 | `consumer` | `--consumer` 观察 consumer 的请求,指定topic时有用,默认为true |
| 请求代码 | `apikeys` | `--apikeys 10,11` 只观察Kafka APIKEY为 10 和 11 的 |

> 有关API Key的含义和值,请参阅
> [这里](https://kafka.apache.org/protocol#protocol_api_keys)

#### MYSQL 协议过滤

> 已支持 MySQL 协议抓取,根据条件过滤仍在实现中...
Expand Down
14 changes: 14 additions & 0 deletions docs/watch.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,20 @@ Here are the options available for filtering by each protocol:
> For more supported languages, please refer to
> [here](https://github.com/apache/rocketmq/blob/develop/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java).
#### Kafka Protocol Filtering <Badge type="tip" text="1.5.0" />

| Filter Condition | Command Line Flag | Example |
| :--------------- | :---------------- | :------ |
| Topic Name | `topic` | `--topic quickstart-events` |
| Producer | `producer` | `--producer` Observe producer requests, useful when specifying a topic, default is true |
| Consumer | `consumer` | `--consumer` Observe consumer requests, useful when specifying a topic, default is true |
| Request Code | `apikeys` | `--apikeys 10,11` Only observe Kafka APIKEYs 10 and 11 |


> For the meaning and values of API Keys, refer to
> [here](https://kafka.apache.org/protocol#protocol_api_keys).

#### MySQL Protocol Filtering

> MySQL protocol capturing is supported, but filtering by conditions is still in
Expand Down

0 comments on commit 1129e6a

Please sign in to comment.