@@ -45,7 +45,7 @@ func TestIssue337(t *testing.T) {
45
45
t .Fatal (err )
46
46
}
47
47
48
- ctx , cancel := context .WithTimeout (context .Background (), time .Second )
48
+ ctx , cancel := context .WithTimeout (context .Background (), 3 * time .Second )
49
49
defer cancel ()
50
50
for {
51
51
fs := cl .PollFetches (ctx )
@@ -60,3 +60,55 @@ func TestIssue337(t *testing.T) {
60
60
}
61
61
}
62
62
}
63
+
64
+ func TestDirectPartitionPurge (t * testing.T ) {
65
+ topic , cleanup := tmpTopicPartitions (t , 2 )
66
+ defer cleanup ()
67
+
68
+ cl , _ := NewClient (
69
+ DefaultProduceTopic (topic ),
70
+ RecordPartitioner (ManualPartitioner ()),
71
+ ConsumePartitions (map [string ]map [int32 ]Offset {
72
+ topic : {0 : NewOffset ().At (0 )},
73
+ }),
74
+ )
75
+
76
+ if err := cl .ProduceSync (context .Background (),
77
+ & Record {Partition : 0 , Value : []byte ("foo" )},
78
+ & Record {Partition : 1 , Value : []byte ("bar" )},
79
+ ).FirstErr (); err != nil {
80
+ t .Fatal (err )
81
+ }
82
+ cl .PurgeTopicsFromClient (topic )
83
+
84
+ ctx , cancel := context .WithTimeout (context .Background (), 3 * time .Second )
85
+ fs := cl .PollFetches (ctx )
86
+ cancel ()
87
+ if err := fs .Err0 (); err != context .DeadlineExceeded {
88
+ t .Fatal ("unexpected success when expecting context.DeadlineExceeded" )
89
+ }
90
+
91
+ cl .AddConsumeTopics (topic )
92
+ ctx , cancel = context .WithTimeout (context .Background (), 3 * time .Second )
93
+
94
+ exp := map [string ]bool {
95
+ "foo" : true ,
96
+ "bar" : true ,
97
+ }
98
+ for {
99
+ fs := cl .PollFetches (ctx )
100
+ if err := fs .Err0 (); err == context .DeadlineExceeded {
101
+ break
102
+ }
103
+ fs .EachRecord (func (r * Record ) {
104
+ v := string (r .Value )
105
+ if ! exp [v ] {
106
+ t .Errorf ("saw unexpected value %v" , v )
107
+ }
108
+ delete (exp , v )
109
+ })
110
+ }
111
+ if len (exp ) > 0 {
112
+ t .Errorf ("did not see expected values %v" , exp )
113
+ }
114
+ }
0 commit comments