Skip to content

Commit ead7656

Browse files
author
Alex Wang
committed
examples: add concurrency callback example
1 parent 2cda53b commit ead7656

File tree

4 files changed

+165
-1
lines changed

4 files changed

+165
-1
lines changed

examples/examples-catalog.json

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,21 @@
444444
"ExecutionTimeout": 300
445445
},
446446
"path": "./src/none_results/none_results.py"
447-
}
447+
},
448+
{
449+
"name": "Create Callback Concurrency",
450+
"description": "Demonstrates multiple concurrent createCallback operations using context.parallel",
451+
"handler": "callback_concurrency.handler",
452+
"integration": true,
453+
"durableConfig": {
454+
"RetentionPeriodInDays": 7,
455+
"ExecutionTimeout": 300
456+
},
457+
"path": "./src/callback/callback_concurrency.py",
458+
"loggingConfig": {
459+
"ApplicationLogLevel": "DEBUG",
460+
"LogFormat": "JSON"
461+
}
462+
}
448463
]
449464
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
"""Demonstrates multiple concurrent createCallback operations using context.parallel."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
6+
from aws_durable_execution_sdk_python.context import DurableContext
7+
from aws_durable_execution_sdk_python.execution import durable_execution
8+
9+
10+
@durable_execution
11+
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
12+
"""Handler demonstrating multiple concurrent callback operations."""
13+
14+
callback_config = CallbackConfig(timeout=Duration.from_seconds(30))
15+
16+
def callback_branch_1(ctx: DurableContext) -> str:
17+
"""First callback branch."""
18+
callback = ctx.create_callback(
19+
name="api-call-1",
20+
config=callback_config,
21+
)
22+
return callback.result()
23+
24+
def callback_branch_2(ctx: DurableContext) -> str:
25+
"""Second callback branch."""
26+
callback = ctx.create_callback(
27+
name="api-call-2",
28+
config=callback_config,
29+
)
30+
return callback.result()
31+
32+
def callback_branch_3(ctx: DurableContext) -> str:
33+
"""Third callback branch."""
34+
callback = ctx.create_callback(
35+
name="api-call-3",
36+
config=callback_config,
37+
)
38+
return callback.result()
39+
40+
parallel_results = context.parallel(
41+
functions=[callback_branch_1, callback_branch_2, callback_branch_3],
42+
name="parallel_callbacks",
43+
)
44+
45+
# Extract results from parallel execution
46+
results = parallel_results.get_results()
47+
48+
return {
49+
"results": results,
50+
"allCompleted": True,
51+
}

examples/template.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,3 +557,17 @@ Resources:
557557
DurableConfig:
558558
RetentionPeriodInDays: 7
559559
ExecutionTimeout: 300
560+
CallbackConcurrency:
561+
Type: AWS::Serverless::Function
562+
Properties:
563+
CodeUri: build/
564+
Handler: callback_concurrency.handler
565+
Description: Demonstrates multiple concurrent createCallback operations using
566+
context.parallel
567+
Role:
568+
Fn::GetAtt:
569+
- DurableFunctionRole
570+
- Arn
571+
DurableConfig:
572+
RetentionPeriodInDays: 7
573+
ExecutionTimeout: 300
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""Tests for create_callback_concurrent."""
2+
3+
import json
4+
5+
import pytest
6+
from aws_durable_execution_sdk_python.execution import InvocationStatus
7+
8+
from src.callback import callback_concurrency
9+
from test.conftest import deserialize_operation_payload
10+
11+
12+
@pytest.mark.example
13+
@pytest.mark.durable_execution(
14+
handler=callback_concurrency.handler,
15+
lambda_function_name="Create Callback Concurrency",
16+
)
17+
def test_handle_multiple_concurrent_callback_operations(durable_runner):
18+
"""Test handling multiple concurrent callback operations."""
19+
with durable_runner:
20+
# Start the execution (this will pause at the callbacks)
21+
execution_arn = durable_runner.run_async(input=None, timeout=60)
22+
23+
callback_id_1 = durable_runner.wait_for_callback(
24+
execution_arn=execution_arn, name="api-call-1"
25+
)
26+
callback_id_2 = durable_runner.wait_for_callback(
27+
execution_arn=execution_arn, name="api-call-2"
28+
)
29+
callback_id_3 = durable_runner.wait_for_callback(
30+
execution_arn=execution_arn, name="api-call-3"
31+
)
32+
33+
callback_result_2 = json.dumps(
34+
{
35+
"id": 2,
36+
"data": "second",
37+
}
38+
)
39+
durable_runner.send_callback_success(
40+
callback_id=callback_id_2, result=callback_result_2.encode()
41+
)
42+
43+
callback_result_1 = json.dumps(
44+
{
45+
"id": 1,
46+
"data": "first",
47+
}
48+
)
49+
durable_runner.send_callback_success(
50+
callback_id=callback_id_1, result=callback_result_1.encode()
51+
)
52+
53+
callback_result_3 = json.dumps(
54+
{
55+
"id": 3,
56+
"data": "third",
57+
}
58+
)
59+
durable_runner.send_callback_success(
60+
callback_id=callback_id_3, result=callback_result_3.encode()
61+
)
62+
63+
result = durable_runner.wait_for_result(execution_arn=execution_arn)
64+
65+
assert result.status is InvocationStatus.SUCCEEDED
66+
67+
result_data = deserialize_operation_payload(result.result)
68+
69+
assert result_data == {
70+
"results": [callback_result_1, callback_result_2, callback_result_3],
71+
"allCompleted": True,
72+
}
73+
74+
# Verify all callback operations were tracked
75+
operations = result.get_context("parallel_callbacks")
76+
77+
assert len(operations.child_operations) == 3
78+
79+
# Verify all operations are CALLBACK type
80+
for op in operations.child_operations:
81+
assert op.operation_type.value == "CONTEXT"
82+
assert len(op.child_operations) == 1
83+
assert op.child_operations[0].operation_type.value == "CALLBACK"
84+

0 commit comments

Comments
 (0)