@@ -16,6 +16,8 @@ import (
16
16
"testing"
17
17
"time"
18
18
19
+ "golang.org/x/sync/errgroup"
20
+
19
21
"github.com/stretchr/testify/require"
20
22
)
21
23
@@ -439,33 +441,44 @@ func versionRange(lower KafkaVersion) []KafkaVersion {
439
441
440
442
func produceMsgs (t * testing.T , clientVersions []KafkaVersion , codecs []CompressionCodec , flush int , countPerVerCodec int , idempotent bool ) []* ProducerMessage {
441
443
var (
442
- wg sync.WaitGroup
443
444
producers []SyncProducer
444
445
producedMessagesMu sync.Mutex
445
446
producedMessages []* ProducerMessage
446
447
)
448
+ g := errgroup.Group {}
447
449
for _ , prodVer := range clientVersions {
448
450
for _ , codec := range codecs {
449
- t .Run ("producer-" + prodVer .String ()+ "-" + codec .String (), func (t * testing.T ) {
450
- t .Logf ("*** Producing with client version %s codec %s\n " , prodVer , codec )
451
- prodCfg := NewTestConfig ()
452
- prodCfg .Version = prodVer
453
- prodCfg .Producer .Return .Successes = true
454
- prodCfg .Producer .Return .Errors = true
455
- prodCfg .Producer .Flush .MaxMessages = flush
456
- prodCfg .Producer .Compression = codec
457
- prodCfg .Producer .Idempotent = idempotent
458
- if idempotent {
459
- prodCfg .Producer .RequiredAcks = WaitForAll
460
- prodCfg .Net .MaxOpenRequests = 1
461
- }
451
+ prodCfg := NewTestConfig ()
452
+ prodCfg .ClientID = t .Name () + "-Producer-" + prodVer .String ()
453
+ if idempotent {
454
+ prodCfg .ClientID += "-idempotent"
455
+ }
456
+ if codec > 0 {
457
+ prodCfg .ClientID += "-" + codec .String ()
458
+ }
459
+ prodCfg .Metadata .Full = false
460
+ prodCfg .Version = prodVer
461
+ prodCfg .Producer .Return .Successes = true
462
+ prodCfg .Producer .Return .Errors = true
463
+ prodCfg .Producer .Flush .MaxMessages = flush
464
+ prodCfg .Producer .Compression = codec
465
+ prodCfg .Producer .Idempotent = idempotent
466
+ if idempotent {
467
+ prodCfg .Producer .RequiredAcks = WaitForAll
468
+ prodCfg .Net .MaxOpenRequests = 1
469
+ }
462
470
463
- p , err := NewSyncProducer (FunctionalTestEnv .KafkaBrokerAddrs , prodCfg )
464
- if err != nil {
465
- t .Errorf ("Failed to create producer: version=%s, compression=%s, err=%v" , prodVer , codec , err )
466
- return
467
- }
468
- producers = append (producers , p )
471
+ p , err := NewSyncProducer (FunctionalTestEnv .KafkaBrokerAddrs , prodCfg )
472
+ if err != nil {
473
+ t .Fatalf ("Failed to create producer: version=%s, compression=%s, err=%v" , prodVer , codec , err )
474
+ }
475
+ producers = append (producers , p )
476
+
477
+ prodVer := prodVer
478
+ codec := codec
479
+ g .Go (func () error {
480
+ t .Logf ("*** Producing with client version %s codec %s\n " , prodVer , codec )
481
+ var wg sync.WaitGroup
469
482
for i := 0 ; i < countPerVerCodec ; i ++ {
470
483
msg := & ProducerMessage {
471
484
Topic : "test.1" ,
@@ -483,10 +496,14 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
483
496
producedMessagesMu .Unlock ()
484
497
}()
485
498
}
499
+ wg .Wait ()
500
+ return nil
486
501
})
487
502
}
488
503
}
489
- wg .Wait ()
504
+ if err := g .Wait (); err != nil {
505
+ t .Fatal (err )
506
+ }
490
507
491
508
for _ , p := range producers {
492
509
safeClose (t , p )
@@ -496,6 +513,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
496
513
sort .Slice (producedMessages , func (i , j int ) bool {
497
514
return producedMessages [i ].Offset < producedMessages [j ].Offset
498
515
})
516
+ require .NotEmpty (t , producedMessages , "should have produced >0 messages" )
499
517
t .Logf ("*** Total produced %d, firstOffset=%d, lastOffset=%d\n " ,
500
518
len (producedMessages ), producedMessages [0 ].Offset , producedMessages [len (producedMessages )- 1 ].Offset )
501
519
return producedMessages
@@ -504,26 +522,33 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
504
522
func consumeMsgs (t * testing.T , clientVersions []KafkaVersion , producedMessages []* ProducerMessage ) {
505
523
// Consume all produced messages with all client versions supported by the
506
524
// cluster.
525
+ g := errgroup.Group {}
507
526
for _ , consVer := range clientVersions {
508
- t .Run ("consumer-" + consVer .String (), func (t * testing.T ) {
509
- t .Logf ("*** Consuming with client version %s\n " , consVer )
510
- // Create a partition consumer that should start from the first produced
511
- // message.
512
- consCfg := NewTestConfig ()
513
- consCfg .Version = consVer
514
- c , err := NewConsumer (FunctionalTestEnv .KafkaBrokerAddrs , consCfg )
515
- if err != nil {
516
- t .Fatal (err )
517
- }
518
- defer safeClose (t , c )
519
- pc , err := c .ConsumePartition ("test.1" , 0 , producedMessages [0 ].Offset )
520
- if err != nil {
521
- t .Fatal (err )
522
- }
523
- defer safeClose (t , pc )
527
+ // Create a partition consumer that should start from the first produced
528
+ // message.
529
+ consCfg := NewTestConfig ()
530
+ consCfg .ClientID = t .Name () + "-Consumer-" + consVer .String ()
531
+ consCfg .Consumer .MaxProcessingTime = time .Second
532
+ consCfg .Metadata .Full = false
533
+ consCfg .Version = consVer
534
+ c , err := NewConsumer (FunctionalTestEnv .KafkaBrokerAddrs , consCfg )
535
+ if err != nil {
536
+ t .Fatal (err )
537
+ }
538
+ defer safeClose (t , c )
539
+ pc , err := c .ConsumePartition ("test.1" , 0 , producedMessages [0 ].Offset )
540
+ if err != nil {
541
+ t .Fatal (err )
542
+ }
543
+ defer safeClose (t , pc )
524
544
545
+ var wg sync.WaitGroup
546
+ wg .Add (1 )
547
+ consVer := consVer
548
+ g .Go (func () error {
525
549
// Consume as many messages as there have been produced and make sure that
526
550
// order is preserved.
551
+ t .Logf ("*** Consuming with client version %s\n " , consVer )
527
552
for i , prodMsg := range producedMessages {
528
553
select {
529
554
case consMsg := <- pc .Messages ():
@@ -535,10 +560,25 @@ func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages [
535
560
t .Fatalf ("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s" ,
536
561
consVer , i , prodMsg2Str (prodMsg ), consMsg2Str (consMsg ))
537
562
}
538
- case <- time .After (3 * time .Second ):
539
- t .Fatalf ("Timeout waiting for: index=%d, offset=%d, msg=%s" , i , prodMsg .Offset , prodMsg .Value )
563
+ if i == 0 {
564
+ t .Logf ("Consumed first msg: version=%s, index=%d, got=%s" ,
565
+ consVer , i , consMsg2Str (consMsg ))
566
+ wg .Done ()
567
+ }
568
+ if i % 1000 == 0 {
569
+ t .Logf ("Consumed messages: version=%s, index=%d, got=%s" ,
570
+ consVer , i , consMsg2Str (consMsg ))
571
+ }
572
+ case <- time .After (15 * time .Second ):
573
+ t .Fatalf ("Timeout %s waiting for: index=%d, offset=%d, msg=%s" ,
574
+ consCfg .ClientID , i , prodMsg .Offset , prodMsg .Value )
540
575
}
541
576
}
577
+ return nil
542
578
})
579
+ wg .Wait () // wait for first message to be consumed before starting next consumer
580
+ }
581
+ if err := g .Wait (); err != nil {
582
+ t .Fatal (err )
543
583
}
544
584
}
0 commit comments