Skip to content

Commit

Permalink
Merge branch 'feature/kip430' into dev_add-topic-id-to-describe-group…
Browse files Browse the repository at this point in the history
…-response
  • Loading branch information
pranavrth authored Oct 12, 2023
2 parents 34f496c + 98bc49c commit 452f402
Show file tree
Hide file tree
Showing 8 changed files with 495 additions and 345 deletions.
8 changes: 5 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ This is a feature release.
* [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in Describe Responses.
(#964, @jainruchir).

## Fixes

* MockCluster can now be shutdown and started again to test broker
availability problems (#998, @kkoehler).
* Fixes a bug in the mock schema registry client where the wrong ID was being
returned for pre-registered schema (#971, @srlk).
* Adds `CreateTopic` method to the MockCluster. (#1047, @mimikwang).
* Fixes an issue where `testing` was being imported by a non-test file,
testhelpers.go. (@dmlambea, #1049).


# v2.2.0
Expand Down
1 change: 1 addition & 0 deletions examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ legacy/consumer_channel_example/consumer_channel_example
legacy/producer_channel_example/producer_channel_example
library-version/library-version
mockcluster_example/mockcluster_example
mockcluster_example/mockcluster_failure_example
oauthbearer_consumer_example/oauthbearer_consumer_example
oauthbearer_oidc_example/oauthbearer_oidc_example
oauthbearer_producer_example/oauthbearer_producer_example
Expand Down
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ Examples
[library-version](library-version) - Show the library version

[mockcluster_example](mockcluster_example) - Use a mock cluster for testing

[mockcluster_failure_example](mockcluster_failure_example) - Use a mock cluster for failure testing

[oauthbearer_consumer_example](oauthbearer_consumer_example) - Unsecured SASL/OAUTHBEARER consumer example

Expand Down
130 changes: 130 additions & 0 deletions examples/mockcluster_failure_example/mockcluster_failure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Demonstrates failure modes for mock cluster:
// 1. RTT Duration set to more than Delivery Timeout
// 2. Broker set as being down.
package main

import (
"fmt"
"os"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

mockCluster, err := kafka.NewMockCluster(1)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create MockCluster: %s\n", err)
os.Exit(1)
}
defer mockCluster.Close()

// Set RTT > Delivery Timeout
err = mockCluster.SetRoundtripDuration(1, 4*time.Second)
if err != nil {
fmt.Fprintf(os.Stderr, "Could not configure roundtrip duration: %v", err)
return
}
broker := mockCluster.BootstrapServers()

p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"delivery.timeout.ms": 1000,
})

if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create producer: %s\n", err)
os.Exit(1)
}

fmt.Printf("Created Producer %v\n", p)

m, err := sendTestMsg(p)

if m.TopicPartition.Error != nil {
fmt.Printf("EXPECTED: Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Fprintf(os.Stderr, "Message should timeout because of broker configuration")
return
}

fmt.Println("'reset' broker roundtrip duration")
err = mockCluster.SetRoundtripDuration(1, 10*time.Millisecond)
if err != nil {
fmt.Fprintf(os.Stderr, "Could not configure roundtrip duration: %v", err)
return
}

// See what happens when broker is down.
fmt.Println("Set broker down")
err = mockCluster.SetBrokerDown(1)
if err != nil {
fmt.Fprintf(os.Stderr, "Broker should now be down but got error: %v", err)
return
}

m, err = sendTestMsg(p)

if m.TopicPartition.Error != nil {
fmt.Printf("EXPECTED: Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Fprintf(os.Stderr, "Message should timeout because of broker configuration")
return
}

// Bring the broker up again.
fmt.Println("Set broker up again")
err = mockCluster.SetBrokerUp(1)
if err != nil {
fmt.Fprintf(os.Stderr, "Broker should now be up again but got error: %v", err)
return
}

m, err = sendTestMsg(p)
if err != nil {
fmt.Fprintf(os.Stderr, "There shouldn't be an error but got: %v", err)
return
}

fmt.Println("Message was sent!")

}

func sendTestMsg(p *kafka.Producer) (*kafka.Message, error) {

topic := "Test"
value := "Hello Go!"

deliveryChan := make(chan kafka.Event)
defer close(deliveryChan)

err := p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(value),
Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
}, deliveryChan)

if err != nil {
return nil, err
}

e := <-deliveryChan
return e.(*kafka.Message), nil
}
37 changes: 36 additions & 1 deletion kafka/mockcluster.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2022 Confluent Inc.
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,6 +83,7 @@ func (mc *MockCluster) BootstrapServers() string {
}

// SetRoundtripDuration sets the broker round-trip-time delay for the given broker.
// Use brokerID -1 for all brokers, or >= 0 for a specific broker.
func (mc *MockCluster) SetRoundtripDuration(brokerID int, duration time.Duration) error {
durationInMillis := C.int(duration.Milliseconds())
cError := C.rd_kafka_mock_broker_set_rtt(mc.mcluster, C.int(brokerID), durationInMillis)
Expand All @@ -92,6 +93,40 @@ func (mc *MockCluster) SetRoundtripDuration(brokerID int, duration time.Duration
return nil
}

// SetBrokerDown disconnects the broker and disallows any new connections.
// This does NOT trigger leader change.
// Use brokerID -1 for all brokers, or >= 0 for a specific broker.
func (mc *MockCluster) SetBrokerDown(brokerID int) error {
cError := C.rd_kafka_mock_broker_set_down(mc.mcluster, C.int(brokerID))
if cError != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return newError(cError)
}
return nil
}

// SetBrokerUp makes the broker accept connections again.
// This does NOT trigger leader change.
// Use brokerID -1 for all brokers, or >= 0 for a specific broker.
func (mc *MockCluster) SetBrokerUp(brokerID int) error {
cError := C.rd_kafka_mock_broker_set_up(mc.mcluster, C.int(brokerID))
if cError != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return newError(cError)
}
return nil
}

// CreateTopic creates a topic without having to use a producer
func (mc *MockCluster) CreateTopic(topic string, partitions, replicationFactor int) error {
topicStr := C.CString(topic)
defer C.free(unsafe.Pointer(topicStr))

cError := C.rd_kafka_mock_topic_create(mc.mcluster, topicStr, C.int(partitions), C.int(replicationFactor))
if cError != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return newError(cError)
}
return nil
}

// Close and destroy the MockCluster
func (mc *MockCluster) Close() {
C.rd_kafka_mock_cluster_destroy(mc.mcluster)
Expand Down
Loading

0 comments on commit 452f402

Please sign in to comment.