Skip to content

Commit 5959ea4

Browse files
author
Alex Wang
committed
examples: map partial completion
1 parent 3850837 commit 5959ea4

File tree

4 files changed

+183
-0
lines changed

4 files changed

+183
-0
lines changed

examples/examples-catalog.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,17 @@
346346
},
347347
"path": "./src/map/map_with_failure_tolerance.py"
348348
},
349+
{
350+
"name": "Map Completion Config",
351+
"description": "Reproduces issue where map with minSuccessful loses failure count",
352+
"handler": "map_completion.handler",
353+
"integration": true,
354+
"durableConfig": {
355+
"RetentionPeriodInDays": 7,
356+
"ExecutionTimeout": 300
357+
},
358+
"path": "./src/map/map_completion.py"
359+
},
349360
{
350361
"name": "Parallel with Max Concurrency",
351362
"description": "Parallel operation with maxConcurrency limit",

examples/src/map/map_completion.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
"""Reproduces issue where map with minSuccessful loses failure count."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.config import (
6+
CompletionConfig,
7+
MapConfig,
8+
StepConfig,
9+
Duration,
10+
)
11+
from aws_durable_execution_sdk_python.context import DurableContext
12+
from aws_durable_execution_sdk_python.execution import durable_execution
13+
from aws_durable_execution_sdk_python.retries import (
14+
RetryStrategyConfig,
15+
create_retry_strategy,
16+
)
17+
18+
19+
@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
    """Handler demonstrating the map completion-config issue.

    Runs a durable map over five items where items 2 and 4 deliberately
    fail, using min_successful=2 and tolerated_failure_percentage=50.
    Returns the observed counts so the lost-failure-count behavior can be
    inspected by the caller/test.

    Args:
        _event: Ignored Lambda event payload.
        context: Durable execution context providing `map`, `step`, and a logger.

    Returns:
        A dict with aggregate counts, batch status/completion reason, and the
        per-item success/failure details.
    """
    # Test data: items 2 and 4 will fail (40% failure rate).
    items = [
        {"id": 1, "shouldFail": False},
        {"id": 2, "shouldFail": True},  # Will fail
        {"id": 3, "shouldFail": False},
        {"id": 4, "shouldFail": True},  # Will fail
        {"id": 5, "shouldFail": False},
    ]

    # Fixed completion config that causes the issue: the map is allowed to
    # finish as soon as 2 items succeed, potentially before failures are
    # finalized.
    completion_config = CompletionConfig(
        min_successful=2,
        tolerated_failure_percentage=50,
    )

    # Fix: this was an f-string with no placeholders (ruff F541); the emitted
    # log text is unchanged.
    context.logger.info(
        "Starting map with config: min_successful=2, tolerated_failure_percentage=50"
    )
    context.logger.info(
        f"Items pattern: {', '.join(['FAIL' if i['shouldFail'] else 'SUCCESS' for i in items])}"
    )

    def process_item(
        ctx: DurableContext, item: dict[str, Any], index: int, _
    ) -> dict[str, Any]:
        """Process one mapped item as a retryable durable step."""
        context.logger.info(
            f"Processing item {item['id']} (index {index}), shouldFail: {item['shouldFail']}"
        )

        # Two attempts with a fixed 1s delay, so failing items are still
        # mid-retry when the map's completion condition is met.
        retry_config = RetryStrategyConfig(
            max_attempts=2,
            initial_delay=Duration.from_seconds(1),
            max_delay=Duration.from_seconds(1),
        )
        step_config = StepConfig(retry_strategy=create_retry_strategy(retry_config))

        def step_function(_: DurableContext) -> dict[str, Any]:
            """Step that succeeds or raises based on the item's shouldFail flag."""
            if item["shouldFail"]:
                raise Exception(f"Processing failed for item {item['id']}")
            return {
                "itemId": item["id"],
                "processed": True,
                "result": f"Item {item['id']} processed successfully",
            }

        return ctx.step(
            step_function,
            name=f"process-item-{index}",
            config=step_config,
        )

    config = MapConfig(
        max_concurrency=3,
        completion_config=completion_config,
    )

    results = context.map(
        inputs=items,
        func=process_item,
        name="completion-config-items",
        config=config,
    )

    context.logger.info("Map completed with results:")
    context.logger.info(f"Total items processed: {results.total_count}")
    context.logger.info(f"Successful items: {results.success_count}")
    context.logger.info(f"Failed items: {results.failure_count}")
    context.logger.info(f"Has failures: {results.has_failure}")
    context.logger.info(f"Batch status: {results.status}")
    context.logger.info(f"Completion reason: {results.completion_reason}")

    return {
        "totalItems": results.total_count,
        "successfulCount": results.success_count,
        "failedCount": results.failure_count,
        "hasFailures": results.has_failure,
        "batchStatus": str(results.status),
        "completionReason": str(results.completion_reason),
        "successfulItems": [
            {
                "index": item.index,
                "itemId": items[item.index]["id"],
            }
            for item in results.succeeded()
        ],
        "failedItems": [
            {
                "index": item.index,
                "itemId": items[item.index]["id"],
                "error": str(item.error),
            }
            for item in results.failed()
        ],
    }

examples/template.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,19 @@ Resources:
440440
DurableConfig:
441441
RetentionPeriodInDays: 7
442442
ExecutionTimeout: 300
443+
MapCompletion:
444+
Type: AWS::Serverless::Function
445+
Properties:
446+
CodeUri: build/
447+
Handler: map_completion.handler
448+
Description: Reproduces issue where map with minSuccessful loses failure count
449+
Role:
450+
Fn::GetAtt:
451+
- DurableFunctionRole
452+
- Arn
453+
DurableConfig:
454+
RetentionPeriodInDays: 7
455+
ExecutionTimeout: 300
443456
ParallelWithMaxConcurrency:
444457
Type: AWS::Serverless::Function
445458
Properties:
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
"""Tests for map_completion."""
2+
3+
import json
4+
5+
import pytest
6+
7+
from src.map import map_completion
8+
from test.conftest import deserialize_operation_payload
9+
from aws_durable_execution_sdk_python.execution import InvocationStatus
10+
11+
12+
@pytest.mark.example
@pytest.mark.durable_execution(
    handler=map_completion.handler,
    lambda_function_name="Map Completion Config",
)
def test_reproduce_completion_config_behavior_with_detailed_logging(durable_runner):
    """Demonstrates map behavior with minSuccessful and concurrent execution."""
    with durable_runner:
        outcome = durable_runner.run(input=None, timeout=60)

    assert outcome.status is InvocationStatus.SUCCEEDED

    payload = deserialize_operation_payload(outcome.result)

    # Why >= 4 items are processed despite min_successful=2: once two items
    # succeed, the completion event is set and the main thread cancels the
    # remaining futures. Worker threads that are already past the critical
    # section cannot be stopped mid-item, so there is a window between the
    # completion event and the futures actually stopping during which they
    # keep processing and incrementing counters. With max_concurrency=3 over
    # 5 items, 4 or 5 items typically complete before cancellation lands.
    #
    # Why failedCount == 0: the failing items carry retry strategies and are
    # still mid-retry when the map completes, so their failures are never
    # finalized and do not appear in the failure count.
    assert payload["totalItems"] >= 4
    assert payload["successfulCount"] >= 2
    assert payload["failedCount"] == 0
    assert payload["hasFailures"] is False
    assert payload["batchStatus"] == "BatchItemStatus.SUCCEEDED"
    assert payload["completionReason"] == "CompletionReason.MIN_SUCCESSFUL_REACHED"

0 commit comments

Comments
 (0)