@@ -51,9 +51,7 @@ type Consumer struct {
51
51
partitionsHeld int64
52
52
partitionsHeldGauge metrics.Gauge
53
53
54
- messagesDoneChan chan string
55
- errorsDoneChan chan string
56
- doneWg sync.WaitGroup
54
+ doneWg sync.WaitGroup
57
55
}
58
56
59
57
type consumerState struct {
@@ -71,8 +69,6 @@ func New(params Params) (*Consumer, error) {
71
69
deadlockDetector : deadlockDetector ,
72
70
partitionIDToState : make (map [int32 ]* consumerState ),
73
71
partitionsHeldGauge : partitionsHeldGauge (params .MetricsFactory ),
74
- messagesDoneChan : make (chan string ),
75
- errorsDoneChan : make (chan string ),
76
72
}, nil
77
73
}
78
74
@@ -92,20 +88,6 @@ func (c *Consumer) Start() {
92
88
go c .handleErrors (pc .Partition (), pc .Errors ())
93
89
}
94
90
}()
95
-
96
- // Expect to receive message and error handler "done" signals from each partition.
97
- go waitForDoneSignals (c .messagesDoneChan , & c .doneWg , c .logger )
98
- go waitForDoneSignals (c .errorsDoneChan , & c .doneWg , c .logger )
99
- }
100
-
101
- // waitForDoneSignals watches the doneChan for incoming "done" messages. If a message is received,
102
- // the doneWg WaitGroup is decremented via a call to Done().
103
- func waitForDoneSignals (doneChan <- chan string , doneWg * sync.WaitGroup , logger * zap.Logger ) {
104
- logger .Debug ("Waiting for done signals" )
105
- for v := range doneChan {
106
- logger .Debug ("Received done signal" , zap .String ("msg" , v ))
107
- doneWg .Done ()
108
- }
109
91
}
110
92
111
93
// Close closes the Consumer and underlying sarama consumer
@@ -120,15 +102,10 @@ func (c *Consumer) Close() error {
120
102
c .logger .Debug ("Waiting for messages and errors to be handled" )
121
103
c .doneWg .Wait ()
122
104
123
- c .logger .Debug ("Closing message and error done channels" )
124
- close (c .messagesDoneChan )
125
- close (c .errorsDoneChan )
126
-
127
105
return err
128
106
}
129
107
130
- // handleMessages handles incoming Kafka messages on a channel. Upon the closure of the message channel,
131
- // handleMessages will signal the messagesDoneChan to indicate the graceful shutdown of message handling is done.
108
+ // handleMessages handles incoming Kafka messages on a channel
132
109
func (c * Consumer ) handleMessages (pc sc.PartitionConsumer ) {
133
110
c .logger .Info ("Starting message handler" , zap .Int32 ("partition" , pc .Partition ()))
134
111
c .partitionMapLock .Lock ()
@@ -141,7 +118,7 @@ func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
141
118
c .partitionsHeld --
142
119
c .partitionsHeldGauge .Update (c .partitionsHeld )
143
120
c .partitionMapLock .Unlock ()
144
- c .messagesDoneChan <- "HandleMessages done"
121
+ c .doneWg . Done ()
145
122
}()
146
123
147
124
msgMetrics := c .newMsgMetrics (pc .Partition ())
@@ -185,13 +162,10 @@ func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
185
162
c .logger .Info ("Closed partition consumer" , zap .Int32 ("partition" , partitionConsumer .Partition ()))
186
163
}
187
164
188
- // handleErrors handles incoming Kafka consumer errors on a channel. Upon the closure of the error channel,
189
- // handleErrors will signal the errorsDoneChan to indicate the graceful shutdown of error handling is done.
165
+ // handleErrors handles incoming Kafka consumer errors on a channel
190
166
func (c * Consumer ) handleErrors (partition int32 , errChan <- chan * sarama.ConsumerError ) {
191
167
c .logger .Info ("Starting error handler" , zap .Int32 ("partition" , partition ))
192
- defer func () {
193
- c .errorsDoneChan <- "HandleErrors done"
194
- }()
168
+ defer c .doneWg .Done ()
195
169
196
170
errMetrics := c .newErrMetrics (partition )
197
171
for err := range errChan {
0 commit comments