Skip to content

Commit 2d3ed0c

Browse files
Alex Wangyaythomas
authored andcommitted
example: add callback examples
- add happy cases for create_callback examples
1 parent c6cf6c5 commit 2d3ed0c

12 files changed

+452
-67
lines changed

examples/examples-catalog.json

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,61 @@
444444
"ExecutionTimeout": 300
445445
},
446446
"path": "./src/none_results/none_results.py"
447+
},
448+
{
449+
"name": "Callback Success",
450+
"description": "Creating a callback ID for external systems to use",
451+
"handler": "callback_simple.handler",
452+
"integration": true,
453+
"durableConfig": {
454+
"RetentionPeriodInDays": 7,
455+
"ExecutionTimeout": 300
456+
},
457+
"path": "./src/callback/callback_simple.py"
458+
},
459+
{
460+
"name": "Callback Success None",
461+
"description": "Creating a callback ID for external systems to use",
462+
"handler": "callback_simple.handler",
463+
"integration": true,
464+
"durableConfig": {
465+
"RetentionPeriodInDays": 7,
466+
"ExecutionTimeout": 300
467+
},
468+
"path": "./src/callback/callback_simple.py"
469+
},
470+
{
471+
"name": "Create Callback Heartbeat",
472+
"description": "Demonstrates callback failure scenarios where the error propagates and is handled by framework",
473+
"handler": "callback_heartbeat.handler",
474+
"integration": true,
475+
"durableConfig": {
476+
"RetentionPeriodInDays": 7,
477+
"ExecutionTimeout": 300
478+
},
479+
"path": "./src/callback/callback_heartbeat.py"
480+
},
481+
{
482+
"name": "Create Callback Mixed Operations",
483+
"description": "Demonstrates createCallback mixed with steps, waits, and other operations",
484+
"handler": "callback_mixed_ops.handler",
485+
"integration": true,
486+
"durableConfig": {
487+
"RetentionPeriodInDays": 7,
488+
"ExecutionTimeout": 300
489+
},
490+
"path": "./src/callback/callback_mixed_ops.py"
491+
},
492+
{
493+
"name": "Create Callback Custom Serdes",
494+
"description": "Demonstrates createCallback with custom serialization/deserialization for Date objects",
495+
"handler": "callback_serdes.handler",
496+
"integration": true,
497+
"durableConfig": {
498+
"RetentionPeriodInDays": 7,
499+
"ExecutionTimeout": 300
500+
},
501+
"path": "./src/callback/callback_serdes.py"
447502
}
448503
]
449504
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from typing import TYPE_CHECKING, Any
2+
3+
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
4+
from aws_durable_execution_sdk_python.context import DurableContext
5+
from aws_durable_execution_sdk_python.execution import durable_execution
6+
7+
8+
if TYPE_CHECKING:
9+
from aws_durable_execution_sdk_python.types import Callback
10+
11+
12+
@durable_execution
13+
def handler(_event: Any, context: DurableContext) -> str:
14+
callback_config = CallbackConfig(
15+
timeout=Duration.from_seconds(60), heartbeat_timeout=Duration.from_seconds(10)
16+
)
17+
18+
callback: Callback[str] = context.create_callback(
19+
name="heartbeat_callback", config=callback_config
20+
)
21+
22+
return callback.result()
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""Demonstrates createCallback mixed with steps, waits, and other operations."""
2+
3+
import time
4+
from typing import Any
5+
6+
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
7+
from aws_durable_execution_sdk_python.context import DurableContext
8+
from aws_durable_execution_sdk_python.execution import durable_execution
9+
10+
11+
@durable_execution
12+
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
13+
"""Handler demonstrating createCallback mixed with other operations."""
14+
15+
step_result: dict[str, Any] = context.step(
16+
lambda _: {"userId": 123, "name": "John Doe"},
17+
name="fetch-data",
18+
)
19+
20+
callback_config = CallbackConfig(timeout=Duration.from_minutes(1))
21+
callback = context.create_callback(
22+
name="process-user",
23+
config=callback_config,
24+
)
25+
26+
# Mix callback with step and wait operations
27+
context.wait(Duration.from_seconds(1), name="initial-wait")
28+
29+
callback_result = callback.result()
30+
31+
return {
32+
"stepResult": step_result,
33+
"callbackResult": callback_result,
34+
"completed": True,
35+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
"""Demonstrates createCallback with custom serialization/deserialization for Date objects."""
2+
3+
import json
4+
from datetime import datetime, timezone
5+
from typing import Any, Optional
6+
7+
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
8+
from aws_durable_execution_sdk_python.context import DurableContext
9+
from aws_durable_execution_sdk_python.execution import durable_execution
10+
from aws_durable_execution_sdk_python.serdes import SerDes, SerDesContext
11+
12+
13+
class CustomData:
14+
"""Data structure with datetime."""
15+
16+
def __init__(self, id: int, message: str, timestamp: datetime):
17+
self.id = id
18+
self.message = message
19+
self.timestamp = timestamp
20+
21+
def to_dict(self) -> dict[str, Any]:
22+
"""Convert to dictionary."""
23+
return {
24+
"id": self.id,
25+
"message": self.message,
26+
"timestamp": self.timestamp.isoformat(),
27+
}
28+
29+
@staticmethod
30+
def from_dict(data: dict[str, Any]) -> "CustomData":
31+
"""Create from dictionary."""
32+
return CustomData(
33+
id=data["id"],
34+
message=data["message"],
35+
timestamp=datetime.fromisoformat(data["timestamp"].replace("Z", "+00:00")),
36+
)
37+
38+
39+
class CustomDataSerDes(SerDes[CustomData]):
40+
"""Custom serializer for CustomData that handles datetime conversion."""
41+
42+
def serialize(self, value: Optional[CustomData], _: SerDesContext) -> Optional[str]:
43+
"""Serialize CustomData to JSON string."""
44+
if value is None:
45+
return None
46+
return json.dumps(value.to_dict())
47+
48+
def deserialize(
49+
self, payload: Optional[str], _: SerDesContext
50+
) -> Optional[CustomData]:
51+
"""Deserialize JSON string to CustomData."""
52+
if payload is None:
53+
return None
54+
data = json.loads(payload)
55+
return CustomData.from_dict(data)
56+
57+
58+
@durable_execution
59+
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
60+
"""Handler demonstrating createCallback with custom serdes."""
61+
callback_config = CallbackConfig(
62+
timeout=Duration.from_seconds(30),
63+
serdes=CustomDataSerDes(),
64+
)
65+
66+
callback = context.create_callback(
67+
name="custom-serdes-callback",
68+
config=callback_config,
69+
)
70+
71+
result: CustomData = callback.result()
72+
73+
return {
74+
"receivedData": result.to_dict(),
75+
"isDateObject": isinstance(result.timestamp, datetime),
76+
}

examples/src/callback/callback.py renamed to examples/src/callback/callback_simple.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
from typing import TYPE_CHECKING, Any
22

3-
from aws_durable_execution_sdk_python.config import CallbackConfig
3+
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
44
from aws_durable_execution_sdk_python.context import DurableContext
55
from aws_durable_execution_sdk_python.execution import durable_execution
6-
from aws_durable_execution_sdk_python.config import Duration
76

87

98
if TYPE_CHECKING:
@@ -20,6 +19,4 @@ def handler(_event: Any, context: DurableContext) -> str:
2019
name="example_callback", config=callback_config
2120
)
2221

23-
# In a real scenario, you would pass callback.callback_id to an external system
24-
# For this example, we'll just return the callback_id to show it was created
25-
return f"Callback created with ID: {callback.callback_id}"
22+
return callback.result()

examples/template.yaml

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,3 +557,58 @@ Resources:
557557
DurableConfig:
558558
RetentionPeriodInDays: 7
559559
ExecutionTimeout: 300
560+
CallbackSimple:
561+
Type: AWS::Serverless::Function
562+
Properties:
563+
CodeUri: build/
564+
Handler: callback_simple.handler
565+
Description: Creating a callback ID for external systems to use
566+
Role:
567+
Fn::GetAtt:
568+
- DurableFunctionRole
569+
- Arn
570+
DurableConfig:
571+
RetentionPeriodInDays: 7
572+
ExecutionTimeout: 300
573+
CallbackHeartbeat:
574+
Type: AWS::Serverless::Function
575+
Properties:
576+
CodeUri: build/
577+
Handler: callback_heartbeat.handler
578+
Description: Demonstrates callback failure scenarios where the error propagates
579+
and is handled by framework
580+
Role:
581+
Fn::GetAtt:
582+
- DurableFunctionRole
583+
- Arn
584+
DurableConfig:
585+
RetentionPeriodInDays: 7
586+
ExecutionTimeout: 300
587+
CallbackMixedOps:
588+
Type: AWS::Serverless::Function
589+
Properties:
590+
CodeUri: build/
591+
Handler: callback_mixed_ops.handler
592+
Description: Demonstrates createCallback mixed with steps, waits, and other
593+
operations
594+
Role:
595+
Fn::GetAtt:
596+
- DurableFunctionRole
597+
- Arn
598+
DurableConfig:
599+
RetentionPeriodInDays: 7
600+
ExecutionTimeout: 300
601+
CallbackSerdes:
602+
Type: AWS::Serverless::Function
603+
Properties:
604+
CodeUri: build/
605+
Handler: callback_serdes.handler
606+
Description: Demonstrates createCallback with custom serialization/deserialization
607+
for Date objects
608+
Role:
609+
Fn::GetAtt:
610+
- DurableFunctionRole
611+
- Arn
612+
DurableConfig:
613+
RetentionPeriodInDays: 7
614+
ExecutionTimeout: 300

examples/test/callback/test_callback.py

Lines changed: 0 additions & 32 deletions
This file was deleted.
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
"""Tests for create_callback_heartbeat."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
import time
6+
import json
7+
from src.callback import callback_heartbeat
8+
from test.conftest import deserialize_operation_payload
9+
10+
11+
@pytest.mark.example
12+
@pytest.mark.durable_execution(
13+
handler=callback_heartbeat.handler,
14+
lambda_function_name="Create Callback Heartbeat",
15+
)
16+
def test_handle_callback_operations_with_failure_uncaught(durable_runner):
17+
"""Test handling callback operations with failure."""
18+
test_payload = {"shouldCatchError": False}
19+
20+
heartbeat_interval = 5
21+
total_duration = 20
22+
num_heartbeats = total_duration // heartbeat_interval
23+
24+
with durable_runner:
25+
execution_arn = durable_runner.run_async(input=test_payload, timeout=30)
26+
27+
callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn)
28+
29+
for i in range(num_heartbeats):
30+
print(
31+
f"Sending heartbeat {i + 1}/{num_heartbeats} at {(i + 1) * heartbeat_interval}s"
32+
)
33+
durable_runner.send_callback_heartbeat(callback_id=callback_id)
34+
time.sleep(heartbeat_interval)
35+
36+
callback_result = json.dumps(
37+
{
38+
"status": "completed",
39+
"data": "success after heartbeats",
40+
}
41+
)
42+
durable_runner.send_callback_success(
43+
callback_id=callback_id, result=callback_result.encode()
44+
)
45+
46+
result = durable_runner.wait_for_result(execution_arn=execution_arn)
47+
assert result.status is InvocationStatus.SUCCEEDED
48+
49+
# Assert the callback result is returned
50+
result_data = deserialize_operation_payload(result.result)
51+
assert result_data == callback_result

0 commit comments

Comments
 (0)