Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions moto/stepfunctions/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,13 @@ def _handle_name_input_idempotency(
for execution in self.executions:
if execution.name == name:
# Executions with the same name and input are considered idempotent
if execution_input == execution.execution_input:
if (
execution_input == execution.execution_input
and execution.status == "RUNNING"
):
return execution

# If the inputs are different, raise
# If the inputs are different _or_ the execution already finished, raise
raise ExecutionAlreadyExists(
"Execution Already Exists: '" + execution.execution_arn + "'"
)
Expand Down
6 changes: 6 additions & 0 deletions moto/stepfunctions/parser/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ def start_execution(
trace_header: TraceHeader = None,
) -> Execution:
state_machine = self.describe_state_machine(state_machine_arn)
existing_execution = state_machine._handle_name_input_idempotency(
name, execution_input
)
if existing_execution is not None:
# If we found a match for the name and input, return the existing execution.
return existing_execution

# Update event change parameters about the state machine and should not affect those about this execution.
state_machine_clone = copy.deepcopy(state_machine)
Expand Down
15 changes: 15 additions & 0 deletions tests/test_stepfunctions/parser/templates/wait_1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"Comment": "A state machine that simply waits, and then finishes",
"StartAt": "WaitState",
"States": {
"WaitState": {
"Type": "Wait",
"Seconds": 1,
"Next": "SuccessState"
},
"SuccessState": {
"Type": "Succeed",
"Comment": "The state machine completes successfully."
}
}
}
15 changes: 15 additions & 0 deletions tests/test_stepfunctions/parser/templates/wait_15.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"Comment": "A state machine that simply waits, and then finishes",
"StartAt": "WaitState",
"States": {
"WaitState": {
"Type": "Wait",
"Seconds": 15,
"Next": "SuccessState"
},
"SuccessState": {
"Type": "Succeed",
"Comment": "The state machine completes successfully."
}
}
}
69 changes: 69 additions & 0 deletions tests/test_stepfunctions/parser/test_stepfunctions_idempotency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from unittest import SkipTest

import pytest
from botocore.exceptions import ClientError

from moto import settings

from . import (
allow_aws_request,
aws_verified,
verify_execution_result,
)


@aws_verified
@pytest.mark.aws_verified
def test_create_state_machine_twice_after_failure():
if settings.TEST_SERVER_MODE:
raise SkipTest("Don't need to test this in ServerMode")

def _verify_result(client, execution, execution_arn):
name = execution["name"]
arn = execution["stateMachineArn"]
execution_arn = execution["executionArn"]

# Execution fails if we re-start it after failure
with pytest.raises(ClientError) as exc:
client.start_execution(name=name, stateMachineArn=arn)
err = exc.value.response["Error"]
assert err["Code"] == "ExecutionAlreadyExists"
assert err["Message"] == f"Execution Already Exists: '{execution_arn}'"

verify_execution_result(_verify_result, "FAILED", "failure")


@aws_verified
@pytest.mark.aws_verified
def test_create_state_machine_twice_after_success():
if settings.TEST_SERVER_MODE:
raise SkipTest("Don't need to test this in ServerMode")

def _verify_result(client, execution, execution_arn):
name = execution["name"]
arn = execution["stateMachineArn"]
execution_arn = execution["executionArn"]

if execution["status"] == "RUNNING":
# We can start the execution just fine
idempotent = client.start_execution(name=name, stateMachineArn=arn)
assert idempotent["executionArn"] == execution_arn

# We're not done yet - we should check in on the progress later
return False
elif execution["status"] == "SUCCEEDED":
# Execution fails if we re-start it after it finishes
with pytest.raises(ClientError) as exc:
client.start_execution(name=name, stateMachineArn=arn)
err = exc.value.response["Error"]
assert err["Code"] == "ExecutionAlreadyExists"
assert err["Message"] == f"Execution Already Exists: '{execution_arn}'"

# Execution finished, and we verified our error exception
# Return True to indicate we can tear down our StateMachine
return True

# AWS is a little slower, so we need to wait longer
# If we only wait 1 second, our execution might finish before we can retry it
tmpl_name = "wait_15" if allow_aws_request() else "wait_1"
verify_execution_result(_verify_result, None, tmpl_name)
78 changes: 78 additions & 0 deletions tests/test_stepfunctions/test_stepfunctions_idempotency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import os
from unittest import SkipTest, mock

import pytest
from botocore.exceptions import ClientError

from moto import mock_aws, settings
from moto.stepfunctions.models import StepFunctionBackend, stepfunctions_backends
from tests import DEFAULT_ACCOUNT_ID

from .parser import (
verify_execution_result,
)


@mock_aws
@mock.patch.dict(os.environ, {"SF_EXECUTION_HISTORY_TYPE": "FAILURE"})
def test_create_state_machine_twice_after_failure():
if settings.TEST_SERVER_MODE:
raise SkipTest("Don't need to test this in ServerMode")

def _verify_result(client, execution, execution_arn):
name = execution["name"]
arn = execution["stateMachineArn"]
execution_arn = execution["executionArn"]

# Execution fails if we re-start it after failure
with pytest.raises(ClientError) as exc:
client.start_execution(name=name, stateMachineArn=arn)
err = exc.value.response["Error"]
assert err["Code"] == "ExecutionAlreadyExists"
assert err["Message"] == f"Execution Already Exists: '{execution_arn}'"

verify_execution_result(_verify_result, "FAILED", "failure")


@mock_aws
def test_create_state_machine_twice_after_success():
if settings.TEST_SERVER_MODE:
raise SkipTest("Don't need to test this in ServerMode")

def _verify_result(client, execution, execution_arn):
name = execution["name"]
arn = execution["stateMachineArn"]
execution_arn = execution["executionArn"]

if execution["status"] == "RUNNING":
# We can start the execution just fine
idempotent = client.start_execution(name=name, stateMachineArn=arn)
assert idempotent["executionArn"] == execution_arn

# Manually mark our execution as finished
# (Because we don't actually execute anything, the status is always 'RUNNING' if we don't do this manually)
backend: StepFunctionBackend = stepfunctions_backends[DEFAULT_ACCOUNT_ID][
"us-east-1"
]
machine = backend.describe_state_machine(arn)
execution = next(
(x for x in machine.executions if x.execution_arn == execution_arn),
None,
)
execution.status = "SUCCEEDED"

# We're not done yet - we should check in on the progress later
return False
elif execution["status"] == "SUCCEEDED":
# Execution fails if we re-start it after it finishes
with pytest.raises(ClientError) as exc:
client.start_execution(name=name, stateMachineArn=arn)
err = exc.value.response["Error"]
assert err["Code"] == "ExecutionAlreadyExists"
assert err["Message"] == f"Execution Already Exists: '{execution_arn}'"

# Execution finished, and we verified our error exception
# Return True to indicate we can tear down our StateMachine
return True

verify_execution_result(_verify_result, None, tmpl_name="wait_1")