Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions python/packages/hosting-invocations/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) Microsoft Corporation.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
30 changes: 30 additions & 0 deletions python/packages/hosting-invocations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# agent-framework-hosting-invocations

Minimal `POST /invoke` channel for [agent-framework-hosting](../hosting). Useful
for smoke-testing, durable-task drivers, and bespoke clients that don't speak
the OpenAI Responses protocol.

## Wire shape

```
POST /invocations/invoke
{
"message": "hello",
"session_id": "user-42",
"stream": false
}
```

Non-streaming response: `{"response": "...", "session_id": "..."}`.
Streaming response: `text/event-stream` of `data:` lines, terminated by
`data: [DONE]`.

## Usage

```python
from agent_framework_hosting import AgentFrameworkHost
from agent_framework_hosting_invocations import InvocationsChannel

host = AgentFrameworkHost(target=my_agent, channels=[InvocationsChannel()])
host.serve()
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copyright (c) Microsoft. All rights reserved.

"""Minimal ``POST /invoke`` channel for :mod:`agent_framework_hosting`."""

from ._channel import InvocationsChannel

__all__ = ["InvocationsChannel"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# Copyright (c) Microsoft. All rights reserved.

"""Minimal ``POST /invoke`` channel.

Inspired by ``agent-framework-foundry-hosting``'s ``InvocationsHostServer``.
A framework-agnostic surface for callers that just want to send a message and
get an answer back — no OpenAI-style envelope, no Responses item lattice.
"""

from __future__ import annotations

from collections.abc import AsyncIterator, Awaitable
from typing import Any, cast

from agent_framework_hosting import (
ChannelContext,
ChannelContribution,
ChannelRequest,
ChannelRunHook,
ChannelSession,
ChannelStreamTransformHook,
apply_run_hook,
logger,
)
from starlette.requests import Request
from starlette.responses import JSONResponse, Response, StreamingResponse
from starlette.routing import Route


class InvocationsChannel:
"""Minimal ``POST /invoke`` surface.

A run hook can rewrite the channel request (e.g. inject a session, add
options) before the host invokes the agent. A stream-transform hook can
rewrite or drop ``AgentResponseUpdate`` chunks before they hit the wire.
"""

name = "invocations"

def __init__(
self,
*,
path: str = "/invocations",
run_hook: ChannelRunHook | None = None,
stream_transform_hook: ChannelStreamTransformHook | None = None,
) -> None:
"""Configure the invocations endpoint.

``path`` is the mount root the host prefixes when registering this
channel's routes (the actual handler is ``POST {path}/invoke``).
``run_hook`` may rewrite the :class:`ChannelRequest` before the host
invokes the target — typically to attach session metadata or
translate the wire payload into ``ChatMessage`` instances.
``stream_transform_hook`` lets callers map or drop individual
``AgentResponseUpdate`` chunks while streaming.
"""
self.path = path
self._hook = run_hook
self._stream_transform_hook = stream_transform_hook
self._ctx: ChannelContext | None = None

def contribute(self, context: ChannelContext) -> ChannelContribution:
"""Capture the host-supplied context and register ``POST /invoke``."""
self._ctx = context
return ChannelContribution(routes=[Route("/invoke", self._handle, methods=["POST"])])

async def _handle(self, request: Request) -> Response:
"""Handle a single ``POST /invoke`` call.

Validates the JSON body shape, builds a :class:`ChannelRequest`
(optionally with a ``ChannelSession`` keyed by ``session_id``),
runs the configured ``run_hook``, and either streams SSE chunks
when ``stream`` is true or returns a single JSON ``{response,
session_id}`` envelope.
"""
if self._ctx is None: # pragma: no cover - guarded by Channel lifecycle
return JSONResponse({"error": "channel not initialized"}, status_code=500)
try:
body: Any = await request.json()
except Exception:
return JSONResponse({"error": "invalid json"}, status_code=400)

if not isinstance(body, dict):
return JSONResponse({"error": "request body must be an object"}, status_code=422)
body_map: dict[str, Any] = cast("dict[str, Any]", body)

message = body_map.get("message")
if not isinstance(message, str) or not message:
return JSONResponse({"error": "missing or empty 'message'"}, status_code=422)

session_id = body_map.get("session_id")
if session_id is not None and not isinstance(session_id, str):
return JSONResponse({"error": "'session_id' must be a string"}, status_code=422)

session = ChannelSession(isolation_key=f"invocations:{session_id}") if session_id else None

attributes: dict[str, Any] = {}
if session_id:
attributes["session_id"] = session_id

channel_request = ChannelRequest(
channel=self.name,
operation="invoke",
input=message,
session=session,
stream=bool(body_map.get("stream")),
attributes=attributes,
)

if self._hook is not None:
channel_request = await apply_run_hook(
self._hook,
channel_request,
target=self._ctx.target,
protocol_request=body_map,
)

if channel_request.stream:
Comment thread
eavanvalkenburg marked this conversation as resolved.
return StreamingResponse(
self._stream(channel_request),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)

result = await self._ctx.run(channel_request)
return JSONResponse({"response": result.text, "session_id": session_id})

async def _stream(self, request: ChannelRequest) -> AsyncIterator[str]:
r"""Yield bare ``data:`` SSE lines for each text chunk + a final ``[DONE]``.

SSE protocol notes:

* The HTTP status is committed when ASGI sends headers, before the
generator runs. Emitting a stream-opening 200 + ``text/event-stream``
and signalling errors via ``event: error`` SSE frames is the
conventional contract — ``EventSource`` and OpenAI-style SSE
consumers treat ``event: error`` as a terminal error condition.
Hard run-acquisition failures (e.g. target rejected) therefore
surface as the first frame, not as an HTTP error code.
* The SSE spec treats ``\r``, ``\n``, and ``\r\n`` as line
terminators. Per-chunk text is split on all three so embedded
carriage returns don't corrupt ``data:`` framing on the wire.
"""
if self._ctx is None: # pragma: no cover - guarded by Channel lifecycle
yield "event: error\ndata: channel not initialized\n\n"
return
try:
stream = self._ctx.run_stream(request)
async for update in stream:
if self._stream_transform_hook is not None:
Comment thread
eavanvalkenburg marked this conversation as resolved.
transformed = self._stream_transform_hook(update)
if isinstance(transformed, Awaitable):
transformed = await transformed
if transformed is None:
continue
update = transformed
chunk = getattr(update, "text", None)
if chunk:
# Each text chunk is its own SSE event so curl-friendly
# consumers can read it directly. Newlines inside the
# chunk are escaped per SSE spec by emitting one
# ``data:`` line per source line. ``splitlines()`` is
# used over ``split('\n')`` so embedded ``\r`` /
# ``\r\n`` don't bleed into the framing.
for line in str(chunk).splitlines() or [""]:
yield f"data: {line}\n"
yield "\n"
try:
# Finalize so context-provider / history hooks on the agent
# still run even though we are emitting our own SSE.
# If finalization fails, the agent's persistence side
# effects (history-provider write, context-provider hooks)
# are unreliable — surface that to the client as an
# ``event: error`` frame so it isn't a silent drop.
await stream.get_final_response()
Comment thread
eavanvalkenburg marked this conversation as resolved.
except Exception as finalize_exc:
logger.exception("Invocations stream finalize failed")
yield "event: error\n"
for line in f"finalize failed: {finalize_exc!s}".splitlines() or [""]:
yield f"data: {line}\n"
yield "\n"
return
except Exception as exc:
logger.exception("Invocations stream consumption failed")
yield "event: error\n"
for line in str(exc).splitlines() or [""]:
yield f"data: {line}\n"
yield "\n"
return
yield "data: [DONE]\n\n"


__all__ = ["InvocationsChannel"]
97 changes: 97 additions & 0 deletions python/packages/hosting-invocations/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
[project]
name = "agent-framework-hosting-invocations"
description = "Minimal POST /invoke channel for agent-framework-hosting."
authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}]
readme = "README.md"
requires-python = ">=3.10"
version = "1.0.0a260424"
license-files = ["LICENSE"]
urls.homepage = "https://aka.ms/agent-framework"
urls.source = "https://github.com/microsoft/agent-framework/tree/main/python"
urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true"
urls.issues = "https://github.com/microsoft/agent-framework/issues"
classifiers = [
"License :: OSI Approved :: MIT License",
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Typing :: Typed",
]
dependencies = [
"agent-framework-core>=1.2.0,<2",
"agent-framework-hosting==1.0.0a260424",
]

[tool.uv]
prerelease = "if-necessary-or-explicit"
environments = [
"sys_platform == 'darwin'",
"sys_platform == 'linux'",
"sys_platform == 'win32'"
]

[tool.uv-dynamic-versioning]
fallback-version = "0.0.0"

[tool.pytest.ini_options]
testpaths = 'tests'
addopts = "-ra -q -r fEX"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
filterwarnings = []
timeout = 120
markers = [
"integration: marks tests as integration tests that require external services",
]

[tool.ruff]
extend = "../../pyproject.toml"

[tool.coverage.run]
omit = [
"**/__init__.py"
]

[tool.pyright]
extends = "../../pyproject.toml"
include = ["agent_framework_hosting_invocations"]
exclude = ['tests']

[tool.mypy]
plugins = ['pydantic.mypy']
strict = true
python_version = "3.10"
ignore_missing_imports = true
disallow_untyped_defs = true
no_implicit_optional = true
check_untyped_defs = true
warn_return_any = true
show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true

[tool.bandit]
targets = ["agent_framework_hosting_invocations"]
exclude_dirs = ["tests"]

[tool.poe]
executor.type = "uv"
include = "../../shared_tasks.toml"

[tool.poe.tasks.mypy]
help = "Run MyPy for this package."
cmd = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_hosting_invocations"

[tool.poe.tasks.test]
help = "Run the default unit test suite for this package."
cmd = 'pytest -m "not integration" --cov=agent_framework_hosting_invocations --cov-report=term-missing:skip-covered tests'

[build-system]
requires = ["flit-core >= 3.11,<4.0"]
build-backend = "flit_core.buildapi"
Empty file.
Loading