Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions examples/examples-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,39 @@
"ExecutionTimeout": 300
},
"path": "./src/callback/callback_serdes.py"
},
{
"name": "No Replay Execution",
"description": "Execution with simples steps and without replay",
"handler": "no_replay_execution.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/no_replay_execution/no_replay_execution.py"
},
{
"name": "Run In Child Context With Failing Step",
"description": "Demonstrates runInChildContext with a failing step followed by a successful wait",
"handler": "run_in_child_context_step_failure.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/run_in_child_context/run_in_child_context_step_failure.py"
},
{
"name": "Comprehensive Operations",
"description": "Complex multi-operation example demonstrating all major operations",
"handler": "comprehensive_operations.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/comprehensive_operations/comprehensive_operations.py"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Complex multi-operation example demonstrating all major operations."""

from typing import Any

from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.config import Duration


@durable_execution
def handler(event: dict[str, Any], context: DurableContext) -> dict[str, Any]:
"""Comprehensive example demonstrating all major durable operations."""
print(f"Starting comprehensive operations example with event: {event}")

# Step 1: ctx.step - Simple step that returns a result
step1_result: str = context.step(
lambda _: "Step 1 completed successfully",
name="step1",
)

# Step 2: ctx.wait - Wait for 1 second
context.wait(Duration.from_seconds(1))

# Step 3: ctx.map - Map with 5 iterations returning numbers 1 to 5
map_input = [1, 2, 3, 4, 5]

map_results = context.map(
inputs=map_input,
func=lambda ctx, item, index, _: ctx.step(
lambda _: item, name=f"map-step-{index}"
),
name="map-numbers",
).to_dict()

# Step 4: ctx.parallel - 3 branches, each returning a fruit name

parallel_results = context.parallel(
functions=[
lambda ctx: ctx.step(lambda _: "apple", name="fruit-step-1"),
lambda ctx: ctx.step(lambda _: "banana", name="fruit-step-2"),
lambda ctx: ctx.step(lambda _: "orange", name="fruit-step-3"),
]
).to_dict()

# Final result combining all operations
return {
"step1": step1_result,
"waitCompleted": True,
"mapResults": map_results,
"parallelResults": parallel_results,
}
15 changes: 15 additions & 0 deletions examples/src/no_replay_execution/no_replay_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Demonstrates step execution tracking when no replay occurs."""

from typing import Any

from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, bool]:
"""Handler demonstrating step execution without replay."""
context.step(lambda _: "user-1", name="fetch-user-1")
context.step(lambda _: "user-2", name="fetch-user-2")

return {"completed": True}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Demonstrates runInChildContext with a failing step followed by a successful wait."""

from typing import Any

from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.config import StepConfig, Duration
from aws_durable_execution_sdk_python.retries import (
RetryStrategyConfig,
create_retry_strategy,
)


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, bool]:
"""Handler demonstrating runInChildContext with failing step."""

def child_with_failure(ctx: DurableContext) -> None:
"""Child context with a failing step."""

retry_config = RetryStrategyConfig(
max_attempts=3,
initial_delay=Duration.from_seconds(1),
max_delay=Duration.from_seconds(10),
backoff_rate=2.0,
)
step_config = StepConfig(retry_strategy=create_retry_strategy(retry_config))

def failing_step(_: DurableContext) -> None:
"""Step that always fails."""
raise Exception("Step failed in child context")

ctx.step(
failing_step,
name="failing-step",
config=step_config,
)

try:
context.run_in_child_context(
child_with_failure,
name="child-with-failure",
)
except Exception as error:
# Catch and ignore child context and step errors
result = {"success": True, "error": str(error)}

context.wait(Duration.from_seconds(1), name="wait-after-failure")

return result
40 changes: 40 additions & 0 deletions examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -625,3 +625,43 @@ Resources:
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
NoReplayExecution:
Type: AWS::Serverless::Function
Properties:
CodeUri: build/
Handler: no_replay_execution.handler
Description: Execution with simples steps and without replay
Role:
Fn::GetAtt:
- DurableFunctionRole
- Arn
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
RunInChildContextStepFailure:
Type: AWS::Serverless::Function
Properties:
CodeUri: build/
Handler: run_in_child_context_step_failure.handler
Description: Demonstrates runInChildContext with a failing step followed by
a successful wait
Role:
Fn::GetAtt:
- DurableFunctionRole
- Arn
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
ComprehensiveOperations:
Type: AWS::Serverless::Function
Properties:
CodeUri: build/
Handler: comprehensive_operations.handler
Description: Complex multi-operation example demonstrating all major operations
Role:
Fn::GetAtt:
- DurableFunctionRole
- Arn
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Tests for comprehensive_operations."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus

from src.comprehensive_operations import comprehensive_operations
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=comprehensive_operations.handler,
lambda_function_name="Comprehensive Operations",
)
def test_execute_all_operations_successfully(durable_runner):
"""Test that all operations execute successfully."""
with durable_runner:
result = durable_runner.run(input={"message": "test"}, timeout=30)

assert result.status is InvocationStatus.SUCCEEDED

result_data = deserialize_operation_payload(result.result)

assert result_data["step1"] == "Step 1 completed successfully"
assert result_data["waitCompleted"] is True

# verify map results
map_results = result_data["mapResults"]
assert len(map_results["all"]) == 5
assert [item["result"] for item in map_results["all"]] == [1, 2, 3, 4, 5]
assert map_results["completionReason"] == "ALL_COMPLETED"

# verify parallel results
parallel_results = result_data["parallelResults"]
assert len(parallel_results["all"]) == 3
assert [item["result"] for item in parallel_results["all"]] == [
"apple",
"banana",
"orange",
]
assert parallel_results["completionReason"] == "ALL_COMPLETED"

# Get all operations including nested ones
all_ops = result.get_all_operations()

# Verify step1 operation
step1_ops = [
op for op in all_ops if op.operation_type.value == "STEP" and op.name == "step1"
]
assert len(step1_ops) == 1
step1_op = step1_ops[0]
assert (
deserialize_operation_payload(step1_op.result)
== "Step 1 completed successfully"
)

# Verify wait operation (should be at index 1)
wait_op = result.operations[1]
assert wait_op.operation_type.value == "WAIT"

# Verify individual map step operations exist with correct names
for i in range(5):
map_step_ops = [
op
for op in all_ops
if op.operation_type.value == "STEP" and op.name == f"map-step-{i}"
]
assert len(map_step_ops) == 1
assert deserialize_operation_payload(map_step_ops[0].result) == i + 1

# Verify individual parallel step operations exist
fruit_step_1_ops = [
op
for op in all_ops
if op.operation_type.value == "STEP" and op.name == "fruit-step-1"
]
assert len(fruit_step_1_ops) == 1
assert deserialize_operation_payload(fruit_step_1_ops[0].result) == "apple"

fruit_step_2_ops = [
op
for op in all_ops
if op.operation_type.value == "STEP" and op.name == "fruit-step-2"
]
assert len(fruit_step_2_ops) == 1
assert deserialize_operation_payload(fruit_step_2_ops[0].result) == "banana"

fruit_step_3_ops = [
op
for op in all_ops
if op.operation_type.value == "STEP" and op.name == "fruit-step-3"
]
assert len(fruit_step_3_ops) == 1
assert deserialize_operation_payload(fruit_step_3_ops[0].result) == "orange"
52 changes: 52 additions & 0 deletions examples/test/no_replay_execution/test_no_replay_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Tests for no_replay_execution."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus

from src.no_replay_execution import no_replay_execution
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=no_replay_execution.handler,
lambda_function_name="No Replay Execution",
)
def test_handle_step_operations_when_no_replay_occurs(durable_runner):
"""Test step operations when no replay occurs."""
with durable_runner:
result = durable_runner.run(input=None, timeout=10)

assert result.status is InvocationStatus.SUCCEEDED

# Verify final result
assert deserialize_operation_payload(result.result) == {"completed": True}

# Get step operations
user1_step_ops = [
op
for op in result.operations
if op.operation_type.value == "STEP" and op.name == "fetch-user-1"
]
assert len(user1_step_ops) == 1
user1_step = user1_step_ops[0]

user2_step_ops = [
op
for op in result.operations
if op.operation_type.value == "STEP" and op.name == "fetch-user-2"
]
assert len(user2_step_ops) == 1
user2_step = user2_step_ops[0]

# Verify first-time execution tracking (no replay)
assert user1_step.operation_type.value == "STEP"
assert user1_step.status.value == "SUCCEEDED"
assert deserialize_operation_payload(user1_step.result) == "user-1"

assert user2_step.operation_type.value == "STEP"
assert user2_step.status.value == "SUCCEEDED"
assert deserialize_operation_payload(user2_step.result) == "user-2"

# Verify both operations tracked
assert len(result.operations) == 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Tests for run_in_child_context_failing_step."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus

from src.run_in_child_context import run_in_child_context_step_failure
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=run_in_child_context_step_failure.handler,
lambda_function_name="Run In Child Context With Failing Step",
)
def test_succeed_despite_failing_step_in_child_context(durable_runner):
"""Test that execution succeeds despite failing step in child context."""
with durable_runner:
result = durable_runner.run(input=None, timeout=30)

assert result.status is InvocationStatus.SUCCEEDED

result_data = deserialize_operation_payload(result.result)
assert result_data == {"success": True, "error": "Step failed in child context"}
Loading