Skip to content

Commit

Permalink
fixed recovery hang
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram committed Nov 9, 2020
1 parent a8494c4 commit 40ea0b0
Showing 1 changed file with 2 additions and 6 deletions.
8 changes: 2 additions & 6 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import asyncio
import gc
import typing
from asyncio import Event
from collections import defaultdict
from time import monotonic
from typing import (
Expand All @@ -73,7 +74,6 @@
from mode import Service, ServiceT, flight_recorder, get_logger
from mode.threads import MethodQueue, QueueServiceThread
from mode.utils.futures import notify
from mode.utils.locks import Event
from mode.utils.text import pluralize
from mode.utils.times import Seconds

Expand Down Expand Up @@ -172,11 +172,7 @@ async def on_stop(self) -> None:
async def _fetcher(self) -> None:
try:
consumer = cast(Consumer, self.app.consumer)
self._drainer = asyncio.ensure_future(
consumer._drain_messages(self),
loop=self.loop,
)
await self._drainer
await consumer._drain_messages(self)
except asyncio.CancelledError:
pass
finally:
Expand Down

0 comments on commit 40ea0b0

Please sign in to comment.