diff --git a/pkg/transport/config/config_test.go b/pkg/transport/config/config_test.go index 83a15fa6bb..a707796152 100644 --- a/pkg/transport/config/config_test.go +++ b/pkg/transport/config/config_test.go @@ -37,7 +37,7 @@ func TestConfluentConfig(t *testing.T) { for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { - _, err := GetConfluentConfigMap(tc.kafkaConfig) + _, err := GetConfluentConfigMap(tc.kafkaConfig, true) if err != tc.expectedErr { t.Errorf("%s:\nexpected err: %v\ngot err: %v\n", tc.desc, tc.expectedErr, err) } diff --git a/pkg/transport/config/confluent_config.go b/pkg/transport/config/confluent_config.go index a3ae40ddf9..4f2a3e8548 100644 --- a/pkg/transport/config/confluent_config.go +++ b/pkg/transport/config/confluent_config.go @@ -7,16 +7,19 @@ import ( "github.com/stolostron/multicluster-global-hub/pkg/utils" ) -func GetConfluentConfigMap(kafkaConfig *transport.KafkaConfig) (*kafka.ConfigMap, error) { +func GetConfluentConfigMap(kafkaConfig *transport.KafkaConfig, producer bool) (*kafka.ConfigMap, error) { kafkaConfigMap := &kafka.ConfigMap{ "bootstrap.servers": kafkaConfig.BootstrapServer, "socket.keepalive.enable": "true", - "auto.offset.reset": "earliest", // consumer - "enable.auto.commit": "false", // consumer - "acks": "1", // producer - "retries": "0", // producer // silence spontaneous disconnection logs, kafka recovers by itself. - "log.connection.close": "false", + "log.connection.close": "true", + } + if producer { + _ = kafkaConfigMap.SetKey("acks", "1") + _ = kafkaConfigMap.SetKey("retries", "0") + } else { + _ = kafkaConfigMap.SetKey("enable.auto.commit", "false") + _ = kafkaConfigMap.SetKey("auto.offset.reset", "earliest") } if kafkaConfig.EnableTLS && utils.Validate(kafkaConfig.CaCertPath) { diff --git a/pkg/transport/consumer/kafka_consumer.go b/pkg/transport/consumer/kafka_consumer.go index f1b04a70cd..0712170beb 100644 --- a/pkg/transport/consumer/kafka_consumer.go +++ b/pkg/transport/consumer/kafka_consumer.go @@ -64,7 +64,7 @@ type KafkaConsumer struct { // NewConsumer creates a new instance of Consumer. func NewKafkaConsumer(kafkaConfig *transport.KafkaConfig, log logr.Logger, ) (*KafkaConsumer, error) { - kafkaConfigMap, err := config.GetConfluentConfigMap(kafkaConfig) + kafkaConfigMap, err := config.GetConfluentConfigMap(kafkaConfig, false) if err != nil { return nil, fmt.Errorf("failed to get kafka config map: %w", err) } diff --git a/pkg/transport/producer/kafka_producer.go b/pkg/transport/producer/kafka_producer.go index 4b7e323e4a..19fe3d6cb6 100644 --- a/pkg/transport/producer/kafka_producer.go +++ b/pkg/transport/producer/kafka_producer.go @@ -42,7 +42,7 @@ type KafkaProducer struct { // NewProducer returns a new instance of Producer object. func NewKafkaProducer(compressor compressor.Compressor, kafkaConfig *transport.KafkaConfig, log logr.Logger, ) (*KafkaProducer, error) { - kafkaConfigMap, err := config.GetConfluentConfigMap(kafkaConfig) + kafkaConfigMap, err := config.GetConfluentConfigMap(kafkaConfig, true) if err != nil { return nil, fmt.Errorf("failed to configure kafka-producer - %w", err) } diff --git a/samples/cloudevent/receiver/main.go b/samples/cloudevent/receiver/main.go index b6166c5dbf..9004e9abe4 100644 --- a/samples/cloudevent/receiver/main.go +++ b/samples/cloudevent/receiver/main.go @@ -4,21 +4,26 @@ import ( "context" "fmt" "log" + "os" "github.com/Shopify/sarama" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/client" "github.com/stolostron/multicluster-global-hub/samples/config" ) -var ( - groupId = "test-group-id" - topic = "event" -) +var groupId = "test-group-id" func main() { - bootstrapServer, saramaConfig, err := config.GetSaramaConfig() + if len(os.Args) < 2 { + fmt.Println("Please provide at least one topic command-line argument.") + os.Exit(1) + } + topic := os.Args[1] + + bootstrapServer, saramaConfig, err := config.GetSaramaConfigFromKafkaUser() if err != nil { log.Fatalf("failed to get sarama config: %v", err) } @@ -34,7 +39,7 @@ func main() { defer receiver.Close(context.Background()) - c, err := cloudevents.NewClient(receiver) + c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1)) if err != nil { log.Fatalf("failed to create client, %v", err) } diff --git a/samples/cloudevent/sender/main.go b/samples/cloudevent/sender/main.go index 352911467c..6b2a6b87ed 100644 --- a/samples/cloudevent/sender/main.go +++ b/samples/cloudevent/sender/main.go @@ -2,7 +2,9 @@ package main import ( "context" + "fmt" "log" + "os" "github.com/Shopify/sarama" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" @@ -13,11 +15,17 @@ import ( const ( count = 10 - topic = "event" ) func main() { - bootstrapServer, saramaConfig, err := config.GetSaramaConfig() + if len(os.Args) < 2 { + fmt.Println("Please provide at least one topic command-line argument.") + os.Exit(1) + } + topic := os.Args[1] + + // bootstrapServer, saramaConfig, err := config.GetSaramaConfig() + bootstrapServer, saramaConfig, err := config.GetSaramaConfigFromKafkaUser() if err != nil { log.Fatalf("failed to get sarama config: %v", err) } @@ -41,6 +49,7 @@ func main() { e.SetID(uuid.New().String()) e.SetType("com.cloudevents.sample.sent") e.SetSource("https://github.com/cloudevents/sdk-go/samples/kafka/sender") + e.SetExtension("test", "foo") _ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ "id": i, "message": "Hello, World!", diff --git a/samples/config/confluent_config.go b/samples/config/confluent_config.go index 16a0846762..785df1a73d 100644 --- a/samples/config/confluent_config.go +++ b/samples/config/confluent_config.go @@ -1,15 +1,28 @@ package config import ( + "context" + "fmt" "log" "os" + kafkav1beta2 "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/stolostron/multicluster-global-hub/pkg/transport" "github.com/stolostron/multicluster-global-hub/pkg/transport/config" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" ) -func GetConfluentConfigMap() (*kafka.ConfigMap, error) { +const ( + KAFkA_USER = "global-hub-kafka-user" + KAFKA_CLUSTER = "kafka" + KAFKA_NAMESPACE = "multicluster-global-hub" +) + +func GetConfluentConfigMap(isProducer bool) (*kafka.ConfigMap, error) { secret, err := GetTransportSecret() if err != nil { log.Fatalf("failed to get transport secret: %v", err) @@ -45,7 +58,75 @@ func GetConfluentConfigMap() (*kafka.ConfigMap, error) { ClientCertPath: clientCrtPath, ClientKeyPath: clientKeyPath, } - configMap, err := config.GetConfluentConfigMap(kafkaConfig) + configMap, err := config.GetConfluentConfigMap(kafkaConfig, isProducer) + if err != nil { + log.Fatalf("failed to get confluent config map: %v", err) + return nil, err + } + return configMap, nil +} + +func GetConfluentConfigMapByKafkaUser(isProducer bool) (*kafka.ConfigMap, error) { + kubeconfig, err := loadDynamicKubeConfig(EnvKubconfig) + if err != nil { + return nil, fmt.Errorf("failed to get kubeconfig") + } + + kafkav1beta2.AddToScheme(scheme.Scheme) + c, err := client.New(kubeconfig, client.Options{Scheme: scheme.Scheme}) + if err != nil { + return nil, fmt.Errorf("failed to get runtime client") + } + + kafkaCluster := &kafkav1beta2.Kafka{} + err = c.Get(context.TODO(), types.NamespacedName{ + Name: KAFKA_CLUSTER, + Namespace: KAFKA_NAMESPACE, + }, kafkaCluster) + if err != nil { + return nil, err + } + + bootstrapServer := *kafkaCluster.Status.Listeners[1].BootstrapServers + + kafkaUserSecret := &corev1.Secret{} + err = c.Get(context.TODO(), types.NamespacedName{ + Name: KAFkA_USER, + Namespace: KAFKA_NAMESPACE, + }, kafkaUserSecret) + if err != nil { + return nil, err + } + + caCrtPath := "/tmp/ca.crt" + err = os.WriteFile(caCrtPath, []byte(kafkaCluster.Status.Listeners[1].Certificates[0]), 0o600) + if err != nil { + log.Fatalf("failed to write ca.crt: %v", err) + return nil, err + } + + clientCrtPath := "/tmp/client.crt" + err = os.WriteFile(clientCrtPath, kafkaUserSecret.Data["user.crt"], 0o600) + if err != nil { + log.Fatalf("failed to write client.crt: %v", err) + return nil, err + } + + clientKeyPath := "/tmp/client.key" + err = os.WriteFile(clientKeyPath, kafkaUserSecret.Data["user.key"], 0o600) + if err != nil { + log.Fatalf("failed to write client.key: %v", err) + return nil, err + } + + kafkaConfig := &transport.KafkaConfig{ + BootstrapServer: bootstrapServer, + EnableTLS: true, + CaCertPath: caCrtPath, + ClientCertPath: clientCrtPath, + ClientKeyPath: clientKeyPath, + } + configMap, err := config.GetConfluentConfigMap(kafkaConfig, isProducer) if err != nil { log.Fatalf("failed to get confluent config map: %v", err) return nil, err diff --git a/samples/config/kubeconfig.go b/samples/config/kubeconfig.go new file mode 100644 index 0000000000..9ba73d3a1a --- /dev/null +++ b/samples/config/kubeconfig.go @@ -0,0 +1,22 @@ +package config + +import ( + "os" + + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/controller-runtime/pkg/client/config" +) + +const EnvKubconfig = "KUBECONFIG" + +func loadDynamicKubeConfig(envVar string) (*rest.Config, error) { + kubeconfigPath := os.Getenv(envVar) + if kubeconfigPath != "" { + // Load kubeconfig from the specified path + return clientcmd.BuildConfigFromFlags("", kubeconfigPath) + } + + // Use the in-cluster configuration + return config.GetConfig() +} diff --git a/samples/config/sarama_config.go b/samples/config/sarama_config.go index 4dbe0615d0..d985cdce35 100644 --- a/samples/config/sarama_config.go +++ b/samples/config/sarama_config.go @@ -1,10 +1,17 @@ package config import ( + "context" "crypto/tls" "crypto/x509" + "fmt" + kafkav1beta2 "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2" "github.com/Shopify/sarama" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" ) func GetSaramaConfig() (string, *sarama.Config, error) { @@ -52,3 +59,65 @@ func GetSaramaConfig() (string, *sarama.Config, error) { return string(bootstrapSever), saramaConfig, nil } + +func GetSaramaConfigFromKafkaUser() (string, *sarama.Config, error) { + userName := KAFkA_USER + kubeconfig, err := loadDynamicKubeConfig(EnvKubconfig) + if err != nil { + return "", nil, fmt.Errorf("failed to get kubeconfig") + } + + kafkav1beta2.AddToScheme(scheme.Scheme) + c, err := client.New(kubeconfig, client.Options{Scheme: scheme.Scheme}) + if err != nil { + return "", nil, fmt.Errorf("failed to get runtime client") + } + + // #nosec G402 + tlsConfig := &tls.Config{} + + kafkaCluster := &kafkav1beta2.Kafka{} + err = c.Get(context.TODO(), types.NamespacedName{ + Name: KAFKA_CLUSTER, + Namespace: KAFKA_NAMESPACE, + }, kafkaCluster) + if err != nil { + return "", nil, err + } + + bootstrapServer := *kafkaCluster.Status.Listeners[1].BootstrapServers + + // Load CA cert + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM([]byte(kafkaCluster.Status.Listeners[1].Certificates[0])) + tlsConfig.RootCAs = caCertPool + + kafkaUserSecret := &corev1.Secret{} + err = c.Get(context.TODO(), types.NamespacedName{ + Name: userName, + Namespace: KAFKA_NAMESPACE, + }, kafkaUserSecret) + if err != nil { + return "", nil, err + } + + // Load client cert + if len(kafkaUserSecret.Data["user.crt"]) > 0 && len(kafkaUserSecret.Data["user.key"]) > 0 { + cert, err := tls.X509KeyPair(kafkaUserSecret.Data["user.crt"], kafkaUserSecret.Data["user.key"]) + if err != nil { + return "", nil, err + } + tlsConfig.Certificates = []tls.Certificate{cert} + tlsConfig.InsecureSkipVerify = false + } else { + // #nosec + tlsConfig.InsecureSkipVerify = true + } + + saramaConfig := sarama.NewConfig() + saramaConfig.Version = sarama.V2_0_0_0 + saramaConfig.Net.TLS.Config = tlsConfig + saramaConfig.Net.TLS.Enable = true + + return bootstrapServer, saramaConfig, nil +} diff --git a/samples/kafka/confluent_consumer/main.go b/samples/kafka/confluent_consumer/main.go index d852d45315..f90cc9c538 100644 --- a/samples/kafka/confluent_consumer/main.go +++ b/samples/kafka/confluent_consumer/main.go @@ -14,10 +14,8 @@ import ( ) const ( - topic = "spec" - messageCount = 10 - consumerId = "test-consumer" - pollTimeoutMs = 100 + messageCount = 10 + consumerId = "consumer-group-1" ) var kafkaMessages = make([]*kafka.Message, 0) @@ -26,138 +24,165 @@ func main() { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL) - kafkaConfigMap, err := config.GetConfluentConfigMap() + if len(os.Args) < 2 { + fmt.Println("Please provide at least one topic command-line argument.") + os.Exit(1) + } + topic := os.Args[1] + + // kafkaConfigMap, err := config.GetConfluentConfigMap() + kafkaConfigMap, err := config.GetConfluentConfigMapByKafkaUser(false) if err != nil { log.Fatalf("failed to get kafka config map: %v", err) } _ = kafkaConfigMap.SetKey("client.id", consumerId) _ = kafkaConfigMap.SetKey("group.id", consumerId) - // kafkaConfigMap.SetKey("auto.offset.reset", "earliest") - // kafkaConfigMap.SetKey("enable.auto.commit", "false") + _ = kafkaConfigMap.SetKey("auto.offset.reset", "earliest") + _ = kafkaConfigMap.SetKey("enable.auto.commit", "true") consumer, err := kafka.NewConsumer(kafkaConfigMap) if err != nil { log.Fatalf("failed to create kafka consumer: %v", err) } - messageChan := make(chan *kafka.Message) - if err := consumer.SubscribeTopics([]string{topic}, nil); err != nil { + + log.Printf(">> subscribe topic %s", topic) + if err := consumer.SubscribeTopics([]string{topic}, rebalanceCallback); err != nil { log.Fatalf("failed to subscribe topic: %v", err) } ctx, cancel := context.WithCancel(context.Background()) go func() { for { select { - case sig := <-signals: - log.Printf("received signal: %v", sig) + case <-ctx.Done(): _ = consumer.Unsubscribe() log.Printf("unsubscribed topic: %s", topic) - cancel() return default: - msg, err := consumer.ReadMessage(pollTimeoutMs) - if err != nil && err.(kafka.Error).Code() != kafka.ErrTimedOut { - log.Fatalf("failed to read message: %v", err) + ev := consumer.Poll(100) + if ev == nil { continue } - if msg == nil { - // log.Println("msg is nil") - continue + if err := processEvent(consumer, ev); err != nil { + log.Printf("## failed to process event: %s \n", ev) } - messageChan <- msg + } } }() // committer := NewCommitter(5*time.Second, topic, consumer) - committer := NewCommitter(5*time.Second, topic, consumer, getKafkaMessages) - committer.start(ctx) - for { - select { - case <-ctx.Done(): - log.Println("context is done") - return - case msg := <-messageChan: - log.Printf("Received message: partition=%d offset=%d val=%s\n", msg.TopicPartition.Partition, - msg.TopicPartition.Offset, msg.Value) - if offset := msg.TopicPartition.Offset; offset%2 == 0 { - kafkaMessages = append(kafkaMessages, msg) - } - } - } + // committer := NewCommitter(5*time.Second, topic, consumer, getKafkaMessages) + // committer.start(ctx) + + sig := <-signals + log.Printf("got signal: %s\n", sig.String()) + cancel() + log.Println("context is done") + + // graceful shutdown + time.Sleep(1 * time.Second) + log.Printf("exit main") + os.Exit(0) } -func getKafkaMessages() []*kafka.Message { - return kafkaMessages -} +// processEvent processes the message/error received from the kafka Consumer's +// Poll() method. +func processEvent(c *kafka.Consumer, ev kafka.Event) error { + switch e := ev.(type) { -type committer struct { - topic string - client *kafka.Consumer - latestCommitted map[int32]kafka.Offset // map of partition -> offset - interval time.Duration - messageFunc func() []*kafka.Message -} + case *kafka.Message: + if e.TopicPartition.Error != nil { + log.Printf("failed message on %s [%d %v]: %v\n", *e.TopicPartition.Topic, e.TopicPartition.Partition, e.TopicPartition.Offset, e.TopicPartition.Error) + } else { + log.Printf("received message on %s [%d %v]: %s\n", *e.TopicPartition.Topic, e.TopicPartition.Partition, e.TopicPartition.Offset, e.Value) + } -// NewCommitter returns a new instance of committer. -func NewCommitter(committerInterval time.Duration, topic string, client *kafka.Consumer, - messageFunc func() []*kafka.Message, -) *committer { - return &committer{ - topic: topic, - client: client, - latestCommitted: make(map[int32]kafka.Offset), - interval: committerInterval, - messageFunc: messageFunc, + // https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/consumer_rebalance_example/consumer_rebalance_example.go + // // Handle manual commit since enable.auto.commit is unset. + // if err := maybeCommit(c, e.TopicPartition); err != nil { + // return err + // } + + case kafka.Error: + // Errors should generally be considered informational, the client + // will try to automatically recover. + return fmt.Errorf("kafka error with %v", e) + default: + // log.Printf("ignored event %v\n", e) } -} -func (c *committer) start(ctx context.Context) { - go c.periodicCommit(ctx) + return nil } -func (c *committer) periodicCommit(ctx context.Context) { - ticker := time.NewTicker(c.interval) - for { - select { - case <-ctx.Done(): - return - - case <-ticker.C: // wait for next time interval - messages := c.messageFunc() - if err := c.commitOffsets(messages); err != nil { - log.Printf("failed to commit offsets: %v", err) - } - } - } +func getKafkaMessages() []*kafka.Message { + return kafkaMessages } -// commitOffsets commits the given offsets per partition mapped. -func (c *committer) commitOffsets(messages []*kafka.Message) error { - for _, msg := range messages { - partition := msg.TopicPartition.Partition - offset := msg.TopicPartition.Offset - // skip request if already committed this offset - if committedOffset, found := c.latestCommitted[partition]; found { - if committedOffset >= offset { - continue - } +// rebalanceCallback is called on each group rebalance to assign additional +// partitions, or remove existing partitions, from the consumer's current +// assignment. +// +// A rebalance occurs when a consumer joins or leaves a consumer group, if it +// changes the topic(s) it's subscribed to, or if there's a change in one of +// the topics it's subscribed to, for example, the total number of partitions +// increases. +// +// The application may use this optional callback to inspect the assignment, +// alter the initial start offset (the .Offset field of each assigned partition), +// and read/write offsets to commit to an alternative store outside of Kafka. +func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error { + switch ev := event.(type) { + case kafka.AssignedPartitions: + // log.Printf("%% %s rebalance: %d new partition(s) assigned: %v\n", c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions) + + log.Printf("%% %s rebalance: %d new partition(s) assigned\n", c.GetRebalanceProtocol(), len(ev.Partitions)) + + // The application may update the start .Offset of each assigned + // partition and then call Assign(). It is optional to call Assign + // in case the application is not modifying any start .Offsets. In + // that case we don't, the library takes care of it. + // It is called here despite not modifying any .Offsets for illustrative + // purposes. + err := c.Assign(ev.Partitions) + if err != nil { + return err } - topicPartition := kafka.TopicPartition{ - Topic: &c.topic, - Partition: partition, - Offset: offset, + case kafka.RevokedPartitions: + // log.Printf("%% %s rebalance: %d partition(s) revoked: %v\n", c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions) + + log.Printf("%% %s rebalance: %d partition(s) revoked\n", c.GetRebalanceProtocol(), len(ev.Partitions)) + + // Usually, the rebalance callback for `RevokedPartitions` is called + // just before the partitions are revoked. We can be certain that a + // partition being revoked is not yet owned by any other consumer. + // This way, logic like storing any pending offsets or committing + // offsets can be handled. + // However, there can be cases where the assignment is lost + // involuntarily. In this case, the partition might already be owned + // by another consumer, and operations including committing + // offsets may not work. + if c.AssignmentLost() { + // Our consumer has been kicked out of the group and the + // entire assignment is thus lost. + fmt.Fprintln(os.Stderr, "Assignment lost involuntarily, commit may fail") } - if _, err := c.client.CommitOffsets([]kafka.TopicPartition{ - topicPartition, - }); err != nil { - return fmt.Errorf("failed to commit offset, stopping bulk commit - %w", err) + // Since enable.auto.commit is unset, we need to commit offsets manually + // before the partition is revoked. + commitedOffsets, err := c.Commit() + + if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset { + fmt.Fprintf(os.Stderr, "Failed to commit offsets: %s\n", err) + return err } + fmt.Printf("%% Commited offsets to Kafka: %v\n", commitedOffsets) + + // Similar to Assign, client automatically calls Unassign() unless the + // callback has already called that method. Here, we don't call it. - log.Printf("committed topic %s, partition %d, offset %d", c.topic, partition, offset) - // update commitsMap - c.latestCommitted[partition] = offset + default: + fmt.Fprintf(os.Stderr, "Unxpected event type: %v\n", event) } return nil diff --git a/samples/kafka/confluent_consumer/offset_committer.go b/samples/kafka/confluent_consumer/offset_committer.go new file mode 100644 index 0000000000..3fbb564d14 --- /dev/null +++ b/samples/kafka/confluent_consumer/offset_committer.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +type committer struct { + topic string + client *kafka.Consumer + latestCommitted map[int32]kafka.Offset // map of partition -> offset + interval time.Duration + messageFunc func() []*kafka.Message +} + +// NewCommitter returns a new instance of committer. +func NewCommitter(committerInterval time.Duration, topic string, client *kafka.Consumer, + messageFunc func() []*kafka.Message, +) *committer { + return &committer{ + topic: topic, + client: client, + latestCommitted: make(map[int32]kafka.Offset), + interval: committerInterval, + messageFunc: messageFunc, + } +} + +func (c *committer) start(ctx context.Context) { + go c.periodicCommit(ctx) +} + +func (c *committer) periodicCommit(ctx context.Context) { + ticker := time.NewTicker(c.interval) + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: // wait for next time interval + messages := c.messageFunc() + if err := c.commitOffsets(messages); err != nil { + log.Printf("failed to commit offsets: %v", err) + } + } + } +} + +// commitOffsets commits the given offsets per partition mapped. +func (c *committer) commitOffsets(messages []*kafka.Message) error { + for _, msg := range messages { + partition := msg.TopicPartition.Partition + offset := msg.TopicPartition.Offset + // skip request if already committed this offset + if committedOffset, found := c.latestCommitted[partition]; found { + if committedOffset >= offset { + continue + } + } + + topicPartition := kafka.TopicPartition{ + Topic: &c.topic, + Partition: partition, + Offset: offset, + } + + if _, err := c.client.CommitOffsets([]kafka.TopicPartition{ + topicPartition, + }); err != nil { + return fmt.Errorf("failed to commit offset, stopping bulk commit - %w", err) + } + + log.Printf("committed topic %s, partition %d, offset %d", c.topic, partition, offset) + // update commitsMap + c.latestCommitted[partition] = offset + } + + return nil +} diff --git a/samples/kafka/confluent_producer/main.go b/samples/kafka/confluent_producer/main.go index 0ca3bc734b..40777b3597 100644 --- a/samples/kafka/confluent_producer/main.go +++ b/samples/kafka/confluent_producer/main.go @@ -6,14 +6,14 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/stolostron/multicluster-global-hub/samples/config" ) var ( - topic = "spec" - messageCount = 10 + messageCount = 5 producerId = "test-producer" ) @@ -21,49 +21,83 @@ func main() { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL) - deliveryChan := make(chan kafka.Event) + if len(os.Args) < 2 { + fmt.Println("Please provide at least one topic command-line argument.") + os.Exit(1) + } + topic := os.Args[1] - kafkaConfigMap, err := config.GetConfluentConfigMap() + // kafkaConfigMap, err := config.GetConfluentConfigMap() + kafkaConfigMap, err := config.GetConfluentConfigMapByKafkaUser(true) if err != nil { log.Fatalf("failed to get kafka config map: %v", err) } _ = kafkaConfigMap.SetKey("client.id", producerId) - // kafkaConfigMap.SetKey("acks", "1") - // kafkaConfigMap.SetKey("retries", "0") producer, err := kafka.NewProducer(kafkaConfigMap) if err != nil { log.Fatalf("failed to create kafka producer: %v", err) } + // Listen to all the events on the default events channel + go func() { + count := 0 + for e := range producer.Events() { + switch ev := e.(type) { + case *kafka.Message: + // The message delivery report, indicating success or + // permanent failure after retries have been exhausted. + // Application level retries won't help since the client + // is already configured to do that. + m := ev + if m.TopicPartition.Error != nil { + log.Printf("delivery failed: %v\n", m.TopicPartition.Error) + } else { + log.Printf("delivered message to topic %s [%d] at offset %v\n", + *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) + } + count++ + if count == messageCount { + signals <- syscall.SIGKILL + return + } + case kafka.Error: + // Generic client instance-level errors, such as + // broker connection failures, authentication issues, etc. + // + // These errors should generally be considered informational + // as the underlying client will automatically try to + // recover from any errors encountered, the application + // does not need to take action on them. + log.Printf("kafka error: %v\n", ev) + default: + log.Printf("ignored event: %s\n", ev) + } + } + }() + for i := 0; i < messageCount; i++ { - value := fmt.Sprintf("message-%d", i) + value := fmt.Sprintf("message-%s", topic) err := producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0}, Value: []byte(value), Key: []byte("key"), Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}}, - }, deliveryChan) + }, nil) if err != nil { - log.Fatalf("failed to produce message: %v", err) - } - } - - // waiting for the end of all messages sent or an OS signal - for i := 0; i < messageCount*2; i++ { - select { - case e := <-deliveryChan: - kafkaMessage, ok := e.(*kafka.Message) - if !ok { - log.Printf("Failed to cast kafka message: %v\n", e) + if err.(kafka.Error).Code() == kafka.ErrQueueFull { + // Producer queue is full, wait 1s for messages + // to be delivered then try again. + time.Sleep(time.Second) continue } - // the offset - log.Printf("Finished to send: partition=%d offset=%d val=%s\n", kafkaMessage.TopicPartition.Partition, - kafkaMessage.TopicPartition.Offset, kafkaMessage.Value) - case sig := <-signals: - log.Printf("Got signal: %v\n", sig) - return + log.Fatalf("failed to produce message: %v\n", err) } } + + sig := <-signals + log.Printf("got signal: %v\n", sig) + producer.Close() + log.Println("close producer") + time.Sleep(1 * time.Second) } diff --git a/samples/kafka/saram_consumer/main.go b/samples/kafka/saram_consumer/main.go index 16e7b90cb7..b3fd0f810b 100644 --- a/samples/kafka/saram_consumer/main.go +++ b/samples/kafka/saram_consumer/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "log" "os" "os/signal" @@ -13,15 +14,21 @@ import ( ) const ( - TopicDefault = "event" GroupIDDefault = "my-group" ) func main() { + if len(os.Args) < 2 { + fmt.Println("Please provide at least one topic command-line argument.") + os.Exit(1) + } + topic := os.Args[1] + signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL) - bootstrapSever, saramaConfig, err := config.GetSaramaConfig() + // bootstrapSever, saramaConfig, err := config.GetSaramaConfig() + bootstrapServer, saramaConfig, err := config.GetSaramaConfigFromKafkaUser() if err != nil { log.Panicf("Error getting the consumer config: %v", err) os.Exit(1) @@ -30,7 +37,7 @@ func main() { saramaConfig.Consumer.Offsets.AutoCommit.Enable = false saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest - consumerGroup, err := sarama.NewConsumerGroup([]string{string(bootstrapSever)}, GroupIDDefault, saramaConfig) + consumerGroup, err := sarama.NewConsumerGroup([]string{string(bootstrapServer)}, GroupIDDefault, saramaConfig) if err != nil { log.Printf("Error creating the Sarama consumer: %v", err) os.Exit(1) @@ -45,7 +52,7 @@ func main() { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be // recreated to get the new claims - if err := consumerGroup.Consume(ctx, []string{TopicDefault}, &consumerGroupHandler{}); err != nil { + if err := consumerGroup.Consume(ctx, []string{topic}, &consumerGroupHandler{}); err != nil { log.Printf("Error from consumer: %v", err) } diff --git a/samples/kafka/saram_producer/main.go b/samples/kafka/saram_producer/main.go index b249c4a341..545d3233a8 100644 --- a/samples/kafka/saram_producer/main.go +++ b/samples/kafka/saram_producer/main.go @@ -13,7 +13,6 @@ import ( ) const ( - TopicDefault = "status" DelayDefault = 1000 MessageDefault = "Hello from Go Kafka Sarama" MessageCountDefault = 10 @@ -21,10 +20,17 @@ const ( ) func main() { + if len(os.Args) < 2 { + fmt.Println("Please provide at least one topic command-line argument.") + os.Exit(1) + } + topic := os.Args[1] + signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL) - bootstrapSever, saramaConfig, err := config.GetSaramaConfig() + // bootstrapSever, saramaConfig, err := config.GetSaramaConfig() + bootstrapServer, saramaConfig, err := config.GetSaramaConfigFromKafkaUser() if err != nil { fmt.Printf("Error getting producer config: %v\n", err) os.Exit(1) @@ -33,7 +39,7 @@ func main() { saramaConfig.Producer.Return.Successes = true saramaConfig.Producer.MaxMessageBytes = 1024 * 1000 // 1024KB - producer, err := sarama.NewSyncProducer([]string{string(bootstrapSever)}, saramaConfig) + producer, err := sarama.NewSyncProducer([]string{string(bootstrapServer)}, saramaConfig) if err != nil { log.Printf("Error creating the Sarama sync producer: %v", err) os.Exit(1) @@ -42,17 +48,17 @@ func main() { end := make(chan int, 1) go func() { for i := 0; i < MessageCountDefault; i++ { - value := fmt.Sprintf("%s-%d", "hello", int64(i)) + value := fmt.Sprintf("%s-%d", topic, int64(i)) msg := &sarama.ProducerMessage{ - Topic: TopicDefault, + Topic: topic, Value: sarama.StringEncoder(value), Key: sarama.StringEncoder("key"), } partition, offset, err := producer.SendMessage(msg) if err != nil { - log.Printf("Erros sending message: %v\n", err) + log.Printf("error for sending message: %v\n", err) } else { - log.Printf("Message sent: partition=%d, offset=%d, msg=%s\n", partition, offset, msg.Value) + log.Printf("message sent: partition=%d, offset=%d, msg=%s\n", partition, offset, msg.Value) } // sleep before next message or avoid sleeping @@ -68,9 +74,9 @@ func main() { // waiting for the end of all messages sent or an OS signal select { case <-end: - log.Printf("Finished to send %d messages\n", MessageCountDefault) + log.Printf("finished to send %d messages\n", MessageCountDefault) case sig := <-signals: - log.Printf("Got signal: %v\n", sig) + log.Printf("got signal: %v\n", sig) } err = producer.Close() @@ -78,5 +84,5 @@ func main() { log.Printf("Error closing the Sarama sync producer: %v", err) os.Exit(1) } - log.Printf("Producer closed") + log.Printf("producer closed") }