From 70e5516d48b7d44c0cb389c14e5639781069d613 Mon Sep 17 00:00:00 2001 From: Stevan Milic Date: Sat, 27 Feb 2021 18:19:38 +0100 Subject: [PATCH] fix agents with multiple topics (#116) Co-authored-by: Stevan Milic --- faust/agents/agent.py | 19 +------------------ tests/unit/agents/test_agent.py | 28 +++++++++++++++++++--------- 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/faust/agents/agent.py b/faust/agents/agent.py index 20af5c81b..1328e4486 100644 --- a/faust/agents/agent.py +++ b/faust/agents/agent.py @@ -1,6 +1,5 @@ """Agent implementation.""" import asyncio -import sys import typing from contextlib import suppress from contextvars import ContextVar @@ -659,23 +658,7 @@ async def _prepare_actor(self, aref: ActorRefT, beacon: NodeT) -> ActorRefT: else: # agent yields and is an AsyncIterator so we have to consume it. coro = self._slurp(aref, aiter(aref)) - req_version = (3, 8) - cur_version = sys.version_info - if cur_version >= req_version: - if isinstance(self.channel, TopicT): - name = self.channel.get_topic_name() - else: - name = "channel" - task = asyncio.Task( - self._execute_actor(coro, aref), - loop=self.loop, - name=f"{str(aref)}-{name}", - ) - else: - task = asyncio.Task( - self._execute_actor(coro, aref), - loop=self.loop, - ) + task = asyncio.Task(self._execute_actor(coro, aref), loop=self.loop) task._beacon = beacon # type: ignore aref.actor_task = task self._actors.add(aref) diff --git a/tests/unit/agents/test_agent.py b/tests/unit/agents/test_agent.py index ccb8b6900..f73a55f12 100644 --- a/tests/unit/agents/test_agent.py +++ b/tests/unit/agents/test_agent.py @@ -1,5 +1,4 @@ import asyncio -import sys import pytest from mode import SupervisorStrategy, label @@ -404,14 +403,7 @@ async def test_prepare_actor__AsyncIterable(self, *, agent): agent._slurp.assert_called() coro = agent._slurp() agent._execute_actor.assert_called_once_with(coro, aref) - if sys.version_info >= (3, 8): - Task.assert_called_once_with( - agent._execute_actor(), - loop=agent.loop, - name=f"{ret}-testid-tests.unit.agents.test_agent.myagent", - ) - else: - Task.assert_called_once_with(agent._execute_actor(), loop=agent.loop) + Task.assert_called_once_with(agent._execute_actor(), loop=agent.loop) task = Task() assert task._beacon is beacon assert aref.actor_task is task @@ -436,6 +428,24 @@ async def test_prepare_actor__Awaitable(self, *, agent2): assert aref in agent2._actors assert ret is aref + @pytest.mark.asyncio + async def test_prepare_actor__Awaitable_with_multiple_topics(self, *, agent2): + aref = agent2(index=0, active_partitions=None) + asyncio.ensure_future(aref.it).cancel() # silence warning + agent2.channel.topics = ["foo", "bar"] + with patch("asyncio.Task") as Task: + agent2._execute_actor = Mock(name="_execute_actor") + beacon = Mock(name="beacon", autospec=Node) + ret = await agent2._prepare_actor(aref, beacon) + coro = aref + agent2._execute_actor.assert_called_once_with(coro, aref) + Task.assert_called_once_with(agent2._execute_actor(), loop=agent2.loop) + task = Task() + assert task._beacon is beacon + assert aref.actor_task is task + assert aref in agent2._actors + assert ret is aref + @pytest.mark.asyncio async def test_prepare_actor__Awaitable_cannot_have_sinks(self, *, agent2): aref = agent2(index=0, active_partitions=None)