diff --git a/agent/buffer/stream_buffer.go b/agent/buffer/stream_buffer.go index 585de32e..ec9a22f8 100644 --- a/agent/buffer/stream_buffer.go +++ b/agent/buffer/stream_buffer.go @@ -60,7 +60,7 @@ func (sb *StreamBuffer) IsEmpty() bool { return len(sb.buffers) == 0 } func (sb *StreamBuffer) Clear() { - sb.buffers = sb.buffers[:] + sb.buffers = sb.buffers[0:0] sb.timestamps.Clear() } func (sb *StreamBuffer) RemovePrefix(length int) { diff --git a/agent/protocol/decoder.go b/agent/protocol/decoder.go index a08cf29e..bbc1189d 100644 --- a/agent/protocol/decoder.go +++ b/agent/protocol/decoder.go @@ -14,10 +14,16 @@ type BinaryDecoder struct { func NewBinaryDecoder(buf []byte) *BinaryDecoder { return &BinaryDecoder{buf: buf, str: string(buf)} } +func (b *BinaryDecoder) RemainingBytes() int { + return len(b.str) +} func (b *BinaryDecoder) Buf() []byte { return b.buf } +func (b *BinaryDecoder) SubBuf(length int) []byte { + return b.buf[len(b.buf)-len(b.str)+length:] +} func (b *BinaryDecoder) SetBuf(buf []byte) { b.buf = buf diff --git a/agent/protocol/kafka/common/fetch.go b/agent/protocol/kafka/common/fetch.go index 23e7fde2..4dd71f51 100644 --- a/agent/protocol/kafka/common/fetch.go +++ b/agent/protocol/kafka/common/fetch.go @@ -91,3 +91,107 @@ type FetchResp struct { func (p FetchResp) ToJSON() ([]byte, error) { return json.Marshal(p) } + +func (p FetchReqPartition) Equals(other FetchReqPartition) bool { + return p.Index == other.Index && + p.CurrentLeaderEpoch == other.CurrentLeaderEpoch && + p.FetchOffset == other.FetchOffset && + p.LastFetchedEpoch == other.LastFetchedEpoch && + p.LogStartOffset == other.LogStartOffset && + p.PartitionMaxBytes == other.PartitionMaxBytes +} + +func (p FetchReqTopic) Equals(other FetchReqTopic) bool { + if p.Name != other.Name || len(p.Partitions) != len(other.Partitions) { + return false + } + for i := range p.Partitions { + if !p.Partitions[i].Equals(other.Partitions[i]) { + return false + } + } + return true +} + +func (p 
FetchForgottenTopicsData) Equals(other FetchForgottenTopicsData) bool { + if p.Name != other.Name || len(p.PartitionIndices) != len(other.PartitionIndices) { + return false + } + for i := range p.PartitionIndices { + if p.PartitionIndices[i] != other.PartitionIndices[i] { + return false + } + } + return true +} + +func (p FetchReq) Equals(other FetchReq) bool { + if p.ReplicaID != other.ReplicaID || + p.SessionID != other.SessionID || + len(p.Topics) != len(other.Topics) || + len(p.ForgottenTopics) != len(other.ForgottenTopics) { + return false + } + for i := range p.Topics { + if !p.Topics[i].Equals(other.Topics[i]) { + return false + } + } + for i := range p.ForgottenTopics { + if !p.ForgottenTopics[i].Equals(other.ForgottenTopics[i]) { + return false + } + } + return true +} + +func (p FetchRespAbortedTransaction) Equals(other FetchRespAbortedTransaction) bool { + return p.ProducerID == other.ProducerID && + p.FirstOffset == other.FirstOffset +} + +func (p FetchRespPartition) Equals(other FetchRespPartition) bool { + if p.Index != other.Index || + p.ErrorCode != other.ErrorCode || + p.HighWatermark != other.HighWatermark || + p.LastStableOffset != other.LastStableOffset || + p.LogStartOffset != other.LogStartOffset || + p.PreferredReadReplica != other.PreferredReadReplica || + !p.MessageSet.Equals(other.MessageSet) || + len(p.AbortedTransactions) != len(other.AbortedTransactions) { + return false + } + for i := range p.AbortedTransactions { + if !p.AbortedTransactions[i].Equals(other.AbortedTransactions[i]) { + return false + } + } + return true +} + +func (p FetchRespTopic) Equals(other FetchRespTopic) bool { + if p.Name != other.Name || len(p.Partitions) != len(other.Partitions) { + return false + } + for i := range p.Partitions { + if !p.Partitions[i].Equals(other.Partitions[i]) { + return false + } + } + return true +} + +func (p FetchResp) Equals(other FetchResp) bool { + if p.ThrottleTimeMs != other.ThrottleTimeMs || + p.ErrorCode != other.ErrorCode || + 
p.SessionID != other.SessionID || + len(p.Topics) != len(other.Topics) { + return false + } + for i := range p.Topics { + if !p.Topics[i].Equals(other.Topics[i]) { + return false + } + } + return true +} diff --git a/agent/protocol/kafka/common/join_group.go b/agent/protocol/kafka/common/join_group.go index 57178b6a..1dcb4376 100644 --- a/agent/protocol/kafka/common/join_group.go +++ b/agent/protocol/kafka/common/join_group.go @@ -49,3 +49,77 @@ type JoinGroupResp struct { func (j JoinGroupResp) ToJSON() ([]byte, error) { return json.Marshal(j) } + +func (lhs JoinGroupProtocol) Equal(rhs JoinGroupProtocol) bool { + return lhs.Protocol == rhs.Protocol +} + +func (lhs JoinGroupMember) Equal(rhs JoinGroupMember) bool { + if lhs.MemberID != rhs.MemberID { + return false + } + return lhs.GroupInstanceID == rhs.GroupInstanceID +} + +func (lhs JoinGroupReq) Equal(rhs JoinGroupReq) bool { + if lhs.GroupID != rhs.GroupID { + return false + } + if lhs.SessionTimeoutMs != rhs.SessionTimeoutMs { + return false + } + if lhs.RebalanceTimeoutMs != rhs.RebalanceTimeoutMs { + return false + } + if lhs.MemberID != rhs.MemberID { + return false + } + if lhs.GroupInstanceID != rhs.GroupInstanceID { + return false + } + if lhs.ProtocolType != rhs.ProtocolType { + return false + } + if len(lhs.Protocols) != len(rhs.Protocols) { + return false + } + for i := range lhs.Protocols { + if !lhs.Protocols[i].Equal(rhs.Protocols[i]) { + return false + } + } + return true +} + +func (lhs JoinGroupResp) Equal(rhs JoinGroupResp) bool { + if lhs.ThrottleTimeMs != rhs.ThrottleTimeMs { + return false + } + if lhs.ErrorCode != rhs.ErrorCode { + return false + } + if lhs.GenerationID != rhs.GenerationID { + return false + } + if lhs.ProtocolType != rhs.ProtocolType { + return false + } + if lhs.ProtocolName != rhs.ProtocolName { + return false + } + if lhs.Leader != rhs.Leader { + return false + } + if lhs.MemberID != rhs.MemberID { + return false + } + if len(lhs.Members) != len(rhs.Members) { + 
return false + } + for i := range lhs.Members { + if !lhs.Members[i].Equal(rhs.Members[i]) { + return false + } + } + return true +} diff --git a/agent/protocol/kafka/common/metadata.go b/agent/protocol/kafka/common/metadata.go index d89aff48..390a8a02 100644 --- a/agent/protocol/kafka/common/metadata.go +++ b/agent/protocol/kafka/common/metadata.go @@ -2,6 +2,7 @@ package common import ( "encoding/json" + "reflect" ) type MetadataReqTopic struct { @@ -23,3 +24,18 @@ type MetadataReq struct { func (m MetadataReq) ToJSON() ([]byte, error) { return json.Marshal(m) } + +func vectorsEqual[T any](lhs, rhs []T) bool { + return len(lhs) == len(rhs) && reflect.DeepEqual(lhs, rhs) +} + +func (lhs MetadataReqTopic) Equal(rhs MetadataReqTopic) bool { + return lhs.TopicID == rhs.TopicID && lhs.Name == rhs.Name +} + +func (lhs MetadataReq) Equal(rhs MetadataReq) bool { + return lhs.AllowAutoTopicCreation == rhs.AllowAutoTopicCreation && + lhs.IncludeClusterAuthorizedOperations == rhs.IncludeClusterAuthorizedOperations && + lhs.IncludeTopicAuthorizedOperations == rhs.IncludeTopicAuthorizedOperations && + vectorsEqual(lhs.Topics, rhs.Topics) +} diff --git a/agent/protocol/kafka/common/produce.go b/agent/protocol/kafka/common/produce.go index 929955f2..2d4a45c0 100644 --- a/agent/protocol/kafka/common/produce.go +++ b/agent/protocol/kafka/common/produce.go @@ -73,3 +73,107 @@ type ProduceResp struct { func (p ProduceResp) ToJSON() ([]byte, error) { return json.Marshal(p) } + +func (lhs ProduceReqPartition) Equal(rhs ProduceReqPartition) bool { + return lhs.Index == rhs.Index && lhs.MessageSet.Equals(rhs.MessageSet) +} + +func (lhs ProduceReqTopic) Equal(rhs ProduceReqTopic) bool { + if lhs.Name != rhs.Name { + return false + } + if len(lhs.Partitions) != len(rhs.Partitions) { + return false + } + for i := range lhs.Partitions { + if !lhs.Partitions[i].Equal(rhs.Partitions[i]) { + return false + } + } + return true +} + +func (lhs ProduceReq) Equal(rhs ProduceReq) bool { + if 
lhs.TransactionalID != rhs.TransactionalID { + return false + } + if lhs.Acks != rhs.Acks { + return false + } + if lhs.TimeoutMs != rhs.TimeoutMs { + return false + } + if len(lhs.Topics) != len(rhs.Topics) { + return false + } + for i := range lhs.Topics { + if !lhs.Topics[i].Equal(rhs.Topics[i]) { + return false + } + } + return true +} + +func (lhs RecordError) Equal(rhs RecordError) bool { + return lhs.BatchIndex == rhs.BatchIndex && lhs.ErrorMessage == rhs.ErrorMessage +} + +func (lhs ProduceRespPartition) Equal(rhs ProduceRespPartition) bool { + if lhs.Index != rhs.Index { + return false + } + if lhs.ErrorCode != rhs.ErrorCode { + return false + } + if lhs.BaseOffset != rhs.BaseOffset { + return false + } + if lhs.LogAppendTimeMs != rhs.LogAppendTimeMs { + return false + } + if lhs.LogStartOffset != rhs.LogStartOffset { + return false + } + if lhs.ErrorMessage != rhs.ErrorMessage { + return false + } + if len(lhs.RecordErrors) != len(rhs.RecordErrors) { + return false + } + for i := range lhs.RecordErrors { + if !lhs.RecordErrors[i].Equal(rhs.RecordErrors[i]) { + return false + } + } + return true +} + +func (lhs ProduceRespTopic) Equal(rhs ProduceRespTopic) bool { + if lhs.Name != rhs.Name { + return false + } + if len(lhs.Partitions) != len(rhs.Partitions) { + return false + } + for i := range lhs.Partitions { + if !lhs.Partitions[i].Equal(rhs.Partitions[i]) { + return false + } + } + return true +} + +func (lhs ProduceResp) Equal(rhs ProduceResp) bool { + if lhs.ThrottleTimeMs != rhs.ThrottleTimeMs { + return false + } + if len(lhs.Topics) != len(rhs.Topics) { + return false + } + for i := range lhs.Topics { + if !lhs.Topics[i].Equal(rhs.Topics[i]) { + return false + } + } + return true +} diff --git a/agent/protocol/kafka/common/sync_group.go b/agent/protocol/kafka/common/sync_group.go index e55389a4..8094b12f 100644 --- a/agent/protocol/kafka/common/sync_group.go +++ b/agent/protocol/kafka/common/sync_group.go @@ -36,3 +36,50 @@ type SyncGroupResp 
struct { func (s SyncGroupResp) ToJSON() ([]byte, error) { return json.Marshal(s) } + +func (lhs SyncGroupAssignment) Equal(rhs SyncGroupAssignment) bool { + return lhs.MemberID == rhs.MemberID +} + +func (lhs SyncGroupReq) Equal(rhs SyncGroupReq) bool { + if lhs.GroupID != rhs.GroupID { + return false + } + if lhs.GenerationID != rhs.GenerationID { + return false + } + if lhs.MemberID != rhs.MemberID { + return false + } + if lhs.GroupInstanceID != rhs.GroupInstanceID { + return false + } + if lhs.ProtocolType != rhs.ProtocolType { + return false + } + if lhs.ProtocolName != rhs.ProtocolName { + return false + } + if len(lhs.Assignments) != len(rhs.Assignments) { + return false + } + for i := range lhs.Assignments { + if !lhs.Assignments[i].Equal(rhs.Assignments[i]) { + return false + } + } + return true +} + +func (lhs SyncGroupResp) Equal(rhs SyncGroupResp) bool { + if lhs.ThrottleTimeMs != rhs.ThrottleTimeMs { + return false + } + if lhs.ErrorCode != rhs.ErrorCode { + return false + } + if lhs.ProtocolType != rhs.ProtocolType { + return false + } + return lhs.ProtocolName == rhs.ProtocolName +} diff --git a/agent/protocol/kafka/common/types.go b/agent/protocol/kafka/common/types.go index bc8f4053..47b601dc 100644 --- a/agent/protocol/kafka/common/types.go +++ b/agent/protocol/kafka/common/types.go @@ -301,7 +301,7 @@ var _ protocol.ParsedMessage = &Response{} type Packet struct { protocol.FrameBase - CorrelationId int32 + CorrelationID int32 Msg string Consumed bool isReq bool diff --git a/agent/protocol/kafka/decoder/fetch.go b/agent/protocol/kafka/decoder/fetch.go index 63536742..346cc046 100644 --- a/agent/protocol/kafka/decoder/fetch.go +++ b/agent/protocol/kafka/decoder/fetch.go @@ -16,6 +16,8 @@ func (pd *PacketDecoder) ExtractFetchReqPartition() (FetchReqPartition, error) { if err != nil { return r, err } + } else { + r.CurrentLeaderEpoch = -1 } r.FetchOffset, err = pd.ExtractInt64() if err != nil { @@ -26,12 +28,16 @@ func (pd *PacketDecoder) 
ExtractFetchReqPartition() (FetchReqPartition, error) { if err != nil { return r, err } + } else { + r.LastFetchedEpoch = -1 } if pd.apiVersion >= 5 { r.LogStartOffset, err = pd.ExtractInt64() if err != nil { return r, err } + } else { + r.LogStartOffset = -1 } r.PartitionMaxBytes, err = pd.ExtractInt32() if err != nil { @@ -159,7 +165,11 @@ func (pd *PacketDecoder) ExtractFetchRespAbortedTransaction() (FetchRespAbortedT } func (pd *PacketDecoder) ExtractFetchRespPartition() (FetchRespPartition, error) { - var r FetchRespPartition + var r FetchRespPartition = FetchRespPartition{ + LastStableOffset: -1, + LogStartOffset: -1, + PreferredReadReplica: -1, + } var err error r.Index, err = pd.ExtractInt32() if err != nil { diff --git a/agent/protocol/kafka/decoder/fetch_test.go b/agent/protocol/kafka/decoder/fetch_test.go new file mode 100644 index 00000000..5d7a68b0 --- /dev/null +++ b/agent/protocol/kafka/decoder/fetch_test.go @@ -0,0 +1,426 @@ +package decoder_test + +import ( + "testing" + + . 
"kyanos/agent/protocol/kafka/common" + "kyanos/agent/protocol/kafka/decoder" + + "github.com/stretchr/testify/assert" +) + +func TestFetchReqPartitionEquality(t *testing.T) { + lhs := FetchReqPartition{Index: 1, CurrentLeaderEpoch: 2, FetchOffset: 3, LastFetchedEpoch: 4, LogStartOffset: 5, PartitionMaxBytes: 6} + rhs := FetchReqPartition{Index: 1, CurrentLeaderEpoch: 2, FetchOffset: 3, LastFetchedEpoch: 4, LogStartOffset: 5, PartitionMaxBytes: 6} + assert.Equal(t, lhs, rhs) +} + +func TestFetchReqTopicEquality(t *testing.T) { + lhs := FetchReqTopic{Name: "topic1", Partitions: []FetchReqPartition{{Index: 1}}} + rhs := FetchReqTopic{Name: "topic1", Partitions: []FetchReqPartition{{Index: 1}}} + assert.Equal(t, lhs, rhs) +} + +func TestFetchForgottenTopicsDataEquality(t *testing.T) { + lhs := FetchForgottenTopicsData{Name: "topic1", PartitionIndices: []int32{1, 2, 3}} + rhs := FetchForgottenTopicsData{Name: "topic1", PartitionIndices: []int32{1, 2, 3}} + assert.Equal(t, lhs, rhs) +} + +func TestFetchReqEquality(t *testing.T) { + lhs := FetchReq{ReplicaID: 1, SessionID: 2, SessionEpoch: 3, Topics: []FetchReqTopic{{Name: "topic1"}}, ForgottenTopics: []FetchForgottenTopicsData{{Name: "topic2"}}, RackID: "rack1"} + rhs := FetchReq{ReplicaID: 1, SessionID: 2, SessionEpoch: 3, Topics: []FetchReqTopic{{Name: "topic1"}}, ForgottenTopics: []FetchForgottenTopicsData{{Name: "topic2"}}, RackID: "rack1"} + assert.Equal(t, lhs, rhs) +} + +func TestFetchRespAbortedTransactionEquality(t *testing.T) { + lhs := FetchRespAbortedTransaction{ProducerID: 1, FirstOffset: 2} + rhs := FetchRespAbortedTransaction{ProducerID: 1, FirstOffset: 2} + assert.Equal(t, lhs, rhs) +} + +func TestFetchRespPartitionEquality(t *testing.T) { + lhs := FetchRespPartition{Index: 1, ErrorCode: 2, HighWatermark: 3, LastStableOffset: 4, LogStartOffset: 5, PreferredReadReplica: 6, AbortedTransactions: []FetchRespAbortedTransaction{{ProducerID: 1}}, MessageSet: MessageSet{Size: 7}} + rhs := 
FetchRespPartition{Index: 1, ErrorCode: 2, HighWatermark: 3, LastStableOffset: 4, LogStartOffset: 5, PreferredReadReplica: 6, AbortedTransactions: []FetchRespAbortedTransaction{{ProducerID: 1}}, MessageSet: MessageSet{Size: 7}} + assert.Equal(t, lhs, rhs) +} + +func TestFetchRespTopicEquality(t *testing.T) { + lhs := FetchRespTopic{Name: "topic1", Partitions: []FetchRespPartition{{Index: 1}}} + rhs := FetchRespTopic{Name: "topic1", Partitions: []FetchRespPartition{{Index: 1}}} + assert.Equal(t, lhs, rhs) +} + +func TestFetchRespEquality(t *testing.T) { + lhs := FetchResp{ThrottleTimeMs: 1, ErrorCode: 2, SessionID: 3, Topics: []FetchRespTopic{{Name: "topic1"}}} + rhs := FetchResp{ThrottleTimeMs: 1, ErrorCode: 2, SessionID: 3, Topics: []FetchRespTopic{{Name: "topic1"}}} + assert.Equal(t, lhs, rhs) +} + +func TestFetchReqEqual(t *testing.T) { + req1 := FetchReq{ + ReplicaID: 1, + SessionID: 1, + SessionEpoch: 1, + Topics: []FetchReqTopic{ + { + Name: "topic1", + Partitions: []FetchReqPartition{ + {Index: 1, CurrentLeaderEpoch: 1, FetchOffset: 1, LastFetchedEpoch: 1, LogStartOffset: 1, PartitionMaxBytes: 1}, + }, + }, + }, + ForgottenTopics: []FetchForgottenTopicsData{ + {Name: "topic1", PartitionIndices: []int32{1}}, + }, + RackID: "rack1", + } + + req2 := FetchReq{ + ReplicaID: 1, + SessionID: 1, + SessionEpoch: 1, + Topics: []FetchReqTopic{ + { + Name: "topic1", + Partitions: []FetchReqPartition{ + {Index: 1, CurrentLeaderEpoch: 1, FetchOffset: 1, LastFetchedEpoch: 1, LogStartOffset: 1, PartitionMaxBytes: 1}, + }, + }, + }, + ForgottenTopics: []FetchForgottenTopicsData{ + {Name: "topic1", PartitionIndices: []int32{1}}, + }, + RackID: "rack1", + } + + assert.True(t, req1.Equals(req2)) +} + +func TestExtractFetchReqV4(t *testing.T) { + input := []byte{ + 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x01, 0xF4, 0x00, 0x00, 0x00, 0x01, 0x03, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x08, 0x6D, 0x79, 0x2D, 0x74, 0x6F, 0x70, 0x69, 0x63, 0x00, 0x00, 0x00, 0x01, 0x00, 
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x01, 0x7E, 0x00, 0x10, 0x00, 0x00, + } + + expectedResult := FetchReq{ + ReplicaID: -1, + SessionID: 0, + SessionEpoch: -1, + Topics: []FetchReqTopic{{Name: "my-topic", Partitions: []FetchReqPartition{{Index: 0, CurrentLeaderEpoch: -1, FetchOffset: 382, LastFetchedEpoch: -1, PartitionMaxBytes: 1048576, LogStartOffset: -1}}}}, + ForgottenTopics: []FetchForgottenTopicsData{}, + RackID: "", + } + + decoder := decoder.NewPacketDecoder(input) + decoder.SetAPIInfo(KFetch, 4) + result, err := decoder.ExtractFetchReq() + assert.NoError(t, err) + assert.True(t, expectedResult.Equals(result)) +} + +func TestExtractFetchReqV11(t *testing.T) { + input := []byte("\xff\xff\xff\xff\x00\x00\x01\xf4\x00\x00\x00\x01\x03\x20\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x01\x00\x11\x71\x75\x69\x63\x6b\x73\x74\x61\x72\x74\x2d\x65\x76\x65" + + "\x6e\x74\x73\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\xff\xff\xff\xff\xff\xff\xff\xff\x00\x10\x00\x00\x00\x00\x00\x00\x00\x00") + + expectedResult := FetchReq{ + ReplicaID: -1, + SessionID: 0, + SessionEpoch: 0, + Topics: []FetchReqTopic{{Name: "quickstart-events", Partitions: []FetchReqPartition{{Index: 0, CurrentLeaderEpoch: 0, FetchOffset: 0, LogStartOffset: -1, PartitionMaxBytes: 1048576, LastFetchedEpoch: -1}}}}, + ForgottenTopics: []FetchForgottenTopicsData{}, + RackID: "", + } + + decoder := decoder.NewPacketDecoder(input) + decoder.SetAPIInfo(KFetch, 11) + result, err := decoder.ExtractFetchReq() + assert.NoError(t, err) + assert.Equal(t, expectedResult, result) +} + +func TestExtractFetchReqV12(t *testing.T) { + input := []byte( + "\xff\xff\xff\xff\x00\x00\x01\xf4\x00\x00\x00\x01\x03\x20\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x02\x12\x71\x75\x69\x63\x6b\x73\x74\x61\x72\x74\x2d\x65\x76\x65\x6e\x74\x73\x02\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff" 
+ + "\xff\xff\xff\xff\x00\x10\x00\x00\x00\x00\x01\x01\x00") + + expectedResult := FetchReq{ + ReplicaID: -1, + SessionID: 0, + SessionEpoch: 0, + Topics: []FetchReqTopic{{Name: "quickstart-events", Partitions: []FetchReqPartition{{Index: 0, CurrentLeaderEpoch: 0, FetchOffset: 0, LogStartOffset: -1, PartitionMaxBytes: 1048576, LastFetchedEpoch: -1}}}}, + ForgottenTopics: []FetchForgottenTopicsData{}, + RackID: "", + } + + decoder := decoder.NewPacketDecoder(input) + decoder.SetAPIInfo(KFetch, 12) + result, err := decoder.ExtractFetchReq() + assert.NoError(t, err) + assert.Equal(t, expectedResult, result) +} + +func TestExtractFetchRespV4(t *testing.T) { + input := []byte("\x00\x00\x00\x00\x00\x00\x00\x01\x00\x08\x6D\x79\x2D\x74\x6F\x70\x69\x63\x00\x00\x00\x01" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x7E\x00\x00\x00\x00\x00\x00\x01\x7E\xFF" + + "\xFF\xFF\xFF\x00\x00\x00\x00") + + messageSet := MessageSet{Size: 0, RecordBatches: []RecordBatch{}} + partition := FetchRespPartition{ + Index: 0, + ErrorCode: 0, + HighWatermark: 382, + LastStableOffset: 382, + LogStartOffset: -1, + AbortedTransactions: []FetchRespAbortedTransaction{}, + PreferredReadReplica: -1, + MessageSet: messageSet, + } + topic := FetchRespTopic{ + Name: "my-topic", + Partitions: []FetchRespPartition{partition}, + } + expectedResult := FetchResp{ + ThrottleTimeMs: 0, + ErrorCode: 0, + SessionID: 0, + Topics: []FetchRespTopic{topic}, + } + + decoder := decoder.NewPacketDecoder(input) + decoder.SetAPIInfo(KFetch, 4) + result, err := decoder.ExtractFetchResp() + assert.NoError(t, err) + assert.Equal(t, expectedResult, result) +} + +func TestExtractFetchRespV11(t *testing.T) { + input := []byte( + "\x00\x00\x00\x00\x00\x00\x27\xd5\xb6\xd1\x00\x00\x00\x01\x00\x11\x71\x75\x69\x63\x6b\x73\x74" + + "\x61\x72\x74\x2d\x65\x76\x65\x6e\x74\x73\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff" + 
+ "\xff\xff\xff\xff\xff\xff\x00\x00\x01\x71\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x38\x00" + + "\x00\x00\x00\x02\x7e\x35\x4f\xcb\x00\x00\x00\x00\x00\x00\x00\x00\x01\x7a\xb0\x95\x78\xbc\x00" + + "\x00\x01\x7a\xb0\x95\x78\xbc\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00" + + "\x00\x01\x0c\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x38\x00\x00" + + "\x00\x00\x02\x1b\x91\x32\x93\x00\x00\x00\x00\x00\x00\x00\x00\x01\x7a\xb2\x08\x48\x52\x00\x00" + + "\x01\x7a\xb2\x08\x48\x52\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00" + + "\x01\x0c\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x38\x00\x00\x00" + + "\x00\x02\x99\x41\x19\xe9\x00\x00\x00\x00\x00\x00\x00\x00\x01\x7a\xb2\x08\xde\x56\x00\x00\x01" + + "\x7a\xb2\x08\xde\x56\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01" + + "\x0c\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x46\x00\x00\x00\x00" + + "\x02\xa7\x88\x71\xd8\x00\x00\x00\x00\x00\x00\x00\x00\x01\x7a\xb2\x0a\x70\x1d\x00\x00\x01\x7a" + + "\xb2\x0a\x70\x1d\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x28" + + "\x00\x00\x00\x01\x1c\x4d\x79\x20\x66\x69\x72\x73\x74\x20\x65\x76\x65\x6e\x74\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x04\x00\x00\x00\x47\x00\x00\x00\x00\x02\x5c\x9d\xc5\x05\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x01\x7a\xb2\x0a\xb7\xe5\x00\x00\x01\x7a\xb2\x0a\xb7\xe5\xff\xff\xff\xff\xff\xff" + + "\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x2a\x00\x00\x00\x01\x1e\x4d\x79\x20\x73\x65" + + "\x63\x6f\x6e\x64\x20\x65\x76\x65\x6e\x74\x00") + + recordBatch1 := RecordBatch{Records: []RecordMessage{{Key: "", Value: ""}}} + recordBatch2 := RecordBatch{Records: []RecordMessage{{Key: "", Value: ""}}} + recordBatch3 := RecordBatch{Records: []RecordMessage{{Key: "", Value: ""}}} + recordBatch4 := RecordBatch{Records: []RecordMessage{{Key: "", Value: "My first event"}}} + recordBatch5 := 
RecordBatch{Records: []RecordMessage{{Key: "", Value: "My second event"}}} + messageSet := MessageSet{ + Size: 369, + RecordBatches: []RecordBatch{recordBatch1, recordBatch2, recordBatch3, recordBatch4, recordBatch5}, + } + partition := FetchRespPartition{ + Index: 0, + ErrorCode: 0, + HighWatermark: 5, + LastStableOffset: 5, + LogStartOffset: 0, + AbortedTransactions: []FetchRespAbortedTransaction{}, + PreferredReadReplica: -1, + MessageSet: messageSet, + } + topic := FetchRespTopic{ + Name: "quickstart-events", + Partitions: []FetchRespPartition{partition}, + } + expectedResult := FetchResp{ + ThrottleTimeMs: 0, + ErrorCode: 0, + SessionID: 668317393, + Topics: []FetchRespTopic{topic}, + } + + decoder := decoder.NewPacketDecoder(input) + decoder.SetAPIInfo(KFetch, 11) + result, err := decoder.ExtractFetchResp() + assert.NoError(t, err) + assert.True(t, expectedResult.Equals(result)) +} + +func TestExtractFetchRespV12(t *testing.T) { + input := []byte( + "\x00\x00\x00\x00\x00\x00\x27\xd5\xb6\xd1\x02\x12\x71\x75\x69\x63\x6b\x73\x74\x61\x72\x74\x2d" + + "\x65\x76\x65\x6e\x74\x73\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00" + + "\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x01\xff\xff\xff\xff\xf2\x02\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x38\x00\x00\x00\x00\x02\x7e\x35\x4f\xcb\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x01\x7a\xb0\x95\x78\xbc\x00\x00\x01\x7a\xb0\x95\x78\xbc\xff\xff\xff\xff\xff" + + "\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x0c\x00\x00\x00\x01\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x01\x00\x00\x00\x38\x00\x00\x00\x00\x02\x1b\x91\x32\x93\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x01\x7a\xb2\x08\x48\x52\x00\x00\x01\x7a\xb2\x08\x48\x52\xff\xff\xff\xff\xff\xff" + + "\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x0c\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x02\x00\x00\x00\x38\x00\x00\x00\x00\x02\x99\x41\x19\xe9\x00\x00\x00\x00\x00\x00" + + 
"\x00\x00\x01\x7a\xb2\x08\xde\x56\x00\x00\x01\x7a\xb2\x08\xde\x56\xff\xff\xff\xff\xff\xff\xff" + + "\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x0c\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x03\x00\x00\x00\x46\x00\x00\x00\x00\x02\xa7\x88\x71\xd8\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x01\x7a\xb2\x0a\x70\x1d\x00\x00\x01\x7a\xb2\x0a\x70\x1d\xff\xff\xff\xff\xff\xff\xff\xff" + + "\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x28\x00\x00\x00\x01\x1c\x4d\x79\x20\x66\x69\x72\x73" + + "\x74\x20\x65\x76\x65\x6e\x74\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x47\x00\x00\x00" + + "\x00\x02\x5c\x9d\xc5\x05\x00\x00\x00\x00\x00\x00\x00\x00\x01\x7a\xb2\x0a\xb7\xe5\x00\x00\x01" + + "\x7a\xb2\x0a\xb7\xe5\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01" + + "\x2a\x00\x00\x00\x01\x1e\x4d\x79\x20\x73\x65\x63\x6f\x6e\x64\x20\x65\x76\x65\x6e\x74\x00\x00" + + "\x00\x00") + + recordBatch1 := RecordBatch{Records: []RecordMessage{{Key: "", Value: ""}}} + recordBatch2 := RecordBatch{Records: []RecordMessage{{Key: "", Value: ""}}} + recordBatch3 := RecordBatch{Records: []RecordMessage{{Key: "", Value: ""}}} + recordBatch4 := RecordBatch{Records: []RecordMessage{{Key: "", Value: "My first event"}}} + recordBatch5 := RecordBatch{Records: []RecordMessage{{Key: "", Value: "My second event"}}} + messageSet := MessageSet{ + Size: 369, + RecordBatches: []RecordBatch{recordBatch1, recordBatch2, recordBatch3, recordBatch4, recordBatch5}, + } + partition := FetchRespPartition{ + Index: 0, + ErrorCode: 0, + HighWatermark: 5, + LastStableOffset: 5, + LogStartOffset: 0, + AbortedTransactions: []FetchRespAbortedTransaction{}, + PreferredReadReplica: -1, + MessageSet: messageSet, + } + topic := FetchRespTopic{ + Name: "quickstart-events", + Partitions: []FetchRespPartition{partition}, + } + expectedResult := FetchResp{ + ThrottleTimeMs: 0, + ErrorCode: 0, + SessionID: 668317393, + Topics: []FetchRespTopic{topic}, + } + + decoder := decoder.NewPacketDecoder(input) + 
decoder.SetAPIInfo(KFetch, 12) + result, err := decoder.ExtractFetchResp() + assert.NoError(t, err) + assert.True(t, expectedResult.Equals(result)) +} + +func TestExtractFetchRespV11MissingMessageSet(t *testing.T) { + input := + "\x00\x00\x00\x00\x00\x00\x27\xd5\xb6\xd1\x00\x00\x00\x01\x00\x11\x71\x75\x69\x63\x6b\x73\x74" + + "\x61\x72\x74\x2d\x65\x76\x65\x6e\x74\x73\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff" + + "\xff\xff\xff\xff\xff\xff\x00\x00\x01\x71\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + 
"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + messageSet := MessageSet{Size: 369, RecordBatches: []RecordBatch{}} + partition := FetchRespPartition{ + Index: 0, + ErrorCode: 0, + HighWatermark: 5, + LastStableOffset: 5, + LogStartOffset: 0, + AbortedTransactions: []FetchRespAbortedTransaction{}, + PreferredReadReplica: -1, + MessageSet: messageSet, + } + topic := FetchRespTopic{ + Name: "quickstart-events", + Partitions: []FetchRespPartition{partition}, + } + expectedResult := FetchResp{ + ThrottleTimeMs: 0, + ErrorCode: 0, + SessionID: 668317393, + Topics: []FetchRespTopic{topic}, + } + + decoder := decoder.NewPacketDecoder([]byte(input)) + decoder.SetAPIInfo(KFetch, 11) + result, err := decoder.ExtractFetchResp() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + assert.True(t, expectedResult.Equals(result)) +} +func TestExtractFetchRespV12MissingMessageSet(t *testing.T) { + input := []byte(("\x00\x00\x00\x00\x00\x00\x27\xd5\xb6\xd1\x02\x12\x71\x75\x69\x63\x6b\x73\x74\x61\x72\x74\x2d" + + "\x65\x76\x65\x6e\x74\x73\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00" + + "\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x01\xff\xff\xff\xff\xf2\x02\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + 
"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x00")) + messageSet := MessageSet{ + Size: 369, + RecordBatches: []RecordBatch{}, + } + partition := FetchRespPartition{ + Index: 0, + ErrorCode: 0, + HighWatermark: 5, + LastStableOffset: 5, + LogStartOffset: 0, + AbortedTransactions: []FetchRespAbortedTransaction{}, + PreferredReadReplica: -1, + MessageSet: messageSet, + } + topic := FetchRespTopic{ + Name: "quickstart-events", + Partitions: []FetchRespPartition{partition}, + } + expectedResult := FetchResp{ + ThrottleTimeMs: 0, + ErrorCode: 0, + SessionID: 668317393, + Topics: []FetchRespTopic{topic}, + } + + decoder := decoder.NewPacketDecoder(input) + decoder.SetAPIInfo(KFetch, 12) + result, err := decoder.ExtractFetchResp() + assert.NoError(t, err) + assert.True(t, expectedResult.Equals(result)) +} diff --git a/agent/protocol/kafka/decoder/join_group_test.go
b/agent/protocol/kafka/decoder/join_group_test.go new file mode 100644 index 00000000..153b0d54 --- /dev/null +++ b/agent/protocol/kafka/decoder/join_group_test.go @@ -0,0 +1,70 @@ +package decoder_test + +import ( + "testing" + + . "kyanos/agent/protocol/kafka/common" + . "kyanos/agent/protocol/kafka/decoder" + + "github.com/stretchr/testify/assert" +) + +func TestExtractJoinGroupReq(t *testing.T) { + input := []byte( + "\x16\x63\x6f\x6e\x73\x6f\x6c\x65\x2d\x63\x6f\x6e\x73\x75\x6d\x65\x72\x2d\x33\x35\x34\x30\x00" + + "\x00\x27\x10\x00\x04\x93\xe0\x01\x00\x09\x63\x6f\x6e\x73\x75\x6d\x65\x72\x02\x06\x72\x61\x6e" + + "\x67\x65\x22\x00\x01\x00\x00\x00\x01\x00\x11\x71\x75\x69\x63\x6b\x73\x74\x61\x72\x74\x2d\x65" + + "\x76\x65\x6e\x74\x73\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00") + expectedResult := JoinGroupReq{ + GroupID: "console-consumer-3540", + SessionTimeoutMs: 10000, + RebalanceTimeoutMs: 300000, + MemberID: "", + GroupInstanceID: "", + ProtocolType: "consumer", + Protocols: []JoinGroupProtocol{{Protocol: "range"}}, + } + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KJoinGroup, 7) + result, err := decoder.ExtractJoinGroupReq() + assert.NoError(t, err) + assert.True(t, expectedResult.Equal(result)) + assert.Equal(t, expectedResult, result) +} + +func TestExtractJoinGroupResp(t *testing.T) { + input := []byte( + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x09\x63\x6f\x6e\x73\x75\x6d\x65\x72\x06\x72\x61\x6e" + + "\x67\x65\x46\x63\x6f\x6e\x73\x75\x6d\x65\x72\x2d\x63\x6f\x6e\x73\x6f\x6c\x65\x2d\x63\x6f\x6e" + + "\x73\x75\x6d\x65\x72\x2d\x33\x35\x34\x30\x2d\x31\x2d\x36\x35\x65\x38\x65\x32\x64\x61\x2d\x66" + + "\x65\x38\x38\x2d\x34\x64\x63\x61\x2d\x39\x30\x65\x33\x2d\x30\x62\x37\x30\x63\x39\x61\x62\x61" + + "\x37\x31\x61\x46\x63\x6f\x6e\x73\x75\x6d\x65\x72\x2d\x63\x6f\x6e\x73\x6f\x6c\x65\x2d\x63\x6f" + + "\x6e\x73\x75\x6d\x65\x72\x2d\x33\x35\x34\x30\x2d\x31\x2d\x36\x35\x65\x38\x65\x32\x64\x61\x2d" + + 
"\x66\x65\x38\x38\x2d\x34\x64\x63\x61\x2d\x39\x30\x65\x33\x2d\x30\x62\x37\x30\x63\x39\x61\x62" + + "\x61\x37\x31\x61\x02\x46\x63\x6f\x6e\x73\x75\x6d\x65\x72\x2d\x63\x6f\x6e\x73\x6f\x6c\x65\x2d" + + "\x63\x6f\x6e\x73\x75\x6d\x65\x72\x2d\x33\x35\x34\x30\x2d\x31\x2d\x36\x35\x65\x38\x65\x32\x64" + + "\x61\x2d\x66\x65\x38\x38\x2d\x34\x64\x63\x61\x2d\x39\x30\x65\x33\x2d\x30\x62\x37\x30\x63\x39" + + "\x61\x62\x61\x37\x31\x61\x00\x22\x00\x01\x00\x00\x00\x01\x00\x11\x71\x75\x69\x63\x6b\x73\x74" + + "\x61\x72\x74\x2d\x65\x76\x65\x6e\x74\x73\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00") + expectedResult := JoinGroupResp{ + ThrottleTimeMs: 0, + ErrorCode: 0, + GenerationID: 1, + ProtocolType: "consumer", + ProtocolName: "range", + Leader: "consumer-console-consumer-3540-1-65e8e2da-fe88-4dca-90e3-0b70c9aba71a", + MemberID: "consumer-console-consumer-3540-1-65e8e2da-fe88-4dca-90e3-0b70c9aba71a", + Members: []JoinGroupMember{ + { + MemberID: "consumer-console-consumer-3540-1-65e8e2da-fe88-4dca-90e3-0b70c9aba71a", + GroupInstanceID: "", + }, + }, + } + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KJoinGroup, 7) + result, err := decoder.ExtractJoinGroupResp() + assert.NoError(t, err) + assert.True(t, expectedResult.Equal(result)) + assert.Equal(t, expectedResult, result) +} diff --git a/agent/protocol/kafka/decoder/message_set.go b/agent/protocol/kafka/decoder/message_set.go index d7e1468f..a0d0a2d4 100644 --- a/agent/protocol/kafka/decoder/message_set.go +++ b/agent/protocol/kafka/decoder/message_set.go @@ -127,7 +127,10 @@ func (pd *PacketDecoder) ExtractRecordBatch(offset *int32) (common.RecordBatch, } func (pd *PacketDecoder) ExtractMessageSet() (common.MessageSet, error) { - var messageSet common.MessageSet + var messageSet common.MessageSet = common.MessageSet{ + Size: 0, + RecordBatches: []common.RecordBatch{}, + } var err error offset := int32(0) diff --git a/agent/protocol/kafka/decoder/message_set_test.go b/agent/protocol/kafka/decoder/message_set_test.go new file 
mode 100644 index 00000000..cd4b08ac --- /dev/null +++ b/agent/protocol/kafka/decoder/message_set_test.go @@ -0,0 +1,62 @@ +package decoder_test + +import ( + "testing" + + . "kyanos/agent/protocol/kafka/common" + "kyanos/agent/protocol/kafka/decoder" + . "kyanos/agent/protocol/kafka/decoder" + + "github.com/stretchr/testify/assert" +) + +func TestExtractRecordMessage(t *testing.T) { + // Empty key and value Record. + { + input := []byte("\x0c\x00\x00\x00\x01\x00\x00") + expectedResult := RecordMessage{} + decoder := decoder.NewPacketDecoder(input) + result, err := decoder.ExtractRecordMessage() + assert.NoError(t, err) + assert.Equal(t, expectedResult, result) + } + { + input := []byte("\x28\x00\x00\x00\x06key\x1cMy first event\x00") + expectedResult := RecordMessage{Key: "key", Value: "My first event"} + decoder := NewPacketDecoder(input) + result, err := decoder.ExtractRecordMessage() + assert.NoError(t, err) + assert.Equal(t, expectedResult, result) + } +} + +func TestExtractRecordBatchV8(t *testing.T) { + input := []byte( + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x46\xff\xff\xff\xff\x02\xa7\x88\x71\xd8\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x01\x7a\xb2\x0a\x70\x1d\x00\x00\x01\x7a\xb2\x0a\x70\x1d\xff" + + "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x28\x00\x00\x00\x01" + + "\x1c\x4d\x79\x20\x66\x69\x72\x73\x74\x20\x65\x76\x65\x6e\x74\x00") + expectedResult := RecordBatch{Records: []RecordMessage{{Key: "", Value: "My first event"}}} + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KProduce, 8) + var batchLength int32 + result, err := decoder.ExtractRecordBatch(&batchLength) + assert.NoError(t, err) + assert.Equal(t, expectedResult, result) +} + +func TestExtractRecordBatchV9(t *testing.T) { + input := []byte( + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x4e\xff\xff\xff\xff\x02\xc0\xde\x91\x11\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x01\x7a\x1b\xc8\x2d\xaa\x00\x00\x01\x7a\x1b\xc8\x2d\xaa\xff" + + 
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x38\x00\x00\x00\x01" + + "\x2c\x54\x68\x69\x73\x20\x69\x73\x20\x6d\x79\x20\x66\x69\x72\x73\x74\x20\x65\x76\x65\x6e" + + "\x74\x00") + expectedResult := RecordBatch{Records: []RecordMessage{{Key: "", Value: "This is my first event"}}} + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KProduce, 9) + var batchLength int32 + result, err := decoder.ExtractRecordBatch(&batchLength) + assert.NoError(t, err) + assert.Equal(t, expectedResult, result) +} diff --git a/agent/protocol/kafka/decoder/metadata_test.go b/agent/protocol/kafka/decoder/metadata_test.go new file mode 100644 index 00000000..1d458fa9 --- /dev/null +++ b/agent/protocol/kafka/decoder/metadata_test.go @@ -0,0 +1,44 @@ +package decoder_test + +import ( + "testing" + + . "kyanos/agent/protocol/kafka/common" + . "kyanos/agent/protocol/kafka/decoder" + + "github.com/stretchr/testify/assert" +) + +func TestExtractMetadataReqV5(t *testing.T) { + input := []byte( + "\x00\x00\x00\x01\x00\x10\x6b\x61" + + "\x66\x6b\x61\x5f\x32\x2e\x31\x32\x2d\x31\x2e\x31\x2e\x31\x01") + topic := MetadataReqTopic{TopicID: "", Name: "kafka_2.12-1.1.1"} + expectedResult := MetadataReq{ + Topics: []MetadataReqTopic{topic}, + AllowAutoTopicCreation: true, + } + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KMetadata, 5) + result, err := decoder.ExtractMetadataReq() + assert.NoError(t, err) + assert.Equal(t, expectedResult, result) +} + +func TestExtractMetadataReqV11(t *testing.T) { + input := []byte( + "\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x00\x11\x6b\x61\x66\x6b\x61\x5f\x32\x2e\x31\x32\x2d\x32\x2e\x38" + + "\x2e\x31\x00\x01\x00\x00") + expectedResult := MetadataReq{ + Topics: []MetadataReqTopic{}, + AllowAutoTopicCreation: true, + IncludeClusterAuthorizedOperations: false, + IncludeTopicAuthorizedOperations: false, + } + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KMetadata, 11) + result, err := 
decoder.ExtractMetadataReq() + assert.NoError(t, err) + assert.Equal(t, expectedResult, result) +} diff --git a/agent/protocol/kafka/decoder/packet_decoder.go b/agent/protocol/kafka/decoder/packet_decoder.go index a612181f..857b4984 100644 --- a/agent/protocol/kafka/decoder/packet_decoder.go +++ b/agent/protocol/kafka/decoder/packet_decoder.go @@ -123,7 +123,7 @@ func (pd *PacketDecoder) ExtractCompactString() (string, error) { return "", err } // length N + 1 is encoded. - len -= -1 + len -= 1 if len < 0 { return "", errors.New("Compact String has negative length.") } @@ -136,7 +136,7 @@ func (pd *PacketDecoder) ExtractCompactNullableString() (string, error) { return "", err } // length N + 1 is encoded. - len -= -1 + len -= 1 if len < -1 { return "", errors.New("Compact String has negative length.") } @@ -269,6 +269,12 @@ func (pd *PacketDecoder) ExtractReqHeader(req *common.Request) (bool, error) { return false, err } req.Apikey = common.APIKey(apiKey) + apiVersion, err := pd.ExtractInt16() + if err != nil { + return false, err + } + req.ApiVersion = apiVersion + pd.SetAPIInfo(req.Apikey, req.ApiVersion) _, err = pd.ExtractInt32() if err != nil { @@ -361,10 +367,10 @@ func (pd *PacketDecoder) MarkOffset(_len int32) error { if _len < 0 { return errors.New("length cannot be negative") } - if int(_len) > len(pd.binaryDecoder.Buf()) { + if int(_len) > pd.binaryDecoder.RemainingBytes() { return errors.New("not enough bytes in MarkOffset") } - pd.markedBufs = append(pd.markedBufs, pd.binaryDecoder.Buf()[_len:]) + pd.markedBufs = append(pd.markedBufs, pd.binaryDecoder.SubBuf(int(_len))) return nil } diff --git a/agent/protocol/kafka/decoder/packet_decoder_test.go b/agent/protocol/kafka/decoder/packet_decoder_test.go new file mode 100644 index 00000000..0c376d04 --- /dev/null +++ b/agent/protocol/kafka/decoder/packet_decoder_test.go @@ -0,0 +1,101 @@ +package decoder_test + +import ( + "math" + "testing" + + // . "kyanos/agent/protocol/kafka/common" + . 
"kyanos/agent/protocol/kafka/decoder" + + "github.com/stretchr/testify/assert" +) + +type PacketDecoderTestCase[T any] struct { + Input []byte + ExpectedOutput T +} + +func TestExtractUnsignedVarint(t *testing.T) { + testCases := []PacketDecoderTestCase[int32]{ + {Input: []byte{0x00}, ExpectedOutput: 0}, + {Input: []byte{0x03}, ExpectedOutput: 3}, + {Input: []byte{0x96, 0x01}, ExpectedOutput: 150}, + {Input: []byte{0xff, 0xff, 0xff, 0xff, 0x0f}, ExpectedOutput: -1}, + {Input: []byte{0x80, 0xC0, 0xFF, 0xFF, 0x0F}, ExpectedOutput: -8192}, + {Input: []byte{0xff, 0xff, 0xff, 0xff, 0x07}, ExpectedOutput: math.MaxInt32}, + {Input: []byte{0x80, 0x80, 0x80, 0x80, 0x08}, ExpectedOutput: math.MinInt32}, + } + + for _, tc := range testCases { + decoder := NewPacketDecoder(tc.Input) + result, err := decoder.ExtractUnsignedVarint() + assert.NoError(t, err) + assert.Equal(t, tc.ExpectedOutput, result) + } +} + +func TestExtractVarint(t *testing.T) { + testCases := []PacketDecoderTestCase[int32]{ + {Input: []byte{0x00}, ExpectedOutput: 0}, + {Input: []byte{0x01}, ExpectedOutput: -1}, + {Input: []byte{0x02}, ExpectedOutput: 1}, + {Input: []byte{0x7E}, ExpectedOutput: 63}, + {Input: []byte{0x7F}, ExpectedOutput: -64}, + {Input: []byte{0x80, 0x01}, ExpectedOutput: 64}, + {Input: []byte{0x81, 0x01}, ExpectedOutput: -65}, + {Input: []byte{0xFE, 0x7F}, ExpectedOutput: 8191}, + {Input: []byte{0xFF, 0x7F}, ExpectedOutput: -8192}, + {Input: []byte{0x80, 0x80, 0x01}, ExpectedOutput: 8192}, + {Input: []byte{0x81, 0x80, 0x01}, ExpectedOutput: -8193}, + {Input: []byte{0xFE, 0xFF, 0x7F}, ExpectedOutput: 1048575}, + {Input: []byte{0xFF, 0xFF, 0x7F}, ExpectedOutput: -1048576}, + {Input: []byte{0x80, 0x80, 0x80, 0x01}, ExpectedOutput: 1048576}, + {Input: []byte{0x81, 0x80, 0x80, 0x01}, ExpectedOutput: -1048577}, + {Input: []byte{0xFE, 0xFF, 0xFF, 0x7F}, ExpectedOutput: 134217727}, + {Input: []byte{0xFF, 0xFF, 0xFF, 0x7F}, ExpectedOutput: -134217728}, + {Input: []byte{0x80, 0x80, 0x80, 0x80, 
0x01}, ExpectedOutput: 134217728}, + {Input: []byte{0x81, 0x80, 0x80, 0x80, 0x01}, ExpectedOutput: -134217729}, + {Input: []byte{0xFE, 0xFF, 0xFF, 0xFF, 0x0F}, ExpectedOutput: math.MaxInt32}, + {Input: []byte{0xFF, 0xFF, 0xFF, 0xFF, 0x0F}, ExpectedOutput: math.MinInt32}, + } + + for _, tc := range testCases { + decoder := NewPacketDecoder(tc.Input) + result, err := decoder.ExtractVarint() + assert.NoError(t, err) + assert.Equal(t, tc.ExpectedOutput, result) + } +} + +func TestExtractVarlong(t *testing.T) { + testCases := []PacketDecoderTestCase[int64]{ + {Input: []byte{0x00}, ExpectedOutput: 0}, + {Input: []byte{0x01}, ExpectedOutput: -1}, + {Input: []byte{0x02}, ExpectedOutput: 1}, + {Input: []byte{0x7E}, ExpectedOutput: 63}, + {Input: []byte{0x7F}, ExpectedOutput: -64}, + {Input: []byte{0x80, 0x01}, ExpectedOutput: 64}, + {Input: []byte{0x81, 0x01}, ExpectedOutput: -65}, + {Input: []byte{0xFE, 0x7F}, ExpectedOutput: 8191}, + {Input: []byte{0xFF, 0x7F}, ExpectedOutput: -8192}, + {Input: []byte{0x80, 0x80, 0x01}, ExpectedOutput: 8192}, + {Input: []byte{0x81, 0x80, 0x01}, ExpectedOutput: -8193}, + {Input: []byte{0xFE, 0xFF, 0x7F}, ExpectedOutput: 1048575}, + {Input: []byte{0xFF, 0xFF, 0x7F}, ExpectedOutput: -1048576}, + {Input: []byte{0x80, 0x80, 0x80, 0x01}, ExpectedOutput: 1048576}, + {Input: []byte{0x81, 0x80, 0x80, 0x01}, ExpectedOutput: -1048577}, + {Input: []byte{0xFE, 0xFF, 0xFF, 0x7F}, ExpectedOutput: 134217727}, + {Input: []byte{0xFF, 0xFF, 0xFF, 0x7F}, ExpectedOutput: -134217728}, + {Input: []byte{0x80, 0x80, 0x80, 0x80, 0x01}, ExpectedOutput: 134217728}, + {Input: []byte{0x81, 0x80, 0x80, 0x80, 0x01}, ExpectedOutput: -134217729}, + {Input: []byte{0xFE, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01}, ExpectedOutput: math.MaxInt64}, + {Input: []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01}, ExpectedOutput: math.MinInt64}, + } + + for _, tc := range testCases { + decoder := NewPacketDecoder(tc.Input) + result, err := 
decoder.ExtractVarlong() + assert.NoError(t, err) + assert.Equal(t, tc.ExpectedOutput, result) + } +} diff --git a/agent/protocol/kafka/decoder/produce_test.go b/agent/protocol/kafka/decoder/produce_test.go new file mode 100644 index 00000000..acc2586b --- /dev/null +++ b/agent/protocol/kafka/decoder/produce_test.go @@ -0,0 +1,177 @@ +package decoder_test + +import ( + "testing" + + . "kyanos/agent/protocol/kafka/common" + . "kyanos/agent/protocol/kafka/decoder" + + "github.com/stretchr/testify/assert" +) + +func TestExtractProduceReqV7(t *testing.T) { + input := []byte( + "\xFF\xFF\x00\x01\x00\x00\x75\x30\x00\x00\x00\x01\x00\x08\x6D\x79\x2D\x74\x6F\x70\x69\x63\x00" + + "\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x5C\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x50" + + "\x00\x00\x00\x00\x02\x76\x7C\xA6\x2F\x00\x00\x00\x00\x00\x01\x00\x00\x01\x7C\x29\x89\x9A\xA2" + + "\x00\x00\x01\x7C\x29\x89\x9A\xA2\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x00" + + "\x00\x00\x02\x14\x00\x00\x00\x01\x08\x74\x65\x73\x74\x00\x26\x00\x00\x02\x01\x1A\xC2\x48\x6F" + + "\x6C\x61\x2C\x20\x6D\x75\x6E\x64\x6F\x21\x00") + recordBatch := RecordBatch{ + Records: []RecordMessage{ + {Key: "", Value: "test"}, + {Key: "", Value: "\xc2Hola, mundo!"}, + }, + } + messageSet := MessageSet{Size: 92, RecordBatches: []RecordBatch{recordBatch}} + partition := ProduceReqPartition{Index: 0, MessageSet: messageSet} + topic := ProduceReqTopic{Name: "my-topic", Partitions: []ProduceReqPartition{partition}} + expectedResult := ProduceReq{ + TransactionalID: "", + Acks: 1, + TimeoutMs: 30000, + Topics: []ProduceReqTopic{topic}, + } + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KProduce, 7) + result, err := decoder.ExtractProduceReq() + assert.NoError(t, err) + assert.True(t, expectedResult.Equal(result)) +} + +func TestExtractProduceReqV8(t *testing.T) { + input := []byte( + "\xff\xff\x00\x01\x00\x00\x05\xdc\x00\x00\x00\x01\x00\x11\x71\x75\x69\x63\x6b\x73\x74\x61" + + 
"\x72\x74\x2d\x65\x76\x65\x6e\x74\x73\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x52\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x46\xff\xff\xff\xff\x02\xa7\x88\x71\xd8\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x01\x7a\xb2\x0a\x70\x1d\x00\x00\x01\x7a\xb2\x0a\x70\x1d\xff\xff" + + "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x28\x00\x00\x00\x01\x1c" + + "\x4d\x79\x20\x66\x69\x72\x73\x74\x20\x65\x76\x65\x6e\x74\x00") + recordBatch := RecordBatch{ + Records: []RecordMessage{ + {Key: "", Value: "My first event"}, + }, + } + messageSet := MessageSet{Size: 70, RecordBatches: []RecordBatch{recordBatch}} + partition := ProduceReqPartition{Index: 0, MessageSet: messageSet} + topic := ProduceReqTopic{Name: "quickstart-events", Partitions: []ProduceReqPartition{partition}} + expectedResult := ProduceReq{ + TransactionalID: "", + Acks: 1, + TimeoutMs: 1500, + Topics: []ProduceReqTopic{topic}, + } + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KProduce, 8) + result, err := decoder.ExtractProduceReq() + assert.NoError(t, err) + assert.True(t, expectedResult.Equal(result)) +} + +func TestExtractProduceReqV9(t *testing.T) { + input := []byte( + "\x00\x00\x01\x00\x00\x05\xdc\x02\x12\x71\x75\x69\x63\x6b\x73\x74\x61\x72\x74\x2d\x65" + + "\x76\x65\x6e\x74\x73\x02\x00\x00\x00\x00\x5b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + "\x4e\xff\xff\xff\xff\x02\xc0\xde\x91\x11\x00\x00\x00\x00\x00\x00\x00\x00\x01\x7a\x1b\xc8" + + "\x2d\xaa\x00\x00\x01\x7a\x1b\xc8\x2d\xaa\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff" + + "\xff\xff\x00\x00\x00\x01\x38\x00\x00\x00\x01\x2c\x54\x68\x69\x73\x20\x69\x73\x20\x6d\x79" + + "\x20\x66\x69\x72\x73\x74\x20\x65\x76\x65\x6e\x74\x00\x00\x00\x00") + recordBatch := RecordBatch{ + Records: []RecordMessage{ + {Key: "", Value: "This is my first event"}, + }, + } + messageSet := MessageSet{Size: 91, RecordBatches: []RecordBatch{recordBatch}} + partition := ProduceReqPartition{Index: 0, MessageSet: messageSet} + topic := 
ProduceReqTopic{Name: "quickstart-events", Partitions: []ProduceReqPartition{partition}} + expectedResult := ProduceReq{ + TransactionalID: "", + Acks: 1, + TimeoutMs: 1500, + Topics: []ProduceReqTopic{topic}, + } + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KProduce, 9) + result, err := decoder.ExtractProduceReq() + assert.NoError(t, err) + assert.True(t, expectedResult.Equal(result)) +} + +func TestExtractProduceRespV7(t *testing.T) { + input := []byte( + "\x00\x00\x00\x01\x00\x08\x6D\x79\x2D\x74\x6F\x70\x69\x63\x00\x00\x00\x01\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x01\xAE\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x00\x00\x00\x00\x00\x00" + + "\x00\x00\x00\x00\x00\x00") + partition := ProduceRespPartition{ + Index: 0, + ErrorCode: 0, + BaseOffset: 430, + LogAppendTimeMs: -1, + LogStartOffset: 0, + RecordErrors: []RecordError{}, + ErrorMessage: "", + } + topic := ProduceRespTopic{Name: "my-topic", Partitions: []ProduceRespPartition{partition}} + expectedResult := ProduceResp{ + Topics: []ProduceRespTopic{topic}, + ThrottleTimeMs: 0, + } + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KProduce, 7) + result, err := decoder.ExtractProduceResp() + assert.NoError(t, err) + assert.True(t, expectedResult.Equal(result)) +} + +func TestExtractProduceRespV8(t *testing.T) { + input := []byte( + "\x00\x00\x00\x01\x00\x11\x71\x75\x69\x63\x6b\x73\x74\x61\x72\x74\x2d\x65\x76\x65\x6e\x74" + + "\x73\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\xff\xff\xff" + + "\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x00" + + "\x00") + partition := ProduceRespPartition{ + Index: 0, + ErrorCode: 0, + BaseOffset: 3, + LogAppendTimeMs: -1, + LogStartOffset: 0, + RecordErrors: []RecordError{}, + ErrorMessage: "", + } + topic := ProduceRespTopic{Name: "quickstart-events", Partitions: []ProduceRespPartition{partition}} + expectedResult := ProduceResp{ + Topics: []ProduceRespTopic{topic}, + 
ThrottleTimeMs: 0, + } + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KProduce, 8) + result, err := decoder.ExtractProduceResp() + assert.NoError(t, err) + assert.True(t, expectedResult.Equal(result)) +} + +func TestExtractProduceRespV9(t *testing.T) { + input := []byte( + "\x02\x12\x71\x75\x69\x63\x6b\x73\x74\x61\x72\x74\x2d\x65\x76\x65\x6e\x74\x73\x02\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x00" + + "\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00") + partition := ProduceRespPartition{ + Index: 0, + ErrorCode: 0, + BaseOffset: 0, + LogAppendTimeMs: -1, + LogStartOffset: 0, + RecordErrors: []RecordError{}, + ErrorMessage: "", + } + topic := ProduceRespTopic{Name: "quickstart-events", Partitions: []ProduceRespPartition{partition}} + expectedResult := ProduceResp{ + Topics: []ProduceRespTopic{topic}, + ThrottleTimeMs: 0, + } + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KProduce, 9) + result, err := decoder.ExtractProduceResp() + assert.NoError(t, err) + assert.True(t, expectedResult.Equal(result)) +} diff --git a/agent/protocol/kafka/decoder/sync_group_test.go b/agent/protocol/kafka/decoder/sync_group_test.go new file mode 100644 index 00000000..b0411a0c --- /dev/null +++ b/agent/protocol/kafka/decoder/sync_group_test.go @@ -0,0 +1,60 @@ +package decoder_test + +import ( + "testing" + + . "kyanos/agent/protocol/kafka/common" + . 
"kyanos/agent/protocol/kafka/decoder" + + "github.com/stretchr/testify/assert" +) + +func TestExtractSyncGroupReq(t *testing.T) { + input := []byte( + "\x16\x63\x6f\x6e\x73\x6f\x6c\x65\x2d\x63\x6f\x6e\x73\x75\x6d\x65\x72\x2d\x33\x35\x34\x30\x00" + + "\x00\x00\x01\x46\x63\x6f\x6e\x73\x75\x6d\x65\x72\x2d\x63\x6f\x6e\x73\x6f\x6c\x65\x2d\x63\x6f" + + "\x6e\x73\x75\x6d\x65\x72\x2d\x33\x35\x34\x30\x2d\x31\x2d\x36\x35\x65\x38\x65\x32\x64\x61\x2d" + + "\x66\x65\x38\x38\x2d\x34\x64\x63\x61\x2d\x39\x30\x65\x33\x2d\x30\x62\x37\x30\x63\x39\x61\x62" + + "\x61\x37\x31\x61\x00\x09\x63\x6f\x6e\x73\x75\x6d\x65\x72\x06\x72\x61\x6e\x67\x65\x02\x46\x63" + + "\x6f\x6e\x73\x75\x6d\x65\x72\x2d\x63\x6f\x6e\x73\x6f\x6c\x65\x2d\x63\x6f\x6e\x73\x75\x6d\x65" + + "\x72\x2d\x33\x35\x34\x30\x2d\x31\x2d\x36\x35\x65\x38\x65\x32\x64\x61\x2d\x66\x65\x38\x38\x2d" + + "\x34\x64\x63\x61\x2d\x39\x30\x65\x33\x2d\x30\x62\x37\x30\x63\x39\x61\x62\x61\x37\x31\x61\x26" + + "\x00\x01\x00\x00\x00\x01\x00\x11\x71\x75\x69\x63\x6b\x73\x74\x61\x72\x74\x2d\x65\x76\x65\x6e" + + "\x74\x73\x00\x00\x00\x01\x00\x00\x00\x00\xff\xff\xff\xff\x00\x00") + expectedResult := SyncGroupReq{ + GroupID: "console-consumer-3540", + GenerationID: 1, + MemberID: "consumer-console-consumer-3540-1-65e8e2da-fe88-4dca-90e3-0b70c9aba71a", + GroupInstanceID: "", + ProtocolType: "consumer", + ProtocolName: "range", + Assignments: []SyncGroupAssignment{ + {MemberID: "consumer-console-consumer-3540-1-65e8e2da-fe88-4dca-90e3-0b70c9aba71a"}, + }, + } + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KSyncGroup, 5) + result, err := decoder.ExtractSyncGroupReq() + assert.NoError(t, err) + assert.True(t, expectedResult.Equal(result)) + assert.Equal(t, expectedResult, result) +} + +func TestExtractSyncGroupResp(t *testing.T) { + input := []byte( + "\x00\x00\x00\x00\x00\x00\x09\x63\x6f\x6e\x73\x75\x6d\x65\x72\x06\x72\x61\x6e\x67\x65\x26\x00" + + "\x01\x00\x00\x00\x01\x00\x11\x71\x75\x69\x63\x6b\x73\x74\x61\x72\x74\x2d\x65\x76\x65\x6e\x74" + + 
"\x73\x00\x00\x00\x01\x00\x00\x00\x00\xff\xff\xff\xff\x00") + expectedResult := SyncGroupResp{ + ThrottleTimeMs: 0, + ErrorCode: 0, + ProtocolType: "consumer", + ProtocolName: "range", + } + decoder := NewPacketDecoder(input) + decoder.SetAPIInfo(KSyncGroup, 5) + result, err := decoder.ExtractSyncGroupResp() + assert.NoError(t, err) + assert.True(t, expectedResult.Equal(result)) + assert.Equal(t, expectedResult, result) +} diff --git a/agent/protocol/kafka/kafka.go b/agent/protocol/kafka/kafka.go index 527a0e13..e96a2aee 100644 --- a/agent/protocol/kafka/kafka.go +++ b/agent/protocol/kafka/kafka.go @@ -77,7 +77,6 @@ func (k *KafkaStreamParser) FindBoundary(streamBuffer *buffer.StreamBuffer, mess return -1 } -// ParseStream implements protocol.ProtocolStreamParser. func (k *KafkaStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType) protocol.ParseResult { buf := streamBuffer.Head().Buffer() var minPacketLength int32 @@ -107,13 +106,18 @@ func (k *KafkaStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messa var requestApiVersion int16 if messageType == protocol.Request { requestApiKeyInt, err := protocol.ExtractBEInt[int16](binaryDecoder) - if err != nil || common.IsValidAPIKey(requestApiKeyInt) { + if err != nil || !common.IsValidAPIKey(requestApiKeyInt) { return protocol.ParseResult{ ParseState: protocol.Invalid, } } requestApiKey = common.APIKey(requestApiKeyInt) requestApiVersion, err = protocol.ExtractBEInt[int16](binaryDecoder) + if err != nil { + return protocol.ParseResult{ + ParseState: protocol.Invalid, + } + } if !common.IsSupportedAPIVersion(requestApiKey, requestApiVersion) { return protocol.ParseResult{ ParseState: protocol.Invalid, @@ -145,7 +149,7 @@ func (k *KafkaStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messa } parseResult := common.Packet{ FrameBase: fb, - CorrelationId: correlationId, + CorrelationID: correlationId, Msg: string(msg), } parseResult.SetIsReq(messageType == 
protocol.Request) @@ -167,12 +171,12 @@ func (k *KafkaStreamParser) Match(reqStreams map[protocol.StreamId]*protocol.Par } correlationIdMap := make(map[int32]*common.Packet) for _, respMsg := range *respStream { - correlationIdMap[respMsg.(*common.Packet).CorrelationId] = respMsg.(*common.Packet) + correlationIdMap[respMsg.(*common.Packet).CorrelationID] = respMsg.(*common.Packet) } for _, reqMsg := range *reqStream { req := reqMsg.(*common.Packet) - resp, ok := correlationIdMap[req.CorrelationId] + resp, ok := correlationIdMap[req.CorrelationID] if ok { r, err := processReqRespPair(req, resp) if err == nil { @@ -181,14 +185,14 @@ func (k *KafkaStreamParser) Match(reqStreams map[protocol.StreamId]*protocol.Par errorCnt++ } req.Consumed = true - delete(correlationIdMap, req.CorrelationId) - delete(k.correlationIdMap, req.CorrelationId) + delete(correlationIdMap, req.CorrelationID) + delete(k.correlationIdMap, req.CorrelationID) } } // Resp packets left in the map don't have a matched request. 
for _, resp := range correlationIdMap { - kc.ProtocolParserLog.Debugf("Response packet without a matching request: %v", resp.CorrelationId) + kc.ProtocolParserLog.Debugf("Response packet without a matching request: %v", resp.CorrelationID) errorCnt++ } @@ -203,7 +207,7 @@ func (k *KafkaStreamParser) Match(reqStreams map[protocol.StreamId]*protocol.Par if i > 0 { *reqStream = (*reqStream)[i:] } - *respStream = (*respStream)[:] + *respStream = (*respStream)[0:0] return records } @@ -397,3 +401,13 @@ var _ protocol.ProtocolStreamParser = &KafkaStreamParser{} type KafkaStreamParser struct { correlationIdMap map[int32]struct{} } + +func NewKafkaStreamParser() *KafkaStreamParser { + return &KafkaStreamParser{ + correlationIdMap: make(map[int32]struct{}), + } +} + +func (parser *KafkaStreamParser) GetCorrelationIdMap() map[int32]struct{} { + return parser.correlationIdMap +} diff --git a/agent/protocol/kafka/kafka_test.go b/agent/protocol/kafka/kafka_test.go new file mode 100644 index 00000000..1c98dc34 --- /dev/null +++ b/agent/protocol/kafka/kafka_test.go @@ -0,0 +1,304 @@ +package kafka_test + +import ( + "testing" + "time" + + "kyanos/agent/buffer" + "kyanos/agent/protocol" + "kyanos/agent/protocol/kafka" + . "kyanos/agent/protocol/kafka/common" + + // . 
"kyanos/agent/protocol/kafka/decoder" + + "github.com/stretchr/testify/assert" +) + +func PacketsEqual(lhs, rhs Packet) bool { + if lhs.Msg != rhs.Msg { + return false + } + if lhs.CorrelationID != rhs.CorrelationID { + return false + } + return true +} + +var kProduceRequest []byte = []byte{ + 0x00, 0x00, 0x00, 0x98, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x04, 0x00, 0x10, 0x63, 0x6f, + 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2d, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x00, 0x00, + 0x00, 0x01, 0x00, 0x00, 0x05, 0xdc, 0x02, 0x12, 0x71, 0x75, 0x69, 0x63, 0x6b, 0x73, 0x74, 0x61, + 0x72, 0x74, 0x2d, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x02, 0x00, 0x00, 0x00, 0x00, 0x5b, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x4e, 0xff, 0xff, 0xff, 0xff, 0x02, + 0xc0, 0xde, 0x91, 0x11, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x7a, 0x1b, 0xc8, + 0x2d, 0xaa, 0x00, 0x00, 0x01, 0x7a, 0x1b, 0xc8, 0x2d, 0xaa, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x01, 0x38, 0x00, 0x00, 0x00, + 0x01, 0x2c, 0x54, 0x68, 0x69, 0x73, 0x20, 0x69, 0x73, 0x20, 0x6d, 0x79, 0x20, 0x66, 0x69, 0x72, + 0x73, 0x74, 0x20, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x00, 0x00, 0x00, 0x00, +} + +var kProduceResponse []byte = []byte{ + 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x04, 0x00, 0x02, 0x12, 0x71, 0x75, 0x69, + 0x63, 0x6b, 0x73, 0x74, 0x61, 0x72, 0x74, 0x2d, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, +} + +// APIKey: 3, APIVersion: 11 +var kMetaDataRequest []byte = []byte{ + 0x00, 0x00, 0x00, 0x1c, 0x00, 0x03, 0x00, 0x0b, 0x00, 0x00, 0x00, 0x01, 0x00, 0x0d, 0x61, 0x64, + 0x6d, 0x69, 0x6e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2d, 0x31, 0x00, 0x01, 0x01, 0x00, 0x00, +} +var 
kMetaDataResponse []byte = []byte{0x00, 0x00, 0x00, 0x3b, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, + 0x00, 0x00, 0x0a, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74, 0x00, 0x00, 0x23, 0x84, + 0x00, 0x00, 0x17, 0x5a, 0x65, 0x76, 0x76, 0x4e, 0x66, 0x47, 0x45, 0x52, 0x30, 0x4f, 0x73, 0x51, + 0x4d, 0x34, 0x77, 0x71, 0x48, 0x5f, 0x6f, 0x75, 0x77, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, +} + +// APIKey: 18, APIVersion: 3 +var kAPIVersionRequest []byte = []byte{ + 0x00, 0x00, 0x00, 0x31, 0x00, 0x12, 0x00, 0x03, 0x00, 0x00, 0x00, 0x02, 0x00, 0x0d, + 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2d, 0x31, 0x00, + 0x12, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2d, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, + 0x6a, 0x61, 0x76, 0x61, 0x06, 0x32, 0x2e, 0x38, 0x2e, 0x30, 0x00, +} +var kAPIVersionResponse []byte = []byte{ + 0x00, 0x00, 0x01, 0x9e, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x39, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x09, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x06, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x00, 0x0b, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x05, + 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x07, 0x00, 0x00, 0x07, 0x00, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x07, + 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x0b, 0x00, 0x00, 0x00, 0x07, 0x00, 0x00, + 0x0c, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x0d, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x0e, 0x00, + 0x00, 0x00, 0x05, 0x00, 0x00, 0x0f, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, + 0x04, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x03, 0x00, + 0x00, 0x13, 0x00, 0x00, 0x00, 0x07, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x15, + 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x16, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x17, 0x00, 0x00, + 0x00, 0x04, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x03, 
0x00, 0x00, 0x19, 0x00, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x1b, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, + 0x1c, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x1d, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x1e, 0x00, + 0x00, 0x00, 0x02, 0x00, 0x00, 0x1f, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, + 0x04, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x22, 0x00, 0x00, 0x00, 0x02, 0x00, + 0x00, 0x23, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x24, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x25, + 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x26, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x27, 0x00, 0x00, + 0x00, 0x02, 0x00, 0x00, 0x28, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x29, 0x00, 0x00, 0x00, 0x02, + 0x00, 0x00, 0x2a, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x2b, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, + 0x2c, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x2d, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2e, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x2f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x30, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x31, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x32, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x33, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x38, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3d, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, +} + +func GenPacket(t *testing.T, msgType protocol.MessageType, rawPacket []byte, timestamp int, correlationID int) *Packet { + var result *Packet + KafkaStreamParser := kafka.NewKafkaStreamParser() + streamBuffer := buffer.New(10000) + KafkaStreamParser.GetCorrelationIdMap()[int32(correlationID)] = struct{}{} + packetView := rawPacket + streamBuffer.Add(1, packetView, uint64(timestamp)) + parseState := KafkaStreamParser.ParseStream(streamBuffer, msgType) + // assert.Equal(t, protocol.Success, parseState.ParseState) + result = 
parseState.ParsedMessages[0].(*Packet) + result.SetTimeStamp(uint64(timestamp)) + return result +} + +var ( + KProduceReqPacket = GenPacket(nil, protocol.Request, kProduceRequest, 0, 4) + KProduceRespPacket = GenPacket(nil, protocol.Response, kProduceResponse, 1, 4) + KMetaDataReqPacket = GenPacket(nil, protocol.Request, kMetaDataRequest, 2, 1) + KMetaDataRespPacket = GenPacket(nil, protocol.Response, kMetaDataResponse, 3, 1) + KAPIVersionReqPacket = GenPacket(nil, protocol.Request, kAPIVersionRequest, 4, 2) + KAPIVersionRespPacket = GenPacket(nil, protocol.Response, kAPIVersionResponse, 5, 2) +) + +func TestKafkaParserBasics(t *testing.T) { + KafkaStreamParser := kafka.NewKafkaStreamParser() + streamBuffer := buffer.New(10000) + + produceFrameView := []byte(kProduceRequest) + streamBuffer.Add(1, produceFrameView, uint64(time.Now().Nanosecond())) + parseState := KafkaStreamParser.ParseStream(streamBuffer, protocol.Request) + _, containsCorrelationid := KafkaStreamParser.GetCorrelationIdMap()[4] + assert.True(t, parseState.ParseState == protocol.Success) + assert.True(t, containsCorrelationid) + + KafkaStreamParser = kafka.NewKafkaStreamParser() + streamBuffer.Clear() + shortProduceFrameView := produceFrameView[:KMinReqPacketLength-1] + streamBuffer.Add(1, shortProduceFrameView, uint64(time.Now().Nanosecond())) + parseState = KafkaStreamParser.ParseStream(streamBuffer, protocol.Request) + assert.True(t, parseState.ParseState == protocol.NeedsMoreData) + assert.Empty(t, KafkaStreamParser.GetCorrelationIdMap()) +} + +func TestKafkaParserParseMultipleRequests(t *testing.T) { + KafkaStreamParser := kafka.NewKafkaStreamParser() + streamBuffer := buffer.New(10000) + + // Add multiple requests to the buffer + request1 := []byte(kProduceRequest) + request2 := []byte(kMetaDataRequest) + streamBuffer.Add(1, request1, uint64(time.Now().Nanosecond())) + streamBuffer.Add(uint64(len(request1))+1, request2, uint64(time.Now().Nanosecond()+1)) + + parseState := 
KafkaStreamParser.ParseStream(streamBuffer, protocol.Request) + _, containsCorrelationID1 := KafkaStreamParser.GetCorrelationIdMap()[4] + + assert.True(t, parseState.ParseState == protocol.Success) + assert.True(t, containsCorrelationID1) + streamBuffer.RemovePrefix(parseState.ReadBytes) + parseState = KafkaStreamParser.ParseStream(streamBuffer, protocol.Request) + _, containsCorrelationID2 := KafkaStreamParser.GetCorrelationIdMap()[1] + assert.True(t, parseState.ParseState == protocol.Success) + assert.True(t, containsCorrelationID2) +} + +func TestKafkaParserParseMultipleResponses(t *testing.T) { + KafkaStreamParser := kafka.NewKafkaStreamParser() + streamBuffer := buffer.New(10000) + + // Add multiple requests to the buffer + request1 := []byte(kProduceResponse) + request2 := []byte(kMetaDataResponse) + streamBuffer.Add(1, request1, uint64(time.Now().Nanosecond())) + streamBuffer.Add(uint64(len(request1))+1, request2, uint64(time.Now().Nanosecond()+1)) + + parseState := KafkaStreamParser.ParseStream(streamBuffer, protocol.Response) + + assert.True(t, parseState.ParseState == protocol.Success) + packet := parseState.ParsedMessages[0].(*Packet) + assert.True(t, packet.CorrelationID == 4) + + streamBuffer.RemovePrefix(parseState.ReadBytes) + parseState = KafkaStreamParser.ParseStream(streamBuffer, protocol.Response) + assert.True(t, parseState.ParseState == protocol.Success) + packet = parseState.ParsedMessages[0].(*Packet) + assert.True(t, packet.CorrelationID == 1) +} + +func TestKafkaParserParseIncompleteRequest(t *testing.T) { + KafkaStreamParser := kafka.NewKafkaStreamParser() + streamBuffer := buffer.New(10000) + + // Add multiple requests to the buffer + request1 := []byte(kProduceRequest) + request1 = request1[:len(request1)-1] + streamBuffer.Add(1, request1, uint64(time.Now().Nanosecond())) + + parseState := KafkaStreamParser.ParseStream(streamBuffer, protocol.Request) + + assert.True(t, parseState.ParseState == protocol.NeedsMoreData) + assert.Empty(t, 
KafkaStreamParser.GetCorrelationIdMap()) + assert.Empty(t, parseState.ParsedMessages) +} + +func TestKafkaParserParseInvalidInput(t *testing.T) { + KafkaStreamParser := kafka.NewKafkaStreamParser() + streamBuffer := buffer.New(10000) + + // Add multiple requests to the buffer + request1 := []byte("\x00\x00\x18\x00\x03SELECT name FROM users;") + streamBuffer.Add(1, request1, uint64(time.Now().Nanosecond())) + + parseState := KafkaStreamParser.ParseStream(streamBuffer, protocol.Request) + + assert.True(t, parseState.ParseState == protocol.Invalid) + assert.Empty(t, KafkaStreamParser.GetCorrelationIdMap()) + assert.Empty(t, parseState.ParsedMessages) +} + +func TestKafkaParserFindReqBoundaryAligned(t *testing.T) { + KafkaStreamParser := kafka.NewKafkaStreamParser() + streamBuffer := buffer.New(10000) + + request1 := []byte(kProduceRequest) + request2 := []byte(kMetaDataRequest) + streamBuffer.Add(1, request1, uint64(time.Now().Nanosecond())) + streamBuffer.Add(uint64(len(request1)+1), request2, uint64(time.Now().Nanosecond()+1)) + + boundary := KafkaStreamParser.FindBoundary(streamBuffer, protocol.Request, 0) + assert.Equal(t, boundary, 0) +} + +func TestKafkaParserFindReqBoundaryUnAligned(t *testing.T) { + KafkaStreamParser := kafka.NewKafkaStreamParser() + streamBuffer := buffer.New(10000) + + // Add multiple requests to the buffer + garbage := []byte("some garbage") + request1 := []byte(kProduceRequest) + request2 := []byte(kMetaDataRequest) + streamBuffer.Add(1, garbage, uint64(time.Now().Nanosecond())) + streamBuffer.Add(uint64(len(garbage)+1), request1, uint64(time.Now().Nanosecond())) + streamBuffer.Add(uint64(len(garbage)+len(request1))+1, request2, uint64(time.Now().Nanosecond()+1)) + + boundary := KafkaStreamParser.FindBoundary(streamBuffer, protocol.Request, 0) + assert.Equal(t, uint64(boundary), uint64(len(garbage))) +} + +func TestKafkaParserFindRespBoundaryAligned(t *testing.T) { + KafkaStreamParser := kafka.NewKafkaStreamParser() + streamBuffer := 
buffer.New(10000) + + request1 := []byte(kProduceResponse) + request2 := []byte(kMetaDataResponse) + streamBuffer.Add(1, request1, uint64(time.Now().Nanosecond())) + streamBuffer.Add(uint64(len(request1)+1), request2, uint64(time.Now().Nanosecond()+1)) + + KafkaStreamParser.GetCorrelationIdMap()[4] = struct{}{} + KafkaStreamParser.GetCorrelationIdMap()[1] = struct{}{} + boundary := KafkaStreamParser.FindBoundary(streamBuffer, protocol.Response, 0) + assert.Equal(t, boundary, 0) +} + +func TestKafkaParserFindRespBoundaryUnAligned(t *testing.T) { + KafkaStreamParser := kafka.NewKafkaStreamParser() + streamBuffer := buffer.New(10000) + + // Add multiple requests to the buffer + garbage := []byte("some garbage") + request1 := []byte(kProduceResponse) + request2 := []byte(kMetaDataResponse) + streamBuffer.Add(1, garbage, uint64(time.Now().Nanosecond())) + streamBuffer.Add(uint64(len(garbage)+1), request1, uint64(time.Now().Nanosecond())) + streamBuffer.Add(uint64(len(garbage)+len(request1))+1, request2, uint64(time.Now().Nanosecond()+1)) + + KafkaStreamParser.GetCorrelationIdMap()[4] = struct{}{} + KafkaStreamParser.GetCorrelationIdMap()[1] = struct{}{} + boundary := KafkaStreamParser.FindBoundary(streamBuffer, protocol.Response, 0) + assert.Equal(t, boundary, len(garbage)) + + // If the correlation_id of produce response (i.e. 4) is not seen, this should skip over it. 
+ delete(KafkaStreamParser.GetCorrelationIdMap(), 4) + boundary = KafkaStreamParser.FindBoundary(streamBuffer, protocol.Response, 0) + assert.Equal(t, boundary, len(garbage)+len(request1)) +} + +func TestKafkaParserMatch(t *testing.T) { + reqStreams := make(map[protocol.StreamId]*protocol.ParsedMessageQueue) + respStreams := make(map[protocol.StreamId]*protocol.ParsedMessageQueue) + KafkaStreamParser := kafka.NewKafkaStreamParser() + + records := KafkaStreamParser.Match(reqStreams, respStreams) + assert.Empty(t, records) + + reqStream := &protocol.ParsedMessageQueue{} + reqStreams[0] = reqStream + *reqStream = append(*reqStream, KProduceReqPacket) + records = KafkaStreamParser.Match(reqStreams, respStreams) + assert.Empty(t, records) + assert.Equal(t, 1, len(*reqStream)) + + respStream := &protocol.ParsedMessageQueue{} + respStreams[0] = respStream + *respStream = append(*respStream, KProduceRespPacket) + records = KafkaStreamParser.Match(reqStreams, respStreams) + assert.Equal(t, 1, len(records)) + assert.Equal(t, 0, len(*reqStream)) + assert.Equal(t, 0, len(*respStream)) +}