Skip to content

Commit

Permalink
test(unittest): add unittest for kafka protocol parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
hengyoush committed Jan 11, 2025
1 parent 0769b2c commit 8f36c22
Show file tree
Hide file tree
Showing 20 changed files with 1,645 additions and 17 deletions.
2 changes: 1 addition & 1 deletion agent/buffer/stream_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (sb *StreamBuffer) IsEmpty() bool {
return len(sb.buffers) == 0
}
func (sb *StreamBuffer) Clear() {
sb.buffers = sb.buffers[:]
sb.buffers = sb.buffers[0:0]
sb.timestamps.Clear()
}
func (sb *StreamBuffer) RemovePrefix(length int) {
Expand Down
6 changes: 6 additions & 0 deletions agent/protocol/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@ type BinaryDecoder struct {
func NewBinaryDecoder(buf []byte) *BinaryDecoder {
return &BinaryDecoder{buf: buf, str: string(buf)}
}
func (b *BinaryDecoder) RemainingBytes() int {
return len(b.str)
}

func (b *BinaryDecoder) Buf() []byte {
return b.buf
}
func (b *BinaryDecoder) SubBuf(length int) []byte {
return b.buf[len(b.buf)-len(b.str)+length:]
}

func (b *BinaryDecoder) SetBuf(buf []byte) {
b.buf = buf
Expand Down
104 changes: 104 additions & 0 deletions agent/protocol/kafka/common/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,107 @@ type FetchResp struct {
func (p FetchResp) ToJSON() ([]byte, error) {
return json.Marshal(p)
}

func (p FetchReqPartition) Equals(other FetchReqPartition) bool {
return p.Index == other.Index &&
p.CurrentLeaderEpoch == other.CurrentLeaderEpoch &&
p.FetchOffset == other.FetchOffset &&
p.LastFetchedEpoch == other.LastFetchedEpoch &&
p.LogStartOffset == other.LogStartOffset &&
p.PartitionMaxBytes == other.PartitionMaxBytes
}

func (p FetchReqTopic) Equals(other FetchReqTopic) bool {
if p.Name != other.Name || len(p.Partitions) != len(other.Partitions) {
return false
}
for i := range p.Partitions {
if !p.Partitions[i].Equals(other.Partitions[i]) {
return false
}
}
return true
}

func (p FetchForgottenTopicsData) Equals(other FetchForgottenTopicsData) bool {
if p.Name != other.Name || len(p.PartitionIndices) != len(other.PartitionIndices) {
return false
}
for i := range p.PartitionIndices {
if p.PartitionIndices[i] != other.PartitionIndices[i] {
return false
}
}
return true
}

func (p FetchReq) Equals(other FetchReq) bool {
if p.ReplicaID != other.ReplicaID ||
p.SessionID != other.SessionID ||
len(p.Topics) != len(other.Topics) ||
len(p.ForgottenTopics) != len(other.ForgottenTopics) {
return false
}
for i := range p.Topics {
if !p.Topics[i].Equals(other.Topics[i]) {
return false
}
}
for i := range p.ForgottenTopics {
if !p.ForgottenTopics[i].Equals(other.ForgottenTopics[i]) {
return false
}
}
return true
}

func (p FetchRespAbortedTransaction) Equals(other FetchRespAbortedTransaction) bool {
return p.ProducerID == other.ProducerID &&
p.FirstOffset == other.FirstOffset
}

func (p FetchRespPartition) Equals(other FetchRespPartition) bool {
if p.Index != other.Index ||
p.ErrorCode != other.ErrorCode ||
p.HighWatermark != other.HighWatermark ||
p.LastStableOffset != other.LastStableOffset ||
p.LogStartOffset != other.LogStartOffset ||
p.PreferredReadReplica != other.PreferredReadReplica ||
!p.MessageSet.Equals(other.MessageSet) ||
len(p.AbortedTransactions) != len(other.AbortedTransactions) {
return false
}
for i := range p.AbortedTransactions {
if !p.AbortedTransactions[i].Equals(other.AbortedTransactions[i]) {
return false
}
}
return true
}

func (p FetchRespTopic) Equals(other FetchRespTopic) bool {
if p.Name != other.Name || len(p.Partitions) != len(other.Partitions) {
return false
}
for i := range p.Partitions {
if !p.Partitions[i].Equals(other.Partitions[i]) {
return false
}
}
return true
}

func (p FetchResp) Equals(other FetchResp) bool {
if p.ThrottleTimeMs != other.ThrottleTimeMs ||
p.ErrorCode != other.ErrorCode ||
p.SessionID != other.SessionID ||
len(p.Topics) != len(other.Topics) {
return false
}
for i := range p.Topics {
if !p.Topics[i].Equals(other.Topics[i]) {
return false
}
}
return true
}
74 changes: 74 additions & 0 deletions agent/protocol/kafka/common/join_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,77 @@ type JoinGroupResp struct {
func (j JoinGroupResp) ToJSON() ([]byte, error) {
return json.Marshal(j)
}

func (lhs JoinGroupProtocol) Equal(rhs JoinGroupProtocol) bool {
return lhs.Protocol == rhs.Protocol
}

func (lhs JoinGroupMember) Equal(rhs JoinGroupMember) bool {
if lhs.MemberID != rhs.MemberID {
return false
}
return lhs.GroupInstanceID == rhs.GroupInstanceID
}

func (lhs JoinGroupReq) Equal(rhs JoinGroupReq) bool {
if lhs.GroupID != rhs.GroupID {
return false
}
if lhs.SessionTimeoutMs != rhs.SessionTimeoutMs {
return false
}
if lhs.RebalanceTimeoutMs != rhs.RebalanceTimeoutMs {
return false
}
if lhs.MemberID != rhs.MemberID {
return false
}
if lhs.GroupInstanceID != rhs.GroupInstanceID {
return false
}
if lhs.ProtocolType != rhs.ProtocolType {
return false
}
if len(lhs.Protocols) != len(rhs.Protocols) {
return false
}
for i := range lhs.Protocols {
if !lhs.Protocols[i].Equal(rhs.Protocols[i]) {
return false
}
}
return true
}

func (lhs JoinGroupResp) Equal(rhs JoinGroupResp) bool {
if lhs.ThrottleTimeMs != rhs.ThrottleTimeMs {
return false
}
if lhs.ErrorCode != rhs.ErrorCode {
return false
}
if lhs.GenerationID != rhs.GenerationID {
return false
}
if lhs.ProtocolType != rhs.ProtocolType {
return false
}
if lhs.ProtocolName != rhs.ProtocolName {
return false
}
if lhs.Leader != rhs.Leader {
return false
}
if lhs.MemberID != rhs.MemberID {
return false
}
if len(lhs.Members) != len(rhs.Members) {
return false
}
for i := range lhs.Members {
if !lhs.Members[i].Equal(rhs.Members[i]) {
return false
}
}
return true
}
16 changes: 16 additions & 0 deletions agent/protocol/kafka/common/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"encoding/json"
"reflect"
)

type MetadataReqTopic struct {
Expand All @@ -23,3 +24,18 @@ type MetadataReq struct {
func (m MetadataReq) ToJSON() ([]byte, error) {
return json.Marshal(m)
}

func vectorsEqual[T any](lhs, rhs []T) bool {
return len(lhs) == len(rhs) && reflect.DeepEqual(lhs, rhs)
}

func (lhs MetadataReqTopic) Equal(rhs MetadataReqTopic) bool {
return lhs.TopicID == rhs.TopicID && lhs.Name == rhs.Name
}

func (lhs MetadataReq) Equal(rhs MetadataReq) bool {
return lhs.AllowAutoTopicCreation == rhs.AllowAutoTopicCreation &&
lhs.IncludeClusterAuthorizedOperations == rhs.IncludeClusterAuthorizedOperations &&
lhs.IncludeTopicAuthorizedOperations == rhs.IncludeTopicAuthorizedOperations &&
vectorsEqual(lhs.Topics, rhs.Topics)
}
104 changes: 104 additions & 0 deletions agent/protocol/kafka/common/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,107 @@ type ProduceResp struct {
func (p ProduceResp) ToJSON() ([]byte, error) {
return json.Marshal(p)
}

func (lhs ProduceReqPartition) Equal(rhs ProduceReqPartition) bool {
return lhs.Index == rhs.Index && lhs.MessageSet.Equals(rhs.MessageSet)
}

func (lhs ProduceReqTopic) Equal(rhs ProduceReqTopic) bool {
if lhs.Name != rhs.Name {
return false
}
if len(lhs.Partitions) != len(rhs.Partitions) {
return false
}
for i := range lhs.Partitions {
if !lhs.Partitions[i].Equal(rhs.Partitions[i]) {
return false
}
}
return true
}

func (lhs ProduceReq) Equal(rhs ProduceReq) bool {
if lhs.TransactionalID != rhs.TransactionalID {
return false
}
if lhs.Acks != rhs.Acks {
return false
}
if lhs.TimeoutMs != rhs.TimeoutMs {
return false
}
if len(lhs.Topics) != len(rhs.Topics) {
return false
}
for i := range lhs.Topics {
if !lhs.Topics[i].Equal(rhs.Topics[i]) {
return false
}
}
return true
}

func (lhs RecordError) Equal(rhs RecordError) bool {
return lhs.BatchIndex == rhs.BatchIndex && lhs.ErrorMessage == rhs.ErrorMessage
}

func (lhs ProduceRespPartition) Equal(rhs ProduceRespPartition) bool {
if lhs.Index != rhs.Index {
return false
}
if lhs.ErrorCode != rhs.ErrorCode {
return false
}
if lhs.BaseOffset != rhs.BaseOffset {
return false
}
if lhs.LogAppendTimeMs != rhs.LogAppendTimeMs {
return false
}
if lhs.LogStartOffset != rhs.LogStartOffset {
return false
}
if lhs.ErrorMessage != rhs.ErrorMessage {
return false
}
if len(lhs.RecordErrors) != len(rhs.RecordErrors) {
return false
}
for i := range lhs.RecordErrors {
if !lhs.RecordErrors[i].Equal(rhs.RecordErrors[i]) {
return false
}
}
return true
}

func (lhs ProduceRespTopic) Equal(rhs ProduceRespTopic) bool {
if lhs.Name != rhs.Name {
return false
}
if len(lhs.Partitions) != len(rhs.Partitions) {
return false
}
for i := range lhs.Partitions {
if !lhs.Partitions[i].Equal(rhs.Partitions[i]) {
return false
}
}
return true
}

func (lhs ProduceResp) Equal(rhs ProduceResp) bool {
if lhs.ThrottleTimeMs != rhs.ThrottleTimeMs {
return false
}
if len(lhs.Topics) != len(rhs.Topics) {
return false
}
for i := range lhs.Topics {
if !lhs.Topics[i].Equal(rhs.Topics[i]) {
return false
}
}
return true
}
47 changes: 47 additions & 0 deletions agent/protocol/kafka/common/sync_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,50 @@ type SyncGroupResp struct {
func (s SyncGroupResp) ToJSON() ([]byte, error) {
return json.Marshal(s)
}

func (lhs SyncGroupAssignment) Equal(rhs SyncGroupAssignment) bool {
return lhs.MemberID == rhs.MemberID
}

func (lhs SyncGroupReq) Equal(rhs SyncGroupReq) bool {
if lhs.GroupID != rhs.GroupID {
return false
}
if lhs.GenerationID != rhs.GenerationID {
return false
}
if lhs.MemberID != rhs.MemberID {
return false
}
if lhs.GroupInstanceID != rhs.GroupInstanceID {
return false
}
if lhs.ProtocolType != rhs.ProtocolType {
return false
}
if lhs.ProtocolName != rhs.ProtocolName {
return false
}
if len(lhs.Assignments) != len(rhs.Assignments) {
return false
}
for i := range lhs.Assignments {
if !lhs.Assignments[i].Equal(rhs.Assignments[i]) {
return false
}
}
return true
}

func (lhs SyncGroupResp) Equal(rhs SyncGroupResp) bool {
if lhs.ThrottleTimeMs != rhs.ThrottleTimeMs {
return false
}
if lhs.ErrorCode != rhs.ErrorCode {
return false
}
if lhs.ProtocolType != rhs.ProtocolType {
return false
}
return lhs.ProtocolName == rhs.ProtocolName
}
Loading

0 comments on commit 8f36c22

Please sign in to comment.