Skip to content

Commit

Permalink
fix: fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
hengyoush committed Jan 14, 2025
1 parent 646edbf commit 1757ced
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions agent/protocol/kafka/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,12 @@ func NewKafkaFilter(apiKey []int32, topic string, producer bool, consumer bool)

func (k *KafkaFilter) Filter(req protocol.ParsedMessage, resp protocol.ParsedMessage) bool {
pass := true
kafkaReq, ok := req.(*Request)
if !ok {
return false
}
kafkaResp, ok := resp.(*Response)
if !ok {
return false
}
pass = pass && (len(k.apiKey) == 0 || containsAPIKey(k.apiKey, kafkaReq.Apikey))
pass = pass && (len(k.apiKey) == 0 || containsAPIKey(k.apiKey, req.(*Request).Apikey))
if k.topic != "" && pass {
kafkaReq, ok := req.(*Request)
if !ok {
return false
}
if kafkaReq.Apikey == KProduce {
if k.producer && kafkaReq.OriginReq != nil {
originProduceReq := kafkaReq.OriginReq.(ProduceReq)
Expand All @@ -56,6 +52,10 @@ func (k *KafkaFilter) Filter(req protocol.ParsedMessage, resp protocol.ParsedMes
pass = false
}
} else if kafkaReq.Apikey == KFetch {
kafkaResp, ok := resp.(*Response)
if !ok {
return false
}
if k.consumer && kafkaResp.OriginResp != nil {
originFetchResp := kafkaResp.OriginResp.(FetchResp)
matched := false
Expand Down

0 comments on commit 1757ced

Please sign in to comment.