@@ -442,12 +442,69 @@ func SaveProxy(t *testing.T, px string) {
442
442
443
443
func setupFunctionalTest (t testing.TB ) {
444
444
resetProxies (t )
445
+ ensureFullyReplicated (t , 60 * time .Second , 5 * time .Second )
445
446
}
446
447
447
448
func teardownFunctionalTest (t testing.TB ) {
448
449
resetProxies (t )
449
450
}
450
451
452
+ func ensureFullyReplicated (t testing.TB , timeout time.Duration , retry time.Duration ) {
453
+ config := NewTestConfig ()
454
+ config .Metadata .Retry .Max = 5
455
+ config .Metadata .Retry .Backoff = 10 * time .Second
456
+ config .ClientID = "sarama-ensureFullyReplicated"
457
+ config .Version = V2_6_0_0
458
+
459
+ var testTopicNames []string
460
+ for topic := range testTopicDetails {
461
+ testTopicNames = append (testTopicNames , topic )
462
+ }
463
+
464
+ timer := time .NewTimer (timeout )
465
+ defer timer .Stop ()
466
+ tick := time .NewTicker (retry )
467
+ defer tick .Stop ()
468
+
469
+ for {
470
+ resp , err := func () (* MetadataResponse , error ) {
471
+ client , err := NewClient (FunctionalTestEnv .KafkaBrokerAddrs , config )
472
+ if err != nil {
473
+ return nil , fmt .Errorf ("failed to connect to kafka: %w" , err )
474
+ }
475
+ defer client .Close ()
476
+
477
+ controller , err := client .Controller ()
478
+ if err != nil {
479
+ return nil , fmt .Errorf ("failed to connect to kafka controller: %w" , err )
480
+ }
481
+ defer controller .Close ()
482
+ return controller .GetMetadata (& MetadataRequest {Version : 5 , Topics : testTopicNames })
483
+ }()
484
+ if err != nil {
485
+ Logger .Printf ("failed to get metadata during test setup: %v\n " , err )
486
+ } else {
487
+ ok := true
488
+ for _ , topic := range resp .Topics {
489
+ for _ , partition := range topic .Partitions {
490
+ if len (partition .Isr ) != 3 {
491
+ ok = false
492
+ Logger .Printf ("topic %s/%d is not fully-replicated Isr=%v Offline=%v\n " , topic .Name , partition .ID , partition .Isr , partition .OfflineReplicas )
493
+ }
494
+ }
495
+ }
496
+ if ok {
497
+ return
498
+ }
499
+ }
500
+ select {
501
+ case <- timer .C :
502
+ t .Fatalf ("timeout waiting for test topics to be fully replicated" )
503
+ case <- tick .C :
504
+ }
505
+ }
506
+ }
507
+
451
508
type kafkaVersion []int
452
509
453
510
func (kv kafkaVersion ) satisfies (other kafkaVersion ) bool {
0 commit comments