Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/transport/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestConfluentConfig(t *testing.T) {

for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
_, err := GetConfluentConfigMap(tc.kafkaConfig)
_, err := GetConfluentConfigMap(tc.kafkaConfig, true)
if err != tc.expectedErr {
t.Errorf("%s:\nexpected err: %v\ngot err: %v\n", tc.desc, tc.expectedErr, err)
}
Expand Down
15 changes: 9 additions & 6 deletions pkg/transport/config/confluent_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ import (
"github.com/stolostron/multicluster-global-hub/pkg/utils"
)

func GetConfluentConfigMap(kafkaConfig *transport.KafkaConfig) (*kafka.ConfigMap, error) {
func GetConfluentConfigMap(kafkaConfig *transport.KafkaConfig, producer bool) (*kafka.ConfigMap, error) {
kafkaConfigMap := &kafka.ConfigMap{
"bootstrap.servers": kafkaConfig.BootstrapServer,
"socket.keepalive.enable": "true",
"auto.offset.reset": "earliest", // consumer
"enable.auto.commit": "false", // consumer
"acks": "1", // producer
"retries": "0", // producer
// silence spontaneous disconnection logs, kafka recovers by itself.
"log.connection.close": "false",
"log.connection.close": "true",
}
if producer {
_ = kafkaConfigMap.SetKey("acks", "1")
_ = kafkaConfigMap.SetKey("retries", "0")
} else {
_ = kafkaConfigMap.SetKey("enable.auto.commit", "false")
_ = kafkaConfigMap.SetKey("auto.offset.reset", "earliest")
}

if kafkaConfig.EnableTLS && utils.Validate(kafkaConfig.CaCertPath) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/transport/consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type KafkaConsumer struct {
// NewConsumer creates a new instance of Consumer.
func NewKafkaConsumer(kafkaConfig *transport.KafkaConfig, log logr.Logger,
) (*KafkaConsumer, error) {
kafkaConfigMap, err := config.GetConfluentConfigMap(kafkaConfig)
kafkaConfigMap, err := config.GetConfluentConfigMap(kafkaConfig, false)
if err != nil {
return nil, fmt.Errorf("failed to get kafka config map: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/transport/producer/kafka_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type KafkaProducer struct {
// NewProducer returns a new instance of Producer object.
func NewKafkaProducer(compressor compressor.Compressor, kafkaConfig *transport.KafkaConfig, log logr.Logger,
) (*KafkaProducer, error) {
kafkaConfigMap, err := config.GetConfluentConfigMap(kafkaConfig)
kafkaConfigMap, err := config.GetConfluentConfigMap(kafkaConfig, true)
if err != nil {
return nil, fmt.Errorf("failed to configure kafka-producer - %w", err)
}
Expand Down
17 changes: 11 additions & 6 deletions samples/cloudevent/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,26 @@ import (
"context"
"fmt"
"log"
"os"

"github.com/Shopify/sarama"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/client"

"github.com/stolostron/multicluster-global-hub/samples/config"
)

var (
groupId = "test-group-id"
topic = "event"
)
var groupId = "test-group-id"

func main() {
bootstrapServer, saramaConfig, err := config.GetSaramaConfig()
if len(os.Args) < 2 {
fmt.Println("Please provide at least one topic command-line argument.")
os.Exit(1)
}
topic := os.Args[1]

bootstrapServer, saramaConfig, err := config.GetSaramaConfigFromKafkaUser()
if err != nil {
log.Fatalf("failed to get sarama config: %v", err)
}
Expand All @@ -34,7 +39,7 @@ func main() {

defer receiver.Close(context.Background())

c, err := cloudevents.NewClient(receiver)
c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1))
if err != nil {
log.Fatalf("failed to create client, %v", err)
}
Expand Down
13 changes: 11 additions & 2 deletions samples/cloudevent/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package main

import (
"context"
"fmt"
"log"
"os"

"github.com/Shopify/sarama"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
Expand All @@ -13,11 +15,17 @@ import (

const (
count = 10
topic = "event"
)

func main() {
bootstrapServer, saramaConfig, err := config.GetSaramaConfig()
if len(os.Args) < 2 {
fmt.Println("Please provide at least one topic command-line argument.")
os.Exit(1)
}
topic := os.Args[1]

// bootstrapServer, saramaConfig, err := config.GetSaramaConfig()
bootstrapServer, saramaConfig, err := config.GetSaramaConfigFromKafkaUser()
if err != nil {
log.Fatalf("failed to get sarama config: %v", err)
}
Expand All @@ -41,6 +49,7 @@ func main() {
e.SetID(uuid.New().String())
e.SetType("com.cloudevents.sample.sent")
e.SetSource("https://github.com/cloudevents/sdk-go/samples/kafka/sender")
e.SetExtension("test", "foo")
_ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
"id": i,
"message": "Hello, World!",
Expand Down
85 changes: 83 additions & 2 deletions samples/config/confluent_config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
package config

import (
"context"
"fmt"
"log"
"os"

kafkav1beta2 "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/transport/config"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func GetConfluentConfigMap() (*kafka.ConfigMap, error) {
const (
KAFkA_USER = "global-hub-kafka-user"
KAFKA_CLUSTER = "kafka"
KAFKA_NAMESPACE = "multicluster-global-hub"
)

func GetConfluentConfigMap(isProducer bool) (*kafka.ConfigMap, error) {
secret, err := GetTransportSecret()
if err != nil {
log.Fatalf("failed to get transport secret: %v", err)
Expand Down Expand Up @@ -45,7 +58,75 @@ func GetConfluentConfigMap() (*kafka.ConfigMap, error) {
ClientCertPath: clientCrtPath,
ClientKeyPath: clientKeyPath,
}
configMap, err := config.GetConfluentConfigMap(kafkaConfig)
configMap, err := config.GetConfluentConfigMap(kafkaConfig, isProducer)
if err != nil {
log.Fatalf("failed to get confluent config map: %v", err)
return nil, err
}
return configMap, nil
}

func GetConfluentConfigMapByKafkaUser(isProducer bool) (*kafka.ConfigMap, error) {
kubeconfig, err := loadDynamicKubeConfig(EnvKubconfig)
if err != nil {
return nil, fmt.Errorf("failed to get kubeconfig")
}

kafkav1beta2.AddToScheme(scheme.Scheme)
c, err := client.New(kubeconfig, client.Options{Scheme: scheme.Scheme})
if err != nil {
return nil, fmt.Errorf("failed to get runtime client")
}

kafkaCluster := &kafkav1beta2.Kafka{}
err = c.Get(context.TODO(), types.NamespacedName{
Name: KAFKA_CLUSTER,
Namespace: KAFKA_NAMESPACE,
}, kafkaCluster)
if err != nil {
return nil, err
}

bootstrapServer := *kafkaCluster.Status.Listeners[1].BootstrapServers

kafkaUserSecret := &corev1.Secret{}
err = c.Get(context.TODO(), types.NamespacedName{
Name: KAFkA_USER,
Namespace: KAFKA_NAMESPACE,
}, kafkaUserSecret)
if err != nil {
return nil, err
}

caCrtPath := "/tmp/ca.crt"
err = os.WriteFile(caCrtPath, []byte(kafkaCluster.Status.Listeners[1].Certificates[0]), 0o600)
if err != nil {
log.Fatalf("failed to write ca.crt: %v", err)
return nil, err
}

clientCrtPath := "/tmp/client.crt"
err = os.WriteFile(clientCrtPath, kafkaUserSecret.Data["user.crt"], 0o600)
if err != nil {
log.Fatalf("failed to write client.crt: %v", err)
return nil, err
}

clientKeyPath := "/tmp/client.key"
err = os.WriteFile(clientKeyPath, kafkaUserSecret.Data["user.key"], 0o600)
if err != nil {
log.Fatalf("failed to write client.key: %v", err)
return nil, err
}

kafkaConfig := &transport.KafkaConfig{
BootstrapServer: bootstrapServer,
EnableTLS: true,
CaCertPath: caCrtPath,
ClientCertPath: clientCrtPath,
ClientKeyPath: clientKeyPath,
}
configMap, err := config.GetConfluentConfigMap(kafkaConfig, isProducer)
if err != nil {
log.Fatalf("failed to get confluent config map: %v", err)
return nil, err
Expand Down
22 changes: 22 additions & 0 deletions samples/config/kubeconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package config

import (
"os"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

const EnvKubconfig = "KUBECONFIG"

func loadDynamicKubeConfig(envVar string) (*rest.Config, error) {
kubeconfigPath := os.Getenv(envVar)
if kubeconfigPath != "" {
// Load kubeconfig from the specified path
return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
}

// Use the in-cluster configuration
return config.GetConfig()
}
69 changes: 69 additions & 0 deletions samples/config/sarama_config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package config

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"

kafkav1beta2 "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2"
"github.com/Shopify/sarama"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func GetSaramaConfig() (string, *sarama.Config, error) {
Expand Down Expand Up @@ -52,3 +59,65 @@ func GetSaramaConfig() (string, *sarama.Config, error) {

return string(bootstrapSever), saramaConfig, nil
}

func GetSaramaConfigFromKafkaUser() (string, *sarama.Config, error) {
userName := KAFkA_USER
kubeconfig, err := loadDynamicKubeConfig(EnvKubconfig)
if err != nil {
return "", nil, fmt.Errorf("failed to get kubeconfig")
}

kafkav1beta2.AddToScheme(scheme.Scheme)
c, err := client.New(kubeconfig, client.Options{Scheme: scheme.Scheme})
if err != nil {
return "", nil, fmt.Errorf("failed to get runtime client")
}

// #nosec G402
tlsConfig := &tls.Config{}

kafkaCluster := &kafkav1beta2.Kafka{}
err = c.Get(context.TODO(), types.NamespacedName{
Name: KAFKA_CLUSTER,
Namespace: KAFKA_NAMESPACE,
}, kafkaCluster)
if err != nil {
return "", nil, err
}

bootstrapServer := *kafkaCluster.Status.Listeners[1].BootstrapServers

// Load CA cert
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(kafkaCluster.Status.Listeners[1].Certificates[0]))
tlsConfig.RootCAs = caCertPool

kafkaUserSecret := &corev1.Secret{}
err = c.Get(context.TODO(), types.NamespacedName{
Name: userName,
Namespace: KAFKA_NAMESPACE,
}, kafkaUserSecret)
if err != nil {
return "", nil, err
}

// Load client cert
if len(kafkaUserSecret.Data["user.crt"]) > 0 && len(kafkaUserSecret.Data["user.key"]) > 0 {
cert, err := tls.X509KeyPair(kafkaUserSecret.Data["user.crt"], kafkaUserSecret.Data["user.key"])
if err != nil {
return "", nil, err
}
tlsConfig.Certificates = []tls.Certificate{cert}
tlsConfig.InsecureSkipVerify = false
} else {
// #nosec
tlsConfig.InsecureSkipVerify = true
}

saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V2_0_0_0
saramaConfig.Net.TLS.Config = tlsConfig
saramaConfig.Net.TLS.Enable = true

return bootstrapServer, saramaConfig, nil
}
Loading