Skip to content

Commit 01548a3

Browse files
author
hanxiaolin
committed
fix consumer interceptors
1 parent fca0631 commit 01548a3

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

consumer.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -468,9 +468,7 @@ feederLoop:
468468
}
469469

470470
for i, msg := range msgs {
471-
for _, interceptor := range child.conf.Consumer.Interceptors {
472-
msg.safelyApplyInterceptor(interceptor)
473-
}
471+
child.interceptors(msg)
474472
messageSelect:
475473
select {
476474
case <-child.dying:
@@ -484,6 +482,7 @@ feederLoop:
484482
child.broker.acks.Done()
485483
remainingLoop:
486484
for _, msg = range msgs[i:] {
485+
child.interceptors(msg)
487486
select {
488487
case child.messages <- msg:
489488
case <-child.dying:
@@ -715,6 +714,12 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
715714
return messages, nil
716715
}
717716

717+
func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {
718+
for _, interceptor := range child.conf.Consumer.Interceptors {
719+
msg.safelyApplyInterceptor(interceptor)
720+
}
721+
}
722+
718723
type brokerConsumer struct {
719724
consumer *consumer
720725
broker *Broker

0 commit comments

Comments
 (0)