Skip to content

Commit f078934

Browse files
authored
bugfix duplicate channel creation on every message visit (#387)
1 parent 40cf0d4 commit f078934

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

partition_processor.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -706,12 +706,13 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
706706
}
707707

708708
defer it.Release()
709+
stopping := pp.stopping()
709710
for it.Next() {
710711
// add one that we were able to be put into the queue.
711712
// wg.Done will be called by the visit handler as commit
712713
wg.Add(1)
713714
select {
714-
case <-pp.stopping():
715+
case <-stopping:
715716
drainVisitInput()
716717
wg.Done()
717718
return ErrVisitAborted
@@ -741,7 +742,7 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
741742
wg.Wait()
742743
}()
743744
select {
744-
case <-pp.stopping():
745+
case <-stopping:
745746
drainVisitInput()
746747
return ErrVisitAborted
747748
case <-ctx.Done():

0 commit comments

Comments
 (0)