Skip to content

Commit

Permalink
fix #482: callback memory repo
Browse files Browse the repository at this point in the history
  • Loading branch information
Kiruha01 committed Jul 29, 2024
1 parent 09a8cec commit 25f99a6
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 49 deletions.
9 changes: 7 additions & 2 deletions pybotx/bot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,11 @@
BotXAPIUsersAsCSVRequestPayload,
UsersAsCSVMethod,
)
from pybotx.constants import BOTX_DEFAULT_TIMEOUT, STICKER_PACKS_PER_PAGE
from pybotx.constants import (
BOTX_DEFAULT_TIMEOUT,
STICKER_PACKS_PER_PAGE,
STRAY_CALLBACK_DEFAULT_TIMEOUT,
)
from pybotx.converters import optional_sequence_to_list
from pybotx.image_validators import (
ensure_file_content_is_png,
Expand Down Expand Up @@ -264,6 +268,7 @@ def __init__(
httpx_client: Optional[httpx.AsyncClient] = None,
exception_handlers: Optional[ExceptionHandlersDict] = None,
default_callback_timeout: float = BOTX_DEFAULT_TIMEOUT,
drop_stray_callbacks_timeout: float = STRAY_CALLBACK_DEFAULT_TIMEOUT,
callback_repo: Optional[CallbackRepoProto] = None,
) -> None:
if not collectors:
Expand All @@ -283,7 +288,7 @@ def __init__(
self._httpx_client = httpx_client or httpx.AsyncClient()

if not callback_repo:
callback_repo = CallbackMemoryRepo()
callback_repo = CallbackMemoryRepo(timeout=drop_stray_callbacks_timeout)

self._callbacks_manager = CallbackManager(callback_repo)

Expand Down
38 changes: 27 additions & 11 deletions pybotx/bot/callbacks/callback_memory_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,57 @@
from uuid import UUID

from pybotx.bot.callbacks.callback_repo_proto import CallbackRepoProto
from pybotx.bot.exceptions import BotShuttingDownError, BotXMethodCallbackNotFoundError
from pybotx.bot.exceptions import BotShuttingDownError
from pybotx.client.exceptions.callbacks import CallbackNotReceivedError
from pybotx.logger import logger
from pybotx.models.method_callbacks import BotXMethodCallback

if TYPE_CHECKING:
from asyncio import Future # noqa: WPS458


class CallbackMemoryRepo(CallbackRepoProto):
def __init__(self) -> None:
def __init__(self, timeout: float = 0) -> None:
self._callback_futures: Dict[UUID, "Future[BotXMethodCallback]"] = {}
self.timeout = timeout

async def create_botx_method_callback(self, sync_id: UUID) -> None:
self._callback_futures[sync_id] = asyncio.Future()
self._callback_futures.setdefault(sync_id, asyncio.Future())

async def set_botx_method_callback_result(
self,
callback: BotXMethodCallback,
) -> None:
sync_id = callback.sync_id

future = self._get_botx_method_callback(sync_id)
if sync_id not in self._callback_futures:
logger.warning(
f"Callback `{sync_id}` doesn't exist yet or already "
f"waited or timed out. Waiting for {self.timeout}s "
f"for it or will be ignored.",
)
self._callback_futures.setdefault(sync_id, asyncio.Future())
asyncio.create_task(self._wait_and_drop_stray_callback(sync_id))

future = self._callback_futures[sync_id]
future.set_result(callback)

async def wait_botx_method_callback(
self,
sync_id: UUID,
timeout: float,
) -> BotXMethodCallback:
future = self._get_botx_method_callback(sync_id)
future = self._callback_futures[sync_id]

try:
return await asyncio.wait_for(future, timeout=timeout)
result = await asyncio.wait_for(future, timeout=timeout)
except asyncio.TimeoutError as exc:
del self._callback_futures[sync_id] # noqa: WPS420
raise CallbackNotReceivedError(sync_id) from exc

del self._callback_futures[sync_id] # noqa: WPS420
return result

async def pop_botx_method_callback(
self,
sync_id: UUID,
Expand All @@ -55,8 +69,10 @@ async def stop_callbacks_waiting(self) -> None:
),
)

def _get_botx_method_callback(self, sync_id: UUID) -> "Future[BotXMethodCallback]":
try:
return self._callback_futures[sync_id]
except KeyError:
raise BotXMethodCallbackNotFoundError(sync_id) from None
async def _wait_and_drop_stray_callback(self, sync_id: UUID) -> None:
await asyncio.sleep(self.timeout)
if sync_id not in self._callback_futures:
return

self._callback_futures.pop(sync_id, None)
logger.debug(f"Callback `{sync_id}` was dropped")
3 changes: 3 additions & 0 deletions pybotx/client/notifications_api/direct_notification.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import Any, Dict, List, Literal, Optional, Union
from uuid import UUID

Expand Down Expand Up @@ -159,6 +160,8 @@ async def execute(
response,
)

await asyncio.sleep(2)

await self._process_callback(
api_model.result.sync_id,
wait_callback,
Expand Down
1 change: 1 addition & 0 deletions pybotx/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@
MAX_NOTIFICATION_BODY_LENGTH: Final = 4096
MAX_FILE_LEN_IN_LOGS: Final = 64
BOTX_DEFAULT_TIMEOUT: Final = 60
STRAY_CALLBACK_DEFAULT_TIMEOUT: Final = 30
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pybotx"
version = "0.69.1"
version = "0.69.2"
description = "A python library for interacting with eXpress BotX API"
authors = [
"Sidnev Nikolay <[email protected]>",
Expand Down
149 changes: 114 additions & 35 deletions tests/client/test_botx_method_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,32 +108,48 @@ async def call_foo_bar(

async def test__botx_method_callback__callback_not_found(
bot_account: BotAccountWithSecret,
loguru_caplog: pytest.LogCaptureFixture,
) -> None:
# - Arrange -
built_bot = Bot(collectors=[HandlerCollector()], bot_accounts=[bot_account])
memory_repo = CallbackMemoryRepo(timeout=0.5)
built_bot = Bot(
collectors=[HandlerCollector()],
bot_accounts=[bot_account],
callback_repo=memory_repo,
)

# - Act -
async with lifespan_wrapper(built_bot) as bot:
with pytest.raises(BotXMethodCallbackNotFoundError) as exc:
await bot.set_raw_botx_method_result(
{
"status": "error",
"sync_id": "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3",
"reason": "chat_not_found",
"errors": [],
"error_data": {
"group_chat_id": "705df263-6bfd-536a-9d51-13524afaab5c",
"error_description": (
"Chat with id 705df263-6bfd-536a-9d51-13524afaab5c not found"
),
},
await bot.set_raw_botx_method_result(
{
"status": "error",
"sync_id": "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3",
"reason": "chat_not_found",
"errors": [],
"error_data": {
"group_chat_id": "705df263-6bfd-536a-9d51-13524afaab5c",
"error_description": (
"Chat with id 705df263-6bfd-536a-9d51-13524afaab5c not found"
),
},
verify_request=False,
)
},
verify_request=False,
)

# - Assert -
assert "Callback `21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3` doesn't exist" in str(
exc.value,
assert (
"Callback `21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3` doesn't exist"
in loguru_caplog.text
)
assert memory_repo._callback_futures.get(
UUID("21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3"),
)

await asyncio.sleep(0.7)
# Drop callback after timeout
assert (
memory_repo._callback_futures.get(UUID("21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3"))
is None
)


Expand Down Expand Up @@ -303,7 +319,12 @@ async def test__botx_method_callback__callback_received_after_timeout(
},
),
)
built_bot = Bot(collectors=[HandlerCollector()], bot_accounts=[bot_account])
memory_repo = CallbackMemoryRepo(timeout=0.5)
built_bot = Bot(
collectors=[HandlerCollector()],
bot_accounts=[bot_account],
callback_repo=memory_repo,
)

built_bot.call_foo_bar = types.MethodType(call_foo_bar, built_bot)

Expand All @@ -312,26 +333,28 @@ async def test__botx_method_callback__callback_received_after_timeout(
with pytest.raises(CallbackNotReceivedError) as not_received_exc:
await bot.call_foo_bar(bot_id, baz=1, callback_timeout=0)

with pytest.raises(BotXMethodCallbackNotFoundError) as not_found_exc:
await bot.set_raw_botx_method_result(
{
"status": "error",
"sync_id": "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3",
"reason": "quux_error",
"errors": [],
"error_data": {
"group_chat_id": "705df263-6bfd-536a-9d51-13524afaab5c",
"error_description": (
"Chat with id 705df263-6bfd-536a-9d51-13524afaab5c not found"
),
},
await bot.set_raw_botx_method_result(
{
"status": "error",
"sync_id": "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3",
"reason": "quux_error",
"errors": [],
"error_data": {
"group_chat_id": "705df263-6bfd-536a-9d51-13524afaab5c",
"error_description": (
"Chat with id 705df263-6bfd-536a-9d51-13524afaab5c not found"
),
},
verify_request=False,
)
},
verify_request=False,
)

# - Assert -
assert "hasn't been received" in str(not_received_exc.value)
assert "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3" in str(not_found_exc.value)
assert (
"Callback `21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3` doesn't exist"
in loguru_caplog.text
)
assert endpoint.called


Expand Down Expand Up @@ -611,6 +634,62 @@ async def test__botx_method_callback__bot_wait_callback_after_its_receiving(
assert endpoint.called


async def test__botx_method_callback__callback_received_before_its_expecting(
respx_mock: MockRouter,
httpx_client: httpx.AsyncClient,
host: str,
bot_id: UUID,
bot_account: BotAccountWithSecret,
) -> None:
"""https://github.com/ExpressApp/pybotx/issues/482."""
# - Arrange -
endpoint = respx_mock.post(
f"https://{host}/foo/bar",
json={"baz": 1},
headers={"Content-Type": "application/json"},
).mock(
return_value=httpx.Response(
HTTPStatus.ACCEPTED,
json={
"status": "ok",
"result": {"sync_id": "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3"},
},
),
)
built_bot = Bot(
collectors=[HandlerCollector()],
bot_accounts=[bot_account],
httpx_client=httpx_client,
callback_repo=CallbackMemoryRepo(timeout=0.5),
)

built_bot.call_foo_bar = types.MethodType(call_foo_bar, built_bot)

# - Act -
async with lifespan_wrapper(built_bot) as bot:
await bot.set_raw_botx_method_result(
{
"sync_id": "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3",
"status": "ok",
"result": {},
},
verify_request=False,
)
foo_bar = await bot.call_foo_bar(bot_id, baz=1, wait_callback=False)

callback = await bot.wait_botx_method_callback(foo_bar)

await asyncio.sleep(1)

# - Assert -
assert callback == BotAPIMethodSuccessfulCallback(
sync_id=foo_bar,
status="ok",
result={},
)
assert endpoint.called


async def test__botx_method_callback__bot_dont_wait_received_callback(
respx_mock: MockRouter,
httpx_client: httpx.AsyncClient,
Expand Down

0 comments on commit 25f99a6

Please sign in to comment.