From 2a37df979d12085bd6d9bc1bd5cc7aeb88dc49ad Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 21 Jan 2023 19:43:52 -0700 Subject: [PATCH] kgo: patch HookFetchRecordUnbuffered Previously, if a hook implemented both HookFetchRecordBuffered and HookFetchRecordUnbuffered, then HookFetchRecordUnbuffered would never be called. The type switch in this logic would always match the buffered case and never call unbuffered. Reorganizing the conditionals a little bit ensures that the correct hook is always called. --- pkg/kgo/source.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 1e564a3a..e94a7c15 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -289,9 +289,9 @@ type bufferedFetch struct { func (s *source) hook(f *Fetch, buffered, polled bool) { s.cl.cfg.hooks.each(func(h Hook) { - switch h := h.(type) { - case HookFetchRecordBuffered: - if !buffered { + if buffered { + h, ok := h.(HookFetchRecordBuffered) + if !ok { return } for i := range f.Topics { @@ -303,9 +303,9 @@ func (s *source) hook(f *Fetch, buffered, polled bool) { } } } - - case HookFetchRecordUnbuffered: - if buffered { + } else { + h, ok := h.(HookFetchRecordUnbuffered) + if !ok { return } for i := range f.Topics {