Skip to content

Commit

Permalink
fix(fvt): handle msgset vs batchset
Browse files Browse the repository at this point in the history
- update the TestFuncProducing tests to exercise a number of different
  client versions so we cover the old msgset (which the tests were
  originally written for) and batchset format, as well as all the
  currently supported versions of ProduceRequest
- relax some of the metrics validation to account for messages being
  sent in batches on the higher kafkaVersions

Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Aug 17, 2023
1 parent 6c50cb6 commit f64705d
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 74 deletions.
166 changes: 92 additions & 74 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,46 +24,38 @@ const TestBatchSize = 1000

func TestFuncProducing(t *testing.T) {
config := NewFunctionalTestConfig()
// FIXME: KAFKA_VERSION seems to break this test
config.Version = MinVersion
testProducingMessages(t, config)
testProducingMessages(t, config, MinVersion)
}

func TestFuncProducingGzip(t *testing.T) {
config := NewFunctionalTestConfig()
// FIXME: KAFKA_VERSION seems to break this test
config.Version = MinVersion
config.Producer.Compression = CompressionGZIP
testProducingMessages(t, config)
testProducingMessages(t, config, MinVersion)
}

func TestFuncProducingSnappy(t *testing.T) {
config := NewFunctionalTestConfig()
config.Producer.Compression = CompressionSnappy
testProducingMessages(t, config)
testProducingMessages(t, config, MinVersion)
}

func TestFuncProducingZstd(t *testing.T) {
config := NewFunctionalTestConfig()
config.Producer.Compression = CompressionZSTD
testProducingMessages(t, config)
testProducingMessages(t, config, V2_1_0_0) // must be at least 2.1.0.0 for zstd
}

func TestFuncProducingNoResponse(t *testing.T) {
config := NewFunctionalTestConfig()
// FIXME: KAFKA_VERSION seems to break this test
config.Version = MinVersion
config.Producer.RequiredAcks = NoResponse
testProducingMessages(t, config)
testProducingMessages(t, config, MinVersion)
}

func TestFuncProducingFlushing(t *testing.T) {
config := NewFunctionalTestConfig()
// FIXME: KAFKA_VERSION seems to break this test
config.Version = MinVersion
config.Producer.Flush.Messages = TestBatchSize / 8
config.Producer.Flush.Frequency = 250 * time.Millisecond
testProducingMessages(t, config)
testProducingMessages(t, config, MinVersion)
}

func TestFuncMultiPartitionProduce(t *testing.T) {
Expand Down Expand Up @@ -804,7 +796,7 @@ func TestInterceptors(t *testing.T) {
safeClose(t, consumer)
}

func testProducingMessages(t *testing.T, config *Config) {
func testProducingMessages(t *testing.T, config *Config, minVersion KafkaVersion) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

Expand All @@ -818,75 +810,91 @@ func testProducingMessages(t *testing.T, config *Config) {
config.Producer.Return.Successes = true
config.Consumer.Return.Errors = true

client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
kafkaVersions := map[KafkaVersion]bool{}
for _, v := range []KafkaVersion{MinVersion, V0_10_0_0, V0_11_0_0, V1_0_0_0, V2_0_0_0, V2_1_0_0} {
if v.IsAtLeast(minVersion) {
kafkaVersions[v] = true
}
}
defer safeClose(t, client)

// Keep in mind the current offset
initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
if err != nil {
t.Fatal(err)
if upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION")); err != nil {
kafkaVersions[upper] = true
}

producer, err := NewAsyncProducerFromClient(client)
if err != nil {
t.Fatal(err)
}
for version := range kafkaVersions {
t.Run(t.Name()+"-v"+version.String(), func(t *testing.T) {
checkKafkaVersion(t, version.String())
config.Version = version
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, client)

expectedResponses := TestBatchSize
for i := 1; i <= TestBatchSize; {
msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
select {
case producer.Input() <- msg:
i++
case ret := <-producer.Errors():
t.Fatal(ret.Err)
case <-producer.Successes():
expectedResponses--
}
}
for expectedResponses > 0 {
select {
case ret := <-producer.Errors():
t.Fatal(ret.Err)
case <-producer.Successes():
expectedResponses--
}
}
safeClose(t, producer)
// Keep in mind the current offset
initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
if err != nil {
t.Fatal(err)
}

// Validate producer metrics before using the consumer minus the offset request
validateProducerMetrics(t, client)
producer, err := NewAsyncProducerFromClient(client)
if err != nil {
t.Fatal(err)
}

master, err := NewConsumerFromClient(client)
if err != nil {
t.Fatal(err)
}
consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
if err != nil {
t.Fatal(err)
}
expectedResponses := TestBatchSize
for i := 1; i <= TestBatchSize; {
msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
select {
case producer.Input() <- msg:
i++
case ret := <-producer.Errors():
t.Fatal(ret.Err)
case <-producer.Successes():
expectedResponses--
}
}
for expectedResponses > 0 {
select {
case ret := <-producer.Errors():
t.Fatal(ret.Err)
case <-producer.Successes():
expectedResponses--
}
}
safeClose(t, producer)

for i := 1; i <= TestBatchSize; i++ {
select {
case <-time.After(10 * time.Second):
t.Fatal("Not received any more events in the last 10 seconds.")
// Validate producer metrics before using the consumer minus the offset request
validateProducerMetrics(t, client)

case err := <-consumer.Errors():
t.Error(err)
master, err := NewConsumerFromClient(client)
if err != nil {
t.Fatal(err)
}
consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
if err != nil {
t.Fatal(err)
}

case message := <-consumer.Messages():
if string(message.Value) != fmt.Sprintf("testing %d", i) {
t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
for i := 1; i <= TestBatchSize; i++ {
select {
case <-time.After(10 * time.Second):
t.Fatal("Not received any more events in the last 10 seconds.")

case err := <-consumer.Errors():
t.Error(err)

case message := <-consumer.Messages():
if string(message.Value) != fmt.Sprintf("testing %d", i) {
t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
}
}
}
}
}

validateConsumerMetrics(t, client)
validateConsumerMetrics(t, client)

safeClose(t, consumer)
safeClose(t, consumer)
})
}
}

// TestAsyncProducerRemoteBrokerClosed ensures that the async producer can
Expand Down Expand Up @@ -996,11 +1004,21 @@ func validateProducerMetrics(t *testing.T, client Client) {
if compressionEnabled {
// We record compression ratios between [0.50,-10.00] (50-1000 with a histogram) for at least one "fake" record
metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1))
metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
if client.Config().Version.IsAtLeast(V0_11_0_0) {
// slightly better compression with batching
metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 40))
} else {
metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
}
metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000))
} else {
// We record compression ratios of 1.00 (100 with a histogram) for every TestBatchSize record
metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
if client.Config().Version.IsAtLeast(V0_11_0_0) {
// records will be grouped in batchSet rather than msgSet
metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 4))
} else {
metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
}
metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 100))
metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 100))
}
Expand Down
8 changes: 8 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func countMeterValidator(name string, expectedCount int) *metricValidator {

func minCountMeterValidator(name string, minCount int) *metricValidator {
return meterValidator(name, func(t *testing.T, meter metrics.Meter) {
t.Helper()
count := meter.Count()
if count < int64(minCount) {
t.Errorf("Expected meter metric '%s' count >= %d, got %d", name, minCount, count)
Expand All @@ -116,6 +117,7 @@ func histogramValidator(name string, extraValidator func(*testing.T, metrics.His
return &metricValidator{
name: name,
validator: func(t *testing.T, metric interface{}) {
t.Helper()
if histogram, ok := metric.(metrics.Histogram); !ok {
t.Errorf("Expected histogram metric for '%s', got %T", name, metric)
} else {
Expand All @@ -127,6 +129,7 @@ func histogramValidator(name string, extraValidator func(*testing.T, metrics.His

func countHistogramValidator(name string, expectedCount int) *metricValidator {
return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
t.Helper()
count := histogram.Count()
if count != int64(expectedCount) {
t.Errorf("Expected histogram metric '%s' count = %d, got %d", name, expectedCount, count)
Expand All @@ -136,6 +139,7 @@ func countHistogramValidator(name string, expectedCount int) *metricValidator {

func minCountHistogramValidator(name string, minCount int) *metricValidator {
return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
t.Helper()
count := histogram.Count()
if count < int64(minCount) {
t.Errorf("Expected histogram metric '%s' count >= %d, got %d", name, minCount, count)
Expand All @@ -145,6 +149,7 @@ func minCountHistogramValidator(name string, minCount int) *metricValidator {

func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *metricValidator {
return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
t.Helper()
min := int(histogram.Min())
if min != expectedMin {
t.Errorf("Expected histogram metric '%s' min = %d, got %d", name, expectedMin, min)
Expand All @@ -158,6 +163,7 @@ func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *me

func minValHistogramValidator(name string, minMin int) *metricValidator {
return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
t.Helper()
min := int(histogram.Min())
if min < minMin {
t.Errorf("Expected histogram metric '%s' min >= %d, got %d", name, minMin, min)
Expand All @@ -167,6 +173,7 @@ func minValHistogramValidator(name string, minMin int) *metricValidator {

func maxValHistogramValidator(name string, maxMax int) *metricValidator {
return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
t.Helper()
max := int(histogram.Max())
if max > maxMax {
t.Errorf("Expected histogram metric '%s' max <= %d, got %d", name, maxMax, max)
Expand All @@ -178,6 +185,7 @@ func counterValidator(name string, expectedCount int) *metricValidator {
return &metricValidator{
name: name,
validator: func(t *testing.T, metric interface{}) {
t.Helper()
if counter, ok := metric.(metrics.Counter); !ok {
t.Errorf("Expected counter metric for '%s', got %T", name, metric)
} else {
Expand Down

0 comments on commit f64705d

Please sign in to comment.