From 31b1df7b21308bb7bbf7c1aab5b9b5c8883329b6 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 31 Jan 2021 16:33:14 -0700 Subject: [PATCH] KIP-664: add support for DescribeProducers One of many protocol changes from the KIP, thus we will not update the readme yet. --- generate/definitions/61_describe_producers | 47 ++ pkg/kmsg/generated.go | 579 ++++++++++++++++++++- pkg/kversion/kversion.go | 5 + 3 files changed, 630 insertions(+), 1 deletion(-) create mode 100644 generate/definitions/61_describe_producers diff --git a/generate/definitions/61_describe_producers b/generate/definitions/61_describe_producers new file mode 100644 index 00000000..1cdc21c9 --- /dev/null +++ b/generate/definitions/61_describe_producers @@ -0,0 +1,47 @@ +// Introduced for KIP-664, DescribeProducersRequest allows for introspecting +// the state of the transaction coordinator. This request can be used to detect +// hanging transactions or other EOS-related problems. +// +// This request allows for describing the state of the active +// idempotent/transactional producers. +DescribeProducersRequest => key 61, max version 0, flexible v0+ + // The topics to describe producers for. + Topics: [=>] + Topic: string + // The partitions to list producers for for the given topic. + Partitions: [int32] + +// DescribeProducersResponse is a response to a DescribeProducersRequest. +DescribeProducersResponse => + ThrottleMillis + Topics: [=>] + Topic: string + Partitions: [=>] + Partition: int32 + // The partition error code, or 0 if there was no error. + // + // NOT_LEADER_OR_FOLLOWER is returned if the broker receiving this request + // is not the leader of the partition. + // + // TOPIC_AUTHORIZATION_FAILED is returned if the user does not have Describe + // permissions on the topic. + // + // UNKNOWN_TOPIC_OR_PARTITION is returned if the partition is not known to exist. + // + // Other errors may be returned corresponding to the partition being offline, etc. + ErrorCode: int16 + // The partition error message, which may be null if no additional details are available. + ErrorMessage: nullable-string + // The current idempotent or transactional producers producing to this partition, + // and the metadata related to their produce requests. + ActiveProducers: [=>] + ProducerID: int64 + ProducerEpoch: int32 + // The last sequence produced. + LastSequence: int32(-1) + // The last timestamp produced. + LastTimestamp: int64(-1) + // The epoch of the transactional coordinator for this last produce. + CoordinatorEpoch: int32 + // The first offset of the transaction. + CurrentTxnStartOffset: int64(-1) diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index 1f1d37c9..80876e63 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -10,7 +10,7 @@ import ( // MaxKey is the maximum key used for any messages in this package. // Note that this value will change as Kafka adds more messages. -const MaxKey = 60 +const MaxKey = 61 // MessageV0 is the message format Kafka used prior to 0.10. // @@ -33334,6 +33334,577 @@ func NewDescribeClusterResponse() DescribeClusterResponse { return v } +type DescribeProducersRequestTopic struct { + Topic string + + // The partitions to list producers for for the given topic. + Partitions []int32 +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to DescribeProducersRequestTopic. +func (v *DescribeProducersRequestTopic) Default() { +} + +// NewDescribeProducersRequestTopic returns a default DescribeProducersRequestTopic +// This is a shortcut for creating a struct and calling Default yourself. +func NewDescribeProducersRequestTopic() DescribeProducersRequestTopic { + var v DescribeProducersRequestTopic + v.Default() + return v +} + +// Introduced for KIP-664, DescribeProducersRequest allows for introspecting +// the state of the transaction coordinator. This request can be used to detect +// hanging transactions or other EOS-related problems. +// +// This request allows for describing the state of the active +// idempotent/transactional producers. +type DescribeProducersRequest struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + // The topics to describe producers for. + Topics []DescribeProducersRequestTopic +} + +func (*DescribeProducersRequest) Key() int16 { return 61 } +func (*DescribeProducersRequest) MaxVersion() int16 { return 0 } +func (v *DescribeProducersRequest) SetVersion(version int16) { v.Version = version } +func (v *DescribeProducersRequest) GetVersion() int16 { return v.Version } +func (v *DescribeProducersRequest) IsFlexible() bool { return v.Version >= 0 } +func (v *DescribeProducersRequest) ResponseKind() Response { + return &DescribeProducersResponse{Version: v.Version} +} + +// RequestWith is requests v on r and returns the response or an error. +func (v *DescribeProducersRequest) RequestWith(ctx context.Context, r Requestor) (*DescribeProducersResponse, error) { + kresp, err := r.Request(ctx, v) + if err != nil { + return nil, err + } + return kresp.(*DescribeProducersResponse), nil +} + +func (v *DescribeProducersRequest) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + dst = kbin.AppendInt32(dst, v) + } + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + return dst +} +func (v *DescribeProducersRequest) ReadFrom(src []byte) error { + v.Default() + b := kbin.Reader{Src: src} + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + s := v + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]DescribeProducersRequestTopic, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]int32, l) + } + for i := int32(0); i < l; i++ { + v := b.Int32() + a[i] = v + } + v = a + s.Partitions = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + SkipTags(&b) + } + return b.Complete() +} + +// NewPtrDescribeProducersRequest returns a pointer to a default DescribeProducersRequest +// This is a shortcut for creating a new(struct) and calling Default yourself. +func NewPtrDescribeProducersRequest() *DescribeProducersRequest { + var v DescribeProducersRequest + v.Default() + return &v +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to DescribeProducersRequest. +func (v *DescribeProducersRequest) Default() { +} + +// NewDescribeProducersRequest returns a default DescribeProducersRequest +// This is a shortcut for creating a struct and calling Default yourself. +func NewDescribeProducersRequest() DescribeProducersRequest { + var v DescribeProducersRequest + v.Default() + return v +} + +type DescribeProducersResponseTopicPartitionActiveProducer struct { + ProducerID int64 + + ProducerEpoch int32 + + // The last sequence produced. + LastSequence int32 + + // The last timestamp produced. + LastTimestamp int64 + + // The epoch of the transactional coordinator for this last produce. + CoordinatorEpoch int32 + + // The first offset of the transaction. + CurrentTxnStartOffset int64 +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to DescribeProducersResponseTopicPartitionActiveProducer. +func (v *DescribeProducersResponseTopicPartitionActiveProducer) Default() { + v.LastSequence = -1 + v.LastTimestamp = -1 + v.CurrentTxnStartOffset = -1 +} + +// NewDescribeProducersResponseTopicPartitionActiveProducer returns a default DescribeProducersResponseTopicPartitionActiveProducer +// This is a shortcut for creating a struct and calling Default yourself. +func NewDescribeProducersResponseTopicPartitionActiveProducer() DescribeProducersResponseTopicPartitionActiveProducer { + var v DescribeProducersResponseTopicPartitionActiveProducer + v.Default() + return v +} + +type DescribeProducersResponseTopicPartition struct { + Partition int32 + + // The partition error code, or 0 if there was no error. + // + // NOT_LEADER_OR_FOLLOWER is returned if the broker receiving this request + // is not the leader of the partition. + // + // TOPIC_AUTHORIZATION_FAILED is returned if the user does not have Describe + // permissions on the topic. + // + // UNKNOWN_TOPIC_OR_PARTITION is returned if the partition is not known to exist. + // + // Other errors may be returned corresponding to the partition being offline, etc. + ErrorCode int16 + + // The partition error message, which may be null if no additional details are available. + ErrorMessage *string + + // The current idempotent or transactional producers producing to this partition, + // and the metadata related to their produce requests. + ActiveProducers []DescribeProducersResponseTopicPartitionActiveProducer +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to DescribeProducersResponseTopicPartition. +func (v *DescribeProducersResponseTopicPartition) Default() { +} + +// NewDescribeProducersResponseTopicPartition returns a default DescribeProducersResponseTopicPartition +// This is a shortcut for creating a struct and calling Default yourself. +func NewDescribeProducersResponseTopicPartition() DescribeProducersResponseTopicPartition { + var v DescribeProducersResponseTopicPartition + v.Default() + return v +} + +type DescribeProducersResponseTopic struct { + Topic string + + Partitions []DescribeProducersResponseTopicPartition +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to DescribeProducersResponseTopic. +func (v *DescribeProducersResponseTopic) Default() { +} + +// NewDescribeProducersResponseTopic returns a default DescribeProducersResponseTopic +// This is a shortcut for creating a struct and calling Default yourself. +func NewDescribeProducersResponseTopic() DescribeProducersResponseTopic { + var v DescribeProducersResponseTopic + v.Default() + return v +} + +// DescribeProducersResponse is a response to a DescribeProducersRequest. +type DescribeProducersResponse struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + // ThrottleMillis is how long of a throttle Kafka will apply to the client + // after responding to this request. + ThrottleMillis int32 + + Topics []DescribeProducersResponseTopic +} + +func (*DescribeProducersResponse) Key() int16 { return 61 } +func (*DescribeProducersResponse) MaxVersion() int16 { return 0 } +func (v *DescribeProducersResponse) SetVersion(version int16) { v.Version = version } +func (v *DescribeProducersResponse) GetVersion() int16 { return v.Version } +func (v *DescribeProducersResponse) IsFlexible() bool { return v.Version >= 0 } +func (v *DescribeProducersResponse) Throttle() (int32, bool) { return v.ThrottleMillis, v.Version >= 0 } +func (v *DescribeProducersResponse) RequestKind() Request { + return &DescribeProducersRequest{Version: v.Version} +} + +func (v *DescribeProducersResponse) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.ThrottleMillis + dst = kbin.AppendInt32(dst, v) + } + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Partition + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.ErrorMessage + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := v.ActiveProducers + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.ProducerID + dst = kbin.AppendInt64(dst, v) + } + { + v := v.ProducerEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LastSequence + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LastTimestamp + dst = kbin.AppendInt64(dst, v) + } + { + v := v.CoordinatorEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.CurrentTxnStartOffset + dst = kbin.AppendInt64(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + return dst +} +func (v *DescribeProducersResponse) ReadFrom(src []byte) error { + v.Default() + b := kbin.Reader{Src: src} + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + s := v + { + v := b.Int32() + s.ThrottleMillis = v + } + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]DescribeProducersResponseTopic, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]DescribeProducersResponseTopicPartition, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.Partition = v + } + { + v := b.Int16() + s.ErrorCode = v + } + { + var v *string + if isFlexible { + v = b.CompactNullableString() + } else { + v = b.NullableString() + } + s.ErrorMessage = v + } + { + v := s.ActiveProducers + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]DescribeProducersResponseTopicPartitionActiveProducer, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int64() + s.ProducerID = v + } + { + v := b.Int32() + s.ProducerEpoch = v + } + { + v := b.Int32() + s.LastSequence = v + } + { + v := b.Int64() + s.LastTimestamp = v + } + { + v := b.Int32() + s.CoordinatorEpoch = v + } + { + v := b.Int64() + s.CurrentTxnStartOffset = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.ActiveProducers = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Partitions = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + SkipTags(&b) + } + return b.Complete() +} + +// NewPtrDescribeProducersResponse returns a pointer to a default DescribeProducersResponse +// This is a shortcut for creating a new(struct) and calling Default yourself. +func NewPtrDescribeProducersResponse() *DescribeProducersResponse { + var v DescribeProducersResponse + v.Default() + return &v +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to DescribeProducersResponse. +func (v *DescribeProducersResponse) Default() { +} + +// NewDescribeProducersResponse returns a default DescribeProducersResponse +// This is a shortcut for creating a struct and calling Default yourself. +func NewDescribeProducersResponse() DescribeProducersResponse { + var v DescribeProducersResponse + v.Default() + return v +} + // RequestForKey returns the request corresponding to the given request key // or nil if the key is unknown. func RequestForKey(key int16) Request { @@ -33462,6 +34033,8 @@ func RequestForKey(key int16) Request { return NewPtrFetchSnapshotRequest() case 60: return NewPtrDescribeClusterRequest() + case 61: + return NewPtrDescribeProducersRequest() } } @@ -33593,6 +34166,8 @@ func ResponseForKey(key int16) Response { return NewPtrFetchSnapshotResponse() case 60: return NewPtrDescribeClusterResponse() + case 61: + return NewPtrDescribeProducersResponse() } } @@ -33724,6 +34299,8 @@ func NameForKey(key int16) string { return "FetchSnapshot" case 60: return "DescribeCluster" + case 61: + return "DescribeProducers" } } diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index 64ce812c..329c65c5 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -687,5 +687,10 @@ var maxTip = nextMax(max270, func(v []int16) []int16 { v[19]++ // 7 create topics v[20]++ // 6 delete topics + // KAFKA-12238 e9edf104866822d9e6c3b637ffbf338767b5bf27 KIP-664 + v = append(v, + 0, // 61 describe producers + ) + return v })