diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 317b5b790..f8294e78b 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -47,6 +47,7 @@ import asyncio import gc import typing +from asyncio import Event from collections import defaultdict from time import monotonic from typing import ( @@ -73,7 +74,6 @@ from mode import Service, ServiceT, flight_recorder, get_logger from mode.threads import MethodQueue, QueueServiceThread from mode.utils.futures import notify -from mode.utils.locks import Event from mode.utils.text import pluralize from mode.utils.times import Seconds @@ -172,11 +172,7 @@ async def on_stop(self) -> None: async def _fetcher(self) -> None: try: consumer = cast(Consumer, self.app.consumer) - self._drainer = asyncio.ensure_future( - consumer._drain_messages(self), - loop=self.loop, - ) - await self._drainer + await consumer._drain_messages(self) except asyncio.CancelledError: pass finally: