diff --git a/.serena/.gitignore b/.serena/.gitignore new file mode 100644 index 0000000000..14d86ad623 --- /dev/null +++ b/.serena/.gitignore @@ -0,0 +1 @@ +/cache diff --git a/.serena/project.yml b/.serena/project.yml new file mode 100644 index 0000000000..07b99a772b --- /dev/null +++ b/.serena/project.yml @@ -0,0 +1,87 @@ +# list of languages for which language servers are started; choose from: +# al bash clojure cpp csharp csharp_omnisharp +# dart elixir elm erlang fortran fsharp +# go groovy haskell java julia kotlin +# lua markdown nix pascal perl php +# powershell python python_jedi r rego ruby +# ruby_solargraph rust scala swift terraform toml +# typescript typescript_vts yaml zig +# Note: +# - For C, use cpp +# - For JavaScript, use typescript +# - For Free Pascal / Lazarus, use pascal +# Special requirements: +# - csharp: Requires the presence of a .sln file in the project folder. +# - pascal: Requires Free Pascal Compiler (fpc) and optionally Lazarus. +# When using multiple languages, the first language server that supports a given file will be used for that file. +# The first language is the default language and the respective language server will be used as a fallback. +# Note that when using the JetBrains backend, language servers are not used and this list is correspondingly ignored. +languages: +- java + +# the encoding used by text files in the project +# For a list of possible encodings, see https://docs.python.org/3.11/library/codecs.html#standard-encodings +encoding: "utf-8" + +# whether to use the project's gitignore file to ignore files +# Added on 2025-04-07 +ignore_all_files_in_gitignore: true + +# list of additional paths to ignore +# same syntax as gitignore, so you can use * and ** +# Was previously called `ignored_dirs`, please update your config if you are using that. +# Added (renamed) on 2025-04-07 +ignored_paths: [] + +# whether the project is in read-only mode +# If set to true, all editing tools will be disabled and attempts to use them will result in an error +# Added on 2025-04-18 +read_only: false + +# list of tool names to exclude. We recommend not excluding any tools, see the readme for more details. +# Below is the complete list of tools for convenience. +# To make sure you have the latest list of tools, and to view their descriptions, +# execute `uv run scripts/print_tool_overview.py`. +# +# * `activate_project`: Activates a project by name. +# * `check_onboarding_performed`: Checks whether project onboarding was already performed. +# * `create_text_file`: Creates/overwrites a file in the project directory. +# * `delete_lines`: Deletes a range of lines within a file. +# * `delete_memory`: Deletes a memory from Serena's project-specific memory store. +# * `execute_shell_command`: Executes a shell command. +# * `find_referencing_code_snippets`: Finds code snippets in which the symbol at the given location is referenced. +# * `find_referencing_symbols`: Finds symbols that reference the symbol at the given location (optionally filtered by type). +# * `find_symbol`: Performs a global (or local) search for symbols with/containing a given name/substring (optionally filtered by type). +# * `get_current_config`: Prints the current configuration of the agent, including the active and available projects, tools, contexts, and modes. +# * `get_symbols_overview`: Gets an overview of the top-level symbols defined in a given file. +# * `initial_instructions`: Gets the initial instructions for the current project. +# Should only be used in settings where the system prompt cannot be set, +# e.g. in clients you have no control over, like Claude Desktop. +# * `insert_after_symbol`: Inserts content after the end of the definition of a given symbol. +# * `insert_at_line`: Inserts content at a given line in a file. +# * `insert_before_symbol`: Inserts content before the beginning of the definition of a given symbol. +# * `list_dir`: Lists files and directories in the given directory (optionally with recursion). +# * `list_memories`: Lists memories in Serena's project-specific memory store. +# * `onboarding`: Performs onboarding (identifying the project structure and essential tasks, e.g. for testing or building). +# * `prepare_for_new_conversation`: Provides instructions for preparing for a new conversation (in order to continue with the necessary context). +# * `read_file`: Reads a file within the project directory. +# * `read_memory`: Reads the memory with the given name from Serena's project-specific memory store. +# * `remove_project`: Removes a project from the Serena configuration. +# * `replace_lines`: Replaces a range of lines within a file with new content. +# * `replace_symbol_body`: Replaces the full definition of a symbol. +# * `restart_language_server`: Restarts the language server, may be necessary when edits not through Serena happen. +# * `search_for_pattern`: Performs a search for a pattern in the project. +# * `summarize_changes`: Provides instructions for summarizing the changes made to the codebase. +# * `switch_modes`: Activates modes by providing a list of their names +# * `think_about_collected_information`: Thinking tool for pondering the completeness of collected information. +# * `think_about_task_adherence`: Thinking tool for determining whether the agent is still on track with the current task. +# * `think_about_whether_you_are_done`: Thinking tool for determining whether the task is truly completed. +# * `write_memory`: Writes a named memory (for future reference) to Serena's project-specific memory store. +excluded_tools: [] + +# initial prompt for the project. It will always be given to the LLM upon activating the project +# (contrary to the memories, which are loaded on demand). +initial_prompt: "" + +project_name: "sdk-java" +included_optional_tools: [] diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java index d515817b29..9ed89ef05c 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java @@ -742,6 +742,28 @@ public DynamicUpdateHandler getHandler() { void await(String reason, Supplier unblockCondition); + /** + * Asynchronously wait until unblockCondition evaluates to true. + * + * @param unblockCondition condition that should return true to indicate completion + * @return Promise that completes when the condition becomes true, or completes exceptionally with + * CanceledFailure if the enclosing CancellationScope is canceled + */ + Promise awaitAsync(Supplier unblockCondition); + + /** + * Asynchronously wait until unblockCondition evaluates to true or timeout expires. + * + * @param timeout maximum time to wait for the condition + * @param timerSummary summary for the timer created by this await, used in workflow history + * @param unblockCondition condition that should return true to indicate completion + * @return Promise that completes with true if the condition was satisfied, false if the timeout + * expired, or exceptionally with CanceledFailure if the enclosing CancellationScope is + * canceled + */ + Promise awaitAsync( + Duration timeout, String timerSummary, Supplier unblockCondition); + Promise newTimer(Duration duration); Promise newTimer(Duration duration, TimerOptions options); diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java index 9d99d4c78b..db0c5429a4 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java @@ -75,6 +75,17 @@ public void await(String reason, Supplier unblockCondition) { next.await(reason, unblockCondition); } + @Override + public Promise awaitAsync(Supplier unblockCondition) { + return next.awaitAsync(unblockCondition); + } + + @Override + public Promise awaitAsync( + Duration timeout, String timerSummary, Supplier unblockCondition) { + return next.awaitAsync(timeout, timerSummary, unblockCondition); + } + @Override public Promise newTimer(Duration duration) { return next.newTimer(duration); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java index efafe230af..5646443677 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java @@ -3,6 +3,7 @@ import io.temporal.internal.worker.WorkflowExecutorCache; import io.temporal.workflow.CancellationScope; import java.util.Optional; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -29,7 +30,7 @@ static DeterministicRunner newRunner( SyncWorkflowContext workflowContext, Runnable root, WorkflowExecutorCache cache) { - return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, cache); + return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, cache, null); } /** @@ -44,7 +45,29 @@ static DeterministicRunner newRunner( WorkflowThreadExecutor workflowThreadExecutor, SyncWorkflowContext workflowContext, Runnable root) { - return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, null); + return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, null, null); + } + + /** + * Create new instance of DeterministicRunner with a callback invoked before threads wake up. + * + * @param workflowThreadExecutor executor for workflow thread Runnables + * @param workflowContext workflow context to use + * @param root function that root thread of the runner executes. + * @param cache WorkflowExecutorCache used cache inflight workflows + * @param beforeThreadsWakeUp callback invoked once per loop iteration before threads run. Returns + * true if progress was made (e.g., a condition watcher fired), which causes the loop to + * continue even if all threads are blocked. Returns false if no progress was made. + * @return instance of the DeterministicRunner. + */ + static DeterministicRunner newRunner( + WorkflowThreadExecutor workflowThreadExecutor, + SyncWorkflowContext workflowContext, + Runnable root, + WorkflowExecutorCache cache, + @Nullable Supplier beforeThreadsWakeUp) { + return new DeterministicRunnerImpl( + workflowThreadExecutor, workflowContext, root, cache, beforeThreadsWakeUp); } /** @@ -92,6 +115,24 @@ static DeterministicRunner newRunner( @Nonnull WorkflowThread newCallbackThread(Runnable runnable, @Nullable String name); + /** + * Creates a new repeatable workflow thread that re-evaluates its condition on each + * runUntilBlocked() call. The thread completes when the condition returns true or when + * cancelled/destroyed. + * + *

This is used for async await where the condition can contain blocking operations like + * Workflow.await(), activity calls, etc. Unlike simple condition watchers, these conditions run + * in their own workflow thread context with full workflow capabilities. + * + * @param condition The condition to evaluate repeatedly. May contain blocking operations. + * @param detached Whether the thread is detached from parent cancellation scope + * @param name Optional name for the thread + * @return A new WorkflowThread that repeatedly evaluates the condition + */ + @Nonnull + WorkflowThread newRepeatableThread( + Supplier condition, boolean detached, @Nullable String name); + /** * Retrieve data from runner locals. Returns 1. not found (an empty Optional) 2. found but null * (an Optional of an empty Optional) 3. found and non-null (an Optional of an Optional of a diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java index 234f71bff5..26fe111cde 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java @@ -69,6 +69,9 @@ class DeterministicRunnerImpl implements DeterministicRunner { // always accessed under the runner lock private final List toExecuteInWorkflowThread = new ArrayList<>(); + // Callback invoked before threads wake up in each event loop iteration + @Nullable private final Supplier beforeThreadsWakeUp; + // Access to workflowThreadsToAdd, callbackThreadsToAdd, addedThreads doesn't have to be // synchronized. // Inside DeterministicRunner the access to these variables is under the runner lock. @@ -144,7 +147,7 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) { WorkflowThreadExecutor workflowThreadExecutor, @Nonnull SyncWorkflowContext workflowContext, Runnable root) { - this(workflowThreadExecutor, workflowContext, root, null); + this(workflowThreadExecutor, workflowContext, root, null, null); } DeterministicRunnerImpl( @@ -152,12 +155,22 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) { @Nonnull SyncWorkflowContext workflowContext, Runnable root, WorkflowExecutorCache cache) { + this(workflowThreadExecutor, workflowContext, root, cache, null); + } + + DeterministicRunnerImpl( + WorkflowThreadExecutor workflowThreadExecutor, + @Nonnull SyncWorkflowContext workflowContext, + Runnable root, + WorkflowExecutorCache cache, + @Nullable Supplier beforeThreadsWakeUp) { this.workflowThreadExecutor = workflowThreadExecutor; this.workflowContext = Preconditions.checkNotNull(workflowContext, "workflowContext"); // TODO this should be refactored, publishing of this in an constructor into external objects is // a bad practice this.workflowContext.setRunner(this); this.cache = cache; + this.beforeThreadsWakeUp = beforeThreadsWakeUp; boolean deterministicCancellationScopeOrder = workflowContext .getReplayContext() @@ -208,7 +221,16 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) { appendCallbackThreadsLocked(); } toExecuteInWorkflowThread.clear(); - progress = false; + + // Invoke beforeThreadsWakeUp callback BEFORE running threads. + // This allows the callback to evaluate conditions and complete promises, + // ensuring threads see updated state when they wake up. + if (beforeThreadsWakeUp != null) { + progress = beforeThreadsWakeUp.get(); + } else { + progress = false; + } + Iterator ci = threads.iterator(); while (ci.hasNext()) { WorkflowThread c = ci.next(); @@ -504,6 +526,53 @@ public WorkflowThread newCallbackThread(Runnable runnable, @Nullable String name return result; } + /** + * Creates a new repeatable workflow thread that re-evaluates its condition on each + * runUntilBlocked() call until the condition returns true. + * + *

IMPORTANT: The condition must be read-only (it observes state but must not modify it). This + * is because the condition may be evaluated multiple times per workflow task, and modifying state + * would cause non-determinism. + * + *

The thread reports progress only when the condition becomes true, ensuring the event loop + * doesn't spin indefinitely when conditions remain false. + * + * @param condition The read-only condition to evaluate repeatedly + * @param detached Whether the thread is detached from parent cancellation scope + * @param name Optional name for the thread + * @return A new WorkflowThread that repeatedly evaluates the condition + */ + @Override + @Nonnull + public WorkflowThread newRepeatableThread( + Supplier condition, boolean detached, @Nullable String name) { + if (name == null) { + name = + "repeatable[" + workflowContext.getReplayContext().getWorkflowId() + "]-" + addedThreads; + } + if (rootWorkflowThread == null) { + throw new IllegalStateException( + "newRepeatableThread can be called only with existing root workflow thread"); + } + checkWorkflowThreadOnly(); + checkNotClosed(); + WorkflowThread result = + new RepeatableWorkflowThread( + workflowThreadExecutor, + workflowContext, + this, + name, + WORKFLOW_THREAD_PRIORITY + (addedThreads++), + detached, + CancellationScopeImpl.current(), + condition, + cache, + getContextPropagators(), + getPropagatedContexts()); + workflowThreadsToAdd.add(result); + return result; + } + /** * Executes before any other threads next time runUntilBlockedCalled. Must never be called from * any workflow threads. diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/RepeatableWorkflowThread.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/RepeatableWorkflowThread.java new file mode 100644 index 0000000000..3dbbdf0fc8 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/RepeatableWorkflowThread.java @@ -0,0 +1,382 @@ +package io.temporal.internal.sync; + +import io.temporal.common.context.ContextPropagator; +import io.temporal.internal.worker.WorkflowExecutorCache; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.function.Supplier; +import javax.annotation.Nonnull; + +/** + * Implementation of WorkflowThread that repeatedly evaluates a read-only condition on each + * runUntilBlocked() call until the condition returns true. + * + *

IMPORTANT: The condition must be read-only - it should only observe workflow state, not modify + * it. This is because the condition may be evaluated multiple times per workflow task, and + * modifying state would cause non-determinism. Conditions should be simple boolean expressions that + * check workflow variables or promise states. + * + *

Key behavior: + * + *

    + *
  • Each condition evaluation runs in its own WorkflowThreadImpl for proper workflow context + *
  • Reports progress (returns true) ONLY when condition becomes true or throws an exception + *
  • When condition returns false, reports no progress - acts as "yielded" waiting for state + * change + *
  • This prevents infinite loops in the event loop when conditions remain false + *
+ * + *

The thread completes when: + * + *

    + *
  • The condition returns true + *
  • The condition throws an exception + *
  • The thread is cancelled + *
  • The thread is destroyed (stopNow) + *
+ */ +class RepeatableWorkflowThread implements WorkflowThread { + + /** + * Flag indicating that the user's condition became true. Once true, the thread is considered + * done. + */ + private volatile boolean conditionSatisfied = false; + + /** Flag indicating the thread has been cancelled. */ + private volatile boolean cancelRequested = false; + + /** Cancellation reason if cancelled. */ + private volatile String cancellationReason; + + /** Exception from the internal thread that needs to be propagated. */ + private volatile Throwable propagatedException; + + /** The current internal thread executing the condition. */ + private WorkflowThreadImpl currentEvaluationThread; + + /** Counter for naming internal threads. */ + private int evaluationCount = 0; + + /** Cached cancellation promise, following the pattern from CancellationScopeImpl. */ + private io.temporal.workflow.CompletablePromise cancellationPromise; + + private final WorkflowThreadExecutor workflowThreadExecutor; + private final SyncWorkflowContext syncWorkflowContext; + private final DeterministicRunnerImpl runner; + private final String threadName; + private final int priority; + private final boolean detached; + private final CancellationScopeImpl parentCancellationScope; + private final Supplier condition; + private final WorkflowExecutorCache cache; + private final List contextPropagators; + private final Map propagatedContexts; + private final Map, Object> threadLocalMap = new HashMap<>(); + + RepeatableWorkflowThread( + WorkflowThreadExecutor workflowThreadExecutor, + SyncWorkflowContext syncWorkflowContext, + DeterministicRunnerImpl runner, + @Nonnull String name, + int priority, + boolean detached, + CancellationScopeImpl parentCancellationScope, + Supplier condition, + WorkflowExecutorCache cache, + List contextPropagators, + Map propagatedContexts) { + this.workflowThreadExecutor = workflowThreadExecutor; + this.syncWorkflowContext = syncWorkflowContext; + this.runner = runner; + this.threadName = + com.google.common.base.Preconditions.checkNotNull(name, "Thread name shouldn't be null"); + this.priority = priority; + this.detached = detached; + this.parentCancellationScope = parentCancellationScope; + this.condition = condition; + this.cache = cache; + this.contextPropagators = contextPropagators; + this.propagatedContexts = propagatedContexts; + } + + /** + * Creates a new internal WorkflowThreadImpl for evaluating the condition. The thread's runnable + * evaluates the condition once and sets conditionSatisfied if true. + */ + private WorkflowThreadImpl createEvaluationThread() { + evaluationCount++; + String evalThreadName = threadName + "-eval-" + evaluationCount; + + Runnable evaluationRunnable = + () -> { + if (isCancelled()) { + throw new io.temporal.failure.CanceledFailure(getEffectiveCancellationReason()); + } + + boolean result = condition.get(); + + if (isCancelled()) { + throw new io.temporal.failure.CanceledFailure(getEffectiveCancellationReason()); + } + + if (result) { + conditionSatisfied = true; + } + }; + + return new WorkflowThreadImpl( + workflowThreadExecutor, + syncWorkflowContext, + runner, + evalThreadName, + priority, + detached, + parentCancellationScope, + evaluationRunnable, + cache, + contextPropagators, + propagatedContexts); + } + + @Override + public boolean runUntilBlocked(long deadlockDetectionTimeoutMs) { + if (isDone()) { + return false; + } + + if (currentEvaluationThread == null || currentEvaluationThread.isDone()) { + if (conditionSatisfied) { + return false; + } + currentEvaluationThread = createEvaluationThread(); + } + + currentEvaluationThread.runUntilBlocked(deadlockDetectionTimeoutMs); + + Throwable unhandledException = currentEvaluationThread.getUnhandledException(); + if (unhandledException != null) { + propagatedException = unhandledException; + return true; + } + + // Return true ONLY when condition is satisfied. When condition returns false, + // report no progress so this thread acts as "yielded", allowing other threads + // to change state without spinning the event loop. + return conditionSatisfied; + } + + @Override + public boolean isDone() { + if (conditionSatisfied) { + return true; + } + if (propagatedException != null) { + return true; + } + // Cancelled and no running evaluation thread + if (isCancelled() && (currentEvaluationThread == null || currentEvaluationThread.isDone())) { + return true; + } + return false; + } + + @Override + public Throwable getUnhandledException() { + if (propagatedException != null) { + return propagatedException; + } + if (currentEvaluationThread != null) { + return currentEvaluationThread.getUnhandledException(); + } + return null; + } + + @Override + public void cancel() { + cancel(null); + } + + @Override + public void cancel(String reason) { + cancelRequested = true; + cancellationReason = reason; + if (cancellationPromise != null && !cancellationPromise.isCompleted()) { + cancellationPromise.complete(reason); + } + if (currentEvaluationThread != null) { + currentEvaluationThread.cancel(reason); + } + } + + @Override + public boolean isCancelRequested() { + return isCancelled(); + } + + /** + * Checks if this thread should be considered cancelled. This includes both explicit cancellation + * via cancel() and cancellation propagated through the parent CancellationScope. + */ + private boolean isCancelled() { + return cancelRequested + || (parentCancellationScope != null && parentCancellationScope.isCancelRequested()); + } + + /** Gets the cancellation reason, checking parent scope if needed. */ + private String getEffectiveCancellationReason() { + if (cancellationReason != null) { + return cancellationReason; + } + if (parentCancellationScope != null && parentCancellationScope.isCancelRequested()) { + return parentCancellationScope.getCancellationReason(); + } + return null; + } + + @Override + public String getCancellationReason() { + return getEffectiveCancellationReason(); + } + + @Override + public io.temporal.workflow.Promise getCancellationRequest() { + if (currentEvaluationThread != null) { + return currentEvaluationThread.getCancellationRequest(); + } + if (cancellationPromise == null) { + cancellationPromise = io.temporal.workflow.Workflow.newPromise(); + if (isCancelled()) { + cancellationPromise.complete(getEffectiveCancellationReason()); + } + } + return cancellationPromise; + } + + @Override + public boolean isDetached() { + return detached; + } + + @Override + public Future stopNow() { + cancelRequested = true; + if (currentEvaluationThread != null) { + return currentEvaluationThread.stopNow(); + } + return java.util.concurrent.CompletableFuture.completedFuture(null); + } + + @Override + public boolean isStarted() { + return true; // We start on first runUntilBlocked + } + + @Override + public void start() { + // No-op - we start on first runUntilBlocked() + } + + @Override + public String getName() { + return threadName; + } + + @Override + public void setName(String name) { + // Thread name is immutable for RepeatableWorkflowThread + } + + @Override + public long getId() { + return hashCode(); + } + + @Override + public int getPriority() { + return priority; + } + + @Override + public void run() { + throw new UnsupportedOperationException("not used"); + } + + @Override + public String getStackTrace() { + if (currentEvaluationThread != null) { + return currentEvaluationThread.getStackTrace(); + } + return threadName + "\n\t(not running)"; + } + + @Override + public void addStackTrace(StringBuilder result) { + result.append(threadName); + if (currentEvaluationThread != null) { + result.append(": delegating to "); + currentEvaluationThread.addStackTrace(result); + } else { + result.append("(NOT RUNNING)"); + } + } + + @Override + public void yield(String reason, Supplier unblockCondition) { + if (currentEvaluationThread != null) { + currentEvaluationThread.yield(reason, unblockCondition); + } else { + throw new IllegalStateException("Cannot yield - no evaluation thread running"); + } + } + + @Override + public void exitThread() { + if (currentEvaluationThread != null) { + currentEvaluationThread.exitThread(); + } + } + + @Override + public DeterministicRunnerImpl getRunner() { + return runner; + } + + @Override + public SyncWorkflowContext getWorkflowContext() { + return syncWorkflowContext; + } + + @Override + public WorkflowThreadContext getWorkflowThreadContext() { + if (currentEvaluationThread != null) { + return currentEvaluationThread.getWorkflowThreadContext(); + } + return null; + } + + @Override + public io.temporal.internal.common.NonIdempotentHandle lockDeadlockDetector() { + if (currentEvaluationThread != null) { + return currentEvaluationThread.lockDeadlockDetector(); + } + return null; + } + + @Override + public void setThreadLocal(WorkflowThreadLocalInternal key, T value) { + threadLocalMap.put(key, value); + } + + @SuppressWarnings("unchecked") + @Override + public Optional> getThreadLocal(WorkflowThreadLocalInternal key) { + if (!threadLocalMap.containsKey(key)) { + return Optional.empty(); + } + return Optional.of(Optional.ofNullable((T) threadLocalMap.get(key))); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java index a9f1f1107d..77b34e6362 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java @@ -124,7 +124,8 @@ public void start(HistoryEvent event, ReplayWorkflowContext context) { context.getWorkflowExecution())) .start(); }, - cache); + cache, + workflowContext.getBeforeThreadsWakeUpCallback()); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 977d9754e6..43dc8b1399 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -54,6 +54,7 @@ import java.time.Duration; import java.time.Instant; import java.util.*; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; @@ -104,6 +105,9 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall private final WorkflowThreadLocal currentUpdateInfo = new WorkflowThreadLocal<>(); @Nullable private String currentDetails; + // Condition watchers for async await functionality + private final List conditionWatchers = new ArrayList<>(); + public SyncWorkflowContext( @Nonnull String namespace, @Nonnull WorkflowExecution workflowExecution, @@ -1327,6 +1331,93 @@ public void await(String reason, Supplier unblockCondition) { WorkflowThread.await(reason, unblockCondition); } + @Override + public Promise awaitAsync(Supplier unblockCondition) { + CompletablePromise result = Workflow.newPromise(); + + Supplier wrappedCondition = + () -> { + try { + if (unblockCondition.get()) { + result.complete(null); + return true; + } + return false; + } catch (RuntimeException e) { + result.completeExceptionally(e); + return true; + } + }; + + WorkflowThread conditionThread = + runner.newRepeatableThread(wrappedCondition, false, "awaitAsync"); + + CancellationScope.current() + .getCancellationRequest() + .thenApply( + (r) -> { + if (!result.isCompleted()) { + result.completeExceptionally(new CanceledFailure(r)); + } + conditionThread.cancel(r); + return r; + }); + + return result; + } + + @Override + public Promise awaitAsync( + Duration timeout, String timerSummary, Supplier unblockCondition) { + CompletablePromise result = Workflow.newPromise(); + + TimerOptions timerOptions = TimerOptions.newBuilder().setSummary(timerSummary).build(); + + // Detached scope allows cancelling the timer when condition is met + CompletablePromise timerPromise = Workflow.newPromise(); + CancellationScope timerScope = + Workflow.newDetachedCancellationScope( + () -> timerPromise.completeFrom(newTimer(timeout, timerOptions))); + timerScope.run(); + + Supplier wrappedCondition = + () -> { + try { + if (unblockCondition.get()) { + result.complete(true); + timerScope.cancel(); + return true; + } + if (timerPromise.isCompleted()) { + result.complete(false); + return true; + } + return false; + } catch (RuntimeException e) { + result.completeExceptionally(e); + timerScope.cancel(); + return true; + } + }; + + WorkflowThread conditionThread = + runner.newRepeatableThread(wrappedCondition, false, "awaitAsync"); + + CancellationScope.current() + .getCancellationRequest() + .thenApply( + (r) -> { + if (!result.isCompleted()) { + result.completeExceptionally(new CanceledFailure(r)); + } + timerScope.cancel(); + conditionThread.cancel(r); + return r; + }); + + return result; + } + @SuppressWarnings("deprecation") @Override public void continueAsNew(ContinueAsNewInput input) { @@ -1584,4 +1675,93 @@ public Failure getFailure() { return failure; } } + + /** + * Returns a callback to be used by DeterministicRunner before threads wake up. This callback + * evaluates condition watchers and completes promises as needed. + */ + public Supplier getBeforeThreadsWakeUpCallback() { + return this::evaluateConditionWatchers; + } + + /** + * Registers a condition watcher for async await functionality. The condition is evaluated + * immediately (inline) before registering. If already satisfied, returns immediately without + * registering a watcher. Otherwise, the condition is re-evaluated at the end of each event loop + * iteration until it returns true. + * + *

IMPORTANT: The condition must never throw exceptions. If it does, the workflow task will + * fail. Callers should handle exceptions within the condition supplier and complete their promise + * appropriately before returning true. + * + * @param condition Supplier that returns true when the wait is complete (caller handles promise + * completion in the supplier body). Evaluated in read-only mode. Must not throw exceptions. + * @return A Runnable that cancels the watcher when invoked (no-op if condition was already + * satisfied). + */ + Runnable registerConditionWatcher(Supplier condition) { + // Evaluate condition inline - if already satisfied, no need to register + setReadOnly(true); + try { + if (condition.get()) { + return () -> {}; + } + } finally { + setReadOnly(false); + } + + ConditionWatcher watcher = new ConditionWatcher(condition); + conditionWatchers.add(watcher); + return () -> watcher.canceled = true; + } + + /** + * Evaluates all condition watchers and removes those that return true. Watchers that are + * satisfied are removed from the list. + * + *

Note: If a condition throws an exception, it will propagate and fail the workflow task. + * Callers should handle exceptions within their condition supplier. + * + * @return true if any condition was satisfied (indicating progress was made) + */ + private boolean evaluateConditionWatchers() { + boolean anyMatched = false; + Iterator it = conditionWatchers.iterator(); + while (it.hasNext()) { + ConditionWatcher watcher = it.next(); + if (watcher.canceled) { + it.remove(); + continue; + } + + // We must set read-only mode here because the condition is evaluated from the runner + // thread, not a workflow thread. + setReadOnly(true); + boolean satisfied; + try { + satisfied = watcher.condition.get(); + } finally { + setReadOnly(false); + } + + if (satisfied) { + it.remove(); + anyMatched = true; + } + } + return anyMatched; + } + + /** + * Holds a condition for async await functionality. The condition is evaluated at the end of each + * event loop iteration and must handle promise completion in its body before returning true. + */ + private static class ConditionWatcher { + final Supplier condition; + boolean canceled; + + ConditionWatcher(Supplier condition) { + this.condition = condition; + } + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 099b2f9b48..6a007c1e99 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -520,6 +520,17 @@ public static boolean await(Duration timeout, String reason, Supplier u }); } + public static Promise awaitAsync(Supplier unblockCondition) { + assertNotReadOnly("awaitAsync"); + return getWorkflowOutboundInterceptor().awaitAsync(unblockCondition); + } + + public static Promise awaitAsync( + Duration timeout, String timerSummary, Supplier unblockCondition) { + assertNotReadOnly("awaitAsync"); + return getWorkflowOutboundInterceptor().awaitAsync(timeout, timerSummary, unblockCondition); + } + public static R sideEffect(Class resultClass, Type resultType, Func func) { assertNotReadOnly("side effect"); return getWorkflowOutboundInterceptor().sideEffect(resultClass, resultType, func); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadBase.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadBase.java new file mode 100644 index 0000000000..7779f8e635 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadBase.java @@ -0,0 +1,476 @@ +package io.temporal.internal.sync; + +import com.google.common.base.Preconditions; +import io.temporal.common.context.ContextPropagator; +import io.temporal.failure.CanceledFailure; +import io.temporal.internal.common.NonIdempotentHandle; +import io.temporal.internal.common.SdkFlag; +import io.temporal.internal.context.ContextThreadLocal; +import io.temporal.internal.logging.LoggerTag; +import io.temporal.internal.replay.ReplayWorkflowContext; +import io.temporal.internal.worker.WorkflowExecutorCache; +import io.temporal.workflow.Functions; +import io.temporal.workflow.Promise; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.function.Supplier; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * Abstract base class for workflow thread implementations. This class extracts shared functionality + * from WorkflowThreadImpl to enable code reuse with other workflow thread implementations such as + * RepeatableWorkflowThread. + */ +abstract class WorkflowThreadBase implements WorkflowThread { + + private static final Logger log = LoggerFactory.getLogger(WorkflowThreadBase.class); + + /** + * Abstract base class for runnable wrappers that execute workflow thread logic. Uses the template + * method pattern where {@link #run()} provides the common execution flow and {@link + * #executeLogic()} is implemented by subclasses. + */ + protected abstract class RunnableWrapperBase implements Runnable { + + protected final WorkflowThreadContext threadContext; + // TODO: Move MDC injection logic into an interceptor as this context shouldn't be leaked + // to the WorkflowThreadBase + protected final ReplayWorkflowContext replayWorkflowContext; + protected String originalName; + protected String name; + protected final CancellationScopeImpl cancellationScope; + protected final List contextPropagators; + protected final Map propagatedContexts; + + protected RunnableWrapperBase( + WorkflowThreadContext threadContext, + ReplayWorkflowContext replayWorkflowContext, + String name, + boolean detached, + CancellationScopeImpl parent, + Runnable runnable, + List contextPropagators, + Map propagatedContexts) { + this.threadContext = threadContext; + this.replayWorkflowContext = replayWorkflowContext; + this.name = name; + boolean deterministicCancellationScopeOrder = + replayWorkflowContext.checkSdkFlag(SdkFlag.DETERMINISTIC_CANCELLATION_SCOPE_ORDER); + this.cancellationScope = + new CancellationScopeImpl( + detached, deterministicCancellationScopeOrder, runnable, parent); + Preconditions.checkState( + context.getStatus() == Status.CREATED, "threadContext not in CREATED state"); + this.contextPropagators = contextPropagators; + this.propagatedContexts = propagatedContexts; + } + + /** + * Template method that provides the common execution flow for workflow threads. Subclasses + * implement {@link #executeLogic()} to define their specific behavior. + */ + @Override + public final void run() { + Thread thread = Thread.currentThread(); + originalName = thread.getName(); + thread.setName(name); + + threadContext.initializeCurrentThread(thread); + DeterministicRunnerImpl.setCurrentThreadInternal(WorkflowThreadBase.this); + + MDC.put(LoggerTag.WORKFLOW_ID, replayWorkflowContext.getWorkflowId()); + MDC.put(LoggerTag.WORKFLOW_TYPE, replayWorkflowContext.getWorkflowType().getName()); + MDC.put(LoggerTag.RUN_ID, replayWorkflowContext.getRunId()); + MDC.put(LoggerTag.TASK_QUEUE, replayWorkflowContext.getTaskQueue()); + MDC.put(LoggerTag.NAMESPACE, replayWorkflowContext.getNamespace()); + + // Repopulate the context(s) + ContextThreadLocal.setContextPropagators(this.contextPropagators); + ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts); + try { + // initialYield blocks thread until the first runUntilBlocked is called. + // Otherwise, r starts executing without control of the sync. + threadContext.initialYield(); + executeLogic(); + } catch (DestroyWorkflowThreadError e) { + if (!threadContext.isDestroyRequested()) { + threadContext.setUnhandledException(e); + } + } catch (Error e) { + threadContext.setUnhandledException(e); + } catch (CanceledFailure e) { + if (!isCancelRequested()) { + threadContext.setUnhandledException(e); + } + if (log.isDebugEnabled()) { + log.debug(String.format("Workflow thread \"%s\" run canceled", name)); + } + } catch (Throwable e) { + threadContext.setUnhandledException(e); + } finally { + DeterministicRunnerImpl.setCurrentThreadInternal(null); + threadContext.makeDone(); + thread.setName(originalName); + MDC.clear(); + } + } + + /** + * Subclasses implement this method to define their specific execution logic. This is called + * after thread initialization and before cleanup. + */ + protected abstract void executeLogic(); + + public String getName() { + return name; + } + + StackTraceElement[] getStackTrace() { + @Nullable Thread thread = threadContext.getCurrentThread(); + if (thread != null) { + return thread.getStackTrace(); + } + return new StackTraceElement[0]; + } + + public void setName(String name) { + this.name = name; + @Nullable Thread thread = threadContext.getCurrentThread(); + if (thread != null) { + thread.setName(name); + } + } + } + + protected final WorkflowThreadExecutor workflowThreadExecutor; + protected final WorkflowThreadContext context; + protected final WorkflowExecutorCache cache; + protected final SyncWorkflowContext syncWorkflowContext; + + protected final DeterministicRunnerImpl runner; + protected RunnableWrapperBase task; + protected final int priority; + protected Future taskFuture; + protected final Map, Object> threadLocalMap = new HashMap<>(); + + protected WorkflowThreadBase( + WorkflowThreadExecutor workflowThreadExecutor, + SyncWorkflowContext syncWorkflowContext, + DeterministicRunnerImpl runner, + int priority, + WorkflowExecutorCache cache) { + this.workflowThreadExecutor = workflowThreadExecutor; + this.syncWorkflowContext = Preconditions.checkNotNull(syncWorkflowContext); + this.runner = runner; + this.context = new WorkflowThreadContext(runner.getLock()); + this.cache = cache; + this.priority = priority; + } + + /** + * Factory method for creating the task wrapper. Subclasses implement this to create their + * specific wrapper implementation. + * + * @param name Thread name + * @param detached Whether the thread is detached from parent cancellation scope + * @param parentCancellationScope Parent cancellation scope + * @param runnable The runnable to execute (may be null for some implementations) + * @param contextPropagators Context propagators + * @param propagatedContexts Propagated contexts + * @return A RunnableWrapperBase implementation + */ + protected abstract RunnableWrapperBase createTaskWrapper( + @Nonnull String name, + boolean detached, + CancellationScopeImpl parentCancellationScope, + Runnable runnable, + List contextPropagators, + Map propagatedContexts); + + @Override + public void run() { + throw new UnsupportedOperationException("not used"); + } + + @Override + public boolean isDetached() { + return task.cancellationScope.isDetached(); + } + + @Override + public void cancel() { + task.cancellationScope.cancel(); + } + + @Override + public void cancel(String reason) { + task.cancellationScope.cancel(reason); + } + + @Override + public String getCancellationReason() { + return task.cancellationScope.getCancellationReason(); + } + + @Override + public boolean isCancelRequested() { + return task.cancellationScope.isCancelRequested(); + } + + @Override + public Promise getCancellationRequest() { + return task.cancellationScope.getCancellationRequest(); + } + + @Override + public void start() { + context.verifyAndStart(); + while (true) { + try { + taskFuture = workflowThreadExecutor.submit(task); + return; + } catch (RejectedExecutionException e) { + if (cache != null) { + SyncWorkflowContext workflowContext = getWorkflowContext(); + ReplayWorkflowContext replayContext = workflowContext.getReplayContext(); + boolean evicted = + cache.evictAnyNotInProcessing( + replayContext.getWorkflowExecution(), workflowContext.getMetricsScope()); + if (!evicted) { + // Note here we need to throw error, not exception. Otherwise it will be + // translated to workflow execution exception and instead of failing the + // workflow task we will be failing the workflow. + throw new WorkflowRejectedExecutionError(e); + } + } else { + throw new WorkflowRejectedExecutionError(e); + } + } + } + } + + @Override + public boolean isStarted() { + return context.getStatus() != Status.CREATED; + } + + @Override + public WorkflowThreadContext getWorkflowThreadContext() { + return context; + } + + @Override + public DeterministicRunnerImpl getRunner() { + return runner; + } + + @Override + public SyncWorkflowContext getWorkflowContext() { + return syncWorkflowContext; + } + + @Override + public void setName(String name) { + task.setName(name); + } + + @Override + public String getName() { + return task.getName(); + } + + @Override + public long getId() { + return hashCode(); + } + + @Override + public int getPriority() { + return priority; + } + + @Override + public boolean runUntilBlocked(long deadlockDetectionTimeoutMs) { + if (taskFuture == null) { + start(); + } + return context.runUntilBlocked(deadlockDetectionTimeoutMs); + } + + @Override + public NonIdempotentHandle lockDeadlockDetector() { + return context.lockDeadlockDetector(); + } + + @Override + public boolean isDone() { + return context.isDone(); + } + + @Override + public Throwable getUnhandledException() { + return context.getUnhandledException(); + } + + /** + * Evaluates function in the threadContext of the coroutine without unblocking it. Used to get + * current coroutine status, like stack trace. + * + * @param function Parameter is reason for current goroutine blockage. + */ + public void evaluateInCoroutineContext(Functions.Proc1 function) { + context.evaluateInCoroutineContext(function); + } + + /** + * Interrupt coroutine by throwing DestroyWorkflowThreadError from an await method it is blocked + * on and return underlying Future to be waited on. + */ + @Override + public Future stopNow() { + // Cannot call destroy() on itself + @Nullable Thread thread = context.getCurrentThread(); + if (Thread.currentThread().equals(thread)) { + throw new Error("Cannot call destroy on itself: " + thread.getName()); + } + context.initiateDestroy(); + if (taskFuture == null) { + return getCompletedFuture(); + } + return taskFuture; + } + + private Future getCompletedFuture() { + CompletableFuture f = new CompletableFuture<>(); + f.complete("done"); + return f; + } + + @Override + public void addStackTrace(StringBuilder result) { + result.append(getName()); + @Nullable Thread thread = context.getCurrentThread(); + if (thread == null) { + result.append("(NEW)"); + return; + } + result + .append(": (BLOCKED on ") + .append(getWorkflowThreadContext().getYieldReason()) + .append(")\n"); + // These numbers might change if implementation changes. + int omitTop = 5; + int omitBottom = 7; + // TODO it's not a good idea to rely on the name to understand the thread type. Instead of that + // we would better + // assign an explicit thread type enum to the threads. This will be especially important when we + // refactor + // root and workflow-method + // thread names into names that will include workflowId + if (DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME.equals(getName())) { + // TODO revisit this number + omitBottom = 11; + } else if (getName().startsWith(WorkflowMethodThreadNameStrategy.WORKFLOW_MAIN_THREAD_PREFIX)) { + // TODO revisit this number + omitBottom = 11; + } + StackTraceElement[] stackTrace = thread.getStackTrace(); + for (int i = omitTop; i < stackTrace.length - omitBottom; i++) { + StackTraceElement e = stackTrace[i]; + if (i == omitTop && "await".equals(e.getMethodName())) continue; + result.append(e); + result.append("\n"); + } + } + + @Override + public void yield(String reason, Supplier unblockCondition) { + context.yield(reason, unblockCondition); + } + + @Override + public void exitThread() { + runner.exit(); + throw new DestroyWorkflowThreadError("exit"); + } + + @Override + public void setThreadLocal(WorkflowThreadLocalInternal key, T value) { + threadLocalMap.put(key, value); + } + + /** + * Retrieve data from thread locals. Returns 1. not found (an empty Optional) 2. found but null + * (an Optional of an empty Optional) 3. found and non-null (an Optional of an Optional of a + * value). The type nesting is because Java Optionals cannot understand "Some null" vs "None", + * which is exactly what we need here. + * + * @param key + * @return one of three cases + * @param + */ + @SuppressWarnings("unchecked") + public Optional> getThreadLocal(WorkflowThreadLocalInternal key) { + if (!threadLocalMap.containsKey(key)) { + return Optional.empty(); + } + return Optional.of(Optional.ofNullable((T) threadLocalMap.get(key))); + } + + /** + * @return stack trace of the coroutine thread + */ + @Override + public String getStackTrace() { + StackTraceElement[] st = task.getStackTrace(); + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + pw.append(task.getName()); + pw.append("\n"); + for (StackTraceElement se : st) { + pw.println("\tat " + se); + } + return sw.toString(); + } + + static class YieldWithTimeoutCondition implements Supplier { + + private final Supplier unblockCondition; + private final long blockedUntil; + private boolean timedOut; + + YieldWithTimeoutCondition(Supplier unblockCondition, long blockedUntil) { + this.unblockCondition = unblockCondition; + this.blockedUntil = blockedUntil; + } + + boolean isTimedOut() { + return timedOut; + } + + /** + * @return true if condition matched or timed out + */ + @Override + public Boolean get() { + boolean result = unblockCondition.get(); + if (result) { + return true; + } + long currentTimeMillis = WorkflowInternal.currentTimeMillis(); + timedOut = currentTimeMillis >= blockedUntil; + return timedOut; + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java index 98b9d293c7..7b0b68450f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java @@ -2,150 +2,47 @@ import com.google.common.base.Preconditions; import io.temporal.common.context.ContextPropagator; -import io.temporal.failure.CanceledFailure; -import io.temporal.internal.common.NonIdempotentHandle; -import io.temporal.internal.common.SdkFlag; -import io.temporal.internal.context.ContextThreadLocal; -import io.temporal.internal.logging.LoggerTag; -import io.temporal.internal.replay.ReplayWorkflowContext; import io.temporal.internal.worker.WorkflowExecutorCache; -import io.temporal.workflow.Functions; -import io.temporal.workflow.Promise; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.function.Supplier; import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; -class WorkflowThreadImpl implements WorkflowThread { - /** - * Runnable passed to the thread that wraps a runnable passed to the WorkflowThreadImpl - * constructor. - */ - class RunnableWrapper implements Runnable { +/** + * Implementation of WorkflowThread that executes a provided Runnable. This is the standard workflow + * thread implementation used for executing workflow code and async operations. + */ +class WorkflowThreadImpl extends WorkflowThreadBase { - private final WorkflowThreadContext threadContext; - // TODO: Move MDC injection logic into an interceptor as this context shouldn't be leaked - // to the WorkflowThreadImpl - private final ReplayWorkflowContext replayWorkflowContext; - private String originalName; - private String name; - private final CancellationScopeImpl cancellationScope; - private final List contextPropagators; - private final Map propagatedContexts; + /** Runnable wrapper that executes the provided runnable within a cancellation scope. */ + class RunnableWrapper extends RunnableWrapperBase { RunnableWrapper( WorkflowThreadContext threadContext, - ReplayWorkflowContext replayWorkflowContext, + io.temporal.internal.replay.ReplayWorkflowContext replayWorkflowContext, String name, boolean detached, CancellationScopeImpl parent, Runnable runnable, List contextPropagators, Map propagatedContexts) { - this.threadContext = threadContext; - this.replayWorkflowContext = replayWorkflowContext; - this.name = name; - boolean deterministicCancellationScopeOrder = - replayWorkflowContext.checkSdkFlag(SdkFlag.DETERMINISTIC_CANCELLATION_SCOPE_ORDER); - this.cancellationScope = - new CancellationScopeImpl( - detached, deterministicCancellationScopeOrder, runnable, parent); - Preconditions.checkState( - context.getStatus() == Status.CREATED, "threadContext not in CREATED state"); - this.contextPropagators = contextPropagators; - this.propagatedContexts = propagatedContexts; + super( + threadContext, + replayWorkflowContext, + name, + detached, + parent, + runnable, + contextPropagators, + propagatedContexts); } @Override - public void run() { - Thread thread = Thread.currentThread(); - originalName = thread.getName(); - thread.setName(name); - - threadContext.initializeCurrentThread(thread); - DeterministicRunnerImpl.setCurrentThreadInternal(WorkflowThreadImpl.this); - - MDC.put(LoggerTag.WORKFLOW_ID, replayWorkflowContext.getWorkflowId()); - MDC.put(LoggerTag.WORKFLOW_TYPE, replayWorkflowContext.getWorkflowType().getName()); - MDC.put(LoggerTag.RUN_ID, replayWorkflowContext.getRunId()); - MDC.put(LoggerTag.TASK_QUEUE, replayWorkflowContext.getTaskQueue()); - MDC.put(LoggerTag.NAMESPACE, replayWorkflowContext.getNamespace()); - - // Repopulate the context(s) - ContextThreadLocal.setContextPropagators(this.contextPropagators); - ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts); - try { - // initialYield blocks thread until the first runUntilBlocked is called. - // Otherwise, r starts executing without control of the sync. - threadContext.initialYield(); - cancellationScope.run(); - } catch (DestroyWorkflowThreadError e) { - if (!threadContext.isDestroyRequested()) { - threadContext.setUnhandledException(e); - } - } catch (Error e) { - threadContext.setUnhandledException(e); - } catch (CanceledFailure e) { - if (!isCancelRequested()) { - threadContext.setUnhandledException(e); - } - if (log.isDebugEnabled()) { - log.debug(String.format("Workflow thread \"%s\" run canceled", name)); - } - } catch (Throwable e) { - threadContext.setUnhandledException(e); - } finally { - DeterministicRunnerImpl.setCurrentThreadInternal(null); - threadContext.makeDone(); - thread.setName(originalName); - MDC.clear(); - } - } - - public String getName() { - return name; - } - - StackTraceElement[] getStackTrace() { - @Nullable Thread thread = threadContext.getCurrentThread(); - if (thread != null) { - return thread.getStackTrace(); - } - return new StackTraceElement[0]; - } - - public void setName(String name) { - this.name = name; - @Nullable Thread thread = threadContext.getCurrentThread(); - if (thread != null) { - thread.setName(name); - } + protected void executeLogic() { + cancellationScope.run(); } } - private static final Logger log = LoggerFactory.getLogger(WorkflowThreadImpl.class); - - private final WorkflowThreadExecutor workflowThreadExecutor; - private final WorkflowThreadContext context; - private final WorkflowExecutorCache cache; - private final SyncWorkflowContext syncWorkflowContext; - - private final DeterministicRunnerImpl runner; - private final RunnableWrapper task; - private final int priority; - private Future taskFuture; - private final Map, Object> threadLocalMap = new HashMap<>(); + private final Runnable runnable; WorkflowThreadImpl( WorkflowThreadExecutor workflowThreadExecutor, @@ -159,16 +56,10 @@ public void setName(String name) { WorkflowExecutorCache cache, List contextPropagators, Map propagatedContexts) { - this.workflowThreadExecutor = workflowThreadExecutor; - this.syncWorkflowContext = Preconditions.checkNotNull(syncWorkflowContext); - this.runner = runner; - this.context = new WorkflowThreadContext(runner.getLock()); - this.cache = cache; - this.priority = priority; + super(workflowThreadExecutor, syncWorkflowContext, runner, priority, cache); + this.runnable = runnable; this.task = - new RunnableWrapper( - context, - syncWorkflowContext.getReplayContext(), + createTaskWrapper( Preconditions.checkNotNull(name, "Thread name shouldn't be null"), detached, parentCancellationScope, @@ -178,278 +69,21 @@ public void setName(String name) { } @Override - public void run() { - throw new UnsupportedOperationException("not used"); - } - - @Override - public boolean isDetached() { - return task.cancellationScope.isDetached(); - } - - @Override - public void cancel() { - task.cancellationScope.cancel(); - } - - @Override - public void cancel(String reason) { - task.cancellationScope.cancel(reason); - } - - @Override - public String getCancellationReason() { - return task.cancellationScope.getCancellationReason(); - } - - @Override - public boolean isCancelRequested() { - return task.cancellationScope.isCancelRequested(); - } - - @Override - public Promise getCancellationRequest() { - return task.cancellationScope.getCancellationRequest(); - } - - @Override - public void start() { - context.verifyAndStart(); - while (true) { - try { - taskFuture = workflowThreadExecutor.submit(task); - return; - } catch (RejectedExecutionException e) { - if (cache != null) { - SyncWorkflowContext workflowContext = getWorkflowContext(); - ReplayWorkflowContext context = workflowContext.getReplayContext(); - boolean evicted = - cache.evictAnyNotInProcessing( - context.getWorkflowExecution(), workflowContext.getMetricsScope()); - if (!evicted) { - // Note here we need to throw error, not exception. Otherwise it will be - // translated to workflow execution exception and instead of failing the - // workflow task we will be failing the workflow. - throw new WorkflowRejectedExecutionError(e); - } - } else { - throw new WorkflowRejectedExecutionError(e); - } - } - } - } - - @Override - public boolean isStarted() { - return context.getStatus() != Status.CREATED; - } - - @Override - public WorkflowThreadContext getWorkflowThreadContext() { - return context; - } - - @Override - public DeterministicRunnerImpl getRunner() { - return runner; - } - - @Override - public SyncWorkflowContext getWorkflowContext() { - return syncWorkflowContext; - } - - @Override - public void setName(String name) { - task.setName(name); - } - - @Override - public String getName() { - return task.getName(); - } - - @Override - public long getId() { - return hashCode(); - } - - @Override - public int getPriority() { - return priority; - } - - @Override - public boolean runUntilBlocked(long deadlockDetectionTimeoutMs) { - if (taskFuture == null) { - start(); - } - return context.runUntilBlocked(deadlockDetectionTimeoutMs); - } - - @Override - public NonIdempotentHandle lockDeadlockDetector() { - return context.lockDeadlockDetector(); - } - - @Override - public boolean isDone() { - return context.isDone(); - } - - @Override - public Throwable getUnhandledException() { - return context.getUnhandledException(); - } - - /** - * Evaluates function in the threadContext of the coroutine without unblocking it. Used to get - * current coroutine status, like stack trace. - * - * @param function Parameter is reason for current goroutine blockage. - */ - public void evaluateInCoroutineContext(Functions.Proc1 function) { - context.evaluateInCoroutineContext(function); - } - - /** - * Interrupt coroutine by throwing DestroyWorkflowThreadError from an await method it is blocked - * on and return underlying Future to be waited on. - */ - @Override - public Future stopNow() { - // Cannot call destroy() on itself - @Nullable Thread thread = context.getCurrentThread(); - if (Thread.currentThread().equals(thread)) { - throw new Error("Cannot call destroy on itself: " + thread.getName()); - } - context.initiateDestroy(); - if (taskFuture == null) { - return getCompletedFuture(); - } - return taskFuture; - } - - private Future getCompletedFuture() { - CompletableFuture f = new CompletableFuture<>(); - f.complete("done"); - return f; - } - - @Override - public void addStackTrace(StringBuilder result) { - result.append(getName()); - @Nullable Thread thread = context.getCurrentThread(); - if (thread == null) { - result.append("(NEW)"); - return; - } - result - .append(": (BLOCKED on ") - .append(getWorkflowThreadContext().getYieldReason()) - .append(")\n"); - // These numbers might change if implementation changes. - int omitTop = 5; - int omitBottom = 7; - // TODO it's not a good idea to rely on the name to understand the thread type. Instead of that - // we would better - // assign an explicit thread type enum to the threads. This will be especially important when we - // refactor - // root and workflow-method - // thread names into names that will include workflowId - if (DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME.equals(getName())) { - // TODO revisit this number - omitBottom = 11; - } else if (getName().startsWith(WorkflowMethodThreadNameStrategy.WORKFLOW_MAIN_THREAD_PREFIX)) { - // TODO revisit this number - omitBottom = 11; - } - StackTraceElement[] stackTrace = thread.getStackTrace(); - for (int i = omitTop; i < stackTrace.length - omitBottom; i++) { - StackTraceElement e = stackTrace[i]; - if (i == omitTop && "await".equals(e.getMethodName())) continue; - result.append(e); - result.append("\n"); - } - } - - @Override - public void yield(String reason, Supplier unblockCondition) { - context.yield(reason, unblockCondition); - } - - @Override - public void exitThread() { - runner.exit(); - throw new DestroyWorkflowThreadError("exit"); - } - - @Override - public void setThreadLocal(WorkflowThreadLocalInternal key, T value) { - threadLocalMap.put(key, value); - } - - /** - * Retrieve data from thread locals. Returns 1. not found (an empty Optional) 2. found but null - * (an Optional of an empty Optional) 3. found and non-null (an Optional of an Optional of a - * value). The type nesting is because Java Optionals cannot understand "Some null" vs "None", - * which is exactly what we need here. - * - * @param key - * @return one of three cases - * @param - */ - @SuppressWarnings("unchecked") - public Optional> getThreadLocal(WorkflowThreadLocalInternal key) { - if (!threadLocalMap.containsKey(key)) { - return Optional.empty(); - } - return Optional.of(Optional.ofNullable((T) threadLocalMap.get(key))); - } - - /** - * @return stack trace of the coroutine thread - */ - @Override - public String getStackTrace() { - StackTraceElement[] st = task.getStackTrace(); - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - pw.append(task.getName()); - pw.append("\n"); - for (StackTraceElement se : st) { - pw.println("\tat " + se); - } - return sw.toString(); - } - - static class YieldWithTimeoutCondition implements Supplier { - - private final Supplier unblockCondition; - private final long blockedUntil; - private boolean timedOut; - - YieldWithTimeoutCondition(Supplier unblockCondition, long blockedUntil) { - this.unblockCondition = unblockCondition; - this.blockedUntil = blockedUntil; - } - - boolean isTimedOut() { - return timedOut; - } - - /** - * @return true if condition matched or timed out - */ - @Override - public Boolean get() { - boolean result = unblockCondition.get(); - if (result) { - return true; - } - long currentTimeMillis = WorkflowInternal.currentTimeMillis(); - timedOut = currentTimeMillis >= blockedUntil; - return timedOut; - } + protected RunnableWrapperBase createTaskWrapper( + @Nonnull String name, + boolean detached, + CancellationScopeImpl parentCancellationScope, + Runnable runnable, + List contextPropagators, + Map propagatedContexts) { + return new RunnableWrapper( + context, + syncWorkflowContext.getReplayContext(), + name, + detached, + parentCancellationScope, + runnable, + contextPropagators, + propagatedContexts); } } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Async.java b/temporal-sdk/src/main/java/io/temporal/workflow/Async.java index b8f8e251c6..91af823b43 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Async.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Async.java @@ -2,6 +2,7 @@ import io.temporal.common.RetryOptions; import io.temporal.internal.sync.AsyncInternal; +import io.temporal.internal.sync.WorkflowInternal; import java.time.Duration; import java.util.Optional; @@ -231,6 +232,65 @@ public static Promise retry( return AsyncInternal.retry(options, expiration, fn); } + /** + * Asynchronously wait until unblockCondition evaluates to true. + * + * @param unblockCondition condition that should return true to indicate completion. The condition + * is called on every state transition, so it should never call any blocking operations or + * contain code that mutates workflow state. + * @return Promise that completes when the condition becomes true, or completes exceptionally with + * CanceledFailure if the enclosing CancellationScope is canceled. + * @see Workflow#await(java.util.function.Supplier) for a blocking version + */ + public static Promise await(java.util.function.Supplier unblockCondition) { + return WorkflowInternal.awaitAsync(unblockCondition); + } + + /** + * Asynchronously wait until unblockCondition evaluates to true or timeout expires. + * + * @param timeout maximum time to wait for the condition + * @param unblockCondition condition that should return true to indicate completion. The condition + * is called on every state transition, so it should never call any blocking operations or + * contain code that mutates workflow state. + * @return Promise that completes with: + *

    + *
  • true if the condition was satisfied + *
  • false if the timeout expired before the condition was satisfied + *
  • exceptionally with CanceledFailure if the enclosing CancellationScope is canceled + *
+ * + * @see Workflow#await(Duration, java.util.function.Supplier) for a blocking version + */ + public static Promise await( + Duration timeout, java.util.function.Supplier unblockCondition) { + return WorkflowInternal.awaitAsync(timeout, null, unblockCondition); + } + + /** + * Asynchronously wait until unblockCondition evaluates to true or timeout expires. + * + * @param timeout maximum time to wait for the condition + * @param timerSummary summary for the timer created by this await, used in workflow history + * @param unblockCondition condition that should return true to indicate completion. The condition + * is called on every state transition, so it should never call any blocking operations or + * contain code that mutates workflow state. + * @return Promise that completes with: + *
    + *
  • true if the condition was satisfied + *
  • false if the timeout expired before the condition was satisfied + *
  • exceptionally with CanceledFailure if the enclosing CancellationScope is canceled + *
+ * + * @see Workflow#await(Duration, java.util.function.Supplier) for a blocking version + */ + public static Promise await( + Duration timeout, + String timerSummary, + java.util.function.Supplier unblockCondition) { + return WorkflowInternal.awaitAsync(timeout, timerSummary, unblockCondition); + } + /** Prohibits instantiation. */ private Async() {} } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index 61c787757f..eb2b46891f 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -474,6 +474,7 @@ public static CancellationScope newDetachedCancellationScope(Runnable runnable) * * @return feature that becomes ready when at least specified number of seconds passes. promise is * failed with {@link CanceledFailure} if enclosing scope is canceled. + * @see #sleep(Duration) for a blocking version */ public static Promise newTimer(Duration delay) { return WorkflowInternal.newTimer(delay); @@ -485,6 +486,7 @@ public static Promise newTimer(Duration delay) { * * @return feature that becomes ready when at least specified number of seconds passes. promise is * failed with {@link CanceledFailure} if enclosing scope is canceled. + * @see #sleep(Duration) for a blocking version */ public static Promise newTimer(Duration delay, TimerOptions options) { return WorkflowInternal.newTimer(delay, options); @@ -566,12 +568,20 @@ public static long currentTimeMillis() { return WorkflowInternal.currentTimeMillis(); } - /** Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. */ + /** + * Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. + * + * @see #newTimer(Duration) for a non-blocking version that returns a Promise + */ public static void sleep(Duration duration) { WorkflowInternal.sleep(duration); } - /** Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. */ + /** + * Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. + * + * @see #newTimer(Duration) for a non-blocking version that returns a Promise + */ public static void sleep(long millis) { WorkflowInternal.sleep(Duration.ofMillis(millis)); } @@ -585,6 +595,7 @@ public static void sleep(long millis) { * contain any time based conditions. Use {@link #await(Duration, Supplier)} for those * instead. * @throws CanceledFailure if thread (or current {@link CancellationScope} was canceled). + * @see Async#await(java.util.function.Supplier) for a non-blocking version that returns a Promise */ public static void await(Supplier unblockCondition) { WorkflowInternal.await( @@ -606,6 +617,8 @@ public static void await(Supplier unblockCondition) { * Use timeout parameter for those. * @return false if timed out. * @throws CanceledFailure if thread (or current {@link CancellationScope} was canceled). + * @see Async#await(Duration, java.util.function.Supplier) for a non-blocking version that returns + * a Promise */ public static boolean await(Duration timeout, Supplier unblockCondition) { return WorkflowInternal.await( @@ -617,6 +630,32 @@ public static boolean await(Duration timeout, Supplier unblockCondition }); } + /** + * Block current workflow thread until unblockCondition is evaluated to true or timeout passes. + * + * @param timeout time to unblock even if unblockCondition is not satisfied. + * @param timerSummary summary for the timer created by this await, used in workflow history + * @param unblockCondition condition that should return true to indicate that thread should + * unblock. The condition is called on every state transition, so it should not contain any + * code that mutates any workflow state. It should also not contain any time based conditions. + * Use timeout parameter for those. + * @return false if timed out. + * @throws CanceledFailure if thread (or current {@link CancellationScope} was canceled). + * @see Async#await(Duration, String, java.util.function.Supplier) for a non-blocking version that + * returns a Promise + */ + public static boolean await( + Duration timeout, String timerSummary, Supplier unblockCondition) { + return WorkflowInternal.awaitAsync( + timeout, + timerSummary, + () -> { + CancellationScope.throwCanceled(); + return unblockCondition.get(); + }) + .get(); + } + /** * Invokes function retrying in case of failures according to retry options. Synchronous variant. * Use {@link Async#retry(RetryOptions, Optional, Functions.Func)} for asynchronous functions. diff --git a/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java index ba3a0eb332..d504efab21 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java @@ -963,4 +963,80 @@ public void testSupplierCalledMultipleWithoutCaching() { }); d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); } + + /** + * Test that beforeThreadsWakeUp callback is invoked BEFORE threads run. The callback sets a value + * that the thread reads, proving the callback ran first. + */ + @Test + public void testBeforeThreadsWakeUpCallbackInvokedBeforeThreads() { + AtomicBoolean valueSetByCallback = new AtomicBoolean(false); + AtomicBoolean threadSawValue = new AtomicBoolean(false); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + // Thread checks if callback set the value + threadSawValue.set(valueSetByCallback.get()); + status = "done"; + }, + null, + () -> { + // Callback sets value before threads run + valueSetByCallback.set(true); + return false; + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertTrue(d.isDone()); + assertTrue("Callback should set value before thread runs", valueSetByCallback.get()); + assertTrue("Thread should see value set by callback", threadSawValue.get()); + } + + /** + * Test that when beforeThreadsWakeUp returns true (progress made), the loop continues and threads + * run again. The callback can return true multiple times when notifying multiple conditions. + */ + @Test + public void testBeforeThreadsWakeUpProgressContinuesLoop() { + AtomicBoolean shouldUnblock1 = new AtomicBoolean(false); + AtomicBoolean shouldUnblock2 = new AtomicBoolean(false); + AtomicInteger trueCount = new AtomicInteger(0); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "waiting1"; + WorkflowThread.await("wait1", shouldUnblock1::get); + status = "waiting2"; + WorkflowThread.await("wait2", shouldUnblock2::get); + status = "done"; + }, + null, + () -> { + // Callback can return true multiple times - once for each condition it unblocks + if (status.equals("waiting1") && !shouldUnblock1.get()) { + shouldUnblock1.set(true); + trueCount.incrementAndGet(); + return true; + } + if (status.equals("waiting2") && !shouldUnblock2.get()) { + shouldUnblock2.set(true); + trueCount.incrementAndGet(); + return true; + } + return false; + }); + + // Single runUntilAllBlocked: callback returns true twice (once per condition), + // thread advances through both waits to completion + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("done", status); + assertTrue(d.isDone()); + assertEquals("Callback should return true twice (once per condition)", 2, trueCount.get()); + } } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/sync/RepeatableWorkflowThreadTest.java b/temporal-sdk/src/test/java/io/temporal/internal/sync/RepeatableWorkflowThreadTest.java new file mode 100644 index 0000000000..e4304dd32f --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/sync/RepeatableWorkflowThreadTest.java @@ -0,0 +1,1037 @@ +package io.temporal.internal.sync; + +import static org.junit.Assert.*; + +import io.temporal.failure.CanceledFailure; +import io.temporal.workflow.Async; +import io.temporal.workflow.CancellationScope; +import io.temporal.workflow.Promise; +import io.temporal.workflow.Workflow; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Unit tests for RepeatableWorkflowThread functionality. These tests verify the behavior of + * workflow threads that repeatedly evaluate a condition on each runUntilBlocked() call. + * + *

RepeatableWorkflowThread is used for implementing await-style operations that need to + * repeatedly check a condition across multiple workflow task executions. + */ +public class RepeatableWorkflowThreadTest { + + @Rule public final Tracer trace = new Tracer(); + + private static ExecutorService threadPool; + + private String status; + private boolean unblock; + private Throwable failure; + + @BeforeClass + public static void beforeClass() { + threadPool = new ThreadPoolExecutor(1, 1000, 1, TimeUnit.SECONDS, new SynchronousQueue<>()); + } + + @AfterClass + public static void afterClass() { + threadPool.shutdown(); + } + + @Before + public void setUp() { + unblock = false; + failure = null; + status = "initial"; + } + + // ==================== Basic Condition Evaluation Tests ==================== + + /** Test that when condition returns true immediately, the thread completes. */ + @Test(timeout = 5000) + public void testConditionReturnsTrueImmediately_ThreadCompletes() { + AtomicInteger evaluationCount = new AtomicInteger(0); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + // Create repeatable thread with condition that returns true immediately + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + evaluationCount.incrementAndGet(); + return true; // Immediately true + }, + false, + "test-repeatable"); + + repeatableThread.start(); + + // Wait for the repeatable thread to complete + WorkflowThread.await( + "wait for repeatable", () -> repeatableThread.isDone() || unblock); + + status = "done"; + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("done", status); + assertEquals( + "Condition should be evaluated exactly once when it returns true immediately", + 1, + evaluationCount.get()); + assertTrue(d.isDone()); + } + + /** + * Test that condition returning false several times, then true, completes after multiple runs. + */ + @Test(timeout = 5000) + public void testConditionReturnsFalseThenTrue_ThreadCompletesAfterMultipleRuns() { + AtomicInteger evaluationCount = new AtomicInteger(0); + AtomicInteger targetCount = new AtomicInteger(3); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + int count = evaluationCount.incrementAndGet(); + // Return true on the third evaluation + return count >= targetCount.get(); + }, + false, + "test-repeatable"); + + repeatableThread.start(); + + // Wait for the repeatable thread to complete + WorkflowThread.await("wait for repeatable", repeatableThread::isDone); + + status = "done"; + }); + + // First runUntilAllBlocked: condition evaluated once, returns false + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertFalse(d.isDone()); + assertEquals(1, evaluationCount.get()); + + // Second runUntilAllBlocked: condition evaluated again, returns false + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertFalse(d.isDone()); + assertEquals(2, evaluationCount.get()); + + // Third runUntilAllBlocked: condition evaluated, returns true, thread completes + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("done", status); + assertEquals(3, evaluationCount.get()); + assertTrue(d.isDone()); + } + + /** Test that condition always returning false keeps thread alive and yields properly. */ + @Test(timeout = 5000) + public void testConditionAlwaysFalse_ThreadStaysAliveAndYields() { + AtomicInteger evaluationCount = new AtomicInteger(0); + AtomicReference threadRef = new AtomicReference<>(); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + evaluationCount.incrementAndGet(); + return false; // Never satisfied + }, + false, + "test-repeatable"); + + threadRef.set(repeatableThread); + repeatableThread.start(); + + // Wait indefinitely (will be controlled externally) + WorkflowThread.await("wait", () -> unblock); + + status = "done"; + }); + + // Run multiple times + for (int i = 1; i <= 5; i++) { + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertFalse("Thread should not be done when condition is always false", d.isDone()); + assertEquals( + "Condition should be evaluated once per runUntilAllBlocked", i, evaluationCount.get()); + + // Verify the repeatable thread is not done + WorkflowThread repeatableThread = threadRef.get(); + assertNotNull(repeatableThread); + assertFalse(repeatableThread.isDone()); + } + + // Clean up by unblocking and closing + d.close(); + } + + // ==================== Cancellation Tests ==================== + + /** Test cancellation while the condition is being evaluated. */ + @Test(timeout = 5000) + public void testCancellationWhileRunning_ThreadStops() { + AtomicInteger evaluationCount = new AtomicInteger(0); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + evaluationCount.incrementAndGet(); + return false; // Never returns true + }, + false, + "test-repeatable"); + repeatableThread.start(); + + try { + WorkflowThread.await("wait for repeatable", repeatableThread::isDone); + status = "repeatable done"; + } catch (CanceledFailure e) { + status = "canceled"; + } + + status = "done: " + status; + }); + + // First run - thread evaluates condition + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertFalse(d.isDone()); + + // Cancel the workflow + d.cancel("cancel workflow"); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + assertTrue(d.isDone()); + assertTrue("At least one evaluation should have happened", evaluationCount.get() >= 1); + } + + /** Test cancellation while thread is yielded waiting. */ + @Test(timeout = 5000) + public void testCancellationWhileYielded_ThreadStops() { + AtomicInteger evaluationCount = new AtomicInteger(0); + AtomicBoolean shouldSatisfy = new AtomicBoolean(false); + AtomicReference scopeRef = new AtomicReference<>(); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + CancellationScope scope = + Workflow.newCancellationScope( + () -> { + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + evaluationCount.incrementAndGet(); + return shouldSatisfy.get(); + }, + false, + "test-repeatable"); + repeatableThread.start(); + + try { + WorkflowThread.await("wait for repeatable", repeatableThread::isDone); + status = "thread completed"; + } catch (CanceledFailure e) { + status = "canceled"; + } + }); + + scopeRef.set(scope); + scope.run(); + status = "done: " + status; + }); + + // First run - thread evaluates condition, returns false, yields + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(1, evaluationCount.get()); + assertFalse(d.isDone()); + + // Second run - thread is yielded, evaluate again + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(2, evaluationCount.get()); + assertFalse(d.isDone()); + + // Now cancel the scope while the thread is yielded + d.cancel("cancel during yield"); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + assertTrue(d.isDone()); + assertTrue(status.contains("canceled") || status.contains("done")); + } + + // ==================== Destroy Thread (stopNow) Tests ==================== + + /** Test that stopNow properly destroys the thread. */ + @Test(timeout = 5000) + public void testStopNow_DestroysThread() { + AtomicInteger evaluationCount = new AtomicInteger(0); + AtomicReference capturedError = new AtomicReference<>(); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + evaluationCount.incrementAndGet(); + return false; + }, + false, + "test-repeatable"); + repeatableThread.start(); + + try { + WorkflowThread.await("wait", () -> unblock); + } catch (DestroyWorkflowThreadError e) { + capturedError.set(e); + throw e; + } + status = "done"; + }); + + // Run to get the thread started + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertFalse(d.isDone()); + assertTrue(evaluationCount.get() >= 1); + + // Close/destroy the runner + d.close(); + + assertTrue(d.isDone()); + assertNotNull("DestroyWorkflowThreadError should be thrown", capturedError.get()); + assertTrue(capturedError.get() instanceof DestroyWorkflowThreadError); + } + + // ==================== Exception in Condition Tests ==================== + + /** Test that exception thrown in condition is properly captured. */ + @Test(timeout = 5000) + public void testExceptionInCondition_ProperlyCaptured() { + RuntimeException testException = new RuntimeException("condition exception"); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + throw testException; + }, + false, + "test-repeatable"); + repeatableThread.start(); + + WorkflowThread.await("wait", () -> unblock); + status = "done"; + }); + + try { + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + fail("Expected exception to be thrown"); + } catch (RuntimeException e) { + assertTrue( + "Exception should contain original message", + e.getMessage().contains("condition exception") + || (e.getCause() != null && e.getCause().getMessage().contains("condition exception")) + || e.equals(testException)); + } + assertTrue(d.isDone()); + } + + // ==================== Multiple Concurrent RepeatableThreads Tests ==================== + + /** Test multiple concurrent RepeatableThreads with different conditions. */ + @Test(timeout = 5000) + public void testMultipleConcurrentRepeatableThreads() { + AtomicInteger thread1Count = new AtomicInteger(0); + AtomicInteger thread2Count = new AtomicInteger(0); + AtomicInteger thread3Count = new AtomicInteger(0); + + AtomicBoolean satisfy1 = new AtomicBoolean(false); + AtomicBoolean satisfy2 = new AtomicBoolean(false); + AtomicBoolean satisfy3 = new AtomicBoolean(false); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + DeterministicRunnerImpl runner = + DeterministicRunnerImpl.currentThreadInternal().getRunner(); + + // Create three repeatable threads with different conditions + WorkflowThread t1 = + runner.newRepeatableThread( + () -> { + thread1Count.incrementAndGet(); + return satisfy1.get(); + }, + false, + "repeatable-1"); + + WorkflowThread t2 = + runner.newRepeatableThread( + () -> { + thread2Count.incrementAndGet(); + return satisfy2.get(); + }, + false, + "repeatable-2"); + + WorkflowThread t3 = + runner.newRepeatableThread( + () -> { + thread3Count.incrementAndGet(); + return satisfy3.get(); + }, + false, + "repeatable-3"); + + t1.start(); + t2.start(); + t3.start(); + + // Wait for all three to complete + WorkflowThread.await("wait all", () -> t1.isDone() && t2.isDone() && t3.isDone()); + + status = "done"; + }); + + // First run - all threads evaluate their conditions + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(1, thread1Count.get()); + assertEquals(1, thread2Count.get()); + assertEquals(1, thread3Count.get()); + assertFalse(d.isDone()); + + // Second run - all threads evaluate again + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(2, thread1Count.get()); + assertEquals(2, thread2Count.get()); + assertEquals(2, thread3Count.get()); + assertFalse(d.isDone()); + + // Satisfy thread 1 + satisfy1.set(true); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + // Thread1 evaluated and completed. Thread2 and Thread3 may get extra evaluations + // because the event loop continues while there's progress from thread1 completing. + assertTrue(thread1Count.get() >= 3); + assertTrue(thread2Count.get() >= 3); + assertTrue(thread3Count.get() >= 3); + assertFalse(d.isDone()); + + // Satisfy threads 2 and 3 + satisfy2.set(true); + satisfy3.set(true); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("done", status); + assertTrue(d.isDone()); + } + + // ==================== Detached Thread Tests ==================== + + /** Test that detached repeatable thread is not affected by parent cancellation. */ + @Test(timeout = 5000) + public void testDetachedRepeatableThread_NotAffectedByParentCancellation() { + AtomicInteger evaluationCount = new AtomicInteger(0); + AtomicBoolean shouldSatisfy = new AtomicBoolean(false); + AtomicReference threadRef = new AtomicReference<>(); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + // Create detached repeatable thread + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + evaluationCount.incrementAndGet(); + return shouldSatisfy.get(); + }, + true, // detached + "detached-repeatable"); + + threadRef.set(repeatableThread); + repeatableThread.start(); + + // Wait for external unblock + WorkflowThread.await("wait", () -> unblock); + + status = "done"; + }); + + // First run + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(1, evaluationCount.get()); + assertFalse(d.isDone()); + + // Cancel the workflow - detached thread should continue + d.cancel("cancel workflow"); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + // The detached thread should still be running + WorkflowThread detachedThread = threadRef.get(); + assertNotNull(detachedThread); + + // Thread may have evaluated more times during cancellation processing + assertTrue(evaluationCount.get() >= 1); + + // Eventually satisfy and unblock + shouldSatisfy.set(true); + unblock = true; + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertTrue(d.isDone()); + } + + // ==================== Integration with Regular Workflow Threads Tests ==================== + + /** Test RepeatableWorkflowThread working alongside regular WorkflowThreadImpl. */ + @Test(timeout = 5000) + public void testRepeatableThreadWithRegularThread() { + trace.add("init"); + AtomicInteger repeatableCount = new AtomicInteger(0); + AtomicBoolean satisfyRepeatable = new AtomicBoolean(false); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + trace.add("root started"); + DeterministicRunnerImpl runner = + DeterministicRunnerImpl.currentThreadInternal().getRunner(); + + // Create regular thread + Promise regularThread = + Async.procedure( + () -> { + trace.add("regular started"); + WorkflowThread.await("wait regular", () -> unblock); + trace.add("regular done"); + }); + + // Create repeatable thread + WorkflowThread repeatableThread = + runner.newRepeatableThread( + () -> { + repeatableCount.incrementAndGet(); + return satisfyRepeatable.get(); + }, + false, + "test-repeatable"); + repeatableThread.start(); + + // Wait for both + trace.add("waiting for both"); + WorkflowThread.await( + "wait both", () -> regularThread.isCompleted() && repeatableThread.isDone()); + + trace.add("root done"); + }); + + // First run - both threads start + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertFalse(d.isDone()); + + // Satisfy the repeatable thread + satisfyRepeatable.set(true); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertFalse(d.isDone()); // Regular thread still waiting + + // Unblock the regular thread + unblock = true; + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertTrue(d.isDone()); + + trace.setExpected( + "init", "root started", "waiting for both", "regular started", "regular done", "root done"); + } + + // ==================== Edge Case Tests ==================== + + /** Test repeatable thread with condition that alternates between true and false. */ + @Test(timeout = 5000) + public void testConditionAlternates_CompletesOnFirstTrue() { + AtomicInteger evaluationCount = new AtomicInteger(0); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + int count = evaluationCount.incrementAndGet(); + // Return true only on even counts + return count % 2 == 0; + }, + false, + "test-repeatable"); + + repeatableThread.start(); + WorkflowThread.await("wait", repeatableThread::isDone); + status = "done"; + }); + + // First run - count=1, returns false + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(1, evaluationCount.get()); + assertFalse(d.isDone()); + + // Second run - count=2, returns true + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(2, evaluationCount.get()); + assertEquals("done", status); + assertTrue(d.isDone()); + } + + /** Test thread name is correctly set. */ + @Test(timeout = 5000) + public void testThreadNameIsSet() { + AtomicReference capturedName = new AtomicReference<>(); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread(() -> true, false, "my-custom-repeatable-name"); + + capturedName.set(repeatableThread.getName()); + repeatableThread.start(); + + WorkflowThread.await("wait", repeatableThread::isDone); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("my-custom-repeatable-name", capturedName.get()); + assertTrue(d.isDone()); + } + + /** Test that thread priority is correctly assigned. */ + @Test(timeout = 5000) + public void testThreadPriorityIsAssigned() { + AtomicInteger capturedPriority = new AtomicInteger(-1); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread(() -> true, false, "test-repeatable"); + + capturedPriority.set(repeatableThread.getPriority()); + repeatableThread.start(); + + WorkflowThread.await("wait", repeatableThread::isDone); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertTrue("Priority should be positive", capturedPriority.get() > 0); + assertTrue(d.isDone()); + } + + /** Test getting stack trace from repeatable thread. */ + @Test(timeout = 5000) + public void testStackTraceCanBeObtained() { + AtomicReference stackTrace = new AtomicReference<>(); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> false, // Never completes + false, + "test-repeatable"); + + repeatableThread.start(); + WorkflowThread.await("wait", () -> unblock); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + stackTrace.set(d.stackTrace()); + assertNotNull(stackTrace.get()); + assertTrue( + "Stack trace should contain the repeatable thread name", + stackTrace.get().contains("test-repeatable") || stackTrace.get().contains("workflow-root")); + + d.close(); + } + + // ==================== Condition Side Effects Tests ==================== + + /** Test that condition can modify external state safely. */ + @Test(timeout = 5000) + public void testConditionCanModifyExternalState() { + AtomicInteger counter = new AtomicInteger(0); + AtomicInteger targetValue = new AtomicInteger(5); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + // Condition increments counter as a side effect + int current = counter.incrementAndGet(); + return current >= targetValue.get(); + }, + false, + "test-repeatable"); + + repeatableThread.start(); + WorkflowThread.await("wait", repeatableThread::isDone); + status = "counter=" + counter.get(); + }); + + // Run until completion (use do-while to ensure at least one run) + do { + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + } while (!d.isDone()); + + assertEquals("counter=5", status); + assertEquals(5, counter.get()); + } + + /** Test condition that depends on workflow time. */ + @Test(timeout = 5000) + public void testConditionDependsOnWorkflowState() { + AtomicInteger evaluationCount = new AtomicInteger(0); + AtomicBoolean externalSignal = new AtomicBoolean(false); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + evaluationCount.incrementAndGet(); + // Condition depends on external signal + return externalSignal.get(); + }, + false, + "test-repeatable"); + + repeatableThread.start(); + WorkflowThread.await("wait", repeatableThread::isDone); + status = "done after " + evaluationCount.get() + " evaluations"; + }); + + // Run a few times without signal + for (int i = 0; i < 3; i++) { + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertFalse(d.isDone()); + } + + // Set external signal + externalSignal.set(true); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + assertTrue(status.startsWith("done after")); + assertTrue(d.isDone()); + } + + // ==================== Blocking Condition Tests ==================== + + /** + * Test that condition code can use Workflow.await() to block. This verifies that the + * RepeatableWorkflowThread properly supports full workflow capabilities inside conditions. + */ + @Test(timeout = 5000) + public void testConditionWithBlockingAwait() { + AtomicInteger conditionEvaluationCount = new AtomicInteger(0); + AtomicBoolean awaitSignal = new AtomicBoolean(false); + AtomicBoolean conditionResult = new AtomicBoolean(false); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + // Create repeatable thread with a condition that blocks using Workflow.await() + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + conditionEvaluationCount.incrementAndGet(); + + // Block until awaitSignal becomes true + // This tests that await works inside condition code + Workflow.await(awaitSignal::get); + + // After the await unblocks, return the condition result + return conditionResult.get(); + }, + false, + "blocking-condition"); + + repeatableThread.start(); + WorkflowThread.await("wait for repeatable", repeatableThread::isDone); + status = "done"; + }); + + // First run - condition starts evaluating but blocks on Workflow.await() + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(1, conditionEvaluationCount.get()); + assertFalse(d.isDone()); + + // Second run - still blocked on await (no signal yet) + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(1, conditionEvaluationCount.get()); // Same evaluation, still blocked + assertFalse(d.isDone()); + + // Unblock the await inside the condition, but condition returns false + // The first evaluation completes + awaitSignal.set(true); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(1, conditionEvaluationCount.get()); // Same evaluation completed + assertFalse(d.isDone()); + + // Next run creates a new evaluation (since previous completed with false) + // Reset signal so it blocks again + awaitSignal.set(false); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(2, conditionEvaluationCount.get()); // New evaluation started + assertFalse(d.isDone()); + + // Now set signal and make condition return true + awaitSignal.set(true); + conditionResult.set(true); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + assertEquals("done", status); + assertTrue(d.isDone()); + } + + /** + * Test that condition can block multiple times in sequence. This verifies that the internal + * workflow thread properly handles multiple yield points within a single condition evaluation. + */ + @Test(timeout = 5000) + public void testConditionWithMultipleBlockingAwaits() { + AtomicInteger step = new AtomicInteger(0); + AtomicBoolean signal1 = new AtomicBoolean(false); + AtomicBoolean signal2 = new AtomicBoolean(false); + AtomicBoolean signal3 = new AtomicBoolean(false); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + // First await + step.set(1); + Workflow.await(signal1::get); + + // Second await + step.set(2); + Workflow.await(signal2::get); + + // Third await + step.set(3); + Workflow.await(signal3::get); + + step.set(4); + return true; // Complete after all awaits + }, + false, + "multi-await-condition"); + + repeatableThread.start(); + WorkflowThread.await("wait for repeatable", repeatableThread::isDone); + status = "done at step " + step.get(); + }); + + // First run - blocks on first await + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(1, step.get()); + assertFalse(d.isDone()); + + // Unblock first await + signal1.set(true); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(2, step.get()); // Now at second await + assertFalse(d.isDone()); + + // Unblock second await + signal2.set(true); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(3, step.get()); // Now at third await + assertFalse(d.isDone()); + + // Unblock third await - condition should complete with true + signal3.set(true); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("done at step 4", status); + assertTrue(d.isDone()); + } + + /** + * Test cancellation while the condition function is blocked on an internal Workflow.await(). This + * verifies that cancellation properly propagates through the blocking call inside the condition. + */ + @Test(timeout = 5000) + public void testCancellationWhileConditionBlocked() { + AtomicInteger step = new AtomicInteger(0); + AtomicBoolean awaitSignal = new AtomicBoolean(false); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + + WorkflowThread repeatableThread = + DeterministicRunnerImpl.currentThreadInternal() + .getRunner() + .newRepeatableThread( + () -> { + step.set(1); + // This await will block, and we'll cancel while blocked here + Workflow.await(awaitSignal::get); + step.set(2); + return true; + }, + false, + "blocking-condition"); + + repeatableThread.start(); + + try { + WorkflowThread.await("wait for repeatable", repeatableThread::isDone); + status = "completed at step " + step.get(); + } catch (CanceledFailure e) { + status = "canceled at step " + step.get(); + } + }); + + // First run - condition starts and blocks on Workflow.await() + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(1, step.get()); + assertFalse(d.isDone()); + + // Second run - still blocked (signal not set) + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals(1, step.get()); + assertFalse(d.isDone()); + + // Cancel the entire workflow while the condition is blocked on the internal await + d.cancel("cancel while blocked"); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + assertTrue(d.isDone()); + // Verify that cancellation happened while still at step 1 (before internal await completed) + assertEquals(1, step.get()); + assertTrue( + "Expected status to indicate cancellation at step 1, but was: " + status, + status.contains("canceled") || status.contains("step 1")); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/sync/WorkflowThreadBaseTest.java b/temporal-sdk/src/test/java/io/temporal/internal/sync/WorkflowThreadBaseTest.java new file mode 100644 index 0000000000..9a017b0e71 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/sync/WorkflowThreadBaseTest.java @@ -0,0 +1,783 @@ +package io.temporal.internal.sync; + +import static org.junit.Assert.*; + +import io.temporal.failure.CanceledFailure; +import io.temporal.workflow.Async; +import io.temporal.workflow.Promise; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Unit tests for WorkflowThreadBase functionality. These tests verify the core behavior of workflow + * threads including lifecycle management, blocking/yielding, cancellation, and exception handling. + * + *

WorkflowThreadBase is the abstract base class that provides shared functionality for + * WorkflowThreadImpl and future implementations like RepeatableWorkflowThread. + */ +public class WorkflowThreadBaseTest { + + @Rule public final Tracer trace = new Tracer(); + + private static ExecutorService threadPool; + + private String status; + private boolean unblock1; + private boolean unblock2; + private Throwable failure; + + @BeforeClass + public static void beforeClass() { + threadPool = new ThreadPoolExecutor(1, 1000, 1, TimeUnit.SECONDS, new SynchronousQueue<>()); + } + + @AfterClass + public static void afterClass() { + threadPool.shutdown(); + } + + @Before + public void setUp() { + unblock1 = false; + unblock2 = false; + failure = null; + status = "initial"; + } + + // ==================== Thread Lifecycle Tests ==================== + + /** + * Test that a workflow thread goes through the expected lifecycle states: CREATED -> RUNNING -> + * YIELDED -> RUNNING -> DONE + */ + @Test(timeout = 5000) + public void testThreadLifecycle_CreatedToRunningToYieldedToDone() { + AtomicReference statusAtStart = new AtomicReference<>(); + AtomicReference statusDuringYield = new AtomicReference<>(); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + // Capture status when thread first runs + WorkflowThread current = DeterministicRunnerImpl.currentThreadInternal(); + statusAtStart.set(current.getWorkflowThreadContext().getStatus()); + + // Yield and capture status + WorkflowThread.await( + "test yield", + () -> { + statusDuringYield.set(current.getWorkflowThreadContext().getStatus()); + return unblock1; + }); + + status = "done"; + }); + + // Run until blocked - this starts the root thread + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + // Thread should be in RUNNING state when executing user code + assertEquals(Status.RUNNING, statusAtStart.get()); + // Thread is yielded but not done + assertFalse(d.isDone()); + + // Unblock and complete + unblock1 = true; + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("done", status); + assertTrue(d.isDone()); + } + + /** Test that thread runnable is only executed after runUntilAllBlocked is called. */ + @Test(timeout = 5000) + public void testThreadLifecycle_InitialStatusIsCreated() { + AtomicBoolean startWasCalled = new AtomicBoolean(false); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + startWasCalled.set(true); + status = "done"; + }); + + // Before runUntilAllBlocked, the root thread hasn't started executing user code + assertFalse(startWasCalled.get()); + + // Now run - thread will start and complete + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertTrue(startWasCalled.get()); + assertTrue(d.isDone()); + } + + // ==================== runUntilBlocked Tests ==================== + + /** Test that runUntilBlocked runs the thread until it yields, then blocks. */ + @Test(timeout = 5000) + public void testRunUntilBlocked_ThreadRunsUntilYield() { + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + WorkflowThread.await("reason1", () -> unblock1); + status = "after yield"; + }); + + assertEquals("initial", status); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("started", status); + assertFalse(d.isDone()); + } + + /** Test that multiple yields work correctly with runUntilBlocked. */ + @Test(timeout = 5000) + public void testRunUntilBlocked_MultipleYields() { + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + WorkflowThread.await("reason1", () -> unblock1); + status = "after1"; + WorkflowThread.await("reason2", () -> unblock2); + status = "done"; + }); + + // First run - blocks on first await + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("started", status); + assertFalse(d.isDone()); + + // Unblock first await + unblock1 = true; + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("after1", status); + assertFalse(d.isDone()); + + // Running again without unblocking doesn't make progress + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("after1", status); + assertFalse(d.isDone()); + + // Unblock second await + unblock2 = true; + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("done", status); + assertTrue(d.isDone()); + } + + /** + * Test that runUntilBlocked returns true when progress is made and false when thread is blocked. + */ + @Test(timeout = 5000) + public void testRunUntilBlocked_ReturnsProgressIndicator() { + AtomicReference threadRef = new AtomicReference<>(); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + threadRef.set(DeterministicRunnerImpl.currentThreadInternal()); + status = "started"; + WorkflowThread.await("reason1", () -> unblock1); + status = "done"; + }); + + // First run makes progress (thread starts and runs until yield) + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("started", status); + + // Get the thread context to test runUntilBlocked directly + WorkflowThread thread = threadRef.get(); + assertNotNull(thread); + + // Running again without unblocking - no progress expected + boolean progress = + thread.runUntilBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertFalse("Should not make progress when still blocked", progress); + + // Unblock and run - should make progress + unblock1 = true; + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("done", status); + } + + // ==================== isDone Tests ==================== + + /** Test that isDone returns false while thread is running and true after completion. */ + @Test(timeout = 5000) + public void testIsDone_ReturnsFalseWhileRunning() { + AtomicReference threadRef = new AtomicReference<>(); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + threadRef.set(DeterministicRunnerImpl.currentThreadInternal()); + status = "started"; + WorkflowThread.await("reason1", () -> unblock1); + status = "done"; + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + WorkflowThread thread = threadRef.get(); + + // Thread is yielded but not done + assertFalse(thread.isDone()); + assertFalse(d.isDone()); + + // Complete the thread + unblock1 = true; + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + // Now isDone should be true + assertTrue(thread.isDone()); + assertTrue(d.isDone()); + } + + /** Test that isDone returns true after thread completes normally. */ + @Test(timeout = 5000) + public void testIsDone_TrueAfterNormalCompletion() { + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "done"; + // No yield - completes immediately + }); + + // Run - thread will start and complete without blocking + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("done", status); + assertTrue(d.isDone()); + } + + /** Test that isDone returns true after thread throws exception. */ + @Test(timeout = 5000) + public void testIsDone_TrueAfterException() { + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + throw new RuntimeException("test exception"); + }); + + try { + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + fail("Expected exception to be thrown"); + } catch (RuntimeException e) { + assertTrue( + e.getMessage().contains("test exception") + || e.getCause().getMessage().contains("test exception")); + } + assertTrue(d.isDone()); + } + + // ==================== Cancellation Tests ==================== + + /** Test that thread responds to cancel requests properly. */ + @Test(timeout = 5000) + public void testCancellation_ThreadRespondsToCancelRequest() { + trace.add("init"); + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + trace.add("started"); + WorkflowThread.await( + "waiting for cancel", + () -> DeterministicRunnerImpl.currentThreadInternal().isCancelRequested()); + trace.add("cancel detected"); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertFalse(d.isDone()); + + // Cancel the thread + d.cancel("test cancellation"); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + assertTrue(d.isDone()); + trace.setExpected("init", "started", "cancel detected"); + } + + /** Test that cancel reason is properly set and retrievable. */ + @Test(timeout = 5000) + public void testCancellation_CancelReasonIsSet() { + AtomicReference cancelReason = new AtomicReference<>(); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + WorkflowThread current = DeterministicRunnerImpl.currentThreadInternal(); + WorkflowThread.await("waiting for cancel", current::isCancelRequested); + cancelReason.set(current.getCancellationReason()); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + d.cancel("specific reason"); + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + assertEquals("specific reason", cancelReason.get()); + } + + /** Test that stopNow properly destroys the thread. */ + @Test(timeout = 5000) + public void testCancellation_StopNowDestroysThread() { + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + status = "started"; + try { + WorkflowThread.await("reason1", () -> unblock1); + status = "after yield"; + } catch (DestroyWorkflowThreadError e) { + failure = e; + throw e; + } + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("started", status); + assertFalse(d.isDone()); + + // Stop/close the runner + d.close(); + + assertTrue(d.isDone()); + assertNotNull("Should have received DestroyWorkflowThreadError", failure); + assertTrue(failure instanceof DestroyWorkflowThreadError); + } + + // ==================== Exception Handling Tests ==================== + + /** Test that unhandled exceptions are captured correctly. */ + @Test(timeout = 5000) + public void testExceptionHandling_UnhandledExceptionsAreCaptured() { + AtomicReference threadRef = new AtomicReference<>(); + RuntimeException testException = new RuntimeException("test exception message"); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + threadRef.set(DeterministicRunnerImpl.currentThreadInternal()); + throw testException; + }); + + try { + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + fail("Expected exception"); + } catch (RuntimeException ignored) { + } + + WorkflowThread thread = threadRef.get(); + assertNotNull(thread); + Throwable unhandledException = thread.getUnhandledException(); + assertNotNull("Unhandled exception should be captured", unhandledException); + assertEquals("test exception message", unhandledException.getMessage()); + } + + /** Test that CanceledFailure is handled properly when thread is not cancelled. */ + @Test(timeout = 5000) + public void testExceptionHandling_CanceledFailureWhenNotCancelled() { + AtomicReference threadRef = new AtomicReference<>(); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + threadRef.set(DeterministicRunnerImpl.currentThreadInternal()); + // Throw CanceledFailure without actually being cancelled + throw new CanceledFailure("unexpected cancel"); + }); + + try { + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + fail("Expected exception"); + } catch (Exception ignored) { + } + + WorkflowThread thread = threadRef.get(); + assertNotNull(thread); + // When CanceledFailure is thrown but thread wasn't actually cancelled, + // it should be captured as an unhandled exception + Throwable unhandledException = thread.getUnhandledException(); + assertNotNull(unhandledException); + assertTrue(unhandledException instanceof CanceledFailure); + } + + // ==================== Thread Context Tests ==================== + + /** Test that WorkflowThread.currentThreadInternal() returns the correct thread. */ + @Test(timeout = 5000) + public void testThreadContext_CurrentThreadInternalReturnsCorrectThread() { + AtomicReference capturedThread = new AtomicReference<>(); + AtomicReference capturedChildThread = new AtomicReference<>(); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + capturedThread.set(DeterministicRunnerImpl.currentThreadInternal()); + + // Create a child thread and verify it gets its own context + Promise child = + Async.procedure( + () -> { + capturedChildThread.set(DeterministicRunnerImpl.currentThreadInternal()); + }); + child.get(); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + assertNotNull("Root thread should be captured", capturedThread.get()); + assertNotNull("Child thread should be captured", capturedChildThread.get()); + assertNotSame( + "Child thread should be different from root thread", + capturedThread.get(), + capturedChildThread.get()); + } + + /** Test that thread name can be set and retrieved. */ + @Test(timeout = 5000) + public void testThreadContext_ThreadNameCanBeSetAndRetrieved() { + AtomicReference capturedName = new AtomicReference<>(); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + WorkflowThread current = DeterministicRunnerImpl.currentThreadInternal(); + capturedName.set(current.getName()); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + assertNotNull(capturedName.get()); + assertEquals("workflow-root", capturedName.get()); + } + + /** Test that getWorkflowContext returns the correct context. */ + @Test(timeout = 5000) + public void testThreadContext_WorkflowContextIsAccessible() { + AtomicReference capturedContext = new AtomicReference<>(); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + WorkflowThread current = DeterministicRunnerImpl.currentThreadInternal(); + capturedContext.set(current.getWorkflowContext()); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + assertNotNull("Workflow context should be accessible", capturedContext.get()); + } + + /** Test that getRunner returns the correct runner. */ + @Test(timeout = 5000) + public void testThreadContext_RunnerIsAccessible() { + AtomicReference capturedRunner = new AtomicReference<>(); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + WorkflowThread current = DeterministicRunnerImpl.currentThreadInternal(); + capturedRunner.set(current.getRunner()); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + assertNotNull("Runner should be accessible", capturedRunner.get()); + assertSame("Should return the same runner", d, capturedRunner.get()); + } + + // ==================== Thread Locals Tests ==================== + + /** Test that thread locals work correctly. */ + @Test(timeout = 5000) + public void testThreadLocals_SetAndGetValues() { + AtomicReference retrievedValue = new AtomicReference<>(); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + WorkflowThread current = DeterministicRunnerImpl.currentThreadInternal(); + WorkflowThreadLocalInternal local = new WorkflowThreadLocalInternal<>(); + + // Initially not present + assertFalse(current.getThreadLocal(local).isPresent()); + + // Set a value + current.setThreadLocal(local, "test value"); + + // Now it should be present + assertTrue(current.getThreadLocal(local).isPresent()); + assertTrue(current.getThreadLocal(local).get().isPresent()); + retrievedValue.set(current.getThreadLocal(local).get().get()); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertEquals("test value", retrievedValue.get()); + } + + /** Test that thread locals can store null values. */ + @Test(timeout = 5000) + public void testThreadLocals_NullValueCanBeStored() { + AtomicBoolean wasPresent = new AtomicBoolean(false); + AtomicBoolean innerValuePresent = new AtomicBoolean(true); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + WorkflowThread current = DeterministicRunnerImpl.currentThreadInternal(); + WorkflowThreadLocalInternal local = new WorkflowThreadLocalInternal<>(); + + // Set null value + current.setThreadLocal(local, null); + + // Should be present but inner value is not present (null) + wasPresent.set(current.getThreadLocal(local).isPresent()); + innerValuePresent.set(current.getThreadLocal(local).get().isPresent()); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertTrue("Thread local should be present", wasPresent.get()); + assertFalse("Inner value should not be present (null)", innerValuePresent.get()); + } + + // ==================== Child Thread Tests ==================== + + /** Test that child threads work correctly with the base class. */ + @Test(timeout = 5000) + public void testChildThreads_BasicChildThreadExecution() { + trace.add("init"); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + trace.add("root started"); + + Promise child = + Async.procedure( + () -> { + trace.add("child started"); + WorkflowThread.await("wait in child", () -> unblock1); + trace.add("child done"); + }); + + trace.add("root waiting"); + child.get(); + trace.add("root done"); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertFalse(d.isDone()); + + unblock1 = true; + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertTrue(d.isDone()); + + trace.setExpected( + "init", "root started", "root waiting", "child started", "child done", "root done"); + } + + /** Test that multiple child threads can run concurrently. */ + @Test(timeout = 5000) + public void testChildThreads_MultipleChildThreads() { + trace.add("init"); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + trace.add("root started"); + + Promise child1 = + Async.procedure( + () -> { + trace.add("child1 started"); + WorkflowThread.await("wait in child1", () -> unblock1); + trace.add("child1 done"); + }); + + Promise child2 = + Async.procedure( + () -> { + trace.add("child2 started"); + WorkflowThread.await("wait in child2", () -> unblock2); + trace.add("child2 done"); + }); + + child1.get(); + child2.get(); + trace.add("root done"); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertFalse(d.isDone()); + + // Unblock child1 + unblock1 = true; + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertFalse(d.isDone()); + + // Unblock child2 + unblock2 = true; + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + assertTrue(d.isDone()); + + trace.setExpected( + "init", + "root started", + "child1 started", + "child2 started", + "child1 done", + "child2 done", + "root done"); + } + + // ==================== Stack Trace Tests ==================== + + /** Test that stack traces can be obtained from threads. */ + @Test(timeout = 5000) + public void testStackTrace_CanBeObtained() { + AtomicReference stackTrace = new AtomicReference<>(); + + DeterministicRunnerImpl d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + WorkflowThread.await("test await", () -> unblock1); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + // Get stack trace from runner + stackTrace.set(d.stackTrace()); + + assertNotNull(stackTrace.get()); + assertTrue( + "Stack trace should contain workflow-root", stackTrace.get().contains("workflow-root")); + } + + /** Test that thread priority is correctly returned. */ + @Test(timeout = 5000) + public void testThreadPriority_IsAccessible() { + AtomicReference capturedPriority = new AtomicReference<>(); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + WorkflowThread current = DeterministicRunnerImpl.currentThreadInternal(); + capturedPriority.set(current.getPriority()); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + assertNotNull("Priority should be accessible", capturedPriority.get()); + // Root thread has priority 0 + assertEquals(Integer.valueOf(0), capturedPriority.get()); + } + + /** Test that isStarted returns correct values. */ + @Test(timeout = 5000) + public void testIsStarted_ReturnsCorrectValues() { + AtomicReference threadRef = new AtomicReference<>(); + AtomicBoolean wasStartedDuringExecution = new AtomicBoolean(false); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + WorkflowThread current = DeterministicRunnerImpl.currentThreadInternal(); + threadRef.set(current); + wasStartedDuringExecution.set(current.isStarted()); + WorkflowThread.await("wait", () -> unblock1); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + WorkflowThread thread = threadRef.get(); + assertNotNull(thread); + assertTrue("Thread should be started during execution", wasStartedDuringExecution.get()); + assertTrue("Thread should still report as started", thread.isStarted()); + } + + /** Test that thread ID is accessible and unique. */ + @Test(timeout = 5000) + public void testThreadId_IsAccessibleAndUnique() { + AtomicReference rootThreadId = new AtomicReference<>(); + AtomicReference childThreadId = new AtomicReference<>(); + + DeterministicRunner d = + new DeterministicRunnerImpl( + threadPool::submit, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + () -> { + WorkflowThread root = DeterministicRunnerImpl.currentThreadInternal(); + rootThreadId.set(root.getId()); + + Promise child = + Async.procedure( + () -> { + WorkflowThread childThread = + DeterministicRunnerImpl.currentThreadInternal(); + childThreadId.set(childThread.getId()); + }); + child.get(); + }); + + d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + assertNotNull("Root thread ID should be set", rootThreadId.get()); + assertNotNull("Child thread ID should be set", childThreadId.get()); + assertNotEquals("Thread IDs should be different", rootThreadId.get(), childThreadId.get()); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/AsyncAwaitTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/AsyncAwaitTest.java new file mode 100644 index 0000000000..34ddff7b27 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/AsyncAwaitTest.java @@ -0,0 +1,394 @@ +package io.temporal.workflow; + +import static org.junit.Assert.*; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.WorkflowStub; +import io.temporal.failure.CanceledFailure; +import io.temporal.testUtils.HistoryUtils; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests for {@link Async#await} - the asynchronous, non-blocking version of {@link Workflow#await}. + */ +public class AsyncAwaitTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestAsyncAwaitWorkflow.class).build(); + + @Test + public void testBasicAsyncAwait() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("basic"); + assertEquals("condition1-met condition2-met done", result); + } + + @Test + public void testConditionTrueImmediately() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("immediate"); + assertEquals("immediate-true", result); + } + + @Test + public void testMultipleAsyncAwaits() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("multi"); + assertTrue(result.contains("first")); + assertTrue(result.contains("second")); + assertTrue(result.contains("third")); + } + + @Test + public void testTimedAwaitConditionMetFirst() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("timed-condition-first"); + assertEquals("condition-met:true", result); + } + + @Test + public void testTimedAwaitTimeoutFirst() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("timed-timeout-first"); + assertEquals("timeout:false", result); + } + + @Test + public void testTimedAwaitConditionAlreadyTrue() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("already-true"); + assertEquals("already-true:true", result); + } + + @Test + public void testPromiseAnyOfAsyncAwait() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("anyof"); + assertTrue(result.equals("first-won") || result.equals("second-won") || result.equals("both")); + } + + @Test + public void testPromiseAllOfAsyncAwait() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("allof"); + assertEquals("all-completed", result); + } + + @Test + public void testAsyncAwaitChaining() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("chaining"); + assertEquals("chained-result:42", result); + } + + @Test + public void testAsyncAwaitCancellation() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("cancellation"); + assertEquals("cancelled", result); + } + + @Test + public void testTimedAsyncAwaitCancellation() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("timed-cancellation"); + assertEquals("timed-cancelled", result); + } + + @Test + public void testAsyncAwaitConditionThrows() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("condition-throws"); + assertEquals("caught:simulated error", result); + } + + static final String awaitTimerSummary = "await-timer-summary"; + + @Test + public void testAwaitWithOptionsSetsTimerSummary() { + TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflow.execute("await-with-options"); + assertEquals("await-with-options:false", result); + + // Verify the timer summary is set in the workflow history + WorkflowExecution exec = WorkflowStub.fromTyped(workflow).getExecution(); + HistoryEvent timerStartedEvent = + testWorkflowRule.getHistoryEvent(exec.getWorkflowId(), EventType.EVENT_TYPE_TIMER_STARTED); + HistoryUtils.assertEventMetadata(timerStartedEvent, awaitTimerSummary, null); + } + + /** Combined workflow that handles all test scenarios. */ + public static class TestAsyncAwaitWorkflow implements TestWorkflow1 { + private boolean condition1 = false; + private boolean condition2 = false; + private int counter = 0; + private int value = 0; + + @Override + public String execute(String testCase) { + switch (testCase) { + case "basic": + return testBasic(); + case "immediate": + return testImmediate(); + case "multi": + return testMultiple(); + case "timed-condition-first": + return testTimedConditionFirst(); + case "timed-timeout-first": + return testTimedTimeoutFirst(); + case "already-true": + return testAlreadyTrue(); + case "anyof": + return testAnyOf(); + case "allof": + return testAllOf(); + case "chaining": + return testChaining(); + case "cancellation": + return testCancellation(); + case "timed-cancellation": + return testTimedCancellation(); + case "condition-throws": + return testConditionThrows(); + case "await-with-options": + return testAwaitWithOptions(); + default: + return "unknown test case"; + } + } + + private String testBasic() { + StringBuilder result = new StringBuilder(); + + Promise await1 = Async.await(() -> condition1); + Promise await2 = Async.await(() -> condition2); + + condition1 = true; + await1.get(); + result.append("condition1-met "); + + condition2 = true; + await2.get(); + result.append("condition2-met "); + + result.append("done"); + return result.toString(); + } + + private String testImmediate() { + Promise promise = Async.await(() -> true); + promise.get(); + return "immediate-true"; + } + + private String testMultiple() { + List results = new ArrayList<>(); + + Promise first = Async.await(() -> counter >= 1); + Promise second = Async.await(() -> counter >= 2); + Promise third = Async.await(() -> counter >= 3); + + first.thenApply( + v -> { + results.add("first"); + return null; + }); + second.thenApply( + v -> { + results.add("second"); + return null; + }); + third.thenApply( + v -> { + results.add("third"); + return null; + }); + + counter = 1; + Workflow.sleep(Duration.ofMillis(1)); + counter = 2; + Workflow.sleep(Duration.ofMillis(1)); + counter = 3; + + Promise.allOf(first, second, third).get(); + + return String.join(" ", results); + } + + private String testTimedConditionFirst() { + condition1 = false; + Promise promise = Async.await(Duration.ofSeconds(10), () -> condition1); + + Workflow.sleep(Duration.ofMillis(100)); + condition1 = true; + + boolean result = promise.get(); + return "condition-met:" + result; + } + + private String testTimedTimeoutFirst() { + Promise promise = Async.await(Duration.ofMillis(100), () -> false); + boolean result = promise.get(); + return "timeout:" + result; + } + + private String testAlreadyTrue() { + Promise promise = Async.await(Duration.ofSeconds(10), () -> true); + boolean result = promise.get(); + return "already-true:" + result; + } + + private String testAnyOf() { + condition1 = false; + condition2 = false; + + Promise first = Async.await(() -> condition1); + Promise second = Async.await(() -> condition2); + + condition1 = true; + + Promise.anyOf(first, second).get(); + + if (first.isCompleted() && !second.isCompleted()) { + return "first-won"; + } else if (second.isCompleted() && !first.isCompleted()) { + return "second-won"; + } else { + return "both"; + } + } + + private String testAllOf() { + condition1 = false; + condition2 = false; + + Promise await1 = Async.await(() -> condition1); + Promise await2 = Async.await(() -> condition2); + + condition1 = true; + Workflow.sleep(Duration.ofMillis(1)); + condition2 = true; + + Promise.allOf(await1, await2).get(); + return "all-completed"; + } + + private String testChaining() { + value = 0; + Promise chainedPromise = + Async.await(() -> value > 0) + .thenApply(v -> value * 2) + .handle( + (result, failure) -> { + if (failure != null) { + return -1; + } + return result; + }); + + value = 21; + + int result = chainedPromise.get(); + return "chained-result:" + result; + } + + private String testCancellation() { + condition1 = false; + final Promise[] promiseHolder = new Promise[1]; + + CancellationScope scope = + Workflow.newCancellationScope( + () -> { + // Create an async await that will never complete on its own + promiseHolder[0] = Async.await(() -> condition1); + }); + + // Run the scope (this is non-blocking since Async.await returns immediately) + scope.run(); + + // Cancel the scope + scope.cancel(); + + // The promise should fail with CanceledFailure when we try to get it + try { + promiseHolder[0].get(); + return "not-cancelled"; + } catch (CanceledFailure e) { + return "cancelled"; + } + } + + private String testTimedCancellation() { + condition1 = false; + final Promise[] promiseHolder = new Promise[1]; + + CancellationScope scope = + Workflow.newCancellationScope( + () -> { + // Create a timed async await that will never complete on its own + promiseHolder[0] = Async.await(Duration.ofHours(1), () -> condition1); + }); + + // Run the scope (this is non-blocking since Async.await returns immediately) + scope.run(); + + // Cancel the scope + scope.cancel(); + + // The promise should fail with CanceledFailure when we try to get it + try { + promiseHolder[0].get(); + return "timed-not-cancelled"; + } catch (CanceledFailure e) { + return "timed-cancelled"; + } + } + + private String testConditionThrows() { + // Start with condition that doesn't throw, but will throw on subsequent evaluation + // Initial check returns false (doesn't throw), then later evaluation throws + counter = 0; + + Promise promise = + Async.await( + () -> { + counter++; + // First evaluation (initial check) returns false + // Second evaluation (in evaluateConditionWatchers) throws + if (counter > 1) { + throw new RuntimeException("simulated error"); + } + return false; + }); + + // Trigger re-evaluation by sleeping (causes event loop iteration) + Workflow.sleep(Duration.ofMillis(1)); + + try { + promise.get(); + return "no-exception"; + } catch (RuntimeException e) { + return "caught:" + e.getMessage(); + } + } + + private String testAwaitWithOptions() { + // Use Async.await with reason parameter which is used as timer summary + // Use a condition that will never be true, so it times out + Promise promise = + Async.await(Duration.ofMillis(100), awaitTimerSummary, () -> false); + boolean result = promise.get(); + return "await-with-options:" + result; + } + } +} diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index 1221dec557..37b2d887a9 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -394,6 +394,17 @@ public void await(String reason, Supplier unblockCondition) { throw new UnsupportedOperationException("not implemented"); } + @Override + public Promise awaitAsync(Supplier unblockCondition) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Promise awaitAsync( + Duration timeout, String timerSummary, Supplier unblockCondition) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public Promise newTimer(Duration duration) { throw new UnsupportedOperationException("not implemented"); diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java index 77d82bdaee..4939bd4e39 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java @@ -233,6 +233,23 @@ public void await(String reason, Supplier unblockCondition) { next.await(reason, unblockCondition); } + @Override + public Promise awaitAsync(Supplier unblockCondition) { + if (!WorkflowUnsafe.isReplaying()) { + trace.add("awaitAsync"); + } + return next.awaitAsync(unblockCondition); + } + + @Override + public Promise awaitAsync( + Duration timeout, String timerSummary, Supplier unblockCondition) { + if (!WorkflowUnsafe.isReplaying()) { + trace.add("awaitAsync " + timeout + " " + timerSummary); + } + return next.awaitAsync(timeout, timerSummary, unblockCondition); + } + @Override public Promise newTimer(Duration duration) { if (!WorkflowUnsafe.isReplaying()) {