Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use external addresses for Kakfa internal listeners #22

Merged
merged 1 commit into from
Mar 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions charts/kafka-operator/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10877,6 +10877,19 @@ spec:
containerPort:
format: int32
type: integer
externalListenerForHostname:
description: If set to a non-empty value, the Kafka brokers
will use the external hostname for inter broker communication.
The internal lister will will share the same hostname with
the external listener that is referenced here.
type: string
internalStartingPort:
description: This following options are helpful when you want
to run a Kafka cluster over multiple Kubernetes clusters.
The broker internal ports are computed as the sum of the
internalStartingPort and the broker id.
format: int32
type: integer
name:
type: string
type:
Expand Down
13 changes: 13 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10876,6 +10876,19 @@ spec:
containerPort:
format: int32
type: integer
externalListenerForHostname:
description: If set to a non-empty value, the Kafka brokers
will use the external hostname for inter broker communication.
The internal lister will will share the same hostname with
the external listener that is referenced here.
type: string
internalStartingPort:
description: This following options are helpful when you want
to run a Kafka cluster over multiple Kubernetes clusters.
The broker internal ports are computed as the sum of the
internalStartingPort and the broker id.
format: int32
type: integer
name:
type: string
type:
Expand Down
34 changes: 26 additions & 8 deletions pkg/k8sutil/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func UpdateListenerStatuses(ctx context.Context, c client.Client, cluster *v1bet
return nil
}

func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster) (map[string]v1beta1.ListenerStatusList, map[string]v1beta1.ListenerStatusList) {
func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster, externalListenerStatus map[string]v1beta1.ListenerStatusList) (map[string]v1beta1.ListenerStatusList, map[string]v1beta1.ListenerStatusList) {
intListenerStatuses := make(map[string]v1beta1.ListenerStatusList, len(kafkaCluster.Spec.ListenersConfig.InternalListeners))
controllerIntListenerStatuses := make(map[string]v1beta1.ListenerStatusList)

Expand All @@ -323,13 +323,22 @@ func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster) (map[str

// add addresses per broker
for _, broker := range kafkaCluster.Spec.Brokers {
var address string
if kafkaCluster.Spec.HeadlessServiceEnabled {
address = fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Name,
kafkaCluster.Namespace, kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
} else {
address = fmt.Sprintf("%s-%d.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Namespace,
kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
var address = ""
if iListener.ExternalListenerForHostname != "" && iListener.InternalStartingPort > 0 {
if eListenerStatus, ok := externalListenerStatus[iListener.ExternalListenerForHostname]; ok {
address = fmt.Sprintf("%s:%d", getHostnameForBrokerId(eListenerStatus, broker.Id),
iListener.InternalStartingPort+broker.Id)
}
}

if address == "" {
if kafkaCluster.Spec.HeadlessServiceEnabled {
address = fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Name,
kafkaCluster.Namespace, kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
} else {
address = fmt.Sprintf("%s-%d.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Namespace,
kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
}
}
listenerStatusList = append(listenerStatusList, v1beta1.ListenerStatus{
Name: fmt.Sprintf("broker-%d", broker.Id),
Expand All @@ -346,3 +355,12 @@ func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster) (map[str

return intListenerStatuses, controllerIntListenerStatuses
}

func getHostnameForBrokerId(eListenerStatusList v1beta1.ListenerStatusList, brokerId int32) string {
for _, eListenerStatus := range eListenerStatusList {
if eListenerStatus.Name == fmt.Sprintf("broker-%d", brokerId) {
return strings.Split(eListenerStatus.Address, ":")[0]
}
}
return ""
}
2 changes: 1 addition & 1 deletion pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
if err != nil {
return errors.WrapIf(err, "could not update status for external listeners")
}
intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster)
intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster, extListenerStatuses)
err = k8sutil.UpdateListenerStatuses(context.Background(), r.Client, r.KafkaCluster, log, intListenerStatuses, extListenerStatuses)
if err != nil {
return errors.WrapIf(err, "failed to update listener statuses")
Expand Down
8 changes: 8 additions & 0 deletions pkg/sdk/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,14 @@ type InternalListenerConfig struct {
CommonListenerSpec `json:",inline"`
UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"`
UsedForControllerCommunication bool `json:"usedForControllerCommunication,omitempty"`
// This following options are helpful when you want to run a Kafka cluster over multiple Kubernetes clusters.
// The broker internal ports are computed as the sum of the internalStartingPort and the broker id.
// +optional
InternalStartingPort int32 `json:"internalStartingPort"`
// If set to a non-empty value, the Kafka brokers will use the external hostname for inter broker communication.
// The internal lister will will share the same hostname with the external listener that is referenced here.
// +optional
ExternalListenerForHostname string `json:"externalListenerForHostname,omitempty"`
}

// CommonListenerSpec defines the common building block for Listener type
Expand Down