diff --git a/broker_test.go b/broker_test.go index 928e19b3e..6e6b8480c 100644 --- a/broker_test.go +++ b/broker_test.go @@ -1405,6 +1405,7 @@ func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics broke func BenchmarkBroker_Open(b *testing.B) { mb := NewMockBroker(nil, 0) + defer mb.Close() broker := NewBroker(mb.Addr()) // Set the broker id in order to validate local broker metrics broker.id = 0 @@ -1422,6 +1423,7 @@ func BenchmarkBroker_Open(b *testing.B) { func BenchmarkBroker_No_Metrics_Open(b *testing.B) { mb := NewMockBroker(nil, 0) + defer mb.Close() broker := NewBroker(mb.Addr()) broker.id = 0 metrics.UseNilMetrics = true @@ -1438,6 +1440,7 @@ func BenchmarkBroker_No_Metrics_Open(b *testing.B) { func Test_handleThrottledResponse(t *testing.T) { mb := NewMockBroker(nil, 0) + defer mb.Close() broker := NewBroker(mb.Addr()) broker.id = 0 conf := NewTestConfig() diff --git a/client_test.go b/client_test.go index b416fbe02..4bb70a78a 100644 --- a/client_test.go +++ b/client_test.go @@ -1101,6 +1101,7 @@ func TestInitProducerIDConnectionRefused(t *testing.T) { func TestMetricsCleanup(t *testing.T) { seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() seedBroker.Returns(new(MetadataResponse)) config := NewTestConfig() diff --git a/consumer_group_test.go b/consumer_group_test.go index 2e9d180e9..85b8108e3 100644 --- a/consumer_group_test.go +++ b/consumer_group_test.go @@ -218,6 +218,7 @@ func TestConsumerGroupSessionDoesNotRetryForever(t *testing.T) { config.Consumer.Group.Rebalance.Retry.Backoff = 0 broker0 := NewMockBroker(t, 0) + defer broker0.Close() broker0.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). diff --git a/offset_manager_test.go b/offset_manager_test.go index b8a406ce0..af95bc9e4 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -7,9 +7,11 @@ import ( "time" ) -func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration, - backoffFunc func(retries, maxRetries int) time.Duration, config *Config) (om OffsetManager, - testClient Client, broker, coordinator *MockBroker) { +func initOffsetManagerWithBackoffFunc( + t *testing.T, + retention time.Duration, + backoffFunc func(retries, maxRetries int) time.Duration, config *Config, +) (om OffsetManager, testClient Client, broker, coordinator *MockBroker) { config.Metadata.Retry.Max = 1 if backoffFunc != nil { config.Metadata.Retry.BackoffFunc = backoffFunc @@ -50,12 +52,14 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration, } func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager, - testClient Client, broker, coordinator *MockBroker) { + testClient Client, broker, coordinator *MockBroker, +) { return initOffsetManagerWithBackoffFunc(t, retention, nil, NewTestConfig()) } func initPartitionOffsetManager(t *testing.T, om OffsetManager, - coordinator *MockBroker, initialOffset int64, metadata string) PartitionOffsetManager { + coordinator *MockBroker, initialOffset int64, metadata string, +) PartitionOffsetManager { fetchResponse := new(OffsetFetchResponse) fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{ Err: ErrNoError, @@ -127,6 +131,8 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { config.Consumer.Offsets.AutoCommit.Enable = tt.enable } om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config) + defer broker.Close() + defer coordinator.Close() pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") // Wait long enough for the test not to fail.. @@ -160,9 +166,6 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { } } - broker.Close() - coordinator.Close() - // !! om must be closed before the pom so pom.release() is called before pom.Close() safeClose(t, om) safeClose(t, pom) @@ -177,6 +180,8 @@ func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) { config.Consumer.Offsets.AutoCommit.Enable = false om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config) + defer broker.Close() + defer coordinator.Close() pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") // Wait long enough for the test not to fail.. @@ -219,10 +224,6 @@ func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) { t.Errorf("No request received for after waiting for %v", timeout) } - // Close up - broker.Close() - coordinator.Close() - // !! om must be closed before the pom so pom.release() is called before pom.Close() safeClose(t, om) safeClose(t, pom) @@ -233,6 +234,8 @@ func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) { // on first fetchInitialOffset call func TestOffsetManagerFetchInitialFail(t *testing.T) { om, testClient, broker, coordinator := initOffsetManager(t, 0) + defer broker.Close() + defer coordinator.Close() // Error on first fetchInitialOffset call responseBlock := OffsetFetchResponseBlock{ @@ -247,6 +250,7 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) { // Refresh coordinator newCoordinator := NewMockBroker(t, 3) + defer newCoordinator.Close() broker.Returns(&ConsumerMetadataResponse{ CoordinatorID: newCoordinator.BrokerID(), CoordinatorHost: "127.0.0.1", @@ -265,9 +269,6 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) { t.Error(err) } - broker.Close() - coordinator.Close() - newCoordinator.Close() safeClose(t, pom) safeClose(t, om) safeClose(t, testClient) @@ -281,6 +282,8 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { return 0 } om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewTestConfig()) + defer broker.Close() + defer coordinator.Close() // Error on first fetchInitialOffset call responseBlock := OffsetFetchResponseBlock{ @@ -305,8 +308,6 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { t.Error(err) } - broker.Close() - coordinator.Close() safeClose(t, pom) safeClose(t, om) safeClose(t, testClient) @@ -318,6 +319,8 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { func TestPartitionOffsetManagerInitialOffset(t *testing.T) { om, testClient, broker, coordinator := initOffsetManager(t, 0) + defer broker.Close() + defer coordinator.Close() testClient.Config().Consumer.Offsets.Initial = OffsetOldest // Kafka returns -1 if no offset has been stored for this partition yet. @@ -333,13 +336,13 @@ func TestPartitionOffsetManagerInitialOffset(t *testing.T) { safeClose(t, pom) safeClose(t, om) - broker.Close() - coordinator.Close() safeClose(t, testClient) } func TestPartitionOffsetManagerNextOffset(t *testing.T) { om, testClient, broker, coordinator := initOffsetManager(t, 0) + defer broker.Close() + defer coordinator.Close() pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta") offset, meta := pom.NextOffset() @@ -352,13 +355,13 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) { safeClose(t, pom) safeClose(t, om) - broker.Close() - coordinator.Close() safeClose(t, testClient) } func TestPartitionOffsetManagerResetOffset(t *testing.T) { om, testClient, broker, coordinator := initOffsetManager(t, 0) + defer broker.Close() + defer coordinator.Close() pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") ocResponse := new(OffsetCommitResponse) @@ -379,12 +382,12 @@ func TestPartitionOffsetManagerResetOffset(t *testing.T) { safeClose(t, pom) safeClose(t, om) safeClose(t, testClient) - broker.Close() - coordinator.Close() } func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) { om, testClient, broker, coordinator := initOffsetManager(t, time.Hour) + defer broker.Close() + defer coordinator.Close() pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") ocResponse := new(OffsetCommitResponse) @@ -415,12 +418,12 @@ func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) { safeClose(t, pom) safeClose(t, om) safeClose(t, testClient) - broker.Close() - coordinator.Close() } func TestPartitionOffsetManagerMarkOffset(t *testing.T) { om, testClient, broker, coordinator := initOffsetManager(t, 0) + defer broker.Close() + defer coordinator.Close() pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") ocResponse := new(OffsetCommitResponse) @@ -440,12 +443,12 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) { safeClose(t, pom) safeClose(t, om) safeClose(t, testClient) - broker.Close() - coordinator.Close() } func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) { om, testClient, broker, coordinator := initOffsetManager(t, time.Hour) + defer broker.Close() + defer coordinator.Close() pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") ocResponse := new(OffsetCommitResponse) @@ -475,12 +478,12 @@ func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) { safeClose(t, pom) safeClose(t, om) safeClose(t, testClient) - broker.Close() - coordinator.Close() } func TestPartitionOffsetManagerCommitErr(t *testing.T) { om, testClient, broker, coordinator := initOffsetManager(t, 0) + defer broker.Close() + defer coordinator.Close() pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta") // Error on one partition @@ -490,6 +493,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) { coordinator.Returns(ocResponse) newCoordinator := NewMockBroker(t, 3) + defer newCoordinator.Close() // For RefreshCoordinator() broker.Returns(&ConsumerMetadataResponse{ @@ -535,9 +539,6 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) { t.Error(err) } - broker.Close() - coordinator.Close() - newCoordinator.Close() safeClose(t, om) safeClose(t, testClient) } @@ -545,6 +546,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) { // Test of recovery from abort func TestAbortPartitionOffsetManager(t *testing.T) { om, testClient, broker, coordinator := initOffsetManager(t, 0) + defer broker.Close() pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta") // this triggers an error in the CommitOffset request, @@ -568,6 +570,5 @@ func TestAbortPartitionOffsetManager(t *testing.T) { safeClose(t, pom) safeClose(t, om) - broker.Close() safeClose(t, testClient) } diff --git a/sarama_test.go b/sarama_test.go new file mode 100644 index 000000000..5e3d7ba9b --- /dev/null +++ b/sarama_test.go @@ -0,0 +1,19 @@ +//go:build !functional +// +build !functional + +package sarama + +import ( + "flag" + "log" + "os" + "testing" +) + +func TestMain(m *testing.M) { + flag.Parse() + if f := flag.Lookup("test.v"); f != nil && f.Value.String() == "true" { + Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags) + } + os.Exit(m.Run()) +}