diff --git a/moto/stepfunctions/models.py b/moto/stepfunctions/models.py index 68dd4d88e9aa..8c135a8f4d0b 100644 --- a/moto/stepfunctions/models.py +++ b/moto/stepfunctions/models.py @@ -159,10 +159,13 @@ def _handle_name_input_idempotency( for execution in self.executions: if execution.name == name: # Executions with the same name and input are considered idempotent - if execution_input == execution.execution_input: + if ( + execution_input == execution.execution_input + and execution.status == "RUNNING" + ): return execution - # If the inputs are different, raise + # If the inputs are different _or_ the execution already finished, raise raise ExecutionAlreadyExists( "Execution Already Exists: '" + execution.execution_arn + "'" ) diff --git a/moto/stepfunctions/parser/models.py b/moto/stepfunctions/parser/models.py index 0e116ccc5786..d3d8465a53b7 100644 --- a/moto/stepfunctions/parser/models.py +++ b/moto/stepfunctions/parser/models.py @@ -164,6 +164,12 @@ def start_execution( trace_header: TraceHeader = None, ) -> Execution: state_machine = self.describe_state_machine(state_machine_arn) + existing_execution = state_machine._handle_name_input_idempotency( + name, execution_input + ) + if existing_execution is not None: + # If we found a match for the name and input, return the existing execution. + return existing_execution # Update event change parameters about the state machine and should not affect those about this execution. state_machine_clone = copy.deepcopy(state_machine) diff --git a/tests/test_stepfunctions/parser/templates/wait_1.json b/tests/test_stepfunctions/parser/templates/wait_1.json new file mode 100644 index 000000000000..3cbca7305826 --- /dev/null +++ b/tests/test_stepfunctions/parser/templates/wait_1.json @@ -0,0 +1,15 @@ +{ + "Comment": "A state machine that simply waits, and then finishes", + "StartAt": "WaitState", + "States": { + "WaitState": { + "Type": "Wait", + "Seconds": 1, + "Next": "SuccessState" + }, + "SuccessState": { + "Type": "Succeed", + "Comment": "The state machine completes successfully." + } + } +} \ No newline at end of file diff --git a/tests/test_stepfunctions/parser/templates/wait_15.json b/tests/test_stepfunctions/parser/templates/wait_15.json new file mode 100644 index 000000000000..6b57dbb1dcf7 --- /dev/null +++ b/tests/test_stepfunctions/parser/templates/wait_15.json @@ -0,0 +1,15 @@ +{ + "Comment": "A state machine that simply waits, and then finishes", + "StartAt": "WaitState", + "States": { + "WaitState": { + "Type": "Wait", + "Seconds": 15, + "Next": "SuccessState" + }, + "SuccessState": { + "Type": "Succeed", + "Comment": "The state machine completes successfully." + } + } +} \ No newline at end of file diff --git a/tests/test_stepfunctions/parser/test_stepfunctions_idempotency.py b/tests/test_stepfunctions/parser/test_stepfunctions_idempotency.py new file mode 100644 index 000000000000..10ef4c8ab364 --- /dev/null +++ b/tests/test_stepfunctions/parser/test_stepfunctions_idempotency.py @@ -0,0 +1,69 @@ +from unittest import SkipTest + +import pytest +from botocore.exceptions import ClientError + +from moto import settings + +from . import ( + allow_aws_request, + aws_verified, + verify_execution_result, +) + + +@aws_verified +@pytest.mark.aws_verified +def test_create_state_machine_twice_after_failure(): + if settings.TEST_SERVER_MODE: + raise SkipTest("Don't need to test this in ServerMode") + + def _verify_result(client, execution, execution_arn): + name = execution["name"] + arn = execution["stateMachineArn"] + execution_arn = execution["executionArn"] + + # Execution fails if we re-start it after failure + with pytest.raises(ClientError) as exc: + client.start_execution(name=name, stateMachineArn=arn) + err = exc.value.response["Error"] + assert err["Code"] == "ExecutionAlreadyExists" + assert err["Message"] == f"Execution Already Exists: '{execution_arn}'" + + verify_execution_result(_verify_result, "FAILED", "failure") + + +@aws_verified +@pytest.mark.aws_verified +def test_create_state_machine_twice_after_success(): + if settings.TEST_SERVER_MODE: + raise SkipTest("Don't need to test this in ServerMode") + + def _verify_result(client, execution, execution_arn): + name = execution["name"] + arn = execution["stateMachineArn"] + execution_arn = execution["executionArn"] + + if execution["status"] == "RUNNING": + # We can start the execution just fine + idempotent = client.start_execution(name=name, stateMachineArn=arn) + assert idempotent["executionArn"] == execution_arn + + # We're not done yet - we should check in on the progress later + return False + elif execution["status"] == "SUCCEEDED": + # Execution fails if we re-start it after it finishes + with pytest.raises(ClientError) as exc: + client.start_execution(name=name, stateMachineArn=arn) + err = exc.value.response["Error"] + assert err["Code"] == "ExecutionAlreadyExists" + assert err["Message"] == f"Execution Already Exists: '{execution_arn}'" + + # Execution finished, and we verified our error exception + # Return True to indicate we can tear down our StateMachine + return True + + # AWS is a little slower, so we need to wait longer + # If we only wait 1 second, our execution might finish before we can retry it + tmpl_name = "wait_15" if allow_aws_request() else "wait_1" + verify_execution_result(_verify_result, None, tmpl_name) diff --git a/tests/test_stepfunctions/test_stepfunctions_idempotency.py b/tests/test_stepfunctions/test_stepfunctions_idempotency.py new file mode 100644 index 000000000000..977424645887 --- /dev/null +++ b/tests/test_stepfunctions/test_stepfunctions_idempotency.py @@ -0,0 +1,78 @@ +import os +from unittest import SkipTest, mock + +import pytest +from botocore.exceptions import ClientError + +from moto import mock_aws, settings +from moto.stepfunctions.models import StepFunctionBackend, stepfunctions_backends +from tests import DEFAULT_ACCOUNT_ID + +from .parser import ( + verify_execution_result, +) + + +@mock_aws +@mock.patch.dict(os.environ, {"SF_EXECUTION_HISTORY_TYPE": "FAILURE"}) +def test_create_state_machine_twice_after_failure(): + if settings.TEST_SERVER_MODE: + raise SkipTest("Don't need to test this in ServerMode") + + def _verify_result(client, execution, execution_arn): + name = execution["name"] + arn = execution["stateMachineArn"] + execution_arn = execution["executionArn"] + + # Execution fails if we re-start it after failure + with pytest.raises(ClientError) as exc: + client.start_execution(name=name, stateMachineArn=arn) + err = exc.value.response["Error"] + assert err["Code"] == "ExecutionAlreadyExists" + assert err["Message"] == f"Execution Already Exists: '{execution_arn}'" + + verify_execution_result(_verify_result, "FAILED", "failure") + + +@mock_aws +def test_create_state_machine_twice_after_success(): + if settings.TEST_SERVER_MODE: + raise SkipTest("Don't need to test this in ServerMode") + + def _verify_result(client, execution, execution_arn): + name = execution["name"] + arn = execution["stateMachineArn"] + execution_arn = execution["executionArn"] + + if execution["status"] == "RUNNING": + # We can start the execution just fine + idempotent = client.start_execution(name=name, stateMachineArn=arn) + assert idempotent["executionArn"] == execution_arn + + # Manually mark our execution as finished + # (Because we don't actually execute anything, the status is always 'RUNNING' if we don't do this manually) + backend: StepFunctionBackend = stepfunctions_backends[DEFAULT_ACCOUNT_ID][ + "us-east-1" + ] + machine = backend.describe_state_machine(arn) + execution = next( + (x for x in machine.executions if x.execution_arn == execution_arn), + None, + ) + execution.status = "SUCCEEDED" + + # We're not done yet - we should check in on the progress later + return False + elif execution["status"] == "SUCCEEDED": + # Execution fails if we re-start it after it finishes + with pytest.raises(ClientError) as exc: + client.start_execution(name=name, stateMachineArn=arn) + err = exc.value.response["Error"] + assert err["Code"] == "ExecutionAlreadyExists" + assert err["Message"] == f"Execution Already Exists: '{execution_arn}'" + + # Execution finished, and we verified our error exception + # Return True to indicate we can tear down our StateMachine + return True + + verify_execution_result(_verify_result, None, tmpl_name="wait_1")