Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,9 @@ def history_to_string(event):

def _add_to_open_tasks(self, task: TaskBase):

if task._is_scheduled:
return

if isinstance(task, AtomicTask):
if task.id is None:
task.id = self._sequence_number
Expand Down
44 changes: 44 additions & 0 deletions tests/orchestrator/test_sequential_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,25 @@ def generator_function_reducing_when_all(context):
yield context.call_activity("Hello", "London")
return ""


def generator_function_reuse_task_in_whenany(context):
task1 = context.call_activity("Hello", "Tokyo")
task2 = context.call_activity("Hello", "Seattle")
pending_tasks = [task1, task2]

# Yield until first task is completed
finished_task1 = yield context.task_any(pending_tasks)

# Remove completed task from pending tasks
pending_tasks.remove(finished_task1)

task3 = context.call_activity("Hello", "London")
tasks = pending_tasks + [task3]

# Yield remaining tasks
yield context.task_any(tasks)
return ""

def generator_function_compound_tasks(context):
yield context.call_activity("Hello", "Tokyo")

Expand Down Expand Up @@ -731,6 +750,31 @@ def test_reducing_when_any_pattern():

assert_orchestration_state_equals(expected, result)

def test_reducing_when_any_pattern():
"""Tests that a user can call when_any on a progressively smaller list of already scheduled tasks"""
context_builder = ContextBuilder('generator_function_reuse_task_in_whenany', replay_schema=ReplaySchema.V2)
add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"")
add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"")
add_hello_completed_events(context_builder, 2, "\"Hello London!\"")

result = get_orchestration_state_result(
context_builder, generator_function_reuse_task_in_whenany)

# this scenario is only supported for V2 replay
expected_state = base_expected_state("",replay_schema=ReplaySchema.V2)
expected_state._actions = [
[WhenAnyAction(
[CallActivityAction("Hello", "Seattle"), CallActivityAction("Hello", "Tokyo")]),
WhenAnyAction(
[CallActivityAction("Hello", "London")])
]
]

expected_state._is_done = True
expected = expected_state.to_json()

assert_orchestration_state_equals(expected, result)

def test_compound_tasks_return_single_action_in_V2():
"""Tests that compound tasks, in the v2 replay schema, are represented as a single "deep" action"""
context_builder = ContextBuilder('test_v2_replay_schema', replay_schema=ReplaySchema.V2)
Expand Down