From bc76f30333d1f61ad3cd610fb9792325dc720ef6 Mon Sep 17 00:00:00 2001 From: Tobias Rauter Date: Mon, 9 Mar 2020 11:30:16 +0100 Subject: [PATCH] Fixed ack for tombstones in cython-stream --- faust/_cython/streams.pyx | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/faust/_cython/streams.pyx b/faust/_cython/streams.pyx index 94285268f..de5efd955 100644 --- a/faust/_cython/streams.pyx +++ b/faust/_cython/streams.pyx @@ -79,13 +79,14 @@ cdef class StreamIterator: do_ack = stream.enable_acks value = None + event = None - while value is None: + while value is None and event is None: await sleep(0, loop=self.loop) need_slow_get, channel_value = self._try_get_quick_value() if need_slow_get: channel_value = await self.chan_slow_get() - value, sensor_state = self._prepare_event(channel_value) + event, value, sensor_state = self._prepare_event(channel_value) try: for processor in self.processors: @@ -163,10 +164,10 @@ cdef class StreamIterator: stream_state = self.on_stream_event_in( tp, offset, self.stream, event) self.stream._set_current_event(event) - return (event.value, stream_state) + return (event, event.value, stream_state) else: self.stream._set_current_event(None) - return channel_value, stream_state + return None, channel_value, stream_state cdef object _try_get_quick_value(self): if self.chan_is_channel: