diff --git a/src/a2a/server/apps/jsonrpc/jsonrpc_app.py b/src/a2a/server/apps/jsonrpc/jsonrpc_app.py index ff6998cd6..6204eb564 100644 --- a/src/a2a/server/apps/jsonrpc/jsonrpc_app.py +++ b/src/a2a/server/apps/jsonrpc/jsonrpc_app.py @@ -39,6 +39,8 @@ SetTaskPushNotificationConfigRequest, TaskResubscriptionRequest, UnsupportedOperationError, + ListTaskPushNotificationConfigRequest, + DeleteTaskPushNotificationConfigRequest ) from a2a.utils.errors import MethodNotImplementedError @@ -297,12 +299,22 @@ async def _process_non_streaming_request( request_obj, context ) case SetTaskPushNotificationConfigRequest(): - handler_result = await self.handler.set_push_notification( + handler_result = await self.handler.set_push_notification_config( request_obj, context, ) case GetTaskPushNotificationConfigRequest(): - handler_result = await self.handler.get_push_notification( + handler_result = await self.handler.get_push_notification_config( + request_obj, + context, + ) + case ListTaskPushNotificationConfigRequest(): + handler_result = await self.handler.list_push_notification_config( + request_obj, + context, + ) + case DeleteTaskPushNotificationConfigRequest(): + handler_result = await self.handler.delete_push_notification_config( request_obj, context, ) diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index ed4361b55..adb0dd760 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -39,6 +39,10 @@ TaskPushNotificationConfig, TaskQueryParams, UnsupportedOperationError, + ListTaskPushNotificationConfigParams, + DeleteTaskPushNotificationConfigParams, + DeleteTaskPushNotificationConfigResponse, + DeleteTaskPushNotificationConfigSuccessResponse ) from a2a.utils.errors import ServerError from a2a.utils.telemetry import SpanKind, trace_class @@ -393,11 +397,11 @@ async def on_get_task_push_notification_config( raise ServerError(error=TaskNotFoundError()) push_notification_config = await self._push_config_store.get_info(params.id) - if not push_notification_config: + if not push_notification_config or not push_notification_config[0]: raise ServerError(error=InternalError()) return TaskPushNotificationConfig( - taskId=params.id, pushNotificationConfig=push_notification_config + taskId=params.id, pushNotificationConfig=push_notification_config[0] ) async def on_resubscribe_to_task( @@ -431,6 +435,51 @@ async def on_resubscribe_to_task( async for event in result_aggregator.consume_and_emit(consumer): yield event + async def on_list_task_push_notification_config( + self, + params: ListTaskPushNotificationConfigParams, + context: ServerCallContext | None = None, + ) -> list[TaskPushNotificationConfig]: + """Default handler for 'tasks/pushNotificationConfig/list'. + + Requires a `PushConfigStore` to be configured. + """ + if not self._push_config_store: + raise ServerError(error=UnsupportedOperationError()) + + task: Task | None = await self.task_store.get(params.id) + if not task: + raise ServerError(error=TaskNotFoundError()) + + push_notification_config_list = await self._push_config_store.get_info(params.id) + + task_push_notification_config = [] + if push_notification_config_list: + for config in push_notification_config_list: + task_push_notification_config.append(TaskPushNotificationConfig( + taskId=params.id, pushNotificationConfig=config + )) + + return task_push_notification_config + + async def on_delete_task_push_notification_config( + self, + params: DeleteTaskPushNotificationConfigParams, + context: ServerCallContext | None = None, + ) -> None: + """Default handler for 'tasks/pushNotificationConfig/delete'. + + Requires a `PushConfigStore` to be configured. + """ + if not self._push_config_store: + raise ServerError(error=UnsupportedOperationError()) + + task: Task | None = await self.task_store.get(params.id) + if not task: + raise ServerError(error=TaskNotFoundError()) + + await self._push_config_store.delete_info(params.id, params.pushNotificationConfigId) + def should_add_push_info(self, params: MessageSendParams) -> bool: """Determines if push notification info should be set for a task.""" return bool( diff --git a/src/a2a/server/request_handlers/jsonrpc_handler.py b/src/a2a/server/request_handlers/jsonrpc_handler.py index 13d2854b8..783cf9a51 100644 --- a/src/a2a/server/request_handlers/jsonrpc_handler.py +++ b/src/a2a/server/request_handlers/jsonrpc_handler.py @@ -34,6 +34,12 @@ TaskPushNotificationConfig, TaskResubscriptionRequest, TaskStatusUpdateEvent, + ListTaskPushNotificationConfigRequest, + ListTaskPushNotificationConfigResponse, + ListTaskPushNotificationConfigSuccessResponse, + DeleteTaskPushNotificationConfigRequest, + DeleteTaskPushNotificationConfigResponse, + DeleteTaskPushNotificationConfigSuccessResponse ) from a2a.utils.errors import ServerError from a2a.utils.helpers import validate @@ -214,7 +220,7 @@ async def on_resubscribe_to_task( ) ) - async def get_push_notification( + async def get_push_notification_config( self, request: GetTaskPushNotificationConfigRequest, context: ServerCallContext | None = None, @@ -252,7 +258,7 @@ async def get_push_notification( lambda self: self.agent_card.capabilities.pushNotifications, 'Push notifications are not supported by the agent', ) - async def set_push_notification( + async def set_push_notification_config( self, request: SetTaskPushNotificationConfigRequest, context: ServerCallContext | None = None, @@ -325,3 +331,69 @@ async def on_get_task( id=request.id, error=e.error if e.error else InternalError() ) ) + + async def list_push_notification_config( + self, + request: ListTaskPushNotificationConfigRequest, + context: ServerCallContext | None = None, + ) -> ListTaskPushNotificationConfigResponse: + """Handles the 'tasks/pushNotificationConfig/list' JSON-RPC method. + + Args: + request: The incoming `ListTaskPushNotificationConfigRequest` object. + context: Context provided by the server. + + Returns: + A `ListTaskPushNotificationConfigResponse` object containing the config or a JSON-RPC error. + """ + try: + config = ( + await self.request_handler.on_list_task_push_notification_config( + request.params, context + ) + ) + return prepare_response_object( + request.id, + config, + (list,), + ListTaskPushNotificationConfigSuccessResponse, + ListTaskPushNotificationConfigResponse, + ) + except ServerError as e: + return ListTaskPushNotificationConfigResponse( + root=JSONRPCErrorResponse( + id=request.id, error=e.error if e.error else InternalError() + ) + ) + + async def delete_push_notification_config( + self, + request: DeleteTaskPushNotificationConfigRequest, + context: ServerCallContext | None = None, + ) -> DeleteTaskPushNotificationConfigResponse: + """Handles the 'tasks/pushNotificationConfig/list' JSON-RPC method. + + Args: + request: The incoming `DeleteTaskPushNotificationConfigRequest` object. + context: Context provided by the server. + + Returns: + A `DeleteTaskPushNotificationConfigResponse` object containing the config or a JSON-RPC error. + """ + try: + ( + await self.request_handler.on_delete_task_push_notification_config( + request.params, context + ) + ) + return DeleteTaskPushNotificationConfigResponse( + root=DeleteTaskPushNotificationConfigSuccessResponse( + id=request.id, result=None + ) + ) + except ServerError as e: + return DeleteTaskPushNotificationConfigResponse( + root=JSONRPCErrorResponse( + id=request.id, error=e.error if e.error else InternalError() + ) + ) diff --git a/src/a2a/server/request_handlers/request_handler.py b/src/a2a/server/request_handlers/request_handler.py index 3693d8b6a..c72279ed7 100644 --- a/src/a2a/server/request_handlers/request_handler.py +++ b/src/a2a/server/request_handlers/request_handler.py @@ -12,6 +12,8 @@ TaskPushNotificationConfig, TaskQueryParams, UnsupportedOperationError, + ListTaskPushNotificationConfigParams, + DeleteTaskPushNotificationConfigParams ) from a2a.utils.errors import ServerError @@ -160,3 +162,39 @@ async def on_resubscribe_to_task( """ raise ServerError(error=UnsupportedOperationError()) yield + + @abstractmethod + async def on_list_task_push_notification_config( + self, + params: ListTaskPushNotificationConfigParams, + context: ServerCallContext | None = None, + ) -> list[TaskPushNotificationConfig]: + """Handles the 'tasks/pushNotificationConfig/list' method. + + Retrieves the current push notification configurations for a task. + + Args: + params: Parameters including the task ID. + context: Context provided by the server. + + Returns: + The `list[TaskPushNotificationConfig]` for the task. + """ + + @abstractmethod + async def on_delete_task_push_notification_config( + self, + params: DeleteTaskPushNotificationConfigParams, + context: ServerCallContext | None = None, + ) -> None: + """Handles the 'tasks/pushNotificationConfig/delete' method. + + Deletes a push notification configuration associated with a task. + + Args: + params: Parameters including the task ID. + context: Context provided by the server. + + Returns: + None + """ diff --git a/src/a2a/server/request_handlers/response_helpers.py b/src/a2a/server/request_handlers/response_helpers.py index b4e48ad9a..cfa09410f 100644 --- a/src/a2a/server/request_handlers/response_helpers.py +++ b/src/a2a/server/request_handlers/response_helpers.py @@ -25,6 +25,10 @@ TaskArtifactUpdateEvent, TaskPushNotificationConfig, TaskStatusUpdateEvent, + ListTaskPushNotificationConfigResponse, + DeleteTaskPushNotificationConfigResponse, + ListTaskPushNotificationConfigSuccessResponse, + DeleteTaskPushNotificationConfigSuccessResponse ) @@ -36,6 +40,8 @@ SetTaskPushNotificationConfigResponse, GetTaskPushNotificationConfigResponse, SendStreamingMessageResponse, + ListTaskPushNotificationConfigResponse, + DeleteTaskPushNotificationConfigResponse ) """Type variable for RootModel response types.""" @@ -48,6 +54,8 @@ SetTaskPushNotificationConfigSuccessResponse, GetTaskPushNotificationConfigSuccessResponse, SendStreamingMessageSuccessResponse, + ListTaskPushNotificationConfigSuccessResponse, + DeleteTaskPushNotificationConfigSuccessResponse ) """Type variable for SuccessResponse types.""" @@ -60,6 +68,7 @@ | TaskPushNotificationConfig | A2AError | JSONRPCError + | list[TaskPushNotificationConfig] ) """Type alias for possible event types produced by handlers.""" diff --git a/src/a2a/server/tasks/base_push_notification_sender.py b/src/a2a/server/tasks/base_push_notification_sender.py index ed8f744fa..b74cc54d8 100644 --- a/src/a2a/server/tasks/base_push_notification_sender.py +++ b/src/a2a/server/tasks/base_push_notification_sender.py @@ -6,7 +6,7 @@ PushNotificationConfigStore, ) from a2a.server.tasks.push_notification_sender import PushNotificationSender -from a2a.types import Task +from a2a.types import Task, PushNotificationConfig logger = logging.getLogger(__name__) @@ -27,11 +27,15 @@ def __init__(self, httpx_client: httpx.AsyncClient, config_store: PushNotificati async def send_notification(self, task: Task) -> None: """Sends a push notification for a task if configuration exists.""" - push_info = await self._config_store.get_info(task.id) - if not push_info: + push_configs = await self._config_store.get_info(task.id) + if not push_configs: return - url = push_info.url + + for push_info in push_configs: + await self._dispatch_notification(task, push_info) + async def _dispatch_notification(self, task: Task, push_info: PushNotificationConfig) -> None: + url = push_info.url try: response = await self._client.post( url, json=task.model_dump(mode='json', exclude_none=True) diff --git a/src/a2a/server/tasks/inmemory_push_notification_config_store.py b/src/a2a/server/tasks/inmemory_push_notification_config_store.py index a4b107527..d29110099 100644 --- a/src/a2a/server/tasks/inmemory_push_notification_config_store.py +++ b/src/a2a/server/tasks/inmemory_push_notification_config_store.py @@ -13,34 +13,49 @@ class InMemoryPushNotificationConfigStore(PushNotificationConfigStore): """In-memory implementation of PushNotificationConfigStore interface. - Stores push notification configurations in memory and uses an httpx client - to send notifications. + Stores push notification configurations in memory """ def __init__(self) -> None: - """Initializes the InMemoryPushNotifier. - - Args: - httpx_client: An async HTTP client instance to send notifications. - """ + """Initializes the InMemoryPushNotificationConfigStore.""" self.lock = asyncio.Lock() - self._push_notification_infos: dict[str, PushNotificationConfig] = {} + self._push_notification_infos: dict[str, list[PushNotificationConfig]] = {} async def set_info( self, task_id: str, notification_config: PushNotificationConfig ) -> None: """Sets or updates the push notification configuration for a task in memory.""" async with self.lock: - self._push_notification_infos[task_id] = notification_config + if task_id not in self._push_notification_infos: + self._push_notification_infos[task_id] = [] + + if notification_config.id is None: + notification_config.id = task_id + + for config in self._push_notification_infos[task_id]: + if config.id == notification_config.id: + self._push_notification_infos[task_id].remove(config) + break - async def get_info(self, task_id: str) -> PushNotificationConfig | None: - """Retrieves the push notification configuration for a task from memory.""" - async with self.lock: - return self._push_notification_infos.get(task_id) + self._push_notification_infos[task_id].append(notification_config) + async def get_info(self, task_id: str) -> list[PushNotificationConfig]: + """Retrieves the push notification configuration for a task from memory.""" + async with self.lock: + return self._push_notification_infos.get(task_id) or [] - async def delete_info(self, task_id: str) -> None: + async def delete_info(self, task_id: str, config_id: str | None = None) -> None: """Deletes the push notification configuration for a task from memory.""" async with self.lock: + if config_id is None: + config_id = task_id + if task_id in self._push_notification_infos: - del self._push_notification_infos[task_id] + configurations = self._push_notification_infos[task_id] + if not configurations: + return + + for config in configurations: + if config.id == config_id: + configurations.remove(config) + break diff --git a/src/a2a/server/tasks/inmemory_push_notifier.py b/src/a2a/server/tasks/inmemory_push_notifier.py deleted file mode 100644 index 058b18c00..000000000 --- a/src/a2a/server/tasks/inmemory_push_notifier.py +++ /dev/null @@ -1,62 +0,0 @@ -import asyncio -import logging - -import httpx - -from a2a.server.tasks.push_notifier import PushNotifier -from a2a.types import PushNotificationConfig, Task - - -logger = logging.getLogger(__name__) - - -class InMemoryPushNotifier(PushNotifier): - """In-memory implementation of PushNotifier interface. - - Stores push notification configurations in memory and uses an httpx client - to send notifications. - """ - - def __init__(self, httpx_client: httpx.AsyncClient) -> None: - """Initializes the InMemoryPushNotifier. - - Args: - httpx_client: An async HTTP client instance to send notifications. - """ - self._client = httpx_client - self.lock = asyncio.Lock() - self._push_notification_infos: dict[str, PushNotificationConfig] = {} - - async def set_info( - self, task_id: str, notification_config: PushNotificationConfig - ) -> None: - """Sets or updates the push notification configuration for a task in memory.""" - async with self.lock: - self._push_notification_infos[task_id] = notification_config - - async def get_info(self, task_id: str) -> PushNotificationConfig | None: - """Retrieves the push notification configuration for a task from memory.""" - async with self.lock: - return self._push_notification_infos.get(task_id) - - async def delete_info(self, task_id: str) -> None: - """Deletes the push notification configuration for a task from memory.""" - async with self.lock: - if task_id in self._push_notification_infos: - del self._push_notification_infos[task_id] - - async def send_notification(self, task: Task) -> None: - """Sends a push notification for a task if configuration exists.""" - push_info = await self.get_info(task.id) - if not push_info: - return - url = push_info.url - - try: - response = await self._client.post( - url, json=task.model_dump(mode='json', exclude_none=True) - ) - response.raise_for_status() - logger.info(f'Push-notification sent for URL: {url}') - except Exception as e: - logger.error(f'Error sending push-notification: {e}') diff --git a/src/a2a/server/tasks/push_notification_config_store.py b/src/a2a/server/tasks/push_notification_config_store.py index 85ea9191d..dd93791f5 100644 --- a/src/a2a/server/tasks/push_notification_config_store.py +++ b/src/a2a/server/tasks/push_notification_config_store.py @@ -11,9 +11,9 @@ async def set_info(self, task_id: str, notification_config: PushNotificationConf """Sets or updates the push notification configuration for a task.""" @abstractmethod - async def get_info(self, task_id: str) -> PushNotificationConfig | None: + async def get_info(self, task_id: str) -> list[PushNotificationConfig]: """Retrieves the push notification configuration for a task.""" @abstractmethod - async def delete_info(self, task_id: str) -> None: + async def delete_info(self, task_id: str, config_id: str | None = None) -> None: """Deletes the push notification configuration for a task.""" diff --git a/src/a2a/server/tasks/push_notifier.py b/src/a2a/server/tasks/push_notifier.py deleted file mode 100644 index 6a9040fdd..000000000 --- a/src/a2a/server/tasks/push_notifier.py +++ /dev/null @@ -1,25 +0,0 @@ -from abc import ABC, abstractmethod - -from a2a.types import PushNotificationConfig, Task - - -class PushNotifier(ABC): - """PushNotifier interface to store, retrieve push notification for tasks and send push notifications.""" - - @abstractmethod - async def set_info( - self, task_id: str, notification_config: PushNotificationConfig - ) -> None: - """Sets or updates the push notification configuration for a task.""" - - @abstractmethod - async def get_info(self, task_id: str) -> PushNotificationConfig | None: - """Retrieves the push notification configuration for a task.""" - - @abstractmethod - async def delete_info(self, task_id: str) -> None: - """Deletes the push notification configuration for a task.""" - - @abstractmethod - async def send_notification(self, task: Task) -> None: - """Sends a push notification containing the latest task state.""" diff --git a/tests/server/request_handlers/test_default_request_handler.py b/tests/server/request_handlers/test_default_request_handler.py index feaf5ec27..c3c886805 100644 --- a/tests/server/request_handlers/test_default_request_handler.py +++ b/tests/server/request_handlers/test_default_request_handler.py @@ -25,7 +25,8 @@ TaskStore, TaskUpdater, PushNotificationConfigStore, - PushNotificationSender + PushNotificationSender, + InMemoryPushNotificationConfigStore ) from a2a.types import ( @@ -45,7 +46,9 @@ TaskStatus, TextPart, UnsupportedOperationError, - GetTaskPushNotificationConfigParams + GetTaskPushNotificationConfigParams, + ListTaskPushNotificationConfigParams, + DeleteTaskPushNotificationConfigParams ) @@ -932,7 +935,7 @@ async def dummy_coro_for_task(): @pytest.mark.asyncio async def test_set_task_push_notification_config_no_notifier(): - """Test on_set_task_push_notification_config when _push_notifier is None.""" + """Test on_set_task_push_notification_config when _push_config_store is None.""" request_handler = DefaultRequestHandler( agent_executor=DummyAgentExecutor(), task_store=AsyncMock(spec=TaskStore), @@ -982,8 +985,8 @@ async def test_set_task_push_notification_config_task_not_found(): @pytest.mark.asyncio -async def test_get_task_push_notification_config_no_notifier(): - """Test on_get_task_push_notification_config when _push_notifier is None.""" +async def test_get_task_push_notification_config_no_store(): + """Test on_get_task_push_notification_config when _push_config_store is None.""" request_handler = DefaultRequestHandler( agent_executor=DummyAgentExecutor(), task_store=AsyncMock(spec=TaskStore), @@ -1026,7 +1029,7 @@ async def test_get_task_push_notification_config_task_not_found(): @pytest.mark.asyncio async def test_get_task_push_notification_config_info_not_found(): - """Test on_get_task_push_notification_config when push_notifier.get_info returns None.""" + """Test on_get_task_push_notification_config when push_config_store.get_info returns None.""" mock_task_store = AsyncMock(spec=TaskStore) sample_task = create_sample_task(task_id='task_info_not_found') @@ -1146,3 +1149,281 @@ async def consume_stream(): texts = [p.root.text for e in events for p in e.status.message.parts] assert texts == ['Event 0', 'Event 1', 'Event 2'] + +@pytest.mark.asyncio +async def test_list_task_push_notification_config_no_store(): + """Test on_list_task_push_notification_config when _push_config_store is None.""" + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=AsyncMock(spec=TaskStore), + push_config_store=None, # Explicitly None + ) + params = ListTaskPushNotificationConfigParams(id='task1') + from a2a.utils.errors import ServerError # Local import + + with pytest.raises(ServerError) as exc_info: + await request_handler.on_list_task_push_notification_config( + params, create_server_call_context() + ) + assert isinstance(exc_info.value.error, UnsupportedOperationError) + + +@pytest.mark.asyncio +async def test_list_task_push_notification_config_task_not_found(): + """Test on_list_task_push_notification_config when task is not found.""" + mock_task_store = AsyncMock(spec=TaskStore) + mock_task_store.get.return_value = None # Task not found + mock_push_store = AsyncMock(spec=PushNotificationConfigStore) + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=mock_push_store, + ) + params = ListTaskPushNotificationConfigParams(id='non_existent_task') + from a2a.utils.errors import ServerError # Local import + + with pytest.raises(ServerError) as exc_info: + await request_handler.on_list_task_push_notification_config( + params, create_server_call_context() + ) + + assert isinstance(exc_info.value.error, TaskNotFoundError) + mock_task_store.get.assert_awaited_once_with('non_existent_task') + mock_push_store.get_info.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_list_no_task_push_notification_config_info(): + """Test on_get_task_push_notification_config when push_config_store.get_info returns []""" + mock_task_store = AsyncMock(spec=TaskStore) + + sample_task = create_sample_task(task_id='task_info_not_found') + mock_task_store.get.return_value = sample_task + + push_store = InMemoryPushNotificationConfigStore() + + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + params = ListTaskPushNotificationConfigParams(id='task_info_not_found') + + result = await request_handler.on_list_task_push_notification_config( + params, create_server_call_context() + ) + assert result == [] + +@pytest.mark.asyncio +async def test_list_task_push_notification_config_info_with_config(): + """Test on_list_task_push_notification_config with push config+id""" + mock_task_store = AsyncMock(spec=TaskStore) + + sample_task = create_sample_task(task_id='task_info_not_found') + mock_task_store.get.return_value = sample_task + + push_config1 = PushNotificationConfig(id='config_1', url='http://example.com') + push_config2 = PushNotificationConfig(id='config_2', url='http://example.com') + + push_store = InMemoryPushNotificationConfigStore() + await push_store.set_info('task_1', push_config1) + await push_store.set_info('task_1', push_config2) + + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + params = ListTaskPushNotificationConfigParams(id='task_1') + + result: list[TaskPushNotificationConfig] = await request_handler.on_list_task_push_notification_config( + params, create_server_call_context() + ) + + assert len(result) == 2 + assert result[0].taskId == 'task_1' + assert result[0].pushNotificationConfig == push_config1 + assert result[1].taskId == 'task_1' + assert result[1].pushNotificationConfig == push_config2 + +@pytest.mark.asyncio +async def test_list_task_push_notification_config_info_with_config_and_no_id(): + """Test on_list_task_push_notification_config with no push config id""" + mock_task_store = AsyncMock(spec=TaskStore) + + sample_task = create_sample_task(task_id='task_info_not_found') + mock_task_store.get.return_value = sample_task + + push_config = PushNotificationConfig(url='http://example.com') + + # insertion without id should replace the existing config + push_store = InMemoryPushNotificationConfigStore() + await push_store.set_info('task_1', push_config) + await push_store.set_info('task_1', push_config) + + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + params = ListTaskPushNotificationConfigParams(id='task_1') + + result: list[TaskPushNotificationConfig] = await request_handler.on_list_task_push_notification_config( + params, create_server_call_context() + ) + + assert len(result) == 1 + assert result[0].taskId == 'task_1' + assert result[0].pushNotificationConfig.url == push_config.url + assert result[0].pushNotificationConfig.id == 'task_1' + +@pytest.mark.asyncio +async def test_delete_task_push_notification_config_no_store(): + """Test on_delete_task_push_notification_config when _push_config_store is None.""" + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=AsyncMock(spec=TaskStore), + push_config_store=None, # Explicitly None + ) + params = DeleteTaskPushNotificationConfigParams(id='task1', pushNotificationConfigId='config1') + from a2a.utils.errors import ServerError # Local import + + with pytest.raises(ServerError) as exc_info: + await request_handler.on_delete_task_push_notification_config( + params, create_server_call_context() + ) + assert isinstance(exc_info.value.error, UnsupportedOperationError) + +@pytest.mark.asyncio +async def test_delete_task_push_notification_config_task_not_found(): + """Test on_delete_task_push_notification_config when task is not found.""" + mock_task_store = AsyncMock(spec=TaskStore) + mock_task_store.get.return_value = None # Task not found + mock_push_store = AsyncMock(spec=PushNotificationConfigStore) + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=mock_push_store, + ) + params = DeleteTaskPushNotificationConfigParams(id='non_existent_task', pushNotificationConfigId='config1') + from a2a.utils.errors import ServerError # Local import + + with pytest.raises(ServerError) as exc_info: + await request_handler.on_delete_task_push_notification_config( + params, create_server_call_context() + ) + + assert isinstance(exc_info.value.error, TaskNotFoundError) + mock_task_store.get.assert_awaited_once_with('non_existent_task') + mock_push_store.get_info.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_delete_no_task_push_notification_config_info(): + """Test on_delete_task_push_notification_config without config info""" + mock_task_store = AsyncMock(spec=TaskStore) + + sample_task = create_sample_task(task_id='task_1') + mock_task_store.get.return_value = sample_task + + push_store = InMemoryPushNotificationConfigStore() + await push_store.set_info('task_2', PushNotificationConfig(id="config_1", url='http://example.com')) + + + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + params = DeleteTaskPushNotificationConfigParams(id='task1', pushNotificationConfigId='config_non_existant') + + result = await request_handler.on_delete_task_push_notification_config( + params, create_server_call_context() + ) + assert result == None + + params = DeleteTaskPushNotificationConfigParams(id='task2', pushNotificationConfigId='config_non_existant') + + result = await request_handler.on_delete_task_push_notification_config( + params, create_server_call_context() + ) + assert result == None + +@pytest.mark.asyncio +async def test_delete_task_push_notification_config_info_with_config(): + """Test on_list_task_push_notification_config with push config+id""" + mock_task_store = AsyncMock(spec=TaskStore) + + sample_task = create_sample_task(task_id='task_info_not_found') + mock_task_store.get.return_value = sample_task + + push_config1 = PushNotificationConfig(id='config_1', url='http://example.com') + push_config2 = PushNotificationConfig(id='config_2', url='http://example.com') + + push_store = InMemoryPushNotificationConfigStore() + await push_store.set_info('task_1', push_config1) + await push_store.set_info('task_1', push_config2) + await push_store.set_info('task_2', push_config1) + + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + params = DeleteTaskPushNotificationConfigParams(id='task_1', pushNotificationConfigId='config_1') + + result1 = await request_handler.on_delete_task_push_notification_config( + params, create_server_call_context() + ) + + assert result1 == None + + result2 = await request_handler.on_list_task_push_notification_config( + ListTaskPushNotificationConfigParams(id='task_1'), create_server_call_context() + ) + + assert len(result2) == 1 + assert result2[0].taskId == 'task_1' + assert result2[0].pushNotificationConfig == push_config2 + +@pytest.mark.asyncio +async def test_delete_task_push_notification_config_info_with_config_and_no_id(): + """Test on_list_task_push_notification_config with no push config id""" + mock_task_store = AsyncMock(spec=TaskStore) + + sample_task = create_sample_task(task_id='task_info_not_found') + mock_task_store.get.return_value = sample_task + + push_config = PushNotificationConfig(url='http://example.com') + + # insertion without id should replace the existing config + push_store = InMemoryPushNotificationConfigStore() + await push_store.set_info('task_1', push_config) + await push_store.set_info('task_1', push_config) + + + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + push_config_store=push_store, + ) + params = DeleteTaskPushNotificationConfigParams(id='task_1', pushNotificationConfigId='task_1') + + result = await request_handler.on_delete_task_push_notification_config( + params, create_server_call_context() + ) + + assert result == None + + result2 = await request_handler.on_list_task_push_notification_config( + ListTaskPushNotificationConfigParams(id='task_1'), create_server_call_context() + ) + + assert len(result2) == 0 \ No newline at end of file diff --git a/tests/server/request_handlers/test_jsonrpc_handler.py b/tests/server/request_handlers/test_jsonrpc_handler.py index 8fda9b1aa..de6c1453d 100644 --- a/tests/server/request_handlers/test_jsonrpc_handler.py +++ b/tests/server/request_handlers/test_jsonrpc_handler.py @@ -55,7 +55,15 @@ TaskStatusUpdateEvent, TextPart, UnsupportedOperationError, - GetTaskPushNotificationConfigParams + GetTaskPushNotificationConfigParams, + ListTaskPushNotificationConfigRequest, + ListTaskPushNotificationConfigResponse, + ListTaskPushNotificationConfigSuccessResponse, + ListTaskPushNotificationConfigParams, + DeleteTaskPushNotificationConfigParams, + DeleteTaskPushNotificationConfigRequest, + DeleteTaskPushNotificationConfigResponse, + DeleteTaskPushNotificationConfigSuccessResponse, ) from a2a.utils.errors import ServerError @@ -451,7 +459,7 @@ async def test_set_push_notification_success(self) -> None: id='1', params=task_push_config ) response: SetTaskPushNotificationConfigResponse = ( - await handler.set_push_notification(request) + await handler.set_push_notification_config(request) ) self.assertIsInstance( response.root, SetTaskPushNotificationConfigSuccessResponse @@ -483,7 +491,7 @@ async def test_get_push_notification_success(self) -> None: request = SetTaskPushNotificationConfigRequest( id='1', params=task_push_config ) - await handler.set_push_notification(request) + await handler.set_push_notification_config(request) get_request: GetTaskPushNotificationConfigRequest = ( GetTaskPushNotificationConfigRequest( @@ -491,7 +499,7 @@ async def test_get_push_notification_success(self) -> None: ) ) get_response: GetTaskPushNotificationConfigResponse = ( - await handler.get_push_notification(get_request) + await handler.get_push_notification_config(get_request) ) self.assertIsInstance( get_response.root, GetTaskPushNotificationConfigSuccessResponse @@ -747,14 +755,14 @@ async def test_push_notifications_not_supported_error(self) -> None: # Should raise ServerError about push notifications not supported with self.assertRaises(ServerError) as context: - await handler.set_push_notification(request) + await handler.set_push_notification_config(request) self.assertEqual( str(context.exception.error.message), # type: ignore 'Push notifications are not supported by the agent', ) - async def test_on_get_push_notification_no_push_notifier(self) -> None: + async def test_on_get_push_notification_no_push_config_store(self) -> None: """Test get_push_notification with no push notifier configured.""" # Arrange mock_agent_executor = AsyncMock(spec=AgentExecutor) @@ -775,13 +783,13 @@ async def test_on_get_push_notification_no_push_notifier(self) -> None: get_request = GetTaskPushNotificationConfigRequest( id='1', params=TaskIdParams(id=mock_task.id) ) - response = await handler.get_push_notification(get_request) + response = await handler.get_push_notification_config(get_request) # Assert self.assertIsInstance(response.root, JSONRPCErrorResponse) self.assertEqual(response.root.error, UnsupportedOperationError()) # type: ignore - async def test_on_set_push_notification_no_push_notifier(self) -> None: + async def test_on_set_push_notification_no_push_config_store(self) -> None: """Test set_push_notification with no push notifier configured.""" # Arrange mock_agent_executor = AsyncMock(spec=AgentExecutor) @@ -808,7 +816,7 @@ async def test_on_set_push_notification_no_push_notifier(self) -> None: request = SetTaskPushNotificationConfigRequest( id='1', params=task_push_config ) - response = await handler.set_push_notification(request) + response = await handler.set_push_notification_config(request) # Assert self.assertIsInstance(response.root, JSONRPCErrorResponse) @@ -1013,3 +1021,121 @@ async def streaming_coro(): collected_events[0].root, JSONRPCErrorResponse ) self.assertIsInstance(collected_events[0].root.error, InternalError) + + async def test_on_get_push_notification(self) -> None: + """Test get_push_notification_config handling""" + mock_task_store = AsyncMock(spec=TaskStore) + + mock_task = Task(**MINIMAL_TASK) + mock_task_store.get.return_value = mock_task + + + # Create request handler without a push notifier + request_handler = AsyncMock(spec=DefaultRequestHandler) + task_push_config = TaskPushNotificationConfig(taskId=mock_task.id, pushNotificationConfig=PushNotificationConfig(id="config1", url='http://example.com')) + request_handler.on_get_task_push_notification_config.return_value = task_push_config + + self.mock_agent_card.capabilities = AgentCapabilities( + pushNotifications=True + ) + handler = JSONRPCHandler(self.mock_agent_card, request_handler) + list_request = GetTaskPushNotificationConfigRequest( + id='1', params=GetTaskPushNotificationConfigParams(id=mock_task.id, pushNotificationConfigId="config1") + ) + response = await handler.get_push_notification_config(list_request) + # Assert + self.assertIsInstance(response.root, GetTaskPushNotificationConfigSuccessResponse) + self.assertEqual(response.root.result, task_push_config) # type: ignore + + async def test_on_list_push_notification(self) -> None: + """Test list_push_notification_config handling""" + mock_task_store = AsyncMock(spec=TaskStore) + + mock_task = Task(**MINIMAL_TASK) + mock_task_store.get.return_value = mock_task + + + # Create request handler without a push notifier + request_handler = AsyncMock(spec=DefaultRequestHandler) + task_push_config = TaskPushNotificationConfig(taskId=mock_task.id, pushNotificationConfig=PushNotificationConfig(url='http://example.com')) + request_handler.on_list_task_push_notification_config.return_value = [task_push_config] + + self.mock_agent_card.capabilities = AgentCapabilities( + pushNotifications=True + ) + handler = JSONRPCHandler(self.mock_agent_card, request_handler) + list_request = ListTaskPushNotificationConfigRequest( + id='1', params=ListTaskPushNotificationConfigParams(id=mock_task.id) + ) + response = await handler.list_push_notification_config(list_request) + # Assert + self.assertIsInstance(response.root, ListTaskPushNotificationConfigSuccessResponse) + self.assertEqual(response.root.result, [task_push_config]) # type: ignore + + async def test_on_list_push_notification_error(self) -> None: + """Test list_push_notification_config handling""" + mock_task_store = AsyncMock(spec=TaskStore) + + mock_task = Task(**MINIMAL_TASK) + mock_task_store.get.return_value = mock_task + + + # Create request handler without a push notifier + request_handler = AsyncMock(spec=DefaultRequestHandler) + task_push_config = TaskPushNotificationConfig(taskId=mock_task.id, pushNotificationConfig=PushNotificationConfig(url='http://example.com')) + # throw server error + request_handler.on_list_task_push_notification_config.side_effect = ServerError(InternalError()) + + + self.mock_agent_card.capabilities = AgentCapabilities( + pushNotifications=True + ) + handler = JSONRPCHandler(self.mock_agent_card, request_handler) + list_request = ListTaskPushNotificationConfigRequest( + id='1', params=ListTaskPushNotificationConfigParams(id=mock_task.id) + ) + response = await handler.list_push_notification_config(list_request) + # Assert + self.assertIsInstance(response.root, JSONRPCErrorResponse) + self.assertEqual(response.root.error, InternalError()) # type: ignore + + async def test_on_delete_push_notification(self) -> None: + """Test delete_push_notification_config handling""" + + # Create request handler without a push notifier + request_handler = AsyncMock(spec=DefaultRequestHandler) + request_handler.on_delete_task_push_notification_config.return_value = None + + self.mock_agent_card.capabilities = AgentCapabilities( + pushNotifications=True + ) + handler = JSONRPCHandler(self.mock_agent_card, request_handler) + delete_request = DeleteTaskPushNotificationConfigRequest( + id='1', params=DeleteTaskPushNotificationConfigParams(id="task1", pushNotificationConfigId="config1") + ) + response = await handler.delete_push_notification_config(delete_request) + # Assert + self.assertIsInstance(response.root, DeleteTaskPushNotificationConfigSuccessResponse) + self.assertEqual(response.root.result, None) # type: ignore + + async def test_on_delete_push_notification_error(self) -> None: + """Test delete_push_notification_config error handling""" + + + # Create request handler without a push notifier + request_handler = AsyncMock(spec=DefaultRequestHandler) + # throw server error + request_handler.on_delete_task_push_notification_config.side_effect = ServerError(UnsupportedOperationError()) + + + self.mock_agent_card.capabilities = AgentCapabilities( + pushNotifications=True + ) + handler = JSONRPCHandler(self.mock_agent_card, request_handler) + delete_request = DeleteTaskPushNotificationConfigRequest( + id='1', params=DeleteTaskPushNotificationConfigParams(id="task1", pushNotificationConfigId="config1") + ) + response = await handler.delete_push_notification_config(delete_request) + # Assert + self.assertIsInstance(response.root, JSONRPCErrorResponse) + self.assertEqual(response.root.error, UnsupportedOperationError()) # type: ignore \ No newline at end of file diff --git a/tests/server/tasks/test_inmemory_push_notifier.py b/tests/server/tasks/test_inmemory_push_notifier.py deleted file mode 100644 index 9c43df5f8..000000000 --- a/tests/server/tasks/test_inmemory_push_notifier.py +++ /dev/null @@ -1,230 +0,0 @@ -import unittest - -from unittest.mock import AsyncMock, MagicMock, patch - -import httpx - -from a2a.server.tasks.inmemory_push_notifier import InMemoryPushNotifier -from a2a.types import PushNotificationConfig, Task, TaskState, TaskStatus - - -# Suppress logging for cleaner test output, can be enabled for debugging -# logging.disable(logging.CRITICAL) - - -def create_sample_task(task_id='task123', status_state=TaskState.completed): - return Task( - id=task_id, - contextId='ctx456', - status=TaskStatus(state=status_state), - ) - - -def create_sample_push_config( - url='http://example.com/callback', config_id='cfg1' -): - return PushNotificationConfig(id=config_id, url=url) - - -class TestInMemoryPushNotifier(unittest.IsolatedAsyncioTestCase): - def setUp(self): - self.mock_httpx_client = AsyncMock(spec=httpx.AsyncClient) - self.notifier = InMemoryPushNotifier( - httpx_client=self.mock_httpx_client - ) # Corrected argument name - - def test_constructor_stores_client(self): - self.assertEqual(self.notifier._client, self.mock_httpx_client) - - async def test_set_info_adds_new_config(self): - task_id = 'task_new' - config = create_sample_push_config(url='http://new.url/callback') - - await self.notifier.set_info(task_id, config) - - self.assertIn(task_id, self.notifier._push_notification_infos) - self.assertEqual( - self.notifier._push_notification_infos[task_id], config - ) - - async def test_set_info_updates_existing_config(self): - task_id = 'task_update' - initial_config = create_sample_push_config( - url='http://initial.url/callback', config_id='cfg_initial' - ) - await self.notifier.set_info(task_id, initial_config) - - updated_config = create_sample_push_config( - url='http://updated.url/callback', config_id='cfg_updated' - ) - await self.notifier.set_info(task_id, updated_config) - - self.assertIn(task_id, self.notifier._push_notification_infos) - self.assertEqual( - self.notifier._push_notification_infos[task_id], updated_config - ) - self.assertNotEqual( - self.notifier._push_notification_infos[task_id], initial_config - ) - - async def test_get_info_existing_config(self): - task_id = 'task_get_exist' - config = create_sample_push_config(url='http://get.this/callback') - await self.notifier.set_info(task_id, config) - - retrieved_config = await self.notifier.get_info(task_id) - self.assertEqual(retrieved_config, config) - - async def test_get_info_non_existent_config(self): - task_id = 'task_get_non_exist' - retrieved_config = await self.notifier.get_info(task_id) - self.assertIsNone(retrieved_config) - - async def test_delete_info_existing_config(self): - task_id = 'task_delete_exist' - config = create_sample_push_config(url='http://delete.this/callback') - await self.notifier.set_info(task_id, config) - - self.assertIn(task_id, self.notifier._push_notification_infos) - await self.notifier.delete_info(task_id) - self.assertNotIn(task_id, self.notifier._push_notification_infos) - - async def test_delete_info_non_existent_config(self): - task_id = 'task_delete_non_exist' - # Ensure it doesn't raise an error - try: - await self.notifier.delete_info(task_id) - except Exception as e: - self.fail( - f'delete_info raised {e} unexpectedly for nonexistent task_id' - ) - self.assertNotIn( - task_id, self.notifier._push_notification_infos - ) # Should still not be there - - async def test_send_notification_success(self): - task_id = 'task_send_success' - task_data = create_sample_task(task_id=task_id) - config = create_sample_push_config(url='http://notify.me/here') - await self.notifier.set_info(task_id, config) - - # Mock the post call to simulate success - mock_response = AsyncMock(spec=httpx.Response) - mock_response.status_code = 200 - self.mock_httpx_client.post.return_value = mock_response - - await self.notifier.send_notification(task_data) # Pass only task_data - - self.mock_httpx_client.post.assert_awaited_once() - called_args, called_kwargs = self.mock_httpx_client.post.call_args - self.assertEqual(called_args[0], config.url) - self.assertEqual( - called_kwargs['json'], - task_data.model_dump(mode='json', exclude_none=True), - ) - self.assertNotIn( - 'auth', called_kwargs - ) # auth is not passed by current implementation - mock_response.raise_for_status.assert_called_once() - - async def test_send_notification_no_config(self): - task_id = 'task_send_no_config' - task_data = create_sample_task(task_id=task_id) - - await self.notifier.send_notification(task_data) # Pass only task_data - - self.mock_httpx_client.post.assert_not_called() - - @patch('a2a.server.tasks.inmemory_push_notifier.logger') - async def test_send_notification_http_status_error( - self, mock_logger: MagicMock - ): - task_id = 'task_send_http_err' - task_data = create_sample_task(task_id=task_id) - config = create_sample_push_config(url='http://notify.me/http_error') - await self.notifier.set_info(task_id, config) - - mock_response = MagicMock( - spec=httpx.Response - ) # Use MagicMock for status_code attribute - mock_response.status_code = 404 - mock_response.text = 'Not Found' - http_error = httpx.HTTPStatusError( - 'Not Found', request=MagicMock(), response=mock_response - ) - self.mock_httpx_client.post.side_effect = http_error - - # The method should catch the error and log it, not re-raise - await self.notifier.send_notification(task_data) # Pass only task_data - - self.mock_httpx_client.post.assert_awaited_once() - mock_logger.error.assert_called_once() - # Check that the error message contains the generic part and the specific exception string - self.assertIn( - 'Error sending push-notification', mock_logger.error.call_args[0][0] - ) - self.assertIn(str(http_error), mock_logger.error.call_args[0][0]) - - @patch('a2a.server.tasks.inmemory_push_notifier.logger') - async def test_send_notification_request_error( - self, mock_logger: MagicMock - ): - task_id = 'task_send_req_err' - task_data = create_sample_task(task_id=task_id) - config = create_sample_push_config(url='http://notify.me/req_error') - await self.notifier.set_info(task_id, config) - - request_error = httpx.RequestError('Network issue', request=MagicMock()) - self.mock_httpx_client.post.side_effect = request_error - - await self.notifier.send_notification(task_data) # Pass only task_data - - self.mock_httpx_client.post.assert_awaited_once() - mock_logger.error.assert_called_once() - self.assertIn( - 'Error sending push-notification', mock_logger.error.call_args[0][0] - ) - self.assertIn(str(request_error), mock_logger.error.call_args[0][0]) - - @patch('a2a.server.tasks.inmemory_push_notifier.logger') - async def test_send_notification_with_auth(self, mock_logger: MagicMock): - task_id = 'task_send_auth' - task_data = create_sample_task(task_id=task_id) - auth_info = ('user', 'pass') - config = create_sample_push_config(url='http://notify.me/auth') - config.authentication = MagicMock() # Mocking the structure for auth - config.authentication.schemes = ['basic'] # Assume basic for simplicity - config.authentication.credentials = ( - auth_info # This might need to be a specific model - ) - # For now, let's assume it's a tuple for basic auth - # The actual PushNotificationAuthenticationInfo is more complex - # For this test, we'll simplify and assume InMemoryPushNotifier - # directly uses tuple for httpx's `auth` param if basic. - # A more accurate test would construct the real auth model. - # Given the current implementation of InMemoryPushNotifier, - # it only supports basic auth via tuple. - - await self.notifier.set_info(task_id, config) - - mock_response = AsyncMock(spec=httpx.Response) - mock_response.status_code = 200 - self.mock_httpx_client.post.return_value = mock_response - - await self.notifier.send_notification(task_data) # Pass only task_data - - self.mock_httpx_client.post.assert_awaited_once() - called_args, called_kwargs = self.mock_httpx_client.post.call_args - self.assertEqual(called_args[0], config.url) - self.assertEqual( - called_kwargs['json'], - task_data.model_dump(mode='json', exclude_none=True), - ) - self.assertNotIn( - 'auth', called_kwargs - ) # auth is not passed by current implementation - mock_response.raise_for_status.assert_called_once() - - -if __name__ == '__main__': - unittest.main()