diff --git a/generate/definitions/62_broker_registration b/generate/definitions/62_broker_registration new file mode 100644 index 00000000..819b2ef1 --- /dev/null +++ b/generate/definitions/62_broker_registration @@ -0,0 +1,37 @@ +// For KIP-500, BrokerRegistrationRequest is an internal broker-to-broker only +// request. +BrokerRegistrationRequest => key 62, max version 0, flexible v0+ + // The broker ID. + BrokerID: int32 + // The cluster ID of the broker process. + ClusterID: uuid + // The incarnation ID of the broker process. + IncarnationID: uuid + // The listeners for this broker. + Listeners: [=>] + // The name of this endpoint. + Name: string + // The hostname. + Host: string + // The port. + Port: uint16 + // The security protocol. + SecurityProtocol: int16 + // Features on this broker. + Features: [=>] + // The name of the feature. + Name: string + // The minimum supported feature level. + MinSupportedVersion: int16 + // The maximum supported feature level. + MaxSupportedVersion: int16 + // The rack that this broker is in, if any. + Rack: nullable-string + +// BrokerRegistrationResponse is a response to a BrokerRegistrationRequest. +BrokerRegistrationResponse => + ThrottleMillis + // Any error code, or 0. + ErrorCode: int16 + // The broker's assigned epoch, or -1 if none was assigned. + BrokerEpoch: int64(-1) diff --git a/generate/definitions/63_broker_heartbeat b/generate/definitions/63_broker_heartbeat new file mode 100644 index 00000000..9f0e39b3 --- /dev/null +++ b/generate/definitions/63_broker_heartbeat @@ -0,0 +1,25 @@ +// For KIP-500, BrokerHeartbeatRequest is an internal broker-to-broker only +// request. +BrokerHeartbeatRequest => key 63, max version 0, flexible v0+ + // The broker ID. + BrokerID: int32 + // The broker's epoch. + BrokerEpoch: int64(-1) + // The highest metadata offset that the broker has reached. + CurrentMetadataOffset: int64 + // True if the broker wants to be fenced. + WantFence: bool + // True if the broker wants to be shutdown. + WantShutdown: bool + +// BrokerHeartbeatResponse is a response to a BrokerHeartbeatRequest. +BrokerHeartbeatResponse => + ThrottleMillis + // Any error code, or 0. + ErrorCode: int16 + // True if the broker has approximately caught up with the latest metadata. + IsCaughtUp: bool + // True if the broker is fenced. + IsFenced: bool(true) + // True if the broker should proceed with its shutdown. + ShouldShutdown: bool diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index 80876e63..482003c5 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -10,7 +10,7 @@ import ( // MaxKey is the maximum key used for any messages in this package. // Note that this value will change as Kafka adds more messages. -const MaxKey = 61 +const MaxKey = 63 // MessageV0 is the message format Kafka used prior to 0.10. // @@ -33905,6 +33905,691 @@ func NewDescribeProducersResponse() DescribeProducersResponse { return v } +type BrokerRegistrationRequestListener struct { + // The name of this endpoint. + Name string + + // The hostname. + Host string + + // The port. + Port uint16 + + // The security protocol. + SecurityProtocol int16 +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to BrokerRegistrationRequestListener. +func (v *BrokerRegistrationRequestListener) Default() { +} + +// NewBrokerRegistrationRequestListener returns a default BrokerRegistrationRequestListener +// This is a shortcut for creating a struct and calling Default yourself. +func NewBrokerRegistrationRequestListener() BrokerRegistrationRequestListener { + var v BrokerRegistrationRequestListener + v.Default() + return v +} + +type BrokerRegistrationRequestFeature struct { + // The name of the feature. + Name string + + // The minimum supported feature level. + MinSupportedVersion int16 + + // The maximum supported feature level. + MaxSupportedVersion int16 +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to BrokerRegistrationRequestFeature. +func (v *BrokerRegistrationRequestFeature) Default() { +} + +// NewBrokerRegistrationRequestFeature returns a default BrokerRegistrationRequestFeature +// This is a shortcut for creating a struct and calling Default yourself. +func NewBrokerRegistrationRequestFeature() BrokerRegistrationRequestFeature { + var v BrokerRegistrationRequestFeature + v.Default() + return v +} + +// For KIP-500, BrokerRegistrationRequest is an internal broker-to-broker only +// request. +type BrokerRegistrationRequest struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + // The broker ID. + BrokerID int32 + + // The cluster ID of the broker process. + ClusterID [2]uint64 + + // The incarnation ID of the broker process. + IncarnationID [2]uint64 + + // The listeners for this broker. + Listeners []BrokerRegistrationRequestListener + + // Features on this broker. + Features []BrokerRegistrationRequestFeature + + // The rack that this broker is in, if any. + Rack *string +} + +func (*BrokerRegistrationRequest) Key() int16 { return 62 } +func (*BrokerRegistrationRequest) MaxVersion() int16 { return 0 } +func (v *BrokerRegistrationRequest) SetVersion(version int16) { v.Version = version } +func (v *BrokerRegistrationRequest) GetVersion() int16 { return v.Version } +func (v *BrokerRegistrationRequest) IsFlexible() bool { return v.Version >= 0 } +func (v *BrokerRegistrationRequest) ResponseKind() Response { + return &BrokerRegistrationResponse{Version: v.Version} +} + +// RequestWith is requests v on r and returns the response or an error. +func (v *BrokerRegistrationRequest) RequestWith(ctx context.Context, r Requestor) (*BrokerRegistrationResponse, error) { + kresp, err := r.Request(ctx, v) + if err != nil { + return nil, err + } + return kresp.(*BrokerRegistrationResponse), nil +} + +func (v *BrokerRegistrationRequest) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.BrokerID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ClusterID + dst = kbin.AppendUuid(dst, v) + } + { + v := v.IncarnationID + dst = kbin.AppendUuid(dst, v) + } + { + v := v.Listeners + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Name + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Host + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Port + dst = kbin.AppendUint16(dst, v) + } + { + v := v.SecurityProtocol + dst = kbin.AppendInt16(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + } + } + { + v := v.Features + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Name + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.MinSupportedVersion + dst = kbin.AppendInt16(dst, v) + } + { + v := v.MaxSupportedVersion + dst = kbin.AppendInt16(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + } + } + { + v := v.Rack + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + if isFlexible { + dst = append(dst, 0) + } + return dst +} +func (v *BrokerRegistrationRequest) ReadFrom(src []byte) error { + v.Default() + b := kbin.Reader{Src: src} + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + s := v + { + v := b.Int32() + s.BrokerID = v + } + { + v := b.Uuid() + s.ClusterID = v + } + { + v := b.Uuid() + s.IncarnationID = v + } + { + v := s.Listeners + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]BrokerRegistrationRequestListener, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + s.Name = v + } + { + var v string + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + s.Host = v + } + { + v := b.Uint16() + s.Port = v + } + { + v := b.Int16() + s.SecurityProtocol = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Listeners = v + } + { + v := s.Features + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]BrokerRegistrationRequestFeature, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + s.Name = v + } + { + v := b.Int16() + s.MinSupportedVersion = v + } + { + v := b.Int16() + s.MaxSupportedVersion = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Features = v + } + { + var v *string + if isFlexible { + v = b.CompactNullableString() + } else { + v = b.NullableString() + } + s.Rack = v + } + if isFlexible { + SkipTags(&b) + } + return b.Complete() +} + +// NewPtrBrokerRegistrationRequest returns a pointer to a default BrokerRegistrationRequest +// This is a shortcut for creating a new(struct) and calling Default yourself. +func NewPtrBrokerRegistrationRequest() *BrokerRegistrationRequest { + var v BrokerRegistrationRequest + v.Default() + return &v +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to BrokerRegistrationRequest. +func (v *BrokerRegistrationRequest) Default() { +} + +// NewBrokerRegistrationRequest returns a default BrokerRegistrationRequest +// This is a shortcut for creating a struct and calling Default yourself. +func NewBrokerRegistrationRequest() BrokerRegistrationRequest { + var v BrokerRegistrationRequest + v.Default() + return v +} + +// BrokerRegistrationResponse is a response to a BrokerRegistrationRequest. +type BrokerRegistrationResponse struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + // ThrottleMillis is how long of a throttle Kafka will apply to the client + // after responding to this request. + ThrottleMillis int32 + + // Any error code, or 0. + ErrorCode int16 + + // The broker's assigned epoch, or -1 if none was assigned. + BrokerEpoch int64 +} + +func (*BrokerRegistrationResponse) Key() int16 { return 62 } +func (*BrokerRegistrationResponse) MaxVersion() int16 { return 0 } +func (v *BrokerRegistrationResponse) SetVersion(version int16) { v.Version = version } +func (v *BrokerRegistrationResponse) GetVersion() int16 { return v.Version } +func (v *BrokerRegistrationResponse) IsFlexible() bool { return v.Version >= 0 } +func (v *BrokerRegistrationResponse) Throttle() (int32, bool) { + return v.ThrottleMillis, v.Version >= 0 +} +func (v *BrokerRegistrationResponse) RequestKind() Request { + return &BrokerRegistrationRequest{Version: v.Version} +} + +func (v *BrokerRegistrationResponse) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.ThrottleMillis + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.BrokerEpoch + dst = kbin.AppendInt64(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + return dst +} +func (v *BrokerRegistrationResponse) ReadFrom(src []byte) error { + v.Default() + b := kbin.Reader{Src: src} + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + s := v + { + v := b.Int32() + s.ThrottleMillis = v + } + { + v := b.Int16() + s.ErrorCode = v + } + { + v := b.Int64() + s.BrokerEpoch = v + } + if isFlexible { + SkipTags(&b) + } + return b.Complete() +} + +// NewPtrBrokerRegistrationResponse returns a pointer to a default BrokerRegistrationResponse +// This is a shortcut for creating a new(struct) and calling Default yourself. +func NewPtrBrokerRegistrationResponse() *BrokerRegistrationResponse { + var v BrokerRegistrationResponse + v.Default() + return &v +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to BrokerRegistrationResponse. +func (v *BrokerRegistrationResponse) Default() { + v.BrokerEpoch = -1 +} + +// NewBrokerRegistrationResponse returns a default BrokerRegistrationResponse +// This is a shortcut for creating a struct and calling Default yourself. +func NewBrokerRegistrationResponse() BrokerRegistrationResponse { + var v BrokerRegistrationResponse + v.Default() + return v +} + +// For KIP-500, BrokerHeartbeatRequest is an internal broker-to-broker only +// request. +type BrokerHeartbeatRequest struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + // The broker ID. + BrokerID int32 + + // The broker's epoch. + BrokerEpoch int64 + + // The highest metadata offset that the broker has reached. + CurrentMetadataOffset int64 + + // True if the broker wants to be fenced. + WantFence bool + + // True if the broker wants to be shutdown. + WantShutdown bool +} + +func (*BrokerHeartbeatRequest) Key() int16 { return 63 } +func (*BrokerHeartbeatRequest) MaxVersion() int16 { return 0 } +func (v *BrokerHeartbeatRequest) SetVersion(version int16) { v.Version = version } +func (v *BrokerHeartbeatRequest) GetVersion() int16 { return v.Version } +func (v *BrokerHeartbeatRequest) IsFlexible() bool { return v.Version >= 0 } +func (v *BrokerHeartbeatRequest) ResponseKind() Response { + return &BrokerHeartbeatResponse{Version: v.Version} +} + +// RequestWith is requests v on r and returns the response or an error. +func (v *BrokerHeartbeatRequest) RequestWith(ctx context.Context, r Requestor) (*BrokerHeartbeatResponse, error) { + kresp, err := r.Request(ctx, v) + if err != nil { + return nil, err + } + return kresp.(*BrokerHeartbeatResponse), nil +} + +func (v *BrokerHeartbeatRequest) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.BrokerID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.BrokerEpoch + dst = kbin.AppendInt64(dst, v) + } + { + v := v.CurrentMetadataOffset + dst = kbin.AppendInt64(dst, v) + } + { + v := v.WantFence + dst = kbin.AppendBool(dst, v) + } + { + v := v.WantShutdown + dst = kbin.AppendBool(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + return dst +} +func (v *BrokerHeartbeatRequest) ReadFrom(src []byte) error { + v.Default() + b := kbin.Reader{Src: src} + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + s := v + { + v := b.Int32() + s.BrokerID = v + } + { + v := b.Int64() + s.BrokerEpoch = v + } + { + v := b.Int64() + s.CurrentMetadataOffset = v + } + { + v := b.Bool() + s.WantFence = v + } + { + v := b.Bool() + s.WantShutdown = v + } + if isFlexible { + SkipTags(&b) + } + return b.Complete() +} + +// NewPtrBrokerHeartbeatRequest returns a pointer to a default BrokerHeartbeatRequest +// This is a shortcut for creating a new(struct) and calling Default yourself. +func NewPtrBrokerHeartbeatRequest() *BrokerHeartbeatRequest { + var v BrokerHeartbeatRequest + v.Default() + return &v +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to BrokerHeartbeatRequest. +func (v *BrokerHeartbeatRequest) Default() { + v.BrokerEpoch = -1 +} + +// NewBrokerHeartbeatRequest returns a default BrokerHeartbeatRequest +// This is a shortcut for creating a struct and calling Default yourself. +func NewBrokerHeartbeatRequest() BrokerHeartbeatRequest { + var v BrokerHeartbeatRequest + v.Default() + return v +} + +// BrokerHeartbeatResponse is a response to a BrokerHeartbeatRequest. +type BrokerHeartbeatResponse struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + // ThrottleMillis is how long of a throttle Kafka will apply to the client + // after responding to this request. + ThrottleMillis int32 + + // Any error code, or 0. + ErrorCode int16 + + // True if the broker has approximately caught up with the latest metadata. + IsCaughtUp bool + + // True if the broker is fenced. + IsFenced bool + + // True if the broker should proceed with its shutdown. + ShouldShutdown bool +} + +func (*BrokerHeartbeatResponse) Key() int16 { return 63 } +func (*BrokerHeartbeatResponse) MaxVersion() int16 { return 0 } +func (v *BrokerHeartbeatResponse) SetVersion(version int16) { v.Version = version } +func (v *BrokerHeartbeatResponse) GetVersion() int16 { return v.Version } +func (v *BrokerHeartbeatResponse) IsFlexible() bool { return v.Version >= 0 } +func (v *BrokerHeartbeatResponse) Throttle() (int32, bool) { return v.ThrottleMillis, v.Version >= 0 } +func (v *BrokerHeartbeatResponse) RequestKind() Request { + return &BrokerHeartbeatRequest{Version: v.Version} +} + +func (v *BrokerHeartbeatResponse) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.ThrottleMillis + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.IsCaughtUp + dst = kbin.AppendBool(dst, v) + } + { + v := v.IsFenced + dst = kbin.AppendBool(dst, v) + } + { + v := v.ShouldShutdown + dst = kbin.AppendBool(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + return dst +} +func (v *BrokerHeartbeatResponse) ReadFrom(src []byte) error { + v.Default() + b := kbin.Reader{Src: src} + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + s := v + { + v := b.Int32() + s.ThrottleMillis = v + } + { + v := b.Int16() + s.ErrorCode = v + } + { + v := b.Bool() + s.IsCaughtUp = v + } + { + v := b.Bool() + s.IsFenced = v + } + { + v := b.Bool() + s.ShouldShutdown = v + } + if isFlexible { + SkipTags(&b) + } + return b.Complete() +} + +// NewPtrBrokerHeartbeatResponse returns a pointer to a default BrokerHeartbeatResponse +// This is a shortcut for creating a new(struct) and calling Default yourself. +func NewPtrBrokerHeartbeatResponse() *BrokerHeartbeatResponse { + var v BrokerHeartbeatResponse + v.Default() + return &v +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to BrokerHeartbeatResponse. +func (v *BrokerHeartbeatResponse) Default() { + v.IsFenced = true +} + +// NewBrokerHeartbeatResponse returns a default BrokerHeartbeatResponse +// This is a shortcut for creating a struct and calling Default yourself. +func NewBrokerHeartbeatResponse() BrokerHeartbeatResponse { + var v BrokerHeartbeatResponse + v.Default() + return v +} + // RequestForKey returns the request corresponding to the given request key // or nil if the key is unknown. func RequestForKey(key int16) Request { @@ -34035,6 +34720,10 @@ func RequestForKey(key int16) Request { return NewPtrDescribeClusterRequest() case 61: return NewPtrDescribeProducersRequest() + case 62: + return NewPtrBrokerRegistrationRequest() + case 63: + return NewPtrBrokerHeartbeatRequest() } } @@ -34168,6 +34857,10 @@ func ResponseForKey(key int16) Response { return NewPtrDescribeClusterResponse() case 61: return NewPtrDescribeProducersResponse() + case 62: + return NewPtrBrokerRegistrationResponse() + case 63: + return NewPtrBrokerHeartbeatResponse() } } @@ -34301,6 +34994,10 @@ func NameForKey(key int16) string { return "DescribeCluster" case 61: return "DescribeProducers" + case 62: + return "BrokerRegistration" + case 63: + return "BrokerHeartbeat" } } diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index 329c65c5..db477ea0 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -692,5 +692,12 @@ var maxTip = nextMax(max270, func(v []int16) []int16 { 0, // 61 describe producers ) + // KAFKA-12248 a022072df3c8175950c03263d2bbf2e3ea7a7a5d KIP-500 + // (commit mentions KIP-500, these are actually described in KIP-631) + v = append(v, + 0, // 62 broker registration + 0, // 63 broker heartbeat + ) + return v })