Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/src/chained_invoke/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Chained invoke examples
28 changes: 28 additions & 0 deletions examples/src/chained_invoke/invoke_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Example demonstrating basic chained invoke."""

from typing import Any

from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict:
"""Parent function that invokes a child function."""
result = context.invoke(
function_name="calculator",
payload={"a": 10, "b": 5},
name="invoke_calculator",
)
return {"calculation_result": result}


def calculator_handler(event: dict, context: Any) -> dict:
"""Child handler that performs calculation."""
a = event.get("a", 0)
b = event.get("b", 0)
return {
"sum": a + b,
"product": a * b,
"difference": a - b,
}
30 changes: 30 additions & 0 deletions examples/src/chained_invoke/map_with_invoke.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Example demonstrating map operations that invoke child functions."""

from typing import Any

from aws_durable_execution_sdk_python.config import MapConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> list[int]:
"""Process items using map where each item invokes a child function."""
items = [1, 2, 3, 4, 5]

return context.map(
inputs=items,
func=lambda ctx, item, index, _: ctx.invoke(
function_name="doubler",
payload={"value": item},
name=f"invoke_item_{index}",
),
name="map_with_invoke",
config=MapConfig(max_concurrency=2),
).get_results()


def doubler_handler(event: dict, context: Any) -> dict:
"""Child handler that doubles the input value."""
value = event.get("value", 0)
return {"result": value * 2}
52 changes: 52 additions & 0 deletions examples/src/chained_invoke/nested_invoke.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Example demonstrating nested chained invokes (invoke calling invoke)."""

from typing import Any

from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict:
"""Parent function that invokes a child which invokes another child."""
result = context.invoke(
function_name="orchestrator",
payload={"value": 5},
name="invoke_orchestrator",
)
return {"final_result": result}


@durable_execution
def orchestrator_handler(event: dict, context: DurableContext) -> dict:
"""Middle function that invokes the worker."""
value = event.get("value", 0)

# First invoke: add 10
added = context.invoke(
function_name="adder",
payload={"value": value, "add": 10},
name="invoke_adder",
)

# Second invoke: multiply by 2
multiplied = context.invoke(
function_name="multiplier",
payload={"value": added["result"]},
name="invoke_multiplier",
)

return {"result": multiplied["result"], "steps": ["add_10", "multiply_2"]}


def adder_handler(event: dict, context: Any) -> dict:
"""Leaf handler that adds values."""
value = event.get("value", 0)
add = event.get("add", 0)
return {"result": value + add}


def multiplier_handler(event: dict, context: Any) -> dict:
"""Leaf handler that multiplies by 2."""
value = event.get("value", 0)
return {"result": value * 2}
39 changes: 39 additions & 0 deletions examples/src/chained_invoke/parallel_with_invoke.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Example demonstrating parallel operations that invoke child functions."""

from typing import Any

from aws_durable_execution_sdk_python.config import ParallelConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> list[str]:
"""Execute parallel branches where each invokes a different child function."""
return context.parallel(
functions=[
lambda ctx: ctx.invoke(
function_name="greeter",
payload={"name": "Alice"},
name="greet_alice",
),
lambda ctx: ctx.invoke(
function_name="greeter",
payload={"name": "Bob"},
name="greet_bob",
),
lambda ctx: ctx.invoke(
function_name="greeter",
payload={"name": "Charlie"},
name="greet_charlie",
),
],
name="parallel_with_invoke",
config=ParallelConfig(max_concurrency=3),
).get_results()


def greeter_handler(event: dict, context: Any) -> dict:
"""Child handler that creates a greeting."""
name = event.get("name", "World")
return {"greeting": f"Hello, {name}!"}
1 change: 1 addition & 0 deletions examples/test/chained_invoke/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Chained invoke tests
27 changes: 27 additions & 0 deletions examples/test/chained_invoke/test_invoke_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Tests for basic chained invoke example."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus

from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
from src.chained_invoke import invoke_basic
from test.conftest import deserialize_operation_payload


def test_invoke_basic():
"""Test basic chained invoke example."""
with DurableFunctionTestRunner(handler=invoke_basic.handler) as runner:
runner.register_handler("calculator", invoke_basic.calculator_handler)
result = runner.run(input="test", timeout=10)

assert result.status is InvocationStatus.SUCCEEDED
parsed = deserialize_operation_payload(result.result)
assert parsed["calculation_result"]["sum"] == 15
assert parsed["calculation_result"]["product"] == 50
assert parsed["calculation_result"]["difference"] == 5

# Verify the invoke operation
invoke_op = result.get_invoke("invoke_calculator")
assert invoke_op is not None
assert invoke_op.status is OperationStatus.SUCCEEDED
28 changes: 28 additions & 0 deletions examples/test/chained_invoke/test_map_with_invoke.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Tests for map with chained invoke example."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus

from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
from src.chained_invoke import map_with_invoke
from test.conftest import deserialize_operation_payload


def test_map_with_invoke():
"""Test map operation where each item invokes a child function."""
with DurableFunctionTestRunner(handler=map_with_invoke.handler) as runner:
runner.register_handler("doubler", map_with_invoke.doubler_handler)
result = runner.run(input="test", timeout=10)

assert result.status is InvocationStatus.SUCCEEDED

# Each item [1,2,3,4,5] is doubled, returning {"result": value*2}
parsed = deserialize_operation_payload(result.result)
expected = [{"result": 2}, {"result": 4}, {"result": 6}, {"result": 8}, {"result": 10}]
assert parsed == expected

# Verify the map operation
map_op = result.get_context("map_with_invoke")
assert map_op is not None
assert map_op.status is OperationStatus.SUCCEEDED
37 changes: 37 additions & 0 deletions examples/test/chained_invoke/test_nested_invoke.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Tests for nested chained invoke example."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus

from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
from src.chained_invoke import nested_invoke
from test.conftest import deserialize_operation_payload


def test_nested_invoke():
"""Test nested chained invokes (invoke calling invoke).

Flow: handler -> orchestrator -> adder -> multiplier
Value: 5 -> add 10 = 15 -> multiply 2 = 30
"""
with DurableFunctionTestRunner(handler=nested_invoke.handler) as runner:
# Register the orchestrator (which is also a durable function)
runner.register_handler("orchestrator", nested_invoke.orchestrator_handler)
# Register the leaf handlers
runner.register_handler("adder", nested_invoke.adder_handler)
runner.register_handler("multiplier", nested_invoke.multiplier_handler)

result = runner.run(input="test", timeout=10)

assert result.status is InvocationStatus.SUCCEEDED

parsed = deserialize_operation_payload(result.result)
# 5 + 10 = 15, 15 * 2 = 30
assert parsed["final_result"]["result"] == 30
assert parsed["final_result"]["steps"] == ["add_10", "multiply_2"]

# Verify the top-level invoke operation
invoke_op = result.get_invoke("invoke_orchestrator")
assert invoke_op is not None
assert invoke_op.status is OperationStatus.SUCCEEDED
32 changes: 32 additions & 0 deletions examples/test/chained_invoke/test_parallel_with_invoke.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Tests for parallel with chained invoke example."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus

from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
from src.chained_invoke import parallel_with_invoke
from test.conftest import deserialize_operation_payload


def test_parallel_with_invoke():
"""Test parallel operation where each branch invokes a child function."""
with DurableFunctionTestRunner(handler=parallel_with_invoke.handler) as runner:
runner.register_handler("greeter", parallel_with_invoke.greeter_handler)
result = runner.run(input="test", timeout=10)

assert result.status is InvocationStatus.SUCCEEDED

parsed = deserialize_operation_payload(result.result)
expected = [
{"greeting": "Hello, Alice!"},
{"greeting": "Hello, Bob!"},
{"greeting": "Hello, Charlie!"},
]
assert parsed == expected

# Verify the parallel operation
parallel_op = result.get_context("parallel_with_invoke")
assert parallel_op is not None
assert parallel_op.status is OperationStatus.SUCCEEDED
assert len(parallel_op.child_operations) == 3
Loading
Loading