Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(test): ensure MockBroker closed within test #2575

Merged
merged 2 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,7 @@ func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics broke

func BenchmarkBroker_Open(b *testing.B) {
mb := NewMockBroker(nil, 0)
defer mb.Close()
broker := NewBroker(mb.Addr())
// Set the broker id in order to validate local broker metrics
broker.id = 0
Expand All @@ -1422,6 +1423,7 @@ func BenchmarkBroker_Open(b *testing.B) {

func BenchmarkBroker_No_Metrics_Open(b *testing.B) {
mb := NewMockBroker(nil, 0)
defer mb.Close()
broker := NewBroker(mb.Addr())
broker.id = 0
metrics.UseNilMetrics = true
Expand All @@ -1438,6 +1440,7 @@ func BenchmarkBroker_No_Metrics_Open(b *testing.B) {

func Test_handleThrottledResponse(t *testing.T) {
mb := NewMockBroker(nil, 0)
defer mb.Close()
broker := NewBroker(mb.Addr())
broker.id = 0
conf := NewTestConfig()
Expand Down
1 change: 1 addition & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,7 @@ func TestInitProducerIDConnectionRefused(t *testing.T) {

func TestMetricsCleanup(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
seedBroker.Returns(new(MetadataResponse))

config := NewTestConfig()
Expand Down
1 change: 1 addition & 0 deletions consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func TestConsumerGroupSessionDoesNotRetryForever(t *testing.T) {
config.Consumer.Group.Rebalance.Retry.Backoff = 0

broker0 := NewMockBroker(t, 0)
defer broker0.Close()

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
Expand Down
67 changes: 34 additions & 33 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"time"
)

func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
backoffFunc func(retries, maxRetries int) time.Duration, config *Config) (om OffsetManager,
testClient Client, broker, coordinator *MockBroker) {
func initOffsetManagerWithBackoffFunc(
t *testing.T,
retention time.Duration,
backoffFunc func(retries, maxRetries int) time.Duration, config *Config,
) (om OffsetManager, testClient Client, broker, coordinator *MockBroker) {
config.Metadata.Retry.Max = 1
if backoffFunc != nil {
config.Metadata.Retry.BackoffFunc = backoffFunc
Expand Down Expand Up @@ -50,12 +52,14 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
}

func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
testClient Client, broker, coordinator *MockBroker) {
testClient Client, broker, coordinator *MockBroker,
) {
return initOffsetManagerWithBackoffFunc(t, retention, nil, NewTestConfig())
}

func initPartitionOffsetManager(t *testing.T, om OffsetManager,
coordinator *MockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
coordinator *MockBroker, initialOffset int64, metadata string,
) PartitionOffsetManager {
fetchResponse := new(OffsetFetchResponse)
fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
Err: ErrNoError,
Expand Down Expand Up @@ -127,6 +131,8 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
config.Consumer.Offsets.AutoCommit.Enable = tt.enable
}
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

// Wait long enough for the test not to fail..
Expand Down Expand Up @@ -160,9 +166,6 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
}
}

broker.Close()
coordinator.Close()

// !! om must be closed before the pom so pom.release() is called before pom.Close()
safeClose(t, om)
safeClose(t, pom)
Expand All @@ -177,6 +180,8 @@ func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) {
config.Consumer.Offsets.AutoCommit.Enable = false

om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

// Wait long enough for the test not to fail..
Expand Down Expand Up @@ -219,10 +224,6 @@ func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) {
t.Errorf("No request received for after waiting for %v", timeout)
}

// Close up
broker.Close()
coordinator.Close()

// !! om must be closed before the pom so pom.release() is called before pom.Close()
safeClose(t, om)
safeClose(t, pom)
Expand All @@ -233,6 +234,8 @@ func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) {
// on first fetchInitialOffset call
func TestOffsetManagerFetchInitialFail(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
defer coordinator.Close()

// Error on first fetchInitialOffset call
responseBlock := OffsetFetchResponseBlock{
Expand All @@ -247,6 +250,7 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) {

// Refresh coordinator
newCoordinator := NewMockBroker(t, 3)
defer newCoordinator.Close()
broker.Returns(&ConsumerMetadataResponse{
CoordinatorID: newCoordinator.BrokerID(),
CoordinatorHost: "127.0.0.1",
Expand All @@ -265,9 +269,6 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) {
t.Error(err)
}

broker.Close()
coordinator.Close()
newCoordinator.Close()
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
Expand All @@ -281,6 +282,8 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
return 0
}
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewTestConfig())
defer broker.Close()
defer coordinator.Close()

// Error on first fetchInitialOffset call
responseBlock := OffsetFetchResponseBlock{
Expand All @@ -305,8 +308,6 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
t.Error(err)
}

broker.Close()
coordinator.Close()
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
Expand All @@ -318,6 +319,8 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {

func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
defer coordinator.Close()
testClient.Config().Consumer.Offsets.Initial = OffsetOldest

// Kafka returns -1 if no offset has been stored for this partition yet.
Expand All @@ -333,13 +336,13 @@ func TestPartitionOffsetManagerInitialOffset(t *testing.T) {

safeClose(t, pom)
safeClose(t, om)
broker.Close()
coordinator.Close()
safeClose(t, testClient)
}

func TestPartitionOffsetManagerNextOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")

offset, meta := pom.NextOffset()
Expand All @@ -352,13 +355,13 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) {

safeClose(t, pom)
safeClose(t, om)
broker.Close()
coordinator.Close()
safeClose(t, testClient)
}

func TestPartitionOffsetManagerResetOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
Expand All @@ -379,12 +382,12 @@ func TestPartitionOffsetManagerResetOffset(t *testing.T) {
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
Expand Down Expand Up @@ -415,12 +418,12 @@ func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
Expand All @@ -440,12 +443,12 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
Expand Down Expand Up @@ -475,12 +478,12 @@ func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerCommitErr(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
defer coordinator.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")

// Error on one partition
Expand All @@ -490,6 +493,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
coordinator.Returns(ocResponse)

newCoordinator := NewMockBroker(t, 3)
defer newCoordinator.Close()

// For RefreshCoordinator()
broker.Returns(&ConsumerMetadataResponse{
Expand Down Expand Up @@ -535,16 +539,14 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
t.Error(err)
}

broker.Close()
coordinator.Close()
newCoordinator.Close()
safeClose(t, om)
safeClose(t, testClient)
}

// Test of recovery from abort
func TestAbortPartitionOffsetManager(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t, 0)
defer broker.Close()
pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")

// this triggers an error in the CommitOffset request,
Expand All @@ -568,6 +570,5 @@ func TestAbortPartitionOffsetManager(t *testing.T) {

safeClose(t, pom)
safeClose(t, om)
broker.Close()
safeClose(t, testClient)
}
19 changes: 19 additions & 0 deletions sarama_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//go:build !functional
// +build !functional

package sarama

import (
"flag"
"log"
"os"
"testing"
)

func TestMain(m *testing.M) {
flag.Parse()
if f := flag.Lookup("test.v"); f != nil && f.Value.String() == "true" {
Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)
}
os.Exit(m.Run())
}