diff --git a/faust/_cython/streams.pyx b/faust/_cython/streams.pyx index 94285268f..de5efd955 100644 --- a/faust/_cython/streams.pyx +++ b/faust/_cython/streams.pyx @@ -79,13 +79,14 @@ cdef class StreamIterator: do_ack = stream.enable_acks value = None + event = None - while value is None: + while value is None and event is None: await sleep(0, loop=self.loop) need_slow_get, channel_value = self._try_get_quick_value() if need_slow_get: channel_value = await self.chan_slow_get() - value, sensor_state = self._prepare_event(channel_value) + event, value, sensor_state = self._prepare_event(channel_value) try: for processor in self.processors: @@ -163,10 +164,10 @@ cdef class StreamIterator: stream_state = self.on_stream_event_in( tp, offset, self.stream, event) self.stream._set_current_event(event) - return (event.value, stream_state) + return (event, event.value, stream_state) else: self.stream._set_current_event(None) - return channel_value, stream_state + return None, channel_value, stream_state cdef object _try_get_quick_value(self): if self.chan_is_channel: