diff --git a/faust/_cython/streams.pyx b/faust/_cython/streams.pyx index 43be9f514..5560a98c4 100644 --- a/faust/_cython/streams.pyx +++ b/faust/_cython/streams.pyx @@ -81,13 +81,14 @@ cdef class StreamIterator: do_ack = stream.enable_acks value = None + event = None - while value is None: + while value is None and event is None: await sleep(0, loop=self.loop) need_slow_get, channel_value = self._try_get_quick_value() if need_slow_get: channel_value = await self.chan_slow_get() - value, sensor_state = self._prepare_event(channel_value) + event, value, sensor_state = self._prepare_event(channel_value) try: for processor in self.processors: @@ -165,10 +166,10 @@ cdef class StreamIterator: stream_state = self.on_stream_event_in( tp, offset, self.stream, event) self.stream._set_current_event(event) - return (event.value, stream_state) + return (event, event.value, stream_state) else: self.stream._set_current_event(None) - return channel_value, stream_state + return None, channel_value, stream_state cdef object _try_get_quick_value(self): if self.chan_is_channel: